diff --git a/debezium-assembly-descriptors/src/main/resources/assemblies/storage-distribution.xml b/debezium-assembly-descriptors/src/main/resources/assemblies/storage-distribution.xml new file mode 100644 index 000000000..3bb125ecb --- /dev/null +++ b/debezium-assembly-descriptors/src/main/resources/assemblies/storage-distribution.xml @@ -0,0 +1,63 @@ + + + store + + tar.gz + zip + + false + + + ${project.artifactId} + false + runtime + false + true + + + com.fasterxml.jackson.core:jackson-core:* + com.fasterxml.jackson.core:jackson-databind:* + com.fasterxml.jackson.core:jackson-annotations:* + com.fasterxml.jackson.datatype:jackson-datatype-jsr310:* + + + com.google.guava:listenablefuture:* + + + org.checkerframework:checker-qual:* + + + + ${project.artifactId} + false + + ${project.groupId}:${project.artifactId}:* + + + + + + + ${project.basedir}/../.. + ${project.artifactId} + + README* + CHANGELOG* + CONTRIBUTE* + COPYRIGHT* + LICENSE* + + true + + + ${project.build.directory}/generated-sources + ${project.artifactId} + + *.json + + true + + + diff --git a/debezium-bom/pom.xml b/debezium-bom/pom.xml index 1a45e520a..5a6461cd8 100644 --- a/debezium-bom/pom.xml +++ b/debezium-bom/pom.xml @@ -40,6 +40,9 @@ 3.0.7 20.0.0 + + 4.1.1 + 4.13.1 1.4 @@ -423,6 +426,13 @@ provided + + + redis.clients + jedis + ${version.jedis} + + org.testcontainers @@ -534,10 +544,10 @@ debezium-scripting ${project.version} - + io.debezium debezium-scripting-languages - pom + pom ${project.version} @@ -620,6 +630,11 @@ debezium-quarkus-outbox-deployment ${project.version} + + io.debezium + debezium-storage-redis + ${project.version} + diff --git a/debezium-server/debezium-server-bom/pom.xml b/debezium-server/debezium-server-bom/pom.xml index a83dd220f..8338f88e6 100644 --- a/debezium-server/debezium-server-bom/pom.xml +++ b/debezium-server/debezium-server-bom/pom.xml @@ -16,7 +16,6 @@ 25.0.0 2.10.1 5.12.1 - 4.1.1 0.9.1 2.8.0 2.2.3 @@ -161,11 +160,6 @@ azure-messaging-eventhubs ${version.eventhubs} - - redis.clients - jedis - ${version.jedis} - io.nats jnats diff --git a/debezium-server/debezium-server-redis/pom.xml b/debezium-server/debezium-server-redis/pom.xml index 5cacc7e3a..02d431221 100644 --- a/debezium-server/debezium-server-redis/pom.xml +++ b/debezium-server/debezium-server-redis/pom.xml @@ -16,6 +16,10 @@ io.debezium debezium-server-core + + io.debezium + debezium-storage-redis + diff --git a/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisOffsetBackingStore.java b/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisOffsetBackingStore.java index a9774d6c1..36039c41f 100644 --- a/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisOffsetBackingStore.java +++ b/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisOffsetBackingStore.java @@ -5,201 +5,22 @@ */ package io.debezium.server.redis; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - -import org.apache.kafka.connect.runtime.WorkerConfig; -import org.apache.kafka.connect.storage.MemoryOffsetBackingStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.debezium.config.Field; -import io.smallrye.mutiny.Uni; - -import redis.clients.jedis.Jedis; -import redis.clients.jedis.exceptions.JedisConnectionException; - /** - * Implementation of OffsetBackingStore that saves to Redis - * @author Oren Elias + * Deprecated and replaced with {@link io.debezium.storage.redis.offset.RedisOffsetBackingStore} + * */ -public class RedisOffsetBackingStore extends MemoryOffsetBackingStore { +@Deprecated +public class RedisOffsetBackingStore extends io.debezium.storage.redis.offset.RedisOffsetBackingStore { private static final Logger LOGGER = LoggerFactory.getLogger(RedisOffsetBackingStore.class); - private static final String CONFIGURATION_FIELD_PREFIX_STRING = "offset.storage.redis."; - public static final Field PROP_ADDRESS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "address") - .withDescription("The redis url that will be used to access the database schema history"); - - public static final Field PROP_SSL_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.enabled") - .withDescription("Use SSL for Redis connection") - .withDefault("false"); - - public static final Field PROP_USER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "user") - .withDescription("The redis url that will be used to access the database schema history"); - - public static final Field PROP_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "password") - .withDescription("The redis url that will be used to access the database schema history"); - - public static final String DEFAULT_REDIS_KEY_NAME = "metadata:debezium:offsets"; - public static final Field PROP_KEY_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "key") - .withDescription("The redis key that will be used to store the database schema history") - .withDefault(DEFAULT_REDIS_KEY_NAME); - - public static final Integer DEFAULT_RETRY_INITIAL_DELAY = 300; - public static final Field PROP_RETRY_INITIAL_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.initial.delay.ms") - .withDescription("Initial retry delay (in ms)") - .withDefault(DEFAULT_RETRY_INITIAL_DELAY); - - public static final Integer DEFAULT_RETRY_MAX_DELAY = 10000; - public static final Field PROP_RETRY_MAX_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.max.delay.ms") - .withDescription("Maximum retry delay (in ms)") - .withDefault(DEFAULT_RETRY_MAX_DELAY); - - public static final Integer DEFAULT_CONNECTION_TIMEOUT = 2000; - public static final Field PROP_CONNECTION_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connection.timeout.ms") - .withDescription("Connection timeout (in ms)") - .withDefault(DEFAULT_CONNECTION_TIMEOUT); - - public static final Integer DEFAULT_SOCKET_TIMEOUT = 2000; - public static final Field PROP_SOCKET_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "socket.timeout.ms") - .withDescription("Socket timeout (in ms)") - .withDefault(DEFAULT_SOCKET_TIMEOUT); - - private String redisKeyName; - private String address; - private String user; - private String password; - private boolean sslEnabled; - - private Jedis client = null; - private Map config; - - private Integer initialRetryDelay; - private Integer maxRetryDelay; - - private Integer connectionTimeout; - private Integer socketTimeout; - public RedisOffsetBackingStore() { - - } - - void connect() { - RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.connectionTimeout, this.socketTimeout, this.sslEnabled); - client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME); - } - - @Override - public void configure(WorkerConfig config) { - super.configure(config); - this.config = config.originalsStrings(); - - this.address = this.config.get(PROP_ADDRESS.name()); - this.user = this.config.get(PROP_USER.name()); - this.password = this.config.get(PROP_PASSWORD.name()); - this.sslEnabled = Boolean.parseBoolean(this.config.get(PROP_SSL_ENABLED.name())); - - this.redisKeyName = Optional.ofNullable( - this.config.get(PROP_KEY_NAME.name())).orElse(DEFAULT_REDIS_KEY_NAME); - // load retry settings - this.initialRetryDelay = Optional.ofNullable( - Integer.getInteger(this.config.get(PROP_RETRY_INITIAL_DELAY.name()))).orElse(DEFAULT_RETRY_INITIAL_DELAY); - this.maxRetryDelay = Optional.ofNullable( - Integer.getInteger(this.config.get(PROP_RETRY_MAX_DELAY.name()))).orElse(DEFAULT_RETRY_MAX_DELAY); - // load connection timeout settings - this.connectionTimeout = Optional.ofNullable( - Integer.getInteger(this.config.get(PROP_CONNECTION_TIMEOUT.name()))).orElse(DEFAULT_CONNECTION_TIMEOUT); - this.socketTimeout = Optional.ofNullable( - Integer.getInteger(this.config.get(PROP_SOCKET_TIMEOUT.name()))).orElse(DEFAULT_SOCKET_TIMEOUT); - - } - - @Override - public synchronized void start() { - super.start(); - LOGGER.info("Starting RedisOffsetBackingStore"); - this.connect(); - this.load(); - } - - @Override - public synchronized void stop() { - super.stop(); - // Nothing to do since this doesn't maintain any outstanding connections/data - LOGGER.info("Stopped RedisOffsetBackingStore"); - } - - /** - * Load offsets from redis keys - */ - private void load() { - // fetch the value from Redis - Map offsets = Uni.createFrom().item(() -> { - return (Map) client.hgetAll(this.redisKeyName); - }) - // handle failures and retry - .onFailure().invoke( - f -> { - LOGGER.warn("Reading from offset store failed with " + f); - LOGGER.warn("Will retry"); - }) - .onFailure(JedisConnectionException.class).invoke( - f -> { - LOGGER.warn("Attempting to reconnect to redis "); - this.connect(); - }) - // retry on failure with backoff - .onFailure().retry().withBackOff(Duration.ofMillis(initialRetryDelay), Duration.ofMillis(maxRetryDelay)).indefinitely() - // write success trace message - .invoke( - item -> { - LOGGER.trace("Offsets fetched from redis: " + item); - }) - .await().indefinitely(); - this.data = new HashMap<>(); - for (Map.Entry mapEntry : offsets.entrySet()) { - ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey().getBytes()) : null; - ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue().getBytes()) : null; - data.put(key, value); - } - } - - /** - * Save offsets to redis keys - */ - @Override - protected void save() { - for (Map.Entry mapEntry : data.entrySet()) { - byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null; - byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null; - // set the value in Redis - Uni.createFrom().item(() -> { - return (Long) client.hset(this.redisKeyName.getBytes(), key, value); - }) - // handle failures and retry - .onFailure().invoke( - f -> { - LOGGER.warn("Writing to offset store failed with " + f); - LOGGER.warn("Will retry"); - }) - .onFailure(JedisConnectionException.class).invoke( - f -> { - LOGGER.warn("Attempting to reconnect to redis "); - this.connect(); - }) - // retry on failure with backoff - .onFailure().retry().withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(2)).indefinitely() - // write success trace message - .invoke( - item -> { - LOGGER.trace("Record written to offset store in redis: " + value); - }) - .await().indefinitely(); - } + LOGGER.warn("Class '{}' is deprecated and scheduled for removal, please use '{}'", + RedisOffsetBackingStore.class.getName(), + io.debezium.storage.redis.offset.RedisOffsetBackingStore.class.getName()); } } diff --git a/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisSchemaHistory.java b/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisSchemaHistory.java index 847b8d929..19444673a 100644 --- a/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisSchemaHistory.java +++ b/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisSchemaHistory.java @@ -5,254 +5,22 @@ */ package io.debezium.server.redis; -import java.io.IOException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; - -import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.debezium.annotation.ThreadSafe; -import io.debezium.config.Configuration; -import io.debezium.config.Field; -import io.debezium.document.DocumentReader; -import io.debezium.document.DocumentWriter; -import io.debezium.relational.history.AbstractSchemaHistory; -import io.debezium.relational.history.HistoryRecord; -import io.debezium.relational.history.HistoryRecordComparator; -import io.debezium.relational.history.SchemaHistory; -import io.debezium.relational.history.SchemaHistoryException; -import io.debezium.relational.history.SchemaHistoryListener; -import io.debezium.util.Collect; -import io.debezium.util.DelayStrategy; -import io.debezium.util.Throwables; - -import redis.clients.jedis.Jedis; -import redis.clients.jedis.StreamEntryID; -import redis.clients.jedis.exceptions.JedisConnectionException; -import redis.clients.jedis.resps.StreamEntry; - /** - * A {@link SchemaHistory} implementation that stores the schema history in Redis. + * Deprecated and replaced with {@link io.debezium.storage.redis.history.RedisSchemaHistory} * */ -@ThreadSafe -public final class RedisSchemaHistory extends AbstractSchemaHistory { - private static final String CONFIGURATION_FIELD_PREFIX_STRING = SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "redis."; +@Deprecated +public final class RedisSchemaHistory extends io.debezium.storage.redis.history.RedisSchemaHistory { - private static final Logger LOGGER = LoggerFactory.getLogger(RedisSchemaHistory.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RedisOffsetBackingStore.class); - public static final Field PROP_ADDRESS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "address") - .withDescription("The redis url that will be used to access the database schema history"); - - public static final Field PROP_SSL_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.enabled") - .withDescription("Use SSL for Redis connection") - .withDefault("false"); - - public static final Field PROP_USER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "user") - .withDescription("The redis url that will be used to access the database schema history"); - - public static final Field PROP_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "password") - .withDescription("The redis url that will be used to access the database schema history"); - - public static final Field PROP_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "key") - .withDescription("The redis key that will be used to store the database schema history") - .withDefault("metadata:debezium:schema_history"); - - public static final Integer DEFAULT_RETRY_INITIAL_DELAY = 300; - public static final Field PROP_RETRY_INITIAL_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.initial.delay.ms") - .withDescription("Initial retry delay (in ms)") - .withDefault(DEFAULT_RETRY_INITIAL_DELAY); - - public static final Integer DEFAULT_RETRY_MAX_DELAY = 10000; - public static final Field PROP_RETRY_MAX_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.max.delay.ms") - .withDescription("Maximum retry delay (in ms)") - .withDefault(DEFAULT_RETRY_MAX_DELAY); - - public static final Integer DEFAULT_CONNECTION_TIMEOUT = 2000; - public static final Field PROP_CONNECTION_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connection.timeout.ms") - .withDescription("Connection timeout (in ms)") - .withDefault(DEFAULT_CONNECTION_TIMEOUT); - - public static final Integer DEFAULT_SOCKET_TIMEOUT = 2000; - public static final Field PROP_SOCKET_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "socket.timeout.ms") - .withDescription("Socket timeout (in ms)") - .withDefault(DEFAULT_SOCKET_TIMEOUT); - - Duration initialRetryDelay; - Duration maxRetryDelay; - - public static Collection ALL_FIELDS = Collect.arrayListOf(PROP_ADDRESS, PROP_USER, PROP_PASSWORD, PROP_KEY); - - private final DocumentWriter writer = DocumentWriter.defaultWriter(); - private final DocumentReader reader = DocumentReader.defaultReader(); - private final AtomicBoolean running = new AtomicBoolean(); - private Configuration config; - private String redisKeyName; - private String address; - private String user; - private String password; - private boolean sslEnabled; - private Integer connectionTimeout; - private Integer socketTimeout; - - private Jedis client = null; - - void connect() { - RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.connectionTimeout, this.socketTimeout, this.sslEnabled); - client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_SCHEMA_HISTORY); - } - - @Override - public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { - if (!config.validateAndRecord(ALL_FIELDS, LOGGER::error)) { - throw new ConnectException( - "Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details"); - } - this.config = config; - // fetch the properties - this.address = this.config.getString(PROP_ADDRESS.name()); - this.user = this.config.getString(PROP_USER.name()); - this.password = this.config.getString(PROP_PASSWORD.name()); - this.sslEnabled = Boolean.parseBoolean(this.config.getString(PROP_SSL_ENABLED.name())); - this.redisKeyName = this.config.getString(PROP_KEY); - LOGGER.info("rediskeyname:" + this.redisKeyName); - // load retry settings - this.initialRetryDelay = Duration.ofMillis(this.config.getInteger(PROP_RETRY_INITIAL_DELAY)); - this.maxRetryDelay = Duration.ofMillis(this.config.getInteger(PROP_RETRY_MAX_DELAY)); - // load connection timeout settings - this.connectionTimeout = this.config.getInteger(PROP_CONNECTION_TIMEOUT); - this.socketTimeout = this.config.getInteger(PROP_SOCKET_TIMEOUT); - - super.configure(config, comparator, listener, useCatalogBeforeSchema); - } - - @Override - public synchronized void start() { - super.start(); - LOGGER.info("Starting RedisSchemaHistory"); - this.connect(); - } - - @Override - protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { - if (record == null) { - return; - } - String line; - try { - line = writer.write(record.document()); - } - catch (IOException e) { - Throwables.logErrorAndTraceRecord(LOGGER, record, "Failed to convert record to string", e); - throw new SchemaHistoryException("Unable to write database schema history record"); - } - - DelayStrategy delayStrategy = DelayStrategy.exponential(initialRetryDelay, maxRetryDelay); - boolean completedSuccessfully = false; - - // loop and retry until successful - while (!completedSuccessfully) { - try { - if (client == null) { - this.connect(); - } - - // write the entry to Redis - client.xadd(this.redisKeyName, (StreamEntryID) null, Collections.singletonMap("schema", line)); - LOGGER.trace("Record written to database schema history in redis: " + line); - completedSuccessfully = true; - } - catch (JedisConnectionException jce) { - LOGGER.warn("Attempting to reconnect to redis "); - this.connect(); - } - catch (Exception e) { - LOGGER.warn("Writing to database schema history stream failed", e); - LOGGER.warn("Will retry"); - } - if (!completedSuccessfully) { - // Failed to execute the transaction, retry... - delayStrategy.sleepWhen(!completedSuccessfully); - } - - } - } - - @Override - public void stop() { - running.set(false); - if (client != null) { - client.disconnect(); - } - super.stop(); - } - - @Override - protected synchronized void recoverRecords(Consumer records) { - DelayStrategy delayStrategy = DelayStrategy.exponential(initialRetryDelay, maxRetryDelay); - boolean completedSuccessfully = false; - List entries = new ArrayList(); - - // loop and retry until successful - while (!completedSuccessfully) { - try { - if (client == null) { - this.connect(); - } - - // read the entries from Redis - entries = client.xrange( - this.redisKeyName, (StreamEntryID) null, (StreamEntryID) null); - completedSuccessfully = true; - } - catch (JedisConnectionException jce) { - LOGGER.warn("Attempting to reconnect to redis "); - this.connect(); - } - catch (Exception e) { - LOGGER.warn("Reading from database schema history stream failed with " + e); - LOGGER.warn("Will retry"); - } - if (!completedSuccessfully) { - // Failed to execute the transaction, retry... - delayStrategy.sleepWhen(!completedSuccessfully); - } - - } - - for (StreamEntry item : entries) { - try { - records.accept(new HistoryRecord(reader.read(item.getFields().get("schema")))); - } - catch (IOException e) { - LOGGER.error("Failed to convert record to string: {}", item, e); - return; - } - } - - } - - @Override - public boolean storageExists() { - return true; - } - - @Override - public boolean exists() { - // check if the stream is not empty - if (client != null && client.xlen(this.redisKeyName) > 0) { - return true; - } - else { - return false; - } + public RedisSchemaHistory() { + LOGGER.warn("Class '{}' is deprecated and scheduled for removal, please use '{}'", + RedisSchemaHistory.class.getName(), + io.debezium.storage.redis.history.RedisSchemaHistory.class.getName()); } } diff --git a/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumer.java b/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumer.java index bbc44f1b5..a771215ee 100644 --- a/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumer.java +++ b/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumer.java @@ -32,6 +32,7 @@ import io.debezium.engine.DebeziumEngine; import io.debezium.engine.DebeziumEngine.RecordCommitter; import io.debezium.server.BaseChangeConsumer; +import io.debezium.storage.redis.RedisConnection; import io.debezium.util.DelayStrategy; import redis.clients.jedis.Jedis; @@ -53,6 +54,8 @@ public class RedisStreamChangeConsumer extends BaseChangeConsumer private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamChangeConsumer.class); + private static final String DEBEZIUM_REDIS_SINK_CLIENT_NAME = "debezium:redis:sink"; + private static final String PROP_PREFIX = "debezium.sink.redis."; private static final String PROP_ADDRESS = PROP_PREFIX + "address"; private static final String PROP_USER = PROP_PREFIX + "user"; @@ -122,7 +125,7 @@ else if (MESSAGE_FORMAT_COMPACT.equals(messageFormat)) { } RedisConnection redisConnection = new RedisConnection(address, user, password, connectionTimeout, socketTimeout, sslEnabled); - client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_REDIS_SINK_CLIENT_NAME); + client = redisConnection.getRedisClient(DEBEZIUM_REDIS_SINK_CLIENT_NAME); } @PreDestroy diff --git a/debezium-storage/debezium-storage-redis/pom.xml b/debezium-storage/debezium-storage-redis/pom.xml new file mode 100644 index 000000000..b51d04178 --- /dev/null +++ b/debezium-storage/debezium-storage-redis/pom.xml @@ -0,0 +1,117 @@ + + + + io.debezium + debezium-storage + 2.1.0-SNAPSHOT + ../pom.xml + + 4.0.0 + debezium-storage-redis + Debezium Storage Redis Module + jar + + + + + io.quarkus + quarkus-bom + ${quarkus.version} + pom + import + + + + + + + io.debezium + debezium-api + provided + + + io.debezium + debezium-core + provided + + + org.slf4j + slf4j-api + provided + + + org.apache.kafka + connect-api + provided + + + org.apache.kafka + connect-runtime + provided + + + org.apache.kafka + kafka-log4j-appender + + + log4j + log4j + + + + + redis.clients + jedis + + + io.smallrye.reactive + mutiny + + + + + + + assembly + + false + + + + + org.apache.maven.plugins + maven-assembly-plugin + ${version.assembly.plugin} + + + io.debezium + debezium-assembly-descriptors + ${project.version} + + + + + default + package + + single + + + ${project.artifactId}-${project.version} + true + + ${assembly.descriptor} + + posix + + + + + + + + + + diff --git a/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisConnection.java b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisConnection.java similarity index 86% rename from debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisConnection.java rename to debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisConnection.java index 2307280f5..1216257cf 100644 --- a/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisConnection.java +++ b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisConnection.java @@ -3,7 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.server.redis; +package io.debezium.storage.redis; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,9 +20,8 @@ public class RedisConnection { private static final Logger LOGGER = LoggerFactory.getLogger(RedisConnection.class); - protected static final String DEBEZIUM_REDIS_SINK_CLIENT_NAME = "debezium:redis:sink"; - protected static final String DEBEZIUM_OFFSETS_CLIENT_NAME = "debezium:offsets"; - protected static final String DEBEZIUM_SCHEMA_HISTORY = "debezium:schema_history"; + public static final String DEBEZIUM_OFFSETS_CLIENT_NAME = "debezium:offsets"; + public static final String DEBEZIUM_SCHEMA_HISTORY = "debezium:schema_history"; private String address; private String user; diff --git a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java new file mode 100644 index 000000000..a083e0d14 --- /dev/null +++ b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java @@ -0,0 +1,259 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.storage.redis.history; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.annotation.ThreadSafe; +import io.debezium.config.Configuration; +import io.debezium.config.Field; +import io.debezium.document.DocumentReader; +import io.debezium.document.DocumentWriter; +import io.debezium.relational.history.AbstractSchemaHistory; +import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.relational.history.SchemaHistory; +import io.debezium.relational.history.SchemaHistoryException; +import io.debezium.relational.history.SchemaHistoryListener; +import io.debezium.storage.redis.RedisConnection; +import io.debezium.util.Collect; +import io.debezium.util.DelayStrategy; +import io.debezium.util.Throwables; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.resps.StreamEntry; + +/** + * A {@link SchemaHistory} implementation that stores the schema history in Redis. + * + */ +@ThreadSafe +public class RedisSchemaHistory extends AbstractSchemaHistory { + + private static final String CONFIGURATION_FIELD_PREFIX_STRING = SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "redis."; + + private static final Logger LOGGER = LoggerFactory.getLogger(RedisSchemaHistory.class); + + public static final Field PROP_ADDRESS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "address") + .withDescription("The redis url that will be used to access the database schema history"); + + public static final Field PROP_SSL_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.enabled") + .withDescription("Use SSL for Redis connection") + .withDefault("false"); + + public static final Field PROP_USER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "user") + .withDescription("The redis url that will be used to access the database schema history"); + + public static final Field PROP_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "password") + .withDescription("The redis url that will be used to access the database schema history"); + + public static final Field PROP_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "key") + .withDescription("The redis key that will be used to store the database schema history") + .withDefault("metadata:debezium:schema_history"); + + public static final Integer DEFAULT_RETRY_INITIAL_DELAY = 300; + public static final Field PROP_RETRY_INITIAL_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.initial.delay.ms") + .withDescription("Initial retry delay (in ms)") + .withDefault(DEFAULT_RETRY_INITIAL_DELAY); + + public static final Integer DEFAULT_RETRY_MAX_DELAY = 10000; + public static final Field PROP_RETRY_MAX_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.max.delay.ms") + .withDescription("Maximum retry delay (in ms)") + .withDefault(DEFAULT_RETRY_MAX_DELAY); + + public static final Integer DEFAULT_CONNECTION_TIMEOUT = 2000; + public static final Field PROP_CONNECTION_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connection.timeout.ms") + .withDescription("Connection timeout (in ms)") + .withDefault(DEFAULT_CONNECTION_TIMEOUT); + + public static final Integer DEFAULT_SOCKET_TIMEOUT = 2000; + public static final Field PROP_SOCKET_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "socket.timeout.ms") + .withDescription("Socket timeout (in ms)") + .withDefault(DEFAULT_SOCKET_TIMEOUT); + + Duration initialRetryDelay; + Duration maxRetryDelay; + + public static Collection ALL_FIELDS = Collect.arrayListOf(PROP_ADDRESS, PROP_USER, PROP_PASSWORD, PROP_KEY); + + private final DocumentWriter writer = DocumentWriter.defaultWriter(); + private final DocumentReader reader = DocumentReader.defaultReader(); + private final AtomicBoolean running = new AtomicBoolean(); + private Configuration config; + private String redisKeyName; + private String address; + private String user; + private String password; + private boolean sslEnabled; + private Integer connectionTimeout; + private Integer socketTimeout; + + private Jedis client = null; + + void connect() { + RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.connectionTimeout, this.socketTimeout, this.sslEnabled); + client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_SCHEMA_HISTORY); + } + + @Override + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { + if (!config.validateAndRecord(ALL_FIELDS, LOGGER::error)) { + throw new ConnectException( + "Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details"); + } + this.config = config; + // fetch the properties + this.address = this.config.getString(PROP_ADDRESS.name()); + this.user = this.config.getString(PROP_USER.name()); + this.password = this.config.getString(PROP_PASSWORD.name()); + this.sslEnabled = Boolean.parseBoolean(this.config.getString(PROP_SSL_ENABLED.name())); + this.redisKeyName = this.config.getString(PROP_KEY); + LOGGER.info("rediskeyname:" + this.redisKeyName); + // load retry settings + this.initialRetryDelay = Duration.ofMillis(this.config.getInteger(PROP_RETRY_INITIAL_DELAY)); + this.maxRetryDelay = Duration.ofMillis(this.config.getInteger(PROP_RETRY_MAX_DELAY)); + // load connection timeout settings + this.connectionTimeout = this.config.getInteger(PROP_CONNECTION_TIMEOUT); + this.socketTimeout = this.config.getInteger(PROP_SOCKET_TIMEOUT); + + super.configure(config, comparator, listener, useCatalogBeforeSchema); + } + + @Override + public synchronized void start() { + super.start(); + LOGGER.info("Starting RedisSchemaHistory"); + this.connect(); + } + + @Override + protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { + if (record == null) { + return; + } + String line; + try { + line = writer.write(record.document()); + } + catch (IOException e) { + Throwables.logErrorAndTraceRecord(LOGGER, record, "Failed to convert record to string", e); + throw new SchemaHistoryException("Unable to write database schema history record"); + } + + DelayStrategy delayStrategy = DelayStrategy.exponential(initialRetryDelay, maxRetryDelay); + boolean completedSuccessfully = false; + + // loop and retry until successful + while (!completedSuccessfully) { + try { + if (client == null) { + this.connect(); + } + + // write the entry to Redis + client.xadd(this.redisKeyName, (StreamEntryID) null, Collections.singletonMap("schema", line)); + LOGGER.trace("Record written to database schema history in redis: " + line); + completedSuccessfully = true; + } + catch (JedisConnectionException jce) { + LOGGER.warn("Attempting to reconnect to redis "); + this.connect(); + } + catch (Exception e) { + LOGGER.warn("Writing to database schema history stream failed", e); + LOGGER.warn("Will retry"); + } + if (!completedSuccessfully) { + // Failed to execute the transaction, retry... + delayStrategy.sleepWhen(!completedSuccessfully); + } + + } + } + + @Override + public void stop() { + running.set(false); + if (client != null) { + client.disconnect(); + } + super.stop(); + } + + @Override + protected synchronized void recoverRecords(Consumer records) { + DelayStrategy delayStrategy = DelayStrategy.exponential(initialRetryDelay, maxRetryDelay); + boolean completedSuccessfully = false; + List entries = new ArrayList(); + + // loop and retry until successful + while (!completedSuccessfully) { + try { + if (client == null) { + this.connect(); + } + + // read the entries from Redis + entries = client.xrange( + this.redisKeyName, (StreamEntryID) null, (StreamEntryID) null); + completedSuccessfully = true; + } + catch (JedisConnectionException jce) { + LOGGER.warn("Attempting to reconnect to redis "); + this.connect(); + } + catch (Exception e) { + LOGGER.warn("Reading from database schema history stream failed with " + e); + LOGGER.warn("Will retry"); + } + if (!completedSuccessfully) { + // Failed to execute the transaction, retry... + delayStrategy.sleepWhen(!completedSuccessfully); + } + + } + + for (StreamEntry item : entries) { + try { + records.accept(new HistoryRecord(reader.read(item.getFields().get("schema")))); + } + catch (IOException e) { + LOGGER.error("Failed to convert record to string: {}", item, e); + return; + } + } + + } + + @Override + public boolean storageExists() { + return true; + } + + @Override + public boolean exists() { + // check if the stream is not empty + if (client != null && client.xlen(this.redisKeyName) > 0) { + return true; + } + else { + return false; + } + } +} diff --git a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/offset/RedisOffsetBackingStore.java b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/offset/RedisOffsetBackingStore.java new file mode 100644 index 000000000..9b4affa60 --- /dev/null +++ b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/offset/RedisOffsetBackingStore.java @@ -0,0 +1,206 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.storage.redis.offset; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.storage.MemoryOffsetBackingStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.config.Field; +import io.debezium.storage.redis.RedisConnection; +import io.smallrye.mutiny.Uni; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.exceptions.JedisConnectionException; + +/** + * Implementation of OffsetBackingStore that saves to Redis + * @author Oren Elias + */ + +public class RedisOffsetBackingStore extends MemoryOffsetBackingStore { + + private static final Logger LOGGER = LoggerFactory.getLogger(RedisOffsetBackingStore.class); + + private static final String CONFIGURATION_FIELD_PREFIX_STRING = "offset.storage.redis."; + public static final Field PROP_ADDRESS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "address") + .withDescription("The redis url that will be used to access the database schema history"); + + public static final Field PROP_SSL_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.enabled") + .withDescription("Use SSL for Redis connection") + .withDefault("false"); + + public static final Field PROP_USER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "user") + .withDescription("The redis url that will be used to access the database schema history"); + + public static final Field PROP_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "password") + .withDescription("The redis url that will be used to access the database schema history"); + + public static final String DEFAULT_REDIS_KEY_NAME = "metadata:debezium:offsets"; + public static final Field PROP_KEY_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "key") + .withDescription("The redis key that will be used to store the database schema history") + .withDefault(DEFAULT_REDIS_KEY_NAME); + + public static final Integer DEFAULT_RETRY_INITIAL_DELAY = 300; + public static final Field PROP_RETRY_INITIAL_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.initial.delay.ms") + .withDescription("Initial retry delay (in ms)") + .withDefault(DEFAULT_RETRY_INITIAL_DELAY); + + public static final Integer DEFAULT_RETRY_MAX_DELAY = 10000; + public static final Field PROP_RETRY_MAX_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.max.delay.ms") + .withDescription("Maximum retry delay (in ms)") + .withDefault(DEFAULT_RETRY_MAX_DELAY); + + public static final Integer DEFAULT_CONNECTION_TIMEOUT = 2000; + public static final Field PROP_CONNECTION_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connection.timeout.ms") + .withDescription("Connection timeout (in ms)") + .withDefault(DEFAULT_CONNECTION_TIMEOUT); + + public static final Integer DEFAULT_SOCKET_TIMEOUT = 2000; + public static final Field PROP_SOCKET_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "socket.timeout.ms") + .withDescription("Socket timeout (in ms)") + .withDefault(DEFAULT_SOCKET_TIMEOUT); + + private String redisKeyName; + private String address; + private String user; + private String password; + private boolean sslEnabled; + + private Jedis client = null; + private Map config; + + private Integer initialRetryDelay; + private Integer maxRetryDelay; + + private Integer connectionTimeout; + private Integer socketTimeout; + + public RedisOffsetBackingStore() { + + } + + void connect() { + RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.connectionTimeout, this.socketTimeout, this.sslEnabled); + client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME); + } + + @Override + public void configure(WorkerConfig config) { + super.configure(config); + this.config = config.originalsStrings(); + + this.address = this.config.get(PROP_ADDRESS.name()); + this.user = this.config.get(PROP_USER.name()); + this.password = this.config.get(PROP_PASSWORD.name()); + this.sslEnabled = Boolean.parseBoolean(this.config.get(PROP_SSL_ENABLED.name())); + + this.redisKeyName = Optional.ofNullable( + this.config.get(PROP_KEY_NAME.name())).orElse(DEFAULT_REDIS_KEY_NAME); + // load retry settings + this.initialRetryDelay = Optional.ofNullable( + Integer.getInteger(this.config.get(PROP_RETRY_INITIAL_DELAY.name()))).orElse(DEFAULT_RETRY_INITIAL_DELAY); + this.maxRetryDelay = Optional.ofNullable( + Integer.getInteger(this.config.get(PROP_RETRY_MAX_DELAY.name()))).orElse(DEFAULT_RETRY_MAX_DELAY); + // load connection timeout settings + this.connectionTimeout = Optional.ofNullable( + Integer.getInteger(this.config.get(PROP_CONNECTION_TIMEOUT.name()))).orElse(DEFAULT_CONNECTION_TIMEOUT); + this.socketTimeout = Optional.ofNullable( + Integer.getInteger(this.config.get(PROP_SOCKET_TIMEOUT.name()))).orElse(DEFAULT_SOCKET_TIMEOUT); + + } + + @Override + public synchronized void start() { + super.start(); + LOGGER.info("Starting RedisOffsetBackingStore"); + this.connect(); + this.load(); + } + + @Override + public synchronized void stop() { + super.stop(); + // Nothing to do since this doesn't maintain any outstanding connections/data + LOGGER.info("Stopped RedisOffsetBackingStore"); + } + + /** + * Load offsets from redis keys + */ + private void load() { + // fetch the value from Redis + Map offsets = Uni.createFrom().item(() -> { + return (Map) client.hgetAll(this.redisKeyName); + }) + // handle failures and retry + .onFailure().invoke( + f -> { + LOGGER.warn("Reading from offset store failed with " + f); + LOGGER.warn("Will retry"); + }) + .onFailure(JedisConnectionException.class).invoke( + f -> { + LOGGER.warn("Attempting to reconnect to redis "); + this.connect(); + }) + // retry on failure with backoff + .onFailure().retry().withBackOff(Duration.ofMillis(initialRetryDelay), Duration.ofMillis(maxRetryDelay)).indefinitely() + // write success trace message + .invoke( + item -> { + LOGGER.trace("Offsets fetched from redis: " + item); + }) + .await().indefinitely(); + this.data = new HashMap<>(); + for (Map.Entry mapEntry : offsets.entrySet()) { + ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey().getBytes()) : null; + ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue().getBytes()) : null; + data.put(key, value); + } + } + + /** + * Save offsets to redis keys + */ + @Override + protected void save() { + for (Map.Entry mapEntry : data.entrySet()) { + byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null; + byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null; + // set the value in Redis + Uni.createFrom().item(() -> { + return (Long) client.hset(this.redisKeyName.getBytes(), key, value); + }) + // handle failures and retry + .onFailure().invoke( + f -> { + LOGGER.warn("Writing to offset store failed with " + f); + LOGGER.warn("Will retry"); + }) + .onFailure(JedisConnectionException.class).invoke( + f -> { + LOGGER.warn("Attempting to reconnect to redis "); + this.connect(); + }) + // retry on failure with backoff + .onFailure().retry().withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(2)).indefinitely() + // write success trace message + .invoke( + item -> { + LOGGER.trace("Record written to offset store in redis: " + value); + }) + .await().indefinitely(); + } + } +} diff --git a/debezium-storage/pom.xml b/debezium-storage/pom.xml index cf45b8f7a..aed93fdfb 100644 --- a/debezium-storage/pom.xml +++ b/debezium-storage/pom.xml @@ -11,9 +11,15 @@ Debezium Storage Module pom + + + storage-distribution + + debezium-storage-kafka debezium-storage-file + debezium-storage-redis diff --git a/documentation/modules/ROOT/pages/operations/debezium-server.adoc b/documentation/modules/ROOT/pages/operations/debezium-server.adoc index c431b1be4..ed9fcdce9 100644 --- a/documentation/modules/ROOT/pages/operations/debezium-server.adoc +++ b/documentation/modules/ROOT/pages/operations/debezium-server.adoc @@ -198,7 +198,7 @@ The source configuration uses the same configuration properties that are describ |[[debezium-source-offset-storage]]<> |`org.apache.kafka.connect.storage.FileOffsetBackingStore` |Class to use for storing and retrieving offsets for non-Kafka deployments. -To use Redis to store offsets, use `io.debezium.server.redis.RedisOffsetBackingStore` +To use Redis to store offsets, use `io.debezium.storage.redis.offset.RedisOffsetBackingStore` |[[debezium-source-offset-storage-file-filename]]<> | @@ -237,7 +237,7 @@ There are also other options available * `io.debezium.storage.file.history.FileSchemaHistory` for non-Kafka deployments * `io.debezium.relational.history.MemorySchemaHistory` volatile store for test environments -* `io.debezium.server.redis.RedisSchemaHistory` volatile store for test environments +* `io.debezium.storage.redis.history.RedisSchemaHistory` for Redis deploments |[[debezium-source-database-history-file-filename]]<> |