DBZ-7153 - memory cache recreated during startup (from ISPN cache)

This commit is contained in:
Jiri Kulhanek 2023-11-17 12:48:15 +01:00 committed by Chris Cranford
parent 3508913a3d
commit 371237d167
4 changed files with 24 additions and 2 deletions

View File

@ -76,6 +76,21 @@ public AbstractInfinispanLogMinerEventProcessor(ChangeEventSourceContext context
AbstractInfinispanLogMinerEventProcessor.instance = this; AbstractInfinispanLogMinerEventProcessor.instance = this;
} }
protected void reCreateInMemoryCache() {
try (Stream<String> trStream = getTransactionCache().keySet().stream()) {
trStream.forEach(tr -> {
try (Stream<String> eventStream = getEventCache().keySet().stream()) {
int count = (int) eventStream.filter(k -> k.startsWith(tr + "-")).count();
LOGGER.info("Re-creating in memory cache of event count for transaction '" + tr + "'. No of events found: " + count);
inMemoryPendingTransactionsCache.initKey(tr, count);
}
});
}
}
/**
* Can be used for reporting in Debezium Embedded mode
*/
public static void logCacheStats() { public static void logCacheStats() {
AbstractInfinispanLogMinerEventProcessor.instance.displayCacheStatistics(); AbstractInfinispanLogMinerEventProcessor.instance.displayCacheStatistics();
} }
@ -287,7 +302,7 @@ protected void addToTransaction(String transactionId, LogMinerEventRow row, Supp
LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, eventKey); LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, eventKey);
getEventCache().put(eventKey, eventSupplier.get()); getEventCache().put(eventKey, eventSupplier.get());
metrics.calculateLagFromSource(row.getChangeTime()); metrics.calculateLagFromSource(row.getChangeTime());
inMemoryPendingTransactionsCache.put(transaction.getTransactionId()); inMemoryPendingTransactionsCache.putOrIncrement(transaction.getTransactionId());
} }
// When using Infinispan, this extra put is required so that the state is properly synchronized // When using Infinispan, this extra put is required so that the state is properly synchronized
getTransactionCache().put(transactionId, transaction); getTransactionCache().put(transactionId, transaction);

View File

@ -83,6 +83,7 @@ public EmbeddedInfinispanLogMinerEventProcessor(ChangeEventSourceContext context
this.schemaChangesCache = createCache(SCHEMA_CHANGES_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES); this.schemaChangesCache = createCache(SCHEMA_CHANGES_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES);
this.eventCache = createCache(EVENTS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS); this.eventCache = createCache(EVENTS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS);
reCreateInMemoryCache();
displayCacheStatistics(); displayCacheStatistics();
} }

View File

@ -23,7 +23,7 @@ Integer getNumPending(String transactionId) {
} }
} }
String put(String transactionId) { String putOrIncrement(String transactionId) {
Integer i = pendingTransactionInEventsCache.get(transactionId); Integer i = pendingTransactionInEventsCache.get(transactionId);
if (i == null) { if (i == null) {
pendingTransactionInEventsCache.put(transactionId, 1); pendingTransactionInEventsCache.put(transactionId, 1);
@ -34,7 +34,12 @@ String put(String transactionId) {
return transactionId; return transactionId;
} }
public void initKey(String transactionId, int count) {
pendingTransactionInEventsCache.put(transactionId, count);
}
public Integer remove(String transactionId) { public Integer remove(String transactionId) {
return pendingTransactionInEventsCache.remove(transactionId); return pendingTransactionInEventsCache.remove(transactionId);
} }
} }

View File

@ -94,6 +94,7 @@ public RemoteInfinispanLogMinerEventProcessor(ChangeEventSourceContext context,
this.schemaChangesCache = createCache(SCHEMA_CHANGES_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES); this.schemaChangesCache = createCache(SCHEMA_CHANGES_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES);
this.eventCache = createCache(EVENTS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS); this.eventCache = createCache(EVENTS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS);
reCreateInMemoryCache();
displayCacheStatistics(); displayCacheStatistics();
} }