DBZ-3827 Debezium Server Kinesis Sink Cannot Handle Null Events
This commit is contained in:
parent
07d937ee0a
commit
7b707a0b9b
@ -100,10 +100,15 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt
|
|||||||
throws InterruptedException {
|
throws InterruptedException {
|
||||||
for (ChangeEvent<Object, Object> record : records) {
|
for (ChangeEvent<Object, Object> record : records) {
|
||||||
LOGGER.trace("Received event '{}'", record);
|
LOGGER.trace("Received event '{}'", record);
|
||||||
|
Object rv = record.value();
|
||||||
|
if (rv == null) {
|
||||||
|
rv = "";
|
||||||
|
}
|
||||||
|
|
||||||
final PutRecordRequest putRecord = PutRecordRequest.builder()
|
final PutRecordRequest putRecord = PutRecordRequest.builder()
|
||||||
.partitionKey((record.key() != null) ? getString(record.key()) : nullKey)
|
.partitionKey((record.key() != null) ? getString(record.key()) : nullKey)
|
||||||
.streamName(streamNameMapper.map(record.destination()))
|
.streamName(streamNameMapper.map(record.destination()))
|
||||||
.data(SdkBytes.fromByteArray(getBytes(record.value())))
|
.data(SdkBytes.fromByteArray(getBytes(rv)))
|
||||||
.build();
|
.build();
|
||||||
client.putRecord(putRecord);
|
client.putRecord(putRecord);
|
||||||
committer.markProcessed(record);
|
committer.markProcessed(record);
|
||||||
|
Loading…
Reference in New Issue
Block a user