diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java index b7b9b458e..4a3953e06 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java @@ -59,7 +59,7 @@ final class TableConverters { private final Map tableNumbersByTableName = new HashMap<>(); private final boolean recordSchemaChangesInSourceRecords; private final Predicate tableFilter; - private final Set ignoredQueryStatements = Collect.unmodifiableSet("BEGIN","END","FLUSH PRIVILEGES"); + private final Set ignoredQueryStatements = Collect.unmodifiableSet("BEGIN", "END", "FLUSH PRIVILEGES"); private final Set unknownTableIds = new HashSet<>(); public TableConverters(TopicSelector topicSelector, DatabaseHistory dbHistory, @@ -76,10 +76,10 @@ public TableConverters(TopicSelector topicSelector, DatabaseHistory dbHistory, Predicate knownTables = (id) -> !unknownTableIds.contains(id); // known if not unknown this.tableFilter = tableFilter != null ? tableFilter.and(knownTables) : knownTables; } - + public void loadTables() { // Create TableSchema instances for any existing table ... - this.tables.tableIds().forEach(id->{ + this.tables.tableIds().forEach(id -> { Table table = this.tables.forTable(id); TableSchema schema = schemaBuilder.create(table, false); tableSchemaByTableId.put(id, schema); @@ -90,7 +90,7 @@ public void updateTableCommand(Event event, SourceInfo source, Consumer Object key = converter.createKey(values, includedColumns); Schema valueSchema = converter.valueSchema(); Struct value = converter.inserted(values, includedColumns); - SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, - keySchema, key, valueSchema, value); - recorder.accept(record); + if (value != null || key != null) { + SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, + keySchema, key, valueSchema, value); + recorder.accept(record); + } } } else if (logger.isDebugEnabled()) { logger.debug("Skipping insert row event: {}", event); @@ -263,7 +265,7 @@ public void handleUpdate(Event event, SourceInfo source, Consumer logger.debug("Received update row event: {}", event); String topic = converter.topic(); Integer partition = converter.partition(); - List> rows = update.getRows(); + List> rows = update.getRows(); for (int row = 0; row != rows.size(); ++row) { Map.Entry changes = rows.get(row); Serializable[] before = changes.getKey(); @@ -272,9 +274,11 @@ public void handleUpdate(Event event, SourceInfo source, Consumer Object key = converter.createKey(after, includedColumns); Schema valueSchema = converter.valueSchema(); Struct value = converter.updated(before, includedColumnsBefore, after, includedColumns); - SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, - keySchema, key, valueSchema, value); - recorder.accept(record); + if (value != null || key != null) { + SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, + keySchema, key, valueSchema, value); + recorder.accept(record); + } } } else if (logger.isDebugEnabled()) { logger.debug("Skipping update row event: {}", event); @@ -296,10 +300,12 @@ public void handleDelete(Event event, SourceInfo source, Consumer Schema keySchema = converter.keySchema(); Object key = converter.createKey(values, includedColumns); Schema valueSchema = converter.valueSchema(); - Struct value = converter.inserted(values, includedColumns); - SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, - keySchema, key, valueSchema, value); - recorder.accept(record); + Struct value = converter.deleted(values, includedColumns); + if (value != null || key != null) { + SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition, + keySchema, key, valueSchema, value); + recorder.accept(record); + } } } else if (logger.isDebugEnabled()) { logger.debug("Skipping delete row event: {}", event);