DBZ-4510 Retrying in case of communication errors/OOM

This commit is contained in:
spicy-sauce 2022-02-01 16:29:41 +02:00 committed by Gunnar Morling
parent 5dc44b834f
commit 9793f71cf0
5 changed files with 200 additions and 27 deletions

View File

@ -31,6 +31,7 @@
import redis.clients.jedis.HostAndPort; import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis; import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
/** /**
* Implementation of the consumer that delivers the messages into Redis (stream) destination. * Implementation of the consumer that delivers the messages into Redis (stream) destination.
@ -53,6 +54,12 @@ public class RedisStreamChangeConsumer extends BaseChangeConsumer
private Optional<String> user; private Optional<String> user;
private Optional<String> password; private Optional<String> password;
@ConfigProperty(name = PROP_PREFIX + "retry.initial.delay.ms", defaultValue = "300")
Integer initialRetryDelay;
@ConfigProperty(name = PROP_PREFIX + "retry.max.delay.ms", defaultValue = "10000")
Integer maxRetryDelay;
@ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default") @ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default")
String nullKey; String nullKey;
@ -115,21 +122,61 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records,
throws InterruptedException { throws InterruptedException {
for (ChangeEvent<Object, Object> record : records) { for (ChangeEvent<Object, Object> record : records) {
try {
LOGGER.trace("Received event '{}'", record); LOGGER.trace("Received event '{}'", record);
String destination = streamNameMapper.map(record.destination()); String destination = streamNameMapper.map(record.destination());
String key = (record.key() != null) ? getString(record.key()) : nullKey; String key = (record.key() != null) ? getString(record.key()) : nullKey;
String value = (record.value() != null) ? getString(record.value()) : nullValue; String value = (record.value() != null) ? getString(record.value()) : nullValue;
client.xadd(destination, null, Collections.singletonMap(key, value)); int currentRetryTime = initialRetryDelay;
boolean completedSuccessfully = false;
committer.markProcessed(record); // As long as we failed to add the current record to the stream, we should retry if the reason was either a connection error or OOM in Redis.
while (!completedSuccessfully) {
try {
// Add the record to the destination stream
client.xadd(destination, null, Collections.singletonMap(key, value));
completedSuccessfully = true;
}
catch (JedisConnectionException jce) {
// Try to reconnect
try {
connect();
} }
catch (Exception e) { catch (Exception e) {
LOGGER.error("Can't connect to Redis", e);
}
}
catch (Exception e) {
// When Redis reaches its max memory limitation, a JedisDataException will be thrown with this message.
// In this case, we will retry adding this record to the stream, assuming some memory will be freed eventually as result
// of evicting elements from the stream by the target DB.
if (e.getMessage().equals("OOM command not allowed when used memory > 'maxmemory'.")) {
LOGGER.error("Redis runs OOM", e);
}
// When Redis is starting, a JedisDataException will be thrown with this message.
// We will retry communicating with the target DB as once of the Redis is available, this message will be gone.
else if (e.getMessage().equals("LOADING Redis is loading the dataset in memory")) {
LOGGER.error("Redis is starting", e);
}
// In case of unexpected runtime error, throw a DebeziumException which terminates the process
else {
throw new DebeziumException(e); throw new DebeziumException(e);
} }
} }
// Failed to add the record to the stream, retry...
if (!completedSuccessfully) {
LOGGER.info("Retrying in {} ms", currentRetryTime);
Thread.sleep(currentRetryTime);
// Exponential backoff: As long as the current retry time does not exceed the max retry time, double it
currentRetryTime = Math.min(currentRetryTime *= 2, maxRetryDelay);
}
}
committer.markProcessed(record);
}
committer.markBatchFinished(); committer.markBatchFinished();
} }
} }

View File

@ -5,6 +5,8 @@
*/ */
package io.debezium.server.redis; package io.debezium.server.redis;
import static org.junit.Assert.assertTrue;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -12,9 +14,12 @@
import javax.enterprise.event.Observes; import javax.enterprise.event.Observes;
import org.awaitility.Awaitility; import org.awaitility.Awaitility;
import org.fest.assertions.Assertions; import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.doc.FixFor;
import io.debezium.server.events.ConnectorCompletedEvent; import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent; import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager; import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
@ -27,7 +32,8 @@
import redis.clients.jedis.StreamEntry; import redis.clients.jedis.StreamEntry;
/** /**
* Integration test that verifies basic reading from PostgreSQL database and writing to Redis stream. * Integration tests that verify basic reading from PostgreSQL database and writing to Redis stream
* and retry mechanism in case of connectivity issues or OOM in Redis
* *
* @author M Sazzadul Hoque * @author M Sazzadul Hoque
*/ */
@ -35,9 +41,20 @@
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class) @QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
@QuarkusTestResource(RedisTestResourceLifecycleManager.class) @QuarkusTestResource(RedisTestResourceLifecycleManager.class)
public class RedisStreamIT { public class RedisStreamIT {
@ConfigProperty(name = "debezium.source.database.hostname")
String dbHostname;
private static final int MESSAGE_COUNT = 4; @ConfigProperty(name = "debezium.source.database.port")
private static final String STREAM_NAME = "testc.inventory.customers"; String dbPort;
@ConfigProperty(name = "debezium.source.database.user")
String dbUser;
@ConfigProperty(name = "debezium.source.database.password")
String dbPassword;
@ConfigProperty(name = "debezium.source.database.dbname")
String dbName;
protected static Jedis jedis; protected static Jedis jedis;
@ -48,11 +65,7 @@ public class RedisStreamIT {
void setupDependencies(@Observes ConnectorStartedEvent event) { void setupDependencies(@Observes ConnectorStartedEvent event) {
Testing.Print.enable(); Testing.Print.enable();
jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress())); jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
// clean-up STREAM_NAME
jedis.del(STREAM_NAME);
} }
void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception { void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
@ -61,15 +74,111 @@ void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exceptio
} }
} }
private PostgresConnection getPostgresConnection() {
return new PostgresConnection(Configuration.create()
.with("hostname", dbHostname)
.with("port", dbPort)
.with("user", dbUser)
.with("password", dbPassword)
.with("dbname", dbName)
.build());
}
private List<StreamEntry> getStreamElements(String streamName, int messageCount) {
final List<StreamEntry> entries = new ArrayList<>();
Awaitility.await().atMost(Duration.ofSeconds(RedisTestConfigSource.waitForSeconds())).until(() -> {
final List<StreamEntry> response = jedis.xrange(streamName, null, null, messageCount);
entries.clear();
entries.addAll(response);
return entries.size() == messageCount;
});
return entries;
}
/**
* Verifies that all the records of a PostgreSQL table are streamed to Redis
*/
@Test @Test
public void testRedisStream() throws Exception { public void testRedisStream() throws Exception {
Testing.Print.enable(); final int MESSAGE_COUNT = 4;
final List<StreamEntry> entries = new ArrayList<>(); final String STREAM_NAME = "testc.inventory.customers";
Awaitility.await().atMost(Duration.ofSeconds(RedisTestConfigSource.waitForSeconds())).until(() -> {
final List<StreamEntry> response = jedis.xrange(STREAM_NAME, null, null, MESSAGE_COUNT); final List<StreamEntry> entries = getStreamElements(STREAM_NAME, MESSAGE_COUNT);
entries.addAll(response); assertTrue("Redis Basic Stream Test Failed", entries.size() == MESSAGE_COUNT);
return entries.size() >= MESSAGE_COUNT; }
});
Assertions.assertThat(entries.size() >= MESSAGE_COUNT); /**
* Test retry mechanism when encountering Redis connectivity issues:
* 1. Make Redis to be unavailable while the server is up
* 2. Create a new table named redis_test in PostgreSQL and insert 5 records to it
* 3. Bring Redis up again and make sure these records have been streamed successfully
*/
@Test
@FixFor("DBZ-4510")
public void testRedisConnectionRetry() throws Exception {
final int MESSAGE_COUNT = 5;
final String STREAM_NAME = "testc.inventory.redis_test";
Testing.print("Pausing container");
RedisTestResourceLifecycleManager.pause();
final PostgresConnection connection = getPostgresConnection();
Testing.print("Creating new redis_test table and inserting 5 records to it");
connection.execute(
"CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)",
"INSERT INTO inventory.redis_test VALUES (1)",
"INSERT INTO inventory.redis_test VALUES (2)",
"INSERT INTO inventory.redis_test VALUES (3)",
"INSERT INTO inventory.redis_test VALUES (4)",
"INSERT INTO inventory.redis_test VALUES (5)");
connection.close();
Testing.print("Sleeping for 5 seconds to simulate no connection errors");
Thread.sleep(5000);
Testing.print("Unpausing container");
RedisTestResourceLifecycleManager.unpause();
final List<StreamEntry> entries = getStreamElements(STREAM_NAME, MESSAGE_COUNT);
Testing.print("Entries in " + STREAM_NAME + ":" + entries.size());
assertTrue("Redis Connection Test Failed", entries.size() == MESSAGE_COUNT);
}
/**
* Test retry mechanism when encountering Redis Out of Memory:
* 1. Simulate a Redis OOM by setting its max memory to 1M.
* 2. Create a new table named redis_test2 in PostgreSQL and insert 1000 records to it.
* 3. As result, after inserting ~22 records, Redis runs OOM.
* 4. Sleep for additional 5 seconds to ensure the Sink is retrying.
* 5. Revert max memory setting so Redis is no longer in OOM and make sure all 100 records have been streamed successfully.
*/
@Test
@FixFor("DBZ-4510")
public void testRedisOOMRetry() throws Exception {
final int MESSAGE_COUNT = 100;
final String STREAM_NAME = "testc.inventory.redis_test2";
Testing.print("Setting Redis' maxmemory to 1M");
jedis.configSet("maxmemory", "1M");
final PostgresConnection connection = getPostgresConnection();
Testing.print("Creating new table redis_test2 and inserting 100 records to it");
connection.execute("CREATE TABLE inventory.redis_test2 " +
"(id VARCHAR(100) PRIMARY KEY, " +
"first_name VARCHAR(100), " +
"last_name VARCHAR(100))",
String.format("INSERT INTO inventory.redis_test2 (id,first_name,last_name) " +
"SELECT LEFT(i::text, 10), RANDOM()::text, RANDOM()::text FROM generate_series(1,%d) s(i)",
MESSAGE_COUNT));
connection.close();
final List<StreamEntry> entriesWhenOOM = getStreamElements(STREAM_NAME, 22);
Testing.print("Stream size in OOM:" + entriesWhenOOM.size());
Testing.print("Reverting Redis' maxmemory");
jedis.configSet("maxmemory", "0");
final List<StreamEntry> entries = getStreamElements(STREAM_NAME, MESSAGE_COUNT);
Testing.print("Entries in " + STREAM_NAME + ":" + entries.size());
assertTrue("Redis OOM Test Failed", entries.size() == MESSAGE_COUNT);
} }
} }

View File

@ -25,7 +25,7 @@ public RedisTestConfigSource() {
redisTest.put("debezium.source.offset.flush.interval.ms", "0"); redisTest.put("debezium.source.offset.flush.interval.ms", "0");
redisTest.put("debezium.source.database.server.name", "testc"); redisTest.put("debezium.source.database.server.name", "testc");
redisTest.put("debezium.source.schema.include.list", "inventory"); redisTest.put("debezium.source.schema.include.list", "inventory");
redisTest.put("debezium.source.table.include.list", "inventory.customers"); redisTest.put("debezium.source.table.include.list", "inventory.customers,inventory.redis_test,inventory.redis_test2");
config = redisTest; config = redisTest;
} }

View File

@ -49,6 +49,14 @@ public void stop() {
running.set(false); running.set(false);
} }
public static void pause() {
container.getDockerClient().pauseContainerCmd(container.getContainerId()).exec();
}
public static void unpause() {
container.getDockerClient().unpauseContainerCmd(container.getContainerId()).exec();
}
public static String getRedisContainerAddress() { public static String getRedisContainerAddress() {
start(true); start(true);

View File

@ -619,6 +619,15 @@ So this string will be used as key for records without primary key.
|Redis does not support the notion of null payloads, as is the case with tombstone events. |Redis does not support the notion of null payloads, as is the case with tombstone events.
So this string will be used as value for records without a payload. So this string will be used as value for records without a payload.
|[[redis-retry-initial-delay-ms]]<<redis-retry-initial-delay-ms, `debezium.sink.redis.retry.initial.delay.ms`>>
|`300`
|Initial retry delay when encountering Redis connection or OOM issues.
This value will be doubled upon every retry but won't exceed `debezium.sink.redis.retry.max.delay.ms`
|[[redis-retry-max-delay-ms]]<<redis-retry-max-delay-ms, `debezium.sink.redis.retry.max.delay.ms`>>
|`10000`
|Max delay when encountering Redis connection or OOM issues.
|=== |===