diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java index dcff1be38..29f8d6eb7 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java @@ -254,7 +254,9 @@ public LogMinerEvent next() { @Override protected void finalizeTransactionCommit(String transactionId, Scn commitScn) { // cache recently committed transactions by transaction id - getProcessedTransactionsCache().put(transactionId, commitScn.toString()); + if (getConfig().isLobEnabled()) { + getProcessedTransactionsCache().put(transactionId, commitScn.toString()); + } } @Override @@ -264,7 +266,9 @@ protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn removeEventsWithTransaction(transaction); getTransactionCache().remove(transactionId); } - getProcessedTransactionsCache().put(transactionId, rollbackScn.toString()); + if (getConfig().isLobEnabled()) { + getProcessedTransactionsCache().put(transactionId, rollbackScn.toString()); + } } @Override @@ -346,8 +350,7 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter purgeCache(minCacheScn); } else { - getProcessedTransactionsCache().clear(); - getSchemaChangesCache().clear(); + getSchemaChangesCache().entrySet().removeIf(e -> true); } if (getConfig().isLobEnabled()) { @@ -357,6 +360,7 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter } else { if (!minCacheScn.isNull()) { + getProcessedTransactionsCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0); offsetContext.setScn(minCacheScn.subtract(Scn.valueOf(1))); dispatcher.dispatchHeartbeatEvent(partition, offsetContext); }