DBZ-6355 Fix memory-leak with heap-based abandoned transaction cache

This commit is contained in:
Chris Cranford 2023-04-25 00:07:23 -04:00 committed by Chris Cranford
parent 8ef9a6f015
commit 2ee3249d84
2 changed files with 39 additions and 0 deletions

View File

@ -384,6 +384,7 @@ protected void handleCommit(OraclePartition partition, LogMinerEventRow row) thr
final T transaction = getAndRemoveTransactionFromCache(transactionId);
if (transaction == null) {
handleCommitNotFoundInBuffer(row);
LOGGER.trace("Transaction {} not found, commit skipped.", transactionId);
return;
}
@ -519,6 +520,26 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted
metrics.setLastCommitDuration(Duration.between(start, Instant.now()));
}
/**
* Allow for post-processing of a transaction commit in the stream that was not found in the
* transaction buffer, perhaps because it aged out due to retention policies.
*
* @param row the result set row
*/
protected void handleCommitNotFoundInBuffer(LogMinerEventRow row) {
// no-op
}
/**
* Allow for post-processing of a transaction rollback in the stream that was not found in
* the transaction buffer, perhaps because it aged out due to retention policies.
*
* @param row the result set row
*/
protected void handleRollbackNotFoundInBuffer(LogMinerEventRow row) {
// no-op
}
/**
* Gets a transaction instance from the transaction cache while also removing its cache entry.
*
@ -586,6 +607,7 @@ protected void handleRollback(LogMinerEventRow row) {
}
else {
LOGGER.trace("Could not rollback transaction {}, was not found in cache.", row.getTransactionId());
handleRollbackNotFoundInBuffer(row);
}
}

View File

@ -203,6 +203,7 @@ protected Iterator<LogMinerEvent> getTransactionEventIterator(MemoryTransaction
@Override
protected void finalizeTransactionCommit(String transactionId, Scn commitScn) {
abandonedTransactionsCache.remove(transactionId);
if (getConfig().isLobEnabled()) {
// cache recently committed transactions by transaction id
recentlyProcessedTransactionsCache.put(transactionId, commitScn);
@ -226,6 +227,22 @@ protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedExcept
}
}
@Override
protected void handleCommitNotFoundInBuffer(LogMinerEventRow row) {
// In the event the transaction was prematurely removed due to retention policy, when we do find
// the transaction's commit in the logs in the future, we should remove the entry if it exists
// to avoid any potential memory-leak with the cache.
abandonedTransactionsCache.remove(row.getTransactionId());
}
@Override
protected void handleRollbackNotFoundInBuffer(LogMinerEventRow row) {
// In the event the transaction was prematurely removed due to retention policy, when we do find
// the transaction's rollback in the logs in the future, we should remove the entry if it exists
// to avoid any potential memory-leak with the cache.
abandonedTransactionsCache.remove(row.getTransactionId());
}
@Override
protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier<LogMinerEvent> eventSupplier) {
if (abandonedTransactionsCache.contains(transactionId)) {