DBZ-6952 Fix reconnect issue with Redis

This commit is contained in:
Sergey Eizner 2023-09-20 14:46:22 -04:00 committed by Jiri Pechanec
parent 3cc70c8a52
commit 84ab7f7b8a
7 changed files with 167 additions and 1 deletions

View File

@ -74,6 +74,32 @@
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.15.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<!--

View File

@ -125,4 +125,9 @@ public String info(String section) {
return tryErrors(() -> jedis.info(section));
}
@Override
public String clientList() {
return tryErrors(() -> jedis.clientList());
}
}

View File

@ -90,4 +90,9 @@ public interface RedisClient {
*/
String info(String section);
/**
* @return
* @throws RedisClientConnectionException
*/
String clientList();
}

View File

@ -117,4 +117,8 @@ public String info(String section) {
return delegate.info(section);
}
@Override
public String clientList() {
return delegate.clientList();
}
}

View File

@ -10,12 +10,14 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.VisibleForTesting;
import io.debezium.config.Configuration;
import io.debezium.storage.redis.RedisClient;
import io.debezium.storage.redis.RedisClientConnectionException;
@ -35,7 +37,16 @@ public class RedisOffsetBackingStore extends MemoryOffsetBackingStore {
private RedisClient client;
public RedisClient getRedisClient() {
return client;
}
public void setRedisClient(RedisClient client) {
this.client = client;
}
void connect() {
closeClient();
RedisConnection redisConnection = new RedisConnection(config.getAddress(), config.getDbIndex(), config.getUser(), config.getPassword(),
config.getConnectionTimeout(), config.getSocketTimeout(), config.isSslEnabled());
client = redisConnection.getRedisClient(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, config.isWaitEnabled(), config.getWaitTimeout(),
@ -49,6 +60,10 @@ public void configure(WorkerConfig config) {
this.config = new RedisOffsetBackingStoreConfig(configuration);
}
public void configure(RedisOffsetBackingStoreConfig config) {
this.config = config;
}
@Override
public synchronized void start() {
super.start();
@ -57,8 +72,22 @@ public synchronized void start() {
this.load();
}
@VisibleForTesting
synchronized void startNoLoad() {
super.start();
this.connect();
}
private void closeClient() {
if (client != null) {
client.close(); // Disconnect from Redis
client = null;
}
}
@Override
public synchronized void stop() {
closeClient();
super.stop();
// Nothing to do since this doesn't maintain any outstanding connections/data
LOGGER.info("Stopped RedisOffsetBackingStore");
@ -67,7 +96,8 @@ public synchronized void stop() {
/**
* Load offsets from Redis keys
*/
private void load() {
@VisibleForTesting
void load() {
// fetch the value from Redis
Map<String, String> offsets = Uni.createFrom().item(() -> {
return (Map<String, String>) client.hgetAll(config.getRedisKeyName());
@ -92,6 +122,10 @@ private void load() {
})
.await().indefinitely();
this.data = new HashMap<>();
LOGGER.info("Offsets: {}", offsets.entrySet().stream()
.map(entry -> entry.getKey() + "=" + entry.getValue())
.collect(Collectors.joining(", ", "{ ", " }")));
for (Map.Entry<String, String> mapEntry : offsets.entrySet()) {
ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey().getBytes()) : null;
ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue().getBytes()) : null;

View File

@ -165,6 +165,11 @@ public String info(String section) {
return "";
}
@Override
public String clientList() {
return null;
}
}
}

View File

@ -0,0 +1,87 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.storage.redis.offset;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import io.debezium.config.Configuration;
import io.debezium.storage.redis.RedisClient;
import io.debezium.storage.redis.RedisClientConnectionException;
import io.debezium.storage.redis.RedisConnection;
@Testcontainers
class RedisOffsetBackingStoreTest {
@Container
public GenericContainer redis = new GenericContainer(DockerImageName.parse(REDIS_CONTAINER_IMAGE))
.withExposedPorts(6379);
private static final String PROP_PREFIX = "offset.storage.redis.";
private static final String REDIS_CONTAINER_IMAGE = "redis:5.0.3-alpine";
private static final String NEW_LINE = "\n";
private String address;
private int port;
@BeforeEach
public void setUp() {
redis.start();
this.address = redis.getHost();
this.port = redis.getFirstMappedPort();
}
@AfterEach
public void tearDown() {
if (redis != null) {
redis.stop();
}
}
@Test
public void testRedisConnection() throws InterruptedException {
RedisOffsetBackingStoreConfig config = getRedisOffsetBackingStoreConfig();
RedisOffsetBackingStore redisOffsetBackingStore = new RedisOffsetBackingStore();
redisOffsetBackingStore.configure(config);
redisOffsetBackingStore.startNoLoad();
RedisClient client = redisOffsetBackingStore.getRedisClient();
int clientsNum = (int) getClientsNumber(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, client);
assert (clientsNum == 1);
RedisClient mockClient = Mockito.spy(client);
when(mockClient.hgetAll(anyString())).thenThrow(RedisClientConnectionException.class);
redisOffsetBackingStore.setRedisClient(mockClient);
redisOffsetBackingStore.load();
client = redisOffsetBackingStore.getRedisClient();
clientsNum = (int) getClientsNumber(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, client);
assert (clientsNum == 1);
redisOffsetBackingStore.stop();
assert (redisOffsetBackingStore.getRedisClient() == null);
}
private long getClientsNumber(String name, RedisClient client) {
return Arrays.stream(client.clientList().split(NEW_LINE)).filter(entry -> entry.contains(name)).count();
}
private RedisOffsetBackingStoreConfig getRedisOffsetBackingStoreConfig() {
Map<String, String> dummyConfig = new HashMap<>();
dummyConfig.put(PROP_PREFIX + "address", this.address + ":" + this.port);
return new RedisOffsetBackingStoreConfig(Configuration.from(dummyConfig));
}
}