DBZ-1134 Make converted field type parametrized

This commit is contained in:
Jiri Pechanec 2020-02-20 08:42:04 +01:00 committed by Gunnar Morling
parent 06e1eca085
commit d13e48cc55
7 changed files with 231 additions and 34 deletions

View File

@ -0,0 +1,27 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.spi.converter;
import io.debezium.annotation.Incubating;
/**
* An the base interface for a converted field that provides naming characteristics.
*
* @author Jiri Pechanec
*/
@Incubating
public interface ConvertedField {
/**
* @return the name of the data field in data collection (like column in a table)
*/
String name();
/**
* @return the qualified name of the data collection (like a table)
*/
String dataCollection();
}

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.spi;
package io.debezium.spi.converter;
import java.util.Optional;
import java.util.Properties;
@ -16,7 +16,7 @@
* @author Jiri Pechanec
*/
@Incubating
public interface CustomConverter<S> {
public interface CustomConverter<S, F extends ConvertedField> {
/**
* An Actual conversion converting data from one type to another.
@ -48,10 +48,8 @@ public ConverterDefinition(S fieldSchema, Converter converter) {
/**
* A custom converter injected by the Debezium user.
*
* @param fieldType - full description of the field type, same as {@link }
* @param fieldName - name of the field
* @param dataCollectionName - fully qualified name of the data collection
* @param field - converted field metadata
* @return empty if the converter is not applicable for this field
*/
Optional<ConverterDefinition<S>> converterFor(String fieldType, String fieldName, String dataCollectionName);
Optional<ConverterDefinition<S>> converterFor(F field);
}

View File

@ -0,0 +1,85 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.spi.converter;
import java.sql.Types;
import java.util.Optional;
import io.debezium.annotation.Incubating;
/**
* A definition of a converted relational column.
*
* @author Randall Hauch
*/
@Incubating
public interface RelationalColumn extends ConvertedField {
/**
* Get the {@link Types JDBC type} for this column
*
* @return the type constant
*/
int jdbcType();
/**
* Get the database native type for this column
*
* @return a type constant for the specific database
*/
int nativeType();
/**
* Get the database-specific name of the column's data type.
*
* @return the name of the type
*/
String typeName();
/**
* Get the database-specific complete expression defining the column's data type, including dimensions, length, precision,
* character sets, constraints, etc.
*
* @return the complete type expression
*/
String typeExpression();
/**
* Get the maximum length of this column's values. For numeric columns, this represents the precision.
*
* @return the length of the column
*/
int length();
/**
* Get the scale of the column.
*
* @return the scale if it applies to this type
*/
Optional<Integer> scale();
/**
* Determine whether this column is optional.
*
* @return {@code true} if it is optional, or {@code false} otherwise
*/
boolean isOptional();
/**
* Get the default value of the column
*
* @return the default value
*/
Object defaultValue();
/**
* Determine whether this column's has a default value
*
* @return {@code true} if the default value was provided, or {@code false} otherwise
*/
boolean hasDefaultValue();
}

View File

@ -12,25 +12,53 @@
import org.fest.assertions.Assertions;
import org.junit.Test;
import io.debezium.spi.CustomConverter.ConverterDefinition;
import io.debezium.spi.converter.ConvertedField;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.CustomConverter.ConverterDefinition;
/**
* @author Jiri Pechanec
*/
public class ValueConverterTest {
final CustomConverter<SchemaBuilder> testConverter = new CustomConverter<SchemaBuilder>() {
private String field = "myfield";
public static class BasicField implements ConvertedField {
private final String name;
private final String dataCollection;
private final String type;
@Override
public void configure(Properties props) {
field = props.getProperty("field", field);
public BasicField(String name, String dataCollection, String type) {
super();
this.name = name;
this.dataCollection = dataCollection;
this.type = type;
}
@Override
public Optional<ConverterDefinition<SchemaBuilder>> converterFor(String fieldType, String fieldName,
String dataCollectionName) {
if (field.equals(fieldName)) {
public String name() {
return name;
}
@Override
public String dataCollection() {
return dataCollection;
}
public String type() {
return type;
}
}
final CustomConverter<SchemaBuilder, BasicField> testConverter = new CustomConverter<SchemaBuilder, BasicField>() {
private String convertedField = "myfield";
@Override
public void configure(Properties props) {
convertedField = props.getProperty("field", convertedField);
}
@Override
public Optional<ConverterDefinition<SchemaBuilder>> converterFor(BasicField field) {
if (convertedField.equals(field.name())) {
return Optional.of(new ConverterDefinition<>(SchemaBuilder.string().name("CUSTOM_STRING").optional(), (x) -> {
if (x instanceof Integer) {
return Integer.toString((Integer) x);
@ -45,7 +73,7 @@ public Optional<ConverterDefinition<SchemaBuilder>> converterFor(String fieldTyp
@Test
public void matchingField() {
testConverter.configure(new Properties());
final ConverterDefinition<SchemaBuilder> definition = testConverter.converterFor("VARCHAR2(30)", "myfield", "db1.table1").get();
final ConverterDefinition<SchemaBuilder> definition = testConverter.converterFor(new BasicField("myfield", "db1.table1", "VARCHAR2(30)")).get();
Assertions.assertThat(definition.fieldSchema.name()).isEqualTo("CUSTOM_STRING");
Assertions.assertThat(definition.converter.convert(34)).isEqualTo("34");
}
@ -53,7 +81,7 @@ public void matchingField() {
@Test
public void nonMatchingField() {
testConverter.configure(new Properties());
Assertions.assertThat(testConverter.converterFor("VARCHAR2(30)", "wrongfield", "db1.table1").isPresent()).isFalse();
Assertions.assertThat(testConverter.converterFor(new BasicField("wrongfield", "db1.table1", "VARCHAR2(30)")).isPresent()).isFalse();
}
@Test
@ -61,9 +89,9 @@ public void configuredField() {
final Properties props = new Properties();
props.setProperty("field", "otherfield");
testConverter.configure(props);
Assertions.assertThat(testConverter.converterFor("VARCHAR2(30)", "myfield", "db1.table1").isPresent()).isFalse();
Assertions.assertThat(testConverter.converterFor(new BasicField("myfield", "db1.table1", "VARCHAR2(30)")).isPresent()).isFalse();
final ConverterDefinition<SchemaBuilder> definition = testConverter.converterFor("VARCHAR2(30)", "otherfield", "db1.table1").get();
final ConverterDefinition<SchemaBuilder> definition = testConverter.converterFor(new BasicField("otherfield", "db1.table1", "VARCHAR2(30)")).get();
Assertions.assertThat(definition.fieldSchema.name()).isEqualTo("CUSTOM_STRING");
Assertions.assertThat(definition.converter.convert(34)).isEqualTo("34");
}

View File

@ -52,7 +52,8 @@
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
import io.debezium.spi.CustomConverter;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
@ -98,7 +99,7 @@ public void shouldGenerateSnapshotsForDefaultDatatypes() throws Exception {
}
}
public static class CustomDatatypeConverter implements CustomConverter<SchemaBuilder> {
public static class CustomDatatypeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private SchemaBuilder isbnSchema;
@ -108,9 +109,8 @@ public void configure(Properties props) {
}
@Override
public Optional<ConverterDefinition<SchemaBuilder>> converterFor(String fieldType, String fieldName,
String dataCollectionName) {
if ("isbn".equals(fieldType)) {
public Optional<ConverterDefinition<SchemaBuilder>> converterFor(RelationalColumn column) {
if ("isbn".equals(column.typeName())) {
return Optional.of(new ConverterDefinition<>(isbnSchema, x -> x.toString()));
}
return Optional.empty();

View File

@ -22,7 +22,8 @@
import io.debezium.heartbeat.Heartbeat;
import io.debezium.relational.CustomConverterRegistry;
import io.debezium.relational.history.KafkaDatabaseHistory;
import io.debezium.spi.CustomConverter;
import io.debezium.spi.converter.ConvertedField;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.util.Strings;
/**
@ -344,13 +345,13 @@ public CustomConverterRegistry customConverterRegistry() {
}
@SuppressWarnings("unchecked")
private List<CustomConverter<SchemaBuilder>> getCustomConverters() {
final List<CustomConverter<SchemaBuilder>> converters = new ArrayList<>();
private List<CustomConverter<SchemaBuilder, ConvertedField>> getCustomConverters() {
final List<CustomConverter<SchemaBuilder, ConvertedField>> converters = new ArrayList<>();
final String converterNameList = config.getString(CUSTOM_CONVERTERS);
final List<String> converterNames = Strings.listOf(converterNameList, x -> x.split(","), String::trim);
for (String name : converterNames) {
final CustomConverter<SchemaBuilder> converter = config.getInstance(name + CONVERTER_TYPE_SUFFIX, CustomConverter.class);
final CustomConverter<SchemaBuilder, ConvertedField> converter = config.getInstance(name + CONVERTER_TYPE_SUFFIX, CustomConverter.class);
converter.configure(config.subset(name, true).asProperties());
converters.add(converter);
}

View File

@ -13,8 +13,10 @@
import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.spi.CustomConverter;
import io.debezium.spi.CustomConverter.ConverterDefinition;
import io.debezium.spi.converter.ConvertedField;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.CustomConverter.ConverterDefinition;
import io.debezium.spi.converter.RelationalColumn;
/**
* The registry of all converters that were provided by the connector configuration.
@ -24,10 +26,10 @@
*/
public class CustomConverterRegistry {
private final List<CustomConverter<SchemaBuilder>> converters;
private final List<CustomConverter<SchemaBuilder, ConvertedField>> converters;
private final Map<String, ConverterDefinition<SchemaBuilder>> conversionFunctionMap = new HashMap<>();
public CustomConverterRegistry(List<CustomConverter<SchemaBuilder>> converters) {
public CustomConverterRegistry(List<CustomConverter<SchemaBuilder, ConvertedField>> converters) {
if (converters == null) {
this.converters = Collections.emptyList();
}
@ -46,8 +48,64 @@ public CustomConverterRegistry(List<CustomConverter<SchemaBuilder>> converters)
public Optional<SchemaBuilder> registerConverterFor(TableId table, Column column) {
final String fullColumnName = fullColumnName(table, column);
for (CustomConverter<SchemaBuilder> converter : converters) {
final Optional<ConverterDefinition<SchemaBuilder>> definition = converter.converterFor(column.typeExpression(), column.name(), table.toString());
for (CustomConverter<SchemaBuilder, ConvertedField> converter : converters) {
final Optional<ConverterDefinition<SchemaBuilder>> definition = converter.converterFor(new RelationalColumn() {
@Override
public String name() {
return column.name();
}
@Override
public String dataCollection() {
return table.toString();
}
@Override
public String typeName() {
return column.typeName();
}
@Override
public String typeExpression() {
return column.typeExpression();
}
@Override
public Optional<Integer> scale() {
return column.scale();
}
@Override
public int nativeType() {
return column.nativeType();
}
@Override
public int length() {
return column.length();
}
@Override
public int jdbcType() {
return column.jdbcType();
}
@Override
public boolean isOptional() {
return column.isOptional();
}
@Override
public boolean hasDefaultValue() {
return column.hasDefaultValue();
}
@Override
public Object defaultValue() {
return column.defaultValue();
}
});
if (definition.isPresent()) {
conversionFunctionMap.put(fullColumnName, definition.get());
return Optional.of(definition.get().fieldSchema);