DBZ-6275 Correctly sync Infinispan cache on event counter reset
This commit is contained in:
parent
6419c0e704
commit
3bb82d6b73
@ -343,17 +343,21 @@ protected void handleMissingScn(LogMinerEventRow row) {
|
|||||||
*/
|
*/
|
||||||
protected void handleStart(LogMinerEventRow row) {
|
protected void handleStart(LogMinerEventRow row) {
|
||||||
final String transactionId = row.getTransactionId();
|
final String transactionId = row.getTransactionId();
|
||||||
final AbstractTransaction transaction = getTransactionCache().get(transactionId);
|
final T transaction = getTransactionCache().get(transactionId);
|
||||||
if (transaction == null && !isRecentlyProcessed(transactionId)) {
|
if (transaction == null && !isRecentlyProcessed(transactionId)) {
|
||||||
getTransactionCache().put(transactionId, createTransaction(row));
|
getTransactionCache().put(transactionId, createTransaction(row));
|
||||||
metrics.setActiveTransactions(getTransactionCache().size());
|
metrics.setActiveTransactions(getTransactionCache().size());
|
||||||
}
|
}
|
||||||
else if (transaction != null && !isRecentlyProcessed(transactionId)) {
|
else if (transaction != null && !isRecentlyProcessed(transactionId)) {
|
||||||
LOGGER.trace("Transaction {} is not yet committed and START event detected.", transactionId);
|
LOGGER.trace("Transaction {} is not yet committed and START event detected.", transactionId);
|
||||||
transaction.start();
|
resetTransactionToStart(transaction);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void resetTransactionToStart(T transaction) {
|
||||||
|
transaction.start();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handle processing a LogMinerEventRow for a {@code COMMIT} event.
|
* Handle processing a LogMinerEventRow for a {@code COMMIT} event.
|
||||||
*
|
*
|
||||||
|
@ -234,6 +234,13 @@ protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn
|
|||||||
getProcessedTransactionsCache().put(transactionId, rollbackScn.toString());
|
getProcessedTransactionsCache().put(transactionId, rollbackScn.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void resetTransactionToStart(InfinispanTransaction transaction) {
|
||||||
|
super.resetTransactionToStart(transaction);
|
||||||
|
// Flush the change created by the super class to the transaction cache
|
||||||
|
getTransactionCache().put(transaction.getTransactionId(), transaction);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedException {
|
protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedException {
|
||||||
super.handleSchemaChange(row);
|
super.handleSchemaChange(row);
|
||||||
|
Loading…
Reference in New Issue
Block a user