DBZ-7758 Add Ehcache transaction buffer implementation

This commit is contained in:
Chris Cranford 2024-08-05 18:40:15 -04:00 committed by Jiri Pechanec
parent f3f8621e97
commit 7c3aa19dd5
24 changed files with 1439 additions and 4 deletions

View File

@ -85,6 +85,11 @@
<artifactId>protobuf-java-util</artifactId> <artifactId>protobuf-java-util</artifactId>
<version>${version.com.google.protobuf}</version> <version>${version.com.google.protobuf}</version>
</dependency> </dependency>
<dependency>
<groupId>org.ehcache</groupId>
<artifactId>ehcache</artifactId>
<version>3.9.6</version>
</dependency>
<!-- Testing --> <!-- Testing -->
<dependency> <dependency>
@ -730,6 +735,17 @@
</plugins> </plugins>
</build> </build>
</profile> </profile>
<!-- This profile should be used for testing connector with EhCache only -->
<profile>
<id>oracle-ehcache</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<log.mining.buffer.type.name>ehcache</log.mining.buffer.type.name>
</properties>
<!-- todo: when using this profile, enforce oracle-xstream being mutually exclusive -->
</profile>
<profile> <profile>
<id>apicurio</id> <id>apicurio</id>
<activation> <activation>

View File

@ -36,6 +36,7 @@
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics; import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy; import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor; import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.ehcache.EhcacheLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.EmbeddedInfinispanLogMinerEventProcessor; import io.debezium.connector.oracle.logminer.processor.infinispan.EmbeddedInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.RemoteInfinispanLogMinerEventProcessor; import io.debezium.connector.oracle.logminer.processor.infinispan.RemoteInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.memory.MemoryLogMinerEventProcessor; import io.debezium.connector.oracle.logminer.processor.memory.MemoryLogMinerEventProcessor;
@ -373,7 +374,9 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
System.lineSeparator() + System.lineSeparator() +
"infinispan_embedded - This option uses an embedded Infinispan cache to buffer transaction data and persist it to disk." + System.lineSeparator() + "infinispan_embedded - This option uses an embedded Infinispan cache to buffer transaction data and persist it to disk." + System.lineSeparator() +
System.lineSeparator() + System.lineSeparator() +
"infinispan_remote - This option uses a remote Infinispan cluster to buffer transaction data and persist it to disk."); "infinispan_remote - This option uses a remote Infinispan cluster to buffer transaction data and persist it to disk." + System.lineSeparator() +
System.lineSeparator() +
"ehcache - Use ehcache in embedded mode to buffer transaction data and persist it to disk.");
public static final Field LOG_MINING_BUFFER_TRANSACTION_EVENTS_THRESHOLD = Field.create("log.mining.buffer.transaction.events.threshold") public static final Field LOG_MINING_BUFFER_TRANSACTION_EVENTS_THRESHOLD = Field.create("log.mining.buffer.transaction.events.threshold")
.withDisplayName("The maximum number of events a transaction can have before being discarded.") .withDisplayName("The maximum number of events a transaction can have before being discarded.")
@ -621,6 +624,35 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
.withValidation(Field::isNonNegativeInteger) .withValidation(Field::isNonNegativeInteger)
.withDescription("The number of attempts to retry database errors during snapshots before failing."); .withDescription("The number of attempts to retry database errors during snapshots before failing.");
public static final Field LOG_MINING_BUFFER_EHCACHE_STORAGE_PATH = Field.create("log.mining.buffer.ehcache.storage.path")
.withType(Type.STRING)
.withValidation(OracleConnectorConfig::validateEhcacheFieldRequired)
.withDescription("The persistent location where the Ehcache off-heap files should be stored.");
public static final Field LOG_MINING_BUFFER_EHCACHE_CACHE_TRANSACTIONS_BYTES = Field.create("log.mining.buffer.ehcache.cache.transactions.bytes")
.withType(Type.LONG)
.withDefault(0)
.withValidation(OracleConnectorConfig::validateEhcacheBytesFieldRequired)
.withDescription("The size of the Ehcache transaction cache in bytes");
public static final Field LOG_MINING_BUFFER_EHCACHE_CACHE_PROCESSED_TRANSACTIONS_BYTES = Field.create("log.mining.buffer.ehcache.cache.processedtransactions.bytes")
.withType(Type.LONG)
.withDefault(0)
.withValidation(OracleConnectorConfig::validateEhcacheBytesFieldRequired)
.withDescription("The size of the Ehcache processed transaction cache in bytes");
public static final Field LOG_MINING_BUFFER_EHCACHE_CACHE_SCHEMA_CHANGES_BYTES = Field.create("log.mining.buffer.ehcache.cache.schemachanges.bytes")
.withType(Type.LONG)
.withDefault(0)
.withValidation(OracleConnectorConfig::validateEhcacheBytesFieldRequired)
.withDescription("The size of the Ehcache schema changes cache in bytes");
public static final Field LOG_MINING_BUFFER_EHCACHE_CACHE_EVENTS_BYTES = Field.create("log.mining.buffer.ehcache.cache.events.bytes")
.withType(Type.LONG)
.withDefault(0)
.withValidation(OracleConnectorConfig::validateEhcacheBytesFieldRequired)
.withDescription("The size of the Ehcache transaction events cache in bytes");
@Deprecated @Deprecated
public static final Field LOG_MINING_CONTINUOUS_MINE = Field.create("log.mining.continuous.mine") public static final Field LOG_MINING_CONTINUOUS_MINE = Field.create("log.mining.continuous.mine")
.withDisplayName("Should log mining session configured with CONTINUOUS_MINE setting?") .withDisplayName("Should log mining session configured with CONTINUOUS_MINE setting?")
@ -705,7 +737,12 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
OLR_HOST, OLR_HOST,
OLR_PORT, OLR_PORT,
SNAPSHOT_DATABASE_ERRORS_MAX_RETRIES, SNAPSHOT_DATABASE_ERRORS_MAX_RETRIES,
LOG_MINING_CONTINUOUS_MINE) LOG_MINING_CONTINUOUS_MINE,
LOG_MINING_BUFFER_EHCACHE_STORAGE_PATH,
LOG_MINING_BUFFER_EHCACHE_CACHE_TRANSACTIONS_BYTES,
LOG_MINING_BUFFER_EHCACHE_CACHE_PROCESSED_TRANSACTIONS_BYTES,
LOG_MINING_BUFFER_EHCACHE_CACHE_SCHEMA_CHANGES_BYTES,
LOG_MINING_BUFFER_EHCACHE_CACHE_EVENTS_BYTES)
.events(SOURCE_INFO_STRUCT_MAKER) .events(SOURCE_INFO_STRUCT_MAKER)
.create(); .create();
@ -775,6 +812,12 @@ public static ConfigDef configDef() {
private final Boolean logMiningIncludeRedoSql; private final Boolean logMiningIncludeRedoSql;
private final boolean logMiningContinuousMining; private final boolean logMiningContinuousMining;
private final String logMiningBufferEhcacheStoragePath;
private final long logMiningBufferEhcacheTransactionCacheSizeBytes;
private final long logMiningBufferEhcacheProcessedTransactionsCacheSizeBytes;
private final long logMiningBufferEhcacheSchemaChangesCacheSizeBytes;
private final long logMiningBufferEhcacheEventsCacheSizeBytes;
private final String openLogReplicatorSource; private final String openLogReplicatorSource;
private final String openLogReplicatorHostname; private final String openLogReplicatorHostname;
private final Integer openLogReplicatorPort; private final Integer openLogReplicatorPort;
@ -844,6 +887,12 @@ public OracleConnectorConfig(Configuration config) {
this.logMiningIncludeRedoSql = config.getBoolean(LOG_MINING_INCLUDE_REDO_SQL); this.logMiningIncludeRedoSql = config.getBoolean(LOG_MINING_INCLUDE_REDO_SQL);
this.logMiningContinuousMining = config.getBoolean(LOG_MINING_CONTINUOUS_MINE); this.logMiningContinuousMining = config.getBoolean(LOG_MINING_CONTINUOUS_MINE);
this.logMiningBufferEhcacheStoragePath = config.getString(LOG_MINING_BUFFER_EHCACHE_STORAGE_PATH);
this.logMiningBufferEhcacheTransactionCacheSizeBytes = config.getLong(LOG_MINING_BUFFER_EHCACHE_CACHE_TRANSACTIONS_BYTES);
this.logMiningBufferEhcacheProcessedTransactionsCacheSizeBytes = config.getLong(LOG_MINING_BUFFER_EHCACHE_CACHE_PROCESSED_TRANSACTIONS_BYTES);
this.logMiningBufferEhcacheSchemaChangesCacheSizeBytes = config.getLong(LOG_MINING_BUFFER_EHCACHE_CACHE_SCHEMA_CHANGES_BYTES);
this.logMiningBufferEhcacheEventsCacheSizeBytes = config.getLong(LOG_MINING_BUFFER_EHCACHE_CACHE_EVENTS_BYTES);
// OpenLogReplicator // OpenLogReplicator
this.openLogReplicatorSource = config.getString(OLR_SOURCE); this.openLogReplicatorSource = config.getString(OLR_SOURCE);
this.openLogReplicatorHostname = config.getString(OLR_HOST); this.openLogReplicatorHostname = config.getString(OLR_HOST);
@ -1452,6 +1501,21 @@ public LogMinerEventProcessor createProcessor(ChangeEventSourceContext context,
return new RemoteInfinispanLogMinerEventProcessor(context, connectorConfig, connection, dispatcher, return new RemoteInfinispanLogMinerEventProcessor(context, connectorConfig, connection, dispatcher,
partition, offsetContext, schema, metrics); partition, offsetContext, schema, metrics);
} }
},
EHCACHE("ehcache") {
@Override
public LogMinerEventProcessor createProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection connection,
EventDispatcher<OraclePartition, TableId> dispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
LogMinerStreamingChangeEventSourceMetrics metrics) {
return new EhcacheLogMinerEventProcessor(context, connectorConfig, connection, dispatcher,
partition, offsetContext, schema, metrics);
}
}; };
private final String value; private final String value;
@ -1475,11 +1539,15 @@ public String getValue() {
} }
public boolean isInfinispan() { public boolean isInfinispan() {
return !MEMORY.equals(this); return INFINISPAN_EMBEDDED.equals(this) || INFINISPAN_REMOTE.equals(this);
} }
public boolean isInfinispanEmbedded() { public boolean isInfinispanEmbedded() {
return isInfinispan() && INFINISPAN_EMBEDDED.equals(this); return INFINISPAN_EMBEDDED.equals(this);
}
public boolean isEhcache() {
return EHCACHE.equals(this);
} }
public static LogMiningBufferType parse(String value) { public static LogMiningBufferType parse(String value) {
@ -1932,6 +2000,26 @@ public Integer getOpenLogReplicatorPort() {
return openLogReplicatorPort; return openLogReplicatorPort;
} }
public String getLogMiningBufferEhcacheStoragePath() {
return logMiningBufferEhcacheStoragePath;
}
public long getLogMiningBufferEhcacheTransactionCacheSizeBytes() {
return logMiningBufferEhcacheTransactionCacheSizeBytes;
}
public long getLogMiningBufferEhcacheProcessedTransactionsCacheSizeBytes() {
return logMiningBufferEhcacheProcessedTransactionsCacheSizeBytes;
}
public long getLogMiningBufferEhcacheSchemaChangesCacheSizeBytes() {
return logMiningBufferEhcacheSchemaChangesCacheSizeBytes;
}
public long getLogMiningBufferEhcacheEventsCacheSizeBytes() {
return logMiningBufferEhcacheEventsCacheSizeBytes;
}
@Override @Override
public String getConnectorName() { public String getConnectorName() {
return Module.name(); return Module.name();
@ -2118,4 +2206,22 @@ public static int validateLogMiningStrategy(Configuration config, Field field, V
} }
return 0; return 0;
} }
public static int validateEhcacheFieldRequired(Configuration config, Field field, ValidationOutput problems) {
if (ConnectorAdapter.LOG_MINER.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) {
if (LogMiningBufferType.parse(config.getString(LOG_MINING_BUFFER_TYPE)).isEhcache()) {
return Field.isRequired(config, field, problems);
}
}
return 0;
}
public static int validateEhcacheBytesFieldRequired(Configuration config, Field field, ValidationOutput problems) {
if (ConnectorAdapter.LOG_MINER.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) {
if (LogMiningBufferType.parse(config.getString(LOG_MINING_BUFFER_TYPE)).isEhcache()) {
return Field.isPositiveLong(config, field, problems);
}
}
return 0;
}
} }

View File

@ -0,0 +1,104 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.ehcache.Cache;
import io.debezium.connector.oracle.logminer.processor.LogMinerCache;
/**
* An implementation of {@link LogMinerCache} for Ehcache backed off-heap caches.
*
* @author Chris Cranford
*/
public class EhcacheLogMinerCache<K, V> implements LogMinerCache<K, V> {
private final Cache<K, V> cache;
public EhcacheLogMinerCache(Cache<K, V> cache) {
this.cache = cache;
}
@Override
public void entries(Consumer<Stream<Entry<K, V>>> streamConsumer) {
streamConsumer.accept(getCacheStream().map(e -> new Entry<>(e.getKey(), e.getValue())));
}
@Override
public void clear() {
cache.clear();
}
@Override
public V get(K key) {
return cache.get(key);
}
@Override
public boolean isEmpty() {
// todo: how to improve this
return size() == 0;
}
@Override
public boolean containsKey(K key) {
return cache.containsKey(key);
}
@Override
public void put(K key, V value) {
cache.put(key, value);
}
@Override
public V remove(K key) {
// Ehcache does not support remove returning the existing value; therefore directly fetch it first
V value = get(key);
cache.remove(key);
return value;
}
@Override
public int size() {
final int[] count = { 0 };
cache.spliterator().forEachRemaining(element -> count[0]++);
return count[0];
}
@Override
public void forEach(BiConsumer<K, V> action) {
getCacheStream().forEach(e -> action.accept(e.getKey(), e.getValue()));
}
@Override
public void removeIf(Predicate<Entry<K, V>> predicate) {
final List<K> keysToRemove = new ArrayList<>();
forEach((k, v) -> {
if (predicate.test(new Entry<>(k, v))) {
keysToRemove.add(k);
}
});
keysToRemove.forEach(cache::remove);
}
@Override
public <T> T streamAndReturn(Function<Stream<Entry<K, V>>, T> function) {
return function.apply(getCacheStream().map(e -> new Entry<>(e.getKey(), e.getValue())));
}
private Stream<Cache.Entry<K, V>> getCacheStream() {
return StreamSupport.stream(cache.spliterator(), false);
}
}

View File

@ -0,0 +1,138 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache;
import org.ehcache.PersistentCacheManager;
import org.ehcache.config.CacheConfiguration;
import org.ehcache.config.builders.CacheConfigurationBuilder;
import org.ehcache.config.builders.CacheManagerBuilder;
import org.ehcache.config.builders.ResourcePoolsBuilder;
import org.ehcache.config.units.MemoryUnit;
import org.ehcache.core.internal.statistics.DefaultStatisticsService;
import org.ehcache.expiry.ExpiryPolicy;
import org.ehcache.spi.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.AbstractTransactionCachingLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.LogMinerCache;
import io.debezium.connector.oracle.logminer.processor.ehcache.serialization.EhcacheTransactionSerializer;
import io.debezium.connector.oracle.logminer.processor.ehcache.serialization.LogMinerEventSerializer;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.relational.TableId;
/**
* An {@link AbstractTransactionCachingLogMinerEventProcessor} implementation for storing buffer details
* off-heap in a set of Ehcache-backed caches.
*
* @author Chris Cranford
*/
public class EhcacheLogMinerEventProcessor extends AbstractTransactionCachingLogMinerEventProcessor<EhcacheTransaction> {
private static final Logger LOGGER = LoggerFactory.getLogger(EhcacheLogMinerEventProcessor.class);
private final PersistentCacheManager cacheManager;
private final LogMinerCache<String, EhcacheTransaction> transactionsCache;
private final LogMinerCache<String, LogMinerEvent> eventCache;
private final LogMinerCache<String, String> processedTransactionsCache;
private final LogMinerCache<String, String> schemaChangesCache;
@SuppressWarnings({ "rawtypes", "unchecked" })
public EhcacheLogMinerEventProcessor(ChangeEventSourceContext context,
OracleConnectorConfig connectorConfig,
OracleConnection jdbcConnection,
EventDispatcher<OraclePartition, TableId> dispatcher,
OraclePartition partition,
OracleOffsetContext offsetContext,
OracleDatabaseSchema schema,
LogMinerStreamingChangeEventSourceMetrics metrics) {
super(context, connectorConfig, jdbcConnection, dispatcher, partition, offsetContext, schema, metrics);
LOGGER.info("Using Ehcache buffer");
this.cacheManager = createCacheManager(connectorConfig);
this.transactionsCache = new EhcacheLogMinerCache(cacheManager.getCache(TRANSACTIONS_CACHE_NAME, String.class, EhcacheTransaction.class));
this.processedTransactionsCache = new EhcacheLogMinerCache(cacheManager.getCache(PROCESSED_TRANSACTIONS_CACHE_NAME, String.class, String.class));
this.schemaChangesCache = new EhcacheLogMinerCache(cacheManager.getCache(SCHEMA_CHANGES_CACHE_NAME, String.class, String.class));
this.eventCache = new EhcacheLogMinerCache(cacheManager.getCache(EVENTS_CACHE_NAME, String.class, LogMinerEvent.class));
}
@Override
protected EhcacheTransaction createTransaction(LogMinerEventRow row) {
return new EhcacheTransaction(row.getTransactionId(), row.getScn(), row.getChangeTime(), row.getUserName(), row.getThread());
}
@Override
public LogMinerCache<String, EhcacheTransaction> getTransactionCache() {
return transactionsCache;
}
@Override
public LogMinerCache<String, LogMinerEvent> getEventCache() {
return eventCache;
}
@Override
public LogMinerCache<String, String> getSchemaChangesCache() {
return schemaChangesCache;
}
@Override
public LogMinerCache<String, String> getProcessedTransactionsCache() {
return processedTransactionsCache;
}
@Override
public void close() throws Exception {
if (cacheManager != null) {
cacheManager.close();
}
}
private PersistentCacheManager createCacheManager(OracleConnectorConfig connectorConfig) {
final String cachePath = connectorConfig.getLogMiningBufferEhcacheStoragePath();
final long transactionCacheSize = connectorConfig.getLogMiningBufferEhcacheTransactionCacheSizeBytes();
final long processedTransactionsCacheSize = connectorConfig.getLogMiningBufferEhcacheProcessedTransactionsCacheSizeBytes();
final long schemaChangesCacheSize = connectorConfig.getLogMiningBufferEhcacheSchemaChangesCacheSizeBytes();
final long eventsCacheSize = connectorConfig.getLogMiningBufferEhcacheEventsCacheSizeBytes();
return CacheManagerBuilder.newCacheManagerBuilder()
.with(CacheManagerBuilder.persistence(cachePath))
.using(new DefaultStatisticsService())
.withCache(TRANSACTIONS_CACHE_NAME, createCacheConfiguration(String.class, EhcacheTransaction.class, transactionCacheSize, null))
.withCache(PROCESSED_TRANSACTIONS_CACHE_NAME, createCacheConfiguration(String.class, String.class, processedTransactionsCacheSize, null))
.withCache(SCHEMA_CHANGES_CACHE_NAME, createCacheConfiguration(String.class, String.class, schemaChangesCacheSize, null))
.withCache(EVENTS_CACHE_NAME, createCacheConfiguration(String.class, LogMinerEvent.class, eventsCacheSize, null))
.withSerializer(EhcacheTransaction.class, EhcacheTransactionSerializer.class)
.withSerializer(LogMinerEvent.class, LogMinerEventSerializer.class)
.build(true);
}
@SuppressWarnings("SameParameterValue")
private <K, V> CacheConfiguration<K, V> createCacheConfiguration(Class<K> keyClass, Class<V> valueClass, long sizeMb,
Class<? extends Serializer<V>> valueSerializer) {
final CacheConfigurationBuilder<K, V> builder = CacheConfigurationBuilder.newCacheConfigurationBuilder(
keyClass,
valueClass,
ResourcePoolsBuilder.newResourcePoolsBuilder()
.disk(sizeMb, MemoryUnit.B, !getConfig().isLogMiningBufferDropOnStop()))
.withExpiry(ExpiryPolicy.NO_EXPIRY);
if (valueSerializer != null) {
builder.withValueSerializer(valueSerializer);
}
return builder.build();
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache;
import java.time.Instant;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.processor.AbstractTransaction;
/**
* A {@link AbstractTransaction} implementation for Ehcache off-heap caches.
*
* @author Chris Cranford
*/
public class EhcacheTransaction extends AbstractTransaction {
private int numberOfEvents;
public EhcacheTransaction(String transactionId, Scn startScn, Instant changeTime, String userName, Integer redoThread) {
super(transactionId, startScn, changeTime, userName, redoThread);
start();
}
public EhcacheTransaction(String transactionId, Scn startScn, Instant changeTime, String userName, Integer redoThread, int numberOfEvents) {
super(transactionId, startScn, changeTime, userName, redoThread);
this.numberOfEvents = numberOfEvents;
}
@Override
public int getNumberOfEvents() {
return numberOfEvents;
}
@Override
public int getNextEventId() {
return numberOfEvents++;
}
@Override
public void start() {
numberOfEvents = 0;
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.ehcache.spi.serialization.Serializer;
import org.ehcache.spi.serialization.SerializerException;
/**
* An abstract implementation of the Ehcache {@link Serializer} interface.
*
* @author Chris Cranford
*/
public abstract class AbstractEhcacheSerializer<T> implements Serializer<T> {
@Override
public ByteBuffer serialize(T object) throws SerializerException {
try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
try (SerializerOutputStream stream = new SerializerOutputStream(output)) {
serialize(object, stream);
}
return ByteBuffer.wrap(output.toByteArray());
}
catch (Exception e) {
throw new SerializerException("Failed to serialize " + object.getClass().getSimpleName(), e);
}
}
@Override
public T read(ByteBuffer buffer) throws ClassNotFoundException, SerializerException {
try (ByteArrayInputStream input = new ByteArrayInputStream(buffer.array())) {
// todo: unsure what this magic "40" bytes represents
if (input.skip(40) != 40) {
throw new SerializerException("Failed to skip initial buffer payload");
}
try (SerializerInputStream stream = new SerializerInputStream(input)) {
return deserialize(stream);
}
}
catch (Exception e) {
throw new SerializerException("Failed to deserialize buffer", e);
}
}
@Override
public boolean equals(T object, ByteBuffer buffer) throws ClassNotFoundException, SerializerException {
return Objects.equals(object, read(buffer));
}
/**
* Serialize the specified object to the output stream.
*
* @param object the object to be serialized, should not be {@code null}
* @param stream the output stream to write to, should not be {@code null}
* @throws IOException when a write operation fails on the output stream
*/
protected abstract void serialize(T object, SerializerOutputStream stream) throws IOException;
/**
* Deserializes the data within the input stream.
*
* @param stream the input stream to read, should not be {@code null}
* @return the object deserialized from the input stream, should not be {@code null}
* @throws IOException when a read operation fails on the input stream
*/
protected abstract T deserialize(SerializerInputStream stream) throws IOException;
}

View File

@ -0,0 +1,27 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import io.debezium.connector.oracle.OracleValueConverters;
/**
* @author Chris Cranford
*/
public abstract class AbstractSerializerStream implements AutoCloseable {
/**
* Arrays cannot be serialized with null values and so we use a sentinel value
* to mark a null element in an array.
*/
protected static final String NULL_VALUE_SENTINEL = "$$DBZ-NULL$$";
/**
* The supplied value arrays can now be populated with {@link OracleValueConverters#UNAVAILABLE_VALUE}
* which is simple java object. This cannot be represented as a string in the cached Ehcache record
* and so this sentinel is used to translate the runtime object representation to a serializable form
* and back during cache to object conversion.
*/
protected static final String UNAVAILABLE_VALUE_SENTINEL = "$$DBZ-UNAVAILABLE-VALUE$$";
}

View File

@ -0,0 +1,40 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import java.io.DataInputStream;
import java.util.ArrayList;
import java.util.List;
/**
* A simple context object for tracking the deserialization of a {@link DataInputStream}.
*
* @author Chris Cranford
*/
public class DeserializationContext {
private final List<Object> values = new ArrayList<>();
/**
* Adds a deserialized value to the context's value list. The value list is used to locate and
* construct the target object once the object stream has been consumed. Therefore, values are
* to be added and thus deserialized in the order they appear in the object's constructor.
*
* @param value the deserialized value
*/
public void addValue(Object value) {
values.add(value);
}
/**
* Gets all the deserialized values in the context.
*
* @return list of deserialized values.
*/
public List<Object> getValues() {
return values;
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import java.io.IOException;
import io.debezium.connector.oracle.logminer.events.DmlEvent;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntryImpl;
/**
* A specialized implementation of {@link SerdesProvider} for {@link DmlEvent} types.
*
* @author Chris Cranford
*/
public class DmlEventSerdesProvider<T extends DmlEvent> extends LogMinerEventSerdesProvider<T> {
@Override
public Class<?> getJavaType() {
return DmlEvent.class;
}
@Override
public void serialize(DmlEvent event, SerializerOutputStream stream) throws IOException {
super.serialize(event, stream);
final LogMinerDmlEntry dmlEntry = event.getDmlEntry();
stream.writeInt(dmlEntry.getEventType().getValue());
stream.writeString(dmlEntry.getObjectName());
stream.writeString(dmlEntry.getObjectOwner());
stream.writeObjectArray(dmlEntry.getNewValues());
stream.writeObjectArray(dmlEntry.getOldValues());
}
@Override
public void deserialize(DeserializationContext context, SerializerInputStream stream) throws IOException {
super.deserialize(context, stream);
final int entryType = stream.readInt();
final String objectName = stream.readString();
final String objectOwner = stream.readString();
final Object[] newValues = stream.readObjectArray();
final Object[] oldValues = stream.readObjectArray();
context.addValue(new LogMinerDmlEntryImpl(entryType, newValues, oldValues, objectOwner, objectName));
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import java.io.IOException;
import java.time.Instant;
import org.ehcache.spi.serialization.Serializer;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.processor.ehcache.EhcacheTransaction;
/**
* An Ehcache {@link Serializer} implementation for storing an {@link EhcacheTransaction} in the cache.
*
* @author Chris Cranford
*/
public class EhcacheTransactionSerializer extends AbstractEhcacheSerializer<EhcacheTransaction> {
public EhcacheTransactionSerializer(ClassLoader classLoader) {
}
@Override
protected void serialize(EhcacheTransaction object, SerializerOutputStream stream) throws IOException {
stream.writeString(object.getTransactionId());
stream.writeScn(object.getStartScn());
stream.writeInstant(object.getChangeTime());
stream.writeString(object.getUserName());
stream.writeInt(object.getRedoThreadId());
stream.writeInt(object.getNumberOfEvents());
}
@Override
protected EhcacheTransaction deserialize(SerializerInputStream stream) throws IOException {
final String transactionId = stream.readString();
final Scn startScn = readScn(stream.readString());
final Instant changeTime = stream.readInstant();
final String userName = stream.readString();
final int redoThread = stream.readInt();
final int numberOfEvents = stream.readInt();
return new EhcacheTransaction(transactionId, startScn, changeTime, userName, redoThread, numberOfEvents);
}
private Scn readScn(String value) {
return value.equals("null") ? Scn.NULL : Scn.valueOf(value);
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import io.debezium.connector.oracle.logminer.events.LobEraseEvent;
/**
* A specialized implementation of {@link SerdesProvider} for {@link LobEraseEvent} types.
*
* @author Chris Cranford
*/
public class LobEraseEventSerdesProvider<T extends LobEraseEvent> extends LogMinerEventSerdesProvider<T> {
@Override
public Class<?> getJavaType() {
return LobEraseEvent.class;
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import java.io.IOException;
import io.debezium.connector.oracle.logminer.events.LobWriteEvent;
/**
* A specialized implementation of {@link SerdesProvider} for {@link LobWriteEvent} types.
*
* @author Chris Cranford
*/
public class LobWriteEventSerdesProvider<T extends LobWriteEvent> extends LogMinerEventSerdesProvider<T> {
@Override
public Class<?> getJavaType() {
return LobWriteEvent.class;
}
@Override
public void serialize(LobWriteEvent event, SerializerOutputStream stream) throws IOException {
super.serialize(event, stream);
stream.writeString(event.getData());
stream.writeInt(event.getOffset());
stream.writeInt(event.getLength());
}
@Override
public void deserialize(DeserializationContext context, SerializerInputStream stream) throws IOException {
super.deserialize(context, stream);
context.addValue(stream.readString());
context.addValue(stream.readInt());
context.addValue(stream.readInt());
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import java.io.IOException;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
/**
* A specialized implementation of {@link SerdesProvider} for {@link LogMinerEvent} types.
*
* @author Chris Cranford
*/
public class LogMinerEventSerdesProvider<T extends LogMinerEvent> implements SerdesProvider<T> {
@Override
public Class<?> getJavaType() {
return LogMinerEvent.class;
}
@Override
public void serialize(LogMinerEvent event, SerializerOutputStream stream) throws IOException {
stream.writeInt(event.getEventType().getValue());
stream.writeScn(event.getScn());
stream.writeTableId(event.getTableId());
stream.writeString(event.getRowId());
stream.writeString(event.getRsId());
stream.writeInstant(event.getChangeTime());
}
@Override
public void deserialize(DeserializationContext context, SerializerInputStream stream) throws IOException {
context.addValue(EventType.from(stream.readInt()));
context.addValue(stream.readScn());
context.addValue(stream.readTableId());
context.addValue(stream.readString());
context.addValue(stream.readString());
context.addValue(stream.readInstant());
}
}

View File

@ -0,0 +1,143 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.ehcache.spi.serialization.Serializer;
import org.ehcache.spi.serialization.SerializerException;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
/**
* An Ehcache {@link Serializer} implementation for handling {@link LogMinerEvent} and subtypes.
*
* @author Chris Cranford
*/
public class LogMinerEventSerializer extends AbstractEhcacheSerializer<LogMinerEvent> {
private static final Map<Class<?>, Class<?>> primitiveToWrapperMap = new HashMap<>();
static {
primitiveToWrapperMap.put(boolean.class, Boolean.class);
primitiveToWrapperMap.put(byte.class, Byte.class);
primitiveToWrapperMap.put(char.class, Character.class);
primitiveToWrapperMap.put(short.class, Short.class);
primitiveToWrapperMap.put(int.class, Integer.class);
primitiveToWrapperMap.put(long.class, Long.class);
primitiveToWrapperMap.put(float.class, Float.class);
primitiveToWrapperMap.put(double.class, Double.class);
}
private final Map<String, SerdesProvider<?>> serdesProviders = new HashMap<>();
private final Map<Class<?>, Constructor<?>> constructorCache = new ConcurrentHashMap<>();
public LogMinerEventSerializer(ClassLoader classLoader) {
registerSerdesProviders();
}
@Override
protected void serialize(LogMinerEvent object, SerializerOutputStream stream) throws IOException {
final SerdesProvider serdes = getSerdesByClassName(object.getClass().getName());
stream.writeString(object.getClass().getName());
serdes.serialize(object, stream);
}
@Override
protected LogMinerEvent deserialize(SerializerInputStream stream) throws IOException {
final String clazzName = stream.readString();
final SerdesProvider<?> serdes = getSerdesByClassName(clazzName);
final DeserializationContext context = new DeserializationContext();
serdes.deserialize(context, stream);
return constructObject(serdes.getJavaType(), context);
}
private void registerSerdesProviders() {
registerSerdes(new DmlEventSerdesProvider<>());
registerSerdes(new LobEraseEventSerdesProvider<>());
registerSerdes(new LobWriteEventSerdesProvider<>());
registerSerdes(new LogMinerEventSerdesProvider<>());
registerSerdes(new RedoSqlDmlEventSerdesProvider<>());
registerSerdes(new SelectLobLocatorSerdesProvider<>());
registerSerdes(new TruncateEventSerdesProvider<>());
registerSerdes(new XmlBeginEventSerdesProvider<>());
registerSerdes(new XmlEndEventSerdesProvider<>());
registerSerdes(new XmlWriteEventSerdesProvider<>());
}
private <T> void registerSerdes(SerdesProvider<T> serdesProvider) {
serdesProviders.put(serdesProvider.getJavaType().getName(), serdesProvider);
}
private SerdesProvider<?> getSerdesByClassName(String clazzName) {
final SerdesProvider<?> provider = serdesProviders.get(clazzName);
if (provider == null) {
throw new SerializerException("Failed to find SerdesProvider for class: " + clazzName);
}
return provider;
}
private LogMinerEvent constructObject(Class<?> clazz, DeserializationContext context) {
try {
final List<Object> values = context.getValues();
// The constructor cache allows for maintaining a reference to the matching class constructor
// used after the first match lookup to improve serialization performance.
final Constructor<?> constructor = constructorCache.computeIfAbsent(clazz, (classType) -> {
final Class<?>[] parameterTypes = getParameterTypes(values);
final Constructor<?> result = getMatchingConstructor(clazz, parameterTypes);
if (result == null) {
throw new SerializerException("Failed to find matching constructor for argument types: " + Arrays.toString(parameterTypes));
}
return result;
});
return (LogMinerEvent) constructor.newInstance(values.toArray());
}
catch (Exception e) {
throw new SerializerException("Failed to construct object of type " + clazz.getName(), e);
}
}
private Class<?>[] getParameterTypes(List<Object> values) {
return values.stream().map(Object::getClass).toArray(Class<?>[]::new);
}
private Constructor<?> getMatchingConstructor(Class<?> clazz, Class<?>[] parameterTypes) {
for (Constructor<?> constructor : clazz.getConstructors()) {
final Class<?>[] paramTypes = constructor.getParameterTypes();
if (paramTypes.length == parameterTypes.length) {
boolean matches = true;
for (int i = 0; i < paramTypes.length; i++) {
if (!paramTypes[i].isAssignableFrom(parameterTypes[i])
&& !parameterTypes[i].isAssignableFrom(paramTypes[i])
&& !isWrapperPrimitiveMatch(paramTypes[i], parameterTypes[i])) {
matches = false;
break;
}
}
if (matches) {
return constructor;
}
}
}
return null;
}
private static boolean isWrapperPrimitiveMatch(Class<?> paramType, Class<?> argType) {
if (paramType.isPrimitive()) {
return argType.equals(primitiveToWrapperMap.get(paramType));
}
return false;
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import java.io.IOException;
import io.debezium.connector.oracle.logminer.events.RedoSqlDmlEvent;
/**
* A specialized implementation of {@link SerdesProvider} for {@link RedoSqlDmlEvent} types.
*
* @author Chris Cranford
*/
public class RedoSqlDmlEventSerdesProvider<T extends RedoSqlDmlEvent> extends DmlEventSerdesProvider<T> {
@Override
public Class<?> getJavaType() {
return RedoSqlDmlEvent.class;
}
@Override
public void serialize(RedoSqlDmlEvent event, SerializerOutputStream stream) throws IOException {
super.serialize(event, stream);
stream.writeString(event.getRedoSql());
}
@Override
public void deserialize(DeserializationContext context, SerializerInputStream stream) throws IOException {
super.deserialize(context, stream);
context.addValue(stream.readString());
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import java.io.IOException;
import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent;
/**
* A specialized implementation of {@link SerdesProvider} for {@link SelectLobLocatorEvent} types.
*
* @author Chris Cranford
*/
public class SelectLobLocatorSerdesProvider<T extends SelectLobLocatorEvent> extends DmlEventSerdesProvider<T> {
@Override
public Class<?> getJavaType() {
return SelectLobLocatorEvent.class;
}
@Override
public void serialize(SelectLobLocatorEvent event, SerializerOutputStream stream) throws IOException {
super.serialize(event, stream);
stream.writeString(event.getColumnName());
stream.writeBoolean(event.isBinary());
}
@Override
public void deserialize(DeserializationContext context, SerializerInputStream stream) throws IOException {
super.deserialize(context, stream);
context.addValue(stream.readString());
context.addValue(stream.readBoolean());
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import java.io.IOException;
/**
* Defines the contract for supplying a Serdes (Serializer/Deserializer tuple) for the Ehcache
* transaction event cache that stores different types of events.
*
* @author Chris Cranford
*/
public interface SerdesProvider<T> {
/**
* Get the java type for the serialization/deserialization provider.
*
* @return the java class type
*/
Class<?> getJavaType();
/**
* Serializes the object into the output stream.
*
* @param object the object to be serialized, should not be {@code null}
* @param stream the output stream, should not be {@code null}
* @throws IOException thrown if there is a problem serializing the data to the output stream
*/
void serialize(T object, SerializerOutputStream stream) throws IOException;
/**
* Deserializes the input stream.
*
* @param context the deserialization context, should not be {@code null}
* @param stream the input stream to be read, should not be {@code null}
* @throws IOException thrown if there is a problem deserializing the data from the input stream
*/
void deserialize(DeserializationContext context, SerializerInputStream stream) throws IOException;
}

View File

@ -0,0 +1,148 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.util.Arrays;
import java.util.Objects;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.Scn;
import io.debezium.relational.TableId;
/**
* A reader implementation enabling the reading of data from an {@link InputStream}.
*
* @author Chris Cranford
*/
public class SerializerInputStream extends AbstractSerializerStream {
private final DataInputStream delegate;
public SerializerInputStream(InputStream inputStream) {
this.delegate = new DataInputStream(inputStream);
}
@Override
public void close() throws Exception {
delegate.close();
}
/**
* Reads an Oracle SCN value from the stream.
*
* @return the read SCN value or {@link Scn#NULL} if null
* @throws IOException when a read operation fails
*/
public Scn readScn() throws IOException {
final String value = readString();
return value == null ? Scn.NULL : Scn.valueOf(value);
}
/**
* Reads a Debezium {@link TableId} from the stream.
*
* @return the table id, never {@code null}
* @throws IOException when a read operation fails
*/
public TableId readTableId() throws IOException {
return TableId.parse(readString());
}
/**
* Read an {@link Instant} from the stream.
*
* @return the read instant value, never {@code null}
* @throws IOException when a read operation fails
*/
public Instant readInstant() throws IOException {
return Instant.parse(readString());
}
/**
* Read a boolean value from the stream.
*
* @return the boolean value
* @throws IOException when a read operation fails
*/
public boolean readBoolean() throws IOException {
return delegate.readBoolean();
}
/**
* Read an integer value from the stream.
*
* @return the integer value
* @throws IOException when a read operation fails
*/
public int readInt() throws IOException {
return delegate.readInt();
}
/**
* Read a string value from the stream.
*
* @return the string value or {@code null} when null
* @throws IOException when a read operation fails
*/
public String readString() throws IOException {
// Strings are serialized with a boolean flag preceding the string indicating nullability.
boolean isNull = delegate.readBoolean();
if (isNull) {
return null;
}
return delegate.readUTF();
}
/**
* Reads an object array from the stream.
*
* @return the object array, never {@code null}
* @throws IOException when a read operation fails
*/
public Object[] readObjectArray() throws IOException {
return stringArrayToObjectArray(readStringArray());
}
/**
* Reads a string array from the stream.
*
* @return the string array, never {@code null}
* @throws IOException when a read operation fails
*/
protected String[] readStringArray() throws IOException {
final int size = readInt();
final String[] data = new String[size];
for (int i = 0; i < size; i++) {
data[i] = readString();
}
return data;
}
/**
* Convert an array of string values, typically from the encoded stream back into their
* Java object representations after being deserialized.
*
* @param values array of string values to convert
* @return array of objects after conversion
*/
protected Object[] stringArrayToObjectArray(String[] values) {
Objects.requireNonNull(values);
Object[] results = Arrays.copyOf(values, values.length, Object[].class);
for (int i = 0; i < results.length; ++i) {
if (results[i].equals(NULL_VALUE_SENTINEL)) {
results[i] = null;
}
else if (results[i].equals(UNAVAILABLE_VALUE_SENTINEL)) {
results[i] = OracleValueConverters.UNAVAILABLE_VALUE;
}
}
return results;
}
}

View File

@ -0,0 +1,153 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.time.Instant;
import java.util.Objects;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntry;
import io.debezium.relational.TableId;
/**
* @author Chris Cranford
*/
public class SerializerOutputStream extends AbstractSerializerStream {
private final DataOutputStream delegate;
public SerializerOutputStream(OutputStream outputStream) {
this.delegate = new DataOutputStream(outputStream);
}
@Override
public void close() throws Exception {
delegate.close();
}
/**
* Writes an Oracle SCN value to the stream.
*
* @param scn the SCN value
* @throws IOException when a write operation fails
*/
public void writeScn(Scn scn) throws IOException {
writeString(scn == null || scn.isNull() ? null : scn.toString());
}
/**
* Writes a {@link TableId} to the stream.
*
* @param tableId the table id, should not be {@code null}
* @throws IOException when a write operation fails
*/
public void writeTableId(TableId tableId) throws IOException {
Objects.requireNonNull(tableId);
writeString(tableId.toDoubleQuotedString());
}
/**
* Writes an {@link Instant} to the stream.
* @param instant the instant to be written, should not be {@code null}
* @throws IOException when a write operation fails
*/
public void writeInstant(Instant instant) throws IOException {
Objects.requireNonNull(instant);
writeString(instant.toString());
}
/**
* Writes a boolean value to the stream.
*
* @param value the boolean value to write
* @throws IOException when a write operation fails
*/
public void writeBoolean(boolean value) throws IOException {
delegate.writeBoolean(value);
}
/**
* Write an integer value to the stream.
*
* @param value the integer value to write
* @throws IOException when a write operation fails
*/
public void writeInt(int value) throws IOException {
delegate.writeInt(value);
}
/**
* Write a string value to the stream.
*
* @param value the string value to write, can be {@code null}
* @throws IOException when a write operation fails
*/
public void writeString(String value) throws IOException {
// When writing to a DataOutputStream, the writeUTF method does not accept a null string value.
// To account for this, Debezium will write a preceding boolean flag indicating nullability,
// and the string will only be written if the string is not null.
if (value == null) {
delegate.writeBoolean(true);
}
else {
delegate.writeBoolean(false);
delegate.writeUTF(value);
}
}
/**
* Write an object array to the stream.
*
* @param values the object array to write, should not be {@code null}
* @throws IOException when a write operation fails
*/
public void writeObjectArray(Object[] values) throws IOException {
Objects.requireNonNull(values);
writeStringArray(objectArrayToStringArray(values));
}
/**
* Writes a string array to the stream.
*
* @param values the string array to write, should not be {@code null}
* @throws IOException when a write operation fails
*/
protected void writeStringArray(String[] values) throws IOException {
Objects.requireNonNull(values);
writeInt(values.length);
for (String value : values) {
writeString(value);
}
}
/**
* Convert an array of object values to a string array for serialization. This is typically
* used when preparing the {@link LogMinerDmlEntry} values array for serialization.
*
* @param values the dml entry's new or old object values array
* @return string array of values prepared for serialization
*/
protected String[] objectArrayToStringArray(Object[] values) {
Objects.requireNonNull(values);
String[] results = new String[values.length];
for (int i = 0; i < values.length; ++i) {
if (values[i] == null) {
results[i] = NULL_VALUE_SENTINEL;
}
else if (values[i] == OracleValueConverters.UNAVAILABLE_VALUE) {
results[i] = UNAVAILABLE_VALUE_SENTINEL;
}
else {
results[i] = (String) values[i];
}
}
return results;
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import io.debezium.connector.oracle.logminer.events.TruncateEvent;
/**
* A specialized implementation of {@link SerdesProvider} for {@link TruncateEvent} types.
*
* @author Chris Cranford
*/
public class TruncateEventSerdesProvider<T extends TruncateEvent> extends DmlEventSerdesProvider<T> {
@Override
public Class<?> getJavaType() {
return TruncateEvent.class;
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import java.io.IOException;
import io.debezium.connector.oracle.logminer.events.XmlBeginEvent;
/**
* A specialized implementation of {@link SerdesProvider} for {@link XmlBeginEvent} types.
*
* @author Chris Cranford
*/
public class XmlBeginEventSerdesProvider<T extends XmlBeginEvent> extends DmlEventSerdesProvider<T> {
@Override
public Class<?> getJavaType() {
return XmlBeginEvent.class;
}
@Override
public void serialize(XmlBeginEvent event, SerializerOutputStream stream) throws IOException {
super.serialize(event, stream);
stream.writeString(event.getColumnName());
}
@Override
public void deserialize(DeserializationContext context, SerializerInputStream stream) throws IOException {
super.deserialize(context, stream);
context.addValue(stream.readString());
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import io.debezium.connector.oracle.logminer.events.XmlEndEvent;
/**
* A specialized implementation of {@link SerdesProvider} for {@link XmlEndEvent} types.
*
* @author Chris Cranford
*/
public class XmlEndEventSerdesProvider<T extends XmlEndEvent> extends LogMinerEventSerdesProvider<T> {
@Override
public Class<?> getJavaType() {
return XmlEndEvent.class;
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer.processor.ehcache.serialization;
import java.io.IOException;
import io.debezium.connector.oracle.logminer.events.XmlWriteEvent;
/**
* A specialized implementation of {@link SerdesProvider} for {@link XmlWriteEvent} types.
*
* @author Chris Cranford
*/
public class XmlWriteEventSerdesProvider<T extends XmlWriteEvent> extends LogMinerEventSerdesProvider<T> {
@Override
public Class<?> getJavaType() {
return XmlWriteEvent.class;
}
@Override
public void serialize(XmlWriteEvent event, SerializerOutputStream stream) throws IOException {
super.serialize(event, stream);
stream.writeString(event.getXml());
stream.writeInt(event.getLength());
}
@Override
public void deserialize(DeserializationContext context, SerializerInputStream stream) throws IOException {
super.deserialize(context, stream);
context.addValue(stream.readString());
context.addValue(stream.readInt());
}
}

View File

@ -178,6 +178,15 @@ else if (adapter().equals(ConnectorAdapter.OLR)) {
builder.with("log.mining.buffer." + ConfigurationProperties.AUTH_PASSWORD, INFINISPAN_PASS); builder.with("log.mining.buffer." + ConfigurationProperties.AUTH_PASSWORD, INFINISPAN_PASS);
} }
} }
else if (bufferType.isEhcache()) {
final long cacheSize = 1024 * 1024 * 10; // 10Mb each
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE, bufferTypeName);
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_STORAGE_PATH, "./target/data");
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_CACHE_TRANSACTIONS_BYTES, cacheSize);
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_CACHE_PROCESSED_TRANSACTIONS_BYTES, cacheSize);
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_CACHE_SCHEMA_CHANGES_BYTES, cacheSize);
builder.with(OracleConnectorConfig.LOG_MINING_BUFFER_EHCACHE_CACHE_EVENTS_BYTES, cacheSize);
}
builder.withDefault(OracleConnectorConfig.LOG_MINING_BUFFER_DROP_ON_STOP, true); builder.withDefault(OracleConnectorConfig.LOG_MINING_BUFFER_DROP_ON_STOP, true);
} }