From 71d80af0b71477bc82915e93c508120ad8c9c4b4 Mon Sep 17 00:00:00 2001 From: Jiri Kulhanek Date: Thu, 23 Nov 2023 20:15:40 +0100 Subject: [PATCH] DBZ-7169 - populating processed transaction cache only if lobs enabled - in sync with memory event processor --- .../AbstractInfinispanLogMinerEventProcessor.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java index dcff1be38..29f8d6eb7 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java @@ -254,7 +254,9 @@ public LogMinerEvent next() { @Override protected void finalizeTransactionCommit(String transactionId, Scn commitScn) { // cache recently committed transactions by transaction id - getProcessedTransactionsCache().put(transactionId, commitScn.toString()); + if (getConfig().isLobEnabled()) { + getProcessedTransactionsCache().put(transactionId, commitScn.toString()); + } } @Override @@ -264,7 +266,9 @@ protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn removeEventsWithTransaction(transaction); getTransactionCache().remove(transactionId); } - getProcessedTransactionsCache().put(transactionId, rollbackScn.toString()); + if (getConfig().isLobEnabled()) { + getProcessedTransactionsCache().put(transactionId, rollbackScn.toString()); + } } @Override @@ -346,8 +350,7 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter purgeCache(minCacheScn); } else { - getProcessedTransactionsCache().clear(); - getSchemaChangesCache().clear(); + getSchemaChangesCache().entrySet().removeIf(e -> true); } if (getConfig().isLobEnabled()) { @@ -357,6 +360,7 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter } else { if (!minCacheScn.isNull()) { + getProcessedTransactionsCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0); offsetContext.setScn(minCacheScn.subtract(Scn.valueOf(1))); dispatcher.dispatchHeartbeatEvent(partition, offsetContext); }