DBZ-644 add JDBC data type as additional metadata to DBZ events, if configuration is opted in

This commit is contained in:
orr.ganani 2018-04-22 10:34:09 +03:00 committed by Jiri Pechanec
parent 0182eecd06
commit 747184c572
3 changed files with 40 additions and 13 deletions

View File

@ -99,6 +99,7 @@ public Filters(Configuration config) {
ColumnMappers.Builder columnMapperBuilder = ColumnMappers.create();
config.forEachMatchingFieldNameWithInteger("column\\.truncate\\.to\\.(\\d+)\\.chars", columnMapperBuilder::truncateStrings);
config.forEachMatchingFieldNameWithInteger("column\\.mask\\.with\\.(\\d+)\\.chars", columnMapperBuilder::maskStrings);
config.forEachMatchingFieldName("column\\.add\\.original\\.type", columnMapperBuilder::addOriginalType);
this.columnMappers = columnMapperBuilder.build();
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.mapping;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
import org.apache.kafka.connect.data.SchemaBuilder;
public class AddOriginalDataType implements ColumnMapper {
@Override
public ValueConverter create(Column column) {
return null;
}
@Override
public void alterFieldSchema(Column column, SchemaBuilder schemaBuilder) {
schemaBuilder.parameter("originalType", column.typeName().toUpperCase());
}
}

View File

@ -5,24 +5,24 @@
*/
package io.debezium.relational.mapping;
import io.debezium.annotation.Immutable;
import io.debezium.config.Configuration;
import io.debezium.function.Predicates;
import io.debezium.relational.Selectors;
import io.debezium.relational.ValueConverter;
import io.debezium.relational.ColumnId;
import io.debezium.relational.TableId;
import io.debezium.relational.Table;
import io.debezium.relational.Column;
import io.debezium.util.Strings;
import org.apache.kafka.connect.errors.ConnectException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.annotation.Immutable;
import io.debezium.config.Configuration;
import io.debezium.function.Predicates;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnId;
import io.debezium.relational.Selectors;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.ValueConverter;
import io.debezium.util.Strings;
/**
* A set of {@link ColumnMapper} objects for columns.
*
@ -140,6 +140,10 @@ public Builder maskStrings(String fullyQualifiedColumnNames, String maskValue) {
return map(fullyQualifiedColumnNames, new MaskStrings(maskValue));
}
public Builder addOriginalType(String fullyQualifiedColumnNames, String value) {
return map(value, new AddOriginalDataType());
}
/**
* Set a mapping function for the columns with fully-qualified names that match the given comma-separated list of regular
* expression patterns.