From ff828a340e3d886fa103ce21aae70c5edf98f57e Mon Sep 17 00:00:00 2001 From: Jeremy Ford Date: Tue, 18 Jun 2024 19:26:09 -0400 Subject: [PATCH] DBZ-8054 Refactor caching into common base class Provide new LogMinerCache interface for swapping out alternative caches --- .../AbstractLogMinerEventProcessor.java | 128 +++-- ...nsactionCachingLogMinerEventProcessor.java | 446 ++++++++++++++++++ .../{infinispan => }/CacheProvider.java | 14 +- .../InMemoryPendingTransactionsCache.java | 10 +- .../logminer/processor/LogMinerCache.java | 104 ++++ .../logminer/processor/Transaction.java | 15 +- ...tractInfinispanLogMinerEventProcessor.java | 445 +---------------- ...eddedInfinispanLogMinerEventProcessor.java | 88 +--- .../infinispan/InfinispanLogMinerCache.java | 92 ++++ ...emoteInfinispanLogMinerEventProcessor.java | 90 +--- .../memory/MemoryBasedLogMinerCache.java | 83 ++++ .../memory/MemoryLogMinerEventProcessor.java | 83 ++-- .../connector/oracle/util/TestHelper.java | 2 +- 13 files changed, 903 insertions(+), 697 deletions(-) create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractTransactionCachingLogMinerEventProcessor.java rename debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/{infinispan => }/CacheProvider.java (82%) rename debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/{infinispan => }/InMemoryPendingTransactionsCache.java (81%) create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/LogMinerCache.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InfinispanLogMinerCache.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryBasedLogMinerCache.java diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java index 4c9abccba..2d20fde14 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java @@ -23,9 +23,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.infinispan.commons.util.CloseableIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +79,7 @@ * * @author Chris Cranford */ -public abstract class AbstractLogMinerEventProcessor implements LogMinerEventProcessor { +public abstract class AbstractLogMinerEventProcessor implements LogMinerEventProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLogMinerEventProcessor.class); private static final String NO_SEQUENCE_TRX_ID_SUFFIX = "ffffffff"; @@ -113,14 +111,14 @@ public abstract class AbstractLogMinerEventProcessor abandonedTransactionsCache = new HashSet<>(); - public AbstractLogMinerEventProcessor(ChangeEventSourceContext context, - OracleConnectorConfig connectorConfig, - OracleDatabaseSchema schema, - OraclePartition partition, - OracleOffsetContext offsetContext, - EventDispatcher dispatcher, - LogMinerStreamingChangeEventSourceMetrics metrics, - OracleConnection jdbcConnection) { + protected AbstractLogMinerEventProcessor(ChangeEventSourceContext context, + OracleConnectorConfig connectorConfig, + OracleDatabaseSchema schema, + OraclePartition partition, + OracleOffsetContext offsetContext, + EventDispatcher dispatcher, + LogMinerStreamingChangeEventSourceMetrics metrics, + OracleConnection jdbcConnection) { this.context = context; this.connectorConfig = connectorConfig; this.schema = schema; @@ -192,7 +190,7 @@ protected Instant getLastProcessedScnChangeTime() { * Returns the {@code TransactionCache} implementation. * @return the transaction cache, never {@code null} */ - protected abstract Map getTransactionCache(); + protected abstract LogMinerCache getTransactionCache(); /** * Creates a new transaction based on the supplied {@code START} event. @@ -260,11 +258,11 @@ public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedExc if (metrics.getNumberOfActiveTransactions() > 0 && LOGGER.isDebugEnabled()) { // This is wrapped in try-with-resources specifically for Infinispan performance - try (Stream stream = getTransactionCache().values().stream()) { + getTransactionCache().values(values -> { LOGGER.debug("All active transactions: {}", - stream.map(t -> t.getTransactionId() + " (" + t.getStartScn() + ")") + values.map(t -> t.getTransactionId() + " (" + t.getStartScn() + ")") .collect(Collectors.joining(","))); - } + }); } metrics.setLastProcessedRowsCount(counters.rows); @@ -1476,7 +1474,7 @@ private ParsedLobWriteSql parseLobWriteSql(String sql) { return new ParsedLobWriteSql(offset, length, data); } - private class ParsedLobWriteSql { + private static class ParsedLobWriteSql { final int offset; final int length; final String data; @@ -1556,44 +1554,38 @@ public void abandonTransactions(Duration retention) throws InterruptedException Scn thresholdScn = lastScnToAbandonTransactions.get(); Scn smallestScn = getTransactionCacheMinimumScn(); if (!smallestScn.isNull() && thresholdScn.compareTo(smallestScn) >= 0) { + LogMinerCache transactionCache = getTransactionCache(); + + Map abandonedT = transactionCache.streamAndReturn(stream -> stream + .filter(e -> e.getValue().getStartScn().compareTo(thresholdScn) <= 0) + .collect(Collectors.toMap(LogMinerCache.Entry::getKey, LogMinerCache.Entry::getValue))); + boolean first = true; - Iterator> iterator = getTransactionCache().entrySet().iterator(); - try { - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (entry.getValue().getStartScn().compareTo(thresholdScn) <= 0) { - if (first) { - LOGGER.warn("All transactions with SCN <= {} will be abandoned.", thresholdScn); - if (LOGGER.isDebugEnabled()) { - try (Stream s = getTransactionCache().keySet().stream()) { - LOGGER.debug("List of transactions in the cache before transactions being abandoned: [{}]", - s.collect(Collectors.joining(","))); - } - } - first = false; - } - LOGGER.warn("Transaction {} (start SCN {}, change time {}, redo thread {}, {} events) is being abandoned.", - entry.getKey(), entry.getValue().getStartScn(), entry.getValue().getChangeTime(), - entry.getValue().getRedoThreadId(), entry.getValue().getNumberOfEvents()); - - cleanupAfterTransactionRemovedFromCache(entry.getValue(), true); - iterator.remove(); - - metrics.addAbandonedTransactionId(entry.getKey()); - metrics.setActiveTransactionCount(getTransactionCache().size()); - } - } - } - finally { - if (iterator instanceof CloseableIterator) { - ((CloseableIterator>) iterator).close(); + for (Map.Entry entry : abandonedT.entrySet()) { + if (first) { + LOGGER.warn("All transactions with SCN <= {} will be abandoned.", thresholdScn); + first = false; } + String key = entry.getKey(); + T value = entry.getValue(); + + LOGGER.warn("Transaction {} (start SCN {}, change time {}, redo thread {}, {} events) is being abandoned.", + key, value.getStartScn(), value.getChangeTime(), + value.getRedoThreadId(), value.getNumberOfEvents()); + + cleanupAfterTransactionRemovedFromCache(value, true); + + transactionCache.remove(key); + metrics.addAbandonedTransactionId(key); + metrics.setActiveTransactionCount(transactionCache.size()); } + if (LOGGER.isDebugEnabled()) { - try (Stream s = getTransactionCache().keySet().stream()) { - LOGGER.debug("List of transactions in the cache after transactions being abandoned: [{}]", - s.collect(Collectors.joining(","))); - } + LOGGER.debug("List of transactions in the cache before transactions being abandoned: [{}]", + String.join(",", abandonedT.keySet())); + + transactionCache.keys(keys -> LOGGER.debug("List of transactions in the cache after transactions being abandoned: [{}]", + keys.collect(Collectors.joining(",")))); } // Update the oldest scn metric are transaction abandonment @@ -1661,23 +1653,27 @@ private Scn getLastScnToAbandonFallbackByTransactionChangeTime(Duration retentio LOGGER.debug("Getting abandon SCN breakpoint based on change time {} (retention {} minutes).", getLastProcessedScnChangeTime(), retention.toMinutes()); - Scn calculatedLastScn = Scn.NULL; - for (Transaction transaction : getTransactionCache().values()) { - final Instant changeTime = transaction.getChangeTime(); - final long diffMinutes = Duration.between(getLastProcessedScnChangeTime(), changeTime).abs().toMinutes(); - if (diffMinutes > 0 && diffMinutes > retention.toMinutes()) { - // We either now will capture the transaction's SCN because it is the first detected transaction - // outside the configured retention period or the transaction has a start SCN that is more recent - // than the current calculated SCN but is still outside the configured retention period. - LOGGER.debug("Transaction {} with SCN {} started at {}, age is {} minutes.", - transaction.getTransactionId(), transaction.getStartScn(), changeTime, diffMinutes); - if (calculatedLastScn.isNull() || calculatedLastScn.compareTo(transaction.getStartScn()) < 0) { - calculatedLastScn = transaction.getStartScn(); - } - } - } + return getTransactionCache().streamAndReturn(stream -> { + return stream.map(LogMinerCache.Entry::getValue) + .filter(t -> { + final Instant changeTime = t.getChangeTime(); + final long diffMinutes = Duration.between(getLastProcessedScnChangeTime(), changeTime).abs().toMinutes(); - return calculatedLastScn; + // We either now will capture the transaction's SCN because it is the first detected transaction + // outside the configured retention period or the transaction has a start SCN that is more recent + // than the current calculated SCN but is still outside the configured retention period. + LOGGER.debug("Transaction {} with SCN {} started at {}, age is {} minutes.", + t.getTransactionId(), t.getStartScn(), changeTime, diffMinutes); + return diffMinutes > 0 && diffMinutes > retention.toMinutes(); + }) + .max(this::compareStartScn) + .map(Transaction::getStartScn) + .orElse(Scn.NULL); + }); + } + + protected int compareStartScn(T first, T second) { + return first.getStartScn().compareTo(second.getStartScn()); } /** diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractTransactionCachingLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractTransactionCachingLogMinerEventProcessor.java new file mode 100644 index 000000000..a9c018f42 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractTransactionCachingLogMinerEventProcessor.java @@ -0,0 +1,446 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Instant; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.connector.oracle.OracleConnection; +import io.debezium.connector.oracle.OracleConnectorConfig; +import io.debezium.connector.oracle.OracleDatabaseSchema; +import io.debezium.connector.oracle.OracleOffsetContext; +import io.debezium.connector.oracle.OraclePartition; +import io.debezium.connector.oracle.Scn; +import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics; +import io.debezium.connector.oracle.logminer.events.LogMinerEvent; +import io.debezium.connector.oracle.logminer.events.LogMinerEventRow; +import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext; +import io.debezium.relational.TableId; +import io.debezium.util.Loggings; + +/** + * An implementation of {@link LogMinerEventProcessor} + * that uses Infinispan to persist the transaction cache across restarts on disk. + * + * @author Chris Cranford + */ +public abstract class AbstractTransactionCachingLogMinerEventProcessor extends AbstractLogMinerEventProcessor implements CacheProvider { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTransactionCachingLogMinerEventProcessor.class); + private final OracleConnection jdbcConnection; + private final LogMinerStreamingChangeEventSourceMetrics metrics; + private final OraclePartition partition; + private final OracleOffsetContext offsetContext; + private final EventDispatcher dispatcher; + + private final InMemoryPendingTransactionsCache inMemoryPendingTransactionsCache = new InMemoryPendingTransactionsCache(); + + protected AbstractTransactionCachingLogMinerEventProcessor( + ChangeEventSourceContext context, + OracleConnectorConfig connectorConfig, + OracleConnection jdbcConnection, + EventDispatcher dispatcher, + OraclePartition partition, + OracleOffsetContext offsetContext, + OracleDatabaseSchema schema, + LogMinerStreamingChangeEventSourceMetrics metrics) { + super(context, connectorConfig, schema, partition, offsetContext, dispatcher, metrics, jdbcConnection); + this.jdbcConnection = jdbcConnection; + this.metrics = metrics; + this.partition = partition; + this.offsetContext = offsetContext; + this.dispatcher = dispatcher; + } + + protected void reCreateInMemoryCache() { + getTransactionCache().keys(trStream -> { + trStream.forEach(tr -> { + getEventCache().keys(eventStream -> { + int count = (int) eventStream.filter(e -> e.startsWith(tr + "-")).count(); + LOGGER.info("Re-creating in memory cache of event count for transaction '" + tr + "'. No of events found: " + count); + inMemoryPendingTransactionsCache.initKey(tr, count); + }); + }); + }); + } + + @Override + public void displayCacheStatistics() { + LOGGER.info("Overall Cache Statistics:"); + LOGGER.info("\tTransactions : {}", getTransactionCache().size()); + LOGGER.info("\tRecent Transactions : {}", getProcessedTransactionsCache().size()); + LOGGER.info("\tSchema Changes : {}", getSchemaChangesCache().size()); + LOGGER.info("\tEvents : {}", getEventCache().size()); + if (!getEventCache().isEmpty() && LOGGER.isDebugEnabled()) { + getEventCache().keys(stream -> { + stream.forEach(eventKey -> LOGGER.debug("\t\tFound Key: {}", eventKey)); + }); + } + } + + @Override + protected boolean isRecentlyProcessed(String transactionId) { + return getProcessedTransactionsCache().containsKey(transactionId); + } + + @Override + protected Scn getTransactionCacheMinimumScn() { + return getTransactionCache().streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getValue) + .map(Transaction::getStartScn) + .min(Scn::compareTo) + .orElse(Scn.NULL)); + } + + protected Optional getOldestTransactionInCache() { + return getTransactionCache().streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getValue) + .min(this::compareStartScn)); + } + + @Override + protected String getFirstActiveTransactionKey() { + return getTransactionCache() + .streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getKey).findFirst()).orElse(null); + } + + @Override + protected void removeEventWithRowId(LogMinerEventRow row) { + List eventKeys = getTransactionKeysWithPrefix(row.getTransactionId() + "-"); + if (eventKeys.isEmpty() && isTransactionIdWithNoSequence(row.getTransactionId())) { + // This means that Oracle LogMiner found an event that should be undone but its corresponding + // undo entry was read in a prior mining session and the transaction's sequence could not be + // resolved. In this case, lets locate the transaction based solely on XIDUSN and XIDSLT. + final String transactionPrefix = getTransactionIdPrefix(row.getTransactionId()); + LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId()); + LOGGER.debug("Checking all transactions with prefix '{}'", transactionPrefix); + eventKeys = getTransactionKeysWithPrefix(transactionPrefix); + if (!eventKeys.isEmpty()) { + // Enforce that the keys are always reverse sorted. + eventKeys.sort(EventKeySortComparator.INSTANCE.reversed()); + + for (String eventKey : eventKeys) { + final LogMinerEvent event = getEventCache().get(eventKey); + if (event != null && event.getRowId().equals(row.getRowId())) { + Loggings.logDebugAndTraceRecord(LOGGER, row, "Undo change on table '{}' applied to transaction '{}'", row.getTableId(), eventKey); + getEventCache().remove(eventKey); + inMemoryPendingTransactionsCache.decrement(row.getTransactionId()); + return; + } + } + Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(), + row.getRowId()); + } + else if (!getConfig().isLobEnabled()) { + Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since transaction '{}' was not found.", row.getTableId(), + row.getTransactionId()); + } + } + else { + // Enforce that the keys are always reverse sorted. + eventKeys.sort(EventKeySortComparator.INSTANCE.reversed()); + + for (String eventKey : eventKeys) { + final LogMinerEvent event = getEventCache().get(eventKey); + if (event != null && event.getRowId().equals(row.getRowId())) { + LOGGER.debug("Undo applied for event {}.", event); + getEventCache().remove(eventKey); + inMemoryPendingTransactionsCache.decrement(row.getTransactionId()); + return; + } + } + Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(), + row.getRowId()); + } + } + + protected List getTransactionKeysWithPrefix(String prefix) { + AtomicReference> result = new AtomicReference<>(); + getEventCache().keys(stream -> { + result.set(stream.filter(k -> k.startsWith(prefix)).collect(Collectors.toList())); + }); + return result.get(); + } + + @Override + protected void processRow(OraclePartition partition, LogMinerEventRow row) throws SQLException, InterruptedException { + final String transactionId = row.getTransactionId(); + if (isRecentlyProcessed(transactionId)) { + LOGGER.debug("Transaction {} has been seen by connector, skipped.", transactionId); + return; + } + super.processRow(partition, row); + } + + @Override + protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) { + return getSchemaChangesCache().containsKey(row.getScn().toString()); + } + + @Override + protected T getAndRemoveTransactionFromCache(String transactionId) { + // // todo: Infinispan bug? + // // When interacting with ISPN with a remote server configuration, the expected + // // behavior was that calling the remove method on the cache would return the + // // existing entry and remove it from the cache; however it always returned null. + // // + // // For now, we're going to use get to obtain the value and then remove it after-the-fact. + // final T transaction = getTransactionCache().get(transactionId); + // if (transaction != null) { + // getTransactionCache().remove(transactionId); + // } + // return transaction; + return getTransactionCache().remove(transactionId); + } + + @Override + protected void cleanupAfterTransactionRemovedFromCache(T transaction, boolean isAbandoned) { + super.cleanupAfterTransactionRemovedFromCache(transaction, isAbandoned); + removeEventsWithTransaction(transaction); + } + + @Override + protected void finalizeTransactionCommit(String transactionId, Scn commitScn) { + getAbandonedTransactionsCache().remove(transactionId); + // cache recently committed transactions by transaction id + if (getConfig().isLobEnabled()) { + getProcessedTransactionsCache().put(transactionId, commitScn.toString()); + } + } + + @Override + protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn) { + final T transaction = getTransactionCache().get(transactionId); + if (transaction != null) { + removeEventsWithTransaction(transaction); + getTransactionCache().remove(transactionId); + } + getAbandonedTransactionsCache().remove(transactionId); + if (getConfig().isLobEnabled()) { + getProcessedTransactionsCache().put(transactionId, rollbackScn.toString()); + } + } + + @Override + protected void resetTransactionToStart(T transaction) { + super.resetTransactionToStart(transaction); + // Flush the change created by the super class to the transaction cache + getTransactionCache().put(transaction.getTransactionId(), transaction); + } + + @Override + protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedException { + super.handleSchemaChange(row); + if (row.getTableName() != null) { + getSchemaChangesCache().put(row.getScn().toString(), row.getTableId().identifier()); + } + } + + @Override + protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier eventSupplier) { + if (getAbandonedTransactionsCache().contains(transactionId)) { + LOGGER.warn("Event for abandoned transaction {}, skipped.", transactionId); + return; + } + if (!isRecentlyProcessed(transactionId)) { + T transaction = getTransactionCache().get(transactionId); + if (transaction == null) { + LOGGER.trace("Transaction {} is not in cache, creating.", transactionId); + transaction = createTransaction(row); + } + + if (isTransactionOverEventThreshold(transaction)) { + abandonTransactionOverEventThreshold(transaction); + return; + } + + String eventKey = transaction.getEventId(transaction.getNextEventId()); + if (!getEventCache().containsKey(eventKey)) { + // Add new event at eventId offset + LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, eventKey); + getEventCache().put(eventKey, eventSupplier.get()); + metrics.calculateLagFromSource(row.getChangeTime()); + inMemoryPendingTransactionsCache.putOrIncrement(transaction.getTransactionId()); + } + // When using Infinispan, this extra put is required so that the state is properly synchronized + getTransactionCache().put(transactionId, transaction); + metrics.setActiveTransactionCount(getTransactionCache().size()); + } + else { + LOGGER.warn("Event for transaction {} skipped as transaction has been processed.", transactionId); + } + } + + @Override + protected int getTransactionEventCount(T transaction) { + return inMemoryPendingTransactionsCache.getNumPending(transaction.getTransactionId()); + } + + @Override + protected PreparedStatement createQueryStatement() throws SQLException { + return jdbcConnection.connection().prepareStatement(getQueryString(), + ResultSet.TYPE_FORWARD_ONLY, + ResultSet.CONCUR_READ_ONLY, + ResultSet.HOLD_CURSORS_OVER_COMMIT); + } + + private void removeEventsWithTransaction(T transaction) { + // Clear the event queue for the transaction + for (int i = 0; i < transaction.getNumberOfEvents(); ++i) { + getEventCache().remove(transaction.getEventId(i)); + } + inMemoryPendingTransactionsCache.remove(transaction.getTransactionId()); + } + + /** + * A comparator that guarantees that the sort order applied to event keys is such that + * they are treated as numerical values, sorted as numeric values rather than strings + * which would allow "100" to come before "9". + */ + private static class EventKeySortComparator implements Comparator { + + public static EventKeySortComparator INSTANCE = new EventKeySortComparator(); + + @Override + public int compare(String o1, String o2) { + if (o1 == null || !o1.contains("-")) { + throw new IllegalStateException("Event Key must be in the format of -"); + } + if (o2 == null || !o2.contains("-")) { + throw new IllegalStateException("Event Key must be in the format of -"); + } + final String[] s1 = o1.split("-"); + final String[] s2 = o2.split("-"); + + // Compare transaction ids, these should generally be identical. + int result = s1[0].compareTo(s2[0]); + if (result == 0) { + result = Long.compare(Long.parseLong(s1[1]), Long.parseLong(s2[1])); + } + return result; + } + } + + /** + * Purge the necessary caches with all entries that occurred prior to the specified change number. + *

+ * NOTE: This method is abstract despite the code used by both all implementations being identical. + * This is because the method needed {@code entrySet()} is made available on two different concrete + * interfaces between the embedded and remote cache implementations, and therefore we need to access + * this method from the concrete implementation classes (RemoteCache and Cache) rather than from + * the common class used by CacheProvider (BasicCache). + * + * @param minCacheScn the minimum system change number to keep entries until + */ + protected void purgeCache(Scn minCacheScn) { + getProcessedTransactionsCache().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0); + getSchemaChangesCache().removeIf(entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0); + } + + @Override + protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws InterruptedException { + + // Cleanup caches based on current state of the transaction cache + final Optional oldestTransaction = getOldestTransactionInCache(); + final Scn minCacheScn; + final Instant minCacheScnChangeTime; + if (oldestTransaction.isPresent()) { + minCacheScn = oldestTransaction.get().getStartScn(); + minCacheScnChangeTime = oldestTransaction.get().getChangeTime(); + } + else { + minCacheScn = Scn.NULL; + minCacheScnChangeTime = null; + } + + if (!minCacheScn.isNull()) { + abandonTransactions(getConfig().getLogMiningTransactionRetention()); + purgeCache(minCacheScn); + } + else { + getSchemaChangesCache().removeIf(e -> true); + } + + if (getConfig().isLobEnabled()) { + if (getTransactionCache().isEmpty() && !maxCommittedScn.isNull()) { + offsetContext.setScn(maxCommittedScn); + dispatcher.dispatchHeartbeatEvent(partition, offsetContext); + } + else { + if (!minCacheScn.isNull()) { + getProcessedTransactionsCache().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0); + offsetContext.setScn(minCacheScn.subtract(Scn.valueOf(1))); + dispatcher.dispatchHeartbeatEvent(partition, offsetContext); + } + } + return offsetContext.getScn(); + } + else { + + if (!getLastProcessedScn().isNull() && getLastProcessedScn().compareTo(endScn) < 0) { + // If the last processed SCN is before the endScn we need to use the last processed SCN as the + // next starting point as the LGWR buffer didn't flush all entries from memory to disk yet. + endScn = getLastProcessedScn(); + } + + // update offsets + offsetContext.setScn(endScn); + metrics.setOldestScnDetails(minCacheScn, minCacheScnChangeTime); + metrics.setOffsetScn(endScn); + + // optionally dispatch a heartbeat event + dispatcher.dispatchHeartbeatEvent(partition, offsetContext); + + return endScn; + } + } + + @Override + protected Iterator getTransactionEventIterator(T transaction) { + return new Iterator<>() { + private final int count = transaction.getNumberOfEvents(); + + private LogMinerEvent nextEvent; + private int index = 0; + + @Override + public boolean hasNext() { + while (index < count) { + nextEvent = getEventCache().get(transaction.getEventId(index)); + if (nextEvent == null) { + LOGGER.debug("Event {} must have been undone, skipped.", index); + // There are situations where an event will be removed from the cache when it is + // undone by the undo-row flag. The event id isn't re-used in this use case so + // the iterator automatically detects null entries and skips them by advancing + // to the next entry until either we've reached the number of events or detected + // a non-null entry available for return + index++; + continue; + } + break; + } + return index < count; + } + + @Override + public LogMinerEvent next() { + index++; + return nextEvent; + } + }; + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/CacheProvider.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/CacheProvider.java similarity index 82% rename from debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/CacheProvider.java rename to debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/CacheProvider.java index 794a09a34..d045688e0 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/CacheProvider.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/CacheProvider.java @@ -3,16 +3,14 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.connector.oracle.logminer.processor.infinispan; - -import org.infinispan.commons.api.BasicCache; +package io.debezium.connector.oracle.logminer.processor; import io.debezium.connector.oracle.logminer.events.LogMinerEvent; /** * @author Chris Cranford */ -public interface CacheProvider extends AutoCloseable { +public interface CacheProvider extends AutoCloseable { /** * The name for the transaction cache @@ -49,7 +47,7 @@ public interface CacheProvider extends AutoCloseable { * * @return the transaction cache, never {@code null} */ - BasicCache getTransactionCache(); + LogMinerCache getTransactionCache(); /** * Get the LogMiner events cache @@ -61,7 +59,7 @@ public interface CacheProvider extends AutoCloseable { * * @return the evnts cache, never {@code null} */ - BasicCache getEventCache(); + LogMinerCache getEventCache(); /** * Get the Schema Changes cache @@ -73,7 +71,7 @@ public interface CacheProvider extends AutoCloseable { * * @return the schema changes cache, never {@code null} */ - BasicCache getSchemaChangesCache(); + LogMinerCache getSchemaChangesCache(); /** * Get the processed transactions cache @@ -85,5 +83,5 @@ public interface CacheProvider extends AutoCloseable { * * @return the processed transactions cache, never {@code null} */ - BasicCache getProcessedTransactionsCache(); + LogMinerCache getProcessedTransactionsCache(); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InMemoryPendingTransactionsCache.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/InMemoryPendingTransactionsCache.java similarity index 81% rename from debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InMemoryPendingTransactionsCache.java rename to debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/InMemoryPendingTransactionsCache.java index 80f92cb39..0428e8129 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InMemoryPendingTransactionsCache.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/InMemoryPendingTransactionsCache.java @@ -3,7 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.connector.oracle.logminer.processor.infinispan; +package io.debezium.connector.oracle.logminer.processor; import java.util.HashMap; import java.util.Map; @@ -11,23 +11,23 @@ /** * An in-memory pending transaction cache, used for performance reasons. */ -class InMemoryPendingTransactionsCache { +public class InMemoryPendingTransactionsCache { /*** * Map of transaction ids to the number of events in cache */ private final Map pendingTransactionInEventsCache = new HashMap<>(); - Integer getNumPending(String transactionId) { + public Integer getNumPending(String transactionId) { return pendingTransactionInEventsCache.getOrDefault(transactionId, 0); } - String putOrIncrement(String transactionId) { + public String putOrIncrement(String transactionId) { final Integer i = pendingTransactionInEventsCache.getOrDefault(transactionId, 0); pendingTransactionInEventsCache.put(transactionId, i + 1); return transactionId; } - void decrement(String transactionId) { + public void decrement(String transactionId) { final int i = pendingTransactionInEventsCache.getOrDefault(transactionId, 0); if (i > 0) { pendingTransactionInEventsCache.put(transactionId, i - 1); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/LogMinerCache.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/LogMinerCache.java new file mode 100644 index 000000000..d0f200311 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/LogMinerCache.java @@ -0,0 +1,104 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; + +/** + * Interface describing the functionality needed to cache data in during LogMiner processing. + */ +public interface LogMinerCache { + + /** + * Consume a Stream of all the entries in the cache. + * @param entryStream + */ + void entries(Consumer>> entryStream); + + /** + * A Stream of available keys will be provided to the given Consumer. + */ + default void keys(Consumer> keyStreamConsumer) { + entries(entryStream -> keyStreamConsumer.accept(entryStream.map(Entry::getKey))); + } + + /** + * A Stream of available values will be provided to the given Consumer. + */ + default void values(Consumer> valueStreamConsumer) { + entries(entryStream -> valueStreamConsumer.accept(entryStream.map(Entry::getValue))); + } + + /** + * Clear all keys/values from the cache. + */ + void clear(); + + /** + * Retrieve the value for the given key. + */ + V get(K key); + + /** + * Returns true if the cache is empty. + */ + boolean isEmpty(); + + /** + * Returns true if the cache contains the given key. + */ + boolean containsKey(K key); + + /** + * Add the key and value into the cache. + */ + void put(K key, V value); + + /** + * Remove the given key from the cache and return the value that was associated with it. + */ + V remove(K key); + + /** + * Returns the size of the cache. + */ + int size(); + + void forEach(BiConsumer action); + + /** + * Remove all keys/values from the cache when the {@link Predicate} returns true; + */ + void removeIf(Predicate> predicate); + + /** + * Apply the given function to the provided stream and return the result from the function. + */ + T streamAndReturn(Function>, T> function); + + class Entry { + private final K key; + private final V value; + + public Entry(K key, V value) { + this.key = key; + this.value = value; + } + + public K getKey() { + return key; + } + + public V getValue() { + return value; + } + } + +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/Transaction.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/Transaction.java index 987d6db86..3e4590be3 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/Transaction.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/Transaction.java @@ -50,6 +50,18 @@ public interface Transaction { */ int getNumberOfEvents(); + /** + * Return the id of an event based on its index. + * @param index the index of the event + * @return the event id + */ + default String getEventId(int index) { + if (index < 0 || index >= getNumberOfEvents()) { + throw new IndexOutOfBoundsException("Index " + index + "outside the transaction " + getTransactionId() + " event list bounds"); + } + return getTransactionId() + "-" + index; + } + /** * Helper method to get the next event identifier for the transaction. * @@ -66,9 +78,10 @@ public interface Transaction { /** * Helper method that resets the event identifier back to {@code 0}. - * + *

* This should be called when a transaction {@code START} event is detected in the event stream. * This is required when LOB support is enabled to facilitate the re-mining of existing events. */ void start(); + } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java index c7ba26005..30b46eff9 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java @@ -5,38 +5,18 @@ */ package io.debezium.connector.oracle.logminer.processor.infinispan; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.time.Instant; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Predicate; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import org.infinispan.commons.util.CloseableIterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.OracleOffsetContext; import io.debezium.connector.oracle.OraclePartition; -import io.debezium.connector.oracle.Scn; import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics; -import io.debezium.connector.oracle.logminer.events.LogMinerEvent; import io.debezium.connector.oracle.logminer.events.LogMinerEventRow; -import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor; +import io.debezium.connector.oracle.logminer.processor.AbstractTransactionCachingLogMinerEventProcessor; +import io.debezium.connector.oracle.logminer.processor.CacheProvider; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext; import io.debezium.relational.TableId; -import io.debezium.util.Loggings; /** * An implementation of {@link io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor} @@ -44,421 +24,22 @@ * * @author Chris Cranford */ -public abstract class AbstractInfinispanLogMinerEventProcessor extends AbstractLogMinerEventProcessor implements CacheProvider { +public abstract class AbstractInfinispanLogMinerEventProcessor extends AbstractTransactionCachingLogMinerEventProcessor + implements CacheProvider { - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractInfinispanLogMinerEventProcessor.class); - private final OracleConnection jdbcConnection; - private final LogMinerStreamingChangeEventSourceMetrics metrics; - private final OraclePartition partition; - private final OracleOffsetContext offsetContext; - private final EventDispatcher dispatcher; - - private InMemoryPendingTransactionsCache inMemoryPendingTransactionsCache = new InMemoryPendingTransactionsCache(); - - private static AbstractInfinispanLogMinerEventProcessor instance; - - public AbstractInfinispanLogMinerEventProcessor(ChangeEventSourceContext context, - OracleConnectorConfig connectorConfig, - OracleConnection jdbcConnection, - EventDispatcher dispatcher, - OraclePartition partition, - OracleOffsetContext offsetContext, - OracleDatabaseSchema schema, - LogMinerStreamingChangeEventSourceMetrics metrics) { - super(context, connectorConfig, schema, partition, offsetContext, dispatcher, metrics, jdbcConnection); - this.jdbcConnection = jdbcConnection; - this.metrics = metrics; - this.partition = partition; - this.offsetContext = offsetContext; - this.dispatcher = dispatcher; - AbstractInfinispanLogMinerEventProcessor.instance = this; - } - - protected void reCreateInMemoryCache() { - try (Stream trStream = getTransactionCache().keySet().stream()) { - trStream.forEach(tr -> { - try (Stream eventStream = getEventCache().keySet().stream()) { - int count = (int) eventStream.filter(k -> k.startsWith(tr + "-")).count(); - LOGGER.info("Re-creating in memory cache of event count for transaction '" + tr + "'. No of events found: " + count); - inMemoryPendingTransactionsCache.initKey(tr, count); - } - }); - } - } - - /** - * Can be used for reporting in Debezium Embedded mode - */ - public static void logCacheStats() { - if (instance != null) { - AbstractInfinispanLogMinerEventProcessor.instance.displayCacheStatistics(); - } - else { - LOGGER.trace("AbstractInfinispanLogMinerEventProcessor is not initialized, skipping logging stats."); - } - } - - @Override - public void displayCacheStatistics() { - LOGGER.info("Overall Cache Statistics:"); - LOGGER.info("\tTransactions : {}", getTransactionCache().size()); - LOGGER.info("\tRecent Transactions : {}", getProcessedTransactionsCache().size()); - LOGGER.info("\tSchema Changes : {}", getSchemaChangesCache().size()); - LOGGER.info("\tEvents : {}", getEventCache().size()); - if (!getEventCache().isEmpty() && LOGGER.isDebugEnabled()) { - try (Stream stream = getEventCache().keySet().stream()) { - stream.forEach(eventKey -> LOGGER.debug("\t\tFound Key: {}", eventKey)); - } - } - } - - @Override - protected boolean isRecentlyProcessed(String transactionId) { - return getProcessedTransactionsCache().containsKey(transactionId); + protected AbstractInfinispanLogMinerEventProcessor(ChangeEventSourceContext context, + OracleConnectorConfig connectorConfig, + OracleConnection jdbcConnection, + EventDispatcher dispatcher, + OraclePartition partition, + OracleOffsetContext offsetContext, + OracleDatabaseSchema schema, + LogMinerStreamingChangeEventSourceMetrics metrics) { + super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics); } @Override protected InfinispanTransaction createTransaction(LogMinerEventRow row) { return new InfinispanTransaction(row.getTransactionId(), row.getScn(), row.getChangeTime(), row.getUserName(), row.getThread()); } - - @Override - protected void removeEventWithRowId(LogMinerEventRow row) { - List eventKeys = getTransactionKeysWithPrefix(row.getTransactionId() + "-"); - if (eventKeys.isEmpty() && isTransactionIdWithNoSequence(row.getTransactionId())) { - // This means that Oracle LogMiner found an event that should be undone but its corresponding - // undo entry was read in a prior mining session and the transaction's sequence could not be - // resolved. In this case, lets locate the transaction based solely on XIDUSN and XIDSLT. - final String transactionPrefix = getTransactionIdPrefix(row.getTransactionId()); - LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId()); - LOGGER.debug("Checking all transactions with prefix '{}'", transactionPrefix); - eventKeys = getTransactionKeysWithPrefix(transactionPrefix); - if (!eventKeys.isEmpty()) { - // Enforce that the keys are always reverse sorted. - eventKeys.sort(EventKeySortComparator.INSTANCE.reversed()); - - for (String eventKey : eventKeys) { - final LogMinerEvent event = getEventCache().get(eventKey); - if (event != null && event.getRowId().equals(row.getRowId())) { - Loggings.logDebugAndTraceRecord(LOGGER, row, "Undo change on table '{}' applied to transaction '{}'", row.getTableId(), eventKey); - getEventCache().remove(eventKey); - inMemoryPendingTransactionsCache.decrement(row.getTransactionId()); - return; - } - } - Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(), - row.getRowId()); - } - else if (!getConfig().isLobEnabled()) { - Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since transaction '{}' was not found.", row.getTableId(), - row.getTransactionId()); - } - } - else { - // Enforce that the keys are always reverse sorted. - eventKeys.sort(EventKeySortComparator.INSTANCE.reversed()); - - for (String eventKey : eventKeys) { - final LogMinerEvent event = getEventCache().get(eventKey); - if (event != null && event.getRowId().equals(row.getRowId())) { - LOGGER.debug("Undo applied for event {}.", event); - getEventCache().remove(eventKey); - inMemoryPendingTransactionsCache.decrement(row.getTransactionId()); - return; - } - } - Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(), - row.getRowId()); - } - } - - private List getTransactionKeysWithPrefix(String prefix) { - try (Stream stream = getEventCache().keySet().stream()) { - return stream.filter(k -> k.startsWith(prefix)).collect(Collectors.toList()); - } - } - - @Override - protected void processRow(OraclePartition partition, LogMinerEventRow row) throws SQLException, InterruptedException { - final String transactionId = row.getTransactionId(); - if (isRecentlyProcessed(transactionId)) { - LOGGER.debug("Transaction {} has been seen by connector, skipped.", transactionId); - return; - } - super.processRow(partition, row); - } - - @Override - protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) { - return getSchemaChangesCache().containsKey(row.getScn().toString()); - } - - @Override - protected InfinispanTransaction getAndRemoveTransactionFromCache(String transactionId) { - // todo: Infinispan bug? - // When interacting with ISPN with a remote server configuration, the expected - // behavior was that calling the remove method on the cache would return the - // existing entry and remove it from the cache; however it always returned null. - // - // For now, we're going to use get to obtain the value and then remove it after-the-fact. - final InfinispanTransaction transaction = getTransactionCache().get(transactionId); - if (transaction != null) { - getTransactionCache().remove(transactionId); - } - return transaction; - } - - @Override - protected void cleanupAfterTransactionRemovedFromCache(InfinispanTransaction transaction, boolean isAbandoned) { - super.cleanupAfterTransactionRemovedFromCache(transaction, isAbandoned); - removeEventsWithTransaction(transaction); - } - - @Override - protected Iterator getTransactionEventIterator(InfinispanTransaction transaction) { - return new Iterator() { - private final int count = transaction.getNumberOfEvents(); - - private LogMinerEvent nextEvent; - private int index = 0; - - @Override - public boolean hasNext() { - while (index < count) { - nextEvent = getEventCache().get(transaction.getEventId(index)); - if (nextEvent == null) { - LOGGER.debug("Event {} must have been undone, skipped.", index); - // There are situations where an event will be removed from the cache when it is - // undone by the undo-row flag. The event id isn't re-used in this use case so - // the iterator automatically detects null entries and skips them by advancing - // to the next entry until either we've reached the number of events or detected - // a non-null entry available for return - index++; - continue; - } - break; - } - return index < count; - } - - @Override - public LogMinerEvent next() { - index++; - return nextEvent; - } - }; - } - - @Override - protected void finalizeTransactionCommit(String transactionId, Scn commitScn) { - getAbandonedTransactionsCache().remove(transactionId); - // cache recently committed transactions by transaction id - if (getConfig().isLobEnabled()) { - getProcessedTransactionsCache().put(transactionId, commitScn.toString()); - } - } - - @Override - protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn) { - final InfinispanTransaction transaction = getTransactionCache().get(transactionId); - if (transaction != null) { - removeEventsWithTransaction(transaction); - getTransactionCache().remove(transactionId); - } - getAbandonedTransactionsCache().remove(transactionId); - if (getConfig().isLobEnabled()) { - getProcessedTransactionsCache().put(transactionId, rollbackScn.toString()); - } - } - - @Override - protected void resetTransactionToStart(InfinispanTransaction transaction) { - super.resetTransactionToStart(transaction); - // Flush the change created by the super class to the transaction cache - getTransactionCache().put(transaction.getTransactionId(), transaction); - } - - @Override - protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedException { - super.handleSchemaChange(row); - if (row.getTableName() != null) { - getSchemaChangesCache().put(row.getScn().toString(), row.getTableId().identifier()); - } - } - - @Override - protected void addToTransaction(String transactionId, LogMinerEventRow row, Supplier eventSupplier) { - if (getAbandonedTransactionsCache().contains(transactionId)) { - LOGGER.warn("Event for abandoned transaction {}, skipped.", transactionId); - return; - } - if (!isRecentlyProcessed(transactionId)) { - InfinispanTransaction transaction = getTransactionCache().get(transactionId); - if (transaction == null) { - LOGGER.trace("Transaction {} is not in cache, creating.", transactionId); - transaction = createTransaction(row); - } - - if (isTransactionOverEventThreshold(transaction)) { - abandonTransactionOverEventThreshold(transaction); - return; - } - - String eventKey = transaction.getEventId(transaction.getNextEventId()); - if (!getEventCache().containsKey(eventKey)) { - // Add new event at eventId offset - LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, eventKey); - getEventCache().put(eventKey, eventSupplier.get()); - metrics.calculateLagFromSource(row.getChangeTime()); - inMemoryPendingTransactionsCache.putOrIncrement(transaction.getTransactionId()); - } - // When using Infinispan, this extra put is required so that the state is properly synchronized - getTransactionCache().put(transactionId, transaction); - metrics.setActiveTransactionCount(getTransactionCache().size()); - } - else { - LOGGER.warn("Event for transaction {} skipped as transaction has been processed.", transactionId); - } - } - - @Override - protected int getTransactionEventCount(InfinispanTransaction transaction) { - return inMemoryPendingTransactionsCache.getNumPending(transaction.getTransactionId()); - } - - @Override - protected PreparedStatement createQueryStatement() throws SQLException { - return jdbcConnection.connection().prepareStatement(getQueryString(), - ResultSet.TYPE_FORWARD_ONLY, - ResultSet.CONCUR_READ_ONLY, - ResultSet.HOLD_CURSORS_OVER_COMMIT); - } - - @Override - protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws InterruptedException { - - // Cleanup caches based on current state of the transaction cache - final Optional oldestTransaction = getOldestTransactionInCache(); - final Scn minCacheScn; - final Instant minCacheScnChangeTime; - if (oldestTransaction.isPresent()) { - minCacheScn = oldestTransaction.get().getStartScn(); - minCacheScnChangeTime = oldestTransaction.get().getChangeTime(); - } - else { - minCacheScn = Scn.NULL; - minCacheScnChangeTime = null; - } - - if (!minCacheScn.isNull()) { - abandonTransactions(getConfig().getLogMiningTransactionRetention()); - purgeCache(minCacheScn); - } - else { - getSchemaChangesCache().entrySet().removeIf(e -> true); - } - - if (getConfig().isLobEnabled()) { - if (getTransactionCache().isEmpty() && !maxCommittedScn.isNull()) { - offsetContext.setScn(maxCommittedScn); - dispatcher.dispatchHeartbeatEvent(partition, offsetContext); - } - else { - if (!minCacheScn.isNull()) { - getProcessedTransactionsCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0); - offsetContext.setScn(minCacheScn.subtract(Scn.valueOf(1))); - dispatcher.dispatchHeartbeatEvent(partition, offsetContext); - } - } - return offsetContext.getScn(); - } - else { - - if (!getLastProcessedScn().isNull() && getLastProcessedScn().compareTo(endScn) < 0) { - // If the last processed SCN is before the endScn we need to use the last processed SCN as the - // next starting point as the LGWR buffer didn't flush all entries from memory to disk yet. - endScn = getLastProcessedScn(); - } - - // update offsets - offsetContext.setScn(endScn); - metrics.setOldestScnDetails(minCacheScn, minCacheScnChangeTime); - metrics.setOffsetScn(endScn); - - // optionally dispatch a heartbeat event - dispatcher.dispatchHeartbeatEvent(partition, offsetContext); - - return endScn; - } - } - - /** - * Purge the necessary caches with all entries that occurred prior to the specified change number. - * - * NOTE: This method is abstract despite the code used by both all implementations being identical. - * This is because the method needed {@code entrySet()} is made available on two different concrete - * interfaces between the embedded and remote cache implementations, and therefore we need to access - * this method from the concrete implementation classes (RemoteCache and Cache) rather than from - * the common class used by CacheProvider (BasicCache). - * - * @param minCacheScn the minimum system change number to keep entries until - */ - protected abstract void purgeCache(Scn minCacheScn); - - /** - * Helper method to remove entries that match the given predicate from a closeable iterator. - * This method guarantees that the underlying resources are released at the end of the operation. - * - * @param iterator the iterator - * @param filter the predicate - * @param the key type - * @param the value type - */ - protected void removeIf(CloseableIterator> iterator, Predicate> filter) { - try (CloseableIterator> it = iterator) { - while (it.hasNext()) { - final Map.Entry entry = it.next(); - if (filter.test(entry)) { - it.remove(); - } - } - } - } - - private void removeEventsWithTransaction(InfinispanTransaction transaction) { - // Clear the event queue for the transaction - for (int i = 0; i < transaction.getNumberOfEvents(); ++i) { - getEventCache().remove(transaction.getEventId(i)); - } - inMemoryPendingTransactionsCache.remove(transaction.getTransactionId()); - } - - /** - * A comparator that guarantees that the sort order applied to event keys is such that - * they are treated as numerical values, sorted as numeric values rather than strings - * which would allow "100" to come before "9". - */ - private static class EventKeySortComparator implements Comparator { - - public static EventKeySortComparator INSTANCE = new EventKeySortComparator(); - - @Override - public int compare(String o1, String o2) { - if (o1 == null || !o1.contains("-")) { - throw new IllegalStateException("Event Key must be in the format of -"); - } - if (o2 == null || !o2.contains("-")) { - throw new IllegalStateException("Event Key must be in the format of -"); - } - final String[] s1 = o1.split("-"); - final String[] s2 = o2.split("-"); - - // Compare transaction ids, these should generally be identical. - int result = s1[0].compareTo(s2[0]); - if (result == 0) { - result = Long.compare(Long.parseLong(s1[1]), Long.parseLong(s2[1])); - } - return result; - } - } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/EmbeddedInfinispanLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/EmbeddedInfinispanLogMinerEventProcessor.java index 2cbf99e27..7a6c76ea7 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/EmbeddedInfinispanLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/EmbeddedInfinispanLogMinerEventProcessor.java @@ -12,11 +12,7 @@ import java.util.Map; import java.util.Objects; -import java.util.Optional; -import org.infinispan.Cache; -import org.infinispan.commons.api.BasicCache; -import org.infinispan.commons.util.CloseableIterator; import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.global.GlobalConfiguration; @@ -35,9 +31,10 @@ import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.OracleOffsetContext; import io.debezium.connector.oracle.OraclePartition; -import io.debezium.connector.oracle.Scn; import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics; import io.debezium.connector.oracle.logminer.events.LogMinerEvent; +import io.debezium.connector.oracle.logminer.processor.CacheProvider; +import io.debezium.connector.oracle.logminer.processor.LogMinerCache; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext; import io.debezium.relational.TableId; @@ -58,10 +55,10 @@ public class EmbeddedInfinispanLogMinerEventProcessor extends AbstractInfinispan private final EmbeddedCacheManager cacheManager; private final boolean dropBufferOnStop; - private final Cache transactionCache; - private final Cache eventCache; - private final Cache processedTransactionsCache; - private final Cache schemaChangesCache; + private final LogMinerCache transactionCache; + private final LogMinerCache eventCache; + private final LogMinerCache processedTransactionsCache; + private final LogMinerCache schemaChangesCache; public EmbeddedInfinispanLogMinerEventProcessor(ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, @@ -107,87 +104,26 @@ public void close() throws Exception { } @Override - public BasicCache getTransactionCache() { + public LogMinerCache getTransactionCache() { return transactionCache; } @Override - public BasicCache getEventCache() { + public LogMinerCache getEventCache() { return eventCache; } @Override - public BasicCache getSchemaChangesCache() { + public LogMinerCache getSchemaChangesCache() { return schemaChangesCache; } @Override - public BasicCache getProcessedTransactionsCache() { + public LogMinerCache getProcessedTransactionsCache() { return processedTransactionsCache; } - @Override - protected Scn getTransactionCacheMinimumScn() { - Scn minimumScn = Scn.NULL; - try (CloseableIterator iterator = transactionCache.values().iterator()) { - while (iterator.hasNext()) { - final Scn transactionScn = iterator.next().getStartScn(); - if (minimumScn.isNull()) { - minimumScn = transactionScn; - } - else { - if (transactionScn.compareTo(minimumScn) < 0) { - minimumScn = transactionScn; - } - } - } - } - return minimumScn; - } - - @Override - protected Optional getOldestTransactionInCache() { - InfinispanTransaction transaction = null; - try (CloseableIterator iterator = transactionCache.values().iterator()) { - if (iterator.hasNext()) { - // Seed with the first element - transaction = iterator.next(); - while (iterator.hasNext()) { - final InfinispanTransaction entry = iterator.next(); - int comparison = entry.getStartScn().compareTo(transaction.getStartScn()); - if (comparison < 0) { - // if entry has a smaller scn, it came before. - transaction = entry; - } - else if (comparison == 0) { - // if entry has an equal scn, compare the change times. - if (entry.getChangeTime().isBefore(transaction.getChangeTime())) { - transaction = entry; - } - } - } - } - } - return Optional.ofNullable(transaction); - } - - @Override - protected String getFirstActiveTransactionKey() { - try (CloseableIterator iterator = transactionCache.keySet().iterator()) { - if (iterator.hasNext()) { - return iterator.next(); - } - } - return null; - } - - @Override - protected void purgeCache(Scn minCacheScn) { - removeIf(processedTransactionsCache.entrySet().iterator(), entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0); - removeIf(schemaChangesCache.entrySet().iterator(), entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0); - } - - private Cache createCache(String cacheName, OracleConnectorConfig connectorConfig, Field field) { + private LogMinerCache createCache(String cacheName, OracleConnectorConfig connectorConfig, Field field) { Objects.requireNonNull(cacheName); final String cacheConfiguration = connectorConfig.getConfig().getString(field); @@ -195,7 +131,7 @@ private Cache createCache(String cacheName, OracleConnectorConfig c // define the cache, parsing the supplied XML configuration cacheManager.defineConfiguration(cacheName, parseAndGetConfiguration(cacheName, cacheConfiguration)); - return cacheManager.getCache(cacheName); + return new InfinispanLogMinerCache<>(cacheManager.getCache(cacheName)); } private Configuration parseAndGetConfiguration(String cacheName, String configuration) { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InfinispanLogMinerCache.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InfinispanLogMinerCache.java new file mode 100644 index 000000000..4bef86bb5 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/InfinispanLogMinerCache.java @@ -0,0 +1,92 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.infinispan; + +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import org.infinispan.client.hotrod.Flag; +import org.infinispan.client.hotrod.RemoteCache; +import org.infinispan.commons.api.BasicCache; + +import io.debezium.connector.oracle.logminer.processor.LogMinerCache; + +public class InfinispanLogMinerCache implements LogMinerCache { + + private final BasicCache cache; + + public InfinispanLogMinerCache(BasicCache cache) { + this.cache = cache; + } + + @Override + public boolean containsKey(K key) { + return cache.containsKey(key); + } + + @Override + public int size() { + return cache.size(); + } + + @Override + public boolean isEmpty() { + return cache.isEmpty(); + } + + @Override + public V remove(K key) { + if (cache instanceof RemoteCache remoteCache) { + return remoteCache.withFlags(Flag.FORCE_RETURN_VALUE).remove(key); + } + return cache.remove(key); + } + + @Override + public V get(K key) { + return cache.get(key); + } + + @Override + public void put(K key, V value) { + cache.put(key, value); + } + + @Override + public void clear() { + cache.clear(); + } + + @Override + public void forEach(BiConsumer action) { + cache.forEach(action); + } + + @Override + public void removeIf(Predicate> predicate) { + this.cache.entrySet().removeIf(kvEntry -> predicate.test(new Entry<>(kvEntry.getKey(), kvEntry.getValue()))); + } + + @Override + public void entries(Consumer>> streamConsumer) { + try (Stream> stream = this.cache.entrySet() + .stream() + .map(e -> new Entry<>(e.getKey(), e.getValue()))) { + streamConsumer.accept(stream); + } + } + + @Override + public T streamAndReturn(Function>, T> function) { + try (Stream> stream = this.cache.entrySet().stream()) { + return function.apply(stream.map(e -> new Entry<>(e.getKey(), e.getValue()))); + } + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/RemoteInfinispanLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/RemoteInfinispanLogMinerEventProcessor.java index 872b33c48..a49ba83ab 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/RemoteInfinispanLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/RemoteInfinispanLogMinerEventProcessor.java @@ -12,18 +12,14 @@ import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Properties; -import org.checkerframework.checker.units.qual.C; import org.infinispan.client.hotrod.RemoteCache; import org.infinispan.client.hotrod.RemoteCacheManager; import org.infinispan.client.hotrod.configuration.Configuration; import org.infinispan.client.hotrod.configuration.ConfigurationBuilder; import org.infinispan.client.hotrod.impl.ConfigurationProperties; -import org.infinispan.commons.api.BasicCache; import org.infinispan.commons.configuration.XMLStringConfiguration; -import org.infinispan.commons.util.CloseableIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,9 +30,10 @@ import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.OracleOffsetContext; import io.debezium.connector.oracle.OraclePartition; -import io.debezium.connector.oracle.Scn; import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics; import io.debezium.connector.oracle.logminer.events.LogMinerEvent; +import io.debezium.connector.oracle.logminer.processor.CacheProvider; +import io.debezium.connector.oracle.logminer.processor.LogMinerCache; import io.debezium.connector.oracle.logminer.processor.infinispan.marshalling.LogMinerEventMarshallerImpl; import io.debezium.connector.oracle.logminer.processor.infinispan.marshalling.TransactionMarshallerImpl; import io.debezium.pipeline.EventDispatcher; @@ -64,10 +61,10 @@ public class RemoteInfinispanLogMinerEventProcessor extends AbstractInfinispanLo private final RemoteCacheManager cacheManager; private final boolean dropBufferOnStop; - private final RemoteCache transactionCache; - private final RemoteCache eventCache; - private final RemoteCache processedTransactionsCache; - private final RemoteCache schemaChangesCache; + private final LogMinerCache transactionCache; + private final LogMinerCache eventCache; + private final LogMinerCache processedTransactionsCache; + private final LogMinerCache schemaChangesCache; public RemoteInfinispanLogMinerEventProcessor(ChangeEventSourceContext context, OracleConnectorConfig connectorConfig, @@ -119,86 +116,25 @@ public void close() throws Exception { } @Override - public BasicCache getTransactionCache() { + public LogMinerCache getTransactionCache() { return transactionCache; } @Override - public BasicCache getEventCache() { + public LogMinerCache getEventCache() { return eventCache; } @Override - public BasicCache getSchemaChangesCache() { + public LogMinerCache getSchemaChangesCache() { return schemaChangesCache; } @Override - public BasicCache getProcessedTransactionsCache() { + public LogMinerCache getProcessedTransactionsCache() { return processedTransactionsCache; } - @Override - protected Scn getTransactionCacheMinimumScn() { - Scn minimumScn = Scn.NULL; - try (CloseableIterator iterator = transactionCache.values().iterator()) { - while (iterator.hasNext()) { - final Scn transactionScn = iterator.next().getStartScn(); - if (minimumScn.isNull()) { - minimumScn = transactionScn; - } - else { - if (transactionScn.compareTo(minimumScn) < 0) { - minimumScn = transactionScn; - } - } - } - } - return minimumScn; - } - - @Override - protected Optional getOldestTransactionInCache() { - InfinispanTransaction transaction = null; - try (CloseableIterator iterator = transactionCache.values().iterator()) { - if (iterator.hasNext()) { - // Seed with the first element - transaction = iterator.next(); - while (iterator.hasNext()) { - final InfinispanTransaction entry = iterator.next(); - int comparison = entry.getStartScn().compareTo(transaction.getStartScn()); - if (comparison < 0) { - // if entry has a smaller scn, it came before. - transaction = entry; - } - else if (comparison == 0) { - // if entry has an equal scn, compare the change times. - if (entry.getChangeTime().isBefore(transaction.getChangeTime())) { - transaction = entry; - } - } - } - } - } - return Optional.ofNullable(transaction); - } - - @Override - protected String getFirstActiveTransactionKey() { - try (CloseableIterator iterator = transactionCache.keySet().iterator()) { - if (iterator.hasNext()) { - return iterator.next(); - } - } - return null; - } - - @Override - protected void purgeCache(Scn minCacheScn) { - removeIf(processedTransactionsCache.entrySet().iterator(), entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0); - removeIf(schemaChangesCache.entrySet().iterator(), entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0); - } - private Properties getHotrodClientProperties(OracleConnectorConfig connectorConfig) { final Map clientSettings = connectorConfig.getConfig() .subset(HOTROD_CLIENT_LOOKUP_PREFIX, true) @@ -215,14 +151,14 @@ private Properties getHotrodClientProperties(OracleConnectorConfig connectorConf return properties; } - private RemoteCache createCache(String cacheName, OracleConnectorConfig connectorConfig, Field field) { + private LogMinerCache createCache(String cacheName, OracleConnectorConfig connectorConfig, Field field) { Objects.requireNonNull(cacheName); RemoteCache cache = cacheManager.getCache(cacheName); if (cache != null) { // cache is already defined, simply return it LOGGER.info("Remote cache '{}' already defined.", cacheName); - return cache; + return new InfinispanLogMinerCache<>(cache); } final String cacheConfiguration = connectorConfig.getConfig().getString(field); @@ -234,6 +170,6 @@ private RemoteCache createCache(String cacheName, OracleConnectorCo } LOGGER.info("Created remote infinispan cache: {}", cacheName); - return cache; + return new InfinispanLogMinerCache<>(cache); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryBasedLogMinerCache.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryBasedLogMinerCache.java new file mode 100644 index 000000000..9335c58b7 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryBasedLogMinerCache.java @@ -0,0 +1,83 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.memory; + +import java.util.Map; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import io.debezium.connector.oracle.logminer.processor.LogMinerCache; + +public class MemoryBasedLogMinerCache implements LogMinerCache { + + private final Map map; + + public MemoryBasedLogMinerCache(Map map) { + this.map = map; + } + + @Override + public void clear() { + map.clear(); + } + + @Override + public int size() { + return map.size(); + } + + @Override + public boolean isEmpty() { + return map.isEmpty(); + } + + @Override + public boolean containsKey(K key) { + return map.containsKey(key); + } + + @Override + public V remove(K key) { + return map.remove(key); + } + + @Override + public V get(K key) { + return map.get(key); + } + + @Override + public void put(K key, V value) { + map.put(key, value); + } + + @Override + public void forEach(BiConsumer action) { + map.forEach(action); + } + + @Override + public void removeIf(Predicate> predicate) { + this.map.entrySet().removeIf(kvEntry -> predicate.test(new Entry<>(kvEntry.getKey(), kvEntry.getValue()))); + } + + @Override + public void entries(Consumer>> streamConsumer) { + streamConsumer.accept(map.entrySet().stream() + .map(e -> new LogMinerCache.Entry<>(e.getKey(), e.getValue()))); + } + + @Override + public T streamAndReturn(Function>, T> function) { + try (Stream> stream = map.entrySet().stream()) { + return function.apply(stream.map(e -> new Entry<>(e.getKey(), e.getValue()))); + } + } + +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java index 15159aabe..e3c2ef282 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java @@ -14,6 +14,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import org.slf4j.Logger; @@ -29,6 +30,7 @@ import io.debezium.connector.oracle.logminer.events.LogMinerEvent; import io.debezium.connector.oracle.logminer.events.LogMinerEventRow; import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor; +import io.debezium.connector.oracle.logminer.processor.LogMinerCache; import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext; @@ -41,6 +43,7 @@ * * @author Chris Cranford */ +// TODO: can this be a caching impl now as well, just with map public class MemoryLogMinerEventProcessor extends AbstractLogMinerEventProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(MemoryLogMinerEventProcessor.class); @@ -49,10 +52,11 @@ public class MemoryLogMinerEventProcessor extends AbstractLogMinerEventProcessor private final OracleOffsetContext offsetContext; private final LogMinerStreamingChangeEventSourceMetrics metrics; + private final HashMap transactionHashMap = new HashMap<>(); /** * Cache of transactions, keyed based on the transaction's unique identifier */ - private final Map transactionCache = new HashMap<>(); + private final LogMinerCache transactionCache = new MemoryBasedLogMinerCache<>(transactionHashMap); /** * Cache of processed transactions (committed or rolled back), keyed based on the transaction's unique identifier. */ @@ -75,7 +79,7 @@ public MemoryLogMinerEventProcessor(ChangeEventSourceContext context, } @Override - protected Map getTransactionCache() { + public LogMinerCache getTransactionCache() { return transactionCache; } @@ -95,17 +99,23 @@ protected void removeEventWithRowId(LogMinerEventRow row) { final String transactionPrefix = getTransactionIdPrefix(row.getTransactionId()); LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId()); LOGGER.debug("Checking all transactions with prefix '{}'", transactionPrefix); - for (String transactionKey : getTransactionCache().keySet()) { + + getTransactionCache().forEach((transactionKey, v) -> { if (transactionKey.startsWith(transactionPrefix)) { - transaction = getTransactionCache().get(transactionKey); - if (transaction != null && transaction.removeEventWithRowId(row.getRowId())) { + MemoryTransaction found = getTransactionCache().get(transactionKey); + // TODO: isn't found the same as v? + if (found != v) { + LOGGER.warn("HOW DID THIS HAPPEN?"); + } + + if (found != null && found.removeEventWithRowId(row.getRowId())) { // We successfully found a transaction with the same XISUSN and XIDSLT and that // transaction included a change for the specified row id. Loggings.logDebugAndTraceRecord(LOGGER, row, "Undo change on table '{}' was applied to transaction '{}'", row.getTableId(), transactionKey); - return; } } - } + }); + Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found", row.getTableId(), row.getRowId()); } @@ -167,8 +177,14 @@ protected void finalizeTransactionRollback(String transactionId, Scn rollbackScn @Override protected String getFirstActiveTransactionKey() { - final Iterator keyIterator = transactionCache.keySet().iterator(); - return keyIterator.hasNext() ? keyIterator.next() : null; + AtomicReference result = new AtomicReference<>(); + transactionCache.keys(keys -> { + Iterator iterator = keys.iterator(); + if (iterator.hasNext()) { + result.set(iterator.next()); + } + }); + return result.get(); } @Override @@ -273,33 +289,38 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter @Override protected Scn getTransactionCacheMinimumScn() { - return transactionCache.values().stream() - .map(MemoryTransaction::getStartScn) - .min(Scn::compareTo) - .orElse(Scn.NULL); + AtomicReference result = new AtomicReference<>(); + transactionCache.values(stream -> { + result.set(stream.map(MemoryTransaction::getStartScn) + .min(Scn::compareTo) + .orElse(Scn.NULL)); + }); + return result.get(); } - @Override + // TODO: extend cache processor? protected Optional getOldestTransactionInCache() { - MemoryTransaction transaction = null; - if (!transactionCache.isEmpty()) { - // Seed with the first element - transaction = transactionCache.values().iterator().next(); - for (MemoryTransaction entry : transactionCache.values()) { - int comparison = entry.getStartScn().compareTo(transaction.getStartScn()); - if (comparison < 0) { - // if entry has a smaller scn, it came before. - transaction = entry; - } - else if (comparison == 0) { - // if entry has an equal scn, compare the change times. - if (entry.getChangeTime().isBefore(transaction.getChangeTime())) { - transaction = entry; - } - } + + return getTransactionCache().streamAndReturn(entryStream -> entryStream + .map(LogMinerCache.Entry::getValue) + .min(this::compareTransactions)); + } + + protected MemoryTransaction compareOldest(MemoryTransaction first, MemoryTransaction second) { + int comparison = first.getStartScn().compareTo(second.getStartScn()); + if (comparison < 0) { + return second; + } + else if (comparison == 0) { + if (second.getChangeTime().isBefore(first.getChangeTime())) { + return second; } } - return Optional.ofNullable(transaction); + return first; } + protected int compareTransactions(MemoryTransaction first, MemoryTransaction second) { + return first.getStartScn().compareTo(second.getStartScn()); + + } } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java index b551ed366..5f2e691f2 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java @@ -33,7 +33,7 @@ import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningBufferType; import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningStrategy; import io.debezium.connector.oracle.Scn; -import io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider; +import io.debezium.connector.oracle.logminer.processor.CacheProvider; import io.debezium.connector.oracle.rest.DebeziumOracleConnectorResourceIT; import io.debezium.embedded.async.AsyncEmbeddedEngine; import io.debezium.jdbc.JdbcConfiguration;