diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java index ce65ae9cb..9d8ca0afc 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java @@ -384,6 +384,7 @@ protected void handleCommit(OraclePartition partition, LogMinerEventRow row) thr final T transaction = getAndRemoveTransactionFromCache(transactionId); if (transaction == null) { + handleCommitNotFoundInBuffer(row); LOGGER.trace("Transaction {} not found, commit skipped.", transactionId); return; } @@ -519,6 +520,26 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted metrics.setLastCommitDuration(Duration.between(start, Instant.now())); } + /** + * Allow for post-processing of a transaction commit in the stream that was not found in the + * transaction buffer, perhaps because it aged out due to retention policies. + * + * @param row the result set row + */ + protected void handleCommitNotFoundInBuffer(LogMinerEventRow row) { + // no-op + } + + /** + * Allow for post-processing of a transaction rollback in the stream that was not found in + * the transaction buffer, perhaps because it aged out due to retention policies. + * + * @param row the result set row + */ + protected void handleRollbackNotFoundInBuffer(LogMinerEventRow row) { + // no-op + } + /** * Gets a transaction instance from the transaction cache while also removing its cache entry. * @@ -586,6 +607,7 @@ protected void handleRollback(LogMinerEventRow row) { } else { LOGGER.trace("Could not rollback transaction {}, was not found in cache.", row.getTransactionId()); + handleRollbackNotFoundInBuffer(row); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java index 822639aa5..bc22c7863 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java @@ -203,6 +203,7 @@ protected Iterator getTransactionEventIterator(MemoryTransaction @Override protected void finalizeTransactionCommit(String transactionId, Scn commitScn) { + abandonedTransactionsCache.remove(transactionId); if (getConfig().isLobEnabled()) { // cache recently committed transactions by transaction id recentlyProcessedTransactionsCache.put(transactionId, commitScn); @@ -226,6 +227,22 @@ protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedExcept } } + @Override + protected void handleCommitNotFoundInBuffer(LogMinerEventRow row) { + // In the event the transaction was prematurely removed due to retention policy, when we do find + // the transaction's commit in the logs in the future, we should remove the entry if it exists + // to avoid any potential memory-leak with the cache. + abandonedTransactionsCache.remove(row.getTransactionId()); + } + + @Override + protected void handleRollbackNotFoundInBuffer(LogMinerEventRow row) { + // In the event the transaction was prematurely removed due to retention policy, when we do find + // the transaction's rollback in the logs in the future, we should remove the entry if it exists + // to avoid any potential memory-leak with the cache. + abandonedTransactionsCache.remove(row.getTransactionId()); + } + @Override protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier eventSupplier) { if (abandonedTransactionsCache.contains(transactionId)) {