DBZ-1086 Added functionality to gracefully handle key-less messages in ByLogicalTableRouter transformation.

This commit is contained in:
Ivan Kovbas 2019-01-17 19:36:47 +02:00 committed by Gunnar Morling
parent 54b47e5205
commit 81f031cb7c
2 changed files with 33 additions and 3 deletions

View File

@ -181,9 +181,15 @@ public R apply(R record) {
logger.debug("Applying topic name transformation from {} to {}", oldTopic, newTopic); logger.debug("Applying topic name transformation from {} to {}", oldTopic, newTopic);
final Struct oldKey = requireStruct(record.key(), "Updating schema"); Schema newKeySchema = null;
final Schema newKeySchema = updateKeySchema(oldKey.schema(), newTopic); Struct newKey = null;
final Struct newKey = updateKey(newKeySchema, oldKey, oldTopic);
// Key could be null in the case of a table without a primary key
if (record.key() != null) {
final Struct oldKey = requireStruct(record.key(), "Updating schema");
newKeySchema = updateKeySchema(oldKey.schema(), newTopic);
newKey = updateKey(newKeySchema, oldKey, oldTopic);
}
if (record.value() == null) { if (record.value() == null) {
// Value will be null in the case of a delete event tombstone // Value will be null in the case of a delete event tombstone

View File

@ -196,6 +196,30 @@ public void testBrokenTopicReplacementConfigurationEmptyValue() {
subject.configure(props); subject.configure(props);
} }
@Test
public void testKeyNullValue() {
final ByLogicalTableRouter<SourceRecord> router = new ByLogicalTableRouter<>();
final Map<String, String> props = new HashMap<>();
props.put("topic.regex", "(.*)customers_shard(.*)");
props.put("topic.replacement", "$1customers_all_shards");
props.put("key.field.name", "shard_id");
props.put("key.field.regex", "(.*)customers_shard_(.*)");
props.put("key.field.replacement", "$2");
router.configure(props);
SourceRecord record1 = new SourceRecord(
new HashMap<>(), new HashMap<>(), "mysql-server-1.inventory.customers_shard_1", null, null, null, null
);
SourceRecord transformed1 = router.apply(record1);
assertThat(transformed1).isNotNull();
assertThat(transformed1.topic()).isEqualTo("mysql-server-1.inventory.customers_all_shards");
assertThat(transformed1.keySchema()).isNull();
assertThat(transformed1.key()).isNull();
}
// FIXME: This SMT can use more tests for more detailed coverage. // FIXME: This SMT can use more tests for more detailed coverage.
// The creation of a DBZ-ish SourceRecord is required for each test // The creation of a DBZ-ish SourceRecord is required for each test
} }