diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java index d55ad78ab..e54877558 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java @@ -1546,6 +1546,7 @@ protected void abandonTransactionOverEventThreshold(T transaction) { metrics.incrementOversizedTransactionCount(); } + //this implementation is different @Override public void abandonTransactions(Duration retention) throws InterruptedException { if (!Duration.ZERO.equals(retention)) { @@ -1556,12 +1557,12 @@ public void abandonTransactions(Duration retention) throws InterruptedException if (!smallestScn.isNull() && thresholdScn.compareTo(smallestScn) >= 0) { LogMinerCache transactionCache = getTransactionCache(); - Map abandonedT = transactionCache.streamAndReturn(stream -> stream + Map abandoned = transactionCache.streamAndReturn(stream -> stream .filter(e -> e.getValue().getStartScn().compareTo(thresholdScn) <= 0) .collect(Collectors.toMap(LogMinerCache.Entry::getKey, LogMinerCache.Entry::getValue))); boolean first = true; - for (Map.Entry entry : abandonedT.entrySet()) { + for (Map.Entry entry : abandoned.entrySet()) { if (first) { LOGGER.warn("All transactions with SCN <= {} will be abandoned.", thresholdScn); first = false; @@ -1582,7 +1583,7 @@ public void abandonTransactions(Duration retention) throws InterruptedException if (LOGGER.isDebugEnabled()) { LOGGER.debug("List of transactions in the cache before transactions being abandoned: [{}]", - String.join(",", abandonedT.keySet())); + String.join(",", abandoned.keySet())); transactionCache.keys(keys -> LOGGER.debug("List of transactions in the cache after transactions being abandoned: [{}]", keys.collect(Collectors.joining(",")))); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractTransactionCachingLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractTransactionCachingLogMinerEventProcessor.java index 5f39dee4d..3d48d8a36 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractTransactionCachingLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractTransactionCachingLogMinerEventProcessor.java @@ -97,6 +97,7 @@ protected boolean isRecentlyProcessed(String transactionId) { return getProcessedTransactionsCache().containsKey(transactionId); } + //from EmbeddedInfinispanLogMinerEventProcessor, impl is different @Override protected Scn getTransactionCacheMinimumScn() { return getTransactionCache().streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getValue) @@ -105,17 +106,20 @@ protected Scn getTransactionCacheMinimumScn() { .orElse(Scn.NULL)); } + //from EmbeddedInfinispanLogMinerEventProcessor, impl is different protected Optional getOldestTransactionInCache() { return getTransactionCache().streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getValue) .min(this::oldestTransactionComparison)); } + //from EmbeddedInfinispanLogMinerEventProcessor, impl is different @Override protected String getFirstActiveTransactionKey() { return getTransactionCache() .streamAndReturn(stream -> stream.map(LogMinerCache.Entry::getKey).findFirst()).orElse(null); } + //this impl is different @Override protected void removeEventWithRowId(LogMinerEventRow row) { // locate the events based solely on XIDUSN and XIDSLT. @@ -137,6 +141,9 @@ protected void removeEventWithRowId(LogMinerEventRow row) { LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId()); LOGGER.debug("Checking all transactions with prefix '{}'", basePrefix); eventKeys = eventKeysForBasePrefix; + + Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(), + row.getRowId()); } if (!eventKeys.isEmpty()) { @@ -172,6 +179,7 @@ protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) { return getSchemaChangesCache().containsKey(row.getScn().toString()); } + //different from AbstractInfinispanLogMinerEventProcessor @Override protected T getAndRemoveTransactionFromCache(String transactionId) { return getTransactionCache().remove(transactionId); @@ -183,6 +191,41 @@ protected void cleanupAfterTransactionRemovedFromCache(T transaction, boolean is removeEventsWithTransaction(transaction); } + @Override + protected Iterator getTransactionEventIterator(T transaction) { + return new Iterator<>() { + private final int count = transaction.getNumberOfEvents(); + + private LogMinerEvent nextEvent; + private int index = 0; + + @Override + public boolean hasNext() { + while (index < count) { + nextEvent = getEventCache().get(transaction.getEventId(index)); + if (nextEvent == null) { + LOGGER.debug("Event {} must have been undone, skipped.", index); + // There are situations where an event will be removed from the cache when it is + // undone by the undo-row flag. The event id isn't re-used in this use case so + // the iterator automatically detects null entries and skips them by advancing + // to the next entry until either we've reached the number of events or detected + // a non-null entry available for return + index++; + continue; + } + break; + } + return index < count; + } + + @Override + public LogMinerEvent next() { + index++; + return nextEvent; + } + }; + } + @Override protected void finalizeTransactionCommit(String transactionId, Scn commitScn) { getAbandonedTransactionsCache().remove(transactionId); @@ -268,59 +311,6 @@ protected PreparedStatement createQueryStatement() throws SQLException { ResultSet.HOLD_CURSORS_OVER_COMMIT); } - private void removeEventsWithTransaction(T transaction) { - // Clear the event queue for the transaction - for (int i = 0; i < transaction.getNumberOfEvents(); ++i) { - getEventCache().remove(transaction.getEventId(i)); - } - inMemoryPendingTransactionsCache.remove(transaction.getTransactionId()); - } - - /** - * A comparator that guarantees that the sort order applied to event keys is such that - * they are treated as numerical values, sorted as numeric values rather than strings - * which would allow "100" to come before "9". - */ - private static class EventKeySortComparator implements Comparator { - - public static final EventKeySortComparator INSTANCE = new EventKeySortComparator(); - - @Override - public int compare(String o1, String o2) { - if (o1 == null || !o1.contains("-")) { - throw new IllegalStateException("Event Key must be in the format of -"); - } - if (o2 == null || !o2.contains("-")) { - throw new IllegalStateException("Event Key must be in the format of -"); - } - final String[] s1 = o1.split("-"); - final String[] s2 = o2.split("-"); - - // Compare transaction ids, these should generally be identical. - int result = s1[0].compareTo(s2[0]); - if (result == 0) { - result = Long.compare(Long.parseLong(s1[1]), Long.parseLong(s2[1])); - } - return result; - } - } - - /** - * Purge the necessary caches with all entries that occurred prior to the specified change number. - *

- * NOTE: This method is abstract despite the code used by both all implementations being identical. - * This is because the method needed {@code entrySet()} is made available on two different concrete - * interfaces between the embedded and remote cache implementations, and therefore we need to access - * this method from the concrete implementation classes (RemoteCache and Cache) rather than from - * the common class used by CacheProvider (BasicCache). - * - * @param minCacheScn the minimum system change number to keep entries until - */ - protected void purgeCache(Scn minCacheScn) { - getProcessedTransactionsCache().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0); - getSchemaChangesCache().removeIf(entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0); - } - @Override protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws InterruptedException { @@ -378,39 +368,22 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter } } - @Override - protected Iterator getTransactionEventIterator(T transaction) { - return new Iterator<>() { - private final int count = transaction.getNumberOfEvents(); + /** + * Purge the necessary caches with all entries that occurred prior to the specified change number. + * + * @param minCacheScn the minimum system change number to keep entries until + */ + protected void purgeCache(Scn minCacheScn) { + getProcessedTransactionsCache().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0); + getSchemaChangesCache().removeIf(entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0); + } - private LogMinerEvent nextEvent; - private int index = 0; - - @Override - public boolean hasNext() { - while (index < count) { - nextEvent = getEventCache().get(transaction.getEventId(index)); - if (nextEvent == null) { - LOGGER.debug("Event {} must have been undone, skipped.", index); - // There are situations where an event will be removed from the cache when it is - // undone by the undo-row flag. The event id isn't re-used in this use case so - // the iterator automatically detects null entries and skips them by advancing - // to the next entry until either we've reached the number of events or detected - // a non-null entry available for return - index++; - continue; - } - break; - } - return index < count; - } - - @Override - public LogMinerEvent next() { - index++; - return nextEvent; - } - }; + private void removeEventsWithTransaction(T transaction) { + // Clear the event queue for the transaction + for (int i = 0; i < transaction.getNumberOfEvents(); ++i) { + getEventCache().remove(transaction.getEventId(i)); + } + inMemoryPendingTransactionsCache.remove(transaction.getTransactionId()); } private void removeEvents(LogMinerEventRow row, List eventKeys) { @@ -426,4 +399,33 @@ private void removeEvents(LogMinerEventRow row, List eventKeys) { Loggings.logWarningAndTraceRecord(LOGGER, row, "Cannot undo change on table '{}' since event with row-id {} was not found.", row.getTableId(), row.getRowId()); } + + /** + * A comparator that guarantees that the sort order applied to event keys is such that + * they are treated as numerical values, sorted as numeric values rather than strings + * which would allow "100" to come before "9". + */ + private static class EventKeySortComparator implements Comparator { + + public static final EventKeySortComparator INSTANCE = new EventKeySortComparator(); + + @Override + public int compare(String o1, String o2) { + if (o1 == null || !o1.contains("-")) { + throw new IllegalStateException("Event Key must be in the format of -"); + } + if (o2 == null || !o2.contains("-")) { + throw new IllegalStateException("Event Key must be in the format of -"); + } + final String[] s1 = o1.split("-"); + final String[] s2 = o2.split("-"); + + // Compare transaction ids, these should generally be identical. + int result = s1[0].compareTo(s2[0]); + if (result == 0) { + result = Long.compare(Long.parseLong(s1[1]), Long.parseLong(s2[1])); + } + return result; + } + } }