diff --git a/debezium-core/src/main/java/io/debezium/transforms/ByLogicalTableRouter.java b/debezium-core/src/main/java/io/debezium/transforms/ByLogicalTableRouter.java index 5b7bbecf2..73d4f8446 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/ByLogicalTableRouter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/ByLogicalTableRouter.java @@ -181,9 +181,15 @@ public R apply(R record) { logger.debug("Applying topic name transformation from {} to {}", oldTopic, newTopic); - final Struct oldKey = requireStruct(record.key(), "Updating schema"); - final Schema newKeySchema = updateKeySchema(oldKey.schema(), newTopic); - final Struct newKey = updateKey(newKeySchema, oldKey, oldTopic); + Schema newKeySchema = null; + Struct newKey = null; + + // Key could be null in the case of a table without a primary key + if (record.key() != null) { + final Struct oldKey = requireStruct(record.key(), "Updating schema"); + newKeySchema = updateKeySchema(oldKey.schema(), newTopic); + newKey = updateKey(newKeySchema, oldKey, oldTopic); + } if (record.value() == null) { // Value will be null in the case of a delete event tombstone diff --git a/debezium-core/src/test/java/io/debezium/transforms/ByLogicalTableRouterTest.java b/debezium-core/src/test/java/io/debezium/transforms/ByLogicalTableRouterTest.java index ada07a82b..e17e31817 100644 --- a/debezium-core/src/test/java/io/debezium/transforms/ByLogicalTableRouterTest.java +++ b/debezium-core/src/test/java/io/debezium/transforms/ByLogicalTableRouterTest.java @@ -196,6 +196,30 @@ public void testBrokenTopicReplacementConfigurationEmptyValue() { subject.configure(props); } + @Test + public void testKeyNullValue() { + final ByLogicalTableRouter router = new ByLogicalTableRouter<>(); + final Map props = new HashMap<>(); + + props.put("topic.regex", "(.*)customers_shard(.*)"); + props.put("topic.replacement", "$1customers_all_shards"); + props.put("key.field.name", "shard_id"); + props.put("key.field.regex", "(.*)customers_shard_(.*)"); + props.put("key.field.replacement", "$2"); + router.configure(props); + + SourceRecord record1 = new SourceRecord( + new HashMap<>(), new HashMap<>(), "mysql-server-1.inventory.customers_shard_1", null, null, null, null + ); + + SourceRecord transformed1 = router.apply(record1); + assertThat(transformed1).isNotNull(); + assertThat(transformed1.topic()).isEqualTo("mysql-server-1.inventory.customers_all_shards"); + + assertThat(transformed1.keySchema()).isNull(); + assertThat(transformed1.key()).isNull(); + } + // FIXME: This SMT can use more tests for more detailed coverage. // The creation of a DBZ-ish SourceRecord is required for each test }