DBZ-4720 Unification and validation for Redis sink, offset storage and schema history configuration

This commit is contained in:
ggaborg 2022-11-11 12:57:07 +02:00 committed by Jiri Pechanec
parent a6b2f0db3c
commit 4a70dccd7d
8 changed files with 412 additions and 290 deletions

View File

@ -5,6 +5,9 @@
*/
package io.debezium.server.redis;
import static io.debezium.server.redis.RedisStreamChangeConsumerConfig.MESSAGE_FORMAT_COMPACT;
import static io.debezium.server.redis.RedisStreamChangeConsumerConfig.MESSAGE_FORMAT_EXTENDED;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -27,13 +30,12 @@
import javax.enterprise.context.Dependent;
import javax.inject.Named;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
@ -59,52 +61,12 @@ public class RedisStreamChangeConsumer extends BaseChangeConsumer
private static final String DEBEZIUM_REDIS_SINK_CLIENT_NAME = "debezium:redis:sink";
private static final String PROP_PREFIX = "debezium.sink.redis.";
private static final String PROP_ADDRESS = PROP_PREFIX + "address";
private static final String PROP_USER = PROP_PREFIX + "user";
private static final String PROP_PASSWORD = PROP_PREFIX + "password";
private static final String PROP_CONNECTION_TIMEOUT = PROP_PREFIX + "connection.timeout.ms";
private static final String PROP_SOCKET_TIMEOUT = PROP_PREFIX + "socket.timeout.ms";
private static final String PROP_MESSAGE_FORMAT = PROP_PREFIX + "message.format";
private static final String PROP_WAIT_ENABLED = PROP_PREFIX + "wait.enabled";
private static final String PROP_WAIT_TIMEOUT = PROP_PREFIX + "wait.timeout.ms";
private static final String PROP_WAIT_RETRY_ENABLED = PROP_PREFIX + "wait.retry.enabled";
private static final String PROP_WAIT_RETRY_DELAY = PROP_PREFIX + "wait.retry.delay.ms";
private static final int DEFAULT_MEMORY_THRESHOLD_PERCENTAGE = 85;
private static final String PROP_MEMORY_THRESHOLD_PERCENTAGE = PROP_PREFIX + "memory.threshold.percentage";
private static final String INFO_MEMORY = "memory";
private static final String INFO_MEMORY_SECTION_MAXMEMORY = "maxmemory";
private static final String INFO_MEMORY_SECTION_USEDMEMORY = "used_memory";
private static final String MESSAGE_FORMAT_COMPACT = "compact";
private static final String MESSAGE_FORMAT_EXTENDED = "extended";
private static final String EXTENDED_MESSAGE_KEY_KEY = "key";
private static final String EXTENDED_MESSAGE_VALUE_KEY = "value";
private String address;
private String user;
private String password;
private Integer connectionTimeout;
private Integer socketTimeout;
@ConfigProperty(name = PROP_PREFIX + "ssl.enabled", defaultValue = "false")
boolean sslEnabled;
@ConfigProperty(name = PROP_PREFIX + "batch.size", defaultValue = "500")
Integer batchSize;
@ConfigProperty(name = PROP_PREFIX + "retry.initial.delay.ms", defaultValue = "300")
Integer initialRetryDelay;
@ConfigProperty(name = PROP_PREFIX + "retry.max.delay.ms", defaultValue = "10000")
Integer maxRetryDelay;
@ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default")
String nullKey;
@ConfigProperty(name = PROP_PREFIX + "null.value", defaultValue = "default")
String nullValue;
private static final String INFO_MEMORY = "memory";
private static final String INFO_MEMORY_SECTION_MAXMEMORY = "maxmemory";
private static final String INFO_MEMORY_SECTION_USEDMEMORY = "used_memory";
private RedisClient client;
@ -112,17 +74,13 @@ public class RedisStreamChangeConsumer extends BaseChangeConsumer
private Supplier<Boolean> isMemoryOk;
private RedisStreamChangeConsumerConfig config;
@PostConstruct
void connect() {
final Config config = ConfigProvider.getConfig();
address = config.getValue(PROP_ADDRESS, String.class);
user = config.getOptionalValue(PROP_USER, String.class).orElse(null);
password = config.getOptionalValue(PROP_PASSWORD, String.class).orElse(null);
connectionTimeout = config.getOptionalValue(PROP_CONNECTION_TIMEOUT, Integer.class).orElse(2000);
socketTimeout = config.getOptionalValue(PROP_SOCKET_TIMEOUT, Integer.class).orElse(2000);
String messageFormat = config.getOptionalValue(PROP_MESSAGE_FORMAT, String.class).orElse(MESSAGE_FORMAT_COMPACT);
LOGGER.info("Property {}={}", PROP_MESSAGE_FORMAT, messageFormat);
Configuration configuration = Configuration.from(getConfigSubset(ConfigProvider.getConfig(), ""));
config = new RedisStreamChangeConsumerConfig(configuration);
String messageFormat = config.getMessageFormat();
if (MESSAGE_FORMAT_EXTENDED.equals(messageFormat)) {
recordMapFunction = (key, value) -> {
Map<String, String> recordMap = new LinkedHashMap<>(2);
@ -134,24 +92,14 @@ void connect() {
else if (MESSAGE_FORMAT_COMPACT.equals(messageFormat)) {
recordMapFunction = Collections::singletonMap;
}
else {
throw new DebeziumException(
String.format("Property %s expects value one of '%s' or '%s'", PROP_MESSAGE_FORMAT, MESSAGE_FORMAT_EXTENDED, MESSAGE_FORMAT_COMPACT));
}
int memoryThreshold = config.getOptionalValue(PROP_MEMORY_THRESHOLD_PERCENTAGE, Integer.class).orElse(DEFAULT_MEMORY_THRESHOLD_PERCENTAGE);
if (memoryThreshold < 0 || memoryThreshold > 100) {
throw new DebeziumException(String.format("Property %s should be between 0 and 100", PROP_MEMORY_THRESHOLD_PERCENTAGE));
}
int memoryThreshold = config.getMemoryThreshold();
isMemoryOk = memoryThreshold > 0 ? () -> isMemoryOk(memoryThreshold) : () -> true;
boolean waitEnabled = config.getOptionalValue(PROP_WAIT_ENABLED, Boolean.class).orElse(false);
long waitTimeout = config.getOptionalValue(PROP_WAIT_TIMEOUT, Long.class).orElse(1000L);
boolean waitRetryEnabled = config.getOptionalValue(PROP_WAIT_RETRY_ENABLED, Boolean.class).orElse(false);
long waitRetryDelay = config.getOptionalValue(PROP_WAIT_RETRY_DELAY, Long.class).orElse(1000L);
RedisConnection redisConnection = new RedisConnection(address, user, password, connectionTimeout, socketTimeout, sslEnabled);
client = redisConnection.getRedisClient(DEBEZIUM_REDIS_SINK_CLIENT_NAME, waitEnabled, waitTimeout, waitRetryEnabled, waitRetryDelay);
RedisConnection redisConnection = new RedisConnection(config.getAddress(), config.getUser(), config.getPassword(), config.getConnectionTimeout(),
config.getSocketTimeout(), config.isSslEnabled());
client = redisConnection.getRedisClient(DEBEZIUM_REDIS_SINK_CLIENT_NAME, config.isWaitEnabled(), config.getWaitTimeout(), config.isWaitRetryEnabled(),
config.getWaitRetryDelay());
}
@PreDestroy
@ -188,10 +136,10 @@ private <T> Stream<List<T>> batches(List<T> source, int length) {
public void handleBatch(List<ChangeEvent<Object, Object>> records,
RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
DelayStrategy delayStrategy = DelayStrategy.exponential(Duration.ofMillis(initialRetryDelay), Duration.ofMillis(maxRetryDelay));
DelayStrategy delayStrategy = DelayStrategy.exponential(Duration.ofMillis(config.getInitialRetryDelay()), Duration.ofMillis(config.getMaxRetryDelay()));
LOGGER.trace("Handling a batch of {} records", records.size());
batches(records, batchSize).forEach(batch -> {
batches(records, config.getBatchSize()).forEach(batch -> {
boolean completedSuccessfully = false;
// Clone the batch and remove the records that have been successfully processed.
@ -218,8 +166,8 @@ else if (canHandleBatch()) {
List<SimpleEntry<String, Map<String, String>>> recordsMap = new ArrayList<>(clonedBatch.size());
for (ChangeEvent<Object, Object> record : clonedBatch) {
String destination = streamNameMapper.map(record.destination());
String key = (record.key() != null) ? getString(record.key()) : nullKey;
String value = (record.value() != null) ? getString(record.value()) : nullValue;
String key = (record.key() != null) ? getString(record.key()) : config.getNullKey();
String value = (record.value() != null) ? getString(record.value()) : config.getNullValue();
Map<String, String> recordMap = recordMapFunction.apply(key, value);
recordsMap.add(new SimpleEntry<>(destination, recordMap));
}

View File

@ -0,0 +1,91 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.redis;
import java.util.List;
import java.util.Set;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.config.Field.RangeValidator;
import io.debezium.storage.redis.RedisCommonConfig;
import io.debezium.util.Collect;
public class RedisStreamChangeConsumerConfig extends RedisCommonConfig {
private static final String PROP_PREFIX = "debezium.sink.";
private static final int DEFAULT_BATCH_SIZE = 500;
private static final Field PROP_BATCH_SIZE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "batch.size")
.withDefault(DEFAULT_BATCH_SIZE);
private static final String DEFAULT_NULL_KEY = "default";
private static final Field PROP_NULL_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "null.key")
.withDefault(DEFAULT_NULL_KEY);
private static final String DEFAULT_NULL_VALUE = "default";
private static final Field PROP_NULL_VALUE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "null.value")
.withDefault(DEFAULT_NULL_VALUE);
static final String MESSAGE_FORMAT_COMPACT = "compact";
static final String MESSAGE_FORMAT_EXTENDED = "extended";
private static final Field PROP_MESSAGE_FORMAT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "message.format")
.withAllowedValues(Set.of(MESSAGE_FORMAT_COMPACT, MESSAGE_FORMAT_EXTENDED))
.withDefault(MESSAGE_FORMAT_COMPACT);
private static final int DEFAULT_MEMORY_THRESHOLD_PERCENTAGE = 85;
private static final Field PROP_MEMORY_THRESHOLD_PERCENTAGE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "memory.threshold.percentage")
.withDefault(DEFAULT_MEMORY_THRESHOLD_PERCENTAGE)
.withValidation(RangeValidator.between(0, 100));
private int batchSize;
private String nullKey;
private String nullValue;
private String messageFormat;
private int memoryThreshold;
public RedisStreamChangeConsumerConfig(Configuration config) {
super(config, PROP_PREFIX);
}
@Override
protected void init(Configuration config) {
super.init(config);
batchSize = config.getInteger(PROP_BATCH_SIZE);
nullKey = config.getString(PROP_NULL_KEY);
nullValue = config.getString(PROP_NULL_VALUE);
messageFormat = config.getString(PROP_MESSAGE_FORMAT);
memoryThreshold = config.getInteger(PROP_MEMORY_THRESHOLD_PERCENTAGE);
}
@Override
protected List<Field> getAllConfigurationFields() {
List<Field> fields = Collect.arrayListOf(PROP_BATCH_SIZE, PROP_NULL_KEY, PROP_NULL_VALUE, PROP_MESSAGE_FORMAT, PROP_MEMORY_THRESHOLD_PERCENTAGE);
fields.addAll(super.getAllConfigurationFields());
return fields;
}
public int getBatchSize() {
return batchSize;
}
public String getNullKey() {
return nullKey;
}
public String getNullValue() {
return nullValue;
}
public String getMessageFormat() {
return messageFormat;
}
public int getMemoryThreshold() {
return memoryThreshold;
}
}

View File

@ -57,7 +57,7 @@ protected SchemaHistory createHistory() {
SchemaHistory history = new RedisSchemaHistory();
history.configure(Configuration.create()
.with(RedisSchemaHistory.PROP_ADDRESS, HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()))
.with("schema.history.internal.redis.address", HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()))
.build(), null, SchemaHistoryMetrics.NOOP, true);
history.start();
return history;

View File

@ -0,0 +1,181 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.redis;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.util.Collect;
public class RedisCommonConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisCommonConfig.class);
public static final String CONFIGURATION_FIELD_PREFIX_STRING = "redis.";
private static final Field PROP_ADDRESS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "address")
.withDescription("The url that will be used to access Redis")
.required();
private static final Field PROP_USER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "user")
.withDescription("The user that will be used to access Redis");
private static final Field PROP_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "password")
.withDescription("The password that will be used to access Redis");
private static final boolean DEFAULT_SSL_ENABLED = false;
private static final Field PROP_SSL_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.enabled")
.withDescription("Use SSL for Redis connection")
.withDefault(DEFAULT_SSL_ENABLED);
private static final Integer DEFAULT_CONNECTION_TIMEOUT = 2000;
private static final Field PROP_CONNECTION_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connection.timeout.ms")
.withDescription("Connection timeout (in ms)")
.withDefault(DEFAULT_CONNECTION_TIMEOUT);
private static final Integer DEFAULT_SOCKET_TIMEOUT = 2000;
private static final Field PROP_SOCKET_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "socket.timeout.ms")
.withDescription("Socket timeout (in ms)")
.withDefault(DEFAULT_SOCKET_TIMEOUT);
private static final Integer DEFAULT_RETRY_INITIAL_DELAY = 300;
private static final Field PROP_RETRY_INITIAL_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.initial.delay.ms")
.withDescription("Initial retry delay (in ms)")
.withDefault(DEFAULT_RETRY_INITIAL_DELAY);
private static final Integer DEFAULT_RETRY_MAX_DELAY = 10000;
private static final Field PROP_RETRY_MAX_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.max.delay.ms")
.withDescription("Maximum retry delay (in ms)")
.withDefault(DEFAULT_RETRY_MAX_DELAY);
private static final boolean DEFAULT_WAIT_ENABLED = false;
private static final Field PROP_WAIT_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.enabled")
.withDescription(
"Enables wait for replica. In case Redis is configured with a replica shard, this allows to verify that the data has been written to the replica.")
.withDefault(DEFAULT_WAIT_ENABLED);
private static final long DEFAULT_WAIT_TIMEOUT = 1000L;
private static final Field PROP_WAIT_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.timeout.ms")
.withDescription("Timeout when wait for replica")
.withDefault(DEFAULT_WAIT_TIMEOUT);
private static final boolean DEFAULT_WAIT_RETRY_ENABLED = false;
private static final Field PROP_WAIT_RETRY_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.retry.enabled")
.withDescription("Enables retry on wait for replica failure")
.withDefault(DEFAULT_WAIT_RETRY_ENABLED);
private static final long DEFAULT_WAIT_RETRY_DELAY = 1000L;
private static final Field PROP_WAIT_RETRY_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.retry.delay.ms")
.withDescription("Delay of retry on wait for replica failure")
.withDefault(DEFAULT_WAIT_RETRY_DELAY);
private String address;
private String user;
private String password;
private boolean sslEnabled;
private Integer initialRetryDelay;
private Integer maxRetryDelay;
private Integer connectionTimeout;
private Integer socketTimeout;
private boolean waitEnabled;
private long waitTimeout;
private boolean waitRetryEnabled;
private long waitRetryDelay;
public RedisCommonConfig(Configuration config, String prefix) {
config = config.subset(prefix, true);
LOGGER.info("Configuration for '{}' with prefix '{}': {}", getClass().getSimpleName(), prefix, config.asMap());
if (!config.validateAndRecord(getAllConfigurationFields(), error -> LOGGER.error("Validation error for property with prefix '{}': {}", prefix, error))) {
throw new DebeziumException(
String.format("Error configuring an instance of '%s' with prefix '%s'; check the logs for errors", getClass().getSimpleName(), prefix));
}
init(config);
}
protected List<Field> getAllConfigurationFields() {
return Collect.arrayListOf(PROP_ADDRESS, PROP_USER, PROP_PASSWORD, PROP_SSL_ENABLED, PROP_CONNECTION_TIMEOUT, PROP_SOCKET_TIMEOUT, PROP_RETRY_INITIAL_DELAY,
PROP_RETRY_MAX_DELAY, PROP_WAIT_ENABLED, PROP_WAIT_TIMEOUT, PROP_WAIT_RETRY_ENABLED, PROP_WAIT_RETRY_DELAY);
}
protected void init(Configuration config) {
address = config.getString(PROP_ADDRESS);
user = config.getString(PROP_USER);
password = config.getString(PROP_PASSWORD);
sslEnabled = config.getBoolean(PROP_SSL_ENABLED);
initialRetryDelay = config.getInteger(PROP_RETRY_INITIAL_DELAY);
maxRetryDelay = config.getInteger(PROP_RETRY_MAX_DELAY);
connectionTimeout = config.getInteger(PROP_CONNECTION_TIMEOUT);
socketTimeout = config.getInteger(PROP_SOCKET_TIMEOUT);
waitEnabled = config.getBoolean(PROP_WAIT_ENABLED);
waitTimeout = config.getLong(PROP_WAIT_TIMEOUT);
waitRetryEnabled = config.getBoolean(PROP_WAIT_RETRY_ENABLED);
waitRetryDelay = config.getLong(PROP_WAIT_RETRY_DELAY);
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getAddress() {
return address;
}
public String getUser() {
return user;
}
public boolean isSslEnabled() {
return sslEnabled;
}
public Integer getInitialRetryDelay() {
return initialRetryDelay;
}
public Integer getMaxRetryDelay() {
return maxRetryDelay;
}
public Integer getConnectionTimeout() {
return connectionTimeout;
}
public Integer getSocketTimeout() {
return socketTimeout;
}
public boolean isWaitEnabled() {
return waitEnabled;
}
public long getWaitTimeout() {
return waitTimeout;
}
public boolean isWaitRetryEnabled() {
return waitRetryEnabled;
}
public long getWaitRetryDelay() {
return waitRetryDelay;
}
}

View File

@ -8,20 +8,17 @@
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.AbstractSchemaHistory;
@ -33,7 +30,6 @@
import io.debezium.storage.redis.RedisClient;
import io.debezium.storage.redis.RedisClientConnectionException;
import io.debezium.storage.redis.RedisConnection;
import io.debezium.util.Collect;
import io.debezium.util.DelayStrategy;
import io.debezium.util.Loggings;
@ -44,116 +40,31 @@
@ThreadSafe
public class RedisSchemaHistory extends AbstractSchemaHistory {
private static final String CONFIGURATION_FIELD_PREFIX_STRING = SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + "redis.";
private static final Logger LOGGER = LoggerFactory.getLogger(RedisSchemaHistory.class);
public static final Field PROP_ADDRESS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "address")
.withDescription("The Redis url that will be used to access the database schema history");
public static final Field PROP_SSL_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.enabled")
.withDescription("Use SSL for Redis connection")
.withDefault("false");
public static final Field PROP_USER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "user")
.withDescription("The Redis url that will be used to access the database schema history");
public static final Field PROP_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "password")
.withDescription("The Redis url that will be used to access the database schema history");
public static final Field PROP_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "key")
.withDescription("The Redis key that will be used to store the database schema history")
.withDefault("metadata:debezium:schema_history");
public static final Integer DEFAULT_RETRY_INITIAL_DELAY = 300;
public static final Field PROP_RETRY_INITIAL_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.initial.delay.ms")
.withDescription("Initial retry delay (in ms)")
.withDefault(DEFAULT_RETRY_INITIAL_DELAY);
public static final Integer DEFAULT_RETRY_MAX_DELAY = 10000;
public static final Field PROP_RETRY_MAX_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.max.delay.ms")
.withDescription("Maximum retry delay (in ms)")
.withDefault(DEFAULT_RETRY_MAX_DELAY);
public static final Integer DEFAULT_CONNECTION_TIMEOUT = 2000;
public static final Field PROP_CONNECTION_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connection.timeout.ms")
.withDescription("Connection timeout (in ms)")
.withDefault(DEFAULT_CONNECTION_TIMEOUT);
public static final Integer DEFAULT_SOCKET_TIMEOUT = 2000;
public static final Field PROP_SOCKET_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "socket.timeout.ms")
.withDescription("Socket timeout (in ms)")
.withDefault(DEFAULT_SOCKET_TIMEOUT);
private static final Field PROP_WAIT_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.enabled")
.withDescription(
"Enables wait for replica. In case Redis is configured with a replica shard, this allows to verify that the data has been written to the replica.")
.withDefault(false);
private static final Field PROP_WAIT_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.timeout.ms")
.withDescription("Timeout when wait for replica")
.withDefault(1000L);
private static final Field PROP_WAIT_RETRY_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.retry.enabled")
.withDescription("Enables retry on wait for replica failure")
.withDefault(false);
private static final Field PROP_WAIT_RETRY_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.retry.delay.ms")
.withDescription("Delay of retry on wait for replica failure")
.withDefault(1000L);
Duration initialRetryDelay;
Duration maxRetryDelay;
public static Collection<Field> ALL_FIELDS = Collect.arrayListOf(PROP_ADDRESS, PROP_USER, PROP_PASSWORD, PROP_KEY);
private Duration initialRetryDelay;
private Duration maxRetryDelay;
private final DocumentWriter writer = DocumentWriter.defaultWriter();
private final DocumentReader reader = DocumentReader.defaultReader();
private final AtomicBoolean running = new AtomicBoolean();
private Configuration config;
private String redisKeyName;
private String address;
private String user;
private String password;
private boolean sslEnabled;
private Integer connectionTimeout;
private Integer socketTimeout;
private boolean waitEnabled;
private long waitTimeout;
private boolean waitRetryEnabled;
private long waitRetryDelay;
private RedisClient client;
private RedisSchemaHistoryConfig config;
void connect() {
RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.connectionTimeout, this.socketTimeout, this.sslEnabled);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_SCHEMA_HISTORY, waitEnabled, waitTimeout, waitRetryEnabled, waitRetryDelay);
RedisConnection redisConnection = new RedisConnection(config.getAddress(), config.getUser(), config.getPassword(), config.getConnectionTimeout(),
config.getSocketTimeout(), config.isSslEnabled());
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_SCHEMA_HISTORY, config.isWaitEnabled(), config.getWaitTimeout(),
config.isWaitRetryEnabled(), config.getWaitRetryDelay());
}
@Override
public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
if (!config.validateAndRecord(ALL_FIELDS, LOGGER::error)) {
throw new ConnectException(
"Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
}
this.config = config;
// fetch the properties
this.address = this.config.getString(PROP_ADDRESS.name());
this.user = this.config.getString(PROP_USER.name());
this.password = this.config.getString(PROP_PASSWORD.name());
this.sslEnabled = Boolean.parseBoolean(this.config.getString(PROP_SSL_ENABLED.name()));
this.redisKeyName = this.config.getString(PROP_KEY);
LOGGER.info("rediskeyname:" + this.redisKeyName);
// load retry settings
this.initialRetryDelay = Duration.ofMillis(this.config.getInteger(PROP_RETRY_INITIAL_DELAY));
this.maxRetryDelay = Duration.ofMillis(this.config.getInteger(PROP_RETRY_MAX_DELAY));
// load connection timeout settings
this.connectionTimeout = this.config.getInteger(PROP_CONNECTION_TIMEOUT);
this.socketTimeout = this.config.getInteger(PROP_SOCKET_TIMEOUT);
this.waitEnabled = this.config.getBoolean(PROP_WAIT_ENABLED);
this.waitTimeout = this.config.getLong(PROP_WAIT_TIMEOUT);
this.waitRetryEnabled = this.config.getBoolean(PROP_WAIT_RETRY_ENABLED);
this.waitRetryDelay = this.config.getLong(PROP_WAIT_RETRY_DELAY);
this.config = new RedisSchemaHistoryConfig(config);
this.initialRetryDelay = Duration.ofMillis(this.config.getInitialRetryDelay());
this.maxRetryDelay = Duration.ofMillis(this.config.getMaxRetryDelay());
super.configure(config, comparator, listener, useCatalogBeforeSchema);
}
@ -189,7 +100,7 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
}
// write the entry to Redis
client.xadd(this.redisKeyName, Collections.singletonMap("schema", line));
client.xadd(config.getRedisKeyName(), Collections.singletonMap("schema", line));
LOGGER.trace("Record written to database schema history in Redis: " + line);
completedSuccessfully = true;
}
@ -232,7 +143,7 @@ protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
}
// read the entries from Redis
entries = client.xrange(this.redisKeyName);
entries = client.xrange(config.getRedisKeyName());
completedSuccessfully = true;
}
catch (RedisClientConnectionException e) {
@ -270,7 +181,7 @@ public boolean storageExists() {
@Override
public boolean exists() {
// check if the stream is not empty
if (client != null && client.xlen(this.redisKeyName) > 0) {
if (client != null && client.xlen(config.getRedisKeyName()) > 0) {
return true;
}
else {

View File

@ -0,0 +1,46 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.redis.history;
import java.util.List;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.storage.redis.RedisCommonConfig;
import io.debezium.util.Collect;
public class RedisSchemaHistoryConfig extends RedisCommonConfig {
private static final String DEFAULT_REDIS_KEY_NAME = "metadata:debezium:schema_history";
private static final Field PROP_KEY_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "key")
.withDescription("The Redis key that will be used to store the database schema history")
.withDefault(DEFAULT_REDIS_KEY_NAME);
private String redisKeyName;
public RedisSchemaHistoryConfig(Configuration config) {
super(config, SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING);
}
@Override
protected void init(Configuration config) {
super.init(config);
this.redisKeyName = config.getString(PROP_KEY_NAME);
}
@Override
protected List<Field> getAllConfigurationFields() {
List<Field> fields = Collect.arrayListOf(PROP_KEY_NAME);
fields.addAll(super.getAllConfigurationFields());
return fields;
}
public String getRedisKeyName() {
return redisKeyName;
}
}

View File

@ -9,14 +9,13 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Field;
import io.debezium.config.Configuration;
import io.debezium.storage.redis.RedisClient;
import io.debezium.storage.redis.RedisClientConnectionException;
import io.debezium.storage.redis.RedisConnection;
@ -31,123 +30,22 @@ public class RedisOffsetBackingStore extends MemoryOffsetBackingStore {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisOffsetBackingStore.class);
private static final String CONFIGURATION_FIELD_PREFIX_STRING = "offset.storage.redis.";
public static final Field PROP_ADDRESS = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "address")
.withDescription("The Redis url that will be used to access the database schema history");
public static final Field PROP_SSL_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ssl.enabled")
.withDescription("Use SSL for Redis connection")
.withDefault("false");
public static final Field PROP_USER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "user")
.withDescription("The Redis url that will be used to access the database schema history");
public static final Field PROP_PASSWORD = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "password")
.withDescription("The Redis url that will be used to access the database schema history");
public static final String DEFAULT_REDIS_KEY_NAME = "metadata:debezium:offsets";
public static final Field PROP_KEY_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "key")
.withDescription("The Redis key that will be used to store the database schema history")
.withDefault(DEFAULT_REDIS_KEY_NAME);
public static final Integer DEFAULT_RETRY_INITIAL_DELAY = 300;
public static final Field PROP_RETRY_INITIAL_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.initial.delay.ms")
.withDescription("Initial retry delay (in ms)")
.withDefault(DEFAULT_RETRY_INITIAL_DELAY);
public static final Integer DEFAULT_RETRY_MAX_DELAY = 10000;
public static final Field PROP_RETRY_MAX_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.max.delay.ms")
.withDescription("Maximum retry delay (in ms)")
.withDefault(DEFAULT_RETRY_MAX_DELAY);
public static final Integer DEFAULT_CONNECTION_TIMEOUT = 2000;
public static final Field PROP_CONNECTION_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connection.timeout.ms")
.withDescription("Connection timeout (in ms)")
.withDefault(DEFAULT_CONNECTION_TIMEOUT);
public static final Integer DEFAULT_SOCKET_TIMEOUT = 2000;
public static final Field PROP_SOCKET_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "socket.timeout.ms")
.withDescription("Socket timeout (in ms)")
.withDefault(DEFAULT_SOCKET_TIMEOUT);
private static final boolean DEFAULT_WAIT_ENABLED = false;
private static final Field PROP_WAIT_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.enabled")
.withDescription(
"Enables wait for replica. In case Redis is configured with a replica shard, this allows to verify that the data has been written to the replica.")
.withDefault(DEFAULT_WAIT_ENABLED);
private static final long DEFAULT_WAIT_TIMEOUT = 1000L;
private static final Field PROP_WAIT_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.timeout.ms")
.withDescription("Timeout when wait for replica")
.withDefault(DEFAULT_WAIT_TIMEOUT);
private static final boolean DEFAULT_WAIT_RETRY_ENABLED = false;
private static final Field PROP_WAIT_RETRY_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.retry.enabled")
.withDescription("Enables retry on wait for replica failure")
.withDefault(DEFAULT_WAIT_RETRY_ENABLED);
private static final long DEFAULT_WAIT_RETRY_DELAY = 1000L;
private static final Field PROP_WAIT_RETRY_DELAY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.retry.delay.ms")
.withDescription("Delay of retry on wait for replica failure")
.withDefault(DEFAULT_WAIT_RETRY_DELAY);
private String redisKeyName;
private String address;
private String user;
private String password;
private boolean sslEnabled;
private RedisOffsetBackingStoreConfig config;
private RedisClient client;
private Map<String, String> config;
private Integer initialRetryDelay;
private Integer maxRetryDelay;
private Integer connectionTimeout;
private Integer socketTimeout;
private boolean waitEnabled;
private long waitTimeout;
private boolean waitRetryEnabled;
private long waitRetryDelay;
public RedisOffsetBackingStore() {
}
void connect() {
RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.connectionTimeout, this.socketTimeout, this.sslEnabled);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, this.waitEnabled, this.waitTimeout, this.waitRetryEnabled,
this.waitRetryDelay);
RedisConnection redisConnection = new RedisConnection(config.getAddress(), config.getUser(), config.getPassword(), config.getConnectionTimeout(),
config.getSocketTimeout(), config.isSslEnabled());
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, config.isWaitEnabled(), config.getWaitTimeout(),
config.isWaitRetryEnabled(), config.getWaitRetryDelay());
}
@Override
public void configure(WorkerConfig config) {
super.configure(config);
this.config = config.originalsStrings();
this.address = this.config.get(PROP_ADDRESS.name());
this.user = this.config.get(PROP_USER.name());
this.password = this.config.get(PROP_PASSWORD.name());
this.sslEnabled = Boolean.parseBoolean(this.config.get(PROP_SSL_ENABLED.name()));
this.redisKeyName = Optional.ofNullable(
this.config.get(PROP_KEY_NAME.name())).orElse(DEFAULT_REDIS_KEY_NAME);
// load retry settings
this.initialRetryDelay = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_RETRY_INITIAL_DELAY.name()))).orElse(DEFAULT_RETRY_INITIAL_DELAY);
this.maxRetryDelay = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_RETRY_MAX_DELAY.name()))).orElse(DEFAULT_RETRY_MAX_DELAY);
// load connection timeout settings
this.connectionTimeout = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_CONNECTION_TIMEOUT.name()))).orElse(DEFAULT_CONNECTION_TIMEOUT);
this.socketTimeout = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_SOCKET_TIMEOUT.name()))).orElse(DEFAULT_SOCKET_TIMEOUT);
this.waitEnabled = Optional.ofNullable(Boolean.getBoolean(this.config.get(PROP_WAIT_ENABLED.name()))).orElse(DEFAULT_WAIT_ENABLED);
this.waitTimeout = Optional.ofNullable(Long.getLong(this.config.get(PROP_WAIT_TIMEOUT.name()))).orElse(DEFAULT_WAIT_TIMEOUT);
this.waitRetryEnabled = Optional.ofNullable(Boolean.getBoolean(this.config.get(PROP_WAIT_RETRY_ENABLED.name()))).orElse(DEFAULT_WAIT_RETRY_ENABLED);
this.waitRetryDelay = Optional.ofNullable(Long.getLong(this.config.get(PROP_WAIT_RETRY_DELAY.name()))).orElse(DEFAULT_WAIT_RETRY_DELAY);
Configuration configuration = Configuration.from(config.originalsStrings());
this.config = new RedisOffsetBackingStoreConfig(configuration);
}
@Override
@ -171,7 +69,7 @@ public synchronized void stop() {
private void load() {
// fetch the value from Redis
Map<String, String> offsets = Uni.createFrom().item(() -> {
return (Map<String, String>) client.hgetAll(this.redisKeyName);
return (Map<String, String>) client.hgetAll(config.getRedisKeyName());
})
// handle failures and retry
.onFailure().invoke(
@ -185,7 +83,7 @@ private void load() {
this.connect();
})
// retry on failure with backoff
.onFailure().retry().withBackOff(Duration.ofMillis(initialRetryDelay), Duration.ofMillis(maxRetryDelay)).indefinitely()
.onFailure().retry().withBackOff(Duration.ofMillis(config.getInitialRetryDelay()), Duration.ofMillis(config.getMaxRetryDelay())).indefinitely()
// write success trace message
.invoke(
item -> {
@ -210,7 +108,7 @@ protected void save() {
byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;
// set the value in Redis
Uni.createFrom().item(() -> {
return (Long) client.hset(this.redisKeyName.getBytes(), key, value);
return (Long) client.hset(config.getRedisKeyName().getBytes(), key, value);
})
// handle failures and retry
.onFailure().invoke(

View File

@ -0,0 +1,47 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.redis.offset;
import java.util.List;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.storage.redis.RedisCommonConfig;
import io.debezium.util.Collect;
public class RedisOffsetBackingStoreConfig extends RedisCommonConfig {
private static final String PROP_PREFIX = "offset.storage.";
private static final String DEFAULT_REDIS_KEY_NAME = "metadata:debezium:offsets";
private static final Field PROP_KEY_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "key")
.withDescription("The Redis key that will be used to store the offsets")
.withDefault(DEFAULT_REDIS_KEY_NAME);
private String redisKeyName;
public RedisOffsetBackingStoreConfig(Configuration config) {
super(config, PROP_PREFIX);
}
@Override
protected void init(Configuration config) {
super.init(config);
this.redisKeyName = config.getString(PROP_KEY_NAME);
}
@Override
protected List<Field> getAllConfigurationFields() {
List<Field> fields = Collect.arrayListOf(PROP_KEY_NAME);
fields.addAll(super.getAllConfigurationFields());
return fields;
}
public String getRedisKeyName() {
return redisKeyName;
}
}