diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractTransactionCachingLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractTransactionCachingLogMinerEventProcessor.java index a9c018f42..87b894c6e 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractTransactionCachingLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractTransactionCachingLogMinerEventProcessor.java @@ -193,17 +193,6 @@ protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) { @Override protected T getAndRemoveTransactionFromCache(String transactionId) { - // // todo: Infinispan bug? - // // When interacting with ISPN with a remote server configuration, the expected - // // behavior was that calling the remove method on the cache would return the - // // existing entry and remove it from the cache; however it always returned null. - // // - // // For now, we're going to use get to obtain the value and then remove it after-the-fact. - // final T transaction = getTransactionCache().get(transactionId); - // if (transaction != null) { - // getTransactionCache().remove(transactionId); - // } - // return transaction; return getTransactionCache().remove(transactionId); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java index e3c2ef282..f86e55b51 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java @@ -5,37 +5,22 @@ */ package io.debezium.connector.oracle.logminer.processor.memory; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.OracleOffsetContext; import io.debezium.connector.oracle.OraclePartition; -import io.debezium.connector.oracle.Scn; import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics; import io.debezium.connector.oracle.logminer.events.LogMinerEvent; import io.debezium.connector.oracle.logminer.events.LogMinerEventRow; -import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor; +import io.debezium.connector.oracle.logminer.processor.AbstractTransactionCachingLogMinerEventProcessor; import io.debezium.connector.oracle.logminer.processor.LogMinerCache; import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext; import io.debezium.relational.TableId; -import io.debezium.util.Loggings; /** * A {@link LogMinerEventProcessor} that uses the JVM heap to store events as they're being @@ -43,25 +28,15 @@ * * @author Chris Cranford */ -// TODO: can this be a caching impl now as well, just with map -public class MemoryLogMinerEventProcessor extends AbstractLogMinerEventProcessor { +public class MemoryLogMinerEventProcessor extends AbstractTransactionCachingLogMinerEventProcessor { - private static final Logger LOGGER = LoggerFactory.getLogger(MemoryLogMinerEventProcessor.class); - private final EventDispatcher dispatcher; - private final OraclePartition partition; - private final OracleOffsetContext offsetContext; - private final LogMinerStreamingChangeEventSourceMetrics metrics; - - private final HashMap transactionHashMap = new HashMap<>(); /** * Cache of transactions, keyed based on the transaction's unique identifier */ - private final LogMinerCache transactionCache = new MemoryBasedLogMinerCache<>(transactionHashMap); - /** - * Cache of processed transactions (committed or rolled back), keyed based on the transaction's unique identifier. - */ - private final Map recentlyProcessedTransactionsCache = new HashMap<>(); - private final Set schemaChangesCache = new HashSet<>(); + private final LogMinerCache transactionCache = new MemoryBasedLogMinerCache<>(new HashMap<>()); + private final LogMinerCache eventCache = new MemoryBasedLogMinerCache<>(new HashMap<>()); + private final LogMinerCache schemaCache = new MemoryBasedLogMinerCache<>(new HashMap<>()); + private final LogMinerCache processedTransactionsCache = new MemoryBasedLogMinerCache<>(new HashMap<>()); public MemoryLogMinerEventProcessor(ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, @@ -71,16 +46,7 @@ public MemoryLogMinerEventProcessor(ChangeEventSourceContext context, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, LogMinerStreamingChangeEventSourceMetrics metrics) { - super(context, connectorConfig, schema, partition, offsetContext, dispatcher, metrics, jdbcConnection); - this.dispatcher = dispatcher; - this.partition = partition; - this.offsetContext = offsetContext; - this.metrics = (LogMinerStreamingChangeEventSourceMetrics) metrics; - } - - @Override - public LogMinerCache getTransactionCache() { - return transactionCache; + super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics); } @Override @@ -89,238 +55,28 @@ protected MemoryTransaction createTransaction(LogMinerEventRow row) { } @Override - protected void removeEventWithRowId(LogMinerEventRow row) { - MemoryTransaction transaction = getTransactionCache().get(row.getTransactionId()); - if (transaction == null) { - if (isTransactionIdWithNoSequence(row.getTransactionId())) { - // This means that Oracle LogMiner found an event that should be undone but its corresponding - // undo entry was read in a prior mining session and the transaction's sequence could not be - // resolved. In this case, lets locate the transaction based solely on XIDUSN and XIDSLT. - final String transactionPrefix = getTransactionIdPrefix(row.getTransactionId()); - LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId()); - LOGGER.debug("Checking all transactions with prefix '{}'", transactionPrefix); + public LogMinerCache getTransactionCache() { + return transactionCache; + } - getTransactionCache().forEach((transactionKey, v) -> { - if (transactionKey.startsWith(transactionPrefix)) { - MemoryTransaction found = getTransactionCache().get(transactionKey); - // TODO: isn't found the same as v? - if (found != v) { - LOGGER.warn("HOW DID THIS HAPPEN?"); - } + @Override + public LogMinerCache getEventCache() { - if (found != null && found.removeEventWithRowId(row.getRowId())) { - // We successfully found a transaction with the same XISUSN and XIDSLT and that - // transaction included a change for the specified row id. - Loggings.logDebugAndTraceRecord(LOGGER, row, "Undo change on table '{}' was applied to transaction '{}'", row.getTableId(), transactionKey); - } - } - }); + return eventCache; + } - Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found", row.getTableId(), - row.getRowId()); - } - else if (!getConfig().isLobEnabled()) { - Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since transaction '{}' was not found.", row.getTableId(), - row.getTransactionId()); - } - } - else { - if (!transaction.removeEventWithRowId(row.getRowId())) { - Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(), - row.getRowId()); - } - } + @Override + public LogMinerCache getSchemaChangesCache() { + return schemaCache; + } + + @Override + public LogMinerCache getProcessedTransactionsCache() { + return processedTransactionsCache; } @Override public void close() throws Exception { - // close any resources used here } - @Override - protected boolean isRecentlyProcessed(String transactionId) { - return recentlyProcessedTransactionsCache.containsKey(transactionId); - } - - @Override - protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) { - return schemaChangesCache.contains(row.getScn()); - } - - @Override - protected MemoryTransaction getAndRemoveTransactionFromCache(String transactionId) { - return getTransactionCache().remove(transactionId); - } - - @Override - protected Iterator getTransactionEventIterator(MemoryTransaction transaction) { - return transaction.getEvents().iterator(); - } - - @Override - protected void finalizeTransactionCommit(String transactionId, Scn commitScn) { - getAbandonedTransactionsCache().remove(transactionId); - if (getConfig().isLobEnabled()) { - // cache recently committed transactions by transaction id - recentlyProcessedTransactionsCache.put(transactionId, commitScn); - } - } - - @Override - protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn) { - transactionCache.remove(transactionId); - getAbandonedTransactionsCache().remove(transactionId); - if (getConfig().isLobEnabled()) { - recentlyProcessedTransactionsCache.put(transactionId, rollbackScn); - } - } - - @Override - protected String getFirstActiveTransactionKey() { - AtomicReference result = new AtomicReference<>(); - transactionCache.keys(keys -> { - Iterator iterator = keys.iterator(); - if (iterator.hasNext()) { - result.set(iterator.next()); - } - }); - return result.get(); - } - - @Override - protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedException { - super.handleSchemaChange(row); - if (row.getTableName() != null && getConfig().isLobEnabled()) { - schemaChangesCache.add(row.getScn()); - } - } - - @Override - protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier eventSupplier) { - if (getAbandonedTransactionsCache().contains(transactionId)) { - LOGGER.warn("Event for abandoned transaction {}, skipped.", transactionId); - return; - } - if (!isRecentlyProcessed(transactionId)) { - MemoryTransaction transaction = getTransactionCache().get(transactionId); - if (transaction == null) { - LOGGER.trace("Transaction {} not in cache for DML, creating.", transactionId); - transaction = createTransaction(row); - getTransactionCache().put(transactionId, transaction); - } - - if (isTransactionOverEventThreshold(transaction)) { - abandonTransactionOverEventThreshold(transaction); - return; - } - - int eventId = transaction.getNextEventId(); - if (transaction.getEvents().size() <= eventId) { - // Add new event at eventId offset - LOGGER.trace("Transaction {}, adding event reference at index {}", transactionId, eventId); - transaction.getEvents().add(eventSupplier.get()); - metrics.calculateLagFromSource(row.getChangeTime()); - } - - metrics.setActiveTransactionCount(getTransactionCache().size()); - } - else if (!getConfig().isLobEnabled()) { - // Explicitly only log this warning when LobEnabled is false because its commonplace for a - // transaction to be re-mined and therefore seen as already processed until the SCN low - // watermark is advanced after a long transaction is committed. - LOGGER.warn("Event for transaction {} has already been processed, skipped.", transactionId); - } - } - - @Override - protected int getTransactionEventCount(MemoryTransaction transaction) { - return transaction.getEvents().size(); - } - - @Override - protected PreparedStatement createQueryStatement() throws SQLException { - return jdbcConnection.connection().prepareStatement(getQueryString(), - ResultSet.TYPE_FORWARD_ONLY, - ResultSet.CONCUR_READ_ONLY, - ResultSet.HOLD_CURSORS_OVER_COMMIT); - } - - @Override - protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws InterruptedException { - if (getConfig().isLobEnabled()) { - if (transactionCache.isEmpty() && !maxCommittedScn.isNull()) { - offsetContext.setScn(maxCommittedScn); - dispatcher.dispatchHeartbeatEvent(partition, offsetContext); - } - else { - abandonTransactions(getConfig().getLogMiningTransactionRetention()); - final Scn minStartScn = getTransactionCacheMinimumScn(); - if (!minStartScn.isNull()) { - recentlyProcessedTransactionsCache.entrySet().removeIf(entry -> entry.getValue().compareTo(minStartScn) < 0); - schemaChangesCache.removeIf(scn -> scn.compareTo(minStartScn) < 0); - offsetContext.setScn(minStartScn.subtract(Scn.valueOf(1))); - dispatcher.dispatchHeartbeatEvent(partition, offsetContext); - } - } - return offsetContext.getScn(); - } - else { - if (!getLastProcessedScn().isNull() && getLastProcessedScn().compareTo(endScn) < 0) { - // If the last processed SCN is before the endScn we need to use the last processed SCN as the - // next starting point as the LGWR buffer didn't flush all entries from memory to disk yet. - endScn = getLastProcessedScn(); - } - - if (transactionCache.isEmpty()) { - offsetContext.setScn(endScn); - dispatcher.dispatchHeartbeatEvent(partition, offsetContext); - } - else { - abandonTransactions(getConfig().getLogMiningTransactionRetention()); - final Scn minStartScn = getTransactionCacheMinimumScn(); - if (!minStartScn.isNull()) { - offsetContext.setScn(minStartScn.subtract(Scn.valueOf(1))); - dispatcher.dispatchHeartbeatEvent(partition, offsetContext); - } - } - return endScn; - } - } - - @Override - protected Scn getTransactionCacheMinimumScn() { - AtomicReference result = new AtomicReference<>(); - transactionCache.values(stream -> { - result.set(stream.map(MemoryTransaction::getStartScn) - .min(Scn::compareTo) - .orElse(Scn.NULL)); - }); - return result.get(); - } - - // TODO: extend cache processor? - protected Optional getOldestTransactionInCache() { - - return getTransactionCache().streamAndReturn(entryStream -> entryStream - .map(LogMinerCache.Entry::getValue) - .min(this::compareTransactions)); - } - - protected MemoryTransaction compareOldest(MemoryTransaction first, MemoryTransaction second) { - int comparison = first.getStartScn().compareTo(second.getStartScn()); - if (comparison < 0) { - return second; - } - else if (comparison == 0) { - if (second.getChangeTime().isBefore(first.getChangeTime())) { - return second; - } - } - return first; - } - - protected int compareTransactions(MemoryTransaction first, MemoryTransaction second) { - return first.getStartScn().compareTo(second.getStartScn()); - - } }