diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Filters.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Filters.java index 81d355ada..d23f59992 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Filters.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Filters.java @@ -99,6 +99,7 @@ public Filters(Configuration config) { ColumnMappers.Builder columnMapperBuilder = ColumnMappers.create(); config.forEachMatchingFieldNameWithInteger("column\\.truncate\\.to\\.(\\d+)\\.chars", columnMapperBuilder::truncateStrings); config.forEachMatchingFieldNameWithInteger("column\\.mask\\.with\\.(\\d+)\\.chars", columnMapperBuilder::maskStrings); + config.forEachMatchingFieldName("column\\.add\\.original\\.type", columnMapperBuilder::addOriginalType); this.columnMappers = columnMapperBuilder.build(); } diff --git a/debezium-core/src/main/java/io/debezium/relational/mapping/AddOriginalDataType.java b/debezium-core/src/main/java/io/debezium/relational/mapping/AddOriginalDataType.java new file mode 100644 index 000000000..15639aca6 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/mapping/AddOriginalDataType.java @@ -0,0 +1,22 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.mapping; + +import io.debezium.relational.Column; +import io.debezium.relational.ValueConverter; +import org.apache.kafka.connect.data.SchemaBuilder; + +public class AddOriginalDataType implements ColumnMapper { + @Override + public ValueConverter create(Column column) { + return null; + } + + @Override + public void alterFieldSchema(Column column, SchemaBuilder schemaBuilder) { + schemaBuilder.parameter("originalType", column.typeName().toUpperCase()); + } +} diff --git a/debezium-core/src/main/java/io/debezium/relational/mapping/ColumnMappers.java b/debezium-core/src/main/java/io/debezium/relational/mapping/ColumnMappers.java index 458139b17..fa81e0331 100644 --- a/debezium-core/src/main/java/io/debezium/relational/mapping/ColumnMappers.java +++ b/debezium-core/src/main/java/io/debezium/relational/mapping/ColumnMappers.java @@ -5,24 +5,24 @@ */ package io.debezium.relational.mapping; +import io.debezium.annotation.Immutable; +import io.debezium.config.Configuration; +import io.debezium.function.Predicates; + +import io.debezium.relational.Selectors; +import io.debezium.relational.ValueConverter; +import io.debezium.relational.ColumnId; +import io.debezium.relational.TableId; +import io.debezium.relational.Table; +import io.debezium.relational.Column; +import io.debezium.util.Strings; +import org.apache.kafka.connect.errors.ConnectException; + import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.function.Predicate; -import org.apache.kafka.connect.errors.ConnectException; - -import io.debezium.annotation.Immutable; -import io.debezium.config.Configuration; -import io.debezium.function.Predicates; -import io.debezium.relational.Column; -import io.debezium.relational.ColumnId; -import io.debezium.relational.Selectors; -import io.debezium.relational.Table; -import io.debezium.relational.TableId; -import io.debezium.relational.ValueConverter; -import io.debezium.util.Strings; - /** * A set of {@link ColumnMapper} objects for columns. * @@ -140,6 +140,10 @@ public Builder maskStrings(String fullyQualifiedColumnNames, String maskValue) { return map(fullyQualifiedColumnNames, new MaskStrings(maskValue)); } + public Builder addOriginalType(String fullyQualifiedColumnNames, String value) { + return map(value, new AddOriginalDataType()); + } + /** * Set a mapping function for the columns with fully-qualified names that match the given comma-separated list of regular * expression patterns.