DBZ-1134 Making converter registry thread-safe; misc. clean-up

This commit is contained in:
Gunnar Morling 2020-02-27 12:37:25 +01:00
parent 096d13f266
commit e7fd7acab8
4 changed files with 20 additions and 12 deletions

View File

@ -12,7 +12,7 @@
import java.lang.annotation.Target;
@Documented
@Target(ElementType.TYPE)
@Target({ElementType.TYPE, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Immutable {
}

View File

@ -10,6 +10,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
@ -346,16 +347,16 @@ public CustomConverterRegistry customConverterRegistry() {
@SuppressWarnings("unchecked")
private List<CustomConverter<SchemaBuilder, ConvertedField>> getCustomConverters() {
final List<CustomConverter<SchemaBuilder, ConvertedField>> converters = new ArrayList<>();
final String converterNameList = config.getString(CUSTOM_CONVERTERS);
final List<String> converterNames = Strings.listOf(converterNameList, x -> x.split(","), String::trim);
for (String name : converterNames) {
final CustomConverter<SchemaBuilder, ConvertedField> converter = config.getInstance(name + CONVERTER_TYPE_SUFFIX, CustomConverter.class);
converter.configure(config.subset(name, true).asProperties());
converters.add(converter);
}
return converters;
return converterNames.stream()
.map(name -> {
CustomConverter<SchemaBuilder, ConvertedField> converter = config.getInstance(name + CONVERTER_TYPE_SUFFIX, CustomConverter.class);
converter.configure(config.subset(name, true).asProperties());
return converter;
})
.collect(Collectors.toList());
}
@SuppressWarnings("unchecked")

View File

@ -14,6 +14,9 @@
import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.annotation.GuardedBy;
import io.debezium.annotation.Immutable;
import io.debezium.annotation.ThreadSafe;
import io.debezium.spi.converter.ConvertedField;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.CustomConverter.ConverterDefinition;
@ -25,9 +28,13 @@
* @author Jiri Pechanec
*
*/
@ThreadSafe
public class CustomConverterRegistry {
@Immutable
private final List<CustomConverter<SchemaBuilder, ConvertedField>> converters;
@GuardedBy("registerConverterFor")
private final Map<String, ConverterDefinition<SchemaBuilder>> conversionFunctionMap = new HashMap<>();
public CustomConverterRegistry(List<CustomConverter<SchemaBuilder, ConvertedField>> converters) {
@ -35,7 +42,7 @@ public CustomConverterRegistry(List<CustomConverter<SchemaBuilder, ConvertedFiel
this.converters = Collections.emptyList();
}
else {
this.converters = converters;
this.converters = Collections.unmodifiableList(converters);
}
}
@ -46,7 +53,7 @@ public CustomConverterRegistry(List<CustomConverter<SchemaBuilder, ConvertedFiel
* @param column the column metadata
* @return the schema of the value generated by the converter or empty if converter does not support the column
*/
public Optional<SchemaBuilder> registerConverterFor(TableId table, Column column) {
public synchronized Optional<SchemaBuilder> registerConverterFor(TableId table, Column column) {
final String fullColumnName = fullColumnName(table, column);
for (CustomConverter<SchemaBuilder, ConvertedField> converter : converters) {

View File

@ -364,9 +364,9 @@ private ValueConverter wrapInMappingConverterIfNeeded(ColumnMappers mappers, Tab
* @param mapper the mapping function for the column; may be null if the columns is not to be mapped to different values
*/
protected void addField(SchemaBuilder builder, Table table, Column column, ColumnMapper mapper) {
final Optional<SchemaBuilder> customSchema = customConverterRegistry.registerConverterFor(table.id(), column);
SchemaBuilder fieldBuilder = customConverterRegistry.registerConverterFor(table.id(), column)
.orElse(valueConverterProvider.schemaBuilder(column));
SchemaBuilder fieldBuilder = customSchema.isPresent() ? customSchema.get() : valueConverterProvider.schemaBuilder(column);
if (fieldBuilder != null) {
if (mapper != null) {
// Let the mapper add properties to the schema ...