DBZ-655 Adding test

This commit is contained in:
Gunnar Morling 2018-03-19 09:25:19 +01:00
parent d5abb8415c
commit 5358167446
2 changed files with 77 additions and 13 deletions

View File

@ -240,7 +240,8 @@ private String determineNewTopic(String oldTopic) {
String newTopic = topicRegexReplaceCache.get(oldTopic);
if (newTopic != null) {
return newTopic;
} else {
}
else {
final Matcher matcher = topicRegex.matcher(oldTopic);
if (matcher.matches()) {
newTopic = matcher.replaceFirst(topicReplacement);
@ -284,7 +285,8 @@ private Struct updateKey(Schema newKeySchema, Struct oldKey, String oldTopic) {
if (matcher.matches()) {
physicalTableIdentifier = matcher.replaceFirst(keyFieldReplacement);
keyRegexReplaceCache.put(oldTopic, physicalTableIdentifier);
} else {
}
else {
physicalTableIdentifier = oldTopic;
}
}

View File

@ -5,14 +5,17 @@
*/
package io.debezium.transforms;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertTrue;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
/**
* @author Mario Mueller
@ -44,15 +47,74 @@ public void testBrokenKeyReplacementConfigurationEmptyValue() {
@Test
public void testKeyReplacementWorkingConfiguration() {
final ByLogicalTableRouter<SourceRecord> subject = new ByLogicalTableRouter<>();
final ByLogicalTableRouter<SourceRecord> router = new ByLogicalTableRouter<>();
final Map<String, String> props = new HashMap<>();
props.put("topic.regex", "someValidRegex(.*)");
props.put("topic.replacement", "$1");
props.put("key.field.regex", "anotherValidRegex(.*)");
props.put("key.field.replacement", "$1");
subject.configure(props);
assertTrue(true);
props.put("topic.regex", "(.*)customers_shard(.*)");
props.put("topic.replacement", "$1customers_all_shards");
props.put("key.field.name", "shard_id");
props.put("key.field.regex", "(.*)customers_shard_(.*)");
props.put("key.field.replacement", "$2");
router.configure(props);
Schema keySchema = SchemaBuilder.struct()
.name("mysql-server-1.inventory.customers_shard_1.Key")
.field("id", SchemaBuilder.int64().build())
.build();
Struct key1 = new Struct(keySchema).put("id", 123L);
SourceRecord record1 = new SourceRecord(
new HashMap<>(), new HashMap<>(), "mysql-server-1.inventory.customers_shard_1", keySchema, key1, null, null
);
SourceRecord transformed1 = router.apply(record1);
assertThat(transformed1).isNotNull();
assertThat(transformed1.topic()).isEqualTo("mysql-server-1.inventory.customers_all_shards");
assertThat(transformed1.keySchema().name()).isEqualTo("mysql_server_1.inventory.customers_all_shards.Key");
assertThat(transformed1.keySchema().fields()).hasSize(2);
assertThat(transformed1.keySchema().fields().get(0).name()).isEqualTo("id");
assertThat(transformed1.keySchema().fields().get(1).name()).isEqualTo("shard_id");
assertThat(((Struct) transformed1.key()).get("id")).isEqualTo(123L);
assertThat(((Struct) transformed1.key()).get("shard_id")).isEqualTo("1");
Struct key2 = new Struct(keySchema).put("id", 123L);
SourceRecord record2 = new SourceRecord(
new HashMap<>(), new HashMap<>(), "mysql-server-1.inventory.customers_shard_2", keySchema, key2, null, null
);
SourceRecord transformed2 = router.apply(record2);
assertThat(transformed2).isNotNull();
assertThat(transformed2.topic()).isEqualTo("mysql-server-1.inventory.customers_all_shards");
assertThat(transformed2.keySchema().name()).isEqualTo("mysql_server_1.inventory.customers_all_shards.Key");
assertThat(transformed2.keySchema().fields()).hasSize(2);
assertThat(transformed2.keySchema().fields().get(0).name()).isEqualTo("id");
assertThat(transformed2.keySchema().fields().get(1).name()).isEqualTo("shard_id");
assertThat(((Struct) transformed2.key()).get("id")).isEqualTo(123L);
assertThat(((Struct) transformed2.key()).get("shard_id")).isEqualTo("2");
Struct key3 = new Struct(keySchema).put("id", 456L);
SourceRecord record3 = new SourceRecord(
new HashMap<>(), new HashMap<>(), "mysql-server-1.inventory.customers_shard_2", keySchema, key3, null, null
);
SourceRecord transformed3 = router.apply(record3);
assertThat(transformed3).isNotNull();
assertThat(transformed3.topic()).isEqualTo("mysql-server-1.inventory.customers_all_shards");
assertThat(transformed3.keySchema().name()).isEqualTo("mysql_server_1.inventory.customers_all_shards.Key");
assertThat(transformed3.keySchema().fields()).hasSize(2);
assertThat(transformed3.keySchema().fields().get(0).name()).isEqualTo("id");
assertThat(transformed3.keySchema().fields().get(1).name()).isEqualTo("shard_id");
assertThat(((Struct) transformed3.key()).get("id")).isEqualTo(456L);
assertThat(((Struct) transformed3.key()).get("shard_id")).isEqualTo("2");
}
@Test(expected = ConnectException.class)