DBZ-644 Unifying retrieval of ColumnMappers in RelationalDatabaseSchema
This commit is contained in:
parent
ac515b8064
commit
8744488c8a
@ -18,7 +18,6 @@
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.relational.Tables;
|
||||
import io.debezium.relational.Tables.TableNameFilter;
|
||||
import io.debezium.relational.mapping.ColumnMappers;
|
||||
import io.debezium.util.Collect;
|
||||
|
||||
/**
|
||||
@ -60,7 +59,6 @@ protected static List<String> withoutBuiltInDatabases(Collection<String> dbNames
|
||||
private final Predicate<String> isBuiltInDb;
|
||||
private final Predicate<TableId> isBuiltInTable;
|
||||
private final Predicate<ColumnId> columnFilter;
|
||||
private final ColumnMappers columnMappers;
|
||||
|
||||
/**
|
||||
* @param config the configuration; may not be null
|
||||
@ -94,13 +92,6 @@ public Filters(Configuration config) {
|
||||
|
||||
// Define the filter that excludes blacklisted columns, truncated columns, and masked columns ...
|
||||
this.columnFilter = Selectors.excludeColumns(config.getString(MySqlConnectorConfig.COLUMN_BLACKLIST));
|
||||
|
||||
// Define the truncated, masked, and mapped columns ...
|
||||
ColumnMappers.Builder columnMapperBuilder = ColumnMappers.create();
|
||||
config.forEachMatchingFieldNameWithInteger("column\\.truncate\\.to\\.(\\d+)\\.chars", columnMapperBuilder::truncateStrings);
|
||||
config.forEachMatchingFieldNameWithInteger("column\\.mask\\.with\\.(\\d+)\\.chars", columnMapperBuilder::maskStrings);
|
||||
config.forEachMatchingFieldName("column\\.add\\.original\\.type", columnMapperBuilder::addOriginalType);
|
||||
this.columnMappers = columnMapperBuilder.build();
|
||||
}
|
||||
|
||||
public Predicate<String> databaseFilter() {
|
||||
@ -132,9 +123,4 @@ public Predicate<String> builtInDatabaseFilter() {
|
||||
public Predicate<ColumnId> columnFilter() {
|
||||
return columnFilter;
|
||||
}
|
||||
|
||||
public ColumnMappers columnMappers() {
|
||||
return columnMappers;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -84,11 +84,11 @@ public class MySqlSchema extends RelationalDatabaseSchema {
|
||||
*/
|
||||
public MySqlSchema(MySqlConnectorConfig configuration, String serverName, Predicate<String> gtidFilter, boolean tableIdCaseInsensitive, MySqlTopicSelector topicSelector) {
|
||||
super(
|
||||
configuration.getConfig(),
|
||||
serverName,
|
||||
topicSelector,
|
||||
new Filters(configuration.getConfig()).tableFilter(),
|
||||
new Filters(configuration.getConfig()).columnFilter(),
|
||||
new Filters(configuration.getConfig()).columnMappers(),
|
||||
new TableSchemaBuilder(
|
||||
getValueConverters(configuration.getConfig()), SchemaNameAdjuster.create(logger), SourceInfo.SCHEMA)
|
||||
,
|
||||
|
@ -52,8 +52,8 @@ public class PostgresSchema extends RelationalDatabaseSchema {
|
||||
*/
|
||||
protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegistry,
|
||||
PostgresTopicSelector topicSelector) {
|
||||
super(config.serverName(), topicSelector, new Filters(config).tableFilter(), new Filters(config).columnFilter(),
|
||||
null, getTableSchemaBuilder(config, typeRegistry), false);
|
||||
super(config.getConfig(), config.serverName(), topicSelector, new Filters(config).tableFilter(),
|
||||
new Filters(config).columnFilter(), getTableSchemaBuilder(config, typeRegistry), false);
|
||||
|
||||
this.filters = new Filters(config);
|
||||
this.typeRegistry = typeRegistry;
|
||||
|
@ -7,15 +7,15 @@
|
||||
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import io.debezium.relational.mapping.ColumnMappers;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.schema.HistorizedDatabaseSchema;
|
||||
import io.debezium.schema.TopicSelector;
|
||||
|
||||
public abstract class HistorizedRelationalDatabaseSchema extends RelationalDatabaseSchema implements HistorizedDatabaseSchema<TableId> {
|
||||
|
||||
protected HistorizedRelationalDatabaseSchema(String serverName, TopicSelector<TableId> topicSelector,
|
||||
Predicate<TableId> tableFilter, Predicate<ColumnId> columnFilter, ColumnMappers columnMappers,
|
||||
protected HistorizedRelationalDatabaseSchema(Configuration config, String serverName,
|
||||
TopicSelector<TableId> topicSelector, Predicate<TableId> tableFilter, Predicate<ColumnId> columnFilter,
|
||||
TableSchemaBuilder schemaBuilder, boolean tableIdCaseInsensitive) {
|
||||
super(serverName, topicSelector, tableFilter, columnFilter, columnMappers, schemaBuilder, tableIdCaseInsensitive);
|
||||
super(config, serverName, topicSelector, tableFilter, columnFilter, schemaBuilder, tableIdCaseInsensitive);
|
||||
}
|
||||
}
|
||||
|
@ -12,6 +12,7 @@
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.relational.mapping.ColumnMappers;
|
||||
import io.debezium.schema.DatabaseSchema;
|
||||
import io.debezium.schema.TopicSelector;
|
||||
@ -33,15 +34,15 @@ public abstract class RelationalDatabaseSchema implements DatabaseSchema<TableId
|
||||
private final SchemasByTableId schemasByTableId;
|
||||
private final Tables tables;
|
||||
|
||||
protected RelationalDatabaseSchema(String serverName, TopicSelector<TableId> topicSelector,
|
||||
Predicate<TableId> tableFilter, Predicate<ColumnId> columnFilter, ColumnMappers columnMappers,
|
||||
TableSchemaBuilder schemaBuilder, boolean tableIdCaseInsensitive) {
|
||||
protected RelationalDatabaseSchema(Configuration config, String serverName, TopicSelector<TableId> topicSelector,
|
||||
Predicate<TableId> tableFilter, Predicate<ColumnId> columnFilter, TableSchemaBuilder schemaBuilder,
|
||||
boolean tableIdCaseInsensitive) {
|
||||
|
||||
this.topicSelector = topicSelector;
|
||||
this.schemaBuilder = schemaBuilder;
|
||||
this.tableFilter = tableFilter;
|
||||
this.columnFilter = columnFilter;
|
||||
this.columnMappers = columnMappers;
|
||||
this.columnMappers = ColumnMappers.create(config);
|
||||
|
||||
this.schemaPrefix = getSchemaPrefix(serverName);
|
||||
this.schemasByTableId = new SchemasByTableId(tableIdCaseInsensitive);
|
||||
|
@ -5,24 +5,24 @@
|
||||
*/
|
||||
package io.debezium.relational.mapping;
|
||||
|
||||
import io.debezium.annotation.Immutable;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.function.Predicates;
|
||||
|
||||
import io.debezium.relational.Selectors;
|
||||
import io.debezium.relational.ValueConverter;
|
||||
import io.debezium.relational.ColumnId;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.relational.Table;
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.util.Strings;
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
|
||||
import io.debezium.annotation.Immutable;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.function.Predicates;
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.ColumnId;
|
||||
import io.debezium.relational.Selectors;
|
||||
import io.debezium.relational.Table;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.relational.ValueConverter;
|
||||
import io.debezium.util.Strings;
|
||||
|
||||
/**
|
||||
* A set of {@link ColumnMapper} objects for columns.
|
||||
*
|
||||
@ -36,10 +36,24 @@ public class ColumnMappers {
|
||||
*
|
||||
* @return the builder; never null
|
||||
*/
|
||||
public static Builder create() {
|
||||
public static Builder build() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a new {@link ColumnMappers} instance based on the given configuration.
|
||||
*/
|
||||
public static ColumnMappers create(Configuration config) {
|
||||
Builder builder = new Builder();
|
||||
|
||||
// Define the truncated, masked, and mapped columns ...
|
||||
config.forEachMatchingFieldNameWithInteger("column\\.truncate\\.to\\.(\\d+)\\.chars", builder::truncateStrings);
|
||||
config.forEachMatchingFieldNameWithInteger("column\\.mask\\.with\\.(\\d+)\\.chars", builder::maskStrings);
|
||||
config.forEachMatchingFieldName("column\\.add\\.original\\.type", builder::addOriginalType);
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* A builder of {@link Selectors}.
|
||||
*
|
||||
|
@ -5,13 +5,14 @@
|
||||
*/
|
||||
package io.debezium.relational.mapping;
|
||||
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
import java.sql.Types;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.relational.ValueConverter;
|
||||
@ -22,7 +23,7 @@
|
||||
*/
|
||||
public class ColumnMappersTest {
|
||||
|
||||
private TableId tableId = new TableId("db", null, "A");
|
||||
private final TableId tableId = new TableId("db", null, "A");
|
||||
private Column column;
|
||||
private Column column2;
|
||||
private Column column3;
|
||||
@ -41,14 +42,22 @@ public void beforeEach() {
|
||||
|
||||
@Test
|
||||
public void shouldNotFindMapperForUnmatchedColumn() {
|
||||
mappers = ColumnMappers.create().truncateStrings(fullyQualifiedNames, 10).build();
|
||||
Configuration config = Configuration.create()
|
||||
.with("column.truncate.to.10.chars", fullyQualifiedNames)
|
||||
.build();
|
||||
|
||||
mappers = ColumnMappers.create(config);
|
||||
converter = mappers.mappingConverterFor(tableId, column2);
|
||||
assertThat(converter).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldTruncateStrings() {
|
||||
mappers = ColumnMappers.create().truncateStrings(fullyQualifiedNames.toUpperCase(), 10).build(); // inexact case
|
||||
Configuration config = Configuration.create()
|
||||
.with("column.truncate.to.10.chars", fullyQualifiedNames.toUpperCase())
|
||||
.build();
|
||||
|
||||
mappers = ColumnMappers.create(config);
|
||||
converter = mappers.mappingConverterFor(tableId, column);
|
||||
assertThat(converter).isNotNull();
|
||||
assertThat(converter.convert("12345678901234567890").toString()).isEqualTo("1234567890");
|
||||
@ -65,7 +74,12 @@ public void shouldTruncateStrings() {
|
||||
@Test
|
||||
public void shouldMaskStringsToFixedLength() {
|
||||
String maskValue = "**********";
|
||||
mappers = ColumnMappers.create().maskStrings(fullyQualifiedNames, maskValue.length()).build(); // exact case
|
||||
|
||||
Configuration config = Configuration.create()
|
||||
.with("column.mask.with.10.chars", fullyQualifiedNames)
|
||||
.build();
|
||||
|
||||
mappers = ColumnMappers.create(config); // exact case
|
||||
converter = mappers.mappingConverterFor(tableId, column);
|
||||
assertThat(converter).isNotNull();
|
||||
assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue);
|
||||
@ -79,7 +93,7 @@ public void shouldMaskStringsToFixedLength() {
|
||||
public void shouldMaskStringsToFixedNumberOfSpecifiedCharacters() {
|
||||
char maskChar = '=';
|
||||
String maskValue = Strings.createString(maskChar, 10);
|
||||
mappers = ColumnMappers.create().maskStrings(fullyQualifiedNames, maskValue.length(), maskChar).build();
|
||||
mappers = ColumnMappers.build().maskStrings(fullyQualifiedNames, maskValue.length(), maskChar).build();
|
||||
converter = mappers.mappingConverterFor(tableId, column);
|
||||
assertThat(converter).isNotNull();
|
||||
assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue);
|
||||
@ -92,7 +106,7 @@ public void shouldMaskStringsToFixedNumberOfSpecifiedCharacters() {
|
||||
@Test
|
||||
public void shouldMaskStringsWithSpecificValue() {
|
||||
String maskValue = "*-*-*-*-*";
|
||||
mappers = ColumnMappers.create().maskStrings(fullyQualifiedNames, maskValue).build(); // exact case
|
||||
mappers = ColumnMappers.build().maskStrings(fullyQualifiedNames, maskValue).build(); // exact case
|
||||
converter = mappers.mappingConverterFor(tableId, column);
|
||||
assertThat(converter).isNotNull();
|
||||
assertThat(converter.convert("12345678901234567890")).isEqualTo(maskValue);
|
||||
@ -105,7 +119,7 @@ public void shouldMaskStringsWithSpecificValue() {
|
||||
@Test
|
||||
public void shouldMapValuesUsingColumnMapperInstance() {
|
||||
RepeatingColumnMapper mapper = new RepeatingColumnMapper();
|
||||
mappers = ColumnMappers.create().map(fullyQualifiedNames, mapper).build();
|
||||
mappers = ColumnMappers.build().map(fullyQualifiedNames, mapper).build();
|
||||
converter = mappers.mappingConverterFor(tableId, column);
|
||||
assertThat(converter).isNotNull();
|
||||
assertThat(converter.convert("1234")).isEqualTo("12341234");
|
||||
@ -115,7 +129,7 @@ public void shouldMapValuesUsingColumnMapperInstance() {
|
||||
|
||||
@Test
|
||||
public void shouldMapValuesUsingFunctionByClassName() {
|
||||
mappers = ColumnMappers.create().map(fullyQualifiedNames, RepeatingColumnMapper.class.getName()).build();
|
||||
mappers = ColumnMappers.build().map(fullyQualifiedNames, RepeatingColumnMapper.class.getName()).build();
|
||||
converter = mappers.mappingConverterFor(tableId, column);
|
||||
assertThat(converter).isNotNull();
|
||||
assertThat(converter.convert("1234")).isEqualTo("12341234");
|
||||
|
Loading…
Reference in New Issue
Block a user