DBZ-1830 Add support for new connector property datatype.propagate.source.type

This commit is contained in:
Fabio Cantarini 2020-03-01 14:36:12 +01:00 committed by Gunnar Morling
parent b1a3fcc516
commit 61cc791d32
3 changed files with 144 additions and 7 deletions

View File

@ -156,4 +156,100 @@ public void shouldPropagateSourceTypeAsSchemaParameter() throws SQLException, In
assertThat(f2SchemaParameters).includes(
entry(TYPE_NAME_PARAMETER_KEY, "FLOAT"), entry(TYPE_LENGTH_PARAMETER_KEY, "8"), entry(TYPE_SCALE_PARAMETER_KEY, "4"));
}
@Test
@FixFor("DBZ-1830")
public void shouldPropagateSourceTypeByDatatype() throws SQLException, InterruptedException {
// Use the DB configuration to define the connector's configuration ...
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
.with("datatype.propagate.source.type", "FLOAT,VARCHAR")
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
// Testing.Debug.enable();
int numCreateDatabase = 1;
int numCreateTables = 1;
int numInserts = 1;
SourceRecords records = consumeRecordsByTopic(numCreateDatabase + numCreateTables + numInserts);
stopConnector();
assertThat(records).isNotNull();
records.forEach(this::validate);
List<SourceRecord> dmls = records.recordsForTopic(DATABASE.topicForTable("dbz_644_source_type_mapped_as_schema_parameter_test"));
assertThat(dmls).hasSize(1);
SourceRecord insert = dmls.get(0);
Field before = insert.valueSchema().field("before");
// no type info requested as per given datatypes
Map<String, String> idSchemaParameters = before
.schema()
.field("id")
.schema()
.parameters();
assertThat(idSchemaParameters).isNull();
// no type info requested as per given datatypes
Map<String, String> c1SchemaParameters = before
.schema()
.field("c1")
.schema()
.parameters();
assertThat(c1SchemaParameters).isNull();
// no type info requested as per given datatypes
Map<String, String> c2SchemaParameters = before
.schema()
.field("c2")
.schema()
.parameters();
assertThat(c2SchemaParameters).isNull();
// no type info requested as per given datatypes
Map<String, String> c3aSchemaParameters = before
.schema()
.field("c3a")
.schema()
.parameters();
assertThat(c3aSchemaParameters).excludes(entry(TYPE_NAME_PARAMETER_KEY, "NUMERIC"));
// variable width, name and length info
Map<String, String> c3bSchemaParameters = before
.schema()
.field("c3b")
.schema()
.parameters();
assertThat(c3bSchemaParameters).includes(
entry(TYPE_NAME_PARAMETER_KEY, "VARCHAR"), entry(TYPE_LENGTH_PARAMETER_KEY, "128"));
// float info
Map<String, String> f1SchemaParameters = before
.schema()
.field("f1")
.schema()
.parameters();
assertThat(f1SchemaParameters).includes(
entry(TYPE_NAME_PARAMETER_KEY, "FLOAT"), entry(TYPE_LENGTH_PARAMETER_KEY, "10"));
Map<String, String> f2SchemaParameters = before
.schema()
.field("f2")
.schema()
.parameters();
assertThat(f2SchemaParameters).includes(
entry(TYPE_NAME_PARAMETER_KEY, "FLOAT"), entry(TYPE_LENGTH_PARAMETER_KEY, "8"), entry(TYPE_SCALE_PARAMETER_KEY, "4"));
}
}

View File

@ -12,6 +12,8 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;
@ -199,10 +201,19 @@ public static <T> Predicate<T> includes(String regexPatterns, Function<T, String
return includedInPatterns(patterns, conversion);
}
public static <T, U> BiPredicate<T, U> includes(String regexPatterns, BiFunction<T, U, String> conversion) {
Set<Pattern> patterns = Strings.setOfRegex(regexPatterns, Pattern.CASE_INSENSITIVE);
return includedInPatterns(patterns, conversion);
}
protected static <T> Predicate<T> includedInPatterns(Collection<Pattern> patterns, Function<T, String> conversion) {
return (t) -> matchedByPattern(patterns, conversion).apply(t).isPresent();
}
protected static <T, U> BiPredicate<T, U> includedInPatterns(Collection<Pattern> patterns, BiFunction<T, U, String> conversion) {
return (t, u) -> matchedByPattern(patterns, conversion).apply(t, u).isPresent();
}
/**
* Generate a predicate function that for any supplied string returns a {@link Pattern} representing the first regular expression
* in the supplied comma-separated list that matches the predicate parameter in a case-insensitive manner.
@ -230,6 +241,20 @@ protected static <T> Function<T, Optional<Pattern>> matchedByPattern(Collection<
};
}
protected static <T, U> BiFunction<T, U, Optional<Pattern>> matchedByPattern(Collection<Pattern> patterns, BiFunction<T, U, String> conversion) {
return (t, u) -> {
String str = conversion.apply(t, u);
if (str != null) {
for (Pattern p : patterns) {
if (p.matcher(str).matches()) {
return Optional.of(p);
}
}
}
return Optional.empty();
};
}
protected static <T> Predicate<T> includedInLiterals(Collection<String> literals, Function<T, String> conversion) {
return (s) -> {
String str = conversion.apply(s).toLowerCase();

View File

@ -8,6 +8,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
import org.apache.kafka.connect.errors.ConnectException;
@ -50,6 +51,7 @@ public static ColumnMappers create(Configuration config) {
config.forEachMatchingFieldNameWithInteger("column\\.truncate\\.to\\.(\\d+)\\.chars", builder::truncateStrings);
config.forEachMatchingFieldNameWithInteger("column\\.mask\\.with\\.(\\d+)\\.chars", builder::maskStrings);
config.forEachMatchingFieldName("column\\.propagate\\.source\\.type", builder::propagateSourceTypeToSchemaParameter);
config.forEachMatchingFieldName("datatype\\.propagate\\.source\\.type", builder::propagateSourceTypeToSchemaParameterByDatatype);
return builder.build();
}
@ -73,7 +75,18 @@ public static class Builder {
* @return this object so that methods can be chained together; never null
*/
public Builder map(String fullyQualifiedColumnNames, ColumnMapper mapper) {
Predicate<ColumnId> columnMatcher = Predicates.includes(fullyQualifiedColumnNames, ColumnId::toString);
BiPredicate<TableId, Column> columnMatcher = Predicates.includes(fullyQualifiedColumnNames, (tableId, column) -> fullyQualifiedColumnName(tableId, column));
rules.add(new MapperRule(columnMatcher, mapper));
return this;
}
public static String fullyQualifiedColumnName(TableId tableId, Column column) {
ColumnId id = new ColumnId(tableId, column.name());
return id.toString();
}
public Builder mapByDatatype(String columnDatatypes, ColumnMapper mapper) {
BiPredicate<TableId, Column> columnMatcher = Predicates.includes(columnDatatypes, (tableId, column) -> column.typeName());
rules.add(new MapperRule(columnMatcher, mapper));
return this;
}
@ -158,6 +171,10 @@ public Builder propagateSourceTypeToSchemaParameter(String fullyQualifiedColumnN
return map(value, new PropagateSourceTypeToSchemaParameter());
}
public Builder propagateSourceTypeToSchemaParameterByDatatype(String columnDatatypes, String value) {
return mapByDatatype(value, new PropagateSourceTypeToSchemaParameter());
}
/**
* Set a mapping function for the columns with fully-qualified names that match the given comma-separated list of regular
* expression patterns.
@ -250,8 +267,7 @@ public ValueConverter mappingConverterFor(TableId tableId, Column column) {
* @return the mapping function, or null if there is no mapping function
*/
public ColumnMapper mapperFor(TableId tableId, Column column) {
ColumnId id = new ColumnId(tableId, column.name());
Optional<MapperRule> matchingRule = rules.stream().filter(rule -> rule.matches(id)).findFirst();
Optional<MapperRule> matchingRule = rules.stream().filter(rule -> rule.matches(tableId, column)).findFirst();
if (matchingRule.isPresent()) {
return matchingRule.get().mapper;
}
@ -260,16 +276,16 @@ public ColumnMapper mapperFor(TableId tableId, Column column) {
@Immutable
protected static final class MapperRule {
protected final Predicate<ColumnId> predicate;
protected final BiPredicate<TableId, Column> predicate;
protected final ColumnMapper mapper;
protected MapperRule(Predicate<ColumnId> predicate, ColumnMapper mapper) {
protected MapperRule(BiPredicate<TableId, Column> predicate, ColumnMapper mapper) {
this.predicate = predicate;
this.mapper = mapper;
}
protected boolean matches(ColumnId id) {
return predicate.test(id);
protected boolean matches(TableId tableId, Column column) {
return predicate.test(tableId, column);
}
}