diff --git a/debezium-connector-oracle/pom.xml b/debezium-connector-oracle/pom.xml index aac0fac6a..35cc5e74c 100644 --- a/debezium-connector-oracle/pom.xml +++ b/debezium-connector-oracle/pom.xml @@ -85,11 +85,28 @@ protobuf-java-util ${version.com.google.protobuf} + + org.ehcache ehcache 3.9.6 + + javax.xml.bind + jaxb-api + 2.3.1 + + + com.sun.xml.bind + jaxb-core + 2.3.0 + + + com.sun.xml.bind + jaxb-impl + 2.3.1 + diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 59f17e513..5fb7fb178 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -624,34 +624,50 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector .withValidation(Field::isNonNegativeInteger) .withDescription("The number of attempts to retry database errors during snapshots before failing."); - public static final Field LOG_MINING_BUFFER_EHCACHE_STORAGE_PATH = Field.create("log.mining.buffer.ehcache.storage.path") + public static final Field LOG_MINING_BUFFER_EHCACHE_GLOBAL_CONFIG = Field.create("log.mining.buffer.ehcache.global.config") + .withDisplayName("Defines any global configuration for the Ehcache transaction buffer") .withType(Type.STRING) - .withValidation(OracleConnectorConfig::validateEhcacheFieldRequired) - .withDescription("The persistent location where the Ehcache off-heap files should be stored."); + .withWidth(Width.LONG) + .withImportance(Importance.LOW) + .withValidation(OracleConnectorConfig::validateEhCacheGlobalConfigField) + .withDescription("Specifies any Ehcache global configurations such as services or persistence. " + + "This cannot include nor tags as these are managed by Debezium."); - public static final Field LOG_MINING_BUFFER_EHCACHE_CACHE_TRANSACTIONS_BYTES = Field.create("log.mining.buffer.ehcache.cache.transactions.bytes") - .withType(Type.LONG) - .withDefault(0) - .withValidation(OracleConnectorConfig::validateEhcacheBytesFieldRequired) - .withDescription("The size of the Ehcache transaction cache in bytes"); + public static final Field LOG_MINING_BUFFER_EHCACHE_TRANSACTIONS_CONFIG = Field.create("log.mining.buffer.ehcache.transactions.config") + .withDisplayName("Defines the partial ehcache configuration for the transaction cache") + .withType(Type.STRING) + .withWidth(Width.LONG) + .withImportance(Importance.LOW) + .withValidation(OracleConnectorConfig::validateEhcacheConfigFieldRequired) + .withDescription("Specifies the inner body the Ehcache tag for the transaction cache, but " + + "should not include the nor the attributes as these are managed by Debezium."); - public static final Field LOG_MINING_BUFFER_EHCACHE_CACHE_PROCESSED_TRANSACTIONS_BYTES = Field.create("log.mining.buffer.ehcache.cache.processedtransactions.bytes") - .withType(Type.LONG) - .withDefault(0) - .withValidation(OracleConnectorConfig::validateEhcacheBytesFieldRequired) - .withDescription("The size of the Ehcache processed transaction cache in bytes"); + public static final Field LOG_MINING_BUFFER_EHCACHE_PROCESSED_TRANSACTIONS_CONFIG = Field.create("log.mining.buffer.ehcache.processedtransactions.config") + .withDisplayName("Defines the partial ehcache configuration for the processed transaction cache") + .withType(Type.STRING) + .withWidth(Width.LONG) + .withImportance(Importance.LOW) + .withValidation(OracleConnectorConfig::validateEhcacheConfigFieldRequired) + .withDescription("Specifies the inner body the Ehcache tag for the processed transaction cache, but " + + "should not include the nor the attributes as these are managed by Debezium."); - public static final Field LOG_MINING_BUFFER_EHCACHE_CACHE_SCHEMA_CHANGES_BYTES = Field.create("log.mining.buffer.ehcache.cache.schemachanges.bytes") - .withType(Type.LONG) - .withDefault(0) - .withValidation(OracleConnectorConfig::validateEhcacheBytesFieldRequired) - .withDescription("The size of the Ehcache schema changes cache in bytes"); + public static final Field LOG_MINING_BUFFER_EHCACHE_SCHEMA_CHANGES_CONFIG = Field.create("log.mining.buffer.ehcache.schemachanges.config") + .withDisplayName("Defines the partial ehcache configuration for the schema changes cache") + .withType(Type.STRING) + .withWidth(Width.LONG) + .withImportance(Importance.LOW) + .withValidation(OracleConnectorConfig::validateEhcacheConfigFieldRequired) + .withDescription("Specifies the inner body the Ehcache tag for the schema changes cache, but " + + "should not include the nor the attributes as these are managed by Debezium."); - public static final Field LOG_MINING_BUFFER_EHCACHE_CACHE_EVENTS_BYTES = Field.create("log.mining.buffer.ehcache.cache.events.bytes") - .withType(Type.LONG) - .withDefault(0) - .withValidation(OracleConnectorConfig::validateEhcacheBytesFieldRequired) - .withDescription("The size of the Ehcache transaction events cache in bytes"); + public static final Field LOG_MINING_BUFFER_EHCACHE_EVENTS_CONFIG = Field.create("log.mining.buffer.ehcache.events.config") + .withDisplayName("Defines the partial ehcache configuration for the events cache") + .withType(Type.STRING) + .withWidth(Width.LONG) + .withImportance(Importance.LOW) + .withValidation(OracleConnectorConfig::validateEhcacheConfigFieldRequired) + .withDescription("Specifies the inner body the Ehcache tag for the events cache, but " + + "should not include the nor the attributes as these are managed by Debezium."); @Deprecated public static final Field LOG_MINING_CONTINUOUS_MINE = Field.create("log.mining.continuous.mine") @@ -738,11 +754,11 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector OLR_PORT, SNAPSHOT_DATABASE_ERRORS_MAX_RETRIES, LOG_MINING_CONTINUOUS_MINE, - LOG_MINING_BUFFER_EHCACHE_STORAGE_PATH, - LOG_MINING_BUFFER_EHCACHE_CACHE_TRANSACTIONS_BYTES, - LOG_MINING_BUFFER_EHCACHE_CACHE_PROCESSED_TRANSACTIONS_BYTES, - LOG_MINING_BUFFER_EHCACHE_CACHE_SCHEMA_CHANGES_BYTES, - LOG_MINING_BUFFER_EHCACHE_CACHE_EVENTS_BYTES) + LOG_MINING_BUFFER_EHCACHE_GLOBAL_CONFIG, + LOG_MINING_BUFFER_EHCACHE_TRANSACTIONS_CONFIG, + LOG_MINING_BUFFER_EHCACHE_PROCESSED_TRANSACTIONS_CONFIG, + LOG_MINING_BUFFER_EHCACHE_SCHEMA_CHANGES_CONFIG, + LOG_MINING_BUFFER_EHCACHE_EVENTS_CONFIG) .events(SOURCE_INFO_STRUCT_MAKER) .create(); @@ -811,12 +827,7 @@ public static ConfigDef configDef() { private final Set logMiningSchemaChangesUsernameExcludes; private final Boolean logMiningIncludeRedoSql; private final boolean logMiningContinuousMining; - - private final String logMiningBufferEhcacheStoragePath; - private final long logMiningBufferEhcacheTransactionCacheSizeBytes; - private final long logMiningBufferEhcacheProcessedTransactionsCacheSizeBytes; - private final long logMiningBufferEhcacheSchemaChangesCacheSizeBytes; - private final long logMiningBufferEhcacheEventsCacheSizeBytes; + private final Configuration logMiningEhCacheConfiguration; private final String openLogReplicatorSource; private final String openLogReplicatorHostname; @@ -887,11 +898,7 @@ public OracleConnectorConfig(Configuration config) { this.logMiningIncludeRedoSql = config.getBoolean(LOG_MINING_INCLUDE_REDO_SQL); this.logMiningContinuousMining = config.getBoolean(LOG_MINING_CONTINUOUS_MINE); - this.logMiningBufferEhcacheStoragePath = config.getString(LOG_MINING_BUFFER_EHCACHE_STORAGE_PATH); - this.logMiningBufferEhcacheTransactionCacheSizeBytes = config.getLong(LOG_MINING_BUFFER_EHCACHE_CACHE_TRANSACTIONS_BYTES); - this.logMiningBufferEhcacheProcessedTransactionsCacheSizeBytes = config.getLong(LOG_MINING_BUFFER_EHCACHE_CACHE_PROCESSED_TRANSACTIONS_BYTES); - this.logMiningBufferEhcacheSchemaChangesCacheSizeBytes = config.getLong(LOG_MINING_BUFFER_EHCACHE_CACHE_SCHEMA_CHANGES_BYTES); - this.logMiningBufferEhcacheEventsCacheSizeBytes = config.getLong(LOG_MINING_BUFFER_EHCACHE_CACHE_EVENTS_BYTES); + this.logMiningEhCacheConfiguration = config.subset("log.mining.buffer.ehcache", false); // OpenLogReplicator this.openLogReplicatorSource = config.getString(OLR_SOURCE); @@ -2000,24 +2007,14 @@ public Integer getOpenLogReplicatorPort() { return openLogReplicatorPort; } - public String getLogMiningBufferEhcacheStoragePath() { - return logMiningBufferEhcacheStoragePath; - } - - public long getLogMiningBufferEhcacheTransactionCacheSizeBytes() { - return logMiningBufferEhcacheTransactionCacheSizeBytes; - } - - public long getLogMiningBufferEhcacheProcessedTransactionsCacheSizeBytes() { - return logMiningBufferEhcacheProcessedTransactionsCacheSizeBytes; - } - - public long getLogMiningBufferEhcacheSchemaChangesCacheSizeBytes() { - return logMiningBufferEhcacheSchemaChangesCacheSizeBytes; - } - - public long getLogMiningBufferEhcacheEventsCacheSizeBytes() { - return logMiningBufferEhcacheEventsCacheSizeBytes; + /** + * Get the Ehcache buffer configuration, which is all attributes under the configuration prefix + * "log.mining.buffer.ehcache" namespace, with the prefix removed. + * + * @return the ehcache transaction buffer configuration, never {@code null} + */ + public Configuration getLogMiningEhcacheConfiguration() { + return logMiningEhCacheConfiguration; } @Override @@ -2207,7 +2204,24 @@ public static int validateLogMiningStrategy(Configuration config, Field field, V return 0; } - public static int validateEhcacheFieldRequired(Configuration config, Field field, ValidationOutput problems) { + public static int validateEhCacheGlobalConfigField(Configuration config, Field field, ValidationOutput problems) { + if (ConnectorAdapter.LOG_MINER.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) { + if (LogMiningBufferType.parse(config.getString(LOG_MINING_BUFFER_TYPE)).isEhcache()) { + // The string cannot include any ` or section"); + return 1; + } + } + } + } + return 0; + } + + public static int validateEhcacheConfigFieldRequired(Configuration config, Field field, ValidationOutput problems) { if (ConnectorAdapter.LOG_MINER.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) { if (LogMiningBufferType.parse(config.getString(LOG_MINING_BUFFER_TYPE)).isEhcache()) { return Field.isRequired(config, field, problems); @@ -2215,13 +2229,4 @@ public static int validateEhcacheFieldRequired(Configuration config, Field field } return 0; } - - public static int validateEhcacheBytesFieldRequired(Configuration config, Field field, ValidationOutput problems) { - if (ConnectorAdapter.LOG_MINER.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) { - if (LogMiningBufferType.parse(config.getString(LOG_MINING_BUFFER_TYPE)).isEhcache()) { - return Field.isPositiveLong(config, field, problems); - } - } - return 0; - } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/EhcacheLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/EhcacheLogMinerEventProcessor.java index 102b07b82..ac9713343 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/EhcacheLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/EhcacheLogMinerEventProcessor.java @@ -5,18 +5,31 @@ */ package io.debezium.connector.oracle.logminer.processor.ehcache; -import org.ehcache.PersistentCacheManager; -import org.ehcache.config.CacheConfiguration; -import org.ehcache.config.builders.CacheConfigurationBuilder; +import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_EVENTS_CONFIG; +import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_GLOBAL_CONFIG; +import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_PROCESSED_TRANSACTIONS_CONFIG; +import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_SCHEMA_CHANGES_CONFIG; +import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_TRANSACTIONS_CONFIG; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.StringReader; +import java.util.stream.Collectors; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; + +import org.ehcache.CacheManager; import org.ehcache.config.builders.CacheManagerBuilder; -import org.ehcache.config.builders.ResourcePoolsBuilder; -import org.ehcache.config.units.MemoryUnit; -import org.ehcache.core.internal.statistics.DefaultStatisticsService; -import org.ehcache.expiry.ExpiryPolicy; -import org.ehcache.spi.serialization.Serializer; +import org.ehcache.xml.XmlConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.xml.sax.InputSource; +import io.debezium.DebeziumException; +import io.debezium.config.Configuration; import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleDatabaseSchema; @@ -27,8 +40,6 @@ import io.debezium.connector.oracle.logminer.events.LogMinerEventRow; import io.debezium.connector.oracle.logminer.processor.AbstractTransactionCachingLogMinerEventProcessor; import io.debezium.connector.oracle.logminer.processor.LogMinerCache; -import io.debezium.connector.oracle.logminer.processor.ehcache.serialization.EhcacheTransactionSerializer; -import io.debezium.connector.oracle.logminer.processor.ehcache.serialization.LogMinerEventSerializer; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext; import io.debezium.relational.TableId; @@ -43,7 +54,7 @@ public class EhcacheLogMinerEventProcessor extends AbstractTransactionCachingLog private static final Logger LOGGER = LoggerFactory.getLogger(EhcacheLogMinerEventProcessor.class); - private final PersistentCacheManager cacheManager; + private final CacheManager cacheManager; private final LogMinerCache transactionsCache; private final LogMinerCache eventCache; private final LogMinerCache processedTransactionsCache; @@ -100,39 +111,58 @@ public void close() throws Exception { } } - private PersistentCacheManager createCacheManager(OracleConnectorConfig connectorConfig) { - final String cachePath = connectorConfig.getLogMiningBufferEhcacheStoragePath(); - final long transactionCacheSize = connectorConfig.getLogMiningBufferEhcacheTransactionCacheSizeBytes(); - final long processedTransactionsCacheSize = connectorConfig.getLogMiningBufferEhcacheProcessedTransactionsCacheSizeBytes(); - final long schemaChangesCacheSize = connectorConfig.getLogMiningBufferEhcacheSchemaChangesCacheSizeBytes(); - final long eventsCacheSize = connectorConfig.getLogMiningBufferEhcacheEventsCacheSizeBytes(); + private CacheManager createCacheManager(OracleConnectorConfig connectorConfig) { + try { + final Configuration ehcacheConfig = connectorConfig.getLogMiningEhcacheConfiguration(); - return CacheManagerBuilder.newCacheManagerBuilder() - .with(CacheManagerBuilder.persistence(cachePath)) - .using(new DefaultStatisticsService()) - .withCache(TRANSACTIONS_CACHE_NAME, createCacheConfiguration(String.class, EhcacheTransaction.class, transactionCacheSize, null)) - .withCache(PROCESSED_TRANSACTIONS_CACHE_NAME, createCacheConfiguration(String.class, String.class, processedTransactionsCacheSize, null)) - .withCache(SCHEMA_CHANGES_CACHE_NAME, createCacheConfiguration(String.class, String.class, schemaChangesCacheSize, null)) - .withCache(EVENTS_CACHE_NAME, createCacheConfiguration(String.class, LogMinerEvent.class, eventsCacheSize, null)) - .withSerializer(EhcacheTransaction.class, EhcacheTransactionSerializer.class) - .withSerializer(LogMinerEvent.class, LogMinerEventSerializer.class) - .build(true); + // Create the full XML configuration based on configuration template + final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + final DocumentBuilder builder = factory.newDocumentBuilder(); + + final String xmlData = getConfigurationWithSubstitutions(ehcacheConfig); + LOGGER.debug("Using Ehcache XML configuration:\n{}", xmlData); + + final Document xmlDocument = builder.parse(new InputSource(new StringReader(xmlData))); + + final CacheManager cacheManager = CacheManagerBuilder.newCacheManager(new XmlConfiguration(xmlDocument)); + cacheManager.init(); + + return cacheManager; + } + catch (Exception e) { + throw new DebeziumException("Failed to create Ehcache cache manager", e); + } } - @SuppressWarnings("SameParameterValue") - private CacheConfiguration createCacheConfiguration(Class keyClass, Class valueClass, long sizeMb, - Class> valueSerializer) { - final CacheConfigurationBuilder builder = CacheConfigurationBuilder.newCacheConfigurationBuilder( - keyClass, - valueClass, - ResourcePoolsBuilder.newResourcePoolsBuilder() - .disk(sizeMb, MemoryUnit.B, !getConfig().isLogMiningBufferDropOnStop())) - .withExpiry(ExpiryPolicy.NO_EXPIRY); + private String getConfigurationWithSubstitutions(Configuration configuration) { + return readConfigurationTemplate() + .replace("${log.mining.buffer.ehcache.global.config}", + configuration.getString(LOG_MINING_BUFFER_EHCACHE_GLOBAL_CONFIG, "")) + .replace("${log.mining.buffer.ehcache.transactions.config}", + configuration.getString(LOG_MINING_BUFFER_EHCACHE_TRANSACTIONS_CONFIG, "")) + .replace("${log.mining.buffer.ehcache.processedtransactions.config}", + configuration.getString(LOG_MINING_BUFFER_EHCACHE_PROCESSED_TRANSACTIONS_CONFIG, "")) + .replace("${log.mining.buffer.ehcache.schemachanges.config}", + configuration.getString(LOG_MINING_BUFFER_EHCACHE_SCHEMA_CHANGES_CONFIG, "")) + .replace("${log.mining.buffer.ehcache.events.config}", + configuration.getString(LOG_MINING_BUFFER_EHCACHE_EVENTS_CONFIG, "")); + } - if (valueSerializer != null) { - builder.withValueSerializer(valueSerializer); + private String readConfigurationTemplate() { + try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream("ehcache/configuration-template.xml")) { + BufferedReader reader = null; + try { + reader = new BufferedReader(new InputStreamReader(inputStream)); + return reader.lines().collect(Collectors.joining(System.lineSeparator())); + } + finally { + if (reader != null) { + reader.close(); + } + } + } + catch (Exception e) { + throw new DebeziumException("Failed to read Ehcache configuration template", e); } - - return builder.build(); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/AbstractEhcacheSerializer.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/AbstractEhcacheSerializer.java index 7f391fc9b..723269f09 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/AbstractEhcacheSerializer.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/AbstractEhcacheSerializer.java @@ -36,9 +36,11 @@ public ByteBuffer serialize(T object) throws SerializerException { @Override public T read(ByteBuffer buffer) throws ClassNotFoundException, SerializerException { try (ByteArrayInputStream input = new ByteArrayInputStream(buffer.array())) { - // todo: unsure what this magic "40" bytes represents - if (input.skip(40) != 40) { - throw new SerializerException("Failed to skip initial buffer payload"); + // Depending on how the data is sourced by Ehcache, it may adjust the buffer offset + // to deal with off-heap preamble bytes. This makes sure skip the right number of + // bytes in the stream based on the current array offset in the buffer. + if (input.skip(buffer.arrayOffset()) != buffer.arrayOffset()) { + throw new SerializerException("Failed to adjust buffer offset position before read"); } try (SerializerInputStream stream = new SerializerInputStream(input)) { return deserialize(stream); diff --git a/debezium-connector-oracle/src/main/resources/ehcache/configuration-template.xml b/debezium-connector-oracle/src/main/resources/ehcache/configuration-template.xml new file mode 100644 index 000000000..fefae3752 --- /dev/null +++ b/debezium-connector-oracle/src/main/resources/ehcache/configuration-template.xml @@ -0,0 +1,55 @@ + + + + + ${log.mining.buffer.ehcache.global.config} + + + + + java.lang.String + io.debezium.connector.oracle.logminer.processor.ehcache.EhcacheTransaction + ${log.mining.buffer.ehcache.transactions.config} + + + + + + java.lang.String + java.lang.String + ${log.mining.buffer.ehcache.processedtransactions.config} + + + + + + java.lang.String + java.lang.String + ${log.mining.buffer.ehcache.schemachanges.config} + + + + + + java.lang.String + io.debezium.connector.oracle.logminer.events.LogMinerEvent + ${log.mining.buffer.ehcache.events.config} + + + + + io.debezium.connector.oracle.logminer.processor.ehcache.serialization.EhcacheTransactionSerializer + + + io.debezium.connector.oracle.logminer.processor.ehcache.serialization.LogMinerEventSerializer + + + + \ No newline at end of file diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java index e12d4ff1b..3ef4c2af3 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java @@ -181,11 +181,11 @@ else if (adapter().equals(ConnectorAdapter.OLR)) { else if (bufferType.isEhcache()) { final long cacheSize = 1024 * 1024 * 10; // 10Mb each builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE, bufferTypeName); - builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_STORAGE_PATH, "./target/data"); - builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_CACHE_TRANSACTIONS_BYTES, cacheSize); - builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_CACHE_PROCESSED_TRANSACTIONS_BYTES, cacheSize); - builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_CACHE_SCHEMA_CHANGES_BYTES, cacheSize); - builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_CACHE_EVENTS_BYTES, cacheSize); + builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_GLOBAL_CONFIG, getEhcacheGlobalCacheConfig()); + builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_TRANSACTIONS_CONFIG, getEhcacheBasicCacheConfig()); + builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_PROCESSED_TRANSACTIONS_CONFIG, getEhcacheBasicCacheConfig()); + builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_SCHEMA_CHANGES_CONFIG, getEhcacheBasicCacheConfig()); + builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_EVENTS_CONFIG, getEhcacheBasicCacheConfig()); } builder.withDefault(OracleConnectorConfig.LOG_MINING_BUFFER_DROP_ON_STOP, true); } @@ -205,6 +205,17 @@ else if (bufferType.isEhcache()) { .with(AsyncEmbeddedEngine.TASK_MANAGEMENT_TIMEOUT_MS, 90_000); } + private static String getEhcacheGlobalCacheConfig() { + return ""; + } + + private static String getEhcacheBasicCacheConfig() { + return "" + + "50" + + "10485760" + + ""; + } + /** * Obtain a connection using the default configuration, i.e. within the context of the * actual connector user that connectors and interacts with the database.