From 3803a277d677c58952658b9aa2e8cd955998499a Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Wed, 25 Oct 2023 00:12:53 -0400 Subject: [PATCH] DBZ-7047 Clean-up more cache iterator/stream call sites. --- ...tractInfinispanLogMinerEventProcessor.java | 54 ++++++++++++++----- ...eddedInfinispanLogMinerEventProcessor.java | 6 +++ ...emoteInfinispanLogMinerEventProcessor.java | 8 ++- 3 files changed, 55 insertions(+), 13 deletions(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java index f21690b81..ad1ab557c 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java @@ -13,10 +13,14 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.infinispan.commons.util.CloseableIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +65,7 @@ public AbstractInfinispanLogMinerEventProcessor(ChangeEventSourceContext context LogMinerStreamingChangeEventSourceMetrics metrics) { super(context, connectorConfig, schema, partition, offsetContext, dispatcher, metrics); this.jdbcConnection = jdbcConnection; - this.metrics = (LogMinerStreamingChangeEventSourceMetrics) metrics; + this.metrics = metrics; this.partition = partition; this.offsetContext = offsetContext; this.dispatcher = dispatcher; @@ -74,9 +78,9 @@ public void displayCacheStatistics() { LOGGER.info("\tRecent Transactions : {}", getProcessedTransactionsCache().size()); LOGGER.info("\tSchema Changes : {}", getSchemaChangesCache().size()); LOGGER.info("\tEvents : {}", getEventCache().size()); - if (!getEventCache().isEmpty()) { - for (String eventKey : getEventCache().keySet()) { - LOGGER.debug("\t\tFound Key: {}", eventKey); + if (!getEventCache().isEmpty() && LOGGER.isDebugEnabled()) { + try (Stream stream = getEventCache().keySet().stream()) { + stream.forEach(eventKey -> LOGGER.debug("\t\tFound Key: {}", eventKey)); } } } @@ -140,7 +144,9 @@ else if (!getConfig().isLobEnabled()) { } private List getTransactionKeysWithPrefix(String prefix) { - return getEventCache().keySet().stream().filter(k -> k.startsWith(prefix)).collect(Collectors.toList()); + try (Stream stream = getEventCache().keySet().stream()) { + return stream.filter(k -> k.startsWith(prefix)).collect(Collectors.toList()); + } } @Override @@ -283,11 +289,9 @@ protected void addToTransaction(String transactionId, LogMinerEventRow row, Supp @Override protected int getTransactionEventCount(InfinispanTransaction transaction) { // todo: implement indexed keys when ISPN supports them - return (int) getEventCache() - .keySet() - .stream() - .filter(k -> k.startsWith(transaction.getTransactionId() + "-")) - .count(); + try (Stream stream = getEventCache().keySet().stream()) { + return (int) stream.filter(k -> k.startsWith(transaction.getTransactionId() + "-")).count(); + } } @Override @@ -315,8 +319,7 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter } if (!minCacheScn.isNull()) { - getProcessedTransactionsCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0); - getSchemaChangesCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0); + purgeCache(minCacheScn); } else { getProcessedTransactionsCache().clear(); @@ -356,6 +359,33 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter } } + /** + * Purge the necessary caches with all entries that occurred prior to the specified change number. + * + * @param minCacheScn the minimum system change number to keep entries until + */ + protected abstract void purgeCache(Scn minCacheScn); + + /** + * Helper method to remove entries that match the given predicate from a closeable iterator. + * This method guarantees that the underlying resources are released at the end of the operation. + * + * @param iterator the iterator + * @param filter the predicate + * @param the key type + * @param the value type + */ + protected void removeIf(CloseableIterator> iterator, Predicate> filter) { + try (CloseableIterator> it = iterator) { + while (it.hasNext()) { + final Map.Entry entry = it.next(); + if (filter.test(entry)) { + it.remove(); + } + } + } + } + private void removeEventsWithTransaction(InfinispanTransaction transaction) { // Clear the event queue for the transaction for (int i = 0; i < transaction.getNumberOfEvents(); ++i) { diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/EmbeddedInfinispanLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/EmbeddedInfinispanLogMinerEventProcessor.java index 6ef9f70db..73b9605b1 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/EmbeddedInfinispanLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/EmbeddedInfinispanLogMinerEventProcessor.java @@ -180,6 +180,12 @@ protected String getFirstActiveTransactionKey() { return null; } + @Override + protected void purgeCache(Scn minCacheScn) { + removeIf(processedTransactionsCache.entrySet().iterator(), entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) > 0); + removeIf(schemaChangesCache.entrySet().iterator(), entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) > 0); + } + private Cache createCache(String cacheName, OracleConnectorConfig connectorConfig, Field field) { Objects.requireNonNull(cacheName); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/RemoteInfinispanLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/RemoteInfinispanLogMinerEventProcessor.java index df8ef8c8a..48ebfa1ed 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/RemoteInfinispanLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/RemoteInfinispanLogMinerEventProcessor.java @@ -51,7 +51,7 @@ * * @author Chris Cranford */ -public class RemoteInfinispanLogMinerEventProcessor extends AbstractInfinispanLogMinerEventProcessor implements CacheProvider { +public class RemoteInfinispanLogMinerEventProcessor extends AbstractInfinispanLogMinerEventProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInfinispanLogMinerEventProcessor.class); @@ -191,6 +191,12 @@ protected String getFirstActiveTransactionKey() { return null; } + @Override + protected void purgeCache(Scn minCacheScn) { + removeIf(processedTransactionsCache.entrySet().iterator(), entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) > 0); + removeIf(schemaChangesCache.entrySet().iterator(), entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0); + } + private Properties getHotrodClientProperties(OracleConnectorConfig connectorConfig) { final Map clientSettings = connectorConfig.getConfig() .subset(HOTROD_CLIENT_LOOKUP_PREFIX, true)