DBZ-7047 Clean-up more cache iterator/stream call sites.

This commit is contained in:
Chris Cranford 2023-10-25 00:12:53 -04:00 committed by Chris Cranford
parent de0969899f
commit 3803a277d6
3 changed files with 55 additions and 13 deletions

View File

@ -13,10 +13,14 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.infinispan.commons.util.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -61,7 +65,7 @@ public AbstractInfinispanLogMinerEventProcessor(ChangeEventSourceContext context
LogMinerStreamingChangeEventSourceMetrics metrics) {
super(context, connectorConfig, schema, partition, offsetContext, dispatcher, metrics);
this.jdbcConnection = jdbcConnection;
this.metrics = (LogMinerStreamingChangeEventSourceMetrics) metrics;
this.metrics = metrics;
this.partition = partition;
this.offsetContext = offsetContext;
this.dispatcher = dispatcher;
@ -74,9 +78,9 @@ public void displayCacheStatistics() {
LOGGER.info("\tRecent Transactions : {}", getProcessedTransactionsCache().size());
LOGGER.info("\tSchema Changes : {}", getSchemaChangesCache().size());
LOGGER.info("\tEvents : {}", getEventCache().size());
if (!getEventCache().isEmpty()) {
for (String eventKey : getEventCache().keySet()) {
LOGGER.debug("\t\tFound Key: {}", eventKey);
if (!getEventCache().isEmpty() && LOGGER.isDebugEnabled()) {
try (Stream<String> stream = getEventCache().keySet().stream()) {
stream.forEach(eventKey -> LOGGER.debug("\t\tFound Key: {}", eventKey));
}
}
}
@ -140,7 +144,9 @@ else if (!getConfig().isLobEnabled()) {
}
private List<String> getTransactionKeysWithPrefix(String prefix) {
return getEventCache().keySet().stream().filter(k -> k.startsWith(prefix)).collect(Collectors.toList());
try (Stream<String> stream = getEventCache().keySet().stream()) {
return stream.filter(k -> k.startsWith(prefix)).collect(Collectors.toList());
}
}
@Override
@ -283,11 +289,9 @@ protected void addToTransaction(String transactionId, LogMinerEventRow row, Supp
@Override
protected int getTransactionEventCount(InfinispanTransaction transaction) {
// todo: implement indexed keys when ISPN supports them
return (int) getEventCache()
.keySet()
.stream()
.filter(k -> k.startsWith(transaction.getTransactionId() + "-"))
.count();
try (Stream<String> stream = getEventCache().keySet().stream()) {
return (int) stream.filter(k -> k.startsWith(transaction.getTransactionId() + "-")).count();
}
}
@Override
@ -315,8 +319,7 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter
}
if (!minCacheScn.isNull()) {
getProcessedTransactionsCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
getSchemaChangesCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0);
purgeCache(minCacheScn);
}
else {
getProcessedTransactionsCache().clear();
@ -356,6 +359,33 @@ protected Scn calculateNewStartScn(Scn endScn, Scn maxCommittedScn) throws Inter
}
}
/**
* Purge the necessary caches with all entries that occurred prior to the specified change number.
*
* @param minCacheScn the minimum system change number to keep entries until
*/
protected abstract void purgeCache(Scn minCacheScn);
/**
* Helper method to remove entries that match the given predicate from a closeable iterator.
* This method guarantees that the underlying resources are released at the end of the operation.
*
* @param iterator the iterator
* @param filter the predicate
* @param <K> the key type
* @param <V> the value type
*/
protected <K, V> void removeIf(CloseableIterator<Map.Entry<K, V>> iterator, Predicate<Map.Entry<K, V>> filter) {
try (CloseableIterator<Map.Entry<K, V>> it = iterator) {
while (it.hasNext()) {
final Map.Entry<K, V> entry = it.next();
if (filter.test(entry)) {
it.remove();
}
}
}
}
private void removeEventsWithTransaction(InfinispanTransaction transaction) {
// Clear the event queue for the transaction
for (int i = 0; i < transaction.getNumberOfEvents(); ++i) {

View File

@ -180,6 +180,12 @@ protected String getFirstActiveTransactionKey() {
return null;
}
@Override
protected void purgeCache(Scn minCacheScn) {
removeIf(processedTransactionsCache.entrySet().iterator(), entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) > 0);
removeIf(schemaChangesCache.entrySet().iterator(), entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) > 0);
}
private <K, V> Cache<K, V> createCache(String cacheName, OracleConnectorConfig connectorConfig, Field field) {
Objects.requireNonNull(cacheName);

View File

@ -51,7 +51,7 @@
*
* @author Chris Cranford
*/
public class RemoteInfinispanLogMinerEventProcessor extends AbstractInfinispanLogMinerEventProcessor implements CacheProvider {
public class RemoteInfinispanLogMinerEventProcessor extends AbstractInfinispanLogMinerEventProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInfinispanLogMinerEventProcessor.class);
@ -191,6 +191,12 @@ protected String getFirstActiveTransactionKey() {
return null;
}
@Override
protected void purgeCache(Scn minCacheScn) {
removeIf(processedTransactionsCache.entrySet().iterator(), entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) > 0);
removeIf(schemaChangesCache.entrySet().iterator(), entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0);
}
private Properties getHotrodClientProperties(OracleConnectorConfig connectorConfig) {
final Map<String, String> clientSettings = connectorConfig.getConfig()
.subset(HOTROD_CLIENT_LOOKUP_PREFIX, true)