DBZ-7879 Improve tests for RedisOffsetBackingStoreTest

This commit is contained in:
Minh Son Nguyen 2024-08-02 15:31:12 +03:00 committed by Jiri Pechanec
parent 7c9526be38
commit 212687bd2f

View File

@ -15,6 +15,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
@ -53,32 +54,70 @@ public void tearDown() {
@Test
public void testRedisConnection() throws InterruptedException {
RedisOffsetBackingStoreConfig config = getRedisOffsetBackingStoreConfig();
RedisOffsetBackingStore redisOffsetBackingStore = new RedisOffsetBackingStore();
redisOffsetBackingStore.configure(config);
redisOffsetBackingStore.startNoLoad();
RedisOffsetBackingStore redisOffsetBackingStore = getRedisOffsetBackingStore();
RedisClient client = redisOffsetBackingStore.getRedisClient();
int clientsNum = (int) getClientsNumber(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, client);
assert (clientsNum == 1);
RedisClient mockClient = Mockito.spy(client);
when(mockClient.hgetAll(anyString())).thenThrow(RedisClientConnectionException.class);
redisOffsetBackingStore.setRedisClient(mockClient);
redisOffsetBackingStore.load();
client = redisOffsetBackingStore.getRedisClient();
clientsNum = (int) getClientsNumber(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, client);
int clientsNum = (int) getClientsNumber(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, client);
assert (clientsNum == 1);
redisOffsetBackingStore.stop();
assert (redisOffsetBackingStore.getRedisClient() == null);
}
@Test
@Timeout(5) // Ensure we don't stuck in endless retry loop
public void testLoadWithRetry() throws InterruptedException {
RedisOffsetBackingStore redisOffsetBackingStore = getRedisOffsetBackingStore();
RedisClient client = redisOffsetBackingStore.getRedisClient();
RedisClient mockClient = Mockito.spy(client);
when(mockClient.hgetAll(anyString())).thenThrow(RedisClientConnectionException.class);
redisOffsetBackingStore.setRedisClient(mockClient);
redisOffsetBackingStore.load();
client = redisOffsetBackingStore.getRedisClient();
int clientsNum = (int) getClientsNumber(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, client);
assert (clientsNum == 1);
redisOffsetBackingStore.stop();
}
@Test
@Timeout(5) // Ensure we don't stuck in endless retry loop
public void testSaveWithRetry() throws InterruptedException {
RedisOffsetBackingStore redisOffsetBackingStore = getRedisOffsetBackingStore();
RedisClient client = redisOffsetBackingStore.getRedisClient();
// Load test key-value pair to redis to able to test save()
// besides redisKey, values of field and value are not important for this test
byte[] redisKey = "metadata:debezium:offsets".getBytes();
byte[] field = "".getBytes();
byte[] value = "".getBytes();
client.hset(redisKey, field, value);
redisOffsetBackingStore.load();
RedisClient mockClient = Mockito.spy(client);
when(mockClient.hset(redisKey, field, value)).thenThrow(RedisClientConnectionException.class);
redisOffsetBackingStore.setRedisClient(mockClient);
redisOffsetBackingStore.save();
client = redisOffsetBackingStore.getRedisClient();
int clientsNum = (int) getClientsNumber(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, client);
assert (clientsNum == 1);
redisOffsetBackingStore.stop();
}
private long getClientsNumber(String name, RedisClient client) {
return Arrays.stream(client.clientList().split(NEW_LINE)).filter(entry -> entry.contains(name)).count();
}
private RedisOffsetBackingStore getRedisOffsetBackingStore() {
RedisOffsetBackingStoreConfig config = getRedisOffsetBackingStoreConfig();
RedisOffsetBackingStore redisOffsetBackingStore = new RedisOffsetBackingStore();
redisOffsetBackingStore.configure(config);
redisOffsetBackingStore.startNoLoad();
return redisOffsetBackingStore;
}
private RedisOffsetBackingStoreConfig getRedisOffsetBackingStoreConfig() {
Map<String, String> dummyConfig = new HashMap<>();
dummyConfig.put(PROP_PREFIX + "address", this.address + ":" + this.port);