DBZ-1134 Fix tests for new API
This commit is contained in:
parent
0f6d2c54fc
commit
7d525d4baa
@ -5,16 +5,17 @@
|
|||||||
*/
|
*/
|
||||||
package io.debezium.spi;
|
package io.debezium.spi;
|
||||||
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||||
import org.fest.assertions.Assertions;
|
import org.fest.assertions.Assertions;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import io.debezium.spi.converter.ConvertedField;
|
import io.debezium.spi.converter.ConvertedField;
|
||||||
import io.debezium.spi.converter.CustomConverter;
|
import io.debezium.spi.converter.CustomConverter;
|
||||||
import io.debezium.spi.converter.CustomConverter.ConverterDefinition;
|
import io.debezium.spi.converter.CustomConverter.Converter;
|
||||||
|
import io.debezium.spi.converter.CustomConverter.ConverterRegistration;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Jiri Pechanec
|
* @author Jiri Pechanec
|
||||||
@ -57,31 +58,49 @@ public void configure(Properties props) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<ConverterDefinition<SchemaBuilder>> converterFor(BasicField field) {
|
public void converterFor(BasicField field, ConverterRegistration<SchemaBuilder> registration) {
|
||||||
if (convertedField.equals(field.name())) {
|
if (convertedField.equals(field.name())) {
|
||||||
return Optional.of(new ConverterDefinition<>(SchemaBuilder.string().name("CUSTOM_STRING").optional(), (x) -> {
|
registration.register(SchemaBuilder.string().name("CUSTOM_STRING").optional(), (x) -> {
|
||||||
if (x instanceof Integer) {
|
if (x instanceof Integer) {
|
||||||
return Integer.toString((Integer) x);
|
return Integer.toString((Integer) x);
|
||||||
}
|
}
|
||||||
return x.toString();
|
return x.toString();
|
||||||
}));
|
});
|
||||||
}
|
}
|
||||||
return Optional.empty();
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private static class TestRegistration implements ConverterRegistration<SchemaBuilder> {
|
||||||
|
public SchemaBuilder fieldSchema;
|
||||||
|
public Converter converter;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void register(SchemaBuilder fieldSchema, Converter converter) {
|
||||||
|
this.fieldSchema = fieldSchema;
|
||||||
|
this.converter = converter;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private TestRegistration testRegistration;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void before() {
|
||||||
|
testRegistration = new TestRegistration();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void matchingField() {
|
public void matchingField() {
|
||||||
testConverter.configure(new Properties());
|
testConverter.configure(new Properties());
|
||||||
final ConverterDefinition<SchemaBuilder> definition = testConverter.converterFor(new BasicField("myfield", "db1.table1", "VARCHAR2(30)")).get();
|
testConverter.converterFor(new BasicField("myfield", "db1.table1", "VARCHAR2(30)"), testRegistration);
|
||||||
Assertions.assertThat(definition.fieldSchema.name()).isEqualTo("CUSTOM_STRING");
|
Assertions.assertThat(testRegistration.fieldSchema.name()).isEqualTo("CUSTOM_STRING");
|
||||||
Assertions.assertThat(definition.converter.convert(34)).isEqualTo("34");
|
Assertions.assertThat(testRegistration.converter.convert(34)).isEqualTo("34");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void nonMatchingField() {
|
public void nonMatchingField() {
|
||||||
testConverter.configure(new Properties());
|
testConverter.configure(new Properties());
|
||||||
Assertions.assertThat(testConverter.converterFor(new BasicField("wrongfield", "db1.table1", "VARCHAR2(30)")).isPresent()).isFalse();
|
testConverter.converterFor(new BasicField("wrongfield", "db1.table1", "VARCHAR2(30)"), testRegistration);
|
||||||
|
Assertions.assertThat(testRegistration.fieldSchema).isNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -89,10 +108,11 @@ public void configuredField() {
|
|||||||
final Properties props = new Properties();
|
final Properties props = new Properties();
|
||||||
props.setProperty("field", "otherfield");
|
props.setProperty("field", "otherfield");
|
||||||
testConverter.configure(props);
|
testConverter.configure(props);
|
||||||
Assertions.assertThat(testConverter.converterFor(new BasicField("myfield", "db1.table1", "VARCHAR2(30)")).isPresent()).isFalse();
|
testConverter.converterFor(new BasicField("myfield", "db1.table1", "VARCHAR2(30)"), testRegistration);
|
||||||
|
Assertions.assertThat(testRegistration.fieldSchema).isNull();
|
||||||
|
|
||||||
final ConverterDefinition<SchemaBuilder> definition = testConverter.converterFor(new BasicField("otherfield", "db1.table1", "VARCHAR2(30)")).get();
|
testConverter.converterFor(new BasicField("otherfield", "db1.table1", "VARCHAR2(30)"), testRegistration);
|
||||||
Assertions.assertThat(definition.fieldSchema.name()).isEqualTo("CUSTOM_STRING");
|
Assertions.assertThat(testRegistration.fieldSchema.name()).isEqualTo("CUSTOM_STRING");
|
||||||
Assertions.assertThat(definition.converter.convert(34)).isEqualTo("34");
|
Assertions.assertThat(testRegistration.converter.convert(34)).isEqualTo("34");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -12,7 +12,7 @@
|
|||||||
import java.lang.annotation.Target;
|
import java.lang.annotation.Target;
|
||||||
|
|
||||||
@Documented
|
@Documented
|
||||||
@Target({ElementType.TYPE, ElementType.FIELD})
|
@Target({ ElementType.TYPE, ElementType.FIELD })
|
||||||
@Retention(RetentionPolicy.RUNTIME)
|
@Retention(RetentionPolicy.RUNTIME)
|
||||||
public @interface Immutable {
|
public @interface Immutable {
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,6 @@
|
|||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.time.temporal.ChronoUnit;
|
import java.time.temporal.ChronoUnit;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@ -351,12 +350,12 @@ private List<CustomConverter<SchemaBuilder, ConvertedField>> getCustomConverters
|
|||||||
final List<String> converterNames = Strings.listOf(converterNameList, x -> x.split(","), String::trim);
|
final List<String> converterNames = Strings.listOf(converterNameList, x -> x.split(","), String::trim);
|
||||||
|
|
||||||
return converterNames.stream()
|
return converterNames.stream()
|
||||||
.map(name -> {
|
.map(name -> {
|
||||||
CustomConverter<SchemaBuilder, ConvertedField> converter = config.getInstance(name + CONVERTER_TYPE_SUFFIX, CustomConverter.class);
|
CustomConverter<SchemaBuilder, ConvertedField> converter = config.getInstance(name + CONVERTER_TYPE_SUFFIX, CustomConverter.class);
|
||||||
converter.configure(config.subset(name, true).asProperties());
|
converter.configure(config.subset(name, true).asProperties());
|
||||||
return converter;
|
return converter;
|
||||||
})
|
})
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
@ -116,13 +116,13 @@ public Object defaultValue() {
|
|||||||
return column.defaultValue();
|
return column.defaultValue();
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
new CustomConverter.ConverterRegistration<SchemaBuilder>() {
|
new CustomConverter.ConverterRegistration<SchemaBuilder>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void register(SchemaBuilder fieldSchema, Converter converter) {
|
public void register(SchemaBuilder fieldSchema, Converter converter) {
|
||||||
definition.set(new ConverterDefinition<SchemaBuilder>(fieldSchema, converter));
|
definition.set(new ConverterDefinition<SchemaBuilder>(fieldSchema, converter));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
if (definition.get() != null) {
|
if (definition.get() != null) {
|
||||||
conversionFunctionMap.put(fullColumnName, definition.get());
|
conversionFunctionMap.put(fullColumnName, definition.get());
|
||||||
|
@ -7,7 +7,6 @@
|
|||||||
|
|
||||||
import java.sql.Types;
|
import java.sql.Types;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
Loading…
Reference in New Issue
Block a user