DBZ-5749 Convert Redis store into standard store module

This commit is contained in:
Jiri Pechanec 2022-10-21 08:21:49 +02:00 committed by Chris Cranford
parent ad8935be3b
commit 27441b87da
13 changed files with 696 additions and 441 deletions

View File

@ -0,0 +1,63 @@
<?xml version="1.0"?>
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<id>store</id>
<formats>
<format>tar.gz</format>
<format>zip</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>${project.artifactId}</outputDirectory>
<unpack>false</unpack>
<scope>runtime</scope>
<useProjectArtifact>false</useProjectArtifact>
<useTransitiveFiltering>true</useTransitiveFiltering>
<excludes>
<!-- Exclude dependencies of Kafka APIs, since they will be available in the runtime -->
<exclude>com.fasterxml.jackson.core:jackson-core:*</exclude>
<exclude>com.fasterxml.jackson.core:jackson-databind:*</exclude>
<exclude>com.fasterxml.jackson.core:jackson-annotations:*</exclude>
<exclude>com.fasterxml.jackson.datatype:jackson-datatype-jsr310:*</exclude>
<!-- Exclude guava dependencies -->
<exclude>com.google.guava:listenablefuture:*</exclude>
<!-- Exclude dependencies with incorrect scope -->
<exclude>org.checkerframework:checker-qual:*</exclude>
</excludes>
</dependencySet>
<dependencySet>
<outputDirectory>${project.artifactId}</outputDirectory>
<unpack>false</unpack>
<includes>
<include>${project.groupId}:${project.artifactId}:*</include>
</includes>
</dependencySet>
</dependencySets>
<fileSets>
<fileSet>
<!-- Get the files from the top-level directory, which should be two levels above the storages -->
<directory>${project.basedir}/../..</directory>
<outputDirectory>${project.artifactId}</outputDirectory>
<includes>
<include>README*</include>
<include>CHANGELOG*</include>
<include>CONTRIBUTE*</include>
<include>COPYRIGHT*</include>
<include>LICENSE*</include>
</includes>
<useDefaultExcludes>true</useDefaultExcludes>
</fileSet>
<fileSet>
<directory>${project.build.directory}/generated-sources</directory>
<outputDirectory>${project.artifactId}</outputDirectory>
<includes>
<include>*.json</include>
</includes>
<useDefaultExcludes>true</useDefaultExcludes>
</fileSet>
</fileSets>
</assembly>

View File

@ -40,6 +40,9 @@
<version.groovy>3.0.7</version.groovy>
<version.graalvm.js>20.0.0</version.graalvm.js>
<!-- Storages -->
<version.jedis>4.1.1</version.jedis>
<!-- Testing -->
<version.junit>4.13.1</version.junit>
<version.fest>1.4</version.fest>
@ -423,6 +426,13 @@
<scope>provided</scope>
</dependency>
<!--Storages -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${version.jedis}</version>
</dependency>
<!-- Testing utilities -->
<dependency>
<groupId>org.testcontainers</groupId>
@ -534,10 +544,10 @@
<artifactId>debezium-scripting</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-scripting-languages</artifactId>
<type>pom</type>
<type>pom</type>
<version>${project.version}</version>
</dependency>
<dependency>
@ -620,6 +630,11 @@
<artifactId>debezium-quarkus-outbox-deployment</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-redis</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Debezium test artifacts -->
<dependency>

View File

@ -16,7 +16,6 @@
<version.pubsub>25.0.0</version.pubsub>
<version.pulsar>2.10.1</version.pulsar>
<version.eventhubs>5.12.1</version.eventhubs>
<version.jedis>4.1.1</version.jedis>
<version.pravega>0.9.1</version.pravega>
<version.nats>2.8.0</version.nats>
<version.stan>2.2.3</version.stan>
@ -161,11 +160,6 @@
<artifactId>azure-messaging-eventhubs</artifactId>
<version>${version.eventhubs}</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${version.jedis}</version>
</dependency>
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>

View File

@ -16,6 +16,10 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage-redis</artifactId>
</dependency>
<!-- Target systems -->
<dependency>

View File

@ -5,201 +5,22 @@
*/
package io.debezium.server.redis;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Field;
import io.smallrye.mutiny.Uni;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
/**
* Implementation of OffsetBackingStore that saves to Redis
* @author Oren Elias
* Deprecated and replaced with {@link io.debezium.storage.redis.offset.RedisOffsetBackingStore}
*
*/
public class RedisOffsetBackingStore extends MemoryOffsetBackingStore {
@Deprecated
public class RedisOffsetBackingStore extends io.debezium.storage.redis.offset.RedisOffsetBackingStore {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisOffsetBackingStore.class);
private static final String CONFIGURATION_FIELD_PREFIX_STRING = "offset.storage.redis.";
public static final Field PROP_ADDRESS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "address")
.withDescription("The redis url that will be used to access the database schema history");
public static final Field PROP_SSL_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.enabled")
.withDescription("Use SSL for Redis connection")
.withDefault("false");
public static final Field PROP_USER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "user")
.withDescription("The redis url that will be used to access the database schema history");
public static final Field PROP_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "password")
.withDescription("The redis url that will be used to access the database schema history");
public static final String DEFAULT_REDIS_KEY_NAME = "metadata:debezium:offsets";
public static final Field PROP_KEY_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "key")
.withDescription("The redis key that will be used to store the database schema history")
.withDefault(DEFAULT_REDIS_KEY_NAME);
public static final Integer DEFAULT_RETRY_INITIAL_DELAY = 300;
public static final Field PROP_RETRY_INITIAL_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.initial.delay.ms")
.withDescription("Initial retry delay (in ms)")
.withDefault(DEFAULT_RETRY_INITIAL_DELAY);
public static final Integer DEFAULT_RETRY_MAX_DELAY = 10000;
public static final Field PROP_RETRY_MAX_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.max.delay.ms")
.withDescription("Maximum retry delay (in ms)")
.withDefault(DEFAULT_RETRY_MAX_DELAY);
public static final Integer DEFAULT_CONNECTION_TIMEOUT = 2000;
public static final Field PROP_CONNECTION_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connection.timeout.ms")
.withDescription("Connection timeout (in ms)")
.withDefault(DEFAULT_CONNECTION_TIMEOUT);
public static final Integer DEFAULT_SOCKET_TIMEOUT = 2000;
public static final Field PROP_SOCKET_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "socket.timeout.ms")
.withDescription("Socket timeout (in ms)")
.withDefault(DEFAULT_SOCKET_TIMEOUT);
private String redisKeyName;
private String address;
private String user;
private String password;
private boolean sslEnabled;
private Jedis client = null;
private Map<String, String> config;
private Integer initialRetryDelay;
private Integer maxRetryDelay;
private Integer connectionTimeout;
private Integer socketTimeout;
public RedisOffsetBackingStore() {
}
void connect() {
RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.connectionTimeout, this.socketTimeout, this.sslEnabled);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME);
}
@Override
public void configure(WorkerConfig config) {
super.configure(config);
this.config = config.originalsStrings();
this.address = this.config.get(PROP_ADDRESS.name());
this.user = this.config.get(PROP_USER.name());
this.password = this.config.get(PROP_PASSWORD.name());
this.sslEnabled = Boolean.parseBoolean(this.config.get(PROP_SSL_ENABLED.name()));
this.redisKeyName = Optional.ofNullable(
this.config.get(PROP_KEY_NAME.name())).orElse(DEFAULT_REDIS_KEY_NAME);
// load retry settings
this.initialRetryDelay = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_RETRY_INITIAL_DELAY.name()))).orElse(DEFAULT_RETRY_INITIAL_DELAY);
this.maxRetryDelay = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_RETRY_MAX_DELAY.name()))).orElse(DEFAULT_RETRY_MAX_DELAY);
// load connection timeout settings
this.connectionTimeout = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_CONNECTION_TIMEOUT.name()))).orElse(DEFAULT_CONNECTION_TIMEOUT);
this.socketTimeout = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_SOCKET_TIMEOUT.name()))).orElse(DEFAULT_SOCKET_TIMEOUT);
}
@Override
public synchronized void start() {
super.start();
LOGGER.info("Starting RedisOffsetBackingStore");
this.connect();
this.load();
}
@Override
public synchronized void stop() {
super.stop();
// Nothing to do since this doesn't maintain any outstanding connections/data
LOGGER.info("Stopped RedisOffsetBackingStore");
}
/**
* Load offsets from redis keys
*/
private void load() {
// fetch the value from Redis
Map<String, String> offsets = Uni.createFrom().item(() -> {
return (Map<String, String>) client.hgetAll(this.redisKeyName);
})
// handle failures and retry
.onFailure().invoke(
f -> {
LOGGER.warn("Reading from offset store failed with " + f);
LOGGER.warn("Will retry");
})
.onFailure(JedisConnectionException.class).invoke(
f -> {
LOGGER.warn("Attempting to reconnect to redis ");
this.connect();
})
// retry on failure with backoff
.onFailure().retry().withBackOff(Duration.ofMillis(initialRetryDelay), Duration.ofMillis(maxRetryDelay)).indefinitely()
// write success trace message
.invoke(
item -> {
LOGGER.trace("Offsets fetched from redis: " + item);
})
.await().indefinitely();
this.data = new HashMap<>();
for (Map.Entry<String, String> mapEntry : offsets.entrySet()) {
ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey().getBytes()) : null;
ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue().getBytes()) : null;
data.put(key, value);
}
}
/**
* Save offsets to redis keys
*/
@Override
protected void save() {
for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : data.entrySet()) {
byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;
// set the value in Redis
Uni.createFrom().item(() -> {
return (Long) client.hset(this.redisKeyName.getBytes(), key, value);
})
// handle failures and retry
.onFailure().invoke(
f -> {
LOGGER.warn("Writing to offset store failed with " + f);
LOGGER.warn("Will retry");
})
.onFailure(JedisConnectionException.class).invoke(
f -> {
LOGGER.warn("Attempting to reconnect to redis ");
this.connect();
})
// retry on failure with backoff
.onFailure().retry().withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(2)).indefinitely()
// write success trace message
.invoke(
item -> {
LOGGER.trace("Record written to offset store in redis: " + value);
})
.await().indefinitely();
}
LOGGER.warn("Class '{}' is deprecated and scheduled for removal, please use '{}'",
RedisOffsetBackingStore.class.getName(),
io.debezium.storage.redis.offset.RedisOffsetBackingStore.class.getName());
}
}

View File

@ -5,254 +5,22 @@
*/
package io.debezium.server.redis;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.util.Collect;
import io.debezium.util.DelayStrategy;
import io.debezium.util.Throwables;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.resps.StreamEntry;
/**
* A {@link SchemaHistory} implementation that stores the schema history in Redis.
* Deprecated and replaced with {@link io.debezium.storage.redis.history.RedisSchemaHistory}
*
*/
@ThreadSafe
public final class RedisSchemaHistory extends AbstractSchemaHistory {
private static final String CONFIGURATION_FIELD_PREFIX_STRING = SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "redis.";
@Deprecated
public final class RedisSchemaHistory extends io.debezium.storage.redis.history.RedisSchemaHistory {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisSchemaHistory.class);
private static final Logger LOGGER = LoggerFactory.getLogger(RedisOffsetBackingStore.class);
public static final Field PROP_ADDRESS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "address")
.withDescription("The redis url that will be used to access the database schema history");
public static final Field PROP_SSL_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.enabled")
.withDescription("Use SSL for Redis connection")
.withDefault("false");
public static final Field PROP_USER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "user")
.withDescription("The redis url that will be used to access the database schema history");
public static final Field PROP_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "password")
.withDescription("The redis url that will be used to access the database schema history");
public static final Field PROP_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "key")
.withDescription("The redis key that will be used to store the database schema history")
.withDefault("metadata:debezium:schema_history");
public static final Integer DEFAULT_RETRY_INITIAL_DELAY = 300;
public static final Field PROP_RETRY_INITIAL_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.initial.delay.ms")
.withDescription("Initial retry delay (in ms)")
.withDefault(DEFAULT_RETRY_INITIAL_DELAY);
public static final Integer DEFAULT_RETRY_MAX_DELAY = 10000;
public static final Field PROP_RETRY_MAX_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.max.delay.ms")
.withDescription("Maximum retry delay (in ms)")
.withDefault(DEFAULT_RETRY_MAX_DELAY);
public static final Integer DEFAULT_CONNECTION_TIMEOUT = 2000;
public static final Field PROP_CONNECTION_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connection.timeout.ms")
.withDescription("Connection timeout (in ms)")
.withDefault(DEFAULT_CONNECTION_TIMEOUT);
public static final Integer DEFAULT_SOCKET_TIMEOUT = 2000;
public static final Field PROP_SOCKET_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "socket.timeout.ms")
.withDescription("Socket timeout (in ms)")
.withDefault(DEFAULT_SOCKET_TIMEOUT);
Duration initialRetryDelay;
Duration maxRetryDelay;
public static Collection<Field> ALL_FIELDS = Collect.arrayListOf(PROP_ADDRESS, PROP_USER, PROP_PASSWORD, PROP_KEY);
private final DocumentWriter writer = DocumentWriter.defaultWriter();
private final DocumentReader reader = DocumentReader.defaultReader();
private final AtomicBoolean running = new AtomicBoolean();
private Configuration config;
private String redisKeyName;
private String address;
private String user;
private String password;
private boolean sslEnabled;
private Integer connectionTimeout;
private Integer socketTimeout;
private Jedis client = null;
void connect() {
RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.connectionTimeout, this.socketTimeout, this.sslEnabled);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_SCHEMA_HISTORY);
}
@Override
public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
if (!config.validateAndRecord(ALL_FIELDS, LOGGER::error)) {
throw new ConnectException(
"Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
}
this.config = config;
// fetch the properties
this.address = this.config.getString(PROP_ADDRESS.name());
this.user = this.config.getString(PROP_USER.name());
this.password = this.config.getString(PROP_PASSWORD.name());
this.sslEnabled = Boolean.parseBoolean(this.config.getString(PROP_SSL_ENABLED.name()));
this.redisKeyName = this.config.getString(PROP_KEY);
LOGGER.info("rediskeyname:" + this.redisKeyName);
// load retry settings
this.initialRetryDelay = Duration.ofMillis(this.config.getInteger(PROP_RETRY_INITIAL_DELAY));
this.maxRetryDelay = Duration.ofMillis(this.config.getInteger(PROP_RETRY_MAX_DELAY));
// load connection timeout settings
this.connectionTimeout = this.config.getInteger(PROP_CONNECTION_TIMEOUT);
this.socketTimeout = this.config.getInteger(PROP_SOCKET_TIMEOUT);
super.configure(config, comparator, listener, useCatalogBeforeSchema);
}
@Override
public synchronized void start() {
super.start();
LOGGER.info("Starting RedisSchemaHistory");
this.connect();
}
@Override
protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
if (record == null) {
return;
}
String line;
try {
line = writer.write(record.document());
}
catch (IOException e) {
Throwables.logErrorAndTraceRecord(LOGGER, record, "Failed to convert record to string", e);
throw new SchemaHistoryException("Unable to write database schema history record");
}
DelayStrategy delayStrategy = DelayStrategy.exponential(initialRetryDelay, maxRetryDelay);
boolean completedSuccessfully = false;
// loop and retry until successful
while (!completedSuccessfully) {
try {
if (client == null) {
this.connect();
}
// write the entry to Redis
client.xadd(this.redisKeyName, (StreamEntryID) null, Collections.singletonMap("schema", line));
LOGGER.trace("Record written to database schema history in redis: " + line);
completedSuccessfully = true;
}
catch (JedisConnectionException jce) {
LOGGER.warn("Attempting to reconnect to redis ");
this.connect();
}
catch (Exception e) {
LOGGER.warn("Writing to database schema history stream failed", e);
LOGGER.warn("Will retry");
}
if (!completedSuccessfully) {
// Failed to execute the transaction, retry...
delayStrategy.sleepWhen(!completedSuccessfully);
}
}
}
@Override
public void stop() {
running.set(false);
if (client != null) {
client.disconnect();
}
super.stop();
}
@Override
protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
DelayStrategy delayStrategy = DelayStrategy.exponential(initialRetryDelay, maxRetryDelay);
boolean completedSuccessfully = false;
List<StreamEntry> entries = new ArrayList<StreamEntry>();
// loop and retry until successful
while (!completedSuccessfully) {
try {
if (client == null) {
this.connect();
}
// read the entries from Redis
entries = client.xrange(
this.redisKeyName, (StreamEntryID) null, (StreamEntryID) null);
completedSuccessfully = true;
}
catch (JedisConnectionException jce) {
LOGGER.warn("Attempting to reconnect to redis ");
this.connect();
}
catch (Exception e) {
LOGGER.warn("Reading from database schema history stream failed with " + e);
LOGGER.warn("Will retry");
}
if (!completedSuccessfully) {
// Failed to execute the transaction, retry...
delayStrategy.sleepWhen(!completedSuccessfully);
}
}
for (StreamEntry item : entries) {
try {
records.accept(new HistoryRecord(reader.read(item.getFields().get("schema"))));
}
catch (IOException e) {
LOGGER.error("Failed to convert record to string: {}", item, e);
return;
}
}
}
@Override
public boolean storageExists() {
return true;
}
@Override
public boolean exists() {
// check if the stream is not empty
if (client != null && client.xlen(this.redisKeyName) > 0) {
return true;
}
else {
return false;
}
public RedisSchemaHistory() {
LOGGER.warn("Class '{}' is deprecated and scheduled for removal, please use '{}'",
RedisSchemaHistory.class.getName(),
io.debezium.storage.redis.history.RedisSchemaHistory.class.getName());
}
}

View File

@ -32,6 +32,7 @@
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.storage.redis.RedisConnection;
import io.debezium.util.DelayStrategy;
import redis.clients.jedis.Jedis;
@ -53,6 +54,8 @@ public class RedisStreamChangeConsumer extends BaseChangeConsumer
private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamChangeConsumer.class);
private static final String DEBEZIUM_REDIS_SINK_CLIENT_NAME = "debezium:redis:sink";
private static final String PROP_PREFIX = "debezium.sink.redis.";
private static final String PROP_ADDRESS = PROP_PREFIX + "address";
private static final String PROP_USER = PROP_PREFIX + "user";
@ -122,7 +125,7 @@ else if (MESSAGE_FORMAT_COMPACT.equals(messageFormat)) {
}
RedisConnection redisConnection = new RedisConnection(address, user, password, connectionTimeout, socketTimeout, sslEnabled);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_REDIS_SINK_CLIENT_NAME);
client = redisConnection.getRedisClient(DEBEZIUM_REDIS_SINK_CLIENT_NAME);
}
@PreDestroy

View File

@ -0,0 +1,117 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-storage</artifactId>
<version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-storage-redis</artifactId>
<name>Debezium Storage Redis Module</name>
<packaging>jar</packaging>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-bom</artifactId>
<version>${quarkus.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny</artifactId>
</dependency>
</dependencies>
<!--
Define several useful profiles
-->
<profiles>
<profile>
<id>assembly</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${version.assembly.plugin}</version>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-assembly-descriptors</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>default</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-${project.version}</finalName>
<attach>true</attach> <!-- we want attach & deploy these to Maven -->
<descriptorRefs>
<descriptorRef>${assembly.descriptor}</descriptorRef>
</descriptorRefs>
<tarLongFileMode>posix</tarLongFileMode>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.redis;
package io.debezium.storage.redis;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -20,9 +20,8 @@
public class RedisConnection {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisConnection.class);
protected static final String DEBEZIUM_REDIS_SINK_CLIENT_NAME = "debezium:redis:sink";
protected static final String DEBEZIUM_OFFSETS_CLIENT_NAME = "debezium:offsets";
protected static final String DEBEZIUM_SCHEMA_HISTORY = "debezium:schema_history";
public static final String DEBEZIUM_OFFSETS_CLIENT_NAME = "debezium:offsets";
public static final String DEBEZIUM_SCHEMA_HISTORY = "debezium:schema_history";
private String address;
private String user;

View File

@ -0,0 +1,259 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.redis.history;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.storage.redis.RedisConnection;
import io.debezium.util.Collect;
import io.debezium.util.DelayStrategy;
import io.debezium.util.Throwables;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.resps.StreamEntry;
/**
* A {@link SchemaHistory} implementation that stores the schema history in Redis.
*
*/
@ThreadSafe
public class RedisSchemaHistory extends AbstractSchemaHistory {
private static final String CONFIGURATION_FIELD_PREFIX_STRING = SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "redis.";
private static final Logger LOGGER = LoggerFactory.getLogger(RedisSchemaHistory.class);
public static final Field PROP_ADDRESS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "address")
.withDescription("The redis url that will be used to access the database schema history");
public static final Field PROP_SSL_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.enabled")
.withDescription("Use SSL for Redis connection")
.withDefault("false");
public static final Field PROP_USER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "user")
.withDescription("The redis url that will be used to access the database schema history");
public static final Field PROP_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "password")
.withDescription("The redis url that will be used to access the database schema history");
public static final Field PROP_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "key")
.withDescription("The redis key that will be used to store the database schema history")
.withDefault("metadata:debezium:schema_history");
public static final Integer DEFAULT_RETRY_INITIAL_DELAY = 300;
public static final Field PROP_RETRY_INITIAL_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.initial.delay.ms")
.withDescription("Initial retry delay (in ms)")
.withDefault(DEFAULT_RETRY_INITIAL_DELAY);
public static final Integer DEFAULT_RETRY_MAX_DELAY = 10000;
public static final Field PROP_RETRY_MAX_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.max.delay.ms")
.withDescription("Maximum retry delay (in ms)")
.withDefault(DEFAULT_RETRY_MAX_DELAY);
public static final Integer DEFAULT_CONNECTION_TIMEOUT = 2000;
public static final Field PROP_CONNECTION_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connection.timeout.ms")
.withDescription("Connection timeout (in ms)")
.withDefault(DEFAULT_CONNECTION_TIMEOUT);
public static final Integer DEFAULT_SOCKET_TIMEOUT = 2000;
public static final Field PROP_SOCKET_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "socket.timeout.ms")
.withDescription("Socket timeout (in ms)")
.withDefault(DEFAULT_SOCKET_TIMEOUT);
Duration initialRetryDelay;
Duration maxRetryDelay;
public static Collection<Field> ALL_FIELDS = Collect.arrayListOf(PROP_ADDRESS, PROP_USER, PROP_PASSWORD, PROP_KEY);
private final DocumentWriter writer = DocumentWriter.defaultWriter();
private final DocumentReader reader = DocumentReader.defaultReader();
private final AtomicBoolean running = new AtomicBoolean();
private Configuration config;
private String redisKeyName;
private String address;
private String user;
private String password;
private boolean sslEnabled;
private Integer connectionTimeout;
private Integer socketTimeout;
private Jedis client = null;
void connect() {
RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.connectionTimeout, this.socketTimeout, this.sslEnabled);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_SCHEMA_HISTORY);
}
@Override
public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
if (!config.validateAndRecord(ALL_FIELDS, LOGGER::error)) {
throw new ConnectException(
"Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
}
this.config = config;
// fetch the properties
this.address = this.config.getString(PROP_ADDRESS.name());
this.user = this.config.getString(PROP_USER.name());
this.password = this.config.getString(PROP_PASSWORD.name());
this.sslEnabled = Boolean.parseBoolean(this.config.getString(PROP_SSL_ENABLED.name()));
this.redisKeyName = this.config.getString(PROP_KEY);
LOGGER.info("rediskeyname:" + this.redisKeyName);
// load retry settings
this.initialRetryDelay = Duration.ofMillis(this.config.getInteger(PROP_RETRY_INITIAL_DELAY));
this.maxRetryDelay = Duration.ofMillis(this.config.getInteger(PROP_RETRY_MAX_DELAY));
// load connection timeout settings
this.connectionTimeout = this.config.getInteger(PROP_CONNECTION_TIMEOUT);
this.socketTimeout = this.config.getInteger(PROP_SOCKET_TIMEOUT);
super.configure(config, comparator, listener, useCatalogBeforeSchema);
}
@Override
public synchronized void start() {
super.start();
LOGGER.info("Starting RedisSchemaHistory");
this.connect();
}
@Override
protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
if (record == null) {
return;
}
String line;
try {
line = writer.write(record.document());
}
catch (IOException e) {
Throwables.logErrorAndTraceRecord(LOGGER, record, "Failed to convert record to string", e);
throw new SchemaHistoryException("Unable to write database schema history record");
}
DelayStrategy delayStrategy = DelayStrategy.exponential(initialRetryDelay, maxRetryDelay);
boolean completedSuccessfully = false;
// loop and retry until successful
while (!completedSuccessfully) {
try {
if (client == null) {
this.connect();
}
// write the entry to Redis
client.xadd(this.redisKeyName, (StreamEntryID) null, Collections.singletonMap("schema", line));
LOGGER.trace("Record written to database schema history in redis: " + line);
completedSuccessfully = true;
}
catch (JedisConnectionException jce) {
LOGGER.warn("Attempting to reconnect to redis ");
this.connect();
}
catch (Exception e) {
LOGGER.warn("Writing to database schema history stream failed", e);
LOGGER.warn("Will retry");
}
if (!completedSuccessfully) {
// Failed to execute the transaction, retry...
delayStrategy.sleepWhen(!completedSuccessfully);
}
}
}
@Override
public void stop() {
running.set(false);
if (client != null) {
client.disconnect();
}
super.stop();
}
@Override
protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
DelayStrategy delayStrategy = DelayStrategy.exponential(initialRetryDelay, maxRetryDelay);
boolean completedSuccessfully = false;
List<StreamEntry> entries = new ArrayList<StreamEntry>();
// loop and retry until successful
while (!completedSuccessfully) {
try {
if (client == null) {
this.connect();
}
// read the entries from Redis
entries = client.xrange(
this.redisKeyName, (StreamEntryID) null, (StreamEntryID) null);
completedSuccessfully = true;
}
catch (JedisConnectionException jce) {
LOGGER.warn("Attempting to reconnect to redis ");
this.connect();
}
catch (Exception e) {
LOGGER.warn("Reading from database schema history stream failed with " + e);
LOGGER.warn("Will retry");
}
if (!completedSuccessfully) {
// Failed to execute the transaction, retry...
delayStrategy.sleepWhen(!completedSuccessfully);
}
}
for (StreamEntry item : entries) {
try {
records.accept(new HistoryRecord(reader.read(item.getFields().get("schema"))));
}
catch (IOException e) {
LOGGER.error("Failed to convert record to string: {}", item, e);
return;
}
}
}
@Override
public boolean storageExists() {
return true;
}
@Override
public boolean exists() {
// check if the stream is not empty
if (client != null && client.xlen(this.redisKeyName) > 0) {
return true;
}
else {
return false;
}
}
}

View File

@ -0,0 +1,206 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.redis.offset;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Field;
import io.debezium.storage.redis.RedisConnection;
import io.smallrye.mutiny.Uni;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
/**
* Implementation of OffsetBackingStore that saves to Redis
* @author Oren Elias
*/
public class RedisOffsetBackingStore extends MemoryOffsetBackingStore {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisOffsetBackingStore.class);
private static final String CONFIGURATION_FIELD_PREFIX_STRING = "offset.storage.redis.";
public static final Field PROP_ADDRESS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "address")
.withDescription("The redis url that will be used to access the database schema history");
public static final Field PROP_SSL_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.enabled")
.withDescription("Use SSL for Redis connection")
.withDefault("false");
public static final Field PROP_USER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "user")
.withDescription("The redis url that will be used to access the database schema history");
public static final Field PROP_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "password")
.withDescription("The redis url that will be used to access the database schema history");
public static final String DEFAULT_REDIS_KEY_NAME = "metadata:debezium:offsets";
public static final Field PROP_KEY_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "key")
.withDescription("The redis key that will be used to store the database schema history")
.withDefault(DEFAULT_REDIS_KEY_NAME);
public static final Integer DEFAULT_RETRY_INITIAL_DELAY = 300;
public static final Field PROP_RETRY_INITIAL_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.initial.delay.ms")
.withDescription("Initial retry delay (in ms)")
.withDefault(DEFAULT_RETRY_INITIAL_DELAY);
public static final Integer DEFAULT_RETRY_MAX_DELAY = 10000;
public static final Field PROP_RETRY_MAX_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.max.delay.ms")
.withDescription("Maximum retry delay (in ms)")
.withDefault(DEFAULT_RETRY_MAX_DELAY);
public static final Integer DEFAULT_CONNECTION_TIMEOUT = 2000;
public static final Field PROP_CONNECTION_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connection.timeout.ms")
.withDescription("Connection timeout (in ms)")
.withDefault(DEFAULT_CONNECTION_TIMEOUT);
public static final Integer DEFAULT_SOCKET_TIMEOUT = 2000;
public static final Field PROP_SOCKET_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "socket.timeout.ms")
.withDescription("Socket timeout (in ms)")
.withDefault(DEFAULT_SOCKET_TIMEOUT);
private String redisKeyName;
private String address;
private String user;
private String password;
private boolean sslEnabled;
private Jedis client = null;
private Map<String, String> config;
private Integer initialRetryDelay;
private Integer maxRetryDelay;
private Integer connectionTimeout;
private Integer socketTimeout;
public RedisOffsetBackingStore() {
}
void connect() {
RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.connectionTimeout, this.socketTimeout, this.sslEnabled);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME);
}
@Override
public void configure(WorkerConfig config) {
super.configure(config);
this.config = config.originalsStrings();
this.address = this.config.get(PROP_ADDRESS.name());
this.user = this.config.get(PROP_USER.name());
this.password = this.config.get(PROP_PASSWORD.name());
this.sslEnabled = Boolean.parseBoolean(this.config.get(PROP_SSL_ENABLED.name()));
this.redisKeyName = Optional.ofNullable(
this.config.get(PROP_KEY_NAME.name())).orElse(DEFAULT_REDIS_KEY_NAME);
// load retry settings
this.initialRetryDelay = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_RETRY_INITIAL_DELAY.name()))).orElse(DEFAULT_RETRY_INITIAL_DELAY);
this.maxRetryDelay = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_RETRY_MAX_DELAY.name()))).orElse(DEFAULT_RETRY_MAX_DELAY);
// load connection timeout settings
this.connectionTimeout = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_CONNECTION_TIMEOUT.name()))).orElse(DEFAULT_CONNECTION_TIMEOUT);
this.socketTimeout = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_SOCKET_TIMEOUT.name()))).orElse(DEFAULT_SOCKET_TIMEOUT);
}
@Override
public synchronized void start() {
super.start();
LOGGER.info("Starting RedisOffsetBackingStore");
this.connect();
this.load();
}
@Override
public synchronized void stop() {
super.stop();
// Nothing to do since this doesn't maintain any outstanding connections/data
LOGGER.info("Stopped RedisOffsetBackingStore");
}
/**
* Load offsets from redis keys
*/
private void load() {
// fetch the value from Redis
Map<String, String> offsets = Uni.createFrom().item(() -> {
return (Map<String, String>) client.hgetAll(this.redisKeyName);
})
// handle failures and retry
.onFailure().invoke(
f -> {
LOGGER.warn("Reading from offset store failed with " + f);
LOGGER.warn("Will retry");
})
.onFailure(JedisConnectionException.class).invoke(
f -> {
LOGGER.warn("Attempting to reconnect to redis ");
this.connect();
})
// retry on failure with backoff
.onFailure().retry().withBackOff(Duration.ofMillis(initialRetryDelay), Duration.ofMillis(maxRetryDelay)).indefinitely()
// write success trace message
.invoke(
item -> {
LOGGER.trace("Offsets fetched from redis: " + item);
})
.await().indefinitely();
this.data = new HashMap<>();
for (Map.Entry<String, String> mapEntry : offsets.entrySet()) {
ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey().getBytes()) : null;
ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue().getBytes()) : null;
data.put(key, value);
}
}
/**
* Save offsets to redis keys
*/
@Override
protected void save() {
for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : data.entrySet()) {
byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;
// set the value in Redis
Uni.createFrom().item(() -> {
return (Long) client.hset(this.redisKeyName.getBytes(), key, value);
})
// handle failures and retry
.onFailure().invoke(
f -> {
LOGGER.warn("Writing to offset store failed with " + f);
LOGGER.warn("Will retry");
})
.onFailure(JedisConnectionException.class).invoke(
f -> {
LOGGER.warn("Attempting to reconnect to redis ");
this.connect();
})
// retry on failure with backoff
.onFailure().retry().withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(2)).indefinitely()
// write success trace message
.invoke(
item -> {
LOGGER.trace("Record written to offset store in redis: " + value);
})
.await().indefinitely();
}
}
}

View File

@ -11,9 +11,15 @@
<name>Debezium Storage Module</name>
<packaging>pom</packaging>
<properties>
<!-- Assembly configuration -->
<assembly.descriptor>storage-distribution</assembly.descriptor>
</properties>
<modules>
<module>debezium-storage-kafka</module>
<module>debezium-storage-file</module>
<module>debezium-storage-redis</module>
</modules>
</project>

View File

@ -198,7 +198,7 @@ The source configuration uses the same configuration properties that are describ
|[[debezium-source-offset-storage]]<<debezium-source-offset-storage, `debezium.source.offset.storage`>>
|`org.apache.kafka.connect.storage.FileOffsetBackingStore`
|Class to use for storing and retrieving offsets for non-Kafka deployments.
To use Redis to store offsets, use `io.debezium.server.redis.RedisOffsetBackingStore`
To use Redis to store offsets, use `io.debezium.storage.redis.offset.RedisOffsetBackingStore`
|[[debezium-source-offset-storage-file-filename]]<<debezium-source-offset-storage-file-filename, `debezium.source.offset.storage.file.filename`>>
|
@ -237,7 +237,7 @@ There are also other options available
* `io.debezium.storage.file.history.FileSchemaHistory` for non-Kafka deployments
* `io.debezium.relational.history.MemorySchemaHistory` volatile store for test environments
* `io.debezium.server.redis.RedisSchemaHistory` volatile store for test environments
* `io.debezium.storage.redis.history.RedisSchemaHistory` for Redis deploments
|[[debezium-source-database-history-file-filename]]<<debezium-source-database-history-file-filename, `debezium.source.schema.history.internal.file.filename`>>
|