DBZ-7192: move abandonedTransactionsCache to common abstract layer

also makes sure that events are correctly removed in ISPN event processor after transaction is abandoned.
Also fixes scenario with event number based threshold abandonment in ISPN - events comming afterwards would be still processed.
This commit is contained in:
Jiri Kulhanek 2023-11-29 20:50:01 +01:00 committed by Chris Cranford
parent c0f141f51f
commit 18c672ab6f
4 changed files with 43 additions and 56 deletions

View File

@ -14,9 +14,11 @@
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -102,6 +104,9 @@ public abstract class AbstractLogMinerEventProcessor<T extends AbstractTransacti
private Scn lastProcessedScn = Scn.NULL;
private boolean sequenceUnavailable = false;
private final Set<String> abandonedTransactionsCache = new HashSet<>();
public AbstractLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleDatabaseSchema schema,
@ -126,6 +131,11 @@ public AbstractLogMinerEventProcessor(ChangeEventSourceContext context,
this.jdbcConnection = jdbcConnection;
}
protected Set<String> getAbandonedTransactionsCache() {
return abandonedTransactionsCache;
}
protected OracleConnectorConfig getConfig() {
return connectorConfig;
}
@ -450,7 +460,7 @@ protected void handleCommit(OraclePartition partition, LogMinerEventRow row) thr
+ "Offset Commit SCN {}, Transaction Commit SCN {}, Last Seen Commit SCN {}.",
transactionId, offsetContext.getCommitScn(), commitScn, lastCommittedScn);
}
removeTransactionAndEventsFromCache(transaction);
cleanupAfterTransactionRemovedFromCache(transaction, false);
metrics.setActiveTransactionCount(getTransactionCache().size());
return;
}
@ -559,7 +569,7 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted
metrics.calculateLagFromSource(row.getChangeTime());
finalizeTransactionCommit(transactionId, commitScn);
removeTransactionAndEventsFromCache(transaction);
cleanupAfterTransactionRemovedFromCache(transaction, false);
metrics.incrementCommittedTransactionCount();
metrics.setActiveTransactionCount(getTransactionCache().size());
@ -575,7 +585,10 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted
* @param row the result set row
*/
protected void handleCommitNotFoundInBuffer(LogMinerEventRow row) {
// no-op
// In the event the transaction was prematurely removed due to retention policy, when we do find
// the transaction's commit in the logs in the future, we should remove the entry if it exists
// to avoid any potential memory-leak with the cache.
abandonedTransactionsCache.remove(row.getTransactionId());
}
/**
@ -585,7 +598,10 @@ protected void handleCommitNotFoundInBuffer(LogMinerEventRow row) {
* @param row the result set row
*/
protected void handleRollbackNotFoundInBuffer(LogMinerEventRow row) {
// no-op
// In the event the transaction was prematurely removed due to retention policy, when we do find
// the transaction's rollback in the logs in the future, we should remove the entry if it exists
// to avoid any potential memory-leak with the cache.
abandonedTransactionsCache.remove(row.getTransactionId());
}
/**
@ -597,11 +613,18 @@ protected void handleRollbackNotFoundInBuffer(LogMinerEventRow row) {
protected abstract T getAndRemoveTransactionFromCache(String transactionId);
/**
* Removes the transaction and all its associated event entries from the connector's caches.
* Removes the items associated with the transaction (e.g. events if they are stored independently.
*
* @param transaction the transaction instance, should never be {@code null}
* @param isAbandoned whether the removal is because transaction is being abandoned
*/
protected abstract void removeTransactionAndEventsFromCache(T transaction);
protected void cleanupAfterTransactionRemovedFromCache(T transaction, boolean isAbandoned) {
if (isAbandoned) {
abandonedTransactionsCache.remove(transaction.getTransactionId());
} else {
abandonedTransactionsCache.add(transaction.getTransactionId());
}
}
/**
* Get an iterator over the events that are part of the specified transaction.
@ -1388,6 +1411,7 @@ protected void abandonTransactionOverEventThreshold(T transaction) {
LOGGER.warn("Transaction {} exceeds maximum allowed number of events, transaction will be abandoned.", transaction.getTransactionId());
metrics.incrementWarningCount();
getAndRemoveTransactionFromCache(transaction.getTransactionId());
abandonedTransactionsCache.add(transaction.getTransactionId());
metrics.incrementOversizedTransactionCount();
}
@ -1412,9 +1436,7 @@ public void abandonTransactions(Duration retention) throws InterruptedException
entry.getKey(), entry.getValue().getStartScn(), entry.getValue().getChangeTime(),
entry.getValue().getRedoThreadId(), entry.getValue().getNumberOfEvents());
if (getAbandonedTransactionsCache() != null) {
getAbandonedTransactionsCache().add(entry.getKey());
}
cleanupAfterTransactionRemovedFromCache(entry.getValue(), true);
iterator.remove();
metrics.addAbandonedTransactionId(entry.getKey());
@ -1422,7 +1444,6 @@ public void abandonTransactions(Duration retention) throws InterruptedException
}
}
// Update the oldest scn metric are transaction abandonment
final Optional<T> oldestTransaction = getOldestTransactionInCache();
if (oldestTransaction.isPresent()) {

View File

@ -33,5 +33,4 @@ public interface LogMinerEventProcessor extends AutoCloseable {
*/
void abandonTransactions(Duration retention) throws InterruptedException;
Set<String> getAbandonedTransactionsCache();
}

View File

@ -14,7 +14,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -205,9 +204,9 @@ protected InfinispanTransaction getAndRemoveTransactionFromCache(String transact
}
@Override
protected void removeTransactionAndEventsFromCache(InfinispanTransaction transaction) {
protected void cleanupAfterTransactionRemovedFromCache(InfinispanTransaction transaction, boolean isAbandoned) {
super.cleanupAfterTransactionRemovedFromCache(transaction, isAbandoned);
removeEventsWithTransaction(transaction);
getTransactionCache().remove(transaction.getTransactionId());
}
@Override
@ -247,6 +246,7 @@ public LogMinerEvent next() {
@Override
protected void finalizeTransactionCommit(String transactionId, Scn commitScn) {
getAbandonedTransactionsCache().remove(transactionId);
// cache recently committed transactions by transaction id
if (getConfig().isLobEnabled()) {
getProcessedTransactionsCache().put(transactionId, commitScn.toString());
@ -260,6 +260,7 @@ protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn
removeEventsWithTransaction(transaction);
getTransactionCache().remove(transactionId);
}
getAbandonedTransactionsCache().remove(transactionId);
if (getConfig().isLobEnabled()) {
getProcessedTransactionsCache().put(transactionId, rollbackScn.toString());
}
@ -282,6 +283,10 @@ protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedExcept
@Override
protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier<LogMinerEvent> eventSupplier) {
if (getAbandonedTransactionsCache().contains(transactionId)) {
LOGGER.warn("Event for abandoned transaction {}, skipped.", transactionId);
return;
}
if (!isRecentlyProcessed(transactionId)) {
InfinispanTransaction transaction = getTransactionCache().get(transactionId);
if (transaction == null) {
@ -423,11 +428,6 @@ private void removeEventsWithTransaction(InfinispanTransaction transaction) {
inMemoryPendingTransactionsCache.remove(transaction.getTransactionId());
}
@Override
public Set<String> getAbandonedTransactionsCache() {
return null;
}
/**
* A comparator that guarantees that the sort order applied to event keys is such that
* they are treated as numerical values, sorted as numeric values rather than strings

View File

@ -59,8 +59,6 @@ public class MemoryLogMinerEventProcessor extends AbstractLogMinerEventProcessor
private final Map<String, Scn> recentlyProcessedTransactionsCache = new HashMap<>();
private final Set<Scn> schemaChangesCache = new HashSet<>();
private final Set<String> abandonedTransactionsCache = new HashSet<>();
public MemoryLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection jdbcConnection,
@ -144,11 +142,6 @@ protected MemoryTransaction getAndRemoveTransactionFromCache(String transactionI
return getTransactionCache().remove(transactionId);
}
@Override
protected void removeTransactionAndEventsFromCache(MemoryTransaction transaction) {
abandonedTransactionsCache.remove(transaction.getTransactionId());
}
@Override
protected Iterator<LogMinerEvent> getTransactionEventIterator(MemoryTransaction transaction) {
return transaction.getEvents().iterator();
@ -156,7 +149,7 @@ protected Iterator<LogMinerEvent> getTransactionEventIterator(MemoryTransaction
@Override
protected void finalizeTransactionCommit(String transactionId, Scn commitScn) {
abandonedTransactionsCache.remove(transactionId);
getAbandonedTransactionsCache().remove(transactionId);
if (getConfig().isLobEnabled()) {
// cache recently committed transactions by transaction id
recentlyProcessedTransactionsCache.put(transactionId, commitScn);
@ -166,7 +159,7 @@ protected void finalizeTransactionCommit(String transactionId, Scn commitScn) {
@Override
protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn) {
transactionCache.remove(transactionId);
abandonedTransactionsCache.remove(transactionId);
getAbandonedTransactionsCache().remove(transactionId);
if (getConfig().isLobEnabled()) {
recentlyProcessedTransactionsCache.put(transactionId, rollbackScn);
}
@ -186,25 +179,9 @@ protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedExcept
}
}
@Override
protected void handleCommitNotFoundInBuffer(LogMinerEventRow row) {
// In the event the transaction was prematurely removed due to retention policy, when we do find
// the transaction's commit in the logs in the future, we should remove the entry if it exists
// to avoid any potential memory-leak with the cache.
abandonedTransactionsCache.remove(row.getTransactionId());
}
@Override
protected void handleRollbackNotFoundInBuffer(LogMinerEventRow row) {
// In the event the transaction was prematurely removed due to retention policy, when we do find
// the transaction's rollback in the logs in the future, we should remove the entry if it exists
// to avoid any potential memory-leak with the cache.
abandonedTransactionsCache.remove(row.getTransactionId());
}
@Override
protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier<LogMinerEvent> eventSupplier) {
if (abandonedTransactionsCache.contains(transactionId)) {
if (getAbandonedTransactionsCache().contains(transactionId)) {
LOGGER.warn("Event for abandoned transaction {}, skipped.", transactionId);
return;
}
@ -294,12 +271,6 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter
}
}
@Override
protected void abandonTransactionOverEventThreshold(MemoryTransaction transaction) {
super.abandonTransactionOverEventThreshold(transaction);
abandonedTransactionsCache.add(transaction.getTransactionId());
}
@Override
protected Scn getTransactionCacheMinimumScn() {
return transactionCache.values().stream()
@ -331,8 +302,4 @@ else if (comparison == 0) {
return Optional.ofNullable(transaction);
}
public Set<String> getAbandonedTransactionsCache() {
return abandonedTransactionsCache;
}
}