DBZ-2975: Include database name into topic and schema names

Co-authored-by: Mike Kamornikov <mikekamornikov@gmail.com>
This commit is contained in:
Sergei Morozov 2021-10-01 16:12:20 -07:00 committed by Gunnar Morling
parent aa2d8539f9
commit 4a03c5df17
10 changed files with 120 additions and 53 deletions

View File

@ -95,7 +95,8 @@ public MySqlDatabaseSchema(MySqlConnectorConfig connectorConfig, MySqlValueConve
schemaNameAdjuster, schemaNameAdjuster,
connectorConfig.customConverterRegistry(), connectorConfig.customConverterRegistry(),
connectorConfig.getSourceInfoStructMaker().schema(), connectorConfig.getSourceInfoStructMaker().schema(),
connectorConfig.getSanitizeFieldNames()), connectorConfig.getSanitizeFieldNames(),
false),
tableIdCaseInsensitive, connectorConfig.getKeyMapper()); tableIdCaseInsensitive, connectorConfig.getKeyMapper());
this.ddlParser = new MySqlAntlrDdlParser(valueConverter, getTableFilter()); this.ddlParser = new MySqlAntlrDdlParser(valueConverter, getTableFilter());

View File

@ -105,7 +105,8 @@ public MySqlSchema(MySqlConnectorConfig configuration,
getValueConverters(configuration), SchemaNameAdjuster.create(), getValueConverters(configuration), SchemaNameAdjuster.create(),
configuration.customConverterRegistry(), configuration.customConverterRegistry(),
configuration.getSourceInfoStructMaker().schema(), configuration.getSourceInfoStructMaker().schema(),
configuration.getSanitizeFieldNames()), configuration.getSanitizeFieldNames(),
false),
tableIdCaseInsensitive, tableIdCaseInsensitive,
configuration.getKeyMapper()); configuration.getKeyMapper());

View File

@ -42,7 +42,8 @@ public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, OracleValueCo
schemaNameAdjuster, schemaNameAdjuster,
connectorConfig.customConverterRegistry(), connectorConfig.customConverterRegistry(),
connectorConfig.getSourceInfoStructMaker().schema(), connectorConfig.getSourceInfoStructMaker().schema(),
connectorConfig.getSanitizeFieldNames()), connectorConfig.getSanitizeFieldNames(),
false),
TableNameCaseSensitivity.INSENSITIVE.equals(tableNameCaseSensitivity), TableNameCaseSensitivity.INSENSITIVE.equals(tableNameCaseSensitivity),
connectorConfig.getKeyMapper()); connectorConfig.getKeyMapper());

View File

@ -69,7 +69,7 @@ protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegist
private static TableSchemaBuilder getTableSchemaBuilder(PostgresConnectorConfig config, PostgresValueConverter valueConverter) { private static TableSchemaBuilder getTableSchemaBuilder(PostgresConnectorConfig config, PostgresValueConverter valueConverter) {
return new TableSchemaBuilder(valueConverter, SchemaNameAdjuster.create(), config.customConverterRegistry(), config.getSourceInfoStructMaker().schema(), return new TableSchemaBuilder(valueConverter, SchemaNameAdjuster.create(), config.customConverterRegistry(), config.getSourceInfoStructMaker().schema(),
config.getSanitizeFieldNames()); config.getSanitizeFieldNames(), false);
} }
/** /**

View File

@ -37,7 +37,8 @@ public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, ValueCo
schemaNameAdjuster, schemaNameAdjuster,
connectorConfig.customConverterRegistry(), connectorConfig.customConverterRegistry(),
connectorConfig.getSourceInfoStructMaker().schema(), connectorConfig.getSourceInfoStructMaker().schema(),
connectorConfig.getSanitizeFieldNames()), connectorConfig.getSanitizeFieldNames(),
connectorConfig.isMultiPartitionModeEnabled()),
false, connectorConfig.getKeyMapper()); false, connectorConfig.getKeyMapper());
} }

View File

@ -7,6 +7,7 @@
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector; import io.debezium.schema.TopicSelector;
import io.debezium.schema.TopicSelector.DataCollectionTopicNamer;
/** /**
* The topic naming strategy based on connector configuration and table name * The topic naming strategy based on connector configuration and table name
@ -17,7 +18,14 @@
public class SqlServerTopicSelector { public class SqlServerTopicSelector {
public static TopicSelector<TableId> defaultSelector(SqlServerConnectorConfig connectorConfig) { public static TopicSelector<TableId> defaultSelector(SqlServerConnectorConfig connectorConfig) {
return TopicSelector.defaultSelector(connectorConfig, DataCollectionTopicNamer<TableId> topicNamer;
(tableId, prefix, delimiter) -> String.join(delimiter, prefix, tableId.schema(), tableId.table())); if (connectorConfig.isMultiPartitionModeEnabled()) {
topicNamer = (tableId, prefix, delimiter) -> String.join(delimiter, prefix, tableId.catalog(), tableId.schema(), tableId.table());
}
else {
topicNamer = (tableId, prefix, delimiter) -> String.join(delimiter, prefix, tableId.schema(), tableId.table());
}
return TopicSelector.defaultSelector(connectorConfig, topicNamer);
} }
} }

View File

@ -104,7 +104,7 @@ public void takeSnapshotWithOverridesInSinglePartitionMode() throws Exception {
RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + ".dbo.table3", RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + ".dbo.table3",
"SELECT * FROM [dbo].[table3] where soft_deleted = 0") "SELECT * FROM [dbo].[table3] where soft_deleted = 0")
.build(); .build();
takeSnapshotWithOverrides(config); takeSnapshotWithOverrides(config, "server1.dbo.");
} }
@Test @Test
@ -121,17 +121,17 @@ public void takeSnapshotWithOverridesInMultiPartitionMode() throws Exception {
RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + ".dbo.table3", RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + ".dbo.table3",
"SELECT * FROM [" + TestHelper.TEST_DATABASE + "].[dbo].[table3] where soft_deleted = 0") "SELECT * FROM [" + TestHelper.TEST_DATABASE + "].[dbo].[table3] where soft_deleted = 0")
.build(); .build();
takeSnapshotWithOverrides(config); takeSnapshotWithOverrides(config, "server1.testDB.dbo.");
} }
private void takeSnapshotWithOverrides(Configuration config) throws Exception { private void takeSnapshotWithOverrides(Configuration config, String topicPrefix) throws Exception {
start(SqlServerConnector.class, config); start(SqlServerConnector.class, config);
assertConnectorIsRunning(); assertConnectorIsRunning();
SourceRecords records = consumeRecordsByTopic(INITIAL_RECORDS_PER_TABLE + (INITIAL_RECORDS_PER_TABLE + INITIAL_RECORDS_PER_TABLE) / 2); SourceRecords records = consumeRecordsByTopic(INITIAL_RECORDS_PER_TABLE + (INITIAL_RECORDS_PER_TABLE + INITIAL_RECORDS_PER_TABLE) / 2);
List<SourceRecord> table1 = records.recordsForTopic("server1.dbo.table1"); List<SourceRecord> table1 = records.recordsForTopic(topicPrefix + "table1");
List<SourceRecord> table2 = records.recordsForTopic("server1.dbo.table2"); List<SourceRecord> table2 = records.recordsForTopic(topicPrefix + "table2");
List<SourceRecord> table3 = records.recordsForTopic("server1.dbo.table3"); List<SourceRecord> table3 = records.recordsForTopic(topicPrefix + "table3");
// soft_deleted records should be excluded for table1 and table3 // soft_deleted records should be excluded for table1 and table3
assertThat(table1).hasSize(INITIAL_RECORDS_PER_TABLE / 2); assertThat(table1).hasSize(INITIAL_RECORDS_PER_TABLE / 2);
@ -170,7 +170,7 @@ public void takeSnapshotWithOverridesWithAdditionalWhitespaceInSinglePartitionMo
RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + ".dbo.table3", RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + ".dbo.table3",
"SELECT * FROM [dbo].[table3] where soft_deleted = 0") "SELECT * FROM [dbo].[table3] where soft_deleted = 0")
.build(); .build();
takeSnapshotWithOverridesWithAdditionalWhitespace(config); takeSnapshotWithOverridesWithAdditionalWhitespace(config, "server1.dbo.");
} }
@Test @Test
@ -187,17 +187,17 @@ public void takeSnapshotWithOverridesWithAdditionalWhitespaceInMultiPartitionMod
RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + ".dbo.table3", RelationalDatabaseConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + ".dbo.table3",
"SELECT * FROM [" + TestHelper.TEST_DATABASE + "].[dbo].[table3] where soft_deleted = 0") "SELECT * FROM [" + TestHelper.TEST_DATABASE + "].[dbo].[table3] where soft_deleted = 0")
.build(); .build();
takeSnapshotWithOverridesWithAdditionalWhitespace(config); takeSnapshotWithOverridesWithAdditionalWhitespace(config, "server1.testDB.dbo.");
} }
private void takeSnapshotWithOverridesWithAdditionalWhitespace(Configuration config) throws Exception { private void takeSnapshotWithOverridesWithAdditionalWhitespace(Configuration config, String topicPrefix) throws Exception {
start(SqlServerConnector.class, config); start(SqlServerConnector.class, config);
assertConnectorIsRunning(); assertConnectorIsRunning();
SourceRecords records = consumeRecordsByTopic(INITIAL_RECORDS_PER_TABLE + (INITIAL_RECORDS_PER_TABLE + INITIAL_RECORDS_PER_TABLE) / 2); SourceRecords records = consumeRecordsByTopic(INITIAL_RECORDS_PER_TABLE + (INITIAL_RECORDS_PER_TABLE + INITIAL_RECORDS_PER_TABLE) / 2);
List<SourceRecord> table1 = records.recordsForTopic("server1.dbo.table1"); List<SourceRecord> table1 = records.recordsForTopic(topicPrefix + "table1");
List<SourceRecord> table2 = records.recordsForTopic("server1.dbo.table2"); List<SourceRecord> table2 = records.recordsForTopic(topicPrefix + "table2");
List<SourceRecord> table3 = records.recordsForTopic("server1.dbo.table3"); List<SourceRecord> table3 = records.recordsForTopic(topicPrefix + "table3");
// soft_deleted records should be excluded for table1 and table3 // soft_deleted records should be excluded for table1 and table3
assertThat(table1).hasSize(INITIAL_RECORDS_PER_TABLE / 2); assertThat(table1).hasSize(INITIAL_RECORDS_PER_TABLE / 2);

View File

@ -2515,6 +2515,27 @@ public void shouldEmitNoEventsForSkippedUpdateAndDeleteOperations() throws Excep
} }
@Test
@FixFor("DBZ-2975")
public void shouldIncludeDatabaseNameIntoTopicAndSchemaNamesInMultiPartitionMode() throws Exception {
    // In multi-partition mode the topic and schema names must carry the
    // database name segment ("testDB") between the server name and schema.
    final Configuration config = TestHelper.defaultMultiPartitionConfig()
            .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
            .build();

    start(SqlServerConnector.class, config);
    assertConnectorIsRunning();
    TestHelper.waitForSnapshotToBeCompleted();

    final SourceRecords records = consumeRecordsByTopic(1);
    final List<SourceRecord> tableA = records.recordsForTopic("server1.testDB.dbo.tablea");
    // Use the statically imported assertThat consistently; the original mixed
    // Assertions.assertThat and assertThat within the same method.
    assertThat(tableA).hasSize(1);

    final SourceRecord record = tableA.get(0);
    assertThat(record.keySchema().name()).isEqualTo("server1.testDB.dbo.tablea.Key");
    assertThat(record.valueSchema().name()).isEqualTo("server1.testDB.dbo.tablea.Envelope");
}
private void assertRecord(Struct record, List<SchemaAndValueField> expected) { private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record)); expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
} }

View File

@ -57,6 +57,7 @@ public class TableSchemaBuilder {
private final Schema sourceInfoSchema; private final Schema sourceInfoSchema;
private final FieldNamer<Column> fieldNamer; private final FieldNamer<Column> fieldNamer;
private final CustomConverterRegistry customConverterRegistry; private final CustomConverterRegistry customConverterRegistry;
private final boolean multiPartitionMode;
/** /**
* Create a new instance of the builder. * Create a new instance of the builder.
@ -65,13 +66,15 @@ public class TableSchemaBuilder {
* null * null
* @param schemaNameAdjuster the adjuster for schema names; may not be null * @param schemaNameAdjuster the adjuster for schema names; may not be null
*/ */
public TableSchemaBuilder(ValueConverterProvider valueConverterProvider, SchemaNameAdjuster schemaNameAdjuster,
                          CustomConverterRegistry customConverterRegistry, Schema sourceInfoSchema,
                          boolean sanitizeFieldNames, boolean multiPartitionMode) {
    this.schemaNameAdjuster = schemaNameAdjuster;
    this.valueConverterProvider = valueConverterProvider;
    this.sourceInfoSchema = sourceInfoSchema;
    // Field namer optionally sanitizes column names into Avro-safe field names.
    this.fieldNamer = FieldNameSelector.defaultSelector(sanitizeFieldNames);
    this.customConverterRegistry = customConverterRegistry;
    // When true, generated schema names include the catalog (database) segment
    // in addition to schema and table (see tableSchemaName).
    this.multiPartitionMode = multiPartitionMode;
}
/** /**
@ -156,6 +159,9 @@ private String tableSchemaName(TableId tableId) {
else if (Strings.isNullOrEmpty(tableId.schema())) { else if (Strings.isNullOrEmpty(tableId.schema())) {
return tableId.catalog() + "." + tableId.table(); return tableId.catalog() + "." + tableId.table();
} }
else if (multiPartitionMode) {
return tableId.catalog() + "." + tableId.schema() + "." + tableId.table();
}
// When both catalog and schema is present then only schema is used // When both catalog and schema is present then only schema is used
else { else {
return tableId.schema() + "." + tableId.table(); return tableId.schema() + "." + tableId.table();

View File

@ -140,13 +140,15 @@ public void checkPreconditions() {
@Test(expected = NullPointerException.class) @Test(expected = NullPointerException.class)
public void shouldFailToBuildTableSchemaFromNullTable() { public void shouldFailToBuildTableSchemaFromNullTable() {
new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", null, null, null, null); .create(prefix, "sometopic", null, null, null, null);
} }
@Test @Test
public void shouldBuildTableSchemaFromTable() { public void shouldBuildTableSchemaFromTable() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null); .create(prefix, "sometopic", table, null, null, null);
assertThat(schema).isNotNull(); assertThat(schema).isNotNull();
} }
@ -155,7 +157,8 @@ public void shouldBuildTableSchemaFromTable() {
@FixFor("DBZ-1089") @FixFor("DBZ-1089")
public void shouldBuildCorrectSchemaNames() { public void shouldBuildCorrectSchemaNames() {
// table id with catalog and schema // table id with catalog and schema
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null); .create(prefix, "sometopic", table, null, null, null);
assertThat(schema).isNotNull(); assertThat(schema).isNotNull();
assertThat(schema.keySchema().name()).isEqualTo("schema.table.Key"); assertThat(schema.keySchema().name()).isEqualTo("schema.table.Key");
@ -166,7 +169,8 @@ public void shouldBuildCorrectSchemaNames() {
.tableId(new TableId("testDb", null, "testTable")) .tableId(new TableId("testDb", null, "testTable"))
.create(); .create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null); .create(prefix, "sometopic", table, null, null, null);
assertThat(schema).isNotNull(); assertThat(schema).isNotNull();
@ -178,7 +182,8 @@ public void shouldBuildCorrectSchemaNames() {
.tableId(new TableId(null, "testSchema", "testTable")) .tableId(new TableId(null, "testSchema", "testTable"))
.create(); .create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null); .create(prefix, "sometopic", table, null, null, null);
assertThat(schema).isNotNull(); assertThat(schema).isNotNull();
@ -190,7 +195,8 @@ public void shouldBuildCorrectSchemaNames() {
.tableId(new TableId(null, null, "testTable")) .tableId(new TableId(null, null, "testTable"))
.create(); .create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null); .create(prefix, "sometopic", table, null, null, null);
assertThat(schema).isNotNull(); assertThat(schema).isNotNull();
@ -198,10 +204,23 @@ public void shouldBuildCorrectSchemaNames() {
assertThat(schema.valueSchema().name()).isEqualTo("testTable.Value"); assertThat(schema.valueSchema().name()).isEqualTo("testTable.Value");
} }
@Test
@FixFor("DBZ-2975")
public void shouldBuildCorrectSchemaNamesInMultiPartitionMode() {
    // table id with catalog and schema
    // With multiPartitionMode=true (last constructor argument) the schema names
    // must include the catalog segment: "catalog.schema.table" instead of the
    // default "schema.table".
    schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
            SchemaBuilder.struct().build(), false, true)
            .create(prefix, "sometopic", table, null, null, null);
    assertThat(schema).isNotNull();
    assertThat(schema.keySchema().name()).isEqualTo("catalog.schema.table.Key");
    assertThat(schema.valueSchema().name()).isEqualTo("catalog.schema.table.Value");
}
@Test @Test
public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() { public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() {
table = table.edit().setPrimaryKeyNames().create(); table = table.edit().setPrimaryKeyNames().create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null); .create(prefix, "sometopic", table, null, null, null);
assertThat(schema).isNotNull(); assertThat(schema).isNotNull();
// Check the keys ... // Check the keys ...
@ -261,7 +280,8 @@ public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() {
@FixFor("DBZ-1044") @FixFor("DBZ-1044")
public void shouldSanitizeFieldNamesAndBuildTableSchemaFromTableWithoutPrimaryKey() { public void shouldSanitizeFieldNamesAndBuildTableSchemaFromTableWithoutPrimaryKey() {
table = table.edit().setPrimaryKeyNames().create(); table = table.edit().setPrimaryKeyNames().create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), true) schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), true, false)
.create(prefix, "sometopic", table, null, null, null); .create(prefix, "sometopic", table, null, null, null);
assertThat(schema).isNotNull(); assertThat(schema).isNotNull();
// Check the keys ... // Check the keys ...
@ -328,7 +348,8 @@ public void shouldSanitizeFieldNamesAndBuildTableSchemaFromTableWithoutPrimaryKe
public void shouldSanitizeFieldNamesAndValidateSerialization() { public void shouldSanitizeFieldNamesAndValidateSerialization() {
LogInterceptor logInterceptor = new LogInterceptor(TableSchemaBuilder.class); LogInterceptor logInterceptor = new LogInterceptor(TableSchemaBuilder.class);
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), true) schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), true, false)
.create(prefix, "sometopic", table, null, null, null); .create(prefix, "sometopic", table, null, null, null);
Struct key = (Struct) schema.keyFromColumnData(keyData); Struct key = (Struct) schema.keyFromColumnData(keyData);
@ -346,7 +367,8 @@ public void shouldSanitizeFieldNamesAndValidateSerialization() {
@FixFor("DBZ-1015") @FixFor("DBZ-1015")
public void shouldBuildTableSchemaFromTableWithCustomKey() { public void shouldBuildTableSchemaFromTableWithCustomKey() {
table = table.edit().setPrimaryKeyNames().create(); table = table.edit().setPrimaryKeyNames().create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3", null)); .create(prefix, "sometopic", table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3", null));
assertThat(schema).isNotNull(); assertThat(schema).isNotNull();
Schema keys = schema.keySchema(); Schema keys = schema.keySchema();
@ -359,7 +381,8 @@ public void shouldBuildTableSchemaFromTableWithCustomKey() {
@Test @Test
@FixFor("DBZ-1015") @FixFor("DBZ-1015")
public void shouldOverrideIdentityKey() { public void shouldOverrideIdentityKey() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3", null)); .create(prefix, "sometopic", table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3", null));
assertThat(schema).isNotNull(); assertThat(schema).isNotNull();
Schema keys = schema.keySchema(); Schema keys = schema.keySchema();
@ -373,7 +396,8 @@ public void shouldOverrideIdentityKey() {
@Test @Test
@FixFor("DBZ-1015") @FixFor("DBZ-1015")
public void shouldFallbackToIdentyKeyWhenCustomMapperIsNull() { public void shouldFallbackToIdentyKeyWhenCustomMapperIsNull() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null); .create(prefix, "sometopic", table, null, null, null);
assertThat(schema).isNotNull(); assertThat(schema).isNotNull();
Schema keys = schema.keySchema(); Schema keys = schema.keySchema();
@ -404,7 +428,8 @@ public void customKeyMapperShouldMapMultipleTables() {
KeyMapper keyMapper = CustomKeyMapper.getInstance("(.*).table:C2,C3;(.*).table2:C1", null); KeyMapper keyMapper = CustomKeyMapper.getInstance("(.*).table:C2,C3;(.*).table2:C1", null);
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, keyMapper); .create(prefix, "sometopic", table, null, null, keyMapper);
assertThat(schema).isNotNull(); assertThat(schema).isNotNull();
@ -415,7 +440,8 @@ public void customKeyMapperShouldMapMultipleTables() {
assertThat(keys.field("C2").name()).isEqualTo("C2"); assertThat(keys.field("C2").name()).isEqualTo("C2");
assertThat(keys.field("C3").name()).isEqualTo("C3"); assertThat(keys.field("C3").name()).isEqualTo("C3");
TableSchema schema2 = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) TableSchema schema2 = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table2, null, null, keyMapper); .create(prefix, "sometopic", table2, null, null, keyMapper);
assertThat(schema2).isNotNull(); assertThat(schema2).isNotNull();
@ -442,7 +468,8 @@ public void defaultKeyMapperShouldOrderKeyColumnsBasedOnPrimaryKeyColumnNamesOrd
.setPrimaryKeyNames("t2ID", "t1ID") .setPrimaryKeyNames("t2ID", "t1ID")
.create(); .create();
TableSchema schema2 = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) TableSchema schema2 = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table2, null, null, null); .create(prefix, "sometopic", table2, null, null, null);
Schema key2 = schema2.keySchema(); Schema key2 = schema2.keySchema();
@ -473,7 +500,8 @@ public void mapperConvertersShouldLeaveEmptyDatesAsZero() {
ColumnMappers mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, "test", null, null, 0)); ColumnMappers mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, "test", null, null, 0));
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table2, null, mappers, null); .create(prefix, "sometopic", table2, null, mappers, null);
Struct value = schema.valueFromColumnData(data); Struct value = schema.valueFromColumnData(data);