DBZ-1312 Omit constant db name in column and key mappers

This commit is contained in:
Jiri Pechanec 2020-05-27 15:24:40 +02:00
parent 444df78139
commit 8e7f3d14ca
10 changed files with 301 additions and 26 deletions

View File

@ -1400,6 +1400,198 @@ public void shouldHonorSourceTimestampMode() throws InterruptedException, SQLExc
stopConnector(); stopConnector();
} }
@Test
@FixFor("DBZ-1312")
public void useShortTableNamesForColumnMapper() throws Exception {
    // Column mask configured with a schema-qualified (database-less) column name;
    // the mapper must still match even though the table id carries a database name.
    final int rowsPerTable = 5;
    final int tableCount = 2;
    final int firstId = 10;
    final Configuration config = TestHelper.defaultConfig()
            .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
            .with("column.mask.with.4.chars", "dbo.tablea.cola")
            .build();

    start(SqlServerConnector.class, config);
    assertConnectorIsRunning();

    // Wait for snapshot completion
    consumeRecordsByTopic(1);

    for (int row = 0; row < rowsPerTable; row++) {
        final int id = firstId + row;
        connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
        connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
    }

    final SourceRecords records = consumeRecordsByTopic(rowsPerTable * tableCount);
    final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
    final List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
    Assertions.assertThat(tableA).hasSize(rowsPerTable);
    Assertions.assertThat(tableB).hasSize(rowsPerTable);

    for (int row = 0; row < rowsPerTable; row++) {
        // tablea.cola values must be replaced by the four-character mask.
        final Struct valueA = (Struct) tableA.get(row).value();
        Assertions.assertThat(valueA.getStruct("after").getString("cola")).isEqualTo("****");

        // tableb is not covered by the mapper and must be emitted verbatim.
        final Struct valueB = (Struct) tableB.get(row).value();
        final List<SchemaAndValueField> expectedRowB = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, row + firstId),
                new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        assertRecord((Struct) valueB.get("after"), expectedRowB);
        assertNull(valueB.get("before"));
    }

    stopConnector();
}
@Test
@FixFor("DBZ-1312")
public void useLongTableNamesForColumnMapper() throws Exception {
    // Column mask configured with the fully-qualified (database-prefixed) column name;
    // this is the pre-existing addressing style and must keep working.
    final int rowsPerTable = 5;
    final int tableCount = 2;
    final int firstId = 10;
    final Configuration config = TestHelper.defaultConfig()
            .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
            .with("column.mask.with.4.chars", "testDB.dbo.tablea.cola")
            .build();

    start(SqlServerConnector.class, config);
    assertConnectorIsRunning();

    // Wait for snapshot completion
    consumeRecordsByTopic(1);

    for (int row = 0; row < rowsPerTable; row++) {
        final int id = firstId + row;
        connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
        connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
    }

    final SourceRecords records = consumeRecordsByTopic(rowsPerTable * tableCount);
    final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
    final List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
    Assertions.assertThat(tableA).hasSize(rowsPerTable);
    Assertions.assertThat(tableB).hasSize(rowsPerTable);

    for (int row = 0; row < rowsPerTable; row++) {
        // tablea.cola values must be replaced by the four-character mask.
        final Struct valueA = (Struct) tableA.get(row).value();
        Assertions.assertThat(valueA.getStruct("after").getString("cola")).isEqualTo("****");

        // tableb is not covered by the mapper and must be emitted verbatim.
        final Struct valueB = (Struct) tableB.get(row).value();
        final List<SchemaAndValueField> expectedRowB = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, row + firstId),
                new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        assertRecord((Struct) valueB.get("after"), expectedRowB);
        assertNull(valueB.get("before"));
    }

    stopConnector();
}
@Test
@FixFor("DBZ-1312")
public void useLongTableNamesForKeyMapper() throws Exception {
    // Message-key override configured with the fully-qualified (database-prefixed)
    // table name; this is the pre-existing addressing style and must keep working.
    final int rowsPerTable = 5;
    final int tableCount = 2;
    final int firstId = 10;
    final Configuration config = TestHelper.defaultConfig()
            .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
            .with(SqlServerConnectorConfig.MSG_KEY_COLUMNS, "testDB.dbo.tablea:cola")
            .build();

    start(SqlServerConnector.class, config);
    assertConnectorIsRunning();

    // Wait for snapshot completion
    consumeRecordsByTopic(1);

    for (int row = 0; row < rowsPerTable; row++) {
        final int id = firstId + row;
        connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
        connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
    }

    final SourceRecords records = consumeRecordsByTopic(rowsPerTable * tableCount);
    final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
    final List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
    Assertions.assertThat(tableA).hasSize(rowsPerTable);
    Assertions.assertThat(tableB).hasSize(rowsPerTable);

    for (int row = 0; row < rowsPerTable; row++) {
        // tablea's record key must be built from the overridden column "cola".
        final Struct keyA = (Struct) tableA.get(row).key();
        Assertions.assertThat(keyA.getString("cola")).isEqualTo("a");

        // tableb is not covered by the override and must be emitted verbatim.
        final Struct valueB = (Struct) tableB.get(row).value();
        final List<SchemaAndValueField> expectedRowB = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, row + firstId),
                new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        assertRecord((Struct) valueB.get("after"), expectedRowB);
        assertNull(valueB.get("before"));
    }

    stopConnector();
}
@Test
@FixFor("DBZ-1312")
public void useShortTableNamesForKeyMapper() throws Exception {
    // Message-key override configured with a schema-qualified (database-less) table
    // name; the key mapper must still match despite the database-prefixed table id.
    final int rowsPerTable = 5;
    final int tableCount = 2;
    final int firstId = 10;
    final Configuration config = TestHelper.defaultConfig()
            .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
            .with(SqlServerConnectorConfig.MSG_KEY_COLUMNS, "dbo.tablea:cola")
            .build();

    start(SqlServerConnector.class, config);
    assertConnectorIsRunning();

    // Wait for snapshot completion
    consumeRecordsByTopic(1);

    for (int row = 0; row < rowsPerTable; row++) {
        final int id = firstId + row;
        connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
        connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
    }

    final SourceRecords records = consumeRecordsByTopic(rowsPerTable * tableCount);
    final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
    final List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
    Assertions.assertThat(tableA).hasSize(rowsPerTable);
    Assertions.assertThat(tableB).hasSize(rowsPerTable);

    for (int row = 0; row < rowsPerTable; row++) {
        // tablea's record key must be built from the overridden column "cola".
        final Struct keyA = (Struct) tableA.get(row).key();
        Assertions.assertThat(keyA.getString("cola")).isEqualTo("a");

        // tableb is not covered by the override and must be emitted verbatim.
        final Struct valueB = (Struct) tableB.get(row).value();
        final List<SchemaAndValueField> expectedRowB = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, row + firstId),
                new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        assertRecord((Struct) valueB.get("after"), expectedRowB);
        assertNull(valueB.get("before"));
    }

    stopConnector();
}
private void assertRecord(Struct record, List<SchemaAndValueField> expected) { private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record)); expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
} }

View File

@ -13,6 +13,7 @@
import io.debezium.annotation.Immutable; import io.debezium.annotation.Immutable;
import io.debezium.function.Predicates; import io.debezium.function.Predicates;
import io.debezium.relational.Selectors.TableIdToStringMapper;
/** /**
* An immutable definition of a table's key. By default, the key will be * An immutable definition of a table's key. By default, the key will be
@ -94,9 +95,10 @@ public static class CustomKeyMapper {
* ex: inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4 * ex: inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4
* *
* @param fullyQualifiedColumnNames a list of regex * @param fullyQualifiedColumnNames a list of regex
* @param tableIdMapper mapper of tableIds to a String
* @return a new {@code CustomKeyMapper} or null if fullyQualifiedColumnNames is invalid. * @return a new {@code CustomKeyMapper} or null if fullyQualifiedColumnNames is invalid.
*/ */
public static KeyMapper getInstance(String fullyQualifiedColumnNames) { public static KeyMapper getInstance(String fullyQualifiedColumnNames, TableIdToStringMapper tableIdMapper) {
if (fullyQualifiedColumnNames == null) { if (fullyQualifiedColumnNames == null) {
return null; return null;
} }
@ -122,7 +124,11 @@ public static KeyMapper getInstance(String fullyQualifiedColumnNames) {
.stream() .stream()
.filter(c -> { .filter(c -> {
final TableId tableId = table.id(); final TableId tableId = table.id();
return delegate.test(new ColumnId(tableId.catalog(), tableId.schema(), tableId.table(), c.name())); if (tableIdMapper == null) {
return delegate.test(new ColumnId(tableId.catalog(), tableId.schema(), tableId.table(), c.name()));
}
return delegate.test(new ColumnId(tableId.catalog(), tableId.schema(), tableId.table(), c.name()))
|| delegate.test(new ColumnId(new TableId(tableId.catalog(), tableId.schema(), tableId.table(), tableIdMapper), c.name()));
}) })
.collect(Collectors.toList()); .collect(Collectors.toList());
return candidates.isEmpty() ? table.primaryKeyColumns() : candidates; return candidates.isEmpty() ? table.primaryKeyColumns() : candidates;

View File

@ -334,13 +334,15 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
private final RelationalTableFilters tableFilters; private final RelationalTableFilters tableFilters;
private final TemporalPrecisionMode temporalPrecisionMode; private final TemporalPrecisionMode temporalPrecisionMode;
private final KeyMapper keyMapper; private final KeyMapper keyMapper;
private final TableIdToStringMapper tableIdMapper;
protected RelationalDatabaseConnectorConfig(Configuration config, String logicalName, TableFilter systemTablesFilter, protected RelationalDatabaseConnectorConfig(Configuration config, String logicalName, TableFilter systemTablesFilter,
TableIdToStringMapper tableIdMapper, int defaultSnapshotFetchSize) { TableIdToStringMapper tableIdMapper, int defaultSnapshotFetchSize) {
super(config, logicalName, defaultSnapshotFetchSize); super(config, logicalName, defaultSnapshotFetchSize);
this.temporalPrecisionMode = TemporalPrecisionMode.parse(config.getString(TIME_PRECISION_MODE)); this.temporalPrecisionMode = TemporalPrecisionMode.parse(config.getString(TIME_PRECISION_MODE));
this.keyMapper = CustomKeyMapper.getInstance(config.getString(MSG_KEY_COLUMNS)); this.keyMapper = CustomKeyMapper.getInstance(config.getString(MSG_KEY_COLUMNS), tableIdMapper);
this.tableIdMapper = tableIdMapper;
if (systemTablesFilter != null && tableIdMapper != null) { if (systemTablesFilter != null && tableIdMapper != null) {
this.tableFilters = new RelationalTableFilters(config, systemTablesFilter, tableIdMapper); this.tableFilters = new RelationalTableFilters(config, systemTablesFilter, tableIdMapper);
@ -395,6 +397,10 @@ public boolean isSchemaChangesHistoryEnabled() {
return getConfig().getBoolean(INCLUDE_SCHEMA_CHANGES); return getConfig().getBoolean(INCLUDE_SCHEMA_CHANGES);
} }
public TableIdToStringMapper getTableIdMapper() {
return tableIdMapper;
}
private static int validateTableBlacklist(Configuration config, Field field, ValidationOutput problems) { private static int validateTableBlacklist(Configuration config, Field field, ValidationOutput problems) {
String whitelist = config.getString(TABLE_WHITELIST); String whitelist = config.getString(TABLE_WHITELIST);
String blacklist = config.getString(TABLE_BLACKLIST); String blacklist = config.getString(TABLE_BLACKLIST);

View File

@ -13,7 +13,6 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.data.Envelope; import io.debezium.data.Envelope;
import io.debezium.relational.Key.KeyMapper; import io.debezium.relational.Key.KeyMapper;
import io.debezium.relational.Tables.ColumnNameFilter; import io.debezium.relational.Tables.ColumnNameFilter;
@ -42,7 +41,7 @@ public abstract class RelationalDatabaseSchema implements DatabaseSchema<TableId
private final SchemasByTableId schemasByTableId; private final SchemasByTableId schemasByTableId;
private final Tables tables; private final Tables tables;
protected RelationalDatabaseSchema(CommonConnectorConfig config, TopicSelector<TableId> topicSelector, protected RelationalDatabaseSchema(RelationalDatabaseConnectorConfig config, TopicSelector<TableId> topicSelector,
TableFilter tableFilter, ColumnNameFilter columnFilter, TableSchemaBuilder schemaBuilder, TableFilter tableFilter, ColumnNameFilter columnFilter, TableSchemaBuilder schemaBuilder,
boolean tableIdCaseInsensitive, KeyMapper customKeysMapper) { boolean tableIdCaseInsensitive, KeyMapper customKeysMapper) {
@ -50,7 +49,7 @@ protected RelationalDatabaseSchema(CommonConnectorConfig config, TopicSelector<T
this.schemaBuilder = schemaBuilder; this.schemaBuilder = schemaBuilder;
this.tableFilter = tableFilter; this.tableFilter = tableFilter;
this.columnFilter = columnFilter; this.columnFilter = columnFilter;
this.columnMappers = ColumnMappers.create(config.getConfig()); this.columnMappers = ColumnMappers.create(config);
this.customKeysMapper = customKeysMapper; this.customKeysMapper = customKeysMapper;
this.schemaPrefix = getSchemaPrefix(config.getLogicalName()); this.schemaPrefix = getSchemaPrefix(config.getLogicalName());

View File

@ -6,6 +6,7 @@
package io.debezium.relational; package io.debezium.relational;
import io.debezium.annotation.Immutable; import io.debezium.annotation.Immutable;
import io.debezium.relational.Selectors.TableIdToStringMapper;
import io.debezium.schema.DataCollectionId; import io.debezium.schema.DataCollectionId;
/** /**
@ -81,13 +82,27 @@ protected static TableId parse(String[] parts, int numParts, boolean useCatalogB
* @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not * @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not
* show a schema for this table * show a schema for this table
* @param tableName the name of the table; may not be null * @param tableName the name of the table; may not be null
* @param tableIdMapper the customization of the fully qualified table name * @param tableIdMapper the customization of the fully qualified table name
*/ */
public TableId(String catalogName, String schemaName, String tableName) { public TableId(String catalogName, String schemaName, String tableName, TableIdToStringMapper tableIdMapper) {
this.catalogName = catalogName; this.catalogName = catalogName;
this.schemaName = schemaName; this.schemaName = schemaName;
this.tableName = tableName; this.tableName = tableName;
assert this.tableName != null; assert this.tableName != null;
this.id = tableId(this.catalogName, this.schemaName, this.tableName); this.id = tableIdMapper == null ? tableId(this.catalogName, this.schemaName, this.tableName) : tableIdMapper.toString(this);
}
/**
* Create a new table identifier.
*
* @param catalogName the name of the database catalog that contains the table; may be null if the JDBC driver does not
* show a schema for this table
* @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not
* show a schema for this table
* @param tableName the name of the table; may not be null
*/
public TableId(String catalogName, String schemaName, String tableName) {
this(catalogName, schemaName, tableName, null);
} }
/** /**

View File

@ -20,7 +20,9 @@
import io.debezium.function.Predicates; import io.debezium.function.Predicates;
import io.debezium.relational.Column; import io.debezium.relational.Column;
import io.debezium.relational.ColumnId; import io.debezium.relational.ColumnId;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Selectors; import io.debezium.relational.Selectors;
import io.debezium.relational.Selectors.TableIdToStringMapper;
import io.debezium.relational.Table; import io.debezium.relational.Table;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.relational.ValueConverter; import io.debezium.relational.ValueConverter;
@ -40,14 +42,15 @@ public class ColumnMappers {
* @return the builder; never null * @return the builder; never null
*/ */
public static Builder build() { public static Builder build() {
return new Builder(); return new Builder(null);
} }
/** /**
* Builds a new {@link ColumnMappers} instance based on the given configuration. * Builds a new {@link ColumnMappers} instance based on the given configuration.
*/ */
public static ColumnMappers create(Configuration config) { public static ColumnMappers create(RelationalDatabaseConnectorConfig connectorConfig) {
Builder builder = new Builder(); final Builder builder = new Builder(connectorConfig.getTableIdMapper());
final Configuration config = connectorConfig.getConfig();
// Define the truncated, masked, and mapped columns ... // Define the truncated, masked, and mapped columns ...
config.forEachMatchingFieldNameWithInteger("column\\.truncate\\.to\\.(\\d+)\\.chars", builder::truncateStrings); config.forEachMatchingFieldNameWithInteger("column\\.truncate\\.to\\.(\\d+)\\.chars", builder::truncateStrings);
@ -75,6 +78,11 @@ public static ColumnMappers create(Configuration config) {
public static class Builder { public static class Builder {
private final List<MapperRule> rules = new ArrayList<>(); private final List<MapperRule> rules = new ArrayList<>();
private final TableIdToStringMapper tableIdMapper;
public Builder(TableIdToStringMapper tableIdMapper) {
this.tableIdMapper = tableIdMapper;
}
/** /**
* Set a mapping function for the columns with fully-qualified names that match the given comma-separated list of regular * Set a mapping function for the columns with fully-qualified names that match the given comma-separated list of regular
@ -88,24 +96,41 @@ public static class Builder {
public Builder map(String fullyQualifiedColumnNames, ColumnMapper mapper) { public Builder map(String fullyQualifiedColumnNames, ColumnMapper mapper) {
BiPredicate<TableId, Column> columnMatcher = Predicates.includes(fullyQualifiedColumnNames, (tableId, column) -> fullyQualifiedColumnName(tableId, column)); BiPredicate<TableId, Column> columnMatcher = Predicates.includes(fullyQualifiedColumnNames, (tableId, column) -> fullyQualifiedColumnName(tableId, column));
rules.add(new MapperRule(columnMatcher, mapper)); rules.add(new MapperRule(columnMatcher, mapper));
if (tableIdMapper != null) {
columnMatcher = Predicates.includes(fullyQualifiedColumnNames, (tableId, column) -> mappedTableColumnName(tableId, column));
rules.add(new MapperRule(columnMatcher, mapper));
}
return this; return this;
} }
public static String fullyQualifiedColumnName(TableId tableId, Column column) { public String fullyQualifiedColumnName(TableId tableId, Column column) {
ColumnId id = new ColumnId(tableId, column.name()); ColumnId id = new ColumnId(tableId, column.name());
return id.toString(); return id.toString();
} }
public String mappedTableColumnName(TableId tableId, Column column) {
ColumnId id = new ColumnId(mappedTableId(tableId), column.name());
return id.toString();
}
public Builder mapByDatatype(String columnDatatypes, ColumnMapper mapper) { public Builder mapByDatatype(String columnDatatypes, ColumnMapper mapper) {
BiPredicate<TableId, Column> columnMatcher = Predicates.includes(columnDatatypes, (tableId, column) -> fullyQualifiedColumnDatatype(tableId, column)); BiPredicate<TableId, Column> columnMatcher = Predicates.includes(columnDatatypes, (tableId, column) -> fullyQualifiedColumnDatatype(tableId, column));
rules.add(new MapperRule(columnMatcher, mapper)); rules.add(new MapperRule(columnMatcher, mapper));
if (tableIdMapper != null) {
columnMatcher = Predicates.includes(columnDatatypes, (tableId, column) -> mappedTableColumnDatatype(tableId, column));
rules.add(new MapperRule(columnMatcher, mapper));
}
return this; return this;
} }
public static String fullyQualifiedColumnDatatype(TableId tableId, Column column) { public String fullyQualifiedColumnDatatype(TableId tableId, Column column) {
return tableId.toString() + "." + column.typeName(); return tableId.toString() + "." + column.typeName();
} }
public String mappedTableColumnDatatype(TableId tableId, Column column) {
return mappedTableId(tableId).toString() + "." + column.typeName();
}
/** /**
* Set a mapping function for the columns with fully-qualified names that match the given comma-separated list of regular * Set a mapping function for the columns with fully-qualified names that match the given comma-separated list of regular
* expression patterns. * expression patterns.
@ -246,6 +271,10 @@ public Builder map(String fullyQualifiedColumnNames, String mapperClassName, Con
public ColumnMappers build() { public ColumnMappers build() {
return new ColumnMappers(rules); return new ColumnMappers(rules);
} }
private TableId mappedTableId(TableId tableId) {
return new TableId(tableId.catalog(), tableId.schema(), tableId.table(), tableIdMapper);
}
} }
private final List<MapperRule> rules; private final List<MapperRule> rules;

View File

@ -320,7 +320,7 @@ public void shouldSanitizeFieldNamesAndValidateSerialization() {
public void shouldBuildTableSchemaFromTableWithCustomKey() { public void shouldBuildTableSchemaFromTableWithCustomKey() {
table = table.edit().setPrimaryKeyNames().create(); table = table.edit().setPrimaryKeyNames().create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false)
.create(prefix, "sometopic", table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3")); .create(prefix, "sometopic", table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3", null));
assertThat(schema).isNotNull(); assertThat(schema).isNotNull();
Schema keys = schema.keySchema(); Schema keys = schema.keySchema();
assertThat(keys).isNotNull(); assertThat(keys).isNotNull();
@ -333,7 +333,7 @@ public void shouldBuildTableSchemaFromTableWithCustomKey() {
@FixFor("DBZ-1015") @FixFor("DBZ-1015")
public void shouldOverrideIdentityKey() { public void shouldOverrideIdentityKey() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false)
.create(prefix, "sometopic", table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3")); .create(prefix, "sometopic", table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3", null));
assertThat(schema).isNotNull(); assertThat(schema).isNotNull();
Schema keys = schema.keySchema(); Schema keys = schema.keySchema();
assertThat(keys).isNotNull(); assertThat(keys).isNotNull();
@ -375,7 +375,7 @@ public void customKeyMapperShouldMapMultipleTables() {
.create()) .create())
.create(); .create();
KeyMapper keyMapper = CustomKeyMapper.getInstance("(.*).table:C2,C3;(.*).table2:C1"); KeyMapper keyMapper = CustomKeyMapper.getInstance("(.*).table:C2,C3;(.*).table2:C1", null);
schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false) schema = new TableSchemaBuilder(new JdbcValueConverters(), adjuster, customConverterRegistry, SchemaBuilder.struct().build(), false)
.create(prefix, "sometopic", table, null, null, keyMapper); .create(prefix, "sometopic", table, null, null, keyMapper);

View File

@ -13,8 +13,12 @@
import org.junit.Test; import org.junit.Test;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.relational.Column; import io.debezium.relational.Column;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Selectors.TableIdToStringMapper;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.ValueConverter; import io.debezium.relational.ValueConverter;
import io.debezium.util.Strings; import io.debezium.util.Strings;
@ -31,6 +35,29 @@ public class ColumnMappersTest {
private ValueConverter converter; private ValueConverter converter;
private String fullyQualifiedNames; private String fullyQualifiedNames;
private static class TestRelationalDatabaseConfig extends RelationalDatabaseConnectorConfig {
protected TestRelationalDatabaseConfig(Configuration config, String logicalName, TableFilter systemTablesFilter,
TableIdToStringMapper tableIdMapper, int defaultSnapshotFetchSize) {
super(config, logicalName, systemTablesFilter, tableIdMapper, defaultSnapshotFetchSize);
}
@Override
public String getContextName() {
return null;
}
@Override
public String getConnectorName() {
return null;
}
@Override
protected SourceInfoStructMaker<?> getSourceInfoStructMaker(Version version) {
return null;
}
}
@Before @Before
public void beforeEach() { public void beforeEach() {
column = Column.editor().name("firstName").jdbcType(Types.VARCHAR).type("VARCHAR").position(1).create(); column = Column.editor().name("firstName").jdbcType(Types.VARCHAR).type("VARCHAR").position(1).create();
@ -46,7 +73,7 @@ public void shouldNotFindMapperForUnmatchedColumn() {
.with("column.truncate.to.10.chars", fullyQualifiedNames) .with("column.truncate.to.10.chars", fullyQualifiedNames)
.build(); .build();
mappers = ColumnMappers.create(config); mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, "test", null, null, 0));
converter = mappers.mappingConverterFor(tableId, column2); converter = mappers.mappingConverterFor(tableId, column2);
assertThat(converter).isNull(); assertThat(converter).isNull();
} }
@ -57,7 +84,8 @@ public void shouldTruncateStrings() {
.with("column.truncate.to.10.chars", fullyQualifiedNames.toUpperCase()) .with("column.truncate.to.10.chars", fullyQualifiedNames.toUpperCase())
.build(); .build();
mappers = ColumnMappers.create(config); mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, "test", null, null, 0));
converter = mappers.mappingConverterFor(tableId, column); converter = mappers.mappingConverterFor(tableId, column);
assertThat(converter).isNotNull(); assertThat(converter).isNotNull();
assertThat(converter.convert("12345678901234567890").toString()).isEqualTo("1234567890"); assertThat(converter.convert("12345678901234567890").toString()).isEqualTo("1234567890");
@ -79,7 +107,7 @@ public void shouldMaskStringsToFixedLength() {
.with("column.mask.with.10.chars", fullyQualifiedNames) .with("column.mask.with.10.chars", fullyQualifiedNames)
.build(); .build();
mappers = ColumnMappers.create(config); // exact case mappers = ColumnMappers.create(new TestRelationalDatabaseConfig(config, "test", null, null, 0)); // exact case
converter = mappers.mappingConverterFor(tableId, column); converter = mappers.mappingConverterFor(tableId, column);
assertThat(converter).isNotNull(); assertThat(converter).isNotNull();
assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue); assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue);

View File

@ -1225,14 +1225,14 @@ Fully-qualified tables could be defined as _pdbName_._schemaName_._tableName_.
|An optional comma-separated list of regular expressions that match the fully-qualified names of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change messages. |An optional comma-separated list of regular expressions that match the fully-qualified names of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change messages.
The schema parameters `pass:[_]pass:[_]debezium.source.column.type`, `pass:[_]pass:[_]debezium.source.column.length` and `pass:[_]pass:[_]debezium.source.column.scale` will be used to propagate the original type name and length (for variable-width types), respectively. The schema parameters `pass:[_]pass:[_]debezium.source.column.type`, `pass:[_]pass:[_]debezium.source.column.length` and `pass:[_]pass:[_]debezium.source.column.scale` will be used to propagate the original type name and length (for variable-width types), respectively.
Useful to properly size corresponding columns in sink databases. Useful to properly size corresponding columns in sink databases.
Fully-qualified names for columns are of the form _databaseName_._tableName_._columnName_, or _databaseName_._schemaName_._tableName_._columnName_. Fully-qualified names for columns are of the form _tableName_._columnName_, or _schemaName_._tableName_._columnName_.
|[[oracle-property-datatype-propagate-source-type]]<<oracle-property-datatype-propagate-source-type, `datatype.propagate.source.type`>> |[[oracle-property-datatype-propagate-source-type]]<<oracle-property-datatype-propagate-source-type, `datatype.propagate.source.type`>>
|_n/a_ |_n/a_
|An optional comma-separated list of regular expressions that match the database-specific data type name of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change messages. |An optional comma-separated list of regular expressions that match the database-specific data type name of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change messages.
The schema parameters `pass:[_]pass:[_]debezium.source.column.type`, `pass:[_]pass:[_]debezium.source.column.length` and `pass:[_]pass:[_]debezium.source.column.scale` will be used to propagate the original type name and length (for variable-width types), respectively. The schema parameters `pass:[_]pass:[_]debezium.source.column.type`, `pass:[_]pass:[_]debezium.source.column.length` and `pass:[_]pass:[_]debezium.source.column.scale` will be used to propagate the original type name and length (for variable-width types), respectively.
Useful to properly size corresponding columns in sink databases. Useful to properly size corresponding columns in sink databases.
Fully-qualified data type names are of the form _databaseName_._tableName_._typeName_, or _databaseName_._schemaName_._tableName_._typeName_. Fully-qualified data type names are of the form _tableName_._typeName_, or _schemaName_._tableName_._typeName_.
See the {link-prefix}:{link-oracle-connector}#oracle-data-types[list of Oracle-specific data type names]. See the {link-prefix}:{link-oracle-connector}#oracle-data-types[list of Oracle-specific data type names].
|[[oracle-property-heartbeat-interval-ms]]<<oracle-property-heartbeat-interval-ms, `heartbeat.interval.ms`>> |[[oracle-property-heartbeat-interval-ms]]<<oracle-property-heartbeat-interval-ms, `heartbeat.interval.ms`>>

View File

@ -1426,11 +1426,11 @@ Note that primary key columns are always included in the event's key, also if bl
Based on the used hash function referential integrity is kept while data is pseudonymized. Supported hash functions are described in the {link-java7-standard-names}[MessageDigest section] of the Java Cryptography Architecture Standard Algorithm Name Documentation. Based on the used hash function referential integrity is kept while data is pseudonymized. Supported hash functions are described in the {link-java7-standard-names}[MessageDigest section] of the Java Cryptography Architecture Standard Algorithm Name Documentation.
The hash is automatically shortened to the length of the column. The hash is automatically shortened to the length of the column.
Multiple properties with different lengths can be used in a single configuration, although in each the length must be a positive integer or zero. Fully-qualified names for columns are of the form _databaseName_._schemaName_._tableName_._columnName_. Multiple properties with different lengths can be used in a single configuration, although in each the length must be a positive integer or zero. Fully-qualified names for columns are of the form _schemaName_._tableName_._columnName_.
Example: Example:
column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = testDB.dbo.orders.customerName, testDB.dbo.shipment.customerName column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = dbo.orders.customerName, dbo.shipment.customerName
where `CzQMA0cB5K` is a randomly selected salt. where `CzQMA0cB5K` is a randomly selected salt.
@ -1448,11 +1448,11 @@ Emitting the tombstone event (the default behavior) allows Kafka to completely d
|[[sqlserver-property-column-truncate-to-length-chars]]<<sqlserver-property-column-truncate-to-length-chars, `column.truncate.to._length_.chars`>> |[[sqlserver-property-column-truncate-to-length-chars]]<<sqlserver-property-column-truncate-to-length-chars, `column.truncate.to._length_.chars`>>
|_n/a_ |_n/a_
|An optional comma-separated list of regular expressions that match the fully-qualified names of character-based columns whose values should be truncated in the change event message values if the field values are longer than the specified number of characters. Multiple properties with different lengths can be used in a single configuration, although in each the length must be a positive integer. Fully-qualified names for columns are of the form _databaseName_._schemaName_._tableName_._columnName_. |An optional comma-separated list of regular expressions that match the fully-qualified names of character-based columns whose values should be truncated in the change event message values if the field values are longer than the specified number of characters. Multiple properties with different lengths can be used in a single configuration, although in each the length must be a positive integer. Fully-qualified names for columns are of the form _schemaName_._tableName_._columnName_.
|[[sqlserver-property-column-mask-with-length-chars]]<<sqlserver-property-column-mask-with-length-chars, `column.mask.with._length_.chars`>> |[[sqlserver-property-column-mask-with-length-chars]]<<sqlserver-property-column-mask-with-length-chars, `column.mask.with._length_.chars`>>
|_n/a_ |_n/a_
|An optional comma-separated list of regular expressions that match the fully-qualified names of character-based columns whose values should be replaced in the change event message values with a field value consisting of the specified number of asterisk (`*`) characters. Multiple properties with different lengths can be used in a single configuration, although in each the length must be a positive integer or zero. Fully-qualified names for columns are of the form _databaseName_._schemaName_._tableName_._columnName_. |An optional comma-separated list of regular expressions that match the fully-qualified names of character-based columns whose values should be replaced in the change event message values with a field value consisting of the specified number of asterisk (`*`) characters. Multiple properties with different lengths can be used in a single configuration, although in each the length must be a positive integer or zero. Fully-qualified names for columns are of the form _schemaName_._tableName_._columnName_.
|[[sqlserver-property-column-propagate-source-type]]<<sqlserver-property-column-propagate-source-type, `column.propagate.source.type`>> |[[sqlserver-property-column-propagate-source-type]]<<sqlserver-property-column-propagate-source-type, `column.propagate.source.type`>>
|_n/a_ |_n/a_
@ -1473,7 +1473,7 @@ See {link-prefix}:{link-sqlserver-connector}#sqlserver-data-types[] for the list
|_empty string_ |_empty string_
| A semicolon-separated list of regular expressions that match fully-qualified tables and columns to map a primary key. + | A semicolon-separated list of regular expressions that match fully-qualified tables and columns to map a primary key. +
Each item (regular expression) must match the fully-qualified `<fully-qualified table>:<a comma-separated list of columns>` representing the custom key. + Each item (regular expression) must match the fully-qualified `<fully-qualified table>:<a comma-separated list of columns>` representing the custom key. +
Fully-qualified tables could be defined as _databaseName_._schemaName_._tableName_. Fully-qualified tables could be defined as _schemaName_._tableName_.
|=== |===
The following _advanced_ configuration properties have good defaults that will work in most situations and therefore rarely need to be specified in the connector's configuration. The following _advanced_ configuration properties have good defaults that will work in most situations and therefore rarely need to be specified in the connector's configuration.