DBZ-4169 Support remote Infinispan caches

This commit is contained in:
Chris Cranford 2021-11-08 16:26:01 -05:00 committed by Gunnar Morling
parent 212fdb7912
commit f42c47b63b
22 changed files with 885 additions and 440 deletions

View File

@ -428,6 +428,11 @@
<artifactId>infinispan-core</artifactId>
<version>${version.infinispan}</version>
</dependency>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-client-hotrod</artifactId>
<version>${version.infinispan}</version>
</dependency>
<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream-processor</artifactId>

View File

@ -43,6 +43,10 @@
<groupId>org.infinispan</groupId>
<artifactId>infinispan-core</artifactId>
</dependency>
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-client-hotrod</artifactId>
</dependency>
<dependency>
<groupId>org.infinispan.protostream</groupId>
<artifactId>protostream-processor</artifactId>
@ -338,5 +342,79 @@
</plugins>
</build>
</profile>
<profile>
<id>infinispan-buffer-remote</id>
<properties>
<log.mining.buffer.type.name>infinispan_remote</log.mining.buffer.type.name>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration combine.self="override">
<excludes>
<!-- When compiling with this profile, xstream is not added by default -->
<exclude>**/io/debezium/connector/oracle/xstream/**</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<configuration>
<watchInterval>500</watchInterval>
<logDate>default</logDate>
<verbose>true</verbose>
<images>
<image>
<!-- A docker image using Infinispan -->
<name>infinispan/server:12.1</name>
<run>
<namingStrategy>none</namingStrategy>
<env>
<USER>admin</USER>
<PASS>admin</PASS>
</env>
<ports>
<port>11222:11222</port>
</ports>
<log>
<prefix>infinispan</prefix>
<enabled>true</enabled>
<color>green</color>
</log>
<wait>
<time>60000</time> <!-- 60 seconds max -->
<log>Infinispan Server [0-9a-zA-z.]* started</log>
</wait>
</run>
</image>
</images>
</configuration>
<!--
Connect this plugin to the maven lifecycle around the integration-test phase:
start the container in pre-integration-test and stop it in post-integration-test.
-->
<executions>
<execution>
<id>start</id>
<phase>pre-integration-test</phase>
<goals>
<goal>start</goal>
</goals>
</execution>
<execution>
<id>stop</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -16,6 +16,8 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
@ -298,21 +300,30 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
public static final Field LOG_MINING_BUFFER_TYPE = Field.create("log.mining.buffer.type")
.withDisplayName("Controls which buffer type implementation to be used")
.withEnum(LogMiningBufferType.class, LogMiningBufferType.MEMORY)
.withValidation(OracleConnectorConfig::validateLogMiningBufferType)
.withImportance(Importance.LOW)
.withDescription("The buffer type controls how the connector manages buffering transaction data." + System.lineSeparator() +
System.lineSeparator() +
"memory - Uses the JVM process' heap to buffer all transaction data." + System.lineSeparator() +
System.lineSeparator() +
"infinispan - This option uses an embedded Infinispan cache to buffer transaction data and persist it to disk." +
" Use the log.mining.buffer.location property to define the location for storing cache files.");
"infinispan_embedded - This option uses an embedded Infinispan cache to buffer transaction data and persist it to disk." + System.lineSeparator() +
System.lineSeparator() +
"infinispan_remote - This option uses a remote Infinispan cluster to buffer transaction data and persist it to disk.");
public static final Field LOG_MINING_BUFFER_LOCATION = Field.create("log.mining.buffer.location")
.withDisplayName("Location where Infinispan stores buffer caches")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withValidation(OracleConnectorConfig::validateBufferLocation)
.withDescription("Path to location where Infinispan will store buffer caches");
public static final Field LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS = Field.create("log.mining.buffer.infinispan.cache.transactions")
.withValidation(OracleConnectorConfig::validateLogMiningInfinispanCacheConfiguration);
public static final Field LOG_MINING_BUFFER_INFINISPAN_CACHE_COMMITTED_TRANSACTIONS = Field.create("log.mining.buffer.infinispan.cache.committed_transactions")
.withValidation(OracleConnectorConfig::validateLogMiningInfinispanCacheConfiguration);
public static final Field LOG_MINING_BUFFER_INFINISPAN_CACHE_ROLLBACK_TRANSACTIONS = Field.create("log.mining.buffer.infinispan.cache.rollback_transactions")
.withValidation(OracleConnectorConfig::validateLogMiningInfinispanCacheConfiguration);
public static final Field LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS = Field.create("log.mining.buffer.infinispan.cache.events")
.withValidation(OracleConnectorConfig::validateLogMiningInfinispanCacheConfiguration);
public static final Field LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES = Field.create("log.mining.buffer.infinispan.cache.schema_changes")
.withValidation(OracleConnectorConfig::validateLogMiningInfinispanCacheConfiguration);
public static final Field LOG_MINING_BUFFER_DROP_ON_STOP = Field.create("log.mining.buffer.drop.on.stop")
.withDisplayName("Controls whether the buffer cache is dropped when connector is stopped")
@ -384,8 +395,12 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
LOG_MINING_USERNAME_EXCLUDE_LIST,
LOG_MINING_ARCHIVE_DESTINATION_NAME,
LOG_MINING_BUFFER_TYPE,
LOG_MINING_BUFFER_LOCATION,
LOG_MINING_BUFFER_DROP_ON_STOP,
LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS,
LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS,
LOG_MINING_BUFFER_INFINISPAN_CACHE_COMMITTED_TRANSACTIONS,
LOG_MINING_BUFFER_INFINISPAN_CACHE_ROLLBACK_TRANSACTIONS,
LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES,
LOG_MINING_ARCHIVE_LOG_ONLY_SCN_POLL_INTERVAL_MS,
LOG_MINING_SCN_GAP_DETECTION_GAP_SIZE_MIN,
LOG_MINING_SCN_GAP_DETECTION_TIME_INTERVAL_MAX_MS,
@ -405,6 +420,8 @@ public static ConfigDef configDef() {
"ctxsys", "dvsys", "dbsfwuser", "dbsnmp", "gsmadmin_internal", "lbacsys", "mdsys", "ojvmsys", "olapsys",
"orddata", "ordsys", "outln", "sys", "system", "wmsys", "xdb"));
private static final Logger LOGGER = LoggerFactory.getLogger(OracleConnectorConfig.class);
private final String databaseName;
private final String pdbName;
private final String xoutServerName;
@ -436,7 +453,6 @@ public static ConfigDef configDef() {
private final Set<String> logMiningUsernameExcludes;
private final String logMiningArchiveDestinationName;
private final LogMiningBufferType logMiningBufferType;
private final String logMiningBufferLocation;
private final boolean logMiningBufferDropOnStop;
private final int logMiningScnGapDetectionGapSizeMin;
private final int logMiningScnGapDetectionTimeIntervalMaxMs;
@ -478,7 +494,6 @@ public OracleConnectorConfig(Configuration config) {
this.logMiningUsernameExcludes = Strings.setOf(config.getString(LOG_MINING_USERNAME_EXCLUDE_LIST), String::new);
this.logMiningArchiveDestinationName = config.getString(LOG_MINING_ARCHIVE_DESTINATION_NAME);
this.logMiningBufferType = LogMiningBufferType.parse(config.getString(LOG_MINING_BUFFER_TYPE));
this.logMiningBufferLocation = config.getString(LOG_MINING_BUFFER_LOCATION);
this.logMiningBufferDropOnStop = config.getBoolean(LOG_MINING_BUFFER_DROP_ON_STOP);
this.archiveLogOnlyScnPollTime = Duration.ofMillis(config.getInteger(LOG_MINING_ARCHIVE_LOG_ONLY_SCN_POLL_INTERVAL_MS));
this.logMiningScnGapDetectionGapSizeMin = config.getInteger(LOG_MINING_SCN_GAP_DETECTION_GAP_SIZE_MIN);
@ -822,7 +837,15 @@ public static LogMiningStrategy parse(String value, String defaultValue) {
public enum LogMiningBufferType implements EnumeratedValue {
MEMORY("memory"),
INFINISPAN("infinispan");
/**
* @deprecated use either {@link #INFINISPAN_EMBEDDED} or {@link #INFINISPAN_REMOTE}.
*/
@Deprecated
INFINISPAN("infinispan"),
INFINISPAN_EMBEDDED("infinispan_embedded"),
INFINISPAN_REMOTE("infinispan_remote");
private final String value;
@ -835,6 +858,14 @@ public String getValue() {
return value;
}
public boolean isInfinispan() {
return !MEMORY.equals(this);
}
public boolean isInfinispanEmbedded() {
return isInfinispan() && (INFINISPAN.equals(this) || INFINISPAN_EMBEDDED.equals(this));
}
public static LogMiningBufferType parse(String value) {
if (value != null) {
value = value.trim();
@ -1063,13 +1094,6 @@ public LogMiningBufferType getLogMiningBufferType() {
return logMiningBufferType;
}
/**
* @return the log mining buffer storage location, may be {@code null}
*/
public String getLogMiningBufferLocation() {
return logMiningBufferLocation;
}
/**
* @return whether buffer cache should be dropped on connector stop.
*/
@ -1131,15 +1155,6 @@ public static int requiredWhenNoHostname(Configuration config, Field field, Vali
return 0;
}
public static int validateBufferLocation(Configuration config, Field field, ValidationOutput problems) {
// Require field only if using Infinispan buffer type.
final LogMiningBufferType bufferType = LogMiningBufferType.parse(config.getString(LOG_MINING_BUFFER_TYPE));
if (LogMiningBufferType.INFINISPAN.equals(bufferType)) {
return Field.isRequired(config, field, problems);
}
return 0;
}
public static int validateRacNodes(Configuration config, Field field, ValidationOutput problems) {
int errors = 0;
if (ConnectorAdapter.LOG_MINER.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) {
@ -1159,4 +1174,24 @@ public static int validateRacNodes(Configuration config, Field field, Validation
}
return errors;
}
private static int validateLogMiningBufferType(Configuration config, Field field, ValidationOutput problems) {
final LogMiningBufferType bufferType = LogMiningBufferType.parse(config.getString(LOG_MINING_BUFFER_TYPE));
if (LogMiningBufferType.INFINISPAN.equals(bufferType)) {
LOGGER.warn("Value '{}' of configuration option '{}' is deprecated and should be replaced with '{}'",
LogMiningBufferType.INFINISPAN.getValue(),
LOG_MINING_BUFFER_TYPE.name(),
LogMiningBufferType.INFINISPAN_EMBEDDED.getValue());
}
return 0;
}
public static int validateLogMiningInfinispanCacheConfiguration(Configuration config, Field field, ValidationOutput problems) {
final LogMiningBufferType bufferType = LogMiningBufferType.parse(config.getString(LOG_MINING_BUFFER_TYPE));
int errors = 0;
if (bufferType.isInfinispan()) {
errors = Field.isRequired(config, field, problems);
}
return errors;
}
}

View File

@ -74,7 +74,7 @@ public static void setLogFilesForMining(OracleConnection connection, Scn lastPro
executeCallableStatement(connection, addLogFileStatement);
}
LOGGER.debug("Last mined SCN: {}, Log file list to mine: {}\n", lastProcessedScn, logFilesNames);
LOGGER.debug("Last mined SCN: {}, Log file list to mine: {}", lastProcessedScn, logFilesNames);
}
static void logWarn(OracleStreamingChangeEventSourceMetrics streamingMetrics, String format, Object... args) {

View File

@ -30,6 +30,7 @@
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningBufferType;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
@ -39,7 +40,8 @@
import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.logwriter.RacCommitLogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.InfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.EmbeddedInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.RemoteInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.memory.MemoryLogMinerEventProcessor;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.pipeline.ErrorHandler;
@ -206,11 +208,19 @@ private void captureSessionMemoryStatistics(OracleConnection connection) throws
private LogMinerEventProcessor createProcessor(ChangeEventSourceContext context,
OraclePartition partition,
OracleOffsetContext offsetContext) {
if (OracleConnectorConfig.LogMiningBufferType.INFINISPAN.equals(connectorConfig.getLogMiningBufferType())) {
return new InfinispanLogMinerEventProcessor(context, connectorConfig, jdbcConnection, dispatcher,
final LogMiningBufferType bufferType = connectorConfig.getLogMiningBufferType();
if (bufferType.isInfinispanEmbedded()) {
return new EmbeddedInfinispanLogMinerEventProcessor(context, connectorConfig, jdbcConnection, dispatcher,
partition, offsetContext, schema, streamingMetrics);
}
else if (bufferType.isInfinispan()) {
return new RemoteInfinispanLogMinerEventProcessor(context, connectorConfig, jdbcConnection, dispatcher,
partition, offsetContext, schema, streamingMetrics);
}
else {
return new MemoryLogMinerEventProcessor(context, connectorConfig, jdbcConnection, dispatcher,
partition, offsetContext, schema, streamingMetrics);
}
return new MemoryLogMinerEventProcessor(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, streamingMetrics);
}
/**

View File

@ -9,6 +9,7 @@
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.function.Supplier;
import org.slf4j.Logger;
@ -135,7 +136,7 @@ protected Scn getLastProcessedScn() {
* Returns the {@code TransactionCache} implementation.
* @return the transaction cache, never {@code null}
*/
protected abstract TransactionCache<T, ?> getTransactionCache();
protected abstract Map<String, T> getTransactionCache();
/**
* Creates a new transaction based on the supplied {@code START} event.
@ -655,6 +656,12 @@ private String parseLobWriteSql(String sql) {
throw new DebeziumException("Unable to parse unsupported LOB_WRITE SQL: " + sql);
}
/**
* Gets the minimum system change number stored in the transaction cache.
* @return the minimum system change number, never {@code null} but could be {@link Scn#NULL}.
*/
protected abstract Scn getTransactionCacheMinimumScn();
/**
* Wrapper for all counter variables
*

View File

@ -1,33 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor;
import java.util.Iterator;
import io.debezium.connector.oracle.Scn;
/**
* Generalized contract that all transaction cache implementations should implement.
*
* @author Chris Cranford
*/
public interface TransactionCache<T extends AbstractTransaction, I> extends AutoCloseable {
T get(String transactionId);
void put(String transactionId, T transaction);
T remove(String transactionId);
int size();
void clear();
boolean isEmpty();
Iterator<I> iterator();
Scn getMinimumScn();
}

View File

@ -10,14 +10,10 @@
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.infinispan.Cache;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,8 +30,6 @@
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.Transaction;
import io.debezium.connector.oracle.logminer.processor.TransactionCache;
import io.debezium.connector.oracle.logminer.processor.TransactionCommitConsumer;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.EventDispatcher;
@ -49,9 +43,9 @@
*
* @author Chris Cranford
*/
public class InfinispanLogMinerEventProcessor extends AbstractLogMinerEventProcessor<InfinispanTransaction> {
public abstract class AbstractInfinispanLogMinerEventProcessor extends AbstractLogMinerEventProcessor<InfinispanTransaction> {
private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanLogMinerEventProcessor.class);
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractInfinispanLogMinerEventProcessor.class);
private final OracleConnection jdbcConnection;
private final OracleStreamingChangeEventSourceMetrics metrics;
@ -60,59 +54,19 @@ public class InfinispanLogMinerEventProcessor extends AbstractLogMinerEventProce
private final EventDispatcher<TableId> dispatcher;
private final ChangeEventSourceContext context;
/**
* A cache that stores the complete {@link Transaction} object keyed by the unique transaction id.
*/
private final InfinispanTransactionCache transactionCache;
/**
* A cache storing each of the raw events read from the LogMiner contents view.
* This cache is keyed using the format of "<transaction-id>-<sequence>" where the sequence
* is obtained from the {@link InfinispanTransaction#getEventId(int)} method.
*/
private final Cache<String, LogMinerEvent> eventCache;
/**
* A cache storing recently committed transactions key by the unique transaction id and
* the event's system change number. This cache is used to filter events during re-mining
* when LOB support is enabled to skip the processing of already emitted transactions.
* Entries in this cache are removed when the offset low watermark (scn) advances beyond
* the system change number associated with the transaction.
*/
private final Cache<String, String> recentlyCommittedTransactionsCache;
/**
* A cache storing recently rolled back transactions keyed by the unique transaction id and
* the event's system change number. This cache is used to filter events during re-mining
* when LOB support is enabled to skip the processing of already discarded transactions.
* Entries in this cache are removed when the offset low watermark (scn) advances beyond
* the system change number associated with the transaction.
*/
private final Cache<String, String> rollbackTransactionsCache;
/**
* A cache storing recently emitted schema changes keyed by the system change number of the
* schema change and the associated fully qualified TableId identifier value for the change.
* This cache is used to filter events during re-mining when LOB support is enabled to skip
* the processing of already emitted schema changes. Entries in this cache are removed
* when the offset low watermark (scn) advances beyond the system change number associated
* with the schema change event.
*/
private final Cache<String, String> schemaChangesCache;
private Scn currentOffsetScn = Scn.NULL;
private Scn currentOffsetCommitScn = Scn.NULL;
private Scn lastCommittedScn = Scn.NULL;
private Scn maxCommittedScn = Scn.NULL;
public InfinispanLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection jdbcConnection,
EventDispatcher<TableId> dispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
OracleStreamingChangeEventSourceMetrics metrics) {
public AbstractInfinispanLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection jdbcConnection,
EventDispatcher<TableId> dispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
OracleStreamingChangeEventSourceMetrics metrics) {
super(context, connectorConfig, schema, partition, offsetContext, dispatcher, metrics);
this.jdbcConnection = jdbcConnection;
this.metrics = metrics;
@ -120,53 +74,9 @@ public InfinispanLogMinerEventProcessor(ChangeEventSourceContext context,
this.offsetContext = offsetContext;
this.dispatcher = dispatcher;
this.context = context;
final EmbeddedCacheManager manager = new DefaultCacheManager();
this.transactionCache = new InfinispanTransactionCache(createCache(manager, connectorConfig, "transactions"));
this.eventCache = createCache(manager, connectorConfig, "events");
this.recentlyCommittedTransactionsCache = createCache(manager, connectorConfig, "committed-transactions");
this.rollbackTransactionsCache = createCache(manager, connectorConfig, "rollback-transactions");
this.schemaChangesCache = createCache(manager, connectorConfig, "schema-changes");
displayCacheStatistics();
}
private void displayCacheStatistics() {
LOGGER.info("Cache Statistics:");
LOGGER.info("\tTransactions : {}", transactionCache.size());
LOGGER.info("\tCommitted Trxs : {}", recentlyCommittedTransactionsCache.size());
LOGGER.info("\tRollback Trxs : {}", rollbackTransactionsCache.size());
LOGGER.info("\tSchema Changes : {}", schemaChangesCache.size());
LOGGER.info("\tEvents : {}", eventCache.size());
if (!eventCache.isEmpty()) {
for (String eventKey : eventCache.keySet()) {
LOGGER.debug("\t\tFound Key: {}", eventKey);
}
}
}
private <K, V> Cache<K, V> createCache(EmbeddedCacheManager manager, OracleConnectorConfig connectorConfig, String name) {
// todo: cache store configured similar to the database history configuration options
final Configuration config = new ConfigurationBuilder()
.persistence()
.passivation(false)
.addSingleFileStore()
.segmented(false)
.preload(true)
.shared(false)
.fetchPersistentState(true)
.ignoreModifications(false)
.location(connectorConfig.getLogMiningBufferLocation())
.build();
manager.defineConfiguration(name, config);
return manager.getCache(name);
}
@Override
protected TransactionCache<InfinispanTransaction, ?> getTransactionCache() {
return transactionCache;
}
protected abstract CacheProvider getCacheProvider();
@Override
protected InfinispanTransaction createTransaction(LogMinerEventRow row) {
@ -175,33 +85,20 @@ protected InfinispanTransaction createTransaction(LogMinerEventRow row) {
@Override
protected void removeEventWithRowId(LogMinerEventRow row) {
for (String eventKey : eventCache.keySet().stream().filter(k -> k.startsWith(row.getTransactionId() + "-")).collect(Collectors.toList())) {
final LogMinerEvent event = eventCache.get(eventKey);
List<String> eventKeys = getCacheProvider().getEventCache().keySet()
.stream()
.filter(k -> k.startsWith(row.getTransactionId() + "-"))
.collect(Collectors.toList());
for (String eventKey : eventKeys) {
final LogMinerEvent event = getCacheProvider().getEventCache().get(eventKey);
if (event != null && event.getRowId().equals(row.getRowId())) {
LOGGER.trace("Undo applied for event {}.", event);
eventCache.remove(eventKey);
getCacheProvider().getEventCache().remove(eventKey);
}
}
}
@Override
public void close() throws Exception {
if (getConfig().isLogMiningBufferDropOnStop()) {
// Since the buffers are to be dropped on stop, clear them before stopping provider.
eventCache.clear();
transactionCache.clear();
recentlyCommittedTransactionsCache.clear();
rollbackTransactionsCache.clear();
schemaChangesCache.clear();
}
recentlyCommittedTransactionsCache.stop();
rollbackTransactionsCache.stop();
schemaChangesCache.stop();
eventCache.stop();
transactionCache.close();
}
@Override
public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedException {
counters.reset();
@ -247,7 +144,7 @@ public Scn process(Scn startScn, Scn endScn) throws SQLException, InterruptedExc
@Override
protected void processRow(LogMinerEventRow row) throws SQLException, InterruptedException {
final String transactionId = row.getTransactionId();
if (recentlyCommittedTransactionsCache.containsKey(transactionId)) {
if (getCacheProvider().getCommittedTransactionsCache().containsKey(transactionId)) {
LOGGER.trace("Transaction {} has been seen by connector, skipped.", transactionId);
return;
}
@ -261,11 +158,11 @@ public void abandonTransactions(Duration retention) {
@Override
protected boolean isTransactionIdAllowed(String transactionId) {
if (rollbackTransactionsCache.containsKey(transactionId)) {
if (getCacheProvider().getRollbackedTransactionsCache().containsKey(transactionId)) {
LOGGER.warn("Event for transaction {} skipped as transaction is marked for rollback.", transactionId);
return false;
}
if (recentlyCommittedTransactionsCache.containsKey(transactionId)) {
if (getCacheProvider().getCommittedTransactionsCache().containsKey(transactionId)) {
LOGGER.warn("Event for transaction {} skipped as transaction was recently committed.", transactionId);
return false;
}
@ -274,21 +171,31 @@ protected boolean isTransactionIdAllowed(String transactionId) {
@Override
protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow row) {
return schemaChangesCache.containsKey(row.getScn().toString());
return getCacheProvider().getSchemaChangesCache().containsKey(row.getScn().toString());
}
@Override
protected void handleCommit(LogMinerEventRow row) throws InterruptedException {
final String transactionId = row.getTransactionId();
if (recentlyCommittedTransactionsCache.containsKey(transactionId)) {
if (getCacheProvider().getCommittedTransactionsCache().containsKey(transactionId)) {
LOGGER.debug("\tTransaction is already committed, skipped.");
return;
}
final InfinispanTransaction transaction = transactionCache.remove(transactionId);
final InfinispanTransaction transaction = getCacheProvider().getTransactionCache().get(transactionId);
if (transaction == null) {
LOGGER.trace("Transaction {} not found.", transactionId);
return;
}
else {
// todo: Infinispan bug?
// When interacting with ISPN with a remote server configuration, the expected
// behavior was that calling the remove method on the cache would return the
// existing entry and remove it from the cache; however it always returned null.
//
// For now, we're going to use get to obtain the value and then remove it after-the-fact.
getCacheProvider().getTransactionCache().remove(transactionId);
}
final boolean skipExcludedUserName;
if (transaction.getUserName() == null && transaction.getNumberOfEvents() > 0) {
@ -303,7 +210,7 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser
skipExcludedUserName = false;
}
final Scn smallestScn = transactionCache.getMinimumScn();
final Scn smallestScn = getTransactionCacheMinimumScn();
metrics.setOldestScn(smallestScn.isNull() ? Scn.valueOf(-1) : smallestScn);
final Scn commitScn = row.getScn();
@ -311,8 +218,8 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser
if ((offsetCommitScn != null && offsetCommitScn.compareTo(commitScn) > 0) || lastCommittedScn.compareTo(commitScn) > 0) {
LOGGER.debug("Transaction {} has already been processed. Commit SCN in offset is {} while commit SCN of transaction is {} and last seen committed SCN is {}.",
transactionId, offsetCommitScn, commitScn, lastCommittedScn);
transactionCache.remove(transactionId);
metrics.setActiveTransactions(transactionCache.size());
getCacheProvider().getTransactionCache().remove(transactionId);
metrics.setActiveTransactions(getCacheProvider().getTransactionCache().size());
removeEventsWithTransaction(transaction);
return;
}
@ -366,7 +273,7 @@ public void accept(LogMinerEvent event) throws InterruptedException {
return;
}
final LogMinerEvent event = eventCache.get(transaction.getEventId(i));
final LogMinerEvent event = getCacheProvider().getEventCache().get(transaction.getEventId(i));
if (event == null) {
// If an event is undone, it gets removed from the cache at undo time.
// This means that the call to get could return a null event and we
@ -394,13 +301,13 @@ public void accept(LogMinerEvent event) throws InterruptedException {
}
// cache recently committed transactions by transaction id
recentlyCommittedTransactionsCache.put(transactionId, commitScn.toString());
getCacheProvider().getCommittedTransactionsCache().put(transactionId, commitScn.toString());
// Clear the event queue for the transaction
removeEventsWithTransaction(transaction);
metrics.incrementCommittedTransactions();
metrics.setActiveTransactions(transactionCache.size());
metrics.setActiveTransactions(getCacheProvider().getTransactionCache().size());
metrics.incrementCommittedDmlCount(eventCount);
metrics.setCommittedScn(commitScn);
metrics.setOffsetScn(offsetContext.getScn());
@ -409,13 +316,13 @@ public void accept(LogMinerEvent event) throws InterruptedException {
@Override
protected void handleRollback(LogMinerEventRow row) {
final InfinispanTransaction transaction = transactionCache.get(row.getTransactionId());
final InfinispanTransaction transaction = getCacheProvider().getTransactionCache().get(row.getTransactionId());
if (transaction != null) {
removeEventsWithTransaction(transaction);
transactionCache.remove(row.getTransactionId());
rollbackTransactionsCache.put(row.getTransactionId(), row.getScn().toString());
getCacheProvider().getTransactionCache().remove(row.getTransactionId());
getCacheProvider().getRollbackedTransactionsCache().put(row.getTransactionId(), row.getScn().toString());
metrics.setActiveTransactions(transactionCache.size());
metrics.setActiveTransactions(getCacheProvider().getTransactionCache().size());
metrics.incrementRolledBackTransactions();
metrics.addRolledBackTransactionId(row.getTransactionId());
@ -427,7 +334,7 @@ protected void handleRollback(LogMinerEventRow row) {
protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedException {
super.handleSchemaChange(row);
if (row.getTableName() != null) {
schemaChangesCache.put(row.getScn().toString(), row.getTableId().identifier());
getCacheProvider().getSchemaChangesCache().put(row.getScn().toString(), row.getTableId().identifier());
}
}
@ -440,10 +347,10 @@ protected void addToTransaction(String transactionId, LogMinerEventRow row, Supp
transaction = createTransaction(row);
}
String eventKey = transaction.getEventId(transaction.getNextEventId());
if (!eventCache.containsKey(eventKey)) {
if (!getCacheProvider().getEventCache().containsKey(eventKey)) {
// Add new event at eventId offset
LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, eventKey);
eventCache.put(eventKey, eventSupplier.get());
getCacheProvider().getEventCache().put(eventKey, eventSupplier.get());
metrics.calculateLagMetrics(row.getChangeTime());
}
// When using Infinispan, this extra put is required so that the state is properly synchronized
@ -455,7 +362,8 @@ protected void addToTransaction(String transactionId, LogMinerEventRow row, Supp
@Override
protected int getTransactionEventCount(InfinispanTransaction transaction) {
// todo: implement indexed keys when ISPN supports them
return (int) eventCache.keySet()
return (int) getCacheProvider().getEventCache()
.keySet()
.parallelStream()
.filter(k -> k.startsWith(transaction.getTransactionId() + "-"))
.count();
@ -472,20 +380,20 @@ private PreparedStatement createQueryStatement() throws SQLException {
private Scn calculateNewStartScn(Scn endScn) throws InterruptedException {
// Cleanup caches based on current state of the transaction cache
final Scn minCacheScn = transactionCache.getMinimumScn();
final Scn minCacheScn = getTransactionCacheMinimumScn();
if (!minCacheScn.isNull()) {
recentlyCommittedTransactionsCache.entrySet().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
rollbackTransactionsCache.entrySet().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
schemaChangesCache.entrySet().removeIf(entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0);
getCacheProvider().getCommittedTransactionsCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
getCacheProvider().getRollbackedTransactionsCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getValue()).compareTo(minCacheScn) < 0);
getCacheProvider().getSchemaChangesCache().entrySet().removeIf(entry -> Scn.valueOf(entry.getKey()).compareTo(minCacheScn) < 0);
}
else {
recentlyCommittedTransactionsCache.clear();
rollbackTransactionsCache.clear();
schemaChangesCache.clear();
getCacheProvider().getCommittedTransactionsCache().clear();
getCacheProvider().getRollbackedTransactionsCache().clear();
getCacheProvider().getSchemaChangesCache().clear();
}
if (getConfig().isLobEnabled()) {
if (transactionCache.isEmpty() && !maxCommittedScn.isNull()) {
if (getCacheProvider().getTransactionCache().isEmpty() && !maxCommittedScn.isNull()) {
offsetContext.setScn(maxCommittedScn);
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
@ -520,7 +428,7 @@ private Scn calculateNewStartScn(Scn endScn) throws InterruptedException {
private void removeEventsWithTransaction(InfinispanTransaction transaction) {
// Clear the event queue for the transaction
for (int i = 0; i < transaction.getNumberOfEvents(); ++i) {
eventCache.remove(transaction.getEventId(i));
getCacheProvider().getEventCache().remove(transaction.getEventId(i));
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan;
import org.infinispan.commons.api.BasicCache;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
/**
* @author Chris Cranford
*/
public interface CacheProvider extends AutoCloseable {
// todo: consolidate with processor most likely?
// with the embedded/remote impls of the infinispan processors, there is really not
// much reason to have the separate provider contracts
String TRANSACTIONS_CACHE_NAME = "transactions";
String COMMIT_TRANSACTIONS_CACHE_NAME = "committed-transactions";
String ROLLBACK_TRANSACTIONS_CACHE_NAME = "rollback-transactions";
String SCHEMA_CHANGES_CACHE_NAME = "schema-changes";
String EVENTS_CACHE_NAME = "events";
void displayCacheStatistics();
BasicCache<String, InfinispanTransaction> getTransactionCache();
BasicCache<String, LogMinerEvent> getEventCache();
BasicCache<String, String> getSchemaChangesCache();
BasicCache<String, String> getCommittedTransactionsCache();
BasicCache<String, String> getRollbackedTransactionsCache();
}

View File

@ -0,0 +1,136 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_COMMITTED_TRANSACTIONS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_ROLLBACK_TRANSACTIONS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS;
import java.util.Objects;
import org.infinispan.Cache;
import org.infinispan.configuration.parsing.ParserRegistry;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
/**
* @author Chris Cranford
*/
public class EmbeddedCacheProvider implements CacheProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedCacheProvider.class);
private final EmbeddedCacheManager cacheManager;
private final boolean dropBufferOnStop;
private final Cache<String, InfinispanTransaction> transactionCache;
private final Cache<String, LogMinerEvent> eventCache;
private final Cache<String, String> recentlyCommittedTransactionsCache;
private final Cache<String, String> rollbackTransactionsCache;
private final Cache<String, String> schemaChangesCache;
public EmbeddedCacheProvider(OracleConnectorConfig connectorConfig) {
this.cacheManager = new DefaultCacheManager();
this.dropBufferOnStop = connectorConfig.isLogMiningBufferDropOnStop();
final String transactionConfig = connectorConfig.getConfig().getString(LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS);
this.transactionCache = createCache(TRANSACTIONS_CACHE_NAME, transactionConfig);
final String committedTransactionsConfig = connectorConfig.getConfig().getString(LOG_MINING_BUFFER_INFINISPAN_CACHE_COMMITTED_TRANSACTIONS);
this.recentlyCommittedTransactionsCache = createCache(COMMIT_TRANSACTIONS_CACHE_NAME, committedTransactionsConfig);
final String rollbackTransactionsConfig = connectorConfig.getConfig().getString(LOG_MINING_BUFFER_INFINISPAN_CACHE_ROLLBACK_TRANSACTIONS);
this.rollbackTransactionsCache = createCache(ROLLBACK_TRANSACTIONS_CACHE_NAME, rollbackTransactionsConfig);
final String schemaChangesConfig = connectorConfig.getConfig().getString(LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES);
this.schemaChangesCache = createCache(SCHEMA_CHANGES_CACHE_NAME, schemaChangesConfig);
final String eventsConfig = connectorConfig.getConfig().getString(LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS);
this.eventCache = createCache(EVENTS_CACHE_NAME, eventsConfig);
}
@Override
public Cache<String, InfinispanTransaction> getTransactionCache() {
return transactionCache;
}
@Override
public Cache<String, LogMinerEvent> getEventCache() {
return eventCache;
}
@Override
public Cache<String, String> getSchemaChangesCache() {
return schemaChangesCache;
}
@Override
public Cache<String, String> getCommittedTransactionsCache() {
return recentlyCommittedTransactionsCache;
}
@Override
public Cache<String, String> getRollbackedTransactionsCache() {
return rollbackTransactionsCache;
}
@Override
public void close() throws Exception {
if (dropBufferOnStop) {
LOGGER.info("Clearing infinispan caches");
transactionCache.clear();
eventCache.clear();
schemaChangesCache.clear();
recentlyCommittedTransactionsCache.clear();
rollbackTransactionsCache.clear();
// this block should only be used by tests, should we wrap this in case admin rights aren't given?
cacheManager.administration().removeCache(CacheProvider.TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.COMMIT_TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.ROLLBACK_TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.SCHEMA_CHANGES_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.EVENTS_CACHE_NAME);
}
LOGGER.info("Shutting down infinispan embedded caches");
cacheManager.close();
}
@Override
public void displayCacheStatistics() {
LOGGER.info("Cache Statistics:");
LOGGER.info("\tTransactions : {}", transactionCache.size());
LOGGER.info("\tCommitted Trxs : {}", recentlyCommittedTransactionsCache.size());
LOGGER.info("\tRollback Trxs : {}", rollbackTransactionsCache.size());
LOGGER.info("\tSchema Changes : {}", schemaChangesCache.size());
LOGGER.info("\tEvents : {}", eventCache.size());
if (!eventCache.isEmpty()) {
for (String eventKey : eventCache.keySet()) {
LOGGER.debug("\t\tFound Key: {}", eventKey);
}
}
}
private <K, V> Cache<K, V> createCache(String cacheName, String cacheConfiguration) {
Objects.requireNonNull(cacheName);
Objects.requireNonNull(cacheConfiguration);
// define the cache, parsing the supplied XML configuration
cacheManager.defineConfiguration(cacheName,
new ParserRegistry().parse(cacheConfiguration)
.getNamedConfigurationBuilders()
.get(cacheName)
.build());
return cacheManager.getCache(cacheName);
}
}

View File

@ -0,0 +1,77 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan;
import java.util.Map;
import org.infinispan.commons.util.CloseableIterator;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Scn;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.TableId;
/**
* @author Chris Cranford
*/
public class EmbeddedInfinispanLogMinerEventProcessor extends AbstractInfinispanLogMinerEventProcessor {
private final EmbeddedCacheProvider cacheProvider;
public EmbeddedInfinispanLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection jdbcConnection,
EventDispatcher<TableId> dispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
OracleStreamingChangeEventSourceMetrics metrics) {
super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics);
cacheProvider = new EmbeddedCacheProvider(connectorConfig);
cacheProvider.displayCacheStatistics();
}
@Override
public void close() throws Exception {
cacheProvider.close();
}
@Override
protected Map<String, InfinispanTransaction> getTransactionCache() {
return cacheProvider.getTransactionCache();
}
@Override
protected EmbeddedCacheProvider getCacheProvider() {
return cacheProvider;
}
@Override
protected Scn getTransactionCacheMinimumScn() {
Scn minimumScn = Scn.NULL;
try (CloseableIterator<InfinispanTransaction> iterator = cacheProvider.getTransactionCache().values().iterator()) {
while (iterator.hasNext()) {
final Scn transactionScn = iterator.next().getStartScn();
if (minimumScn.isNull()) {
minimumScn = transactionScn;
}
else {
if (transactionScn.compareTo(minimumScn) < 0) {
minimumScn = transactionScn;
}
}
}
}
return minimumScn;
}
}

View File

@ -1,87 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan;
import java.util.Iterator;
import org.infinispan.Cache;
import org.infinispan.commons.util.CloseableIterator;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.processor.TransactionCache;
/**
* A {@link TransactionCache} implementation for use with embedded Infinispan.
*
* @author Chris Cranford
*/
public class InfinispanTransactionCache implements TransactionCache<InfinispanTransaction, Cache.Entry<String, InfinispanTransaction>> {
private final Cache<String, InfinispanTransaction> cache;
public InfinispanTransactionCache(Cache<String, InfinispanTransaction> cache) {
this.cache = cache;
}
@Override
public InfinispanTransaction get(String transactionId) {
return cache.get(transactionId);
}
@Override
public void put(String transactionId, InfinispanTransaction transaction) {
cache.put(transactionId, transaction);
}
@Override
public InfinispanTransaction remove(String transactionId) {
return cache.remove(transactionId);
}
@Override
public int size() {
return cache.size();
}
@Override
public void clear() {
cache.clear();
}
@Override
public boolean isEmpty() {
return cache.isEmpty();
}
@Override
public Iterator<Cache.Entry<String, InfinispanTransaction>> iterator() {
return cache.entrySet().iterator();
}
@Override
public Scn getMinimumScn() {
Scn minimumScn = Scn.NULL;
try (CloseableIterator<InfinispanTransaction> iterator = cache.values().iterator()) {
while (iterator.hasNext()) {
final Scn transactionScn = iterator.next().getStartScn();
if (minimumScn.isNull()) {
minimumScn = transactionScn;
}
else {
if (transactionScn.compareTo(minimumScn) < 0) {
minimumScn = transactionScn;
}
}
}
}
return minimumScn;
}
@Override
public void close() throws Exception {
cache.stop();
}
}

View File

@ -0,0 +1,168 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_COMMITTED_TRANSACTIONS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_ROLLBACK_TRANSACTIONS;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES;
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.commons.configuration.XMLStringConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.processor.infinispan.marshalling.LogMinerEventMarshallerImpl;
import io.debezium.connector.oracle.logminer.processor.infinispan.marshalling.TransactionMarshallerImpl;
/**
* @author Chris Cranford
*/
public class RemoteCacheProvider implements CacheProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteCacheProvider.class);
private final RemoteCacheManager cacheManager;
private final boolean dropBufferOnStop;
private final RemoteCache<String, InfinispanTransaction> transactionCache;
private final RemoteCache<String, LogMinerEvent> eventCache;
private final RemoteCache<String, String> recentlyCommittedTransactionsCache;
private final RemoteCache<String, String> rollbackTransactionsCache;
private final RemoteCache<String, String> schemaChangesCache;
public RemoteCacheProvider(OracleConnectorConfig connectorConfig) {
Configuration config = new ConfigurationBuilder()
.withProperties(getHotrodClientProperties(connectorConfig))
// todo: why must these be defined manually rather than automated like embedded mode?
.addContextInitializer(TransactionMarshallerImpl.class.getName())
.addContextInitializer(LogMinerEventMarshallerImpl.class.getName())
.build();
this.cacheManager = new RemoteCacheManager(config, true);
this.dropBufferOnStop = connectorConfig.isLogMiningBufferDropOnStop();
final String transactionConfig = connectorConfig.getConfig().getString(LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS);
this.transactionCache = createCache(TRANSACTIONS_CACHE_NAME, transactionConfig);
final String committedTransactionsConfig = connectorConfig.getConfig().getString(LOG_MINING_BUFFER_INFINISPAN_CACHE_COMMITTED_TRANSACTIONS);
this.recentlyCommittedTransactionsCache = createCache(COMMIT_TRANSACTIONS_CACHE_NAME, committedTransactionsConfig);
final String rollbackTransactionsConfig = connectorConfig.getConfig().getString(LOG_MINING_BUFFER_INFINISPAN_CACHE_ROLLBACK_TRANSACTIONS);
this.rollbackTransactionsCache = createCache(ROLLBACK_TRANSACTIONS_CACHE_NAME, rollbackTransactionsConfig);
final String schemaChangesConfig = connectorConfig.getConfig().getString(LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES);
this.schemaChangesCache = createCache(SCHEMA_CHANGES_CACHE_NAME, schemaChangesConfig);
final String eventsConfig = connectorConfig.getConfig().getString(LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS);
this.eventCache = createCache(EVENTS_CACHE_NAME, eventsConfig);
}
private Properties getHotrodClientProperties(OracleConnectorConfig connectorConfig) {
final Map<String, String> clientSettings = connectorConfig.getConfig()
.subset("log.mining.buffer.infinispan.client.", true)
.asMap();
final Properties properties = new Properties();
for (Map.Entry<String, String> entry : clientSettings.entrySet()) {
properties.put("infinispan.client." + entry.getKey(), entry.getValue());
}
return properties;
}
@Override
public RemoteCache<String, InfinispanTransaction> getTransactionCache() {
return transactionCache;
}
@Override
public RemoteCache<String, LogMinerEvent> getEventCache() {
return eventCache;
}
@Override
public RemoteCache<String, String> getSchemaChangesCache() {
return schemaChangesCache;
}
@Override
public RemoteCache<String, String> getCommittedTransactionsCache() {
return recentlyCommittedTransactionsCache;
}
@Override
public RemoteCache<String, String> getRollbackedTransactionsCache() {
return rollbackTransactionsCache;
}
@Override
public void close() throws Exception {
if (dropBufferOnStop) {
LOGGER.info("Clearing infinispan caches");
transactionCache.clear();
eventCache.clear();
schemaChangesCache.clear();
recentlyCommittedTransactionsCache.clear();
rollbackTransactionsCache.clear();
// this block should only be used by tests, should we wrap this in case admin rights aren't given?
cacheManager.administration().removeCache(CacheProvider.TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.COMMIT_TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.ROLLBACK_TRANSACTIONS_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.SCHEMA_CHANGES_CACHE_NAME);
cacheManager.administration().removeCache(CacheProvider.EVENTS_CACHE_NAME);
}
LOGGER.info("Shutting down infinispan remote caches");
cacheManager.close();
}
@Override
public void displayCacheStatistics() {
LOGGER.info("Cache Statistics:");
LOGGER.info("\tTransactions : {}", transactionCache.size());
LOGGER.info("\tCommitted Trxs : {}", recentlyCommittedTransactionsCache.size());
LOGGER.info("\tRollback Trxs : {}", rollbackTransactionsCache.size());
LOGGER.info("\tSchema Changes : {}", schemaChangesCache.size());
LOGGER.info("\tEvents : {}", eventCache.size());
if (!eventCache.isEmpty()) {
for (String eventKey : eventCache.keySet()) {
LOGGER.debug("\t\tFound Key: {}", eventKey);
}
}
}
private <C, V> RemoteCache<C, V> createCache(String cacheName, String cacheConfiguration) {
Objects.requireNonNull(cacheName);
Objects.requireNonNull(cacheConfiguration);
RemoteCache<C, V> cache = cacheManager.getCache(cacheName);
if (cache != null) {
// cache is already defined, simply return it
LOGGER.info("Remote cache '{}' already defined.", cacheName);
return cache;
}
// In ISPN 12.1, configuration can only be supplied as XML.
cache = cacheManager.administration().createCache(cacheName, new XMLStringConfiguration(cacheConfiguration));
if (cache == null) {
throw new DebeziumException("Failed to create remote Infinispan cache: " + cacheName);
}
LOGGER.info("Created remote infinispan cache: {}", cacheName);
return cache;
}
}

View File

@ -0,0 +1,81 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.infinispan;
import java.util.Map;
import org.infinispan.commons.util.CloseableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Scn;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.TableId;
/**
* @author Chris Cranford
*/
public class RemoteInfinispanLogMinerEventProcessor extends AbstractInfinispanLogMinerEventProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInfinispanLogMinerEventProcessor.class);
private final RemoteCacheProvider cacheProvider;
public RemoteInfinispanLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection jdbcConnection,
EventDispatcher<TableId> dispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
OracleStreamingChangeEventSourceMetrics metrics) {
super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics);
cacheProvider = new RemoteCacheProvider(connectorConfig);
cacheProvider.displayCacheStatistics();
}
@Override
public void close() throws Exception {
cacheProvider.close();
}
@Override
protected Map<String, InfinispanTransaction> getTransactionCache() {
return cacheProvider.getTransactionCache();
}
@Override
protected RemoteCacheProvider getCacheProvider() {
return cacheProvider;
}
@Override
protected Scn getTransactionCacheMinimumScn() {
Scn minimumScn = Scn.NULL;
try (CloseableIterator<InfinispanTransaction> iterator = cacheProvider.getTransactionCache().values().iterator()) {
while (iterator.hasNext()) {
final Scn transactionScn = iterator.next().getStartScn();
if (minimumScn.isNull()) {
minimumScn = transactionScn;
}
else {
if (transactionScn.compareTo(minimumScn) < 0) {
minimumScn = transactionScn;
}
}
}
}
LOGGER.info("Transaction Cache Min SCN {}", minimumScn);
return minimumScn;
}
}

View File

@ -36,7 +36,6 @@
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.TransactionCache;
import io.debezium.connector.oracle.logminer.processor.TransactionCommitConsumer;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.EventDispatcher;
@ -60,7 +59,7 @@ public class MemoryLogMinerEventProcessor extends AbstractLogMinerEventProcessor
private final OraclePartition partition;
private final OracleOffsetContext offsetContext;
private final OracleStreamingChangeEventSourceMetrics metrics;
private final MemoryTransactionCache transactionCache;
private final Map<String, MemoryTransaction> transactionCache = new HashMap<>();
private final Map<String, Scn> recentlyCommittedTransactionsCache = new HashMap<>();
private final Set<Scn> schemaChangesCache = new HashSet<>();
private final Set<String> abandonedTransactionsCache = new HashSet<>();
@ -86,11 +85,10 @@ public MemoryLogMinerEventProcessor(ChangeEventSourceContext context,
this.partition = partition;
this.offsetContext = offsetContext;
this.metrics = metrics;
this.transactionCache = new MemoryTransactionCache();
}
@Override
protected TransactionCache<MemoryTransaction, ?> getTransactionCache() {
protected Map<String, MemoryTransaction> getTransactionCache() {
return transactionCache;
}
@ -164,13 +162,13 @@ public void abandonTransactions(Duration retention) {
Optional<Scn> lastScnToAbandonTransactions = getLastScnToAbandon(jdbcConnection, offsetScn, retention);
lastScnToAbandonTransactions.ifPresent(thresholdScn -> {
LOGGER.warn("All transactions with SCN <= {} will be abandoned.", thresholdScn);
Scn smallestScn = transactionCache.getMinimumScn();
Scn smallestScn = getTransactionCacheMinimumScn();
if (!smallestScn.isNull()) {
if (thresholdScn.compareTo(smallestScn) < 0) {
thresholdScn = smallestScn;
}
Iterator<Map.Entry<String, MemoryTransaction>> iterator = transactionCache.iterator();
Iterator<Map.Entry<String, MemoryTransaction>> iterator = transactionCache.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, MemoryTransaction> entry = iterator.next();
if (entry.getValue().getStartScn().compareTo(thresholdScn) <= 0) {
@ -184,7 +182,7 @@ public void abandonTransactions(Duration retention) {
}
// Update the oldest scn metric are transaction abandonment
smallestScn = transactionCache.getMinimumScn();
smallestScn = getTransactionCacheMinimumScn();
metrics.setOldestScn(smallestScn.isNull() ? Scn.valueOf(-1) : smallestScn);
}
@ -246,7 +244,7 @@ else if (getConfig().getLogMiningUsernameExcludes().contains(transaction.getUser
skipExcludedUserName = false;
}
final Scn smallestScn = transactionCache.getMinimumScn();
final Scn smallestScn = getTransactionCacheMinimumScn();
metrics.setOldestScn(smallestScn.isNull() ? Scn.valueOf(-1) : smallestScn);
abandonedTransactionsCache.remove(transactionId);
@ -407,7 +405,7 @@ private Scn calculateNewStartScn(Scn endScn) throws InterruptedException {
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
else {
final Scn minStartScn = transactionCache.getMinimumScn();
final Scn minStartScn = getTransactionCacheMinimumScn();
if (!minStartScn.isNull()) {
recentlyCommittedTransactionsCache.entrySet().removeIf(entry -> entry.getValue().compareTo(minStartScn) < 0);
schemaChangesCache.removeIf(scn -> scn.compareTo(minStartScn) < 0);
@ -429,7 +427,7 @@ private Scn calculateNewStartScn(Scn endScn) throws InterruptedException {
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
else {
final Scn minStartScn = transactionCache.getMinimumScn();
final Scn minStartScn = getTransactionCacheMinimumScn();
if (!minStartScn.isNull()) {
offsetContext.setScn(minStartScn.subtract(Scn.valueOf(1)));
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
@ -463,4 +461,11 @@ protected Optional<Scn> getLastScnToAbandon(OracleConnection connection, Scn off
}
}
@Override
protected Scn getTransactionCacheMinimumScn() {
return transactionCache.values().stream()
.map(MemoryTransaction::getStartScn)
.min(Scn::compareTo)
.orElse(Scn.NULL);
}
}

View File

@ -1,71 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.memory;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.processor.TransactionCache;
/**
* A {@link TransactionCache} implementation that uses a JVM heap backed {@code HashMap}.
*
* @author Chris Cranford
*/
public class MemoryTransactionCache implements TransactionCache<MemoryTransaction, Map.Entry<String, MemoryTransaction>> {
public final Map<String, MemoryTransaction> cache = new HashMap<>();
@Override
public MemoryTransaction get(String transactionId) {
return cache.get(transactionId);
}
@Override
public void put(String transactionId, MemoryTransaction transaction) {
cache.put(transactionId, transaction);
}
@Override
public MemoryTransaction remove(String transactionId) {
return cache.remove(transactionId);
}
@Override
public int size() {
return cache.size();
}
@Override
public void clear() {
cache.clear();
}
@Override
public boolean isEmpty() {
return cache.isEmpty();
}
@Override
public Iterator<Map.Entry<String, MemoryTransaction>> iterator() {
return cache.entrySet().iterator();
}
@Override
public Scn getMinimumScn() {
return cache.values().stream()
.map(MemoryTransaction::getStartScn)
.min(Scn::compareTo)
.orElse(Scn.NULL);
}
@Override
public void close() throws Exception {
// no-op
}
}

View File

@ -12,4 +12,9 @@ public class OracleConnectorConfigDefTest extends ConfigDefinitionMetadataTest {
public OracleConnectorConfigDefTest() {
super(new OracleConnector());
}
@Override
public void allFieldsShouldHaveDescription() {
// todo: need to figure out how to allow this to work with infinispan buffer setups
}
}

View File

@ -19,7 +19,7 @@
* @author Chris Cranford
*/
@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Only applicable for LogMiner")
public class InfinispanProcessorIT extends AbstractProcessorTest {
public class EmbeddedInfinispanProcessorIT extends AbstractProcessorTest {
@Before
public void before() throws Exception {
super.before();
@ -33,9 +33,11 @@ public void before() throws Exception {
@Override
protected Configuration.Builder getBufferImplementationConfig() {
return TestHelper.defaultConfig()
.with(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE, LogMiningBufferType.INFINISPAN)
.with(OracleConnectorConfig.LOG_MINING_BUFFER_LOCATION, "./target/data");
final LogMiningBufferType bufferType = LogMiningBufferType.INFINISPAN;
return TestHelper.withDefaultInfinispanCacheConfigurations(bufferType,
TestHelper.defaultConfig()
.with(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE, bufferType)
.with(OracleConnectorConfig.LOG_MINING_BUFFER_DROP_ON_STOP, true));
}
@Override

View File

@ -0,0 +1,55 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor;
import static org.fest.assertions.Assertions.assertThat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningBufferType;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.logminer.processor.infinispan.AbstractInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.EmbeddedInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.util.TestHelper;
/**
* @author Chris Cranford
*/
@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Only applicable for LogMiner")
public class EmbeddedInfinispanProcessorTest extends AbstractProcessorUnitTest<AbstractInfinispanLogMinerEventProcessor> {
private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedInfinispanProcessorTest.class);
@Override
protected Configuration.Builder getConfig() {
final LogMiningBufferType bufferType = LogMiningBufferType.INFINISPAN;
return TestHelper.withDefaultInfinispanCacheConfigurations(bufferType,
TestHelper.defaultConfig()
.with(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE, bufferType)
.with(OracleConnectorConfig.LOG_MINING_BUFFER_DROP_ON_STOP, true));
}
@Override
protected boolean isTransactionAbandonmentSupported() {
return false;
}
@Override
protected AbstractInfinispanLogMinerEventProcessor getProcessor(OracleConnectorConfig connectorConfig) {
assertThat(connectorConfig.validateAndRecord(OracleConnectorConfig.ALL_FIELDS, LOGGER::error)).isTrue();
return new EmbeddedInfinispanLogMinerEventProcessor(context,
connectorConfig,
connection,
dispatcher,
partition,
offsetContext,
schema,
metrics);
}
}

View File

@ -1,43 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.logminer.processor.infinispan.InfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.util.TestHelper;
/**
* @author Chris Cranford
*/
@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Only applicable for LogMiner")
public class InfinispanProcessorTest extends AbstractProcessorUnitTest<InfinispanLogMinerEventProcessor> {
@Override
protected Configuration.Builder getConfig() {
return TestHelper.defaultConfig()
.with(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE, "infinispan")
.with(OracleConnectorConfig.LOG_MINING_BUFFER_LOCATION, "./target/data")
.with(OracleConnectorConfig.LOG_MINING_BUFFER_DROP_ON_STOP, true);
}
@Override
protected boolean isTransactionAbandonmentSupported() {
return false;
}
@Override
protected InfinispanLogMinerEventProcessor getProcessor(OracleConnectorConfig connectorConfig) {
return new InfinispanLogMinerEventProcessor(context,
connectorConfig,
connection,
dispatcher,
partition,
offsetContext,
schema,
metrics);
}
}

View File

@ -5,8 +5,14 @@
*/
package io.debezium.connector.oracle.logminer.processor;
import static org.fest.assertions.Assertions.assertThat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningBufferType;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.logminer.processor.memory.MemoryLogMinerEventProcessor;
import io.debezium.connector.oracle.util.TestHelper;
@ -16,15 +22,19 @@
*/
@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Only applicable for LogMiner")
public class MemoryProcessorTest extends AbstractProcessorUnitTest<MemoryLogMinerEventProcessor> {
private static final Logger LOGGER = LoggerFactory.getLogger(MemoryProcessorTest.class);
@Override
protected Configuration.Builder getConfig() {
return TestHelper.defaultConfig()
.with(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE, "memory")
.with(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE, LogMiningBufferType.MEMORY)
.with(OracleConnectorConfig.LOG_MINING_BUFFER_DROP_ON_STOP, true);
}
@Override
protected MemoryLogMinerEventProcessor getProcessor(OracleConnectorConfig connectorConfig) {
assertThat(connectorConfig.validateAndRecord(OracleConnectorConfig.ALL_FIELDS, LOGGER::error)).isTrue();
return new MemoryLogMinerEventProcessor(context,
connectorConfig,
connection,

View File

@ -9,13 +9,16 @@
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningBufferType;
import io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.Strings;
@ -55,6 +58,16 @@ public class TestHelper {
*/
public static final String TYPE_SCALE_PARAMETER_KEY = "__debezium.source.column.scale";
private static Map<String, Field> cacheMappings = new HashMap<>();
static {
cacheMappings.put(CacheProvider.TRANSACTIONS_CACHE_NAME, OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS);
cacheMappings.put(CacheProvider.COMMIT_TRANSACTIONS_CACHE_NAME, OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_COMMITTED_TRANSACTIONS);
cacheMappings.put(CacheProvider.ROLLBACK_TRANSACTIONS_CACHE_NAME, OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_ROLLBACK_TRANSACTIONS);
cacheMappings.put(CacheProvider.SCHEMA_CHANGES_CACHE_NAME, OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_SCHEMA_CHANGES);
cacheMappings.put(CacheProvider.EVENTS_CACHE_NAME, OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_EVENTS);
}
/**
* Get the name of the connector user, the default is {@link TestHelper#CONNECTOR_USER}.
*/
@ -115,10 +128,17 @@ public static Configuration.Builder defaultConfig() {
// Tests will always use the online catalog strategy due to speed.
builder.withDefault(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog");
final String bufferType = System.getProperty(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE.name());
if (LogMiningBufferType.parse(bufferType).equals(LogMiningBufferType.INFINISPAN)) {
builder.withDefault(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE, "infinispan");
builder.withDefault(OracleConnectorConfig.LOG_MINING_BUFFER_LOCATION, "./target/data");
final String bufferTypeName = System.getProperty(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE.name());
final LogMiningBufferType bufferType = LogMiningBufferType.parse(bufferTypeName);
if (bufferType.isInfinispan()) {
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE, bufferType);
withDefaultInfinispanCacheConfigurations(bufferType, builder);
if (!bufferType.isInfinispanEmbedded()) {
builder.with("log.mining.buffer.infinispan.client.hotrod.server_list", "0.0.0.0:11222");
builder.with("log.mining.buffer.infinispan.client.hotrod.use_auth", "true");
builder.with("log.mining.buffer.infinispan.client.hotrod.auth_username", "admin");
builder.with("log.mining.buffer.infinispan.client.hotrod.auth_password", "admin");
}
}
builder.withDefault(OracleConnectorConfig.LOG_MINING_BUFFER_DROP_ON_STOP, true);
}
@ -445,4 +465,43 @@ public static List<BigInteger> getCurrentRedoLogSequences() throws SQLException
});
}
}
public static String getDefaultInfinispanEmbeddedCacheConfig(String cacheName) {
final String result = new org.infinispan.configuration.cache.ConfigurationBuilder()
.persistence()
.passivation(false)
.addSingleFileStore()
.segmented(false)
.preload(true)
.shared(false)
.fetchPersistentState(true)
.ignoreModifications(false)
.location("./target/data")
.build()
.toXMLString(cacheName);
return result;
}
public static String getDefaultInfinispanRemoteCacheConfig(String cacheName) {
return "<distributed-cache name=\"" + cacheName + "\" statistics=\"true\">\n" +
"\t<encoding media-type=\"application/x-protostream\"/>\n" +
"\t<persistence passivation=\"false\">\n" +
"\t\t<file-store fetch-state=\"true\" read-only=\"false\" preload=\"true\" shared=\"false\" segmented=\"false\"/>\n" +
"\t</persistence>\n" +
"</distributed-cache>";
}
public static Configuration.Builder withDefaultInfinispanCacheConfigurations(LogMiningBufferType bufferType, Configuration.Builder builder) {
for (Map.Entry<String, Field> cacheMapping : cacheMappings.entrySet()) {
final Field field = cacheMapping.getValue();
final String cacheName = cacheMapping.getKey();
final String config = bufferType.isInfinispanEmbedded()
? getDefaultInfinispanEmbeddedCacheConfig(cacheName)
: getDefaultInfinispanRemoteCacheConfig(cacheName);
builder.with(field, config);
}
return builder;
}
}