DBZ-3 Fixes topic naming to include the name of the server

Horia Chiorean 2016-12-27 14:44:04 +02:00
parent d681c11eee
commit ae85656851
10 changed files with 54 additions and 32 deletions
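Summary: change-event topic names previously used only the table's schema-qualified name (e.g. public.orders); after this change they are prefixed with the connector's logical server name (e.g. <database.server.name>.public.orders). The tests and test helpers are updated to match, and the commit also fixes a few mis-spelled logical-name constants (see the last three files).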

View File

@@ -290,8 +290,8 @@ public static TopicSelectionStrategy parse(String value) {
             .withWidth(Width.MEDIUM)
             .withImportance(Importance.MEDIUM)
             .withDefault(ReplicationConnection.Builder.DEFAULT_SLOT_NAME)
-            .withDescription("The name of the Postgres logical decoding slot created for monitoring a particular database and plugin. " +
-                             "Defaults to 'debezium_connector'");
+            .withDescription("The name of the Postgres logical decoding slot created for streaming changes from a plugin." +
+                             "Defaults to 'debezium");
 
     public static final Field DROP_SLOT_ON_STOP = Field.create("slot.drop_on_stop")
             .withDisplayName("Drop slot on stop")
@@ -343,7 +343,7 @@ public static TopicSelectionStrategy parse(String value) {
             .withValidation(Field::isRequired)
             .withDescription("The name of the database the connector should be monitoring");
 
-    public static final Field SERVER_NAME = Field.create("database.server.name")
+    public static final Field SERVER_NAME = Field.create(DATABASE_CONFIG_PREFIX + "server.name")
             .withDisplayName("Namespace")
             .withType(Type.STRING)
             .withWidth(Width.MEDIUM)
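Note: assuming DATABASE_CONFIG_PREFIX is the connector's existing "database." prefix constant (it is not shown in this diff), the externally visible property name is unchanged:

    // Assumption: DATABASE_CONFIG_PREFIX == "database." elsewhere in this class,
    // so the resolved property name is still "database.server.name".
    Field.create(DATABASE_CONFIG_PREFIX + "server.name");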
@@ -376,7 +376,7 @@ public static TopicSelectionStrategy parse(String value) {
             .withType(Type.INT)
             .withWidth(Width.SHORT)
             .withImportance(Importance.MEDIUM)
-            .withDescription("Maximum size of each batch of source records. Defaults to 1024.")
+            .withDescription("Maximum size of each batch of source records. Defaults to 10024.")
             .withDefault(DEFAULT_MAX_BATCH_SIZE)
             .withValidation(Field::isPositiveInteger);

View File

@ -40,9 +40,9 @@ private TopicSelector initTopicSelector() {
PostgresConnectorConfig.TopicSelectionStrategy topicSelectionStrategy = config.topicSelectionStrategy(); PostgresConnectorConfig.TopicSelectionStrategy topicSelectionStrategy = config.topicSelectionStrategy();
switch (topicSelectionStrategy) { switch (topicSelectionStrategy) {
case TOPIC_PER_SCHEMA: case TOPIC_PER_SCHEMA:
return TopicSelector.TOPIC_PER_SCHEMA; return TopicSelector.topicPerSchema(config.serverName());
case TOPIC_PER_TABLE: case TOPIC_PER_TABLE:
return TopicSelector.TOPIC_PER_TABLE; return TopicSelector.topicPerTable(config.serverName());
default: default:
throw new IllegalArgumentException("Unknown topic selection strategy: " + topicSelectionStrategy); throw new IllegalArgumentException("Unknown topic selection strategy: " + topicSelectionStrategy);
} }
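The config.serverName() accessor called above is not part of this diff; a plausible sketch (hypothetical, for illustration only) is a straight read of the SERVER_NAME field defined in PostgresConnectorConfig:

    // Hypothetical accessor inside PostgresConnectorConfig; the real implementation is not shown here.
    protected String serverName() {
        return config.getString(SERVER_NAME);
    }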

View File

@@ -15,14 +15,26 @@
  * @author Horia Chiorean (hchiorea@redhat.com)
  */
 public interface TopicSelector {
 
     /**
-     * Selector which generates a separate topic name for each table, based on the FQN name of the table
+     * Generates a topic name for each table, based on the table schema, table name and a prefix
+     *
+     * @param prefix a prefix which will be prepended to the topic name
+     * @return a {@link TopicSelector} instance, never {@code null}
      */
-    TopicSelector TOPIC_PER_TABLE = tableId -> String.join(".", tableId.schema(), tableId.table());
+    static TopicSelector topicPerTable(String prefix) {
+        return tableId -> String.join(".", prefix, tableId.schema(), tableId.table());
+    }
 
     /**
-     * Selector which generates a separate topic for an entire DB schema
+     * Generates a topic name for each table, based on the table schema and a prefix
+     *
+     * @param prefix a prefix which will be prepended to the topic name
+     * @return a {@link TopicSelector} instance, never {@code null}
      */
-    TopicSelector TOPIC_PER_SCHEMA = TableId::schema;
+    static TopicSelector topicPerSchema(String prefix) {
+        return tableId -> String.join(".", prefix, tableId.schema());
+    }
 
     /**
      * Returns the name of the Kafka topic for a given table identifier
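Taken together, the new factory methods thread a server-name prefix into every generated topic name. A quick illustration (the server name "fulfillment" and the table are made-up values; TableId is io.debezium.relational.TableId):

    // Hypothetical usage; TableId(catalog, schema, table).
    TableId tableId = new TableId(null, "public", "orders");
    TopicSelector perTable = TopicSelector.topicPerTable("fulfillment");
    // perTable resolves tableId to "fulfillment.public.orders"
    TopicSelector perSchema = TopicSelector.topicPerSchema("fulfillment");
    // perSchema resolves tableId to "fulfillment.public"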

View File

@@ -11,6 +11,7 @@
 import static io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode.INITIAL_ONLY;
 import static io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode.NEVER;
 import static io.debezium.connector.postgresql.TestHelper.PK_FIELD;
+import static io.debezium.connector.postgresql.TestHelper.topicName;
 import static org.fest.assertions.Assertions.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -323,9 +324,9 @@ public void shouldTakeBlacklistFiltersIntoAccount() throws Exception {
         //check the records from the snapshot take the filters into account
         SourceRecords actualRecords = consumeRecordsByTopic(4); //3 records in s1.a and 1 in s1.b
-        assertThat(actualRecords.recordsForTopic("s2.a")).isNullOrEmpty();
-        assertThat(actualRecords.recordsForTopic("s1.b")).isNullOrEmpty();
-        List<SourceRecord> recordsForS1a = actualRecords.recordsForTopic("s1.a");
+        assertThat(actualRecords.recordsForTopic(topicName("s2.a"))).isNullOrEmpty();
+        assertThat(actualRecords.recordsForTopic(topicName("s1.b"))).isNullOrEmpty();
+        List<SourceRecord> recordsForS1a = actualRecords.recordsForTopic(topicName("s1.a"));
         assertThat(recordsForS1a.size()).isEqualTo(3);
         AtomicInteger pkValue = new AtomicInteger(1);
         recordsForS1a.forEach(record -> {
@@ -444,12 +445,12 @@ private void assertRecordsFromSnapshot(int expectedCount, int...pks) throws Inte
         // we have 2 schemas/topics that we expect
         int expectedCountPerSchema = expectedCount / 2;
 
-        List<SourceRecord> recordsForTopicS1 = actualRecords.recordsForTopic("s1.a");
+        List<SourceRecord> recordsForTopicS1 = actualRecords.recordsForTopic(topicName("s1.a"));
         assertThat(recordsForTopicS1.size()).isEqualTo(expectedCountPerSchema);
         IntStream.range(0, expectedCountPerSchema)
                  .forEach(i -> VerifyRecord.isValidRead(recordsForTopicS1.remove(0), PK_FIELD, pks[i]));
 
-        List<SourceRecord> recordsForTopicS2 = actualRecords.recordsForTopic("s2.a");
+        List<SourceRecord> recordsForTopicS2 = actualRecords.recordsForTopic(topicName("s2.a"));
         assertThat(recordsForTopicS2.size()).isEqualTo(expectedCountPerSchema);
         IntStream.range(0, expectedCountPerSchema)
                  .forEach(i -> VerifyRecord.isValidRead(recordsForTopicS2.remove(0), PK_FIELD, pks[i + expectedCountPerSchema]));
@@ -462,11 +463,11 @@ private void assertRecordsAfterInsert(int expectedCount, int...pks) throws Inter
         // we have 2 schemas
         int expectedCountPerSchema = expectedCount / 2;
 
-        List<SourceRecord> recordsForTopicS1 = actualRecords.recordsForTopic("s1.a");
+        List<SourceRecord> recordsForTopicS1 = actualRecords.recordsForTopic(topicName("s1.a"));
         assertThat(recordsForTopicS1.size()).isEqualTo(expectedCountPerSchema);
         IntStream.range(0, expectedCountPerSchema).forEach(i -> VerifyRecord.isValidInsert(recordsForTopicS1.remove(0), PK_FIELD, pks[i]));
 
-        List<SourceRecord> recordsForTopicS2 = actualRecords.recordsForTopic("s2.a");
+        List<SourceRecord> recordsForTopicS2 = actualRecords.recordsForTopic(topicName("s2.a"));
         assertThat(recordsForTopicS2.size()).isEqualTo(expectedCountPerSchema);
         IntStream.range(0, expectedCountPerSchema).forEach(i -> VerifyRecord.isValidInsert(recordsForTopicS2.remove(0), PK_FIELD, pks[i]));
     }

View File

@@ -7,6 +7,7 @@
 package io.debezium.connector.postgresql;
 
 import static io.debezium.connector.postgresql.TestHelper.PK_FIELD;
+import static io.debezium.connector.postgresql.TestHelper.topicName;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -102,12 +103,12 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
         SourceRecord first = consumer.remove();
         VerifyRecord.isValidInsert(first, PK_FIELD, 2);
-        assertEquals("s1.a", first.topic());
+        assertEquals(topicName("s1.a"), first.topic());
         assertRecordOffset(first, false, false);
 
         SourceRecord second = consumer.remove();
         VerifyRecord.isValidInsert(second, PK_FIELD, 2);
-        assertEquals("s2.a", second.topic());
+        assertEquals(topicName("s2.a"), second.topic());
         assertRecordOffset(second, false, false);
 
         // now shut down the producers and insert some more records
@@ -147,8 +148,9 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
     private void assertReadRecord(SourceRecord record, Map<String, List<SchemaAndValueField>> expectedValuesByTableName) {
         VerifyRecord.isValidRead(record, PK_FIELD, 1);
         String actualTopicName = record.topic();
-        assertTrue("Invalid topic name for records", actualTopicName.startsWith("public."));
-        String tableName = actualTopicName.replace("public.", "");
+        String topicPrefix = topicName("public.");
+        assertTrue("Invalid topic name for records", actualTopicName.startsWith(topicPrefix));
+        String tableName = actualTopicName.replace(topicPrefix, "");
         List<SchemaAndValueField> expectedValuesAndSchemasForTable = expectedValuesByTableName.get(tableName);
         assertNotNull("No expected values for " + tableName + " found", expectedValuesAndSchemasForTable);
         assertRecordSchemaAndValues(expectedValuesAndSchemasForTable, record, Envelope.FieldName.AFTER);

View File

@@ -7,6 +7,7 @@
 package io.debezium.connector.postgresql;
 
 import static io.debezium.connector.postgresql.TestHelper.PK_FIELD;
+import static io.debezium.connector.postgresql.TestHelper.topicName;
 import static junit.framework.TestCase.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -119,7 +120,7 @@ public void shouldReceiveChangesForUpdates() throws Exception {
         // the update record should be the last record
         SourceRecord updatedRecord = consumer.remove();
 
-        String topicName = "public.test_table";
+        String topicName = topicName("public.test_table");
         assertEquals(topicName, updatedRecord.topic());
         VerifyRecord.isValidUpdate(updatedRecord, PK_FIELD, 1);
@@ -159,7 +160,7 @@ public void shouldReceiveChangesForUpdatesWithColumnChanges() throws Exception {
         // the update should be the last record
         SourceRecord updatedRecord = consumer.remove();
 
-        String topicName = "public.test_table";
+        String topicName = topicName("public.test_table");
         assertEquals(topicName, updatedRecord.topic());
         VerifyRecord.isValidUpdate(updatedRecord, PK_FIELD, 1);
@@ -204,7 +205,7 @@ public void shouldReceiveChangesForUpdatesWithPKChanges() throws Exception {
         recordsProducer.start(consumer);
         executeAndWait("UPDATE test_table SET text = 'update', pk = 2");
 
-        String topicName = "public.test_table";
+        String topicName = topicName("public.test_table");
 
         // first should be a delete of the old pk
         SourceRecord deleteRecord = consumer.remove();
@@ -232,7 +233,7 @@ public void shouldReceiveChangesForDefaultValues() throws Exception {
         executeAndWait(statements);
 
         SourceRecord insertRecord = consumer.remove();
-        assertEquals("public.test_table", insertRecord.topic());
+        assertEquals(topicName("public.test_table"), insertRecord.topic());
         VerifyRecord.isValidInsert(insertRecord, PK_FIELD, 2);
         List<SchemaAndValueField> expectedSchemaAndValues = Arrays.asList(
                 new SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update"),
@@ -249,8 +250,10 @@ public void shouldReceiveChangesForDeletes() throws Exception {
         recordsProducer.start(consumer);
         executeAndWait(statements);
 
-        String topicName = "public.test_table";
-        assertRecordInserted(topicName, PK_FIELD, 2);
+        String topicPrefix = "public.test_table";
+        String topicName = topicName(topicPrefix);
+        assertRecordInserted(topicPrefix, PK_FIELD, 2);
 
         // first entry removed
         SourceRecord record = consumer.remove();
@@ -289,7 +292,7 @@ private void assertInsert(String statement, List<SchemaAndValueField> expectedSc
     private SourceRecord assertRecordInserted(String expectedTopicName, String pkColumn, int pk) throws InterruptedException {
         assertFalse("records not generated", consumer.isEmpty());
         SourceRecord insertedRecord = consumer.remove();
-        assertEquals(expectedTopicName, insertedRecord.topic());
+        assertEquals(topicName(expectedTopicName), insertedRecord.topic());
         VerifyRecord.isValidInsert(insertedRecord, pkColumn, pk);
         return insertedRecord;
     }

View File

@ -128,4 +128,8 @@ protected static void executeDDL(String ddlFile) throws Exception {
connection.executeWithoutCommitting(statements); connection.executeWithoutCommitting(statements);
} }
} }
protected static String topicName(String suffix) {
return TestHelper.TEST_SERVER + "." + suffix;
}
} }
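Every test assertion above funnels through this helper, so the expected topic names pick up the test server prefix automatically. For illustration (the concrete value of TEST_SERVER is defined in TestHelper and not shown in this diff):

    // Illustration only; the prefix is whatever TestHelper.TEST_SERVER holds.
    topicName("public.test_table"); // -> TEST_SERVER + ".public.test_table"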

View File

@@ -16,7 +16,7 @@
  */
 public class Uuid {
 
-    public static final String LOGICAL_NAME = "io.debezium.data.uuid";
+    public static final String LOGICAL_NAME = "io.debezium.data.Uuid";
 
     /**
      * Returns a {@link SchemaBuilder} for a Uuid field. You can use the resulting SchemaBuilder

View File

@@ -17,7 +17,7 @@
  */
 public class Point {
 
-    public static final String LOGICAL_NAME = "io.debezium.data.geometry.point";
+    public static final String LOGICAL_NAME = "io.debezium.data.geometry.Point";
 
     public static final String X_FIELD = "x";
     public static final String Y_FIELD = "y";

View File

@@ -22,7 +22,7 @@
  */
 public class MicroTimestamp {
 
-    public static final String SCHEMA_NAME = "io.debezium.time.NanoTimestamp";
+    public static final String SCHEMA_NAME = "io.debezium.time.MicroTimestamp";
 
     /**
      * Returns a {@link SchemaBuilder} for a {@link MicroTimestamp}. The resulting schema will describe a field
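The last three hunks fix copy-paste errors so each logical/schema name constant matches its class's fully qualified name. This matters because the constant is the name Kafka Connect attaches to the schema; a minimal sketch of the usual pattern (standard Kafka Connect SchemaBuilder API, not the exact Debezium code):

    // A logical type is an ordinary primitive schema tagged with a name,
    // which is why the constant must spell the class name exactly.
    SchemaBuilder builder = SchemaBuilder.string()
            .name(Uuid.LOGICAL_NAME)   // "io.debezium.data.Uuid" after this fix
            .version(1);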