DBZ-7120 Add a connection max retry limit to Redis schema history
This commit is contained in:
parent
9d84d01f64
commit
7532e52228
@ -64,6 +64,11 @@ public class RedisCommonConfig {
|
|||||||
.withDescription("Maximum retry delay (in ms)")
|
.withDescription("Maximum retry delay (in ms)")
|
||||||
.withDefault(DEFAULT_RETRY_MAX_DELAY);
|
.withDefault(DEFAULT_RETRY_MAX_DELAY);
|
||||||
|
|
||||||
|
private static final Integer DEFAULT_MAX_RETRIES = 10;
|
||||||
|
private static final Field PROP_MAX_RETRIES = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "retry.max.attempts")
|
||||||
|
.withDescription("Maximum number of retry attempts before giving up.")
|
||||||
|
.withDefault(DEFAULT_MAX_RETRIES);
|
||||||
|
|
||||||
private static final boolean DEFAULT_WAIT_ENABLED = false;
|
private static final boolean DEFAULT_WAIT_ENABLED = false;
|
||||||
private static final Field PROP_WAIT_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.enabled")
|
private static final Field PROP_WAIT_ENABLED = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "wait.enabled")
|
||||||
.withDescription(
|
.withDescription(
|
||||||
@ -93,6 +98,7 @@ public class RedisCommonConfig {
|
|||||||
|
|
||||||
private Integer initialRetryDelay;
|
private Integer initialRetryDelay;
|
||||||
private Integer maxRetryDelay;
|
private Integer maxRetryDelay;
|
||||||
|
private Integer maxRetryCount;
|
||||||
|
|
||||||
private Integer connectionTimeout;
|
private Integer connectionTimeout;
|
||||||
private Integer socketTimeout;
|
private Integer socketTimeout;
|
||||||
@ -127,6 +133,7 @@ protected void init(Configuration config) {
|
|||||||
|
|
||||||
initialRetryDelay = config.getInteger(PROP_RETRY_INITIAL_DELAY);
|
initialRetryDelay = config.getInteger(PROP_RETRY_INITIAL_DELAY);
|
||||||
maxRetryDelay = config.getInteger(PROP_RETRY_MAX_DELAY);
|
maxRetryDelay = config.getInteger(PROP_RETRY_MAX_DELAY);
|
||||||
|
maxRetryCount = config.getInteger(PROP_MAX_RETRIES);
|
||||||
|
|
||||||
connectionTimeout = config.getInteger(PROP_CONNECTION_TIMEOUT);
|
connectionTimeout = config.getInteger(PROP_CONNECTION_TIMEOUT);
|
||||||
socketTimeout = config.getInteger(PROP_SOCKET_TIMEOUT);
|
socketTimeout = config.getInteger(PROP_SOCKET_TIMEOUT);
|
||||||
@ -169,6 +176,10 @@ public Integer getMaxRetryDelay() {
|
|||||||
return maxRetryDelay;
|
return maxRetryDelay;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Integer getMaxRetryCount() {
|
||||||
|
return maxRetryCount;
|
||||||
|
}
|
||||||
|
|
||||||
public Integer getConnectionTimeout() {
|
public Integer getConnectionTimeout() {
|
||||||
return connectionTimeout;
|
return connectionTimeout;
|
||||||
}
|
}
|
||||||
|
@ -44,6 +44,7 @@ public class RedisSchemaHistory extends AbstractSchemaHistory {
|
|||||||
|
|
||||||
private Duration initialRetryDelay;
|
private Duration initialRetryDelay;
|
||||||
private Duration maxRetryDelay;
|
private Duration maxRetryDelay;
|
||||||
|
private Integer maxRetryCount;
|
||||||
|
|
||||||
private final DocumentWriter writer = DocumentWriter.defaultWriter();
|
private final DocumentWriter writer = DocumentWriter.defaultWriter();
|
||||||
private final DocumentReader reader = DocumentReader.defaultReader();
|
private final DocumentReader reader = DocumentReader.defaultReader();
|
||||||
@ -65,6 +66,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
|
|||||||
this.config = new RedisSchemaHistoryConfig(config);
|
this.config = new RedisSchemaHistoryConfig(config);
|
||||||
this.initialRetryDelay = Duration.ofMillis(this.config.getInitialRetryDelay());
|
this.initialRetryDelay = Duration.ofMillis(this.config.getInitialRetryDelay());
|
||||||
this.maxRetryDelay = Duration.ofMillis(this.config.getMaxRetryDelay());
|
this.maxRetryDelay = Duration.ofMillis(this.config.getMaxRetryDelay());
|
||||||
|
this.maxRetryCount = this.config.getMaxRetryCount();
|
||||||
super.configure(config, comparator, listener, useCatalogBeforeSchema);
|
super.configure(config, comparator, listener, useCatalogBeforeSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -137,8 +139,8 @@ public boolean exists() {
|
|||||||
private <T> T doWithRetry(Supplier<T> action, String description) {
|
private <T> T doWithRetry(Supplier<T> action, String description) {
|
||||||
final var delayStrategy = DelayStrategy.exponential(initialRetryDelay, maxRetryDelay);
|
final var delayStrategy = DelayStrategy.exponential(initialRetryDelay, maxRetryDelay);
|
||||||
|
|
||||||
// loop and retry until successful
|
// loop and retry until successful or maximum attempts reached
|
||||||
for (;;) {
|
for (int i = 1; i <= maxRetryCount; i++) {
|
||||||
try {
|
try {
|
||||||
if (client == null) {
|
if (client == null) {
|
||||||
this.connect();
|
this.connect();
|
||||||
@ -147,7 +149,7 @@ private <T> T doWithRetry(Supplier<T> action, String description) {
|
|||||||
return action.get();
|
return action.get();
|
||||||
}
|
}
|
||||||
catch (RedisClientConnectionException e) {
|
catch (RedisClientConnectionException e) {
|
||||||
LOGGER.warn("Connection to Redis failed, will try to reconnect");
|
LOGGER.warn("Connection to Redis failed, will try to reconnect [attempt {} of {}]", i, maxRetryCount);
|
||||||
try {
|
try {
|
||||||
if (client != null) {
|
if (client != null) {
|
||||||
client.disconnect();
|
client.disconnect();
|
||||||
@ -164,5 +166,7 @@ private <T> T doWithRetry(Supplier<T> action, String description) {
|
|||||||
// Failed to execute the operation, retry...
|
// Failed to execute the operation, retry...
|
||||||
delayStrategy.sleepWhen(true);
|
delayStrategy.sleepWhen(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
throw new SchemaHistoryException(String.format("Failed to connect to Redis after %d attempts.", maxRetryCount));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -297,6 +297,10 @@ There are also other options available
|
|||||||
|
|
|
|
||||||
|The maximum delay in case of a connection retry to Redis if using `RedisSchemaHistory`. Default: 10000 (ms)
|
|The maximum delay in case of a connection retry to Redis if using `RedisSchemaHistory`. Default: 10000 (ms)
|
||||||
|
|
||||||
|
|[[debezium-source-database-history-redis-retry-max-attempts]]<<debezium-source-database-history-redis-retry-max-attempts, `debezium.source.schema.history.internal.redis.retry.max.attempts`>>
|
||||||
|
|
|
||||||
|
|The maximum number of attempts to connect to Redis. Default: 10
|
||||||
|
|
||||||
|[[debezium-source-database-history-redis-connection-timeout-ms]]<<debezium-source-database-history-redis-connection-timeout-ms, `debezium.source.schema.history.internal.redis.connection.timeout.ms`>>
|
|[[debezium-source-database-history-redis-connection-timeout-ms]]<<debezium-source-database-history-redis-connection-timeout-ms, `debezium.source.schema.history.internal.redis.connection.timeout.ms`>>
|
||||||
|
|
|
|
||||||
|Connection timeout of Redis client if using `RedisSchemaHistory`. Default: 2000 (ms)
|
|Connection timeout of Redis client if using `RedisSchemaHistory`. Default: 2000 (ms)
|
||||||
|
Loading…
Reference in New Issue
Block a user