DBZ-28 Prevent the MySQL connector from sending a record with a null key and null value

There is no point in sending a record that contains a null key and null value. While this may not be likely for insert or update cases (since at least the value should not be null), it is possible when a row is deleted (meaning the record value will be null) but the table has no primary/unique key (meaning the record key will be null).
This commit is contained in:
Randall Hauch 2016-03-04 09:51:32 -06:00
parent 64d0e0b458
commit 2d99cb264c

View File

@ -237,9 +237,11 @@ public void handleInsert(Event event, SourceInfo source, Consumer<SourceRecord>
Object key = converter.createKey(values, includedColumns);
Schema valueSchema = converter.valueSchema();
Struct value = converter.inserted(values, includedColumns);
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
if (value != null || key != null) {
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
}
}
} else if (logger.isDebugEnabled()) {
logger.debug("Skipping insert row event: {}", event);
@ -272,9 +274,11 @@ public void handleUpdate(Event event, SourceInfo source, Consumer<SourceRecord>
Object key = converter.createKey(after, includedColumns);
Schema valueSchema = converter.valueSchema();
Struct value = converter.updated(before, includedColumnsBefore, after, includedColumns);
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
if (value != null || key != null) {
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
}
}
} else if (logger.isDebugEnabled()) {
logger.debug("Skipping update row event: {}", event);
@ -297,9 +301,11 @@ public void handleDelete(Event event, SourceInfo source, Consumer<SourceRecord>
Object key = converter.createKey(values, includedColumns);
Schema valueSchema = converter.valueSchema();
Struct value = converter.deleted(values, includedColumns);
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
if (value != null || key != null) {
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
}
}
} else if (logger.isDebugEnabled()) {
logger.debug("Skipping delete row event: {}", event);