DBZ-7024 Add possibility to specify engine builder factory

Also add converting builder factory for async engine into SPI service.
This commit is contained in:
Vojtech Juranek 2024-02-10 17:55:50 +01:00 committed by Jiri Pechanec
parent dbdb052535
commit 7789d995e5
2 changed files with 37 additions and 2 deletions

View File

@ -300,6 +300,13 @@ static <K, V, H> Builder<ChangeEvent<K, V>> create(Class<? extends Serialization
return create(KeyValueHeaderChangeEventFormat.of(keyFormat, valueFormat, headerFormat)); return create(KeyValueHeaderChangeEventFormat.of(keyFormat, valueFormat, headerFormat));
} }
static <K, V, H> Builder<ChangeEvent<K, V>> create(Class<? extends SerializationFormat<K>> keyFormat,
Class<? extends SerializationFormat<V>> valueFormat,
Class<? extends SerializationFormat<H>> headerFormat,
String builderFactory) {
return create(KeyValueHeaderChangeEventFormat.of(keyFormat, valueFormat, headerFormat), builderFactory);
}
static <S, T, K extends SerializationFormat<S>, V extends SerializationFormat<T>> Builder<ChangeEvent<S, T>> create(KeyValueChangeEventFormat<K, V> format) { static <S, T, K extends SerializationFormat<S>, V extends SerializationFormat<T>> Builder<ChangeEvent<S, T>> create(KeyValueChangeEventFormat<K, V> format) {
final BuilderFactory builder = determineBuilderFactory(); final BuilderFactory builder = determineBuilderFactory();
return builder.builder(format); return builder.builder(format);
@ -310,6 +317,12 @@ static <S, T, U, K extends SerializationFormat<S>, V extends SerializationFormat
return builder.builder(format); return builder.builder(format);
} }
static <S, T, U, K extends SerializationFormat<S>, V extends SerializationFormat<T>, H extends SerializationFormat<U>> Builder<ChangeEvent<S, T>> create(KeyValueHeaderChangeEventFormat<K, V, H> format,
String builderFactory) {
final BuilderFactory builder = determineBuilderFactory(builderFactory);
return builder.builder(format);
}
/** /**
* Obtain a new {@link Builder} instance that can be used to construct runnable {@link DebeziumEngine} instances. * Obtain a new {@link Builder} instance that can be used to construct runnable {@link DebeziumEngine} instances.
* Emitted change events encapsulate both key and value. * Emitted change events encapsulate both key and value.
@ -329,11 +342,32 @@ private static BuilderFactory determineBuilderFactory() {
} }
final BuilderFactory builder = iterator.next(); final BuilderFactory builder = iterator.next();
if (iterator.hasNext()) { if (iterator.hasNext()) {
LoggerFactory.getLogger(Builder.class).warn("More than one Debezium engine builder implementation was found, using {}", builder.getClass()); LoggerFactory.getLogger(Builder.class)
.warn("More than one Debezium engine builder implementation was found, using {} (in Debezium 2.6 you can ignore this warning)", builder.getClass());
} }
return builder; return builder;
} }
private static BuilderFactory determineBuilderFactory(String builderFactory) {
if (builderFactory == null || builderFactory.isBlank()) {
return determineBuilderFactory();
}
final ServiceLoader<BuilderFactory> loader = ServiceLoader.load(BuilderFactory.class);
final Iterator<BuilderFactory> iterator = loader.iterator();
if (!iterator.hasNext()) {
throw new DebeziumException("No implementation of Debezium engine builder was found");
}
BuilderFactory builder;
while (iterator.hasNext()) {
builder = iterator.next();
if (builder.getClass().getName().equalsIgnoreCase(builderFactory)) {
return builder;
}
}
throw new DebeziumException(String.format("No builder factory '%s' found.", builderFactory));
}
/** /**
* Internal contract between the API and implementation, for bootstrapping the latter. * Internal contract between the API and implementation, for bootstrapping the latter.
* Not intended for direct usage by application code. * Not intended for direct usage by application code.

View File

@ -1 +1,2 @@
io.debezium.embedded.ConvertingEngineBuilderFactory io.debezium.embedded.ConvertingEngineBuilderFactory
io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory