diff --git a/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumer.java b/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumer.java index 18d3435ee..eca7edb00 100644 --- a/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumer.java +++ b/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumer.java @@ -70,7 +70,7 @@ public class RedisStreamChangeConsumer extends BaseChangeConsumer private static final String PROP_WAIT_TIMEOUT = PROP_PREFIX + "wait.timeout.ms"; private static final String PROP_WAIT_RETRY_ENABLED = PROP_PREFIX + "wait.retry.enabled"; private static final String PROP_WAIT_RETRY_DELAY = PROP_PREFIX + "wait.retry.delay.ms"; - private static final String PROP_MEMORY_THRESHOLD = PROP_PREFIX + "memory.threshold"; + private static final String PROP_MEMORY_THRESHOLD_PERCENTAGE = PROP_PREFIX + "memory.threshold.percentage"; private static final String MESSAGE_FORMAT_COMPACT = "compact"; private static final String MESSAGE_FORMAT_EXTENDED = "extended"; @@ -134,7 +134,10 @@ else if (MESSAGE_FORMAT_COMPACT.equals(messageFormat)) { String.format("Property %s expects value one of '%s' or '%s'", PROP_MESSAGE_FORMAT, MESSAGE_FORMAT_EXTENDED, MESSAGE_FORMAT_COMPACT)); } - int memoryThreshold = config.getOptionalValue(PROP_MEMORY_THRESHOLD, Integer.class).orElse(85); + int memoryThreshold = config.getOptionalValue(PROP_MEMORY_THRESHOLD_PERCENTAGE, Integer.class).orElse(85); + if (memoryThreshold < 0 || memoryThreshold > 100) { + throw new DebeziumException(String.format("Property %s should be between 0 and 100", PROP_MEMORY_THRESHOLD_PERCENTAGE)); + } isMemoryOk = memoryThreshold > 0 ? () -> isMemoryOk(memoryThreshold) : () -> true; boolean waitEnabled = config.getOptionalValue(PROP_WAIT_ENABLED, Boolean.class).orElse(false); @@ -293,7 +296,7 @@ private boolean isMemoryOk(int memoryThreshold) { long usedMemory = Long.parseLong(infoMemory.get("used_memory")); long percentage = 100 * usedMemory / maxMemory; if (percentage >= memoryThreshold) { - LOGGER.warn("Used memory percentage of {}% is higher then configured threshold of {}%", percentage, memoryThreshold); + LOGGER.warn("Used memory percentage of {}% is higher than configured threshold of {}%", percentage, memoryThreshold); return false; } } diff --git a/debezium-server/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdTestProfile.java b/debezium-server/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdTestProfile.java index 6335a180f..7126ac9c1 100644 --- a/debezium-server/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdTestProfile.java +++ b/debezium-server/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamMemoryThresholdTestProfile.java @@ -12,7 +12,7 @@ public class RedisStreamMemoryThresholdTestProfile extends RedisStreamTestProfil @Override public Map getConfigOverrides() { Map config = super.getConfigOverrides(); - config.put("debezium.sink.redis.memory.threshold", "75"); + config.put("debezium.sink.redis.memory.threshold.percentage", "75"); return config; } diff --git a/debezium-server/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamTestProfile.java b/debezium-server/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamTestProfile.java index 1c8586d2f..2349d22d6 100644 --- a/debezium-server/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamTestProfile.java +++ b/debezium-server/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisStreamTestProfile.java @@ -30,7 +30,7 @@ public Map getConfigOverrides() { Map config = new HashMap(); config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector"); config.put("debezium.source." + OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); - config.put("debezium.sink.redis.memory.threshold", "0"); + config.put("debezium.sink.redis.memory.threshold.percentage", "0"); return config; } diff --git a/documentation/modules/ROOT/pages/operations/debezium-server.adoc b/documentation/modules/ROOT/pages/operations/debezium-server.adoc index 6abb824fe..dbd82d3e9 100644 --- a/documentation/modules/ROOT/pages/operations/debezium-server.adoc +++ b/documentation/modules/ROOT/pages/operations/debezium-server.adoc @@ -910,7 +910,7 @@ For more information see Redis https://redis.io/commands/wait/[WAIT] command. |The format of the message sent to the Redis stream. Possible values are `extended`(newer format) and `compact`(the until now, old format). Read more about the message format xref:#p-redis-message-format[below]. -|[[redis-memory-threshold]]<> +|[[redis-memory-threshold-percentage]]<> |`85` |The sink will stop consuming records if the used memory percentage (out of Redis configured maxmemory) is higher than this threshold. If Redis configured maxmemory is `0` (unlimited) then this threshold is disabled (same as if configuring it with `0`)