DBZ-4912 Debezium Server Redis sink: multi-shard support
This commit is contained in:
parent
0581fe1a6b
commit
2e0ec4a86c
@ -5,9 +5,11 @@
|
||||
*/
|
||||
package io.debezium.server.redis;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ -31,8 +33,8 @@
|
||||
|
||||
import redis.clients.jedis.HostAndPort;
|
||||
import redis.clients.jedis.Jedis;
|
||||
import redis.clients.jedis.Pipeline;
|
||||
import redis.clients.jedis.StreamEntryID;
|
||||
import redis.clients.jedis.Transaction;
|
||||
import redis.clients.jedis.exceptions.JedisConnectionException;
|
||||
import redis.clients.jedis.exceptions.JedisDataException;
|
||||
|
||||
@ -40,6 +42,7 @@
|
||||
* Implementation of the consumer that delivers the messages into Redis (stream) destination.
|
||||
*
|
||||
* @author M Sazzadul Hoque
|
||||
* @author Yossi Shirizli
|
||||
*/
|
||||
@Named("redis")
|
||||
@Dependent
|
||||
@ -137,6 +140,10 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records,
|
||||
batches(records, batchSize).forEach(batch -> {
|
||||
boolean completedSuccessfully = false;
|
||||
|
||||
// Clone the batch and remove the records that have been successfully processed.
|
||||
// Move to the next batch once this list is empty.
|
||||
List<ChangeEvent<Object, Object>> clonedBatch = batch.stream().collect(Collectors.toList());
|
||||
|
||||
// As long as we failed to execute the current batch to the stream, we should retry if the reason was either a connection error or OOM in Redis.
|
||||
while (!completedSuccessfully) {
|
||||
if (client == null) {
|
||||
@ -151,46 +158,63 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records,
|
||||
}
|
||||
}
|
||||
else {
|
||||
Transaction transaction;
|
||||
Pipeline pipeline;
|
||||
try {
|
||||
LOGGER.trace("Preparing a Redis Transaction of {} records", batch.size());
|
||||
transaction = client.multi();
|
||||
LOGGER.trace("Preparing a Redis Pipeline of {} records", clonedBatch.size());
|
||||
pipeline = client.pipelined();
|
||||
|
||||
// Add the batch records to the stream(s) via Transaction
|
||||
for (ChangeEvent<Object, Object> record : batch) {
|
||||
// Add the batch records to the stream(s) via Pipeline
|
||||
for (ChangeEvent<Object, Object> record : clonedBatch) {
|
||||
String destination = streamNameMapper.map(record.destination());
|
||||
String key = (record.key() != null) ? getString(record.key()) : nullKey;
|
||||
String value = (record.value() != null) ? getString(record.value()) : nullValue;
|
||||
|
||||
// Add the record to the destination stream
|
||||
transaction.xadd(destination, StreamEntryID.NEW_ENTRY, Collections.singletonMap(key, value));
|
||||
pipeline.xadd(destination, StreamEntryID.NEW_ENTRY, Collections.singletonMap(key, value));
|
||||
}
|
||||
|
||||
// Execute the transaction in Redis
|
||||
transaction.exec();
|
||||
// Sync the pipeline in Redis and parse the responses (response per command with the same order)
|
||||
List<Object> responses = pipeline.syncAndReturnAll();
|
||||
List<ChangeEvent<Object, Object>> processedRecords = new ArrayList<ChangeEvent<Object, Object>>();
|
||||
int index = 0;
|
||||
int totalOOMResponses = 0;
|
||||
|
||||
// Mark all the batch records as processed only when the transaction succeeds
|
||||
for (ChangeEvent<Object, Object> record : batch) {
|
||||
committer.markProcessed(record);
|
||||
for (Object response : responses) {
|
||||
String message = response.toString();
|
||||
// When Redis reaches its max memory limitation, an OOM error message will be retrieved.
|
||||
// In this case, we will retry execute the failed commands, assuming some memory will be freed eventually as result
|
||||
// of evicting elements from the stream by the target DB.
|
||||
if (message.contains("OOM command not allowed when used memory > 'maxmemory'")) {
|
||||
totalOOMResponses++;
|
||||
}
|
||||
else {
|
||||
// Mark the record as processed
|
||||
ChangeEvent<Object, Object> currentRecord = clonedBatch.get(index);
|
||||
committer.markProcessed(currentRecord);
|
||||
processedRecords.add(currentRecord);
|
||||
}
|
||||
|
||||
index++;
|
||||
}
|
||||
|
||||
clonedBatch.removeAll(processedRecords);
|
||||
|
||||
if (totalOOMResponses > 0) {
|
||||
LOGGER.warn("Redis runs OOM, {} command(s) failed", totalOOMResponses);
|
||||
}
|
||||
|
||||
if (clonedBatch.size() == 0) {
|
||||
completedSuccessfully = true;
|
||||
}
|
||||
completedSuccessfully = true;
|
||||
}
|
||||
catch (JedisConnectionException jce) {
|
||||
LOGGER.error("Connection error", jce);
|
||||
close();
|
||||
}
|
||||
catch (JedisDataException jde) {
|
||||
// When Redis reaches its max memory limitation, a JedisDataException will be thrown with one of the messages listed below.
|
||||
// In this case, we will retry execute the batch, assuming some memory will be freed eventually as result
|
||||
// of evicting elements from the stream by the target DB.
|
||||
if (jde.getMessage().equals("EXECABORT Transaction discarded because of: OOM command not allowed when used memory > 'maxmemory'.")) {
|
||||
LOGGER.error("Redis runs OOM", jde);
|
||||
}
|
||||
else if (jde.getMessage().startsWith("EXECABORT")) {
|
||||
LOGGER.error("Redis transaction error", jde);
|
||||
}
|
||||
// When Redis is starting, a JedisDataException will be thrown with this message.
|
||||
// We will retry communicating with the target DB as once of the Redis is available, this message will be gone.
|
||||
else if (jde.getMessage().equals("LOADING Redis is loading the dataset in memory")) {
|
||||
if (jde.getMessage().equals("LOADING Redis is loading the dataset in memory")) {
|
||||
LOGGER.error("Redis is starting", jde);
|
||||
}
|
||||
else {
|
||||
@ -212,4 +236,4 @@ else if (jde.getMessage().equals("LOADING Redis is loading the dataset in memory
|
||||
// Mark the whole batch as finished once the sub batches completed
|
||||
committer.markBatchFinished();
|
||||
}
|
||||
}
|
||||
}
|
@ -21,10 +21,9 @@
|
||||
import redis.clients.jedis.Jedis;
|
||||
|
||||
/**
|
||||
* Integration tests that verify basic reading from PostgreSQL database and writing to Redis stream
|
||||
* and retry mechanism in case of connectivity issues or OOM in Redis
|
||||
* Integration tests for secured Redis
|
||||
*
|
||||
* @author M Sazzadul Hoque
|
||||
* @author Oren Elias
|
||||
*/
|
||||
@QuarkusIntegrationTest
|
||||
@TestProfile(RedisSSLStreamTestProfile.class)
|
||||
|
@ -30,6 +30,7 @@
|
||||
* and retry mechanism in case of connectivity issues or OOM in Redis
|
||||
*
|
||||
* @author M Sazzadul Hoque
|
||||
* @author Yossi Shirizli
|
||||
*/
|
||||
@QuarkusIntegrationTest
|
||||
@TestProfile(RedisStreamTestProfile.class)
|
||||
@ -77,7 +78,6 @@ public void testRedisStream() throws Exception {
|
||||
@Test
|
||||
@FixFor("DBZ-4510")
|
||||
public void testRedisConnectionRetry() throws Exception {
|
||||
Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
|
||||
final int MESSAGE_COUNT = 5;
|
||||
final String STREAM_NAME = "testc.inventory.redis_test";
|
||||
Testing.print("Pausing container");
|
||||
@ -99,6 +99,7 @@ public void testRedisConnectionRetry() throws Exception {
|
||||
Testing.print("Unpausing container");
|
||||
RedisTestResourceLifecycleManager.unpause();
|
||||
|
||||
Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
|
||||
Long streamLength = getStreamLength(jedis, STREAM_NAME, MESSAGE_COUNT);
|
||||
|
||||
Testing.print("Entries in " + STREAM_NAME + ":" + streamLength);
|
||||
@ -109,24 +110,18 @@ public void testRedisConnectionRetry() throws Exception {
|
||||
/**
|
||||
* Test retry mechanism when encountering Redis Out of Memory:
|
||||
* 1. Simulate a Redis OOM by setting its max memory to 1M
|
||||
* 2. Create a new table named redis_test2 in PostgreSQL and insert 10 records to it
|
||||
* 3. Then, delete all the records in this table and expect the stream to contain 30 records
|
||||
* (10 inserted before + 20 as result of this deletion including the tombstone events)
|
||||
* 4. Insert 22 records to redis_test2 table and sleep for 1 second to simulate Redis OOM
|
||||
* 5. Delete the stream and expect those 22 records to be inserted to it as there's enough memory to complete this operation
|
||||
* 2. Create a new table named redis_test2 in PostgreSQL and insert 50 records to it
|
||||
* 3. Sleep for 1 second to simulate Redis OOM (stream does not contain 50 records)
|
||||
* 4. Unlimit memory and verify that all 50 records have been streamed
|
||||
*/
|
||||
@Test
|
||||
@FixFor("DBZ-4510")
|
||||
public void testRedisOOMRetry() throws Exception {
|
||||
Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
|
||||
final String STREAM_NAME = "testc.inventory.redis_test2";
|
||||
final int FIRST_BATCH_SIZE = 10;
|
||||
final int EXPECTED_STREAM_LENGTH_AFTER_DELETION = 30; // Every delete change record is followed by a tombstone event
|
||||
final int SECOND_BATCH_SIZE = 22;
|
||||
final String INSERT_SQL = "INSERT INTO inventory.redis_test2 (id,first_name,last_name) " +
|
||||
"SELECT LEFT(i::text, 10), RANDOM()::text, RANDOM()::text FROM generate_series(1,%d) s(i)";
|
||||
final int TOTAL_RECORDS = 50;
|
||||
|
||||
Testing.print("Setting Redis' maxmemory to 2M");
|
||||
Testing.print("Setting Redis' maxmemory to 1M");
|
||||
jedis.configSet("maxmemory", "1M");
|
||||
|
||||
PostgresConnection connection = getPostgresConnection();
|
||||
@ -134,31 +129,18 @@ public void testRedisOOMRetry() throws Exception {
|
||||
"(id VARCHAR(100) PRIMARY KEY, " +
|
||||
"first_name VARCHAR(100), " +
|
||||
"last_name VARCHAR(100))");
|
||||
connection.execute(
|
||||
String.format(INSERT_SQL, FIRST_BATCH_SIZE));
|
||||
connection.execute(String.format("INSERT INTO inventory.redis_test2 (id,first_name,last_name) " +
|
||||
"SELECT LEFT(i::text, 10), RANDOM()::text, RANDOM()::text FROM generate_series(1,%d) s(i)", TOTAL_RECORDS));
|
||||
connection.commit();
|
||||
|
||||
Long streamLengthAfterInserts = getStreamLength(jedis, STREAM_NAME, FIRST_BATCH_SIZE);
|
||||
Testing.print("Entries in " + STREAM_NAME + ":" + streamLengthAfterInserts);
|
||||
|
||||
connection.execute("DELETE FROM inventory.redis_test2");
|
||||
Long streamLengthAfterDeletion = getStreamLength(jedis, STREAM_NAME, EXPECTED_STREAM_LENGTH_AFTER_DELETION);
|
||||
Testing.print("Entries in " + STREAM_NAME + ":" + streamLengthAfterDeletion);
|
||||
|
||||
connection.execute(String.format(INSERT_SQL, SECOND_BATCH_SIZE));
|
||||
connection.close();
|
||||
Thread.sleep(1000);
|
||||
Testing.print("Entries in " + STREAM_NAME + ":" + jedis.xlen(STREAM_NAME));
|
||||
assertTrue(jedis.xlen(STREAM_NAME) < TOTAL_RECORDS);
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
Testing.print("Deleting stream in order to free memory");
|
||||
jedis.del(STREAM_NAME);
|
||||
|
||||
Long streamLength = getStreamLength(jedis, STREAM_NAME, SECOND_BATCH_SIZE);
|
||||
|
||||
Testing.print("Entries in " + STREAM_NAME + ":" + streamLength);
|
||||
jedis.configSet("maxmemory", "0");
|
||||
jedis.close();
|
||||
Long streamLength = getStreamLength(jedis, STREAM_NAME, TOTAL_RECORDS);
|
||||
|
||||
assertTrue("Redis OOM Test Failed", streamLength == SECOND_BATCH_SIZE);
|
||||
assertTrue("Redis OOM Test Failed", streamLength == TOTAL_RECORDS);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user