DBZ-4169 Consolidate CacheProvider impls into Processor impls

This commit is contained in:
Chris Cranford 2021-11-08 18:34:35 -05:00 committed by Gunnar Morling
parent ab40b77120
commit 9e52ae8137
6 changed files with 345 additions and 356 deletions

View File

@ -43,7 +43,7 @@
*
* @author Chris Cranford
*/
public abstract class AbstractInfinispanLogMinerEventProcessor extends AbstractLogMinerEventProcessor<InfinispanTransaction> {
public abstract class AbstractInfinispanLogMinerEventProcessor extends AbstractLogMinerEventProcessor<InfinispanTransaction> implements CacheProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractInfinispanLogMinerEventProcessor.class);
@ -76,7 +76,20 @@ public AbstractInfinispanLogMinerEventProcessor(ChangeEventSourceContext context
this.context = context;
}
protected abstract CacheProvider getCacheProvider();
@Override
public void displayCacheStatistics() {
LOGGER.info("Overall Cache Statistics:");
LOGGER.info("\tTransactions : {}", getTransactionCache().size());
LOGGER.info("\tCommitted Trxs : {}", getCommittedTransactionsCache().size());
LOGGER.info("\tRollback Trxs : {}", getRollbackTransactionsCache().size());
LOGGER.info("\tSchema Changes : {}", getSchemaChangesCache().size());
LOGGER.info("\tEvents : {}", getEventCache().size());
if (!getEventCache().isEmpty()) {
for (String eventKey : getEventCache().keySet()) {
LOGGER.debug("\t\tFound Key: {}", eventKey);
}
}
}
@Override
protected InfinispanTransaction createTransaction(LogMinerEventRow row) {
@ -85,16 +98,16 @@ protected InfinispanTransaction createTransaction(LogMinerEventRow row) {
@Override
protected void removeEventWithRowId(LogMinerEventRow row) {
List<String> eventKeys = getCacheProvider().getEventCache().keySet()
List<String> eventKeys = getEventCache().keySet()
.stream()
.filter(k -> k.startsWith(row.getTransactionId() + "-"))
.collect(Collectors.toList());
for (String eventKey : eventKeys) {
final LogMinerEvent event = getCacheProvider().getEventCache().get(eventKey);
final LogMinerEvent event = getEventCache().get(eventKey);
if (event != null && event.getRowId().equals(row.getRowId())) {
LOGGER.trace("Undo applied for event {}.", event);
getCacheProvider().getEventCache().remove(eventKey);
getEventCache().remove(eventKey);
}
}
}
@ -144,7 +157,7 @@ public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedExc
@Override
protected void processRow(LogMinerEventRow row) throws SQLException, InterruptedException {
final String transactionId = row.getTransactionId();
if (getCacheProvider().getCommittedTransactionsCache().containsKey(transactionId)) {
if (getCommittedTransactionsCache().containsKey(transactionId)) {
LOGGER.trace("Transaction {} has been seen by connector, skipped.", transactionId);
return;
}
@ -158,11 +171,11 @@ public void abandonTransactions(Duration retention) {
@Override
protected boolean isTransactionIdAllowed(String transactionId) {
if (getCacheProvider().getRollbackedTransactionsCache().containsKey(transactionId)) {
if (getRollbackTransactionsCache().containsKey(transactionId)) {
LOGGER.warn("Event for transaction {} skipped as transaction is marked for rollback.", transactionId);
return false;
}
if (getCacheProvider().getCommittedTransactionsCache().containsKey(transactionId)) {
if (getCommittedTransactionsCache().containsKey(transactionId)) {
LOGGER.warn("Event for transaction {} skipped as transaction was recently committed.", transactionId);
return false;
}
@ -171,18 +184,18 @@ protected boolean isTransactionIdAllowed(String transactionId) {
@Override
protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) {
return getCacheProvider().getSchemaChangesCache().containsKey(row.getScn().toString());
return getSchemaChangesCache().containsKey(row.getScn().toString());
}
@Override
protected void handleCommit(LogMinerEventRow row) throws InterruptedException {
final String transactionId = row.getTransactionId();
if (getCacheProvider().getCommittedTransactionsCache().containsKey(transactionId)) {
if (getCommittedTransactionsCache().containsKey(transactionId)) {
LOGGER.debug("\tTransaction is already committed, skipped.");
return;
}
final InfinispanTransaction transaction = getCacheProvider().getTransactionCache().get(transactionId);
final InfinispanTransaction transaction = getTransactionCache().get(transactionId);
if (transaction == null) {
LOGGER.trace("Transaction {} not found.", transactionId);
return;
@ -194,7 +207,7 @@ protected void handleCommit(LogMinerEventRow row) throws InterruptedException {
// existing entry and remove it from the cache; however it always returned null.
//
// For now, we're going to use get to obtain the value and then remove it after-the-fact.
getCacheProvider().getTransactionCache().remove(transactionId);
getTransactionCache().remove(transactionId);
}
final boolean skipExcludedUserName;
@ -218,8 +231,8 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser
if ((offsetCommitScn != null && offsetCommitScn.compareTo(commitScn) > 0) || lastCommittedScn.compareTo(commitScn) > 0) {
LOGGER.debug("Transaction {} has already been processed. Commit SCN in offset is {} while commit SCN of transaction is {} and last seen committed SCN is {}.",
transactionId, offsetCommitScn, commitScn, lastCommittedScn);
getCacheProvider().getTransactionCache().remove(transactionId);
metrics.setActiveTransactions(getCacheProvider().getTransactionCache().size());
getTransactionCache().remove(transactionId);
metrics.setActiveTransactions(getTransactionCache().size());
removeEventsWithTransaction(transaction);
return;
}
@ -273,7 +286,7 @@ public void accept(LogMinerEvent event) throws InterruptedException {
return;
}
final LogMinerEvent event = getCacheProvider().getEventCache().get(transaction.getEventId(i));
final LogMinerEvent event = getEventCache().get(transaction.getEventId(i));
if (event == null) {
// If an event is undone, it gets removed from the cache at undo time.
// This means that the call to get could return a null event and we
@ -301,13 +314,13 @@ public void accept(LogMinerEvent event) throws InterruptedException {
}
// cache recently committed transactions by transaction id
getCacheProvider().getCommittedTransactionsCache().put(transactionId, commitScn.toString());
getCommittedTransactionsCache().put(transactionId, commitScn.toString());
// Clear the event queue for the transaction
removeEventsWithTransaction(transaction);
metrics.incrementCommittedTransactions();
metrics.setActiveTransactions(getCacheProvider().getTransactionCache().size());
metrics.setActiveTransactions(getTransactionCache().size());
metrics.incrementCommittedDmlCount(eventCount);
metrics.setCommittedScn(commitScn);
metrics.setOffsetScn(offsetContext.getScn());
@ -316,13 +329,13 @@ public void accept(LogMinerEvent event) throws InterruptedException {
@Override
protected void handleRollback(LogMinerEventRow row) {
final InfinispanTransaction transaction = getCacheProvider().getTransactionCache().get(row.getTransactionId());
final InfinispanTransaction transaction = getTransactionCache().get(row.getTransactionId());
if (transaction != null) {
removeEventsWithTransaction(transaction);
getCacheProvider().getTransactionCache().remove(row.getTransactionId());
getCacheProvider().getRollbackedTransactionsCache().put(row.getTransactionId(), row.getScn().toString());
getTransactionCache().remove(row.getTransactionId());
getRollbackTransactionsCache().put(row.getTransactionId(), row.getScn().toString());
metrics.setActiveTransactions(getCacheProvider().getTransactionCache().size());
metrics.setActiveTransactions(getTransactionCache().size());
metrics.incrementRolledBackTransactions();
metrics.addRolledBackTransactionId(row.getTransactionId());
@ -334,7 +347,7 @@ protected void handleRollback(LogMinerEventRow row) {
protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedException {
super.handleSchemaChange(row);
if (row.getTableName() != null) {
getCacheProvider().getSchemaChangesCache().put(row.getScn().toString(), row.getTableId().identifier());
getSchemaChangesCache().put(row.getScn().toString(), row.getTableId().identifier());
}
}
@ -347,10 +360,10 @@ protected void addToTransaction(String transactionId, LogMinerEventRow row, Supp
transaction = createTransaction(row);
}
String eventKey = transaction.getEventId(transaction.getNextEventId());
if (!getCacheProvider().getEventCache().containsKey(eventKey)) {
if (!getEventCache().containsKey(eventKey)) {
// Add new event at eventId offset
LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, eventKey);
getCacheProvider().getEventCache().put(eventKey, eventSupplier.get());
getEventCache().put(eventKey, eventSupplier.get());
metrics.calculateLagMetrics(row.getChangeTime());
}
// When using Infinispan, this extra put is required so that the state is properly synchronized
@ -362,7 +375,7 @@ protected void addToTransaction(String transactionId, LogMinerEventRow row, Supp
@Override
protected int getTransactionEventCount(InfinispanTransaction transaction) {
// todo: implement indexed keys when ISPN supports them
return (int) getCacheProvider().getEventCache()
return (int) getEventCache()
.keySet()
.parallelStream()
.filter(k -> k.startsWith(transaction.getTransactionId() + "-"))
@ -382,18 +395,18 @@ private Scn calculateNewStartScn(Scn endScn) throws InterruptedException {
// Cleanup caches based on current state of the transaction cache
final Scn minCacheScn = getTransactionCacheMinimumScn();
if (!minCacheScn.isNull()) {
getCacheProvider().getCommittedTransactionsCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
getCacheProvider().getRollbackedTransactionsCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
getCacheProvider().getSchemaChangesCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0);
getCommittedTransactionsCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
getRollbackTransactionsCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
getSchemaChangesCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0);
}
else {
getCacheProvider().getCommittedTransactionsCache().clear();
getCacheProvider().getRollbackedTransactionsCache().clear();
getCacheProvider().getSchemaChangesCache().clear();
getCommittedTransactionsCache().clear();
getRollbackTransactionsCache().clear();
getSchemaChangesCache().clear();
}
if (getConfig().isLobEnabled()) {
if (getCacheProvider().getTransactionCache().isEmpty() && !maxCommittedScn.isNull()) {
if (getTransactionCache().isEmpty() && !maxCommittedScn.isNull()) {
offsetContext.setScn(maxCommittedScn);
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
@ -428,7 +441,7 @@ private Scn calculateNewStartScn(Scn endScn) throws InterruptedException {
private void removeEventsWithTransaction(InfinispanTransaction transaction) {
// Clear the event queue for the transaction
for (int i = 0; i < transaction.getNumberOfEvents(); ++i) {
getCacheProvider().getEventCache().remove(transaction.getEventId(i));
getEventCache().remove(transaction.getEventId(i));
}
}
}

View File

@ -14,25 +14,93 @@
*/
public interface CacheProvider extends AutoCloseable {
// todo: consolidate with processor most likely?
// with the embedded/remote impls of the infinispan processors, there is really not
// much reason to have the separate provider contracts
/**
* The name for the transaction cache
*/
String TRANSACTIONS_CACHE_NAME = "transactions";
/**
* The name for the recently committed transactions cache
*/
String COMMIT_TRANSACTIONS_CACHE_NAME = "committed-transactions";
/**
* The name for the rollback transactions cache
*/
String ROLLBACK_TRANSACTIONS_CACHE_NAME = "rollback-transactions";
/**
* The name for the schema changes cache
*/
String SCHEMA_CHANGES_CACHE_NAME = "schema-changes";
/**
* The name for the LogMiner events cache
*/
String EVENTS_CACHE_NAME = "events";
/**
* Displays cache statistics
*/
void displayCacheStatistics();
/**
* Get the transaction cache
*
* <ul>
* <li>Key - The unique transaction id</li>
* <li>Value - The transaction instance</li>
* </ul>
*
* @return the transaction cache, never {@code null}
*/
BasicCache<String, InfinispanTransaction> getTransactionCache();
/**
* Get the LogMiner events cache
*
* <ul>
* <li>Key - The event id, in the format of {@code transactionId-eventSequence}</li>
* <li>Value - The raw LogMinerEvent object instance</li>
* </ul>
*
* @return the evnts cache, never {@code null}
*/
BasicCache<String, LogMinerEvent> getEventCache();
/**
* Get the Schema Changes cache
*
* <ul>
* <li>Key - The system change number of the schema change</li>
* <li>Value - The table the schema change is related to</li>
* </ul>
*
* @return the schema changes cache, never {@code null}
*/
BasicCache<String, String> getSchemaChangesCache();
/**
* Get the recently committed transactions cache
*
* <ul>
* <li>Key - The unique transaction id</li>
* <li>Value - The transaction's commit system change number</li>
* </ul>
*
* @return the committed transactions cache, never {@code null}
*/
BasicCache<String, String> getCommittedTransactionsCache();
BasicCache<String, String> getRollbackedTransactionsCache();
/**
* Get the rollback transactions cache.
*
* <ul>
* <li>Key - The unique transaction id</li>
* <li>Value - The transaction's rollback system change number</li>
* </ul>
*
* @return the rollback transactions cache, never {@code null}
*/
BasicCache<String, String> getRollbackTransactionsCache();
}

View File

@ -1,130 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_COMMITTED_TRANSACTIONS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_ROLLBACK_TRANSACTIONS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS;
import java.util.Objects;
import org.infinispan.Cache;
import org.infinispan.configuration.parsing.ParserRegistry;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Field;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
/**
* @author Chris Cranford
*/
public class EmbeddedCacheProvider implements CacheProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedCacheProvider.class);
private final EmbeddedCacheManager cacheManager;
private final boolean dropBufferOnStop;
private final Cache<String, InfinispanTransaction> transactionCache;
private final Cache<String, LogMinerEvent> eventCache;
private final Cache<String, String> recentlyCommittedTransactionsCache;
private final Cache<String, String> rollbackTransactionsCache;
private final Cache<String, String> schemaChangesCache;
public EmbeddedCacheProvider(OracleConnectorConfig connectorConfig) {
this.cacheManager = new DefaultCacheManager();
this.dropBufferOnStop = connectorConfig.isLogMiningBufferDropOnStop();
this.transactionCache = createCache(TRANSACTIONS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS);
this.recentlyCommittedTransactionsCache = createCache(COMMIT_TRANSACTIONS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_COMMITTED_TRANSACTIONS);
this.rollbackTransactionsCache = createCache(ROLLBACK_TRANSACTIONS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_ROLLBACK_TRANSACTIONS);
this.schemaChangesCache = createCache(SCHEMA_CHANGES_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES);
this.eventCache = createCache(EVENTS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS);
}
@Override
public Cache<String, InfinispanTransaction> getTransactionCache() {
return transactionCache;
}
@Override
public Cache<String, LogMinerEvent> getEventCache() {
return eventCache;
}
@Override
public Cache<String, String> getSchemaChangesCache() {
return schemaChangesCache;
}
@Override
public Cache<String, String> getCommittedTransactionsCache() {
return recentlyCommittedTransactionsCache;
}
@Override
public Cache<String, String> getRollbackedTransactionsCache() {
return rollbackTransactionsCache;
}
@Override
public void close() throws Exception {
if (dropBufferOnStop) {
LOGGER.info("Clearing infinispan caches");
transactionCache.clear();
eventCache.clear();
schemaChangesCache.clear();
recentlyCommittedTransactionsCache.clear();
rollbackTransactionsCache.clear();
// this block should only be used by tests, should we wrap this in case admin rights aren't given?
cacheManager.administration().removeCache(CacheProvider.TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.COMMIT_TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.ROLLBACK_TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.SCHEMA_CHANGES_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.EVENTS_CACHE_NAME);
}
LOGGER.info("Shutting down infinispan embedded caches");
cacheManager.close();
}
@Override
public void displayCacheStatistics() {
LOGGER.info("Cache Statistics:");
LOGGER.info("\tTransactions : {}", transactionCache.size());
LOGGER.info("\tCommitted Trxs : {}", recentlyCommittedTransactionsCache.size());
LOGGER.info("\tRollback Trxs : {}", rollbackTransactionsCache.size());
LOGGER.info("\tSchema Changes : {}", schemaChangesCache.size());
LOGGER.info("\tEvents : {}", eventCache.size());
if (!eventCache.isEmpty()) {
for (String eventKey : eventCache.keySet()) {
LOGGER.debug("\t\tFound Key: {}", eventKey);
}
}
}
private <K, V> Cache<K, V> createCache(String cacheName, OracleConnectorConfig connectorConfig, Field field) {
Objects.requireNonNull(cacheName);
final String cacheConfiguration = connectorConfig.getConfig().getString(field);
Objects.requireNonNull(cacheConfiguration);
// define the cache, parsing the supplied XML configuration
cacheManager.defineConfiguration(cacheName,
new ParserRegistry().parse(cacheConfiguration)
.getNamedConfigurationBuilders()
.get(cacheName)
.build());
return cacheManager.getCache(cacheName);
}
}

View File

@ -5,10 +5,24 @@
*/
package io.debezium.connector.oracle.logminer.processor.infinispan;
import java.util.Map;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_COMMITTED_TRANSACTIONS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_ROLLBACK_TRANSACTIONS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS;
import java.util.Objects;
import org.infinispan.Cache;
import org.infinispan.commons.api.BasicCache;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.configuration.parsing.ParserRegistry;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Field;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
@ -16,16 +30,32 @@
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.TableId;
/**
* A concrete implementation of {@link AbstractInfinispanLogMinerEventProcessor} that uses Infinispan in
* embedded mode to store transaction and mined event data in caches.
*
* The cache configurations are supplied via connector configurations and are expected to be valid XML
* that represent parseable local cache setups for Infinispan.
*
* @author Chris Cranford
*/
public class EmbeddedInfinispanLogMinerEventProcessor extends AbstractInfinispanLogMinerEventProcessor {
private final EmbeddedCacheProvider cacheProvider;
private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedInfinispanLogMinerEventProcessor.class);
private final EmbeddedCacheManager cacheManager;
private final boolean dropBufferOnStop;
private final Cache<String, InfinispanTransaction> transactionCache;
private final Cache<String, LogMinerEvent> eventCache;
private final Cache<String, String> recentlyCommittedTransactionsCache;
private final Cache<String, String> rollbackTransactionsCache;
private final Cache<String, String> schemaChangesCache;
public EmbeddedInfinispanLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
@ -37,29 +67,69 @@ public EmbeddedInfinispanLogMinerEventProcessor(ChangeEventSourceContext context
OracleStreamingChangeEventSourceMetrics metrics) {
super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics);
cacheProvider = new EmbeddedCacheProvider(connectorConfig);
cacheProvider.displayCacheStatistics();
LOGGER.info("Using Infinispan in embedded mode.");
this.cacheManager = new DefaultCacheManager();
this.dropBufferOnStop = connectorConfig.isLogMiningBufferDropOnStop();
this.transactionCache = createCache(TRANSACTIONS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS);
this.recentlyCommittedTransactionsCache = createCache(COMMIT_TRANSACTIONS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_COMMITTED_TRANSACTIONS);
this.rollbackTransactionsCache = createCache(ROLLBACK_TRANSACTIONS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_ROLLBACK_TRANSACTIONS);
this.schemaChangesCache = createCache(SCHEMA_CHANGES_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES);
this.eventCache = createCache(EVENTS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS);
displayCacheStatistics();
}
@Override
public void close() throws Exception {
cacheProvider.close();
if (dropBufferOnStop) {
LOGGER.info("Clearing infinispan caches");
transactionCache.clear();
eventCache.clear();
schemaChangesCache.clear();
recentlyCommittedTransactionsCache.clear();
rollbackTransactionsCache.clear();
// this block should only be used by tests, should we wrap this in case admin rights aren't given?
cacheManager.administration().removeCache(CacheProvider.TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.COMMIT_TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.ROLLBACK_TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.SCHEMA_CHANGES_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.EVENTS_CACHE_NAME);
}
LOGGER.info("Shutting down infinispan embedded caches");
cacheManager.close();
}
@Override
protected Map<String, InfinispanTransaction> getTransactionCache() {
return cacheProvider.getTransactionCache();
public BasicCache<String, InfinispanTransaction> getTransactionCache() {
return transactionCache;
}
@Override
protected EmbeddedCacheProvider getCacheProvider() {
return cacheProvider;
public BasicCache<String, LogMinerEvent> getEventCache() {
return eventCache;
}
@Override
public BasicCache<String, String> getSchemaChangesCache() {
return schemaChangesCache;
}
@Override
public BasicCache<String, String> getCommittedTransactionsCache() {
return recentlyCommittedTransactionsCache;
}
@Override
public BasicCache<String, String> getRollbackTransactionsCache() {
return rollbackTransactionsCache;
}
@Override
protected Scn getTransactionCacheMinimumScn() {
Scn minimumScn = Scn.NULL;
try (CloseableIterator<InfinispanTransaction> iterator = cacheProvider.getTransactionCache().values().iterator()) {
try (CloseableIterator<InfinispanTransaction> iterator = transactionCache.values().iterator()) {
while (iterator.hasNext()) {
final Scn transactionScn = iterator.next().getStartScn();
if (minimumScn.isNull()) {
@ -74,4 +144,20 @@ protected Scn getTransactionCacheMinimumScn() {
}
return minimumScn;
}
private <K, V> Cache<K, V> createCache(String cacheName, OracleConnectorConfig connectorConfig, Field field) {
Objects.requireNonNull(cacheName);
final String cacheConfiguration = connectorConfig.getConfig().getString(field);
Objects.requireNonNull(cacheConfiguration);
// define the cache, parsing the supplied XML configuration
cacheManager.defineConfiguration(cacheName,
new ParserRegistry().parse(cacheConfiguration)
.getNamedConfigurationBuilders()
.get(cacheName)
.build());
return cacheManager.getCache(cacheName);
}
}

View File

@ -1,166 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_COMMITTED_TRANSACTIONS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_ROLLBACK_TRANSACTIONS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.impl.ConfigurationProperties;
import org.infinispan.commons.configuration.XMLStringConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.Field;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.processor.infinispan.marshalling.LogMinerEventMarshallerImpl;
import io.debezium.connector.oracle.logminer.processor.infinispan.marshalling.TransactionMarshallerImpl;
/**
* @author Chris Cranford
*/
public class RemoteCacheProvider implements CacheProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteCacheProvider.class);
private final RemoteCacheManager cacheManager;
private final boolean dropBufferOnStop;
private final RemoteCache<String, InfinispanTransaction> transactionCache;
private final RemoteCache<String, LogMinerEvent> eventCache;
private final RemoteCache<String, String> recentlyCommittedTransactionsCache;
private final RemoteCache<String, String> rollbackTransactionsCache;
private final RemoteCache<String, String> schemaChangesCache;
public RemoteCacheProvider(OracleConnectorConfig connectorConfig) {
Configuration config = new ConfigurationBuilder()
.withProperties(getHotrodClientProperties(connectorConfig))
// todo: why must these be defined manually rather than automated like embedded mode?
.addContextInitializer(TransactionMarshallerImpl.class.getName())
.addContextInitializer(LogMinerEventMarshallerImpl.class.getName())
.build();
this.cacheManager = new RemoteCacheManager(config, true);
this.dropBufferOnStop = connectorConfig.isLogMiningBufferDropOnStop();
this.transactionCache = createCache(TRANSACTIONS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS);
this.recentlyCommittedTransactionsCache = createCache(COMMIT_TRANSACTIONS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_COMMITTED_TRANSACTIONS);
this.rollbackTransactionsCache = createCache(ROLLBACK_TRANSACTIONS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_ROLLBACK_TRANSACTIONS);
this.schemaChangesCache = createCache(SCHEMA_CHANGES_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES);
this.eventCache = createCache(EVENTS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS);
}
private Properties getHotrodClientProperties(OracleConnectorConfig connectorConfig) {
final Map<String, String> clientSettings = connectorConfig.getConfig()
.subset("log.mining.buffer.infinispan.client.", true)
.asMap();
final Properties properties = new Properties();
for (Map.Entry<String, String> entry : clientSettings.entrySet()) {
properties.put("infinispan.client." + entry.getKey(), entry.getValue());
if (entry.getKey().toLowerCase().endsWith(ConfigurationProperties.AUTH_USERNAME.toLowerCase())) {
// If an authentication username is supplied, enforce authentication required
properties.put(ConfigurationProperties.USE_AUTH, "true");
}
}
return properties;
}
@Override
public RemoteCache<String, InfinispanTransaction> getTransactionCache() {
return transactionCache;
}
@Override
public RemoteCache<String, LogMinerEvent> getEventCache() {
return eventCache;
}
@Override
public RemoteCache<String, String> getSchemaChangesCache() {
return schemaChangesCache;
}
@Override
public RemoteCache<String, String> getCommittedTransactionsCache() {
return recentlyCommittedTransactionsCache;
}
@Override
public RemoteCache<String, String> getRollbackedTransactionsCache() {
return rollbackTransactionsCache;
}
@Override
public void close() throws Exception {
if (dropBufferOnStop) {
LOGGER.info("Clearing infinispan caches");
transactionCache.clear();
eventCache.clear();
schemaChangesCache.clear();
recentlyCommittedTransactionsCache.clear();
rollbackTransactionsCache.clear();
// this block should only be used by tests, should we wrap this in case admin rights aren't given?
cacheManager.administration().removeCache(CacheProvider.TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.COMMIT_TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.ROLLBACK_TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.SCHEMA_CHANGES_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.EVENTS_CACHE_NAME);
}
LOGGER.info("Shutting down infinispan remote caches");
cacheManager.close();
}
@Override
public void displayCacheStatistics() {
LOGGER.info("Cache Statistics:");
LOGGER.info("\tTransactions : {}", transactionCache.size());
LOGGER.info("\tCommitted Trxs : {}", recentlyCommittedTransactionsCache.size());
LOGGER.info("\tRollback Trxs : {}", rollbackTransactionsCache.size());
LOGGER.info("\tSchema Changes : {}", schemaChangesCache.size());
LOGGER.info("\tEvents : {}", eventCache.size());
if (!eventCache.isEmpty()) {
for (String eventKey : eventCache.keySet()) {
LOGGER.debug("\t\tFound Key: {}", eventKey);
}
}
}
private <C, V> RemoteCache<C, V> createCache(String cacheName, OracleConnectorConfig connectorConfig, Field field) {
Objects.requireNonNull(cacheName);
RemoteCache<C, V> cache = cacheManager.getCache(cacheName);
if (cache != null) {
// cache is already defined, simply return it
LOGGER.info("Remote cache '{}' already defined.", cacheName);
return cache;
}
final String cacheConfiguration = connectorConfig.getConfig().getString(field);
Objects.requireNonNull(cacheConfiguration);
cache = cacheManager.administration().createCache(cacheName, new XMLStringConfiguration(cacheConfiguration));
if (cache == null) {
throw new DebeziumException("Failed to create remote Infinispan cache: " + cacheName);
}
LOGGER.info("Created remote infinispan cache: {}", cacheName);
return cache;
}
}

View File

@ -5,12 +5,29 @@
*/
package io.debezium.connector.oracle.logminer.processor.infinispan;
import java.util.Map;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_COMMITTED_TRANSACTIONS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_ROLLBACK_TRANSACTIONS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.impl.ConfigurationProperties;
import org.infinispan.commons.api.BasicCache;
import org.infinispan.commons.configuration.XMLStringConfiguration;
import org.infinispan.commons.util.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.Field;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
@ -18,17 +35,34 @@
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.processor.infinispan.marshalling.LogMinerEventMarshallerImpl;
import io.debezium.connector.oracle.logminer.processor.infinispan.marshalling.TransactionMarshallerImpl;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.TableId;
/**
* A concrete implementation of {@link AbstractInfinispanLogMinerEventProcessor} that uses Infinispan with
* the Hotrod client to store transaction and mined event data in caches.
*
* The cache configurations are supplied via connector configurations and are expected to be valid XML
* that represents parseable distributed cache setups for Infinispan.
*
* @author Chris Cranford
*/
public class RemoteInfinispanLogMinerEventProcessor extends AbstractInfinispanLogMinerEventProcessor {
public class RemoteInfinispanLogMinerEventProcessor extends AbstractInfinispanLogMinerEventProcessor implements CacheProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInfinispanLogMinerEventProcessor.class);
private final RemoteCacheProvider cacheProvider;
private final RemoteCacheManager cacheManager;
private final boolean dropBufferOnStop;
private final RemoteCache<String, InfinispanTransaction> transactionCache;
private final RemoteCache<String, LogMinerEvent> eventCache;
private final RemoteCache<String, String> recentlyCommittedTransactionsCache;
private final RemoteCache<String, String> rollbackTransactionsCache;
private final RemoteCache<String, String> schemaChangesCache;
public RemoteInfinispanLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
@ -40,29 +74,76 @@ public RemoteInfinispanLogMinerEventProcessor(ChangeEventSourceContext context,
OracleStreamingChangeEventSourceMetrics metrics) {
super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics);
cacheProvider = new RemoteCacheProvider(connectorConfig);
cacheProvider.displayCacheStatistics();
Configuration config = new ConfigurationBuilder()
.withProperties(getHotrodClientProperties(connectorConfig))
// todo: why must these be defined manually rather than automated like embedded mode?
.addContextInitializer(TransactionMarshallerImpl.class.getName())
.addContextInitializer(LogMinerEventMarshallerImpl.class.getName())
.build();
LOGGER.info("Using Infinispan in Hotrod client mode");
this.cacheManager = new RemoteCacheManager(config, true);
this.dropBufferOnStop = connectorConfig.isLogMiningBufferDropOnStop();
this.transactionCache = createCache(TRANSACTIONS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS);
this.recentlyCommittedTransactionsCache = createCache(COMMIT_TRANSACTIONS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_COMMITTED_TRANSACTIONS);
this.rollbackTransactionsCache = createCache(ROLLBACK_TRANSACTIONS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_ROLLBACK_TRANSACTIONS);
this.schemaChangesCache = createCache(SCHEMA_CHANGES_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES);
this.eventCache = createCache(EVENTS_CACHE_NAME, connectorConfig, LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS);
displayCacheStatistics();
}
@Override
public void close() throws Exception {
cacheProvider.close();
if (dropBufferOnStop) {
LOGGER.info("Clearing infinispan caches");
transactionCache.clear();
eventCache.clear();
schemaChangesCache.clear();
recentlyCommittedTransactionsCache.clear();
rollbackTransactionsCache.clear();
// this block should only be used by tests, should we wrap this in case admin rights aren't given?
cacheManager.administration().removeCache(CacheProvider.TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.COMMIT_TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.ROLLBACK_TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.SCHEMA_CHANGES_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.EVENTS_CACHE_NAME);
}
LOGGER.info("Shutting down infinispan remote caches");
cacheManager.close();
}
@Override
protected Map<String, InfinispanTransaction> getTransactionCache() {
return cacheProvider.getTransactionCache();
public BasicCache<String, InfinispanTransaction> getTransactionCache() {
return transactionCache;
}
@Override
protected RemoteCacheProvider getCacheProvider() {
return cacheProvider;
public BasicCache<String, LogMinerEvent> getEventCache() {
return eventCache;
}
@Override
public BasicCache<String, String> getSchemaChangesCache() {
return schemaChangesCache;
}
@Override
public BasicCache<String, String> getCommittedTransactionsCache() {
return recentlyCommittedTransactionsCache;
}
@Override
public BasicCache<String, String> getRollbackTransactionsCache() {
return rollbackTransactionsCache;
}
@Override
protected Scn getTransactionCacheMinimumScn() {
Scn minimumScn = Scn.NULL;
try (CloseableIterator<InfinispanTransaction> iterator = cacheProvider.getTransactionCache().values().iterator()) {
try (CloseableIterator<InfinispanTransaction> iterator = transactionCache.values().iterator()) {
while (iterator.hasNext()) {
final Scn transactionScn = iterator.next().getStartScn();
if (minimumScn.isNull()) {
@ -75,7 +156,44 @@ protected Scn getTransactionCacheMinimumScn() {
}
}
}
LOGGER.info("Transaction Cache Min SCN {}", minimumScn);
return minimumScn;
}
private Properties getHotrodClientProperties(OracleConnectorConfig connectorConfig) {
final Map<String, String> clientSettings = connectorConfig.getConfig()
.subset("log.mining.buffer.infinispan.client.", true)
.asMap();
final Properties properties = new Properties();
for (Map.Entry<String, String> entry : clientSettings.entrySet()) {
properties.put("infinispan.client." + entry.getKey(), entry.getValue());
if (entry.getKey().toLowerCase().endsWith(ConfigurationProperties.AUTH_USERNAME.toLowerCase())) {
// If an authentication username is supplied, enforce authentication required
properties.put(ConfigurationProperties.USE_AUTH, "true");
}
}
return properties;
}
private <C, V> RemoteCache<C, V> createCache(String cacheName, OracleConnectorConfig connectorConfig, Field field) {
Objects.requireNonNull(cacheName);
RemoteCache<C, V> cache = cacheManager.getCache(cacheName);
if (cache != null) {
// cache is already defined, simply return it
LOGGER.info("Remote cache '{}' already defined.", cacheName);
return cache;
}
final String cacheConfiguration = connectorConfig.getConfig().getString(field);
Objects.requireNonNull(cacheConfiguration);
cache = cacheManager.administration().createCache(cacheName, new XMLStringConfiguration(cacheConfiguration));
if (cache == null) {
throw new DebeziumException("Failed to create remote Infinispan cache: " + cacheName);
}
LOGGER.info("Created remote infinispan cache: {}", cacheName);
return cache;
}
}