From 7532e52228887df40c12b77c9919bdc351865b12 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Wed, 8 Nov 2023 13:12:29 -0500 Subject: [PATCH] DBZ-7120 Add a connection max retry limit to Redis schema history --- .../io/debezium/storage/redis/RedisCommonConfig.java | 11 +++++++++++ .../storage/redis/history/RedisSchemaHistory.java | 10 +++++++--- .../ROOT/pages/operations/debezium-server.adoc | 4 ++++ 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisCommonConfig.java b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisCommonConfig.java index 6e881019d..aaf2b342d 100644 --- a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisCommonConfig.java +++ b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/RedisCommonConfig.java @@ -64,6 +64,11 @@ public class RedisCommonConfig { .withDescription("Maximum retry delay (in ms)") .withDefault(DEFAULT_RETRY_MAX_DELAY); + private static final Integer DEFAULT_MAX_RETRIES = 10; + private static final Field PROP_MAX_RETRIES = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.max.attempts") + .withDescription("Maximum number of retry attempts before giving up.") + .withDefault(DEFAULT_MAX_RETRIES); + private static final boolean DEFAULT_WAIT_ENABLED = false; private static final Field PROP_WAIT_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.enabled") .withDescription( @@ -93,6 +98,7 @@ public class RedisCommonConfig { private Integer initialRetryDelay; private Integer maxRetryDelay; + private Integer maxRetryCount; private Integer connectionTimeout; private Integer socketTimeout; @@ -127,6 +133,7 @@ protected void init(Configuration config) { initialRetryDelay = config.getInteger(PROP_RETRY_INITIAL_DELAY); maxRetryDelay = config.getInteger(PROP_RETRY_MAX_DELAY); + maxRetryCount = config.getInteger(PROP_MAX_RETRIES); connectionTimeout = config.getInteger(PROP_CONNECTION_TIMEOUT); socketTimeout = config.getInteger(PROP_SOCKET_TIMEOUT); @@ -169,6 +176,10 @@ public Integer getMaxRetryDelay() { return maxRetryDelay; } + public Integer getMaxRetryCount() { + return maxRetryCount; + } + public Integer getConnectionTimeout() { return connectionTimeout; } diff --git a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java index 5067dea0d..a0683f51d 100644 --- a/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java +++ b/debezium-storage/debezium-storage-redis/src/main/java/io/debezium/storage/redis/history/RedisSchemaHistory.java @@ -44,6 +44,7 @@ public class RedisSchemaHistory extends AbstractSchemaHistory { private Duration initialRetryDelay; private Duration maxRetryDelay; + private Integer maxRetryCount; private final DocumentWriter writer = DocumentWriter.defaultWriter(); private final DocumentReader reader = DocumentReader.defaultReader(); @@ -65,6 +66,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator, this.config = new RedisSchemaHistoryConfig(config); this.initialRetryDelay = Duration.ofMillis(this.config.getInitialRetryDelay()); this.maxRetryDelay = Duration.ofMillis(this.config.getMaxRetryDelay()); + this.maxRetryCount = this.config.getMaxRetryCount(); super.configure(config, comparator, listener, useCatalogBeforeSchema); } @@ -137,8 +139,8 @@ public boolean exists() { private T doWithRetry(Supplier action, String description) { final var delayStrategy = DelayStrategy.exponential(initialRetryDelay, maxRetryDelay); - // loop and retry until successful - for (;;) { + // loop and retry until successful or maximum attempts reached + for (int i = 1; i <= maxRetryCount; i++) { try { if (client == null) { this.connect(); @@ -147,7 +149,7 @@ private T doWithRetry(Supplier action, String description) { return action.get(); } catch (RedisClientConnectionException e) { - LOGGER.warn("Connection to Redis failed, will try to reconnect"); + LOGGER.warn("Connection to Redis failed, will try to reconnect [attempt {} of {}]", i, maxRetryCount); try { if (client != null) { client.disconnect(); @@ -164,5 +166,7 @@ private T doWithRetry(Supplier action, String description) { // Failed to execute the operation, retry... delayStrategy.sleepWhen(true); } + + throw new SchemaHistoryException(String.format("Failed to connect to Redis after %d attempts.", maxRetryCount)); } } diff --git a/documentation/modules/ROOT/pages/operations/debezium-server.adoc b/documentation/modules/ROOT/pages/operations/debezium-server.adoc index c29a36b5e..50380d6d0 100644 --- a/documentation/modules/ROOT/pages/operations/debezium-server.adoc +++ b/documentation/modules/ROOT/pages/operations/debezium-server.adoc @@ -297,6 +297,10 @@ There are also other options available | |The maximum delay in case of a connection retry to Redis if using `RedisSchemaHistory`. Default: 10000 (ms) +|[[debezium-source-database-history-redis-retry-max-attempts]]<> +| +|The maximum number of attempts to connect to Redis. Default: 10 + |[[debezium-source-database-history-redis-connection-timeout-ms]]<> | |Connection timeout of Redis client if using `RedisSchemaHistory`. Default: 2000 (ms)