From 7c3aa19dd51fac10432fafd4430f4f533ff1a4dd Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Mon, 5 Aug 2024 18:40:15 -0400 Subject: [PATCH] DBZ-7758 Add Ehcache transaction buffer implementation --- debezium-connector-oracle/pom.xml | 16 ++ .../oracle/OracleConnectorConfig.java | 114 ++++++++++++- .../ehcache/EhcacheLogMinerCache.java | 104 ++++++++++++ .../EhcacheLogMinerEventProcessor.java | 138 ++++++++++++++++ .../processor/ehcache/EhcacheTransaction.java | 46 ++++++ .../AbstractEhcacheSerializer.java | 74 +++++++++ .../AbstractSerializerStream.java | 27 ++++ .../serialization/DeserializationContext.java | 40 +++++ .../serialization/DmlEventSerdesProvider.java | 49 ++++++ .../EhcacheTransactionSerializer.java | 50 ++++++ .../LobEraseEventSerdesProvider.java | 20 +++ .../LobWriteEventSerdesProvider.java | 40 +++++ .../LogMinerEventSerdesProvider.java | 43 +++++ .../LogMinerEventSerializer.java | 143 ++++++++++++++++ .../RedoSqlDmlEventSerdesProvider.java | 36 +++++ .../SelectLobLocatorSerdesProvider.java | 38 +++++ .../ehcache/serialization/SerdesProvider.java | 41 +++++ .../serialization/SerializerInputStream.java | 148 +++++++++++++++++ .../serialization/SerializerOutputStream.java | 153 ++++++++++++++++++ .../TruncateEventSerdesProvider.java | 20 +++ .../XmlBeginEventSerdesProvider.java | 36 +++++ .../XmlEndEventSerdesProvider.java | 20 +++ .../XmlWriteEventSerdesProvider.java | 38 +++++ .../connector/oracle/util/TestHelper.java | 9 ++ 24 files changed, 1439 insertions(+), 4 deletions(-) create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/EhcacheLogMinerCache.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/EhcacheLogMinerEventProcessor.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/EhcacheTransaction.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/AbstractEhcacheSerializer.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/AbstractSerializerStream.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/DeserializationContext.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/DmlEventSerdesProvider.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/EhcacheTransactionSerializer.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/LobEraseEventSerdesProvider.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/LobWriteEventSerdesProvider.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/LogMinerEventSerdesProvider.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/LogMinerEventSerializer.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/RedoSqlDmlEventSerdesProvider.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/SelectLobLocatorSerdesProvider.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/SerdesProvider.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/SerializerInputStream.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/SerializerOutputStream.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/TruncateEventSerdesProvider.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/XmlBeginEventSerdesProvider.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/XmlEndEventSerdesProvider.java create mode 100644 debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/XmlWriteEventSerdesProvider.java diff --git a/debezium-connector-oracle/pom.xml b/debezium-connector-oracle/pom.xml index 6aebbcc05..aac0fac6a 100644 --- a/debezium-connector-oracle/pom.xml +++ b/debezium-connector-oracle/pom.xml @@ -85,6 +85,11 @@ protobuf-java-util ${version.com.google.protobuf} + + org.ehcache + ehcache + 3.9.6 + @@ -730,6 +735,17 @@ + + + oracle-ehcache + + false + + + ehcache + + + apicurio diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java index 8103975d2..59f17e513 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorConfig.java @@ -36,6 +36,7 @@ import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics; import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy; import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor; +import io.debezium.connector.oracle.logminer.processor.ehcache.EhcacheLogMinerEventProcessor; import io.debezium.connector.oracle.logminer.processor.infinispan.EmbeddedInfinispanLogMinerEventProcessor; import io.debezium.connector.oracle.logminer.processor.infinispan.RemoteInfinispanLogMinerEventProcessor; import io.debezium.connector.oracle.logminer.processor.memory.MemoryLogMinerEventProcessor; @@ -373,7 +374,9 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector System.lineSeparator() + "infinispan_embedded - This option uses an embedded Infinispan cache to buffer transaction data and persist it to disk." + System.lineSeparator() + System.lineSeparator() + - "infinispan_remote - This option uses a remote Infinispan cluster to buffer transaction data and persist it to disk."); + "infinispan_remote - This option uses a remote Infinispan cluster to buffer transaction data and persist it to disk." + System.lineSeparator() + + System.lineSeparator() + + "ehcache - Use ehcache in embedded mode to buffer transaction data and persist it to disk."); public static final Field LOG_MINING_BUFFER_TRANSACTION_EVENTS_THRESHOLD = Field.create("log.mining.buffer.transaction.events.threshold") .withDisplayName("The maximum number of events a transaction can have before being discarded.") @@ -621,6 +624,35 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector .withValidation(Field::isNonNegativeInteger) .withDescription("The number of attempts to retry database errors during snapshots before failing."); + public static final Field LOG_MINING_BUFFER_EHCACHE_STORAGE_PATH = Field.create("log.mining.buffer.ehcache.storage.path") + .withType(Type.STRING) + .withValidation(OracleConnectorConfig::validateEhcacheFieldRequired) + .withDescription("The persistent location where the Ehcache off-heap files should be stored."); + + public static final Field LOG_MINING_BUFFER_EHCACHE_CACHE_TRANSACTIONS_BYTES = Field.create("log.mining.buffer.ehcache.cache.transactions.bytes") + .withType(Type.LONG) + .withDefault(0) + .withValidation(OracleConnectorConfig::validateEhcacheBytesFieldRequired) + .withDescription("The size of the Ehcache transaction cache in bytes"); + + public static final Field LOG_MINING_BUFFER_EHCACHE_CACHE_PROCESSED_TRANSACTIONS_BYTES = Field.create("log.mining.buffer.ehcache.cache.processedtransactions.bytes") + .withType(Type.LONG) + .withDefault(0) + .withValidation(OracleConnectorConfig::validateEhcacheBytesFieldRequired) + .withDescription("The size of the Ehcache processed transaction cache in bytes"); + + public static final Field LOG_MINING_BUFFER_EHCACHE_CACHE_SCHEMA_CHANGES_BYTES = Field.create("log.mining.buffer.ehcache.cache.schemachanges.bytes") + .withType(Type.LONG) + .withDefault(0) + .withValidation(OracleConnectorConfig::validateEhcacheBytesFieldRequired) + .withDescription("The size of the Ehcache schema changes cache in bytes"); + + public static final Field LOG_MINING_BUFFER_EHCACHE_CACHE_EVENTS_BYTES = Field.create("log.mining.buffer.ehcache.cache.events.bytes") + .withType(Type.LONG) + .withDefault(0) + .withValidation(OracleConnectorConfig::validateEhcacheBytesFieldRequired) + .withDescription("The size of the Ehcache transaction events cache in bytes"); + @Deprecated public static final Field LOG_MINING_CONTINUOUS_MINE = Field.create("log.mining.continuous.mine") .withDisplayName("Should log mining session configured with CONTINUOUS_MINE setting?") @@ -705,7 +737,12 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector OLR_HOST, OLR_PORT, SNAPSHOT_DATABASE_ERRORS_MAX_RETRIES, - LOG_MINING_CONTINUOUS_MINE) + LOG_MINING_CONTINUOUS_MINE, + LOG_MINING_BUFFER_EHCACHE_STORAGE_PATH, + LOG_MINING_BUFFER_EHCACHE_CACHE_TRANSACTIONS_BYTES, + LOG_MINING_BUFFER_EHCACHE_CACHE_PROCESSED_TRANSACTIONS_BYTES, + LOG_MINING_BUFFER_EHCACHE_CACHE_SCHEMA_CHANGES_BYTES, + LOG_MINING_BUFFER_EHCACHE_CACHE_EVENTS_BYTES) .events(SOURCE_INFO_STRUCT_MAKER) .create(); @@ -775,6 +812,12 @@ public static ConfigDef configDef() { private final Boolean logMiningIncludeRedoSql; private final boolean logMiningContinuousMining; + private final String logMiningBufferEhcacheStoragePath; + private final long logMiningBufferEhcacheTransactionCacheSizeBytes; + private final long logMiningBufferEhcacheProcessedTransactionsCacheSizeBytes; + private final long logMiningBufferEhcacheSchemaChangesCacheSizeBytes; + private final long logMiningBufferEhcacheEventsCacheSizeBytes; + private final String openLogReplicatorSource; private final String openLogReplicatorHostname; private final Integer openLogReplicatorPort; @@ -844,6 +887,12 @@ public OracleConnectorConfig(Configuration config) { this.logMiningIncludeRedoSql = config.getBoolean(LOG_MINING_INCLUDE_REDO_SQL); this.logMiningContinuousMining = config.getBoolean(LOG_MINING_CONTINUOUS_MINE); + this.logMiningBufferEhcacheStoragePath = config.getString(LOG_MINING_BUFFER_EHCACHE_STORAGE_PATH); + this.logMiningBufferEhcacheTransactionCacheSizeBytes = config.getLong(LOG_MINING_BUFFER_EHCACHE_CACHE_TRANSACTIONS_BYTES); + this.logMiningBufferEhcacheProcessedTransactionsCacheSizeBytes = config.getLong(LOG_MINING_BUFFER_EHCACHE_CACHE_PROCESSED_TRANSACTIONS_BYTES); + this.logMiningBufferEhcacheSchemaChangesCacheSizeBytes = config.getLong(LOG_MINING_BUFFER_EHCACHE_CACHE_SCHEMA_CHANGES_BYTES); + this.logMiningBufferEhcacheEventsCacheSizeBytes = config.getLong(LOG_MINING_BUFFER_EHCACHE_CACHE_EVENTS_BYTES); + // OpenLogReplicator this.openLogReplicatorSource = config.getString(OLR_SOURCE); this.openLogReplicatorHostname = config.getString(OLR_HOST); @@ -1452,6 +1501,21 @@ public LogMinerEventProcessor createProcessor(ChangeEventSourceContext context, return new RemoteInfinispanLogMinerEventProcessor(context, connectorConfig, connection, dispatcher, partition, offsetContext, schema, metrics); } + }, + + EHCACHE("ehcache") { + @Override + public LogMinerEventProcessor createProcessor(ChangeEventSourceContext context, + OracleConnectorConfig connectorConfig, + OracleConnection connection, + EventDispatcher dispatcher, + OraclePartition partition, + OracleOffsetContext offsetContext, + OracleDatabaseSchema schema, + LogMinerStreamingChangeEventSourceMetrics metrics) { + return new EhcacheLogMinerEventProcessor(context, connectorConfig, connection, dispatcher, + partition, offsetContext, schema, metrics); + } }; private final String value; @@ -1475,11 +1539,15 @@ public String getValue() { } public boolean isInfinispan() { - return !MEMORY.equals(this); + return INFINISPAN_EMBEDDED.equals(this) || INFINISPAN_REMOTE.equals(this); } public boolean isInfinispanEmbedded() { - return isInfinispan() && INFINISPAN_EMBEDDED.equals(this); + return INFINISPAN_EMBEDDED.equals(this); + } + + public boolean isEhcache() { + return EHCACHE.equals(this); } public static LogMiningBufferType parse(String value) { @@ -1932,6 +2000,26 @@ public Integer getOpenLogReplicatorPort() { return openLogReplicatorPort; } + public String getLogMiningBufferEhcacheStoragePath() { + return logMiningBufferEhcacheStoragePath; + } + + public long getLogMiningBufferEhcacheTransactionCacheSizeBytes() { + return logMiningBufferEhcacheTransactionCacheSizeBytes; + } + + public long getLogMiningBufferEhcacheProcessedTransactionsCacheSizeBytes() { + return logMiningBufferEhcacheProcessedTransactionsCacheSizeBytes; + } + + public long getLogMiningBufferEhcacheSchemaChangesCacheSizeBytes() { + return logMiningBufferEhcacheSchemaChangesCacheSizeBytes; + } + + public long getLogMiningBufferEhcacheEventsCacheSizeBytes() { + return logMiningBufferEhcacheEventsCacheSizeBytes; + } + @Override public String getConnectorName() { return Module.name(); @@ -2118,4 +2206,22 @@ public static int validateLogMiningStrategy(Configuration config, Field field, V } return 0; } + + public static int validateEhcacheFieldRequired(Configuration config, Field field, ValidationOutput problems) { + if (ConnectorAdapter.LOG_MINER.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) { + if (LogMiningBufferType.parse(config.getString(LOG_MINING_BUFFER_TYPE)).isEhcache()) { + return Field.isRequired(config, field, problems); + } + } + return 0; + } + + public static int validateEhcacheBytesFieldRequired(Configuration config, Field field, ValidationOutput problems) { + if (ConnectorAdapter.LOG_MINER.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) { + if (LogMiningBufferType.parse(config.getString(LOG_MINING_BUFFER_TYPE)).isEhcache()) { + return Field.isPositiveLong(config, field, problems); + } + } + return 0; + } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/EhcacheLogMinerCache.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/EhcacheLogMinerCache.java new file mode 100644 index 000000000..b641b2190 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/EhcacheLogMinerCache.java @@ -0,0 +1,104 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import org.ehcache.Cache; + +import io.debezium.connector.oracle.logminer.processor.LogMinerCache; + +/** + * An implementation of {@link LogMinerCache} for Ehcache backed off-heap caches. + * + * @author Chris Cranford + */ +public class EhcacheLogMinerCache implements LogMinerCache { + + private final Cache cache; + + public EhcacheLogMinerCache(Cache cache) { + this.cache = cache; + } + + @Override + public void entries(Consumer>> streamConsumer) { + streamConsumer.accept(getCacheStream().map(e -> new Entry<>(e.getKey(), e.getValue()))); + } + + @Override + public void clear() { + cache.clear(); + } + + @Override + public V get(K key) { + return cache.get(key); + } + + @Override + public boolean isEmpty() { + // todo: how to improve this + return size() == 0; + } + + @Override + public boolean containsKey(K key) { + return cache.containsKey(key); + } + + @Override + public void put(K key, V value) { + cache.put(key, value); + } + + @Override + public V remove(K key) { + // Ehcache does not support remove returning the existing value; therefore directly fetch it first + V value = get(key); + cache.remove(key); + return value; + } + + @Override + public int size() { + final int[] count = { 0 }; + cache.spliterator().forEachRemaining(element -> count[0]++); + return count[0]; + } + + @Override + public void forEach(BiConsumer action) { + getCacheStream().forEach(e -> action.accept(e.getKey(), e.getValue())); + } + + @Override + public void removeIf(Predicate> predicate) { + final List keysToRemove = new ArrayList<>(); + forEach((k, v) -> { + if (predicate.test(new Entry<>(k, v))) { + keysToRemove.add(k); + } + }); + keysToRemove.forEach(cache::remove); + } + + @Override + public T streamAndReturn(Function>, T> function) { + return function.apply(getCacheStream().map(e -> new Entry<>(e.getKey(), e.getValue()))); + } + + private Stream> getCacheStream() { + return StreamSupport.stream(cache.spliterator(), false); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/EhcacheLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/EhcacheLogMinerEventProcessor.java new file mode 100644 index 000000000..102b07b82 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/EhcacheLogMinerEventProcessor.java @@ -0,0 +1,138 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache; + +import org.ehcache.PersistentCacheManager; +import org.ehcache.config.CacheConfiguration; +import org.ehcache.config.builders.CacheConfigurationBuilder; +import org.ehcache.config.builders.CacheManagerBuilder; +import org.ehcache.config.builders.ResourcePoolsBuilder; +import org.ehcache.config.units.MemoryUnit; +import org.ehcache.core.internal.statistics.DefaultStatisticsService; +import org.ehcache.expiry.ExpiryPolicy; +import org.ehcache.spi.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.connector.oracle.OracleConnection; +import io.debezium.connector.oracle.OracleConnectorConfig; +import io.debezium.connector.oracle.OracleDatabaseSchema; +import io.debezium.connector.oracle.OracleOffsetContext; +import io.debezium.connector.oracle.OraclePartition; +import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics; +import io.debezium.connector.oracle.logminer.events.LogMinerEvent; +import io.debezium.connector.oracle.logminer.events.LogMinerEventRow; +import io.debezium.connector.oracle.logminer.processor.AbstractTransactionCachingLogMinerEventProcessor; +import io.debezium.connector.oracle.logminer.processor.LogMinerCache; +import io.debezium.connector.oracle.logminer.processor.ehcache.serialization.EhcacheTransactionSerializer; +import io.debezium.connector.oracle.logminer.processor.ehcache.serialization.LogMinerEventSerializer; +import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext; +import io.debezium.relational.TableId; + +/** + * An {@link AbstractTransactionCachingLogMinerEventProcessor} implementation for storing buffer details + * off-heap in a set of Ehcache-backed caches. + * + * @author Chris Cranford + */ +public class EhcacheLogMinerEventProcessor extends AbstractTransactionCachingLogMinerEventProcessor { + + private static final Logger LOGGER = LoggerFactory.getLogger(EhcacheLogMinerEventProcessor.class); + + private final PersistentCacheManager cacheManager; + private final LogMinerCache transactionsCache; + private final LogMinerCache eventCache; + private final LogMinerCache processedTransactionsCache; + private final LogMinerCache schemaChangesCache; + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public EhcacheLogMinerEventProcessor(ChangeEventSourceContext context, + OracleConnectorConfig connectorConfig, + OracleConnection jdbcConnection, + EventDispatcher dispatcher, + OraclePartition partition, + OracleOffsetContext offsetContext, + OracleDatabaseSchema schema, + LogMinerStreamingChangeEventSourceMetrics metrics) { + super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics); + LOGGER.info("Using Ehcache buffer"); + + this.cacheManager = createCacheManager(connectorConfig); + this.transactionsCache = new EhcacheLogMinerCache(cacheManager.getCache(TRANSACTIONS_CACHE_NAME, String.class, EhcacheTransaction.class)); + this.processedTransactionsCache = new EhcacheLogMinerCache(cacheManager.getCache(PROCESSED_TRANSACTIONS_CACHE_NAME, String.class, String.class)); + this.schemaChangesCache = new EhcacheLogMinerCache(cacheManager.getCache(SCHEMA_CHANGES_CACHE_NAME, String.class, String.class)); + this.eventCache = new EhcacheLogMinerCache(cacheManager.getCache(EVENTS_CACHE_NAME, String.class, LogMinerEvent.class)); + } + + @Override + protected EhcacheTransaction createTransaction(LogMinerEventRow row) { + return new EhcacheTransaction(row.getTransactionId(), row.getScn(), row.getChangeTime(), row.getUserName(), row.getThread()); + } + + @Override + public LogMinerCache getTransactionCache() { + return transactionsCache; + } + + @Override + public LogMinerCache getEventCache() { + return eventCache; + } + + @Override + public LogMinerCache getSchemaChangesCache() { + return schemaChangesCache; + } + + @Override + public LogMinerCache getProcessedTransactionsCache() { + return processedTransactionsCache; + } + + @Override + public void close() throws Exception { + if (cacheManager != null) { + cacheManager.close(); + } + } + + private PersistentCacheManager createCacheManager(OracleConnectorConfig connectorConfig) { + final String cachePath = connectorConfig.getLogMiningBufferEhcacheStoragePath(); + final long transactionCacheSize = connectorConfig.getLogMiningBufferEhcacheTransactionCacheSizeBytes(); + final long processedTransactionsCacheSize = connectorConfig.getLogMiningBufferEhcacheProcessedTransactionsCacheSizeBytes(); + final long schemaChangesCacheSize = connectorConfig.getLogMiningBufferEhcacheSchemaChangesCacheSizeBytes(); + final long eventsCacheSize = connectorConfig.getLogMiningBufferEhcacheEventsCacheSizeBytes(); + + return CacheManagerBuilder.newCacheManagerBuilder() + .with(CacheManagerBuilder.persistence(cachePath)) + .using(new DefaultStatisticsService()) + .withCache(TRANSACTIONS_CACHE_NAME, createCacheConfiguration(String.class, EhcacheTransaction.class, transactionCacheSize, null)) + .withCache(PROCESSED_TRANSACTIONS_CACHE_NAME, createCacheConfiguration(String.class, String.class, processedTransactionsCacheSize, null)) + .withCache(SCHEMA_CHANGES_CACHE_NAME, createCacheConfiguration(String.class, String.class, schemaChangesCacheSize, null)) + .withCache(EVENTS_CACHE_NAME, createCacheConfiguration(String.class, LogMinerEvent.class, eventsCacheSize, null)) + .withSerializer(EhcacheTransaction.class, EhcacheTransactionSerializer.class) + .withSerializer(LogMinerEvent.class, LogMinerEventSerializer.class) + .build(true); + } + + @SuppressWarnings("SameParameterValue") + private CacheConfiguration createCacheConfiguration(Class keyClass, Class valueClass, long sizeMb, + Class> valueSerializer) { + final CacheConfigurationBuilder builder = CacheConfigurationBuilder.newCacheConfigurationBuilder( + keyClass, + valueClass, + ResourcePoolsBuilder.newResourcePoolsBuilder() + .disk(sizeMb, MemoryUnit.B, !getConfig().isLogMiningBufferDropOnStop())) + .withExpiry(ExpiryPolicy.NO_EXPIRY); + + if (valueSerializer != null) { + builder.withValueSerializer(valueSerializer); + } + + return builder.build(); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/EhcacheTransaction.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/EhcacheTransaction.java new file mode 100644 index 000000000..d15f2a9b3 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/EhcacheTransaction.java @@ -0,0 +1,46 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache; + +import java.time.Instant; + +import io.debezium.connector.oracle.Scn; +import io.debezium.connector.oracle.logminer.processor.AbstractTransaction; + +/** + * A {@link AbstractTransaction} implementation for Ehcache off-heap caches. + * + * @author Chris Cranford + */ +public class EhcacheTransaction extends AbstractTransaction { + + private int numberOfEvents; + + public EhcacheTransaction(String transactionId, Scn startScn, Instant changeTime, String userName, Integer redoThread) { + super(transactionId, startScn, changeTime, userName, redoThread); + start(); + } + + public EhcacheTransaction(String transactionId, Scn startScn, Instant changeTime, String userName, Integer redoThread, int numberOfEvents) { + super(transactionId, startScn, changeTime, userName, redoThread); + this.numberOfEvents = numberOfEvents; + } + + @Override + public int getNumberOfEvents() { + return numberOfEvents; + } + + @Override + public int getNextEventId() { + return numberOfEvents++; + } + + @Override + public void start() { + numberOfEvents = 0; + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/AbstractEhcacheSerializer.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/AbstractEhcacheSerializer.java new file mode 100644 index 000000000..7f391fc9b --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/AbstractEhcacheSerializer.java @@ -0,0 +1,74 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; + +import org.ehcache.spi.serialization.Serializer; +import org.ehcache.spi.serialization.SerializerException; + +/** + * An abstract implementation of the Ehcache {@link Serializer} interface. + * + * @author Chris Cranford + */ +public abstract class AbstractEhcacheSerializer implements Serializer { + @Override + public ByteBuffer serialize(T object) throws SerializerException { + try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { + try (SerializerOutputStream stream = new SerializerOutputStream(output)) { + serialize(object, stream); + } + return ByteBuffer.wrap(output.toByteArray()); + } + catch (Exception e) { + throw new SerializerException("Failed to serialize " + object.getClass().getSimpleName(), e); + } + } + + @Override + public T read(ByteBuffer buffer) throws ClassNotFoundException, SerializerException { + try (ByteArrayInputStream input = new ByteArrayInputStream(buffer.array())) { + // todo: unsure what this magic "40" bytes represents + if (input.skip(40) != 40) { + throw new SerializerException("Failed to skip initial buffer payload"); + } + try (SerializerInputStream stream = new SerializerInputStream(input)) { + return deserialize(stream); + } + } + catch (Exception e) { + throw new SerializerException("Failed to deserialize buffer", e); + } + } + + @Override + public boolean equals(T object, ByteBuffer buffer) throws ClassNotFoundException, SerializerException { + return Objects.equals(object, read(buffer)); + } + + /** + * Serialize the specified object to the output stream. + * + * @param object the object to be serialized, should not be {@code null} + * @param stream the output stream to write to, should not be {@code null} + * @throws IOException when a write operation fails on the output stream + */ + protected abstract void serialize(T object, SerializerOutputStream stream) throws IOException; + + /** + * Deserializes the data within the input stream. + * + * @param stream the input stream to read, should not be {@code null} + * @return the object deserialized from the input stream, should not be {@code null} + * @throws IOException when a read operation fails on the input stream + */ + protected abstract T deserialize(SerializerInputStream stream) throws IOException; +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/AbstractSerializerStream.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/AbstractSerializerStream.java new file mode 100644 index 000000000..ec6c87bd3 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/AbstractSerializerStream.java @@ -0,0 +1,27 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import io.debezium.connector.oracle.OracleValueConverters; + +/** + * @author Chris Cranford + */ +public abstract class AbstractSerializerStream implements AutoCloseable { + /** + * Arrays cannot be serialized with null values and so we use a sentinel value + * to mark a null element in an array. + */ + protected static final String NULL_VALUE_SENTINEL = "$$DBZ-NULL$$"; + + /** + * The supplied value arrays can now be populated with {@link OracleValueConverters#UNAVAILABLE_VALUE} + * which is simple java object. This cannot be represented as a string in the cached Ehcache record + * and so this sentinel is used to translate the runtime object representation to a serializable form + * and back during cache to object conversion. + */ + protected static final String UNAVAILABLE_VALUE_SENTINEL = "$$DBZ-UNAVAILABLE-VALUE$$"; +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/DeserializationContext.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/DeserializationContext.java new file mode 100644 index 000000000..61f7adec8 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/DeserializationContext.java @@ -0,0 +1,40 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import java.io.DataInputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * A simple context object for tracking the deserialization of a {@link DataInputStream}. + * + * @author Chris Cranford + */ +public class DeserializationContext { + + private final List values = new ArrayList<>(); + + /** + * Adds a deserialized value to the context's value list. The value list is used to locate and + * construct the target object once the object stream has been consumed. Therefore, values are + * to be added and thus deserialized in the order they appear in the object's constructor. + * + * @param value the deserialized value + */ + public void addValue(Object value) { + values.add(value); + } + + /** + * Gets all the deserialized values in the context. + * + * @return list of deserialized values. + */ + public List getValues() { + return values; + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/DmlEventSerdesProvider.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/DmlEventSerdesProvider.java new file mode 100644 index 000000000..eeb1a6715 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/DmlEventSerdesProvider.java @@ -0,0 +1,49 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import java.io.IOException; + +import io.debezium.connector.oracle.logminer.events.DmlEvent; +import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntry; +import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntryImpl; + +/** + * A specialized implementation of {@link SerdesProvider} for {@link DmlEvent} types. + * + * @author Chris Cranford + */ +public class DmlEventSerdesProvider extends LogMinerEventSerdesProvider { + @Override + public Class getJavaType() { + return DmlEvent.class; + } + + @Override + public void serialize(DmlEvent event, SerializerOutputStream stream) throws IOException { + super.serialize(event, stream); + + final LogMinerDmlEntry dmlEntry = event.getDmlEntry(); + stream.writeInt(dmlEntry.getEventType().getValue()); + stream.writeString(dmlEntry.getObjectName()); + stream.writeString(dmlEntry.getObjectOwner()); + stream.writeObjectArray(dmlEntry.getNewValues()); + stream.writeObjectArray(dmlEntry.getOldValues()); + } + + @Override + public void deserialize(DeserializationContext context, SerializerInputStream stream) throws IOException { + super.deserialize(context, stream); + + final int entryType = stream.readInt(); + final String objectName = stream.readString(); + final String objectOwner = stream.readString(); + final Object[] newValues = stream.readObjectArray(); + final Object[] oldValues = stream.readObjectArray(); + + context.addValue(new LogMinerDmlEntryImpl(entryType, newValues, oldValues, objectOwner, objectName)); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/EhcacheTransactionSerializer.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/EhcacheTransactionSerializer.java new file mode 100644 index 000000000..c74013659 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/EhcacheTransactionSerializer.java @@ -0,0 +1,50 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import java.io.IOException; +import java.time.Instant; + +import org.ehcache.spi.serialization.Serializer; + +import io.debezium.connector.oracle.Scn; +import io.debezium.connector.oracle.logminer.processor.ehcache.EhcacheTransaction; + +/** + * An Ehcache {@link Serializer} implementation for storing an {@link EhcacheTransaction} in the cache. + * + * @author Chris Cranford + */ +public class EhcacheTransactionSerializer extends AbstractEhcacheSerializer { + + public EhcacheTransactionSerializer(ClassLoader classLoader) { + } + + @Override + protected void serialize(EhcacheTransaction object, SerializerOutputStream stream) throws IOException { + stream.writeString(object.getTransactionId()); + stream.writeScn(object.getStartScn()); + stream.writeInstant(object.getChangeTime()); + stream.writeString(object.getUserName()); + stream.writeInt(object.getRedoThreadId()); + stream.writeInt(object.getNumberOfEvents()); + } + + @Override + protected EhcacheTransaction deserialize(SerializerInputStream stream) throws IOException { + final String transactionId = stream.readString(); + final Scn startScn = readScn(stream.readString()); + final Instant changeTime = stream.readInstant(); + final String userName = stream.readString(); + final int redoThread = stream.readInt(); + final int numberOfEvents = stream.readInt(); + return new EhcacheTransaction(transactionId, startScn, changeTime, userName, redoThread, numberOfEvents); + } + + private Scn readScn(String value) { + return value.equals("null") ? Scn.NULL : Scn.valueOf(value); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/LobEraseEventSerdesProvider.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/LobEraseEventSerdesProvider.java new file mode 100644 index 000000000..f6ff27cac --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/LobEraseEventSerdesProvider.java @@ -0,0 +1,20 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import io.debezium.connector.oracle.logminer.events.LobEraseEvent; + +/** + * A specialized implementation of {@link SerdesProvider} for {@link LobEraseEvent} types. + * + * @author Chris Cranford + */ +public class LobEraseEventSerdesProvider extends LogMinerEventSerdesProvider { + @Override + public Class getJavaType() { + return LobEraseEvent.class; + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/LobWriteEventSerdesProvider.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/LobWriteEventSerdesProvider.java new file mode 100644 index 000000000..edddf7e07 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/LobWriteEventSerdesProvider.java @@ -0,0 +1,40 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import java.io.IOException; + +import io.debezium.connector.oracle.logminer.events.LobWriteEvent; + +/** + * A specialized implementation of {@link SerdesProvider} for {@link LobWriteEvent} types. + * + * @author Chris Cranford + */ +public class LobWriteEventSerdesProvider extends LogMinerEventSerdesProvider { + @Override + public Class getJavaType() { + return LobWriteEvent.class; + } + + @Override + public void serialize(LobWriteEvent event, SerializerOutputStream stream) throws IOException { + super.serialize(event, stream); + + stream.writeString(event.getData()); + stream.writeInt(event.getOffset()); + stream.writeInt(event.getLength()); + } + + @Override + public void deserialize(DeserializationContext context, SerializerInputStream stream) throws IOException { + super.deserialize(context, stream); + + context.addValue(stream.readString()); + context.addValue(stream.readInt()); + context.addValue(stream.readInt()); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/LogMinerEventSerdesProvider.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/LogMinerEventSerdesProvider.java new file mode 100644 index 000000000..bf0bde801 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/LogMinerEventSerdesProvider.java @@ -0,0 +1,43 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import java.io.IOException; + +import io.debezium.connector.oracle.logminer.events.EventType; +import io.debezium.connector.oracle.logminer.events.LogMinerEvent; + +/** + * A specialized implementation of {@link SerdesProvider} for {@link LogMinerEvent} types. + * + * @author Chris Cranford + */ +public class LogMinerEventSerdesProvider implements SerdesProvider { + @Override + public Class getJavaType() { + return LogMinerEvent.class; + } + + @Override + public void serialize(LogMinerEvent event, SerializerOutputStream stream) throws IOException { + stream.writeInt(event.getEventType().getValue()); + stream.writeScn(event.getScn()); + stream.writeTableId(event.getTableId()); + stream.writeString(event.getRowId()); + stream.writeString(event.getRsId()); + stream.writeInstant(event.getChangeTime()); + } + + @Override + public void deserialize(DeserializationContext context, SerializerInputStream stream) throws IOException { + context.addValue(EventType.from(stream.readInt())); + context.addValue(stream.readScn()); + context.addValue(stream.readTableId()); + context.addValue(stream.readString()); + context.addValue(stream.readString()); + context.addValue(stream.readInstant()); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/LogMinerEventSerializer.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/LogMinerEventSerializer.java new file mode 100644 index 000000000..da7552dcd --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/LogMinerEventSerializer.java @@ -0,0 +1,143 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.ehcache.spi.serialization.Serializer; +import org.ehcache.spi.serialization.SerializerException; + +import io.debezium.connector.oracle.logminer.events.LogMinerEvent; + +/** + * An Ehcache {@link Serializer} implementation for handling {@link LogMinerEvent} and subtypes. + * + * @author Chris Cranford + */ +public class LogMinerEventSerializer extends AbstractEhcacheSerializer { + + private static final Map, Class> primitiveToWrapperMap = new HashMap<>(); + static { + primitiveToWrapperMap.put(boolean.class, Boolean.class); + primitiveToWrapperMap.put(byte.class, Byte.class); + primitiveToWrapperMap.put(char.class, Character.class); + primitiveToWrapperMap.put(short.class, Short.class); + primitiveToWrapperMap.put(int.class, Integer.class); + primitiveToWrapperMap.put(long.class, Long.class); + primitiveToWrapperMap.put(float.class, Float.class); + primitiveToWrapperMap.put(double.class, Double.class); + } + + private final Map> serdesProviders = new HashMap<>(); + private final Map, Constructor> constructorCache = new ConcurrentHashMap<>(); + + public LogMinerEventSerializer(ClassLoader classLoader) { + registerSerdesProviders(); + } + + @Override + protected void serialize(LogMinerEvent object, SerializerOutputStream stream) throws IOException { + final SerdesProvider serdes = getSerdesByClassName(object.getClass().getName()); + stream.writeString(object.getClass().getName()); + serdes.serialize(object, stream); + } + + @Override + protected LogMinerEvent deserialize(SerializerInputStream stream) throws IOException { + final String clazzName = stream.readString(); + final SerdesProvider serdes = getSerdesByClassName(clazzName); + + final DeserializationContext context = new DeserializationContext(); + serdes.deserialize(context, stream); + + return constructObject(serdes.getJavaType(), context); + } + + private void registerSerdesProviders() { + registerSerdes(new DmlEventSerdesProvider<>()); + registerSerdes(new LobEraseEventSerdesProvider<>()); + registerSerdes(new LobWriteEventSerdesProvider<>()); + registerSerdes(new LogMinerEventSerdesProvider<>()); + registerSerdes(new RedoSqlDmlEventSerdesProvider<>()); + registerSerdes(new SelectLobLocatorSerdesProvider<>()); + registerSerdes(new TruncateEventSerdesProvider<>()); + registerSerdes(new XmlBeginEventSerdesProvider<>()); + registerSerdes(new XmlEndEventSerdesProvider<>()); + registerSerdes(new XmlWriteEventSerdesProvider<>()); + } + + private void registerSerdes(SerdesProvider serdesProvider) { + serdesProviders.put(serdesProvider.getJavaType().getName(), serdesProvider); + } + + private SerdesProvider getSerdesByClassName(String clazzName) { + final SerdesProvider provider = serdesProviders.get(clazzName); + if (provider == null) { + throw new SerializerException("Failed to find SerdesProvider for class: " + clazzName); + } + return provider; + } + + private LogMinerEvent constructObject(Class clazz, DeserializationContext context) { + try { + final List values = context.getValues(); + + // The constructor cache allows for maintaining a reference to the matching class constructor + // used after the first match lookup to improve serialization performance. + final Constructor constructor = constructorCache.computeIfAbsent(clazz, (classType) -> { + final Class[] parameterTypes = getParameterTypes(values); + final Constructor result = getMatchingConstructor(clazz, parameterTypes); + if (result == null) { + throw new SerializerException("Failed to find matching constructor for argument types: " + Arrays.toString(parameterTypes)); + } + return result; + }); + + return (LogMinerEvent) constructor.newInstance(values.toArray()); + } + catch (Exception e) { + throw new SerializerException("Failed to construct object of type " + clazz.getName(), e); + } + } + + private Class[] getParameterTypes(List values) { + return values.stream().map(Object::getClass).toArray(Class[]::new); + } + + private Constructor getMatchingConstructor(Class clazz, Class[] parameterTypes) { + for (Constructor constructor : clazz.getConstructors()) { + final Class[] paramTypes = constructor.getParameterTypes(); + if (paramTypes.length == parameterTypes.length) { + boolean matches = true; + for (int i = 0; i < paramTypes.length; i++) { + if (!paramTypes[i].isAssignableFrom(parameterTypes[i]) + && !parameterTypes[i].isAssignableFrom(paramTypes[i]) + && !isWrapperPrimitiveMatch(paramTypes[i], parameterTypes[i])) { + matches = false; + break; + } + } + if (matches) { + return constructor; + } + } + } + return null; + } + + private static boolean isWrapperPrimitiveMatch(Class paramType, Class argType) { + if (paramType.isPrimitive()) { + return argType.equals(primitiveToWrapperMap.get(paramType)); + } + return false; + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/RedoSqlDmlEventSerdesProvider.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/RedoSqlDmlEventSerdesProvider.java new file mode 100644 index 000000000..1a9596fa9 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/RedoSqlDmlEventSerdesProvider.java @@ -0,0 +1,36 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import java.io.IOException; + +import io.debezium.connector.oracle.logminer.events.RedoSqlDmlEvent; + +/** + * A specialized implementation of {@link SerdesProvider} for {@link RedoSqlDmlEvent} types. + * + * @author Chris Cranford + */ +public class RedoSqlDmlEventSerdesProvider extends DmlEventSerdesProvider { + @Override + public Class getJavaType() { + return RedoSqlDmlEvent.class; + } + + @Override + public void serialize(RedoSqlDmlEvent event, SerializerOutputStream stream) throws IOException { + super.serialize(event, stream); + + stream.writeString(event.getRedoSql()); + } + + @Override + public void deserialize(DeserializationContext context, SerializerInputStream stream) throws IOException { + super.deserialize(context, stream); + + context.addValue(stream.readString()); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/SelectLobLocatorSerdesProvider.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/SelectLobLocatorSerdesProvider.java new file mode 100644 index 000000000..25ceee937 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/SelectLobLocatorSerdesProvider.java @@ -0,0 +1,38 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import java.io.IOException; + +import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent; + +/** + * A specialized implementation of {@link SerdesProvider} for {@link SelectLobLocatorEvent} types. + * + * @author Chris Cranford + */ +public class SelectLobLocatorSerdesProvider extends DmlEventSerdesProvider { + @Override + public Class getJavaType() { + return SelectLobLocatorEvent.class; + } + + @Override + public void serialize(SelectLobLocatorEvent event, SerializerOutputStream stream) throws IOException { + super.serialize(event, stream); + + stream.writeString(event.getColumnName()); + stream.writeBoolean(event.isBinary()); + } + + @Override + public void deserialize(DeserializationContext context, SerializerInputStream stream) throws IOException { + super.deserialize(context, stream); + + context.addValue(stream.readString()); + context.addValue(stream.readBoolean()); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/SerdesProvider.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/SerdesProvider.java new file mode 100644 index 000000000..dc000dca0 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/SerdesProvider.java @@ -0,0 +1,41 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import java.io.IOException; + +/** + * Defines the contract for supplying a Serdes (Serializer/Deserializer tuple) for the Ehcache + * transaction event cache that stores different types of events. + * + * @author Chris Cranford + */ +public interface SerdesProvider { + /** + * Get the java type for the serialization/deserialization provider. + * + * @return the java class type + */ + Class getJavaType(); + + /** + * Serializes the object into the output stream. + * + * @param object the object to be serialized, should not be {@code null} + * @param stream the output stream, should not be {@code null} + * @throws IOException thrown if there is a problem serializing the data to the output stream + */ + void serialize(T object, SerializerOutputStream stream) throws IOException; + + /** + * Deserializes the input stream. + * + * @param context the deserialization context, should not be {@code null} + * @param stream the input stream to be read, should not be {@code null} + * @throws IOException thrown if there is a problem deserializing the data from the input stream + */ + void deserialize(DeserializationContext context, SerializerInputStream stream) throws IOException; +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/SerializerInputStream.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/SerializerInputStream.java new file mode 100644 index 000000000..c0deff01f --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/SerializerInputStream.java @@ -0,0 +1,148 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.time.Instant; +import java.util.Arrays; +import java.util.Objects; + +import io.debezium.connector.oracle.OracleValueConverters; +import io.debezium.connector.oracle.Scn; +import io.debezium.relational.TableId; + +/** + * A reader implementation enabling the reading of data from an {@link InputStream}. + * + * @author Chris Cranford + */ +public class SerializerInputStream extends AbstractSerializerStream { + + private final DataInputStream delegate; + + public SerializerInputStream(InputStream inputStream) { + this.delegate = new DataInputStream(inputStream); + } + + @Override + public void close() throws Exception { + delegate.close(); + } + + /** + * Reads an Oracle SCN value from the stream. + * + * @return the read SCN value or {@link Scn#NULL} if null + * @throws IOException when a read operation fails + */ + public Scn readScn() throws IOException { + final String value = readString(); + return value == null ? Scn.NULL : Scn.valueOf(value); + } + + /** + * Reads a Debezium {@link TableId} from the stream. + * + * @return the table id, never {@code null} + * @throws IOException when a read operation fails + */ + public TableId readTableId() throws IOException { + return TableId.parse(readString()); + } + + /** + * Read an {@link Instant} from the stream. + * + * @return the read instant value, never {@code null} + * @throws IOException when a read operation fails + */ + public Instant readInstant() throws IOException { + return Instant.parse(readString()); + } + + /** + * Read a boolean value from the stream. + * + * @return the boolean value + * @throws IOException when a read operation fails + */ + public boolean readBoolean() throws IOException { + return delegate.readBoolean(); + } + + /** + * Read an integer value from the stream. + * + * @return the integer value + * @throws IOException when a read operation fails + */ + public int readInt() throws IOException { + return delegate.readInt(); + } + + /** + * Read a string value from the stream. + * + * @return the string value or {@code null} when null + * @throws IOException when a read operation fails + */ + public String readString() throws IOException { + // Strings are serialized with a boolean flag preceding the string indicating nullability. + boolean isNull = delegate.readBoolean(); + if (isNull) { + return null; + } + return delegate.readUTF(); + } + + /** + * Reads an object array from the stream. + * + * @return the object array, never {@code null} + * @throws IOException when a read operation fails + */ + public Object[] readObjectArray() throws IOException { + return stringArrayToObjectArray(readStringArray()); + } + + /** + * Reads a string array from the stream. + * + * @return the string array, never {@code null} + * @throws IOException when a read operation fails + */ + protected String[] readStringArray() throws IOException { + final int size = readInt(); + final String[] data = new String[size]; + for (int i = 0; i < size; i++) { + data[i] = readString(); + } + return data; + } + + /** + * Convert an array of string values, typically from the encoded stream back into their + * Java object representations after being deserialized. + * + * @param values array of string values to convert + * @return array of objects after conversion + */ + protected Object[] stringArrayToObjectArray(String[] values) { + Objects.requireNonNull(values); + Object[] results = Arrays.copyOf(values, values.length, Object[].class); + for (int i = 0; i < results.length; ++i) { + if (results[i].equals(NULL_VALUE_SENTINEL)) { + results[i] = null; + } + else if (results[i].equals(UNAVAILABLE_VALUE_SENTINEL)) { + results[i] = OracleValueConverters.UNAVAILABLE_VALUE; + } + } + return results; + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/SerializerOutputStream.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/SerializerOutputStream.java new file mode 100644 index 000000000..72f518c32 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/SerializerOutputStream.java @@ -0,0 +1,153 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.time.Instant; +import java.util.Objects; + +import io.debezium.connector.oracle.OracleValueConverters; +import io.debezium.connector.oracle.Scn; +import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntry; +import io.debezium.relational.TableId; + +/** + * @author Chris Cranford + */ +public class SerializerOutputStream extends AbstractSerializerStream { + + private final DataOutputStream delegate; + + public SerializerOutputStream(OutputStream outputStream) { + this.delegate = new DataOutputStream(outputStream); + } + + @Override + public void close() throws Exception { + delegate.close(); + } + + /** + * Writes an Oracle SCN value to the stream. + * + * @param scn the SCN value + * @throws IOException when a write operation fails + */ + public void writeScn(Scn scn) throws IOException { + writeString(scn == null || scn.isNull() ? null : scn.toString()); + } + + /** + * Writes a {@link TableId} to the stream. + * + * @param tableId the table id, should not be {@code null} + * @throws IOException when a write operation fails + */ + public void writeTableId(TableId tableId) throws IOException { + Objects.requireNonNull(tableId); + writeString(tableId.toDoubleQuotedString()); + } + + /** + * Writes an {@link Instant} to the stream. + * @param instant the instant to be written, should not be {@code null} + * @throws IOException when a write operation fails + */ + public void writeInstant(Instant instant) throws IOException { + Objects.requireNonNull(instant); + writeString(instant.toString()); + } + + /** + * Writes a boolean value to the stream. + * + * @param value the boolean value to write + * @throws IOException when a write operation fails + */ + public void writeBoolean(boolean value) throws IOException { + delegate.writeBoolean(value); + } + + /** + * Write an integer value to the stream. + * + * @param value the integer value to write + * @throws IOException when a write operation fails + */ + public void writeInt(int value) throws IOException { + delegate.writeInt(value); + } + + /** + * Write a string value to the stream. + * + * @param value the string value to write, can be {@code null} + * @throws IOException when a write operation fails + */ + public void writeString(String value) throws IOException { + // When writing to a DataOutputStream, the writeUTF method does not accept a null string value. + // To account for this, Debezium will write a preceding boolean flag indicating nullability, + // and the string will only be written if the string is not null. + if (value == null) { + delegate.writeBoolean(true); + } + else { + delegate.writeBoolean(false); + delegate.writeUTF(value); + } + } + + /** + * Write an object array to the stream. + * + * @param values the object array to write, should not be {@code null} + * @throws IOException when a write operation fails + */ + public void writeObjectArray(Object[] values) throws IOException { + Objects.requireNonNull(values); + writeStringArray(objectArrayToStringArray(values)); + } + + /** + * Writes a string array to the stream. + * + * @param values the string array to write, should not be {@code null} + * @throws IOException when a write operation fails + */ + protected void writeStringArray(String[] values) throws IOException { + Objects.requireNonNull(values); + writeInt(values.length); + for (String value : values) { + writeString(value); + } + } + + /** + * Convert an array of object values to a string array for serialization. This is typically + * used when preparing the {@link LogMinerDmlEntry} values array for serialization. + * + * @param values the dml entry's new or old object values array + * @return string array of values prepared for serialization + */ + protected String[] objectArrayToStringArray(Object[] values) { + Objects.requireNonNull(values); + String[] results = new String[values.length]; + for (int i = 0; i < values.length; ++i) { + if (values[i] == null) { + results[i] = NULL_VALUE_SENTINEL; + } + else if (values[i] == OracleValueConverters.UNAVAILABLE_VALUE) { + results[i] = UNAVAILABLE_VALUE_SENTINEL; + } + else { + results[i] = (String) values[i]; + } + } + return results; + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/TruncateEventSerdesProvider.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/TruncateEventSerdesProvider.java new file mode 100644 index 000000000..c3dbac5b3 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/TruncateEventSerdesProvider.java @@ -0,0 +1,20 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import io.debezium.connector.oracle.logminer.events.TruncateEvent; + +/** + * A specialized implementation of {@link SerdesProvider} for {@link TruncateEvent} types. + * + * @author Chris Cranford + */ +public class TruncateEventSerdesProvider extends DmlEventSerdesProvider { + @Override + public Class getJavaType() { + return TruncateEvent.class; + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/XmlBeginEventSerdesProvider.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/XmlBeginEventSerdesProvider.java new file mode 100644 index 000000000..0d424d4b5 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/XmlBeginEventSerdesProvider.java @@ -0,0 +1,36 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import java.io.IOException; + +import io.debezium.connector.oracle.logminer.events.XmlBeginEvent; + +/** + * A specialized implementation of {@link SerdesProvider} for {@link XmlBeginEvent} types. + * + * @author Chris Cranford + */ +public class XmlBeginEventSerdesProvider extends DmlEventSerdesProvider { + @Override + public Class getJavaType() { + return XmlBeginEvent.class; + } + + @Override + public void serialize(XmlBeginEvent event, SerializerOutputStream stream) throws IOException { + super.serialize(event, stream); + + stream.writeString(event.getColumnName()); + } + + @Override + public void deserialize(DeserializationContext context, SerializerInputStream stream) throws IOException { + super.deserialize(context, stream); + + context.addValue(stream.readString()); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/XmlEndEventSerdesProvider.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/XmlEndEventSerdesProvider.java new file mode 100644 index 000000000..8e9806f67 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/XmlEndEventSerdesProvider.java @@ -0,0 +1,20 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import io.debezium.connector.oracle.logminer.events.XmlEndEvent; + +/** + * A specialized implementation of {@link SerdesProvider} for {@link XmlEndEvent} types. + * + * @author Chris Cranford + */ +public class XmlEndEventSerdesProvider extends LogMinerEventSerdesProvider { + @Override + public Class getJavaType() { + return XmlEndEvent.class; + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/XmlWriteEventSerdesProvider.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/XmlWriteEventSerdesProvider.java new file mode 100644 index 000000000..920643682 --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/ehcache/serialization/XmlWriteEventSerdesProvider.java @@ -0,0 +1,38 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle.logminer.processor.ehcache.serialization; + +import java.io.IOException; + +import io.debezium.connector.oracle.logminer.events.XmlWriteEvent; + +/** + * A specialized implementation of {@link SerdesProvider} for {@link XmlWriteEvent} types. + * + * @author Chris Cranford + */ +public class XmlWriteEventSerdesProvider extends LogMinerEventSerdesProvider { + @Override + public Class getJavaType() { + return XmlWriteEvent.class; + } + + @Override + public void serialize(XmlWriteEvent event, SerializerOutputStream stream) throws IOException { + super.serialize(event, stream); + + stream.writeString(event.getXml()); + stream.writeInt(event.getLength()); + } + + @Override + public void deserialize(DeserializationContext context, SerializerInputStream stream) throws IOException { + super.deserialize(context, stream); + + context.addValue(stream.readString()); + context.addValue(stream.readInt()); + } +} diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java index 5f2e691f2..e12d4ff1b 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java @@ -178,6 +178,15 @@ else if (adapter().equals(ConnectorAdapter.OLR)) { builder.with("log.mining.buffer." + ConfigurationProperties.AUTH_PASSWORD, INFINISPAN_PASS); } } + else if (bufferType.isEhcache()) { + final long cacheSize = 1024 * 1024 * 10; // 10Mb each + builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE, bufferTypeName); + builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_STORAGE_PATH, "./target/data"); + builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_CACHE_TRANSACTIONS_BYTES, cacheSize); + builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_CACHE_PROCESSED_TRANSACTIONS_BYTES, cacheSize); + builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_CACHE_SCHEMA_CHANGES_BYTES, cacheSize); + builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_CACHE_EVENTS_BYTES, cacheSize); + } builder.withDefault(OracleConnectorConfig.LOG_MINING_BUFFER_DROP_ON_STOP, true); }