DBZ-5911 Debezium Server stops with NPE when Redis does not report the "maxmemory" field in "info memory" command

This commit is contained in:
ggaborg 2022-12-11 15:20:49 +02:00 committed by Jiri Pechanec
parent 1c0300cf2a
commit fb3ed4e2ea
5 changed files with 336 additions and 47 deletions

View File

@ -0,0 +1,131 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.redis;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.storage.redis.RedisClient;
import io.debezium.util.IoUtil;
import io.smallrye.mutiny.tuples.Tuple2;
public class RedisMemoryThreshold {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisMemoryThreshold.class);
private static final String INFO_MEMORY = "memory";
private static final String INFO_MEMORY_SECTION_MAXMEMORY = "maxmemory";
private static final String INFO_MEMORY_SECTION_USEDMEMORY = "used_memory";
private static final Supplier<Boolean> MEMORY_OK = () -> true;
private RedisClient client;
private int memoryThreshold;
private long memoryLimit;
private Supplier<Boolean> isMemoryOk;
public RedisMemoryThreshold(RedisClient client, RedisStreamChangeConsumerConfig config) {
this.client = client;
this.memoryThreshold = config.getMemoryThreshold();
this.memoryLimit = 1024L * 1024 * config.getMemoryLimitMb();
if (memoryThreshold == 0 || memoryTuple(memoryLimit) == null) {
disable();
}
else {
this.isMemoryOk = () -> isMemoryOk();
}
}
public boolean check() {
return isMemoryOk.get();
}
private boolean isMemoryOk() {
Tuple2<Long, Long> memoryTuple = memoryTuple(memoryLimit);
if (memoryTuple == null) {
disable();
return true;
}
long maxMemory = memoryTuple.getItem2();
if (maxMemory > 0) {
long usedMemory = memoryTuple.getItem1();
long percentage = usedMemory * 100 / maxMemory;
if (percentage >= memoryThreshold) {
LOGGER.warn("Memory threshold percentage was reached (current: {}%, configured: {}%, used_memory: {}, maxmemory: {}).", percentage, memoryThreshold,
usedMemory, maxMemory);
return false;
}
}
return true;
}
private Tuple2<Long, Long> memoryTuple(long defaultMaxMemory) {
String memory = client.info(INFO_MEMORY);
Map<String, String> infoMemory = new HashMap<>();
try {
IoUtil.readLines(new ByteArrayInputStream(memory.getBytes(StandardCharsets.UTF_8)), line -> {
String[] pair = line.split(":");
if (pair.length == 2) {
infoMemory.put(pair[0], pair[1]);
}
});
}
catch (IOException e) {
LOGGER.error("Cannot parse Redis 'info memory' result '{}'.", memory, e);
return null;
}
Long usedMemory = parseLong(INFO_MEMORY_SECTION_USEDMEMORY, infoMemory.get(INFO_MEMORY_SECTION_USEDMEMORY));
if (usedMemory == null) {
return null;
}
Long maxMemory = parseLong(INFO_MEMORY_SECTION_MAXMEMORY, infoMemory.get(INFO_MEMORY_SECTION_MAXMEMORY));
if (maxMemory == null) {
if (defaultMaxMemory == 0) {
LOGGER.warn("Memory limit is disabled '{}'.", defaultMaxMemory);
return null;
}
LOGGER.debug("Using memory limit with value '{}'.", defaultMaxMemory);
maxMemory = defaultMaxMemory;
}
else if (maxMemory == 0) {
LOGGER.debug("Redis 'info memory' field '{}' is {}. Consider configuring it.", INFO_MEMORY_SECTION_MAXMEMORY, maxMemory);
if (defaultMaxMemory > 0) {
maxMemory = defaultMaxMemory;
LOGGER.debug("Using memory limit with value '{}'.", defaultMaxMemory);
}
}
return Tuple2.of(usedMemory, maxMemory);
}
private void disable() {
isMemoryOk = MEMORY_OK;
LOGGER.warn("Memory threshold percentage check is disabled!");
}
private Long parseLong(String name, String value) {
try {
return Long.valueOf(value);
}
catch (NumberFormatException e) {
LOGGER.debug("Cannot parse Redis 'info memory' field '{}' with value '{}'.", name, value);
}
return null;
}
}

View File

@ -8,19 +8,14 @@
import static io.debezium.server.redis.RedisStreamChangeConsumerConfig.MESSAGE_FORMAT_COMPACT;
import static io.debezium.server.redis.RedisStreamChangeConsumerConfig.MESSAGE_FORMAT_EXTENDED;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@ -44,7 +39,6 @@
import io.debezium.storage.redis.RedisClientConnectionException;
import io.debezium.storage.redis.RedisConnection;
import io.debezium.util.DelayStrategy;
import io.debezium.util.IoUtil;
/**
* Implementation of the consumer that delivers the messages into Redis (stream) destination.
@ -64,15 +58,11 @@ public class RedisStreamChangeConsumer extends BaseChangeConsumer
private static final String EXTENDED_MESSAGE_KEY_KEY = "key";
private static final String EXTENDED_MESSAGE_VALUE_KEY = "value";
private static final String INFO_MEMORY = "memory";
private static final String INFO_MEMORY_SECTION_MAXMEMORY = "maxmemory";
private static final String INFO_MEMORY_SECTION_USEDMEMORY = "used_memory";
private RedisClient client;
private BiFunction<String, String, Map<String, String>> recordMapFunction;
private Supplier<Boolean> isMemoryOk;
private RedisMemoryThreshold isMemoryOk;
private RedisStreamChangeConsumerConfig config;
@ -93,13 +83,12 @@ else if (MESSAGE_FORMAT_COMPACT.equals(messageFormat)) {
recordMapFunction = Collections::singletonMap;
}
int memoryThreshold = config.getMemoryThreshold();
isMemoryOk = memoryThreshold > 0 ? () -> isMemoryOk(memoryThreshold) : () -> true;
RedisConnection redisConnection = new RedisConnection(config.getAddress(), config.getUser(), config.getPassword(), config.getConnectionTimeout(),
config.getSocketTimeout(), config.isSslEnabled());
client = redisConnection.getRedisClient(DEBEZIUM_REDIS_SINK_CLIENT_NAME, config.isWaitEnabled(), config.getWaitTimeout(), config.isWaitRetryEnabled(),
config.getWaitRetryDelay());
isMemoryOk = new RedisMemoryThreshold(client, config);
}
@PreDestroy
@ -159,7 +148,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records,
LOGGER.error("Can't connect to Redis", e);
}
}
else if (canHandleBatch()) {
else if (isMemoryOk.check()) {
try {
LOGGER.trace("Preparing a Redis Pipeline of {} records", clonedBatch.size());
@ -225,34 +214,4 @@ else if (canHandleBatch()) {
committer.markBatchFinished();
}
private boolean canHandleBatch() {
return isMemoryOk.get();
}
private boolean isMemoryOk(int memoryThreshold) {
String memory = client.info(INFO_MEMORY);
Map<String, String> infoMemory = new HashMap<>();
try {
IoUtil.readLines(new ByteArrayInputStream(memory.getBytes(StandardCharsets.UTF_8)), line -> {
String[] pair = line.split(":");
if (pair.length == 2) {
infoMemory.put(pair[0], pair[1]);
}
});
}
catch (IOException e) {
LOGGER.error("Cannot parse Redis info memory {}", memory, e);
return true;
}
long maxMemory = Long.parseLong(infoMemory.get(INFO_MEMORY_SECTION_MAXMEMORY));
if (maxMemory > 0) {
long usedMemory = Long.parseLong(infoMemory.get(INFO_MEMORY_SECTION_USEDMEMORY));
long percentage = 100 * usedMemory / maxMemory;
if (percentage >= memoryThreshold) {
LOGGER.warn("Used memory percentage of {}% is higher than configured threshold of {}%", percentage, memoryThreshold);
return false;
}
}
return true;
}
}

View File

@ -41,11 +41,17 @@ public class RedisStreamChangeConsumerConfig extends RedisCommonConfig {
.withDefault(DEFAULT_MEMORY_THRESHOLD_PERCENTAGE)
.withValidation(RangeValidator.between(0, 100));
private static final int DEFAULT_MEMORY_LIMIT_MB = 0;
private static final Field PROP_MEMORY_LIMIT_MB = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "memory.limit.mb")
.withDefault(DEFAULT_MEMORY_LIMIT_MB)
.withValidation(RangeValidator.atLeast(0));
private int batchSize;
private String nullKey;
private String nullValue;
private String messageFormat;
private int memoryThreshold;
private int memoryLimitMb;
public RedisStreamChangeConsumerConfig(Configuration config) {
super(config, PROP_PREFIX);
@ -59,6 +65,7 @@ protected void init(Configuration config) {
nullValue = config.getString(PROP_NULL_VALUE);
messageFormat = config.getString(PROP_MESSAGE_FORMAT);
memoryThreshold = config.getInteger(PROP_MEMORY_THRESHOLD_PERCENTAGE);
memoryLimitMb = config.getInteger(PROP_MEMORY_LIMIT_MB);
}
@Override
@ -88,4 +95,8 @@ public int getMemoryThreshold() {
return memoryThreshold;
}
public int getMemoryLimitMb() {
return memoryLimitMb;
}
}

View File

@ -0,0 +1,182 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.redis;
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import io.debezium.config.Configuration;
import io.debezium.storage.redis.RedisClient;
import io.debezium.util.Collect;
public class RedisMemoryThresholdTest {
private static final String _1MB = String.valueOf(1 * 1024 * 1024);
private static final String _2MB = String.valueOf(2 * 1024 * 1024);
private static final String _3MB = String.valueOf(3 * 1024 * 1024);
private static final String _4MB = String.valueOf(4 * 1024 * 1024);
@Test
public void testThresholdPercentageDisabled() {
int[] thresholdList = { 0 };
int[] limitMbList = { 0, 1, 2, 3, 4 };
String[] usedMemoryList = { "asd3f", "2048L", null, _1MB, _2MB, _3MB, _4MB };
String[] maxMemoryList = { "asd3f", "2048L", null, "0", _1MB, _2MB, _3MB, _4MB };
for (int threshold : thresholdList) {
for (int limit : limitMbList) {
for (String used : usedMemoryList) {
for (String max : maxMemoryList) {
isMemoryOk(threshold, limit, used, max, true);
}
}
}
}
}
@Test
public void testUsedMemoryBad() {
int[] thresholdList = { 1, 24, 25, 26, 49, 50, 52, 74, 75, 76, 99, 100 };
int[] limitMbList = { 0, 1, 2, 3, 4 };
String[] usedMemoryList = { "asd3f", "2048L" };
String[] maxMemoryList = { "asd3f", "2048L", null, "0", _1MB, _2MB, _3MB, _4MB };
for (int threshold : thresholdList) {
for (int limit : limitMbList) {
for (String used : usedMemoryList) {
for (String max : maxMemoryList) {
isMemoryOk(threshold, limit, used, max, true);
}
}
}
}
}
@Test
public void testUsedMemoryNotReported() {
int[] thresholdList = { 1, 24, 25, 26, 49, 50, 52, 74, 75, 76, 99, 100 };
int[] limitMbList = { 0, 1, 2, 3, 4 };
String[] usedMemoryList = { null };
String[] maxMemoryList = { "asd3f", "2048L", null, "0", _1MB, _2MB, _3MB, _4MB };
for (int threshold : thresholdList) {
for (int limit : limitMbList) {
for (String used : usedMemoryList) {
for (String max : maxMemoryList) {
isMemoryOk(threshold, limit, used, max, true);
}
}
}
}
}
@Test
public void testMemoryLimit() {
int[] thresholdList = { 1, 24, 25, 26, 49, 50, 52, 74, 75, 76, 99, 100 };
int[] limitMbList = { 0, 1, 2, 3, 4 };
String[] usedMemoryList = { _1MB, _2MB, _3MB, _4MB };
String[] maxMemoryList = { "asd3f", "2048L", null, "0" };
for (int threshold : thresholdList) {
for (int limit : limitMbList) {
for (String used : usedMemoryList) {
for (String max : maxMemoryList) {
isMemoryOk(threshold, limit, used, max, 0 == limit ? true : Long.parseLong(used) * 100 / (limit * 1024 * 1024) < threshold);
}
}
}
}
}
@Test
public void testMaxMemory() {
int[] thresholdList = { 1, 24, 25, 26, 49, 50, 52, 74, 75, 76, 99, 100 };
int[] limitMbList = { 0, 1, 2, 3, 4 };
String[] usedMemoryList = { _1MB, _2MB, _3MB, _4MB };
String[] maxMemoryList = { _1MB, _2MB, _3MB, _4MB };
for (int threshold : thresholdList) {
for (int limit : limitMbList) {
for (String used : usedMemoryList) {
for (String max : maxMemoryList) {
isMemoryOk(threshold, limit, used, max, Long.parseLong(used) * 100 / Long.parseLong(max) < threshold);
}
}
}
}
}
private void isMemoryOk(int threshold, int memoryLimitMb, String usedMemoryBytes, String maxMemoryBytes, boolean expectedResult) {
Configuration config = Configuration.from(Collect.hashMapOf("debezium.sink.redis.address", "localhost", "debezium.sink.redis.memory.threshold.percentage",
threshold, "debezium.sink.redis.memory.limit.mb", memoryLimitMb));
RedisMemoryThreshold isMemoryOk = new RedisMemoryThreshold(new RedisClientImpl(usedMemoryBytes, maxMemoryBytes), new RedisStreamChangeConsumerConfig(config));
Assert.assertEquals(String.format("isMemoryOk failed for threshold %s, limit %s, used %s, max %s)", threshold, memoryLimitMb, usedMemoryBytes, maxMemoryBytes),
expectedResult, isMemoryOk.check());
}
private static class RedisClientImpl implements RedisClient {
private String infoMemory;
private RedisClientImpl(String usedMemoryBytes, String maxMemoryBytes) {
this.infoMemory = (usedMemoryBytes == null ? "" : "used_memory:" + usedMemoryBytes + "\n") + (maxMemoryBytes == null ? "" : "maxmemory:" + maxMemoryBytes);
}
@Override
public String info(String section) {
return infoMemory;
}
@Override
public void disconnect() {
}
@Override
public void close() {
}
@Override
public String xadd(String key, Map<String, String> hash) {
return null;
}
@Override
public List<String> xadd(List<SimpleEntry<String, Map<String, String>>> hashes) {
return null;
}
@Override
public List<Map<String, String>> xrange(String key) {
return null;
}
@Override
public long xlen(String key) {
return 0;
}
@Override
public Map<String, String> hgetAll(String key) {
return null;
}
@Override
public long hset(byte[] key, byte[] field, byte[] value) {
return 0;
}
@Override
public long waitReplicas(int replicas, long timeout) {
return 0;
}
}
}

View File

@ -912,8 +912,14 @@ Read more about the message format xref:#p-redis-message-format[below].
|[[redis-memory-threshold-percentage]]<<redis-memory-threshold-percentage, `debezium.sink.redis.memory.threshold.percentage`>>
|`85`
|The sink will stop consuming records if the used memory percentage (out of Redis configured maxmemory) is higher than this threshold.
If Redis configured maxmemory is `0` (unlimited) then this threshold is disabled (same as if configuring it with `0`)
|The sink will stop consuming records if the `used_memory` percentage (out of Redis configured `maxmemory`) is higher or equal to this threshold.
If the configured value is `0` then this threshold is disabled.
|[[redis-memory-limit-mb]]<<redis-memory-limit-mb, `debezium.sink.redis.memory.limit.mb`>>
|`0`
|If Redis `maxmemory` is not available or `0`, the `xref:#redis-memory-threshold-percentage[debezium.sink.redis.memory.threshold.percentage]`
will apply to this value (if this value is positive).
By default it is `0` (disabled).
|===