DBZ-7169 - populating processed transaction cache only if lobs enabled - in sync with memory event processor

This commit is contained in:
Jiri Kulhanek 2023-11-23 20:15:40 +01:00 committed by Chris Cranford
parent f76a78ee2f
commit 71d80af0b7

View File

@ -254,7 +254,9 @@ public LogMinerEvent next() {
@Override
protected void finalizeTransactionCommit(String transactionId, Scn commitScn) {
// cache recently committed transactions by transaction id
getProcessedTransactionsCache().put(transactionId, commitScn.toString());
if (getConfig().isLobEnabled()) {
getProcessedTransactionsCache().put(transactionId, commitScn.toString());
}
}
@Override
@ -264,7 +266,9 @@ protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn
removeEventsWithTransaction(transaction);
getTransactionCache().remove(transactionId);
}
getProcessedTransactionsCache().put(transactionId, rollbackScn.toString());
if (getConfig().isLobEnabled()) {
getProcessedTransactionsCache().put(transactionId, rollbackScn.toString());
}
}
@Override
@ -346,8 +350,7 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter
purgeCache(minCacheScn);
}
else {
getProcessedTransactionsCache().clear();
getSchemaChangesCache().clear();
getSchemaChangesCache().entrySet().removeIf(e -> true);
}
if (getConfig().isLobEnabled()) {
@ -357,6 +360,7 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter
}
else {
if (!minCacheScn.isNull()) {
getProcessedTransactionsCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
offsetContext.setScn(minCacheScn.subtract(Scn.valueOf(1)));
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}