From fb3ed4e2eaf37c5cac4e4fe718386a470133a71c Mon Sep 17 00:00:00 2001 From: ggaborg Date: Sun, 11 Dec 2022 15:20:49 +0200 Subject: [PATCH] DBZ-5911 Debezium Server stops with NPE when Redis does not report the "maxmemory" field in "info memory" command --- .../server/redis/RedisMemoryThreshold.java | 131 +++++++++++++ .../redis/RedisStreamChangeConsumer.java | 49 +---- .../RedisStreamChangeConsumerConfig.java | 11 ++ .../redis/RedisMemoryThresholdTest.java | 182 ++++++++++++++++++ .../pages/operations/debezium-server.adoc | 10 +- 5 files changed, 336 insertions(+), 47 deletions(-) create mode 100644 debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisMemoryThreshold.java create mode 100644 debezium-server/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisMemoryThresholdTest.java diff --git a/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisMemoryThreshold.java b/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisMemoryThreshold.java new file mode 100644 index 000000000..f7d4be6ee --- /dev/null +++ b/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisMemoryThreshold.java @@ -0,0 +1,131 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server.redis; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.storage.redis.RedisClient; +import io.debezium.util.IoUtil; +import io.smallrye.mutiny.tuples.Tuple2; + +public class RedisMemoryThreshold { + + private static final Logger LOGGER = LoggerFactory.getLogger(RedisMemoryThreshold.class); + + private static final String INFO_MEMORY = "memory"; + private static final String INFO_MEMORY_SECTION_MAXMEMORY = "maxmemory"; + private static final String INFO_MEMORY_SECTION_USEDMEMORY = "used_memory"; + + private static final Supplier MEMORY_OK = () -> true; + + private RedisClient client; + + private int memoryThreshold; + + private long memoryLimit; + + private Supplier isMemoryOk; + + public RedisMemoryThreshold(RedisClient client, RedisStreamChangeConsumerConfig config) { + this.client = client; + this.memoryThreshold = config.getMemoryThreshold(); + this.memoryLimit = 1024L * 1024 * config.getMemoryLimitMb(); + if (memoryThreshold == 0 || memoryTuple(memoryLimit) == null) { + disable(); + } + else { + this.isMemoryOk = () -> isMemoryOk(); + } + } + + public boolean check() { + return isMemoryOk.get(); + } + + private boolean isMemoryOk() { + Tuple2 memoryTuple = memoryTuple(memoryLimit); + if (memoryTuple == null) { + disable(); + return true; + } + long maxMemory = memoryTuple.getItem2(); + if (maxMemory > 0) { + long usedMemory = memoryTuple.getItem1(); + long percentage = usedMemory * 100 / maxMemory; + if (percentage >= memoryThreshold) { + LOGGER.warn("Memory threshold percentage was reached (current: {}%, configured: {}%, used_memory: {}, maxmemory: {}).", percentage, memoryThreshold, + usedMemory, maxMemory); + return false; + } + } + return true; + } + + private Tuple2 memoryTuple(long defaultMaxMemory) { + String memory = client.info(INFO_MEMORY); + Map infoMemory = new HashMap<>(); + try { + IoUtil.readLines(new ByteArrayInputStream(memory.getBytes(StandardCharsets.UTF_8)), line -> { + String[] pair = line.split(":"); + if (pair.length == 2) { + infoMemory.put(pair[0], pair[1]); + } + }); + } + catch (IOException e) { + LOGGER.error("Cannot parse Redis 'info memory' result '{}'.", memory, e); + return null; + } + + Long usedMemory = parseLong(INFO_MEMORY_SECTION_USEDMEMORY, infoMemory.get(INFO_MEMORY_SECTION_USEDMEMORY)); + if (usedMemory == null) { + return null; + } + + Long maxMemory = parseLong(INFO_MEMORY_SECTION_MAXMEMORY, infoMemory.get(INFO_MEMORY_SECTION_MAXMEMORY)); + if (maxMemory == null) { + if (defaultMaxMemory == 0) { + LOGGER.warn("Memory limit is disabled '{}'.", defaultMaxMemory); + return null; + } + LOGGER.debug("Using memory limit with value '{}'.", defaultMaxMemory); + maxMemory = defaultMaxMemory; + } + else if (maxMemory == 0) { + LOGGER.debug("Redis 'info memory' field '{}' is {}. Consider configuring it.", INFO_MEMORY_SECTION_MAXMEMORY, maxMemory); + if (defaultMaxMemory > 0) { + maxMemory = defaultMaxMemory; + LOGGER.debug("Using memory limit with value '{}'.", defaultMaxMemory); + } + } + + return Tuple2.of(usedMemory, maxMemory); + } + + private void disable() { + isMemoryOk = MEMORY_OK; + LOGGER.warn("Memory threshold percentage check is disabled!"); + } + + private Long parseLong(String name, String value) { + try { + return Long.valueOf(value); + } + catch (NumberFormatException e) { + LOGGER.debug("Cannot parse Redis 'info memory' field '{}' with value '{}'.", name, value); + } + return null; + } + +} diff --git a/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumer.java b/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumer.java index c9afed159..92bcb0432 100644 --- a/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumer.java +++ b/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumer.java @@ -8,19 +8,14 @@ import static io.debezium.server.redis.RedisStreamChangeConsumerConfig.MESSAGE_FORMAT_COMPACT; import static io.debezium.server.redis.RedisStreamChangeConsumerConfig.MESSAGE_FORMAT_EXTENDED; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.function.BiFunction; -import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -44,7 +39,6 @@ import io.debezium.storage.redis.RedisClientConnectionException; import io.debezium.storage.redis.RedisConnection; import io.debezium.util.DelayStrategy; -import io.debezium.util.IoUtil; /** * Implementation of the consumer that delivers the messages into Redis (stream) destination. @@ -64,15 +58,11 @@ public class RedisStreamChangeConsumer extends BaseChangeConsumer private static final String EXTENDED_MESSAGE_KEY_KEY = "key"; private static final String EXTENDED_MESSAGE_VALUE_KEY = "value"; - private static final String INFO_MEMORY = "memory"; - private static final String INFO_MEMORY_SECTION_MAXMEMORY = "maxmemory"; - private static final String INFO_MEMORY_SECTION_USEDMEMORY = "used_memory"; - private RedisClient client; private BiFunction> recordMapFunction; - private Supplier isMemoryOk; + private RedisMemoryThreshold isMemoryOk; private RedisStreamChangeConsumerConfig config; @@ -93,13 +83,12 @@ else if (MESSAGE_FORMAT_COMPACT.equals(messageFormat)) { recordMapFunction = Collections::singletonMap; } - int memoryThreshold = config.getMemoryThreshold(); - isMemoryOk = memoryThreshold > 0 ? () -> isMemoryOk(memoryThreshold) : () -> true; - RedisConnection redisConnection = new RedisConnection(config.getAddress(), config.getUser(), config.getPassword(), config.getConnectionTimeout(), config.getSocketTimeout(), config.isSslEnabled()); client = redisConnection.getRedisClient(DEBEZIUM_REDIS_SINK_CLIENT_NAME, config.isWaitEnabled(), config.getWaitTimeout(), config.isWaitRetryEnabled(), config.getWaitRetryDelay()); + + isMemoryOk = new RedisMemoryThreshold(client, config); } @PreDestroy @@ -159,7 +148,7 @@ public void handleBatch(List> records, LOGGER.error("Can't connect to Redis", e); } } - else if (canHandleBatch()) { + else if (isMemoryOk.check()) { try { LOGGER.trace("Preparing a Redis Pipeline of {} records", clonedBatch.size()); @@ -225,34 +214,4 @@ else if (canHandleBatch()) { committer.markBatchFinished(); } - private boolean canHandleBatch() { - return isMemoryOk.get(); - } - - private boolean isMemoryOk(int memoryThreshold) { - String memory = client.info(INFO_MEMORY); - Map infoMemory = new HashMap<>(); - try { - IoUtil.readLines(new ByteArrayInputStream(memory.getBytes(StandardCharsets.UTF_8)), line -> { - String[] pair = line.split(":"); - if (pair.length == 2) { - infoMemory.put(pair[0], pair[1]); - } - }); - } - catch (IOException e) { - LOGGER.error("Cannot parse Redis info memory {}", memory, e); - return true; - } - long maxMemory = Long.parseLong(infoMemory.get(INFO_MEMORY_SECTION_MAXMEMORY)); - if (maxMemory > 0) { - long usedMemory = Long.parseLong(infoMemory.get(INFO_MEMORY_SECTION_USEDMEMORY)); - long percentage = 100 * usedMemory / maxMemory; - if (percentage >= memoryThreshold) { - LOGGER.warn("Used memory percentage of {}% is higher than configured threshold of {}%", percentage, memoryThreshold); - return false; - } - } - return true; - } } \ No newline at end of file diff --git a/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumerConfig.java b/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumerConfig.java index b9bc0118a..7483f5916 100644 --- a/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumerConfig.java +++ b/debezium-server/debezium-server-redis/src/main/java/io/debezium/server/redis/RedisStreamChangeConsumerConfig.java @@ -41,11 +41,17 @@ public class RedisStreamChangeConsumerConfig extends RedisCommonConfig { .withDefault(DEFAULT_MEMORY_THRESHOLD_PERCENTAGE) .withValidation(RangeValidator.between(0, 100)); + private static final int DEFAULT_MEMORY_LIMIT_MB = 0; + private static final Field PROP_MEMORY_LIMIT_MB = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "memory.limit.mb") + .withDefault(DEFAULT_MEMORY_LIMIT_MB) + .withValidation(RangeValidator.atLeast(0)); + private int batchSize; private String nullKey; private String nullValue; private String messageFormat; private int memoryThreshold; + private int memoryLimitMb; public RedisStreamChangeConsumerConfig(Configuration config) { super(config, PROP_PREFIX); @@ -59,6 +65,7 @@ protected void init(Configuration config) { nullValue = config.getString(PROP_NULL_VALUE); messageFormat = config.getString(PROP_MESSAGE_FORMAT); memoryThreshold = config.getInteger(PROP_MEMORY_THRESHOLD_PERCENTAGE); + memoryLimitMb = config.getInteger(PROP_MEMORY_LIMIT_MB); } @Override @@ -88,4 +95,8 @@ public int getMemoryThreshold() { return memoryThreshold; } + public int getMemoryLimitMb() { + return memoryLimitMb; + } + } \ No newline at end of file diff --git a/debezium-server/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisMemoryThresholdTest.java b/debezium-server/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisMemoryThresholdTest.java new file mode 100644 index 000000000..faf10a2ed --- /dev/null +++ b/debezium-server/debezium-server-redis/src/test/java/io/debezium/server/redis/RedisMemoryThresholdTest.java @@ -0,0 +1,182 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server.redis; + +import java.util.AbstractMap.SimpleEntry; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; +import org.junit.jupiter.api.Test; + +import io.debezium.config.Configuration; +import io.debezium.storage.redis.RedisClient; +import io.debezium.util.Collect; + +public class RedisMemoryThresholdTest { + + private static final String _1MB = String.valueOf(1 * 1024 * 1024); + private static final String _2MB = String.valueOf(2 * 1024 * 1024); + private static final String _3MB = String.valueOf(3 * 1024 * 1024); + private static final String _4MB = String.valueOf(4 * 1024 * 1024); + + @Test + public void testThresholdPercentageDisabled() { + int[] thresholdList = { 0 }; + int[] limitMbList = { 0, 1, 2, 3, 4 }; + String[] usedMemoryList = { "asd3f", "2048L", null, _1MB, _2MB, _3MB, _4MB }; + String[] maxMemoryList = { "asd3f", "2048L", null, "0", _1MB, _2MB, _3MB, _4MB }; + + for (int threshold : thresholdList) { + for (int limit : limitMbList) { + for (String used : usedMemoryList) { + for (String max : maxMemoryList) { + isMemoryOk(threshold, limit, used, max, true); + } + } + } + } + } + + @Test + public void testUsedMemoryBad() { + int[] thresholdList = { 1, 24, 25, 26, 49, 50, 52, 74, 75, 76, 99, 100 }; + int[] limitMbList = { 0, 1, 2, 3, 4 }; + String[] usedMemoryList = { "asd3f", "2048L" }; + String[] maxMemoryList = { "asd3f", "2048L", null, "0", _1MB, _2MB, _3MB, _4MB }; + + for (int threshold : thresholdList) { + for (int limit : limitMbList) { + for (String used : usedMemoryList) { + for (String max : maxMemoryList) { + isMemoryOk(threshold, limit, used, max, true); + } + } + } + } + } + + @Test + public void testUsedMemoryNotReported() { + int[] thresholdList = { 1, 24, 25, 26, 49, 50, 52, 74, 75, 76, 99, 100 }; + int[] limitMbList = { 0, 1, 2, 3, 4 }; + String[] usedMemoryList = { null }; + String[] maxMemoryList = { "asd3f", "2048L", null, "0", _1MB, _2MB, _3MB, _4MB }; + + for (int threshold : thresholdList) { + for (int limit : limitMbList) { + for (String used : usedMemoryList) { + for (String max : maxMemoryList) { + isMemoryOk(threshold, limit, used, max, true); + } + } + } + } + } + + @Test + public void testMemoryLimit() { + int[] thresholdList = { 1, 24, 25, 26, 49, 50, 52, 74, 75, 76, 99, 100 }; + int[] limitMbList = { 0, 1, 2, 3, 4 }; + String[] usedMemoryList = { _1MB, _2MB, _3MB, _4MB }; + String[] maxMemoryList = { "asd3f", "2048L", null, "0" }; + + for (int threshold : thresholdList) { + for (int limit : limitMbList) { + for (String used : usedMemoryList) { + for (String max : maxMemoryList) { + isMemoryOk(threshold, limit, used, max, 0 == limit ? true : Long.parseLong(used) * 100 / (limit * 1024 * 1024) < threshold); + } + } + } + } + } + + @Test + public void testMaxMemory() { + int[] thresholdList = { 1, 24, 25, 26, 49, 50, 52, 74, 75, 76, 99, 100 }; + int[] limitMbList = { 0, 1, 2, 3, 4 }; + String[] usedMemoryList = { _1MB, _2MB, _3MB, _4MB }; + String[] maxMemoryList = { _1MB, _2MB, _3MB, _4MB }; + + for (int threshold : thresholdList) { + for (int limit : limitMbList) { + for (String used : usedMemoryList) { + for (String max : maxMemoryList) { + isMemoryOk(threshold, limit, used, max, Long.parseLong(used) * 100 / Long.parseLong(max) < threshold); + } + } + } + } + } + + private void isMemoryOk(int threshold, int memoryLimitMb, String usedMemoryBytes, String maxMemoryBytes, boolean expectedResult) { + Configuration config = Configuration.from(Collect.hashMapOf("debezium.sink.redis.address", "localhost", "debezium.sink.redis.memory.threshold.percentage", + threshold, "debezium.sink.redis.memory.limit.mb", memoryLimitMb)); + RedisMemoryThreshold isMemoryOk = new RedisMemoryThreshold(new RedisClientImpl(usedMemoryBytes, maxMemoryBytes), new RedisStreamChangeConsumerConfig(config)); + Assert.assertEquals(String.format("isMemoryOk failed for threshold %s, limit %s, used %s, max %s)", threshold, memoryLimitMb, usedMemoryBytes, maxMemoryBytes), + expectedResult, isMemoryOk.check()); + } + + private static class RedisClientImpl implements RedisClient { + + private String infoMemory; + + private RedisClientImpl(String usedMemoryBytes, String maxMemoryBytes) { + this.infoMemory = (usedMemoryBytes == null ? "" : "used_memory:" + usedMemoryBytes + "\n") + (maxMemoryBytes == null ? "" : "maxmemory:" + maxMemoryBytes); + } + + @Override + public String info(String section) { + return infoMemory; + } + + @Override + public void disconnect() { + } + + @Override + public void close() { + } + + @Override + public String xadd(String key, Map hash) { + return null; + } + + @Override + public List xadd(List>> hashes) { + return null; + } + + @Override + public List> xrange(String key) { + return null; + } + + @Override + public long xlen(String key) { + return 0; + } + + @Override + public Map hgetAll(String key) { + return null; + } + + @Override + public long hset(byte[] key, byte[] field, byte[] value) { + return 0; + } + + @Override + public long waitReplicas(int replicas, long timeout) { + return 0; + } + + } + +} diff --git a/documentation/modules/ROOT/pages/operations/debezium-server.adoc b/documentation/modules/ROOT/pages/operations/debezium-server.adoc index 973a56fef..37a720f1c 100644 --- a/documentation/modules/ROOT/pages/operations/debezium-server.adoc +++ b/documentation/modules/ROOT/pages/operations/debezium-server.adoc @@ -912,8 +912,14 @@ Read more about the message format xref:#p-redis-message-format[below]. |[[redis-memory-threshold-percentage]]<> |`85` -|The sink will stop consuming records if the used memory percentage (out of Redis configured maxmemory) is higher than this threshold. -If Redis configured maxmemory is `0` (unlimited) then this threshold is disabled (same as if configuring it with `0`) +|The sink will stop consuming records if the `used_memory` percentage (out of Redis configured `maxmemory`) is higher or equal to this threshold. +If the configured value is `0` then this threshold is disabled. + +|[[redis-memory-limit-mb]]<> +|`0` +|If Redis `maxmemory` is not available or `0`, the `xref:#redis-memory-threshold-percentage[debezium.sink.redis.memory.threshold.percentage]` +will apply to this value (if this value is positive). +By default it is `0` (disabled). |===