diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java index 716a7ce71..d50b4217f 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java @@ -237,9 +237,11 @@ public void handleInsert(Event event, SourceInfo source, Consumer Object key = converter.createKey(values, includedColumns); Schema valueSchema = converter.valueSchema(); Struct value = converter.inserted(values, includedColumns); - SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, - keySchema, key, valueSchema, value); - recorder.accept(record); + if (value != null || key != null) { + SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, + keySchema, key, valueSchema, value); + recorder.accept(record); + } } } else if (logger.isDebugEnabled()) { logger.debug("Skipping insert row event: {}", event); @@ -272,9 +274,11 @@ public void handleUpdate(Event event, SourceInfo source, Consumer Object key = converter.createKey(after, includedColumns); Schema valueSchema = converter.valueSchema(); Struct value = converter.updated(before, includedColumnsBefore, after, includedColumns); - SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, - keySchema, key, valueSchema, value); - recorder.accept(record); + if (value != null || key != null) { + SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, + keySchema, key, valueSchema, value); + recorder.accept(record); + } } } else if (logger.isDebugEnabled()) { logger.debug("Skipping update row event: {}", event); @@ -297,9 +301,11 @@ public void handleDelete(Event event, SourceInfo source, Consumer Object key = converter.createKey(values, includedColumns); Schema valueSchema = converter.valueSchema(); Struct value = converter.deleted(values, includedColumns); - SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, - keySchema, key, valueSchema, value); - recorder.accept(record); + if (value != null || key != null) { + SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, + keySchema, key, valueSchema, value); + recorder.accept(record); + } } } else if (logger.isDebugEnabled()) { logger.debug("Skipping delete row event: {}", event);