From 18c672ab6ff5087e8a1ee91a2a6ac3ea9927723f Mon Sep 17 00:00:00 2001 From: Jiri Kulhanek Date: Wed, 29 Nov 2023 20:50:01 +0100 Subject: [PATCH] DBZ-7192: move abandonedTransactionsCache to common abstract layer also makes sure that events are correctly removed in ISPN event processor after transaction is abandoned. Also fixes scenario with event number based threshold abandonment in ISPN - events comming afterwards would be still processed. --- .../AbstractLogMinerEventProcessor.java | 41 ++++++++++++++----- .../processor/LogMinerEventProcessor.java | 1 - ...tractInfinispanLogMinerEventProcessor.java | 18 ++++---- .../memory/MemoryLogMinerEventProcessor.java | 39 ++---------------- 4 files changed, 43 insertions(+), 56 deletions(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java index 1508f09ff..3aa0db7c2 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java @@ -14,9 +14,11 @@ import java.time.Instant; import java.time.ZoneOffset; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -102,6 +104,9 @@ public abstract class AbstractLogMinerEventProcessor abandonedTransactionsCache = new HashSet<>(); + public AbstractLogMinerEventProcessor(ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema, @@ -126,6 +131,11 @@ public AbstractLogMinerEventProcessor(ChangeEventSourceContext context, this.jdbcConnection = jdbcConnection; } + protected Set getAbandonedTransactionsCache() { + return abandonedTransactionsCache; + } + + protected OracleConnectorConfig getConfig() { return connectorConfig; } @@ -450,7 +460,7 @@ protected void handleCommit(OraclePartition partition, LogMinerEventRow row) thr + "Offset Commit SCN {}, Transaction Commit SCN {}, Last Seen Commit SCN {}.", transactionId, offsetContext.getCommitScn(), commitScn, lastCommittedScn); } - removeTransactionAndEventsFromCache(transaction); + cleanupAfterTransactionRemovedFromCache(transaction, false); metrics.setActiveTransactionCount(getTransactionCache().size()); return; } @@ -559,7 +569,7 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted metrics.calculateLagFromSource(row.getChangeTime()); finalizeTransactionCommit(transactionId, commitScn); - removeTransactionAndEventsFromCache(transaction); + cleanupAfterTransactionRemovedFromCache(transaction, false); metrics.incrementCommittedTransactionCount(); metrics.setActiveTransactionCount(getTransactionCache().size()); @@ -575,7 +585,10 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted * @param row the result set row */ protected void handleCommitNotFoundInBuffer(LogMinerEventRow row) { - // no-op + // In the event the transaction was prematurely removed due to retention policy, when we do find + // the transaction's commit in the logs in the future, we should remove the entry if it exists + // to avoid any potential memory-leak with the cache. + abandonedTransactionsCache.remove(row.getTransactionId()); } /** @@ -585,7 +598,10 @@ protected void handleCommitNotFoundInBuffer(LogMinerEventRow row) { * @param row the result set row */ protected void handleRollbackNotFoundInBuffer(LogMinerEventRow row) { - // no-op + // In the event the transaction was prematurely removed due to retention policy, when we do find + // the transaction's rollback in the logs in the future, we should remove the entry if it exists + // to avoid any potential memory-leak with the cache. + abandonedTransactionsCache.remove(row.getTransactionId()); } /** @@ -597,11 +613,18 @@ protected void handleRollbackNotFoundInBuffer(LogMinerEventRow row) { protected abstract T getAndRemoveTransactionFromCache(String transactionId); /** - * Removes the transaction and all its associated event entries from the connector's caches. + * Removes the items associated with the transaction (e.g. events if they are stored independently. * * @param transaction the transaction instance, should never be {@code null} + * @param isAbandoned whether the removal is because transaction is being abandoned */ - protected abstract void removeTransactionAndEventsFromCache(T transaction); + protected void cleanupAfterTransactionRemovedFromCache(T transaction, boolean isAbandoned) { + if (isAbandoned) { + abandonedTransactionsCache.remove(transaction.getTransactionId()); + } else { + abandonedTransactionsCache.add(transaction.getTransactionId()); + } + } /** * Get an iterator over the events that are part of the specified transaction. @@ -1388,6 +1411,7 @@ protected void abandonTransactionOverEventThreshold(T transaction) { LOGGER.warn("Transaction {} exceeds maximum allowed number of events, transaction will be abandoned.", transaction.getTransactionId()); metrics.incrementWarningCount(); getAndRemoveTransactionFromCache(transaction.getTransactionId()); + abandonedTransactionsCache.add(transaction.getTransactionId()); metrics.incrementOversizedTransactionCount(); } @@ -1412,9 +1436,7 @@ public void abandonTransactions(Duration retention) throws InterruptedException entry.getKey(), entry.getValue().getStartScn(), entry.getValue().getChangeTime(), entry.getValue().getRedoThreadId(), entry.getValue().getNumberOfEvents()); - if (getAbandonedTransactionsCache() != null) { - getAbandonedTransactionsCache().add(entry.getKey()); - } + cleanupAfterTransactionRemovedFromCache(entry.getValue(), true); iterator.remove(); metrics.addAbandonedTransactionId(entry.getKey()); @@ -1422,7 +1444,6 @@ public void abandonTransactions(Duration retention) throws InterruptedException } } - // Update the oldest scn metric are transaction abandonment final Optional oldestTransaction = getOldestTransactionInCache(); if (oldestTransaction.isPresent()) { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/LogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/LogMinerEventProcessor.java index e191a4cc3..84f3eba9c 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/LogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/LogMinerEventProcessor.java @@ -33,5 +33,4 @@ public interface LogMinerEventProcessor extends AutoCloseable { */ void abandonTransactions(Duration retention) throws InterruptedException; - Set getAbandonedTransactionsCache(); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java index 4f435fe72..b965cce67 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java @@ -14,7 +14,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -205,9 +204,9 @@ protected InfinispanTransaction getAndRemoveTransactionFromCache(String transact } @Override - protected void removeTransactionAndEventsFromCache(InfinispanTransaction transaction) { + protected void cleanupAfterTransactionRemovedFromCache(InfinispanTransaction transaction, boolean isAbandoned) { + super.cleanupAfterTransactionRemovedFromCache(transaction, isAbandoned); removeEventsWithTransaction(transaction); - getTransactionCache().remove(transaction.getTransactionId()); } @Override @@ -247,6 +246,7 @@ public LogMinerEvent next() { @Override protected void finalizeTransactionCommit(String transactionId, Scn commitScn) { + getAbandonedTransactionsCache().remove(transactionId); // cache recently committed transactions by transaction id if (getConfig().isLobEnabled()) { getProcessedTransactionsCache().put(transactionId, commitScn.toString()); @@ -260,6 +260,7 @@ protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn removeEventsWithTransaction(transaction); getTransactionCache().remove(transactionId); } + getAbandonedTransactionsCache().remove(transactionId); if (getConfig().isLobEnabled()) { getProcessedTransactionsCache().put(transactionId, rollbackScn.toString()); } @@ -282,6 +283,10 @@ protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedExcept @Override protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier eventSupplier) { + if (getAbandonedTransactionsCache().contains(transactionId)) { + LOGGER.warn("Event for abandoned transaction {}, skipped.", transactionId); + return; + } if (!isRecentlyProcessed(transactionId)) { InfinispanTransaction transaction = getTransactionCache().get(transactionId); if (transaction == null) { @@ -341,7 +346,7 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter } if (!minCacheScn.isNull()) { - abandonTransactions(getConfig().getLogMiningTransactionRetention()); + abandonTransactions(getConfig().getLogMiningTransactionRetention()); purgeCache(minCacheScn); } else { @@ -423,11 +428,6 @@ private void removeEventsWithTransaction(InfinispanTransaction transaction) { inMemoryPendingTransactionsCache.remove(transaction.getTransactionId()); } - @Override - public Set getAbandonedTransactionsCache() { - return null; - } - /** * A comparator that guarantees that the sort order applied to event keys is such that * they are treated as numerical values, sorted as numeric values rather than strings diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java index 676ecd987..15159aabe 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java @@ -59,8 +59,6 @@ public class MemoryLogMinerEventProcessor extends AbstractLogMinerEventProcessor private final Map recentlyProcessedTransactionsCache = new HashMap<>(); private final Set schemaChangesCache = new HashSet<>(); - private final Set abandonedTransactionsCache = new HashSet<>(); - public MemoryLogMinerEventProcessor(ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, OracleConnection jdbcConnection, @@ -144,11 +142,6 @@ protected MemoryTransaction getAndRemoveTransactionFromCache(String transactionI return getTransactionCache().remove(transactionId); } - @Override - protected void removeTransactionAndEventsFromCache(MemoryTransaction transaction) { - abandonedTransactionsCache.remove(transaction.getTransactionId()); - } - @Override protected Iterator getTransactionEventIterator(MemoryTransaction transaction) { return transaction.getEvents().iterator(); @@ -156,7 +149,7 @@ protected Iterator getTransactionEventIterator(MemoryTransaction @Override protected void finalizeTransactionCommit(String transactionId, Scn commitScn) { - abandonedTransactionsCache.remove(transactionId); + getAbandonedTransactionsCache().remove(transactionId); if (getConfig().isLobEnabled()) { // cache recently committed transactions by transaction id recentlyProcessedTransactionsCache.put(transactionId, commitScn); @@ -166,7 +159,7 @@ protected void finalizeTransactionCommit(String transactionId, Scn commitScn) { @Override protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn) { transactionCache.remove(transactionId); - abandonedTransactionsCache.remove(transactionId); + getAbandonedTransactionsCache().remove(transactionId); if (getConfig().isLobEnabled()) { recentlyProcessedTransactionsCache.put(transactionId, rollbackScn); } @@ -186,25 +179,9 @@ protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedExcept } } - @Override - protected void handleCommitNotFoundInBuffer(LogMinerEventRow row) { - // In the event the transaction was prematurely removed due to retention policy, when we do find - // the transaction's commit in the logs in the future, we should remove the entry if it exists - // to avoid any potential memory-leak with the cache. - abandonedTransactionsCache.remove(row.getTransactionId()); - } - - @Override - protected void handleRollbackNotFoundInBuffer(LogMinerEventRow row) { - // In the event the transaction was prematurely removed due to retention policy, when we do find - // the transaction's rollback in the logs in the future, we should remove the entry if it exists - // to avoid any potential memory-leak with the cache. - abandonedTransactionsCache.remove(row.getTransactionId()); - } - @Override protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier eventSupplier) { - if (abandonedTransactionsCache.contains(transactionId)) { + if (getAbandonedTransactionsCache().contains(transactionId)) { LOGGER.warn("Event for abandoned transaction {}, skipped.", transactionId); return; } @@ -294,12 +271,6 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter } } - @Override - protected void abandonTransactionOverEventThreshold(MemoryTransaction transaction) { - super.abandonTransactionOverEventThreshold(transaction); - abandonedTransactionsCache.add(transaction.getTransactionId()); - } - @Override protected Scn getTransactionCacheMinimumScn() { return transactionCache.values().stream() @@ -331,8 +302,4 @@ else if (comparison == 0) { return Optional.ofNullable(transaction); } - public Set getAbandonedTransactionsCache() { - return abandonedTransactionsCache; - } - }