DBZ-5142 Configurable Redis connection timeouts

This commit is contained in:
Eliran 2022-05-19 10:56:27 +03:00 committed by Chris Cranford
parent 5877bea4bb
commit a1cf758c45
5 changed files with 65 additions and 6 deletions

View File

@ -26,19 +26,23 @@ public class RedisConnection {
private String address;
private String user;
private String password;
private int connectionTimeout;
private int socketTimeout;
private boolean sslEnabled;
public RedisConnection(String address, String user, String password, boolean sslEnabled) {
public RedisConnection(String address, String user, String password, int connectionTimeout, int socketTimeout, boolean sslEnabled) {
this.address = address;
this.user = user;
this.password = password;
this.connectionTimeout = connectionTimeout;
this.socketTimeout = socketTimeout;
this.sslEnabled = sslEnabled;
}
public Jedis getRedisClient(String clientName) {
HostAndPort address = HostAndPort.from(this.address);
Jedis client = new Jedis(address.getHost(), address.getPort(), this.sslEnabled);
Jedis client = new Jedis(address.getHost(), address.getPort(), this.connectionTimeout, this.socketTimeout, this.sslEnabled);
if (this.user != null) {
client.auth(this.user, this.password);

View File

@ -74,6 +74,16 @@ public final class RedisDatabaseHistory extends AbstractDatabaseHistory {
.withDescription("Maximum retry delay (in ms)")
.withDefault(DEFAULT_RETRY_MAX_DELAY);
public static final Integer DEFAULT_CONNECTION_TIMEOUT = 2000;
public static final Field PROP_CONNECTION_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connection.timeout.ms")
.withDescription("Connection timeout (in ms)")
.withDefault(DEFAULT_CONNECTION_TIMEOUT);
public static final Integer DEFAULT_SOCKET_TIMEOUT = 2000;
public static final Field PROP_SOCKET_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "socket.timeout.ms")
.withDescription("Socket timeout (in ms)")
.withDefault(DEFAULT_SOCKET_TIMEOUT);
Integer initialRetryDelay;
Integer maxRetryDelay;
@ -91,11 +101,13 @@ public final class RedisDatabaseHistory extends AbstractDatabaseHistory {
private String user;
private String password;
private boolean sslEnabled;
private Integer connectionTimeout;
private Integer socketTimeout;
private Jedis client = null;
void connect() {
RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.sslEnabled);
RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.connectionTimeout, this.socketTimeout, this.sslEnabled);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_DB_HISTORY);
}
@ -126,6 +138,9 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
// load retry settings
this.initialRetryDelay = this.config.getInteger(PROP_RETRY_INITIAL_DELAY);
this.maxRetryDelay = this.config.getInteger(PROP_RETRY_MAX_DELAY);
// load connection timeout settings
this.connectionTimeout = this.config.getInteger(PROP_CONNECTION_TIMEOUT);
this.socketTimeout = this.config.getInteger(PROP_SOCKET_TIMEOUT);
super.configure(config, comparator, listener, useCatalogBeforeSchema);
}

View File

@ -60,6 +60,16 @@ public class RedisOffsetBackingStore extends MemoryOffsetBackingStore {
.withDescription("Maximum retry delay (in ms)")
.withDefault(DEFAULT_RETRY_MAX_DELAY);
public static final Integer DEFAULT_CONNECTION_TIMEOUT = 2000;
public static final Field PROP_CONNECTION_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connection.timeout.ms")
.withDescription("Connection timeout (in ms)")
.withDefault(DEFAULT_CONNECTION_TIMEOUT);
public static final Integer DEFAULT_SOCKET_TIMEOUT = 2000;
public static final Field PROP_SOCKET_TIMEOUT = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "socket.timeout.ms")
.withDescription("Socket timeout (in ms)")
.withDefault(DEFAULT_SOCKET_TIMEOUT);
private static final String SINK_PROP_PREFIX = "debezium.sink.redis.";
private String redisKeyName;
@ -74,12 +84,15 @@ public class RedisOffsetBackingStore extends MemoryOffsetBackingStore {
private Integer initialRetryDelay;
private Integer maxRetryDelay;
private Integer connectionTimeout;
private Integer socketTimeout;
public RedisOffsetBackingStore() {
}
void connect() {
RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.sslEnabled);
RedisConnection redisConnection = new RedisConnection(this.address, this.user, this.password, this.connectionTimeout, this.socketTimeout, this.sslEnabled);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME);
}
@ -109,7 +122,12 @@ public void configure(WorkerConfig config) {
this.initialRetryDelay = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_RETRY_INITIAL_DELAY.name()))).orElse(DEFAULT_RETRY_INITIAL_DELAY);
this.maxRetryDelay = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_RETRY_INITIAL_DELAY.name()))).orElse(DEFAULT_RETRY_MAX_DELAY);
Integer.getInteger(this.config.get(PROP_RETRY_MAX_DELAY.name()))).orElse(DEFAULT_RETRY_MAX_DELAY);
// load connection timeout settings
this.connectionTimeout = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_CONNECTION_TIMEOUT.name()))).orElse(DEFAULT_CONNECTION_TIMEOUT);
this.socketTimeout = Optional.ofNullable(
Integer.getInteger(this.config.get(PROP_SOCKET_TIMEOUT.name()))).orElse(DEFAULT_SOCKET_TIMEOUT);
}

View File

@ -53,10 +53,14 @@ public class RedisStreamChangeConsumer extends BaseChangeConsumer
private static final String PROP_ADDRESS = PROP_PREFIX + "address";
private static final String PROP_USER = PROP_PREFIX + "user";
private static final String PROP_PASSWORD = PROP_PREFIX + "password";
private static final String PROP_CONNECTION_TIMEOUT = PROP_PREFIX + "connection.timeout.ms";
private static final String PROP_SOCKET_TIMEOUT = PROP_PREFIX + "socket.timeout.ms";
private String address;
private String user;
private String password;
private Integer connectionTimeout;
private Integer socketTimeout;
@ConfigProperty(name = PROP_PREFIX + "ssl.enabled", defaultValue = "false")
boolean sslEnabled;
@ -84,8 +88,10 @@ void connect() {
address = config.getValue(PROP_ADDRESS, String.class);
user = config.getOptionalValue(PROP_USER, String.class).orElse(null);
password = config.getOptionalValue(PROP_PASSWORD, String.class).orElse(null);
connectionTimeout = config.getOptionalValue(PROP_CONNECTION_TIMEOUT, Integer.class).orElse(2000);
socketTimeout = config.getOptionalValue(PROP_SOCKET_TIMEOUT, Integer.class).orElse(2000);
RedisConnection redisConnection = new RedisConnection(address, user, password, sslEnabled);
RedisConnection redisConnection = new RedisConnection(address, user, password, connectionTimeout, socketTimeout, sslEnabled);
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_REDIS_SINK_CLIENT_NAME);
}

View File

@ -261,6 +261,14 @@ There are also other options available
|
|The maximum delay in case of a connection retry to Redis if using `RedisDatabaseHistory`. Default: 10000 (ms)
|[[debezium-source-database-history-redis-connection-timeout-ms]]<<debezium-source-database-history-redis-connection-timeout-ms, `debezium.source.database.history.redis.connection.timeout.ms`>>
|
|Connection timeout of Redis client if using `RedisDatabaseHistory`. Default: 2000 (ms)
|[[debezium-source-database-history-redis-socket-timeout-ms]]<<debezium-source-database-history-redis-socket-timeout-ms, `debezium.source.database.history.redis.socket.timeout.ms`>>
|
|Socket timeout of Redis client if using `RedisDatabaseHistory`. Default: 2000 (ms)
|===
[id="debezium-format-configuration-options"]
@ -775,6 +783,14 @@ This value will be doubled upon every retry but won't exceed `debezium.sink.redi
|`10000`
|Max delay when encountering Redis connection or OOM issues.
|[[redis-connection-timeout-ms]]<<redis-connection-timeout-ms, `debezium.sink.redis.connection.timeout.ms`>>
|`2000`
|Connection timeout for Redis client.
|[[redis-socket-timeout-ms]]<<redis-socket-timeout-ms, `debezium.sink.redis.socket.timeout.ms`>>
|`2000`
|Socket timeout for Redis client.
|===