DBZ-7758 Move configuration to XML
This commit is contained in:
parent
7c3aa19dd5
commit
b21b057729
@ -85,11 +85,28 @@
|
||||
<artifactId>protobuf-java-util</artifactId>
|
||||
<version>${version.com.google.protobuf}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Ehcache Dependencies -->
|
||||
<dependency>
|
||||
<groupId>org.ehcache</groupId>
|
||||
<artifactId>ehcache</artifactId>
|
||||
<version>3.9.6</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>javax.xml.bind</groupId>
|
||||
<artifactId>jaxb-api</artifactId>
|
||||
<version>2.3.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.sun.xml.bind</groupId>
|
||||
<artifactId>jaxb-core</artifactId>
|
||||
<version>2.3.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.sun.xml.bind</groupId>
|
||||
<artifactId>jaxb-impl</artifactId>
|
||||
<version>2.3.1</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Testing -->
|
||||
<dependency>
|
||||
|
@ -624,34 +624,50 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
|
||||
.withValidation(Field::isNonNegativeInteger)
|
||||
.withDescription("The number of attempts to retry database errors during snapshots before failing.");
|
||||
|
||||
public static final Field LOG_MINING_BUFFER_EHCACHE_STORAGE_PATH = Field.create("log.mining.buffer.ehcache.storage.path")
|
||||
public static final Field LOG_MINING_BUFFER_EHCACHE_GLOBAL_CONFIG = Field.create("log.mining.buffer.ehcache.global.config")
|
||||
.withDisplayName("Defines any global configuration for the Ehcache transaction buffer")
|
||||
.withType(Type.STRING)
|
||||
.withValidation(OracleConnectorConfig::validateEhcacheFieldRequired)
|
||||
.withDescription("The persistent location where the Ehcache off-heap files should be stored.");
|
||||
.withWidth(Width.LONG)
|
||||
.withImportance(Importance.LOW)
|
||||
.withValidation(OracleConnectorConfig::validateEhCacheGlobalConfigField)
|
||||
.withDescription("Specifies any Ehcache global configurations such as services or persistence. " +
|
||||
"This cannot include <cache/> nor <default-serializers/> tags as these are managed by Debezium.");
|
||||
|
||||
public static final Field LOG_MINING_BUFFER_EHCACHE_CACHE_TRANSACTIONS_BYTES = Field.create("log.mining.buffer.ehcache.cache.transactions.bytes")
|
||||
.withType(Type.LONG)
|
||||
.withDefault(0)
|
||||
.withValidation(OracleConnectorConfig::validateEhcacheBytesFieldRequired)
|
||||
.withDescription("The size of the Ehcache transaction cache in bytes");
|
||||
public static final Field LOG_MINING_BUFFER_EHCACHE_TRANSACTIONS_CONFIG = Field.create("log.mining.buffer.ehcache.transactions.config")
|
||||
.withDisplayName("Defines the partial ehcache configuration for the transaction cache")
|
||||
.withType(Type.STRING)
|
||||
.withWidth(Width.LONG)
|
||||
.withImportance(Importance.LOW)
|
||||
.withValidation(OracleConnectorConfig::validateEhcacheConfigFieldRequired)
|
||||
.withDescription("Specifies the inner body the Ehcache <cache/> tag for the transaction cache, but " +
|
||||
"should not include the <key-type/> nor the <value-type/> attributes as these are managed by Debezium.");
|
||||
|
||||
public static final Field LOG_MINING_BUFFER_EHCACHE_CACHE_PROCESSED_TRANSACTIONS_BYTES = Field.create("log.mining.buffer.ehcache.cache.processedtransactions.bytes")
|
||||
.withType(Type.LONG)
|
||||
.withDefault(0)
|
||||
.withValidation(OracleConnectorConfig::validateEhcacheBytesFieldRequired)
|
||||
.withDescription("The size of the Ehcache processed transaction cache in bytes");
|
||||
public static final Field LOG_MINING_BUFFER_EHCACHE_PROCESSED_TRANSACTIONS_CONFIG = Field.create("log.mining.buffer.ehcache.processedtransactions.config")
|
||||
.withDisplayName("Defines the partial ehcache configuration for the processed transaction cache")
|
||||
.withType(Type.STRING)
|
||||
.withWidth(Width.LONG)
|
||||
.withImportance(Importance.LOW)
|
||||
.withValidation(OracleConnectorConfig::validateEhcacheConfigFieldRequired)
|
||||
.withDescription("Specifies the inner body the Ehcache <cache/> tag for the processed transaction cache, but " +
|
||||
"should not include the <key-type/> nor the <value-type/> attributes as these are managed by Debezium.");
|
||||
|
||||
public static final Field LOG_MINING_BUFFER_EHCACHE_CACHE_SCHEMA_CHANGES_BYTES = Field.create("log.mining.buffer.ehcache.cache.schemachanges.bytes")
|
||||
.withType(Type.LONG)
|
||||
.withDefault(0)
|
||||
.withValidation(OracleConnectorConfig::validateEhcacheBytesFieldRequired)
|
||||
.withDescription("The size of the Ehcache schema changes cache in bytes");
|
||||
public static final Field LOG_MINING_BUFFER_EHCACHE_SCHEMA_CHANGES_CONFIG = Field.create("log.mining.buffer.ehcache.schemachanges.config")
|
||||
.withDisplayName("Defines the partial ehcache configuration for the schema changes cache")
|
||||
.withType(Type.STRING)
|
||||
.withWidth(Width.LONG)
|
||||
.withImportance(Importance.LOW)
|
||||
.withValidation(OracleConnectorConfig::validateEhcacheConfigFieldRequired)
|
||||
.withDescription("Specifies the inner body the Ehcache <cache/> tag for the schema changes cache, but " +
|
||||
"should not include the <key-type/> nor the <value-type/> attributes as these are managed by Debezium.");
|
||||
|
||||
public static final Field LOG_MINING_BUFFER_EHCACHE_CACHE_EVENTS_BYTES = Field.create("log.mining.buffer.ehcache.cache.events.bytes")
|
||||
.withType(Type.LONG)
|
||||
.withDefault(0)
|
||||
.withValidation(OracleConnectorConfig::validateEhcacheBytesFieldRequired)
|
||||
.withDescription("The size of the Ehcache transaction events cache in bytes");
|
||||
public static final Field LOG_MINING_BUFFER_EHCACHE_EVENTS_CONFIG = Field.create("log.mining.buffer.ehcache.events.config")
|
||||
.withDisplayName("Defines the partial ehcache configuration for the events cache")
|
||||
.withType(Type.STRING)
|
||||
.withWidth(Width.LONG)
|
||||
.withImportance(Importance.LOW)
|
||||
.withValidation(OracleConnectorConfig::validateEhcacheConfigFieldRequired)
|
||||
.withDescription("Specifies the inner body the Ehcache <cache/> tag for the events cache, but " +
|
||||
"should not include the <key-type/> nor the <value-type/> attributes as these are managed by Debezium.");
|
||||
|
||||
@Deprecated
|
||||
public static final Field LOG_MINING_CONTINUOUS_MINE = Field.create("log.mining.continuous.mine")
|
||||
@ -738,11 +754,11 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
|
||||
OLR_PORT,
|
||||
SNAPSHOT_DATABASE_ERRORS_MAX_RETRIES,
|
||||
LOG_MINING_CONTINUOUS_MINE,
|
||||
LOG_MINING_BUFFER_EHCACHE_STORAGE_PATH,
|
||||
LOG_MINING_BUFFER_EHCACHE_CACHE_TRANSACTIONS_BYTES,
|
||||
LOG_MINING_BUFFER_EHCACHE_CACHE_PROCESSED_TRANSACTIONS_BYTES,
|
||||
LOG_MINING_BUFFER_EHCACHE_CACHE_SCHEMA_CHANGES_BYTES,
|
||||
LOG_MINING_BUFFER_EHCACHE_CACHE_EVENTS_BYTES)
|
||||
LOG_MINING_BUFFER_EHCACHE_GLOBAL_CONFIG,
|
||||
LOG_MINING_BUFFER_EHCACHE_TRANSACTIONS_CONFIG,
|
||||
LOG_MINING_BUFFER_EHCACHE_PROCESSED_TRANSACTIONS_CONFIG,
|
||||
LOG_MINING_BUFFER_EHCACHE_SCHEMA_CHANGES_CONFIG,
|
||||
LOG_MINING_BUFFER_EHCACHE_EVENTS_CONFIG)
|
||||
.events(SOURCE_INFO_STRUCT_MAKER)
|
||||
.create();
|
||||
|
||||
@ -811,12 +827,7 @@ public static ConfigDef configDef() {
|
||||
private final Set<String> logMiningSchemaChangesUsernameExcludes;
|
||||
private final Boolean logMiningIncludeRedoSql;
|
||||
private final boolean logMiningContinuousMining;
|
||||
|
||||
private final String logMiningBufferEhcacheStoragePath;
|
||||
private final long logMiningBufferEhcacheTransactionCacheSizeBytes;
|
||||
private final long logMiningBufferEhcacheProcessedTransactionsCacheSizeBytes;
|
||||
private final long logMiningBufferEhcacheSchemaChangesCacheSizeBytes;
|
||||
private final long logMiningBufferEhcacheEventsCacheSizeBytes;
|
||||
private final Configuration logMiningEhCacheConfiguration;
|
||||
|
||||
private final String openLogReplicatorSource;
|
||||
private final String openLogReplicatorHostname;
|
||||
@ -887,11 +898,7 @@ public OracleConnectorConfig(Configuration config) {
|
||||
this.logMiningIncludeRedoSql = config.getBoolean(LOG_MINING_INCLUDE_REDO_SQL);
|
||||
this.logMiningContinuousMining = config.getBoolean(LOG_MINING_CONTINUOUS_MINE);
|
||||
|
||||
this.logMiningBufferEhcacheStoragePath = config.getString(LOG_MINING_BUFFER_EHCACHE_STORAGE_PATH);
|
||||
this.logMiningBufferEhcacheTransactionCacheSizeBytes = config.getLong(LOG_MINING_BUFFER_EHCACHE_CACHE_TRANSACTIONS_BYTES);
|
||||
this.logMiningBufferEhcacheProcessedTransactionsCacheSizeBytes = config.getLong(LOG_MINING_BUFFER_EHCACHE_CACHE_PROCESSED_TRANSACTIONS_BYTES);
|
||||
this.logMiningBufferEhcacheSchemaChangesCacheSizeBytes = config.getLong(LOG_MINING_BUFFER_EHCACHE_CACHE_SCHEMA_CHANGES_BYTES);
|
||||
this.logMiningBufferEhcacheEventsCacheSizeBytes = config.getLong(LOG_MINING_BUFFER_EHCACHE_CACHE_EVENTS_BYTES);
|
||||
this.logMiningEhCacheConfiguration = config.subset("log.mining.buffer.ehcache", false);
|
||||
|
||||
// OpenLogReplicator
|
||||
this.openLogReplicatorSource = config.getString(OLR_SOURCE);
|
||||
@ -2000,24 +2007,14 @@ public Integer getOpenLogReplicatorPort() {
|
||||
return openLogReplicatorPort;
|
||||
}
|
||||
|
||||
public String getLogMiningBufferEhcacheStoragePath() {
|
||||
return logMiningBufferEhcacheStoragePath;
|
||||
}
|
||||
|
||||
public long getLogMiningBufferEhcacheTransactionCacheSizeBytes() {
|
||||
return logMiningBufferEhcacheTransactionCacheSizeBytes;
|
||||
}
|
||||
|
||||
public long getLogMiningBufferEhcacheProcessedTransactionsCacheSizeBytes() {
|
||||
return logMiningBufferEhcacheProcessedTransactionsCacheSizeBytes;
|
||||
}
|
||||
|
||||
public long getLogMiningBufferEhcacheSchemaChangesCacheSizeBytes() {
|
||||
return logMiningBufferEhcacheSchemaChangesCacheSizeBytes;
|
||||
}
|
||||
|
||||
public long getLogMiningBufferEhcacheEventsCacheSizeBytes() {
|
||||
return logMiningBufferEhcacheEventsCacheSizeBytes;
|
||||
/**
|
||||
* Get the Ehcache buffer configuration, which is all attributes under the configuration prefix
|
||||
* "log.mining.buffer.ehcache" namespace, with the prefix removed.
|
||||
*
|
||||
* @return the ehcache transaction buffer configuration, never {@code null}
|
||||
*/
|
||||
public Configuration getLogMiningEhcacheConfiguration() {
|
||||
return logMiningEhCacheConfiguration;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -2207,7 +2204,24 @@ public static int validateLogMiningStrategy(Configuration config, Field field, V
|
||||
return 0;
|
||||
}
|
||||
|
||||
public static int validateEhcacheFieldRequired(Configuration config, Field field, ValidationOutput problems) {
|
||||
public static int validateEhCacheGlobalConfigField(Configuration config, Field field, ValidationOutput problems) {
|
||||
if (ConnectorAdapter.LOG_MINER.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) {
|
||||
if (LogMiningBufferType.parse(config.getString(LOG_MINING_BUFFER_TYPE)).isEhcache()) {
|
||||
// The string cannot include any `<cache ` or `<default-serializers` tags.
|
||||
final String globalConfig = config.getString(LOG_MINING_BUFFER_EHCACHE_GLOBAL_CONFIG, "").toLowerCase();
|
||||
if (!Strings.isNullOrEmpty(globalConfig)) {
|
||||
if (globalConfig.contains("<cache") || globalConfig.contains("<default-serializers")) {
|
||||
problems.accept(LOG_MINING_BUFFER_EHCACHE_GLOBAL_CONFIG, globalConfig,
|
||||
"The ehcache global configuration should not contain a <cache/> or <default-serializers/> section");
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public static int validateEhcacheConfigFieldRequired(Configuration config, Field field, ValidationOutput problems) {
|
||||
if (ConnectorAdapter.LOG_MINER.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) {
|
||||
if (LogMiningBufferType.parse(config.getString(LOG_MINING_BUFFER_TYPE)).isEhcache()) {
|
||||
return Field.isRequired(config, field, problems);
|
||||
@ -2215,13 +2229,4 @@ public static int validateEhcacheFieldRequired(Configuration config, Field field
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public static int validateEhcacheBytesFieldRequired(Configuration config, Field field, ValidationOutput problems) {
|
||||
if (ConnectorAdapter.LOG_MINER.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) {
|
||||
if (LogMiningBufferType.parse(config.getString(LOG_MINING_BUFFER_TYPE)).isEhcache()) {
|
||||
return Field.isPositiveLong(config, field, problems);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
@ -5,18 +5,31 @@
|
||||
*/
|
||||
package io.debezium.connector.oracle.logminer.processor.ehcache;
|
||||
|
||||
import org.ehcache.PersistentCacheManager;
|
||||
import org.ehcache.config.CacheConfiguration;
|
||||
import org.ehcache.config.builders.CacheConfigurationBuilder;
|
||||
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_EVENTS_CONFIG;
|
||||
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_GLOBAL_CONFIG;
|
||||
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_PROCESSED_TRANSACTIONS_CONFIG;
|
||||
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_SCHEMA_CHANGES_CONFIG;
|
||||
import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_TRANSACTIONS_CONFIG;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.StringReader;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
|
||||
import org.ehcache.CacheManager;
|
||||
import org.ehcache.config.builders.CacheManagerBuilder;
|
||||
import org.ehcache.config.builders.ResourcePoolsBuilder;
|
||||
import org.ehcache.config.units.MemoryUnit;
|
||||
import org.ehcache.core.internal.statistics.DefaultStatisticsService;
|
||||
import org.ehcache.expiry.ExpiryPolicy;
|
||||
import org.ehcache.spi.serialization.Serializer;
|
||||
import org.ehcache.xml.XmlConfiguration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.w3c.dom.Document;
|
||||
import org.xml.sax.InputSource;
|
||||
|
||||
import io.debezium.DebeziumException;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.oracle.OracleConnection;
|
||||
import io.debezium.connector.oracle.OracleConnectorConfig;
|
||||
import io.debezium.connector.oracle.OracleDatabaseSchema;
|
||||
@ -27,8 +40,6 @@
|
||||
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
|
||||
import io.debezium.connector.oracle.logminer.processor.AbstractTransactionCachingLogMinerEventProcessor;
|
||||
import io.debezium.connector.oracle.logminer.processor.LogMinerCache;
|
||||
import io.debezium.connector.oracle.logminer.processor.ehcache.serialization.EhcacheTransactionSerializer;
|
||||
import io.debezium.connector.oracle.logminer.processor.ehcache.serialization.LogMinerEventSerializer;
|
||||
import io.debezium.pipeline.EventDispatcher;
|
||||
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
|
||||
import io.debezium.relational.TableId;
|
||||
@ -43,7 +54,7 @@ public class EhcacheLogMinerEventProcessor extends AbstractTransactionCachingLog
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(EhcacheLogMinerEventProcessor.class);
|
||||
|
||||
private final PersistentCacheManager cacheManager;
|
||||
private final CacheManager cacheManager;
|
||||
private final LogMinerCache<String, EhcacheTransaction> transactionsCache;
|
||||
private final LogMinerCache<String, LogMinerEvent> eventCache;
|
||||
private final LogMinerCache<String, String> processedTransactionsCache;
|
||||
@ -100,39 +111,58 @@ public void close() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
private PersistentCacheManager createCacheManager(OracleConnectorConfig connectorConfig) {
|
||||
final String cachePath = connectorConfig.getLogMiningBufferEhcacheStoragePath();
|
||||
final long transactionCacheSize = connectorConfig.getLogMiningBufferEhcacheTransactionCacheSizeBytes();
|
||||
final long processedTransactionsCacheSize = connectorConfig.getLogMiningBufferEhcacheProcessedTransactionsCacheSizeBytes();
|
||||
final long schemaChangesCacheSize = connectorConfig.getLogMiningBufferEhcacheSchemaChangesCacheSizeBytes();
|
||||
final long eventsCacheSize = connectorConfig.getLogMiningBufferEhcacheEventsCacheSizeBytes();
|
||||
private CacheManager createCacheManager(OracleConnectorConfig connectorConfig) {
|
||||
try {
|
||||
final Configuration ehcacheConfig = connectorConfig.getLogMiningEhcacheConfiguration();
|
||||
|
||||
return CacheManagerBuilder.newCacheManagerBuilder()
|
||||
.with(CacheManagerBuilder.persistence(cachePath))
|
||||
.using(new DefaultStatisticsService())
|
||||
.withCache(TRANSACTIONS_CACHE_NAME, createCacheConfiguration(String.class, EhcacheTransaction.class, transactionCacheSize, null))
|
||||
.withCache(PROCESSED_TRANSACTIONS_CACHE_NAME, createCacheConfiguration(String.class, String.class, processedTransactionsCacheSize, null))
|
||||
.withCache(SCHEMA_CHANGES_CACHE_NAME, createCacheConfiguration(String.class, String.class, schemaChangesCacheSize, null))
|
||||
.withCache(EVENTS_CACHE_NAME, createCacheConfiguration(String.class, LogMinerEvent.class, eventsCacheSize, null))
|
||||
.withSerializer(EhcacheTransaction.class, EhcacheTransactionSerializer.class)
|
||||
.withSerializer(LogMinerEvent.class, LogMinerEventSerializer.class)
|
||||
.build(true);
|
||||
// Create the full XML configuration based on configuration template
|
||||
final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
|
||||
final DocumentBuilder builder = factory.newDocumentBuilder();
|
||||
|
||||
final String xmlData = getConfigurationWithSubstitutions(ehcacheConfig);
|
||||
LOGGER.debug("Using Ehcache XML configuration:\n{}", xmlData);
|
||||
|
||||
final Document xmlDocument = builder.parse(new InputSource(new StringReader(xmlData)));
|
||||
|
||||
final CacheManager cacheManager = CacheManagerBuilder.newCacheManager(new XmlConfiguration(xmlDocument));
|
||||
cacheManager.init();
|
||||
|
||||
return cacheManager;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new DebeziumException("Failed to create Ehcache cache manager", e);
|
||||
}
|
||||
}
|
||||
|
||||
private String getConfigurationWithSubstitutions(Configuration configuration) {
|
||||
return readConfigurationTemplate()
|
||||
.replace("${log.mining.buffer.ehcache.global.config}",
|
||||
configuration.getString(LOG_MINING_BUFFER_EHCACHE_GLOBAL_CONFIG, ""))
|
||||
.replace("${log.mining.buffer.ehcache.transactions.config}",
|
||||
configuration.getString(LOG_MINING_BUFFER_EHCACHE_TRANSACTIONS_CONFIG, ""))
|
||||
.replace("${log.mining.buffer.ehcache.processedtransactions.config}",
|
||||
configuration.getString(LOG_MINING_BUFFER_EHCACHE_PROCESSED_TRANSACTIONS_CONFIG, ""))
|
||||
.replace("${log.mining.buffer.ehcache.schemachanges.config}",
|
||||
configuration.getString(LOG_MINING_BUFFER_EHCACHE_SCHEMA_CHANGES_CONFIG, ""))
|
||||
.replace("${log.mining.buffer.ehcache.events.config}",
|
||||
configuration.getString(LOG_MINING_BUFFER_EHCACHE_EVENTS_CONFIG, ""));
|
||||
}
|
||||
|
||||
private String readConfigurationTemplate() {
|
||||
try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream("ehcache/configuration-template.xml")) {
|
||||
BufferedReader reader = null;
|
||||
try {
|
||||
reader = new BufferedReader(new InputStreamReader(inputStream));
|
||||
return reader.lines().collect(Collectors.joining(System.lineSeparator()));
|
||||
}
|
||||
finally {
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new DebeziumException("Failed to read Ehcache configuration template", e);
|
||||
}
|
||||
|
||||
@SuppressWarnings("SameParameterValue")
|
||||
private <K, V> CacheConfiguration<K, V> createCacheConfiguration(Class<K> keyClass, Class<V> valueClass, long sizeMb,
|
||||
Class<? extends Serializer<V>> valueSerializer) {
|
||||
final CacheConfigurationBuilder<K, V> builder = CacheConfigurationBuilder.newCacheConfigurationBuilder(
|
||||
keyClass,
|
||||
valueClass,
|
||||
ResourcePoolsBuilder.newResourcePoolsBuilder()
|
||||
.disk(sizeMb, MemoryUnit.B, !getConfig().isLogMiningBufferDropOnStop()))
|
||||
.withExpiry(ExpiryPolicy.NO_EXPIRY);
|
||||
|
||||
if (valueSerializer != null) {
|
||||
builder.withValueSerializer(valueSerializer);
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
|
@ -36,9 +36,11 @@ public ByteBuffer serialize(T object) throws SerializerException {
|
||||
@Override
|
||||
public T read(ByteBuffer buffer) throws ClassNotFoundException, SerializerException {
|
||||
try (ByteArrayInputStream input = new ByteArrayInputStream(buffer.array())) {
|
||||
// todo: unsure what this magic "40" bytes represents
|
||||
if (input.skip(40) != 40) {
|
||||
throw new SerializerException("Failed to skip initial buffer payload");
|
||||
// Depending on how the data is sourced by Ehcache, it may adjust the buffer offset
|
||||
// to deal with off-heap preamble bytes. This makes sure skip the right number of
|
||||
// bytes in the stream based on the current array offset in the buffer.
|
||||
if (input.skip(buffer.arrayOffset()) != buffer.arrayOffset()) {
|
||||
throw new SerializerException("Failed to adjust buffer offset position before read");
|
||||
}
|
||||
try (SerializerInputStream stream = new SerializerInputStream(input)) {
|
||||
return deserialize(stream);
|
||||
|
@ -0,0 +1,55 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<config xmlns='http://www.ehcache.org/v3'>
|
||||
|
||||
<!--
|
||||
Defines the ehcache global configuration
|
||||
|
||||
Caches should not be defined in this configuration but instead things like
|
||||
services, persistence, etc.
|
||||
|
||||
NOTE: this configuration cannot contain the "<default-serializers>" tag.
|
||||
-->
|
||||
${log.mining.buffer.ehcache.global.config}
|
||||
|
||||
<!-- Setup transactions cache -->
|
||||
<cache alias="transactions">
|
||||
<!-- Debezium specifically sets the key/value types -->
|
||||
<key-type>java.lang.String</key-type>
|
||||
<value-type>io.debezium.connector.oracle.logminer.processor.ehcache.EhcacheTransaction</value-type>
|
||||
${log.mining.buffer.ehcache.transactions.config}
|
||||
</cache>
|
||||
|
||||
<!-- Setup processed transactions cache -->
|
||||
<cache alias="processed-transactions">
|
||||
<!-- Debezium specifically sets the key/value types -->
|
||||
<key-type>java.lang.String</key-type>
|
||||
<value-type>java.lang.String</value-type>
|
||||
${log.mining.buffer.ehcache.processedtransactions.config}
|
||||
</cache>
|
||||
|
||||
<!-- Setup schema changes cache -->
|
||||
<cache alias="schema-changes">
|
||||
<!-- Debezium specifically sets the key/value types -->
|
||||
<key-type>java.lang.String</key-type>
|
||||
<value-type>java.lang.String</value-type>
|
||||
${log.mining.buffer.ehcache.schemachanges.config}
|
||||
</cache>
|
||||
|
||||
<!-- Setup events cache -->
|
||||
<cache alias="events">
|
||||
<!-- Debezium specifically sets the key/value types -->
|
||||
<key-type>java.lang.String</key-type>
|
||||
<value-type>io.debezium.connector.oracle.logminer.events.LogMinerEvent</value-type>
|
||||
${log.mining.buffer.ehcache.events.config}
|
||||
</cache>
|
||||
|
||||
<default-serializers>
|
||||
<serializer type="io.debezium.connector.oracle.logminer.processor.ehcache.EhcacheTransaction">
|
||||
io.debezium.connector.oracle.logminer.processor.ehcache.serialization.EhcacheTransactionSerializer
|
||||
</serializer>
|
||||
<serializer type="io.debezium.connector.oracle.logminer.events.LogMinerEvent">
|
||||
io.debezium.connector.oracle.logminer.processor.ehcache.serialization.LogMinerEventSerializer
|
||||
</serializer>
|
||||
</default-serializers>
|
||||
|
||||
</config>
|
@ -181,11 +181,11 @@ else if (adapter().equals(ConnectorAdapter.OLR)) {
|
||||
else if (bufferType.isEhcache()) {
|
||||
final long cacheSize = 1024 * 1024 * 10; // 10Mb each
|
||||
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE, bufferTypeName);
|
||||
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_STORAGE_PATH, "./target/data");
|
||||
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_CACHE_TRANSACTIONS_BYTES, cacheSize);
|
||||
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_CACHE_PROCESSED_TRANSACTIONS_BYTES, cacheSize);
|
||||
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_CACHE_SCHEMA_CHANGES_BYTES, cacheSize);
|
||||
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_CACHE_EVENTS_BYTES, cacheSize);
|
||||
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_GLOBAL_CONFIG, getEhcacheGlobalCacheConfig());
|
||||
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_TRANSACTIONS_CONFIG, getEhcacheBasicCacheConfig());
|
||||
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_PROCESSED_TRANSACTIONS_CONFIG, getEhcacheBasicCacheConfig());
|
||||
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_SCHEMA_CHANGES_CONFIG, getEhcacheBasicCacheConfig());
|
||||
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_EVENTS_CONFIG, getEhcacheBasicCacheConfig());
|
||||
}
|
||||
builder.withDefault(OracleConnectorConfig.LOG_MINING_BUFFER_DROP_ON_STOP, true);
|
||||
}
|
||||
@ -205,6 +205,17 @@ else if (bufferType.isEhcache()) {
|
||||
.with(AsyncEmbeddedEngine.TASK_MANAGEMENT_TIMEOUT_MS, 90_000);
|
||||
}
|
||||
|
||||
private static String getEhcacheGlobalCacheConfig() {
|
||||
return "<persistence directory=\"./target/data\"/>";
|
||||
}
|
||||
|
||||
private static String getEhcacheBasicCacheConfig() {
|
||||
return "<resources>" +
|
||||
"<heap unit=\"entries\">50</heap>" +
|
||||
"<disk unit=\"B\">10485760</disk>" +
|
||||
"</resources>";
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain a connection using the default configuration, i.e. within the context of the
|
||||
* actual connector user that connectors and interacts with the database.
|
||||
|
Loading…
Reference in New Issue
Block a user