DBZ-5752 Redis Sink wait for Redis Replica writes

This commit is contained in:
ggaborg 2022-10-26 12:12:04 +03:00 committed by Jiri Pechanec
parent 28aa715867
commit fcb15dc722
11 changed files with 729 additions and 94 deletions

View File

@ -6,6 +6,7 @@
package io.debezium.server.redis;
import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
@ -32,15 +33,11 @@
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.storage.redis.RedisClient;
import io.debezium.storage.redis.RedisClientConnectionException;
import io.debezium.storage.redis.RedisConnection;
import io.debezium.util.DelayStrategy;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
/**
* Implementation of the consumer that delivers the messages into Redis (stream) destination.
*
@ -63,6 +60,10 @@ public class RedisStreamChangeConsumer extends BaseChangeConsumer
private static final String PROP_CONNECTION_TIMEOUT = PROP_PREFIX + "connection.timeout.ms";
private static final String PROP_SOCKET_TIMEOUT = PROP_PREFIX + "socket.timeout.ms";
private static final String PROP_MESSAGE_FORMAT = PROP_PREFIX + "message.format";
private static final String PROP_WAIT_ENABLED = PROP_PREFIX + "wait.enabled";
private static final String PROP_WAIT_TIMEOUT = PROP_PREFIX + "wait.timeout.ms";
private static final String PROP_WAIT_RETRY_ENABLED = PROP_PREFIX + "wait.retry.enabled";
private static final String PROP_WAIT_RETRY_DELAY = PROP_PREFIX + "wait.retry.delay.ms";
private static final String MESSAGE_FORMAT_COMPACT = "compact";
private static final String MESSAGE_FORMAT_EXTENDED = "extended";
@ -93,7 +94,7 @@ public class RedisStreamChangeConsumer extends BaseChangeConsumer
@ConfigProperty(name = PROP_PREFIX + "null.value", defaultValue = "default")
String nullValue;
private Jedis client = null;
private RedisClient client;
private BiFunction<String, String, Map<String, String>> recordMapFunction;
@ -124,8 +125,13 @@ else if (MESSAGE_FORMAT_COMPACT.equals(messageFormat)) {
String.format("Property %s expects value one of '%s' or '%s'", PROP_MESSAGE_FORMAT, MESSAGE_FORMAT_EXTENDED, MESSAGE_FORMAT_COMPACT));
}
boolean waitEnabled = config.getOptionalValue(PROP_WAIT_ENABLED, Boolean.class).orElse(false);
long waitTimeout = config.getOptionalValue(PROP_WAIT_TIMEOUT, Long.class).orElse(1000L);
boolean waitRetryEnabled = config.getOptionalValue(PROP_WAIT_RETRY_ENABLED, Boolean.class).orElse(false);
long waitRetryDelay = config.getOptionalValue(PROP_WAIT_RETRY_DELAY, Long.class).orElse(1000L);
RedisConnection redisConnection = new RedisConnection(address, user, password, connectionTimeout, socketTimeout, sslEnabled);
client = redisConnection.getRedisClient(DEBEZIUM_REDIS_SINK_CLIENT_NAME);
client = redisConnection.getRedisClient(DEBEZIUM_REDIS_SINK_CLIENT_NAME, waitEnabled, waitTimeout, waitRetryEnabled, waitRetryDelay);
}
@PreDestroy
@ -186,33 +192,23 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records,
}
}
else {
Pipeline pipeline;
try {
LOGGER.trace("Preparing a Redis Pipeline of {} records", clonedBatch.size());
// Make sure the connection is still alive before creating the pipeline
// to reduce the chance of ending up with duplicate records
client.ping();
pipeline = client.pipelined();
// Add the batch records to the stream(s) via Pipeline
List<SimpleEntry<String, Map<String, String>>> recordsMap = new ArrayList<>(clonedBatch.size());
for (ChangeEvent<Object, Object> record : clonedBatch) {
String destination = streamNameMapper.map(record.destination());
String key = (record.key() != null) ? getString(record.key()) : nullKey;
String value = (record.value() != null) ? getString(record.value()) : nullValue;
Map<String, String> recordMap = recordMapFunction.apply(key, value);
// Add the record to the destination stream
pipeline.xadd(destination, StreamEntryID.NEW_ENTRY, recordMap);
recordsMap.add(new SimpleEntry<>(destination, recordMap));
}
// Sync the pipeline in Redis and parse the responses (response per command with the same order)
List<Object> responses = pipeline.syncAndReturnAll();
List<String> responses = client.xadd(recordsMap);
List<ChangeEvent<Object, Object>> processedRecords = new ArrayList<ChangeEvent<Object, Object>>();
int index = 0;
int totalOOMResponses = 0;
for (Object response : responses) {
String message = response.toString();
for (String message : responses) {
// When Redis reaches its max memory limitation, an OOM error message will be retrieved.
// In this case, we will retry execute the failed commands, assuming some memory will be freed eventually as result
// of evicting elements from the stream by the target DB.
@ -239,21 +235,10 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records,
completedSuccessfully = true;
}
}
catch (JedisConnectionException jce) {
catch (RedisClientConnectionException jce) {
LOGGER.error("Connection error", jce);
close();
}
catch (JedisDataException jde) {
// When Redis is starting, a JedisDataException will be thrown with this message.
// We will retry communicating with the target DB as once of the Redis is available, this message will be gone.
if (jde.getMessage().equals("LOADING Redis is loading the dataset in memory")) {
LOGGER.error("Redis is starting", jde);
}
else {
LOGGER.error("Unexpected JedisDataException", jde);
throw new DebeziumException(jde);
}
}
catch (Exception e) {
LOGGER.error("Unexpected Exception", e);
throw new DebeziumException(e);

View File

@ -67,6 +67,13 @@
<groupId>io.smallrye.reactive</groupId>
<artifactId>mutiny</artifactId>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!--

View File

@ -0,0 +1,123 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.redis;
import java.util.AbstractMap.SimpleEntry;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
public class JedisClient implements RedisClient {
private static final Logger LOGGER = LoggerFactory.getLogger(JedisClient.class);
private final Jedis jedis;
public JedisClient(Jedis jedis) {
this.jedis = jedis;
}
@Override
public void disconnect() {
tryErrors(() -> jedis.disconnect());
}
@Override
public void close() {
tryErrors(() -> jedis.close());
}
@Override
public String xadd(String key, Map<String, String> hash) {
return tryErrors(() -> jedis.xadd(key, (StreamEntryID) null, hash).toString());
}
@Override
public List<String> xadd(List<SimpleEntry<String, Map<String, String>>> hashes) {
return tryErrors(() -> {
try {
// Make sure the connection is still alive before creating the pipeline
// to reduce the chance of ending up with duplicate records
jedis.ping();
Pipeline pipeline = jedis.pipelined();
hashes.forEach((hash) -> pipeline.xadd(hash.getKey(), StreamEntryID.NEW_ENTRY, hash.getValue()));
return pipeline.syncAndReturnAll().stream().map(response -> response.toString()).collect(Collectors.toList());
}
catch (JedisDataException jde) {
// When Redis is starting, a JedisDataException will be thrown with this message.
// We will retry communicating with the target DB as once of the Redis is available, this message will be gone.
if (jde.getMessage().equals("LOADING Redis is loading the dataset in memory")) {
LOGGER.error("Redis is starting", jde);
}
else {
LOGGER.error("Unexpected JedisDataException", jde);
throw new DebeziumException(jde);
}
}
return Collections.emptyList();
});
}
@Override
public List<Map<String, String>> xrange(String key) {
return tryErrors(() -> jedis.xrange(key, (StreamEntryID) null, (StreamEntryID) null).stream().map(item -> item.getFields()).collect(Collectors.toList()));
}
@Override
public long xlen(String key) {
return tryErrors(() -> jedis.xlen(key));
}
@Override
public Map<String, String> hgetAll(String key) {
return tryErrors(() -> jedis.hgetAll(key));
}
@Override
public long hset(byte[] key, byte[] field, byte[] value) {
return tryErrors(() -> jedis.hset(key, field, value));
}
@Override
public long waitReplicas(int replicas, long timeout) {
return tryErrors(() -> jedis.waitReplicas(replicas, timeout));
}
@Override
public String toString() {
return "JedisClient [jedis=" + jedis + "]";
}
private void tryErrors(Runnable runnable) {
tryErrors(() -> {
runnable.run();
return null;
});
}
private <R> R tryErrors(Supplier<R> supplier) {
try {
return supplier.get();
}
catch (JedisConnectionException e) {
throw new RedisClientConnectionException(e);
}
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.redis;
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
public interface RedisClient {
/**
*
* @throws RedisClientConnectionException
*/
void disconnect();
/**
*
* @throws RedisClientConnectionException
*/
void close();
/**
*
* @param key
* @param hash
* @return
* @throws RedisClientConnectionException
*/
String xadd(String key, Map<String, String> hash);
/**
*
* @param hashes
* @return
* @throws RedisClientConnectionException
*/
List<String> xadd(List<SimpleEntry<String, Map<String, String>>> hashes);
/**
*
* @param key
* @return
* @throws RedisClientConnectionException
*/
List<Map<String, String>> xrange(String key);
/**
*
* @param key
* @return
* @throws RedisClientConnectionException
*/
long xlen(String key);
/**
*
* @param key
* @return
* @throws RedisClientConnectionException
*/
Map<String, String> hgetAll(String key);
/**
*
* @param key
* @param field
* @param value
* @return
* @throws RedisClientConnectionException
*/
long hset(byte[] key, byte[] field, byte[] value);
/**
*
* @param replicas
* @param timeout
* @return
* @throws RedisClientConnectionException
*/
long waitReplicas(int replicas, long timeout);
}

View File

@ -0,0 +1,16 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.redis;
public class RedisClientConnectionException extends RuntimeException {
private static final long serialVersionUID = -4315965419500005492L;
public RedisClientConnectionException(Throwable cause) {
super(cause);
}
}

View File

@ -8,8 +8,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
/**
@ -30,6 +33,15 @@ public class RedisConnection {
private int socketTimeout;
private boolean sslEnabled;
/**
*
* @param address
* @param user
* @param password
* @param connectionTimeout
* @param socketTimeout
* @param sslEnabled
*/
public RedisConnection(String address, String user, String password, int connectionTimeout, int socketTimeout, boolean sslEnabled) {
this.address = address;
this.user = user;
@ -39,10 +51,26 @@ public RedisConnection(String address, String user, String password, int connect
this.sslEnabled = sslEnabled;
}
public Jedis getRedisClient(String clientName) {
/**
*
* @param clientName
* @param waitEnabled
* @param waitTimeout
* @param waitRetry
* @param waitRetryDelay
* @return
* @throws RedisClientConnectionException
*/
public RedisClient getRedisClient(String clientName, boolean waitEnabled, long waitTimeout, boolean waitRetry, long waitRetryDelay) {
if (waitEnabled && waitTimeout <= 0) {
throw new DebeziumException("Redis client wait timeout should be positive");
}
HostAndPort address = HostAndPort.from(this.address);
Jedis client = new Jedis(address.getHost(), address.getPort(), this.connectionTimeout, this.socketTimeout, this.sslEnabled);
Jedis client;
try {
client = new Jedis(address.getHost(), address.getPort(), this.connectionTimeout, this.socketTimeout, this.sslEnabled);
if (this.user != null) {
client.auth(this.user, this.password);
@ -61,9 +89,18 @@ else if (this.password != null) {
catch (JedisDataException e) {
LOGGER.warn("Failed to set client name", e);
}
}
catch (JedisConnectionException e) {
throw new RedisClientConnectionException(e);
}
LOGGER.info("Using Jedis '{}'", client);
RedisClient jedisClient = new JedisClient(client);
return client;
// we use 1 for number of replicas as in Redis Enterprise there can be only one replica shard
RedisClient redisClient = waitEnabled ? new WaitReplicasRedisClient(jedisClient, 1, waitTimeout, waitRetry, waitRetryDelay) : jedisClient;
LOGGER.info("Using Redis client '{}'", redisClient);
return redisClient;
}
}

View File

@ -0,0 +1,115 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.redis;
import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.util.DelayStrategy;
public class WaitReplicasRedisClient implements RedisClient {
private static final Logger LOGGER = LoggerFactory.getLogger(WaitReplicasRedisClient.class);
private final RedisClient delegate;
private final int replicas;
private final long timeout;
private final boolean retry;
private final long delay;
public WaitReplicasRedisClient(RedisClient delegate, int replicas, long timeout, boolean retry, long delay) {
this.delegate = delegate;
this.replicas = replicas;
this.timeout = timeout;
this.retry = retry;
this.delay = delay;
}
@Override
public void disconnect() {
delegate.disconnect();
}
@Override
public void close() {
delegate.close();
}
@Override
public String xadd(String key, Map<String, String> hash) {
return waitResult(() -> delegate.xadd(key, hash));
}
@Override
public List<String> xadd(List<SimpleEntry<String, Map<String, String>>> hashes) {
return waitResult(() -> delegate.xadd(hashes));
}
@Override
public List<Map<String, String>> xrange(String key) {
return delegate.xrange(key);
}
@Override
public long xlen(String key) {
return delegate.xlen(key);
}
@Override
public Map<String, String> hgetAll(String key) {
return delegate.hgetAll(key);
}
@Override
public long hset(byte[] key, byte[] field, byte[] value) {
return waitResult(() -> delegate.hset(key, field, value));
}
@Override
public long waitReplicas(int replicas, long timeout) {
throw new UnsupportedOperationException();
}
@Override
public String toString() {
return "WaitReplicaRedisClient [delegate=" + delegate + ", replicas=" + replicas + ", timeout=" + timeout + ", retry=" + retry + ", delay=" + delay + "]";
}
private <R> R waitResult(Supplier<R> supplier) {
R result;
DelayStrategy delayStrategy = null;
do {
result = supplier.get();
long reachedReplicas = delegate.waitReplicas(replicas, timeout);
if (reachedReplicas != replicas) {
if (retry) {
LOGGER.error("Failed to update {} replica(s) in {} millis. Retrying in {} millis...", replicas, timeout, delay);
if (delayStrategy == null) {
delayStrategy = DelayStrategy.constant(Duration.ofMillis(delay));
}
delayStrategy.sleepWhen(true);
continue;
}
else {
LOGGER.warn("Failed to update {} replica(s) in {} millis.", replicas, timeout);
}
}
break;
} while (true);
return result;
}
}

View File

@ -11,6 +11,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@ -29,16 +30,13 @@
import io.debezium.relational.history.SchemaHistory;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.storage.redis.RedisClient;
import io.debezium.storage.redis.RedisClientConnectionException;
import io.debezium.storage.redis.RedisConnection;
import io.debezium.util.Collect;
import io.debezium.util.DelayStrategy;
import io.debezium.util.Loggings;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.resps.StreamEntry;
/**
* A {@link SchemaHistory} implementation that stores the schema history in Redis.
*
@ -51,20 +49,20 @@ public class RedisSchemaHistory extends AbstractSchemaHistory {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisSchemaHistory.class);
public static final Field PROP_ADDRESS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "address")
.withDescription("The redis url that will be used to access the database schema history");
.withDescription("The Redis url that will be used to access the database schema history");
public static final Field PROP_SSL_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.enabled")
.withDescription("Use SSL for Redis connection")
.withDefault("false");
public static final Field PROP_USER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "user")
.withDescription("The redis url that will be used to access the database schema history");
.withDescription("The Redis url that will be used to access the database schema history");
public static final Field PROP_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "password")
.withDescription("The redis url that will be used to access the database schema history");
.withDescription("The Redis url that will be used to access the database schema history");
public static final Field PROP_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "key")
.withDescription("The redis key that will be used to store the database schema history")
.withDescription("The Redis key that will be used to store the database schema history")
.withDefault("metadata:debezium:schema_history");
public static final Integer DEFAULT_RETRY_INITIAL_DELAY = 300;
@ -87,6 +85,20 @@ public class RedisSchemaHistory extends AbstractSchemaHistory {
.withDescription("Socket timeout (in ms)")
.withDefault(DEFAULT_SOCKET_TIMEOUT);
private static final Field PROP_WAIT_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.enabled")
.withDescription(
"Enables wait for replica. In case Redis is configured with a replica shard, this allows to verify that the data has been written to the replica.")
.withDefault(false);
private static final Field PROP_WAIT_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.timeout.ms")
.withDescription("Timeout when wait for replica")
.withDefault(1000L);
private static final Field PROP_WAIT_RETRY_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.retry.enabled")
.withDescription("Enables retry on wait for replica failure")
.withDefault(false);
private static final Field PROP_WAIT_RETRY_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.retry.delay.ms")
.withDescription("Delay of retry on wait for replica failure")
.withDefault(1000L);
Duration initialRetryDelay;
Duration maxRetryDelay;
@ -104,11 +116,16 @@ public class RedisSchemaHistory extends AbstractSchemaHistory {
private Integer connectionTimeout;
private Integer socketTimeout;
private Jedis client = null;
private boolean waitEnabled;
private long waitTimeout;
private boolean waitRetryEnabled;
private long waitRetryDelay;
private RedisClient client;
void connect() {
RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.connectionTimeout, this.socketTimeout, this.sslEnabled);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_SCHEMA_HISTORY);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_SCHEMA_HISTORY, waitEnabled, waitTimeout, waitRetryEnabled, waitRetryDelay);
}
@Override
@ -132,6 +149,11 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
this.connectionTimeout = this.config.getInteger(PROP_CONNECTION_TIMEOUT);
this.socketTimeout = this.config.getInteger(PROP_SOCKET_TIMEOUT);
this.waitEnabled = this.config.getBoolean(PROP_WAIT_ENABLED);
this.waitTimeout = this.config.getLong(PROP_WAIT_TIMEOUT);
this.waitRetryEnabled = this.config.getBoolean(PROP_WAIT_RETRY_ENABLED);
this.waitRetryDelay = this.config.getLong(PROP_WAIT_RETRY_DELAY);
super.configure(config, comparator, listener, useCatalogBeforeSchema);
}
@ -167,12 +189,12 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
}
// write the entry to Redis
client.xadd(this.redisKeyName, (StreamEntryID) null, Collections.singletonMap("schema", line));
LOGGER.trace("Record written to database schema history in redis: " + line);
client.xadd(this.redisKeyName, Collections.singletonMap("schema", line));
LOGGER.trace("Record written to database schema history in Redis: " + line);
completedSuccessfully = true;
}
catch (JedisConnectionException jce) {
LOGGER.warn("Attempting to reconnect to redis ");
catch (RedisClientConnectionException e) {
LOGGER.warn("Attempting to reconnect to Redis");
this.connect();
}
catch (Exception e) {
@ -200,7 +222,7 @@ public void stop() {
protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
DelayStrategy delayStrategy = DelayStrategy.exponential(initialRetryDelay, maxRetryDelay);
boolean completedSuccessfully = false;
List<StreamEntry> entries = new ArrayList<StreamEntry>();
List<Map<String, String>> entries = new ArrayList<>();
// loop and retry until successful
while (!completedSuccessfully) {
@ -210,12 +232,11 @@ protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
}
// read the entries from Redis
entries = client.xrange(
this.redisKeyName, (StreamEntryID) null, (StreamEntryID) null);
entries = client.xrange(this.redisKeyName);
completedSuccessfully = true;
}
catch (JedisConnectionException jce) {
LOGGER.warn("Attempting to reconnect to redis ");
catch (RedisClientConnectionException e) {
LOGGER.warn("Attempting to reconnect to Redis");
this.connect();
}
catch (Exception e) {
@ -229,9 +250,9 @@ protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
}
for (StreamEntry item : entries) {
for (Map<String, String> item : entries) {
try {
records.accept(new HistoryRecord(reader.read(item.getFields().get("schema"))));
records.accept(new HistoryRecord(reader.read(item.get("schema"))));
}
catch (IOException e) {
LOGGER.error("Failed to convert record to string: {}", item, e);

View File

@ -17,12 +17,11 @@
import org.slf4j.LoggerFactory;
import io.debezium.config.Field;
import io.debezium.storage.redis.RedisClient;
import io.debezium.storage.redis.RedisClientConnectionException;
import io.debezium.storage.redis.RedisConnection;
import io.smallrye.mutiny.Uni;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
/**
* Implementation of OffsetBackingStore that saves to Redis
* @author Oren Elias
@ -34,21 +33,21 @@ public class RedisOffsetBackingStore extends MemoryOffsetBackingStore {
private static final String CONFIGURATION_FIELD_PREFIX_STRING = "offset.storage.redis.";
public static final Field PROP_ADDRESS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "address")
.withDescription("The redis url that will be used to access the database schema history");
.withDescription("The Redis url that will be used to access the database schema history");
public static final Field PROP_SSL_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.enabled")
.withDescription("Use SSL for Redis connection")
.withDefault("false");
public static final Field PROP_USER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "user")
.withDescription("The redis url that will be used to access the database schema history");
.withDescription("The Redis url that will be used to access the database schema history");
public static final Field PROP_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "password")
.withDescription("The redis url that will be used to access the database schema history");
.withDescription("The Redis url that will be used to access the database schema history");
public static final String DEFAULT_REDIS_KEY_NAME = "metadata:debezium:offsets";
public static final Field PROP_KEY_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "key")
.withDescription("The redis key that will be used to store the database schema history")
.withDescription("The Redis key that will be used to store the database schema history")
.withDefault(DEFAULT_REDIS_KEY_NAME);
public static final Integer DEFAULT_RETRY_INITIAL_DELAY = 300;
@ -71,13 +70,34 @@ public class RedisOffsetBackingStore extends MemoryOffsetBackingStore {
.withDescription("Socket timeout (in ms)")
.withDefault(DEFAULT_SOCKET_TIMEOUT);
private static final boolean DEFAULT_WAIT_ENABLED = false;
private static final Field PROP_WAIT_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.enabled")
.withDescription(
"Enables wait for replica. In case Redis is configured with a replica shard, this allows to verify that the data has been written to the replica.")
.withDefault(DEFAULT_WAIT_ENABLED);
private static final long DEFAULT_WAIT_TIMEOUT = 1000L;
private static final Field PROP_WAIT_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.timeout.ms")
.withDescription("Timeout when wait for replica")
.withDefault(DEFAULT_WAIT_TIMEOUT);
private static final boolean DEFAULT_WAIT_RETRY_ENABLED = false;
private static final Field PROP_WAIT_RETRY_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.retry.enabled")
.withDescription("Enables retry on wait for replica failure")
.withDefault(DEFAULT_WAIT_RETRY_ENABLED);
private static final long DEFAULT_WAIT_RETRY_DELAY = 1000L;
private static final Field PROP_WAIT_RETRY_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.retry.delay.ms")
.withDescription("Delay of retry on wait for replica failure")
.withDefault(DEFAULT_WAIT_RETRY_DELAY);
private String redisKeyName;
private String address;
private String user;
private String password;
private boolean sslEnabled;
private Jedis client = null;
private RedisClient client;
private Map<String, String> config;
private Integer initialRetryDelay;
@ -86,13 +106,19 @@ public class RedisOffsetBackingStore extends MemoryOffsetBackingStore {
private Integer connectionTimeout;
private Integer socketTimeout;
private boolean waitEnabled;
private long waitTimeout;
private boolean waitRetryEnabled;
private long waitRetryDelay;
public RedisOffsetBackingStore() {
}
void connect() {
RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.connectionTimeout, this.socketTimeout, this.sslEnabled);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, this.waitEnabled, this.waitTimeout, this.waitRetryEnabled,
this.waitRetryDelay);
}
@Override
@ -118,6 +144,10 @@ public void configure(WorkerConfig config) {
this.socketTimeout = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_SOCKET_TIMEOUT.name()))).orElse(DEFAULT_SOCKET_TIMEOUT);
this.waitEnabled = Optional.ofNullable(Boolean.getBoolean(this.config.get(PROP_WAIT_ENABLED.name()))).orElse(DEFAULT_WAIT_ENABLED);
this.waitTimeout = Optional.ofNullable(Long.getLong(this.config.get(PROP_WAIT_TIMEOUT.name()))).orElse(DEFAULT_WAIT_TIMEOUT);
this.waitRetryEnabled = Optional.ofNullable(Boolean.getBoolean(this.config.get(PROP_WAIT_RETRY_ENABLED.name()))).orElse(DEFAULT_WAIT_RETRY_ENABLED);
this.waitRetryDelay = Optional.ofNullable(Long.getLong(this.config.get(PROP_WAIT_RETRY_DELAY.name()))).orElse(DEFAULT_WAIT_RETRY_DELAY);
}
@Override
@ -136,7 +166,7 @@ public synchronized void stop() {
}
/**
* Load offsets from redis keys
* Load offsets from Redis keys
*/
private void load() {
// fetch the value from Redis
@ -146,12 +176,12 @@ private void load() {
// handle failures and retry
.onFailure().invoke(
f -> {
LOGGER.warn("Reading from offset store failed with " + f);
LOGGER.warn("Reading from Redis offset store failed with " + f);
LOGGER.warn("Will retry");
})
.onFailure(JedisConnectionException.class).invoke(
.onFailure(RedisClientConnectionException.class).invoke(
f -> {
LOGGER.warn("Attempting to reconnect to redis ");
LOGGER.warn("Attempting to reconnect to Redis");
this.connect();
})
// retry on failure with backoff
@ -159,7 +189,7 @@ private void load() {
// write success trace message
.invoke(
item -> {
LOGGER.trace("Offsets fetched from redis: " + item);
LOGGER.trace("Offsets fetched from Redis: " + item);
})
.await().indefinitely();
this.data = new HashMap<>();
@ -171,7 +201,7 @@ private void load() {
}
/**
* Save offsets to redis keys
* Save offsets to Redis keys
*/
@Override
protected void save() {
@ -185,12 +215,12 @@ protected void save() {
// handle failures and retry
.onFailure().invoke(
f -> {
LOGGER.warn("Writing to offset store failed with " + f);
LOGGER.warn("Writing to Redis offset store failed with " + f);
LOGGER.warn("Will retry");
})
.onFailure(JedisConnectionException.class).invoke(
.onFailure(RedisClientConnectionException.class).invoke(
f -> {
LOGGER.warn("Attempting to reconnect to redis ");
LOGGER.warn("Attempting to reconnect to Redis");
this.connect();
})
// retry on failure with backoff
@ -198,7 +228,7 @@ protected void save() {
// write success trace message
.invoke(
item -> {
LOGGER.trace("Record written to offset store in redis: " + value);
LOGGER.trace("Offsets written to Redis: " + value);
})
.await().indefinitely();
}

View File

@ -0,0 +1,165 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.redis;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;
public class WaitReplicasRedisClientTest {
@Test
public void xaddOneNoRetry() {
String result = client(false).xadd(null, null);
assertEquals("2", result);
}
@Test
public void xaddOneRetry() {
String result = client(true).xadd(null, null);
assertEquals("0", result);
}
@Test
public void xaddAllNoRetry() {
List<String> result = client(false).xadd(null);
assertEquals("2", result.get(0));
}
@Test
public void xaddAllRetry() {
List<String> result = client(true).xadd(null);
assertEquals("0", result.get(0));
}
@Test
public void xrangeNoRetry() {
List<Map<String, String>> result = client(false).xrange("key");
assertEquals("2", result.get(0).get("key"));
}
@Test
public void xrangeRetry() {
List<Map<String, String>> result = client(true).xrange("key");
assertEquals("2", result.get(0).get("key"));
}
@Test
public void xlenNoRetry() {
long result = client(false).xlen("key");
assertEquals(2, result);
}
@Test
public void xlenRetry() {
long result = client(true).xlen("key");
assertEquals(2, result);
}
@Test
public void hgetAllNoRetry() {
Map<String, String> result = client(false).hgetAll("key");
assertEquals("2", result.get("key"));
}
@Test
public void hgetAllRetry() {
Map<String, String> result = client(true).hgetAll("key");
assertEquals("2", result.get("key"));
}
@Test
public void hsetNoRetry() {
long result = client(false).hset(null, null, null);
assertEquals(2, result);
}
@Test
public void hsetRetry() {
long result = client(true).hset(null, null, null);
assertEquals(0, result);
}
@Test
public void waitUnsupported() {
assertThrows(UnsupportedOperationException.class, () -> client(false).waitReplicas(0, 0));
}
private RedisClient client(boolean retry) {
RedisClient client = new RedisClientImpl(2);
RedisClient waitClient = new WaitReplicasRedisClient(client, 1, 1, retry, 1);
return waitClient;
}
private static class RedisClientImpl implements RedisClient {
private int errorCount;
private RedisClientImpl(int errorCount) {
this.errorCount = errorCount;
}
@Override
public void disconnect() throws RedisClientConnectionException {
}
@Override
public void close() throws RedisClientConnectionException {
}
@Override
public String xadd(String key, Map<String, String> hash) throws RedisClientConnectionException {
return errorCount();
}
@Override
public List<String> xadd(List<SimpleEntry<String, Map<String, String>>> hashes) throws RedisClientConnectionException {
List<String> result = new ArrayList<>();
result.add(errorCount());
return result;
}
@Override
public List<Map<String, String>> xrange(String key) throws RedisClientConnectionException {
List<Map<String, String>> result = new ArrayList<>();
result.add(Collections.singletonMap(key, errorCount()));
return result;
}
@Override
public long xlen(String key) throws RedisClientConnectionException {
return errorCount;
}
@Override
public Map<String, String> hgetAll(String key) throws RedisClientConnectionException {
return Collections.singletonMap(key, errorCount());
}
@Override
public long hset(byte[] key, byte[] field, byte[] value) throws RedisClientConnectionException {
return errorCount;
}
@Override
public long waitReplicas(int replicas, long timeout) throws RedisClientConnectionException {
return replicas + errorCount--;
}
private String errorCount() {
return "" + errorCount;
}
}
}

View File

@ -228,6 +228,22 @@ To use Redis to store offsets, use `io.debezium.storage.redis.offset.RedisOffset
|
|(Optional) If using Redis to store offsets, define the hash key in redis. If the `redis.key` configuration is not supplied, and the default value is `metadata:debezium:offsets`
|[[debezium-source-offset-redis-wait-enabled]]<<redis-wait-enabled, `debezium.source.offset.storage.redis.wait.enabled`>>
|`false`
|If using Redis to store offsets, enables wait for replica. In case Redis is configured with a replica shard, this allows to verify that the data has been written to the replica.
For more information see Redis https://redis.io/commands/wait/[WAIT] command.
|[[debezium-source-offset-redis-wait-timeout-ms]]<<redis-wait-timeout-ms, `debezium.source.offset.storage.redis.wait.timeout.ms`>>
|`1000`
|If using Redis to store offsets, defines the timeout in milliseconds when waiting for replica. Must have a positive value.
|[[debezium-source-offset-redis-wait-retry-enabled]]<<redis-wait-retry-enabled, `debezium.source.offset.storage.redis.wait.retry.enabled`>>
|`false`
|If using Redis to store offsets, enables retry on wait for replica failure.
|[[debezium-source-offset-redis-wait-retry-delay]]<<redis-wait-retry-delay, `debezium.source.offset.storage.redis.wait.retry.delay.ms`>>
|`1000`
|If using Redis to store offsets, defines the delay of retry on wait for replica failure.
|[[debezium-source-database-history-class]]<<debezium-source-database-history-class, `debezium.source.schema.history.internal`>>
|`io.debezium.storage.kafka.history.KafkaSchemaHistory`
@ -279,6 +295,23 @@ There are also other options available
|
|Socket timeout of Redis client if using `RedisSchemaHistory`. Default: 2000 (ms)
|[[debezium-source-database-history-redis-wait-enabled]]<<redis-wait-enabled, `debezium.source.schema.history.internal.redis.wait.enabled`>>
|`false`
|If using Redis to store schema history, enables wait for replica. In case Redis is configured with a replica shard, this allows to verify that the data has been written to the replica.
For more information see Redis https://redis.io/commands/wait/[WAIT] command.
|[[debezium-source-database-history-redis-wait-timeout-ms]]<<redis-wait-timeout-ms, `debezium.source.schema.history.internal.redis.wait.timeout.ms`>>
|`1000`
|If using Redis to store schema history, defines the timeout in milliseconds when waiting for replica. Must have a positive value.
|[[debezium-source-database-history-redis-wait-retry-enabled]]<<redis-wait-retry-enabled, `debezium.source.schema.history.internal.redis.wait.retry.enabled`>>
|`false`
|If using Redis to store schema history, enables retry on wait for replica failure.
|[[debezium-source-database-history-redis-wait-retry-delay]]<<redis-wait-retry-delay, `debezium.source.schema.history.internal.redis.wait.retry.delay.ms`>>
|`1000`
|If using Redis to store schema history, defines the delay of retry on wait for replica failure.
|===
[id="debezium-format-configuration-options"]
@ -855,6 +888,23 @@ This value will be doubled upon every retry but won't exceed `debezium.sink.redi
|`2000`
|Socket timeout for Redis client.
|[[redis-wait-enabled]]<<redis-wait-enabled, `debezium.sink.redis.wait.enabled`>>
|`false`
|Enables wait for replica. In case Redis is configured with a replica shard, this allows to verify that the data has been written to the replica.
For more information see Redis https://redis.io/commands/wait/[WAIT] command.
|[[redis-wait-timeout-ms]]<<redis-wait-timeout-ms, `debezium.sink.redis.wait.timeout.ms`>>
|`1000`
|Timeout in milliseconds when waiting for replica. Must have a positive value.
|[[redis-wait-retry-enabled]]<<redis-wait-retry-enabled, `debezium.sink.redis.wait.retry.enabled`>>
|`false`
|Enables retry on wait for replica failure.
|[[redis-wait-retry-delay]]<<redis-wait-retry-delay, `debezium.sink.redis.wait.retry.delay.ms`>>
|`1000`
|Delay of retry on wait for replica failure.
|[[redis-message-format]]<<redis-message-format, `debezium.sink.redis.message.format`>>
|`compact`
|The format of the message sent to the Redis stream. Possible values are `extended`(newer format) and `compact`(the until now, old format).