diff --git a/debezium-storage/debezium-storage-redis/src/test/java/io/debezium/storage/redis/offset/RedisOffsetBackingStoreTest.java b/debezium-storage/debezium-storage-redis/src/test/java/io/debezium/storage/redis/offset/RedisOffsetBackingStoreTest.java index dbed8db4b..37e88b0f8 100644 --- a/debezium-storage/debezium-storage-redis/src/test/java/io/debezium/storage/redis/offset/RedisOffsetBackingStoreTest.java +++ b/debezium-storage/debezium-storage-redis/src/test/java/io/debezium/storage/redis/offset/RedisOffsetBackingStoreTest.java @@ -15,6 +15,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.mockito.Mockito; import org.testcontainers.containers.GenericContainer; import org.testcontainers.junit.jupiter.Container; @@ -53,32 +54,70 @@ public void tearDown() { @Test public void testRedisConnection() throws InterruptedException { - RedisOffsetBackingStoreConfig config = getRedisOffsetBackingStoreConfig(); - RedisOffsetBackingStore redisOffsetBackingStore = new RedisOffsetBackingStore(); - redisOffsetBackingStore.configure(config); - - redisOffsetBackingStore.startNoLoad(); - + RedisOffsetBackingStore redisOffsetBackingStore = getRedisOffsetBackingStore(); RedisClient client = redisOffsetBackingStore.getRedisClient(); - int clientsNum = (int) getClientsNumber(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, client); - assert (clientsNum == 1); - RedisClient mockClient = Mockito.spy(client); - when(mockClient.hgetAll(anyString())).thenThrow(RedisClientConnectionException.class); - redisOffsetBackingStore.setRedisClient(mockClient); - redisOffsetBackingStore.load(); - client = redisOffsetBackingStore.getRedisClient(); - clientsNum = (int) getClientsNumber(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, client); + int clientsNum = (int) getClientsNumber(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, client); assert (clientsNum == 1); redisOffsetBackingStore.stop(); assert (redisOffsetBackingStore.getRedisClient() == null); } + @Test + @Timeout(5) // Ensure we don't stuck in endless retry loop + public void testLoadWithRetry() throws InterruptedException { + RedisOffsetBackingStore redisOffsetBackingStore = getRedisOffsetBackingStore(); + RedisClient client = redisOffsetBackingStore.getRedisClient(); + + RedisClient mockClient = Mockito.spy(client); + when(mockClient.hgetAll(anyString())).thenThrow(RedisClientConnectionException.class); + redisOffsetBackingStore.setRedisClient(mockClient); + redisOffsetBackingStore.load(); + client = redisOffsetBackingStore.getRedisClient(); + int clientsNum = (int) getClientsNumber(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, client); + assert (clientsNum == 1); + + redisOffsetBackingStore.stop(); + } + + @Test + @Timeout(5) // Ensure we don't stuck in endless retry loop + public void testSaveWithRetry() throws InterruptedException { + RedisOffsetBackingStore redisOffsetBackingStore = getRedisOffsetBackingStore(); + RedisClient client = redisOffsetBackingStore.getRedisClient(); + + // Load test key-value pair to redis to able to test save() + // besides redisKey, values of field and value are not important for this test + byte[] redisKey = "metadata:debezium:offsets".getBytes(); + byte[] field = "".getBytes(); + byte[] value = "".getBytes(); + client.hset(redisKey, field, value); + redisOffsetBackingStore.load(); + + RedisClient mockClient = Mockito.spy(client); + when(mockClient.hset(redisKey, field, value)).thenThrow(RedisClientConnectionException.class); + redisOffsetBackingStore.setRedisClient(mockClient); + redisOffsetBackingStore.save(); + client = redisOffsetBackingStore.getRedisClient(); + int clientsNum = (int) getClientsNumber(RedisConnection.DEBEZIUM_OFFSETS_CLIENT_NAME, client); + assert (clientsNum == 1); + + redisOffsetBackingStore.stop(); + } + private long getClientsNumber(String name, RedisClient client) { return Arrays.stream(client.clientList().split(NEW_LINE)).filter(entry -> entry.contains(name)).count(); } + private RedisOffsetBackingStore getRedisOffsetBackingStore() { + RedisOffsetBackingStoreConfig config = getRedisOffsetBackingStoreConfig(); + RedisOffsetBackingStore redisOffsetBackingStore = new RedisOffsetBackingStore(); + redisOffsetBackingStore.configure(config); + redisOffsetBackingStore.startNoLoad(); + return redisOffsetBackingStore; + } + private RedisOffsetBackingStoreConfig getRedisOffsetBackingStoreConfig() { Map dummyConfig = new HashMap<>(); dummyConfig.put(PROP_PREFIX + "address", this.address + ":" + this.port);