From 61cc791d3235fd15a848687ec77829a3cd37b8ea Mon Sep 17 00:00:00 2001 From: Fabio Cantarini Date: Sun, 1 Mar 2020 14:36:12 +0100 Subject: [PATCH] DBZ-1830 Add support for new connector property datatype.propagate.source.type --- .../mysql/MySqlSourceTypeInSchemaIT.java | 96 +++++++++++++++++++ .../java/io/debezium/function/Predicates.java | 25 +++++ .../relational/mapping/ColumnMappers.java | 30 ++++-- 3 files changed, 144 insertions(+), 7 deletions(-) diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlSourceTypeInSchemaIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlSourceTypeInSchemaIT.java index ffaf4b7c8..e4dda3483 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlSourceTypeInSchemaIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlSourceTypeInSchemaIT.java @@ -156,4 +156,100 @@ public void shouldPropagateSourceTypeAsSchemaParameter() throws SQLException, In assertThat(f2SchemaParameters).includes( entry(TYPE_NAME_PARAMETER_KEY, "FLOAT"), entry(TYPE_LENGTH_PARAMETER_KEY, "8"), entry(TYPE_SCALE_PARAMETER_KEY, "4")); } + + @Test + @FixFor("DBZ-1830") + public void shouldPropagateSourceTypeByDatatype() throws SQLException, InterruptedException { + // Use the DB configuration to define the connector's configuration ... + config = DATABASE.defaultConfig() + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER) + .with("datatype.propagate.source.type", "FLOAT,VARCHAR") + .build(); + + // Start the connector ... + start(MySqlConnector.class, config); + + // --------------------------------------------------------------------------------------------------------------- + // Consume all of the events due to startup and initialization of the database + // --------------------------------------------------------------------------------------------------------------- + // Testing.Debug.enable(); + int numCreateDatabase = 1; + int numCreateTables = 1; + int numInserts = 1; + SourceRecords records = consumeRecordsByTopic(numCreateDatabase + numCreateTables + numInserts); + stopConnector(); + assertThat(records).isNotNull(); + records.forEach(this::validate); + + List dmls = records.recordsForTopic(DATABASE.topicForTable("dbz_644_source_type_mapped_as_schema_parameter_test")); + assertThat(dmls).hasSize(1); + + SourceRecord insert = dmls.get(0); + Field before = insert.valueSchema().field("before"); + + // no type info requested as per given datatypes + Map idSchemaParameters = before + .schema() + .field("id") + .schema() + .parameters(); + + assertThat(idSchemaParameters).isNull(); + + // no type info requested as per given datatypes + Map c1SchemaParameters = before + .schema() + .field("c1") + .schema() + .parameters(); + + assertThat(c1SchemaParameters).isNull(); + + // no type info requested as per given datatypes + Map c2SchemaParameters = before + .schema() + .field("c2") + .schema() + .parameters(); + + assertThat(c2SchemaParameters).isNull(); + + // no type info requested as per given datatypes + Map c3aSchemaParameters = before + .schema() + .field("c3a") + .schema() + .parameters(); + + assertThat(c3aSchemaParameters).excludes(entry(TYPE_NAME_PARAMETER_KEY, "NUMERIC")); + + // variable width, name and length info + Map c3bSchemaParameters = before + .schema() + .field("c3b") + .schema() + .parameters(); + + assertThat(c3bSchemaParameters).includes( + entry(TYPE_NAME_PARAMETER_KEY, "VARCHAR"), entry(TYPE_LENGTH_PARAMETER_KEY, "128")); + + // float info + Map f1SchemaParameters = before + .schema() + .field("f1") + .schema() + .parameters(); + + assertThat(f1SchemaParameters).includes( + entry(TYPE_NAME_PARAMETER_KEY, "FLOAT"), entry(TYPE_LENGTH_PARAMETER_KEY, "10")); + + Map f2SchemaParameters = before + .schema() + .field("f2") + .schema() + .parameters(); + + assertThat(f2SchemaParameters).includes( + entry(TYPE_NAME_PARAMETER_KEY, "FLOAT"), entry(TYPE_LENGTH_PARAMETER_KEY, "8"), entry(TYPE_SCALE_PARAMETER_KEY, "4")); + } } diff --git a/debezium-core/src/main/java/io/debezium/function/Predicates.java b/debezium-core/src/main/java/io/debezium/function/Predicates.java index 55d684178..c4c8e4d95 100644 --- a/debezium-core/src/main/java/io/debezium/function/Predicates.java +++ b/debezium-core/src/main/java/io/debezium/function/Predicates.java @@ -12,6 +12,8 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.BiPredicate; import java.util.function.Function; import java.util.function.Predicate; import java.util.regex.Pattern; @@ -199,10 +201,19 @@ public static Predicate includes(String regexPatterns, Function BiPredicate includes(String regexPatterns, BiFunction conversion) { + Set patterns = Strings.setOfRegex(regexPatterns, Pattern.CASE_INSENSITIVE); + return includedInPatterns(patterns, conversion); + } + protected static Predicate includedInPatterns(Collection patterns, Function conversion) { return (t) -> matchedByPattern(patterns, conversion).apply(t).isPresent(); } + protected static BiPredicate includedInPatterns(Collection patterns, BiFunction conversion) { + return (t, u) -> matchedByPattern(patterns, conversion).apply(t, u).isPresent(); + } + /** * Generate a predicate function that for any supplied string returns a {@link Pattern} representing the first regular expression * in the supplied comma-separated list that matches the predicate parameter in a case-insensitive manner. @@ -230,6 +241,20 @@ protected static Function> matchedByPattern(Collection< }; } + protected static BiFunction> matchedByPattern(Collection patterns, BiFunction conversion) { + return (t, u) -> { + String str = conversion.apply(t, u); + if (str != null) { + for (Pattern p : patterns) { + if (p.matcher(str).matches()) { + return Optional.of(p); + } + } + } + return Optional.empty(); + }; + } + protected static Predicate includedInLiterals(Collection literals, Function conversion) { return (s) -> { String str = conversion.apply(s).toLowerCase(); diff --git a/debezium-core/src/main/java/io/debezium/relational/mapping/ColumnMappers.java b/debezium-core/src/main/java/io/debezium/relational/mapping/ColumnMappers.java index 4f3ccf726..f0854e61e 100644 --- a/debezium-core/src/main/java/io/debezium/relational/mapping/ColumnMappers.java +++ b/debezium-core/src/main/java/io/debezium/relational/mapping/ColumnMappers.java @@ -8,6 +8,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.function.BiPredicate; import java.util.function.Predicate; import org.apache.kafka.connect.errors.ConnectException; @@ -50,6 +51,7 @@ public static ColumnMappers create(Configuration config) { config.forEachMatchingFieldNameWithInteger("column\\.truncate\\.to\\.(\\d+)\\.chars", builder::truncateStrings); config.forEachMatchingFieldNameWithInteger("column\\.mask\\.with\\.(\\d+)\\.chars", builder::maskStrings); config.forEachMatchingFieldName("column\\.propagate\\.source\\.type", builder::propagateSourceTypeToSchemaParameter); + config.forEachMatchingFieldName("datatype\\.propagate\\.source\\.type", builder::propagateSourceTypeToSchemaParameterByDatatype); return builder.build(); } @@ -73,7 +75,18 @@ public static class Builder { * @return this object so that methods can be chained together; never null */ public Builder map(String fullyQualifiedColumnNames, ColumnMapper mapper) { - Predicate columnMatcher = Predicates.includes(fullyQualifiedColumnNames, ColumnId::toString); + BiPredicate columnMatcher = Predicates.includes(fullyQualifiedColumnNames, (tableId, column) -> fullyQualifiedColumnName(tableId, column)); + rules.add(new MapperRule(columnMatcher, mapper)); + return this; + } + + public static String fullyQualifiedColumnName(TableId tableId, Column column) { + ColumnId id = new ColumnId(tableId, column.name()); + return id.toString(); + } + + public Builder mapByDatatype(String columnDatatypes, ColumnMapper mapper) { + BiPredicate columnMatcher = Predicates.includes(columnDatatypes, (tableId, column) -> column.typeName()); rules.add(new MapperRule(columnMatcher, mapper)); return this; } @@ -158,6 +171,10 @@ public Builder propagateSourceTypeToSchemaParameter(String fullyQualifiedColumnN return map(value, new PropagateSourceTypeToSchemaParameter()); } + public Builder propagateSourceTypeToSchemaParameterByDatatype(String columnDatatypes, String value) { + return mapByDatatype(value, new PropagateSourceTypeToSchemaParameter()); + } + /** * Set a mapping function for the columns with fully-qualified names that match the given comma-separated list of regular * expression patterns. @@ -250,8 +267,7 @@ public ValueConverter mappingConverterFor(TableId tableId, Column column) { * @return the mapping function, or null if there is no mapping function */ public ColumnMapper mapperFor(TableId tableId, Column column) { - ColumnId id = new ColumnId(tableId, column.name()); - Optional matchingRule = rules.stream().filter(rule -> rule.matches(id)).findFirst(); + Optional matchingRule = rules.stream().filter(rule -> rule.matches(tableId, column)).findFirst(); if (matchingRule.isPresent()) { return matchingRule.get().mapper; } @@ -260,16 +276,16 @@ public ColumnMapper mapperFor(TableId tableId, Column column) { @Immutable protected static final class MapperRule { - protected final Predicate predicate; + protected final BiPredicate predicate; protected final ColumnMapper mapper; - protected MapperRule(Predicate predicate, ColumnMapper mapper) { + protected MapperRule(BiPredicate predicate, ColumnMapper mapper) { this.predicate = predicate; this.mapper = mapper; } - protected boolean matches(ColumnId id) { - return predicate.test(id); + protected boolean matches(TableId tableId, Column column) { + return predicate.test(tableId, column); } }