DBZ-4510 Using DelayStrategy

This commit is contained in:
Gunnar Morling 2022-02-07 17:19:56 +01:00
parent 9793f71cf0
commit 1edc5ef3c3
2 changed files with 7 additions and 10 deletions

View File

@ -28,6 +28,7 @@
import io.debezium.engine.DebeziumEngine.RecordCommitter;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import io.debezium.util.DelayStrategy;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
@ -50,6 +51,7 @@ public class RedisStreamChangeConsumer extends BaseChangeConsumer
private static final String PROP_USER = PROP_PREFIX + "user";
private static final String PROP_PASSWORD = PROP_PREFIX + "password";
private DelayStrategy delayStrategy;
private HostAndPort address;
private Optional<String> user;
private Optional<String> password;
@ -74,6 +76,8 @@ public class RedisStreamChangeConsumer extends BaseChangeConsumer
@PostConstruct
void connect() {
delayStrategy = DelayStrategy.exponential(initialRetryDelay, maxRetryDelay);
if (customClient.isResolvable()) {
client = customClient.get();
try {
@ -127,7 +131,6 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records,
String destination = streamNameMapper.map(record.destination());
String key = (record.key() != null) ? getString(record.key()) : nullKey;
String value = (record.value() != null) ? getString(record.value()) : nullValue;
int currentRetryTime = initialRetryDelay;
boolean completedSuccessfully = false;
// As long as we failed to add the current record to the stream, we should retry if the reason was either a connection error or OOM in Redis.
@ -165,13 +168,7 @@ else if (e.getMessage().equals("LOADING Redis is loading the dataset in memory")
}
// Failed to add the record to the stream, retry...
if (!completedSuccessfully) {
LOGGER.info("Retrying in {} ms", currentRetryTime);
Thread.sleep(currentRetryTime);
// Exponential backoff: As long as the current retry time does not exceed the max retry time, double it
currentRetryTime = Math.min(currentRetryTime *= 2, maxRetryDelay);
}
delayStrategy.sleepWhen(!completedSuccessfully);
}
committer.markProcessed(record);

View File

@ -41,6 +41,7 @@
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
@QuarkusTestResource(RedisTestResourceLifecycleManager.class)
public class RedisStreamIT {
@ConfigProperty(name = "debezium.source.database.hostname")
String dbHostname;
@ -149,8 +150,7 @@ public void testRedisConnectionRetry() throws Exception {
* 1. Simulate a Redis OOM by setting its max memory to 1M.
* 2. Create a new table named redis_test2 in PostgreSQL and insert 1000 records to it.
* 3. As result, after inserting ~22 records, Redis runs OOM.
* 4. Sleep for additional 5 seconds to ensure the Sink is retrying.
* 5. Revert max memory setting so Redis is no longer in OOM and make sure all 100 records have been streamed successfully.
* 4. Revert max memory setting so Redis is no longer in OOM and make sure all 100 records have been streamed successfully.
*/
@Test
@FixFor("DBZ-4510")