From 5358167446937c0aa96701bb2371c9748738e30e Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Mon, 19 Mar 2018 09:25:19 +0100 Subject: [PATCH] DBZ-655 Adding test --- .../transforms/ByLogicalTableRouter.java | 6 +- .../transforms/ByLogicalTableRouterTest.java | 84 ++++++++++++++++--- 2 files changed, 77 insertions(+), 13 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/transforms/ByLogicalTableRouter.java b/debezium-core/src/main/java/io/debezium/transforms/ByLogicalTableRouter.java index 2ebe294d1..5b7bbecf2 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/ByLogicalTableRouter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/ByLogicalTableRouter.java @@ -240,7 +240,8 @@ private String determineNewTopic(String oldTopic) { String newTopic = topicRegexReplaceCache.get(oldTopic); if (newTopic != null) { return newTopic; - } else { + } + else { final Matcher matcher = topicRegex.matcher(oldTopic); if (matcher.matches()) { newTopic = matcher.replaceFirst(topicReplacement); @@ -284,7 +285,8 @@ private Struct updateKey(Schema newKeySchema, Struct oldKey, String oldTopic) { if (matcher.matches()) { physicalTableIdentifier = matcher.replaceFirst(keyFieldReplacement); keyRegexReplaceCache.put(oldTopic, physicalTableIdentifier); - } else { + } + else { physicalTableIdentifier = oldTopic; } } diff --git a/debezium-core/src/test/java/io/debezium/transforms/ByLogicalTableRouterTest.java b/debezium-core/src/test/java/io/debezium/transforms/ByLogicalTableRouterTest.java index 6b8c39832..5586ef0dc 100644 --- a/debezium-core/src/test/java/io/debezium/transforms/ByLogicalTableRouterTest.java +++ b/debezium-core/src/test/java/io/debezium/transforms/ByLogicalTableRouterTest.java @@ -5,14 +5,17 @@ */ package io.debezium.transforms; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.source.SourceRecord; -import org.junit.Test; +import static org.fest.assertions.Assertions.assertThat; import java.util.HashMap; import java.util.Map; -import static org.junit.Assert.assertTrue; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Test; /** * @author Mario Mueller @@ -44,15 +47,74 @@ public void testBrokenKeyReplacementConfigurationEmptyValue() { @Test public void testKeyReplacementWorkingConfiguration() { - final ByLogicalTableRouter subject = new ByLogicalTableRouter<>(); + final ByLogicalTableRouter router = new ByLogicalTableRouter<>(); final Map props = new HashMap<>(); - props.put("topic.regex", "someValidRegex(.*)"); - props.put("topic.replacement", "$1"); - props.put("key.field.regex", "anotherValidRegex(.*)"); - props.put("key.field.replacement", "$1"); - subject.configure(props); - assertTrue(true); + props.put("topic.regex", "(.*)customers_shard(.*)"); + props.put("topic.replacement", "$1customers_all_shards"); + props.put("key.field.name", "shard_id"); + props.put("key.field.regex", "(.*)customers_shard_(.*)"); + props.put("key.field.replacement", "$2"); + router.configure(props); + + Schema keySchema = SchemaBuilder.struct() + .name("mysql-server-1.inventory.customers_shard_1.Key") + .field("id", SchemaBuilder.int64().build()) + .build(); + + Struct key1 = new Struct(keySchema).put("id", 123L); + + SourceRecord record1 = new SourceRecord( + new HashMap<>(), new HashMap<>(), "mysql-server-1.inventory.customers_shard_1", keySchema, key1, null, null + ); + + SourceRecord transformed1 = router.apply(record1); + assertThat(transformed1).isNotNull(); + assertThat(transformed1.topic()).isEqualTo("mysql-server-1.inventory.customers_all_shards"); + + assertThat(transformed1.keySchema().name()).isEqualTo("mysql_server_1.inventory.customers_all_shards.Key"); + assertThat(transformed1.keySchema().fields()).hasSize(2); + assertThat(transformed1.keySchema().fields().get(0).name()).isEqualTo("id"); + assertThat(transformed1.keySchema().fields().get(1).name()).isEqualTo("shard_id"); + + assertThat(((Struct) transformed1.key()).get("id")).isEqualTo(123L); + assertThat(((Struct) transformed1.key()).get("shard_id")).isEqualTo("1"); + + Struct key2 = new Struct(keySchema).put("id", 123L); + + SourceRecord record2 = new SourceRecord( + new HashMap<>(), new HashMap<>(), "mysql-server-1.inventory.customers_shard_2", keySchema, key2, null, null + ); + + SourceRecord transformed2 = router.apply(record2); + assertThat(transformed2).isNotNull(); + assertThat(transformed2.topic()).isEqualTo("mysql-server-1.inventory.customers_all_shards"); + + assertThat(transformed2.keySchema().name()).isEqualTo("mysql_server_1.inventory.customers_all_shards.Key"); + assertThat(transformed2.keySchema().fields()).hasSize(2); + assertThat(transformed2.keySchema().fields().get(0).name()).isEqualTo("id"); + assertThat(transformed2.keySchema().fields().get(1).name()).isEqualTo("shard_id"); + + assertThat(((Struct) transformed2.key()).get("id")).isEqualTo(123L); + assertThat(((Struct) transformed2.key()).get("shard_id")).isEqualTo("2"); + + Struct key3 = new Struct(keySchema).put("id", 456L); + + SourceRecord record3 = new SourceRecord( + new HashMap<>(), new HashMap<>(), "mysql-server-1.inventory.customers_shard_2", keySchema, key3, null, null + ); + + SourceRecord transformed3 = router.apply(record3); + assertThat(transformed3).isNotNull(); + assertThat(transformed3.topic()).isEqualTo("mysql-server-1.inventory.customers_all_shards"); + + assertThat(transformed3.keySchema().name()).isEqualTo("mysql_server_1.inventory.customers_all_shards.Key"); + assertThat(transformed3.keySchema().fields()).hasSize(2); + assertThat(transformed3.keySchema().fields().get(0).name()).isEqualTo("id"); + assertThat(transformed3.keySchema().fields().get(1).name()).isEqualTo("shard_id"); + + assertThat(((Struct) transformed3.key()).get("id")).isEqualTo(456L); + assertThat(((Struct) transformed3.key()).get("shard_id")).isEqualTo("2"); } @Test(expected = ConnectException.class)