From 7b707a0b9b558ff022cf5061f1a4aff00b4b95ec Mon Sep 17 00:00:00 2001 From: Chris Baumbauer Date: Thu, 5 Aug 2021 11:41:17 -0700 Subject: [PATCH] DBZ-3827 Debezium Server Kinesis Sink Cannot Handle Null Events --- .../io/debezium/server/kinesis/KinesisChangeConsumer.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/debezium-server/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java b/debezium-server/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java index e8d6fdf3f..508f30b98 100644 --- a/debezium-server/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java +++ b/debezium-server/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java @@ -100,10 +100,15 @@ public void handleBatch(List> records, RecordCommitt throws InterruptedException { for (ChangeEvent record : records) { LOGGER.trace("Received event '{}'", record); + Object rv = record.value(); + if (rv == null) { + rv = ""; + } + final PutRecordRequest putRecord = PutRecordRequest.builder() .partitionKey((record.key() != null) ? getString(record.key()) : nullKey) .streamName(streamNameMapper.map(record.destination())) - .data(SdkBytes.fromByteArray(getBytes(record.value()))) + .data(SdkBytes.fromByteArray(getBytes(rv))) .build(); client.putRecord(putRecord); committer.markProcessed(record);