DBZ-7024 Add converting builder for async engine

This commit is contained in:
Vojtech Juranek 2024-02-10 16:51:18 +01:00 committed by Jiri Pechanec
parent 004ebeff16
commit dbdb052535
11 changed files with 215 additions and 44 deletions

View File

@ -13,7 +13,7 @@
import io.debezium.engine.Header;
import io.debezium.engine.RecordChangeEvent;
class EmbeddedEngineChangeEvent<K, V, H> implements ChangeEvent<K, V>, RecordChangeEvent<V> {
public class EmbeddedEngineChangeEvent<K, V, H> implements ChangeEvent<K, V>, RecordChangeEvent<V> {
private final K key;
private final V value;

View File

@ -11,7 +11,6 @@
import org.apache.kafka.connect.source.SourceRecord;
import io.debezium.embedded.Transformations;
import io.debezium.engine.DebeziumEngine;
/**
* Abstract implementation of {@link RecordProcessor}, which provides implementation of processor initialization, while the record processing implementation
@ -20,13 +19,11 @@
public abstract class AbstractRecordProcessor<R> implements RecordProcessor<R> {
protected ExecutorService recordService;
protected Transformations transformations;
protected DebeziumEngine.RecordCommitter committer;
@Override
public void initialize(final ExecutorService recordService, final Transformations transformations, final DebeziumEngine.RecordCommitter committer) {
public void initialize(final ExecutorService recordService, final Transformations transformations) {
this.recordService = recordService;
this.transformations = transformations;
this.committer = committer;
}
@Override

View File

@ -42,6 +42,7 @@
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
@ -56,13 +57,18 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Instantiator;
import io.debezium.embedded.ConverterBuilder;
import io.debezium.embedded.DebeziumEngineCommon;
import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.embedded.EmbeddedWorkerConfig;
import io.debezium.embedded.KafkaConnectUtil;
import io.debezium.embedded.Transformations;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.StopEngineException;
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.format.KeyValueChangeEventFormat;
import io.debezium.engine.format.KeyValueHeaderChangeEventFormat;
import io.debezium.engine.source.EngineSourceConnector;
import io.debezium.engine.source.EngineSourceConnectorContext;
import io.debezium.engine.source.EngineSourceTask;
@ -94,7 +100,9 @@ public final class AsyncEmbeddedEngine<R> implements DebeziumEngine<R>, AsyncEng
private final OffsetCommitPolicy offsetCommitPolicy;
private final EngineSourceConnector connector;
private final Transformations transformations;
private final HeaderConverter headerConverter;
private final Function<SourceRecord, R> recordConverter;
private final Function<R, SourceRecord> sourceConverter;
private final AtomicReference<State> state = new AtomicReference<>(State.CREATING); // state must be changed only via setEngineState() method
private final List<EngineSourceTask> tasks = new ArrayList<>();
@ -110,6 +118,7 @@ private AsyncEmbeddedEngine(Properties config,
DebeziumEngine.CompletionCallback completionCallback,
DebeziumEngine.ConnectorCallback connectorCallback,
OffsetCommitPolicy offsetCommitPolicy,
HeaderConverter headerConverter,
Function<SourceRecord, R> recordConverter) {
this.config = Configuration.from(Objects.requireNonNull(config, "A connector configuration must be specified."));
@ -119,7 +128,9 @@ private AsyncEmbeddedEngine(Properties config,
this.clock = clock == null ? io.debezium.util.Clock.system() : clock;
this.completionCallback = completionCallback != null ? completionCallback : new DefaultCompletionCallback();
this.connectorCallback = Optional.ofNullable(connectorCallback);
this.headerConverter = headerConverter;
this.recordConverter = recordConverter;
this.sourceConverter = (record) -> ((EmbeddedEngineChangeEvent<?, ?, ?>) record).sourceRecord();
// Ensure either user ChangeConsumer or Consumer is provided and validate supported records ordering is provided when relevant.
if (this.handler == null & this.consumer == null) {
@ -263,11 +274,19 @@ public void runWithTask(final Consumer<SourceTask> consumer) {
}
/**
* Shuts down the engine. Currently, it's limited only to stopping the source connector.
* Shuts down the engine. Currently, it's limited only to closing header converter and stopping the source connector.
*
* @param stateBeforeStop {@link State} of the engine when the shutdown was requested.
*/
private void close(final State stateBeforeStop) {
if (headerConverter != null) {
try {
headerConverter.close();
}
catch (IOException e) {
LOGGER.warn("Failed to close header converter: ", e);
}
}
stopConnector(tasks, stateBeforeStop);
}
@ -397,8 +416,8 @@ private void runTasksPolling(final List<EngineSourceTask> tasks)
final ExecutorCompletionService<Void> taskCompletionService = new ExecutorCompletionService(taskService);
final String processorClassName = selectRecordProcessor();
for (EngineSourceTask task : tasks) {
final RecordProcessor processor = createRecordProcessor(processorClassName);
processor.initialize(recordService, transformations, new SourceRecordCommitter(task));
final RecordProcessor processor = createRecordProcessor(processorClassName, task);
processor.initialize(recordService, transformations);
pollingFutures.add(taskCompletionService.submit(new PollRecords(task, processor, state)));
}
@ -456,24 +475,24 @@ private String selectRecordProcessor() {
*
* @return {@link RecordProcessor} instance which will be used for processing the records.
*/
private RecordProcessor createRecordProcessor(String processorClassName) {
private RecordProcessor createRecordProcessor(String processorClassName, EngineSourceTask task) {
if (ParallelSmtBatchProcessor.class.getName().equals(processorClassName)) {
return new ParallelSmtBatchProcessor((DebeziumEngine.ChangeConsumer<SourceRecord>) handler);
return new ParallelSmtBatchProcessor(new SourceRecordCommitter(task), (DebeziumEngine.ChangeConsumer<SourceRecord>) handler);
}
if (ParallelSmtAndConvertBatchProcessor.class.getName().equals(processorClassName)) {
return new ParallelSmtAndConvertBatchProcessor(handler, recordConverter);
return new ParallelSmtAndConvertBatchProcessor(new ConvertingRecordCommitter(task), handler, recordConverter);
}
if (ParallelSmtConsumerProcessor.class.getName().equals(processorClassName)) {
return new ParallelSmtConsumerProcessor((Consumer<SourceRecord>) consumer);
return new ParallelSmtConsumerProcessor(new SourceRecordCommitter(task), (Consumer<SourceRecord>) consumer);
}
if (ParallelSmtAndConvertConsumerProcessor.class.getName().equals(processorClassName)) {
return new ParallelSmtAndConvertConsumerProcessor(consumer, recordConverter);
return new ParallelSmtAndConvertConsumerProcessor(new SourceRecordCommitter(task), consumer, recordConverter);
}
if (ParallelSmtAsyncConsumerProcessor.class.getName().equals(processorClassName)) {
return new ParallelSmtAsyncConsumerProcessor((Consumer<SourceRecord>) consumer);
return new ParallelSmtAsyncConsumerProcessor(new SourceRecordCommitter(task), (Consumer<SourceRecord>) consumer);
}
if (ParallelSmtAndConvertAsyncConsumerProcessor.class.getName().equals(processorClassName)) {
return new ParallelSmtAndConvertAsyncConsumerProcessor(consumer, recordConverter);
return new ParallelSmtAndConvertAsyncConsumerProcessor(new SourceRecordCommitter(task), consumer, recordConverter);
}
throw new IllegalStateException("Unable to create RecordProcessor instance, this should never happen.");
}
@ -729,29 +748,57 @@ private static boolean commitOffsets(final OffsetStorageWriter offsetWriter, fin
/**
* Implementation of {@link DebeziumEngine.Builder} which creates {@link AsyncEmbeddedEngine}.
*/
public static final class AsyncEngineBuilder implements DebeziumEngine.Builder<SourceRecord> {
public static final class AsyncEngineBuilder<R> implements DebeziumEngine.Builder<R> {
private Properties config;
private Consumer<SourceRecord> consumer;
private DebeziumEngine.ChangeConsumer<SourceRecord> handler;
private Consumer<R> consumer;
private DebeziumEngine.ChangeConsumer<?> handler;
private ClassLoader classLoader;
private io.debezium.util.Clock clock;
private DebeziumEngine.CompletionCallback completionCallback;
private DebeziumEngine.ConnectorCallback connectorCallback;
private OffsetCommitPolicy offsetCommitPolicy = null;
private HeaderConverter headerConverter;
private Function<SourceRecord, R> recordConverter;
private ConverterBuilder converterBuilder;
AsyncEngineBuilder() {
this((KeyValueHeaderChangeEventFormat<?, ?, ?>) null);
}
AsyncEngineBuilder(ChangeEventFormat<?> format) {
this(KeyValueHeaderChangeEventFormat.of(null, format.getValueFormat(), null));
}
AsyncEngineBuilder(KeyValueChangeEventFormat<?, ?> format) {
this(format instanceof KeyValueHeaderChangeEventFormat ? (KeyValueHeaderChangeEventFormat) format
: KeyValueHeaderChangeEventFormat.of(format.getKeyFormat(), format.getValueFormat(), null));
}
AsyncEngineBuilder(KeyValueHeaderChangeEventFormat<?, ?, ?> format) {
if (format != null) {
this.converterBuilder = new ConverterBuilder();
this.converterBuilder.using(format);
}
}
@Override
public Builder<SourceRecord> notifying(final Consumer<SourceRecord> consumer) {
public Builder<R> notifying(final Consumer<R> consumer) {
this.consumer = consumer;
if (config.contains(AsyncEngineConfig.RECORD_PROCESSING_WITH_SERIAL_CONSUMER.name())
&& config.getProperty(AsyncEngineConfig.RECORD_PROCESSING_WITH_SERIAL_CONSUMER.name()).equalsIgnoreCase("true")) {
this.handler = buildDefaultChangeConsumer(consumer);
if (recordConverter == null) {
this.handler = buildDefaultChangeConsumer((Consumer<SourceRecord>) consumer);
}
else {
this.handler = buildConvertingChangeConsumer(consumer, recordConverter);
}
}
return this;
}
@Override
public Builder<SourceRecord> notifying(final ChangeConsumer<SourceRecord> handler) {
public Builder<R> notifying(final ChangeConsumer<R> handler) {
this.handler = handler;
if (!config.contains(CommonConnectorConfig.TOMBSTONES_ON_DELETE.name()) && !handler.supportsTombstoneEvents()) {
LOGGER.info("Consumer doesn't support tombstone events, setting '{}' to false.", CommonConnectorConfig.TOMBSTONES_ON_DELETE.name());
@ -761,44 +808,52 @@ public Builder<SourceRecord> notifying(final ChangeConsumer<SourceRecord> handle
}
@Override
public Builder<SourceRecord> using(final Properties config) {
public Builder<R> using(final Properties config) {
this.config = config;
if (converterBuilder != null) {
converterBuilder.using(config);
}
return this;
}
@Override
public Builder<SourceRecord> using(final ClassLoader classLoader) {
public Builder<R> using(final ClassLoader classLoader) {
this.classLoader = classLoader;
return this;
}
@Override
public Builder<SourceRecord> using(final Clock clock) {
public Builder<R> using(final Clock clock) {
this.clock = clock::millis;
return this;
}
@Override
public Builder<SourceRecord> using(final CompletionCallback completionCallback) {
public Builder<R> using(final CompletionCallback completionCallback) {
this.completionCallback = completionCallback;
return this;
}
@Override
public Builder<SourceRecord> using(final ConnectorCallback connectorCallback) {
public Builder<R> using(final ConnectorCallback connectorCallback) {
this.connectorCallback = connectorCallback;
return this;
}
@Override
public Builder<SourceRecord> using(final OffsetCommitPolicy policy) {
public Builder<R> using(final OffsetCommitPolicy policy) {
this.offsetCommitPolicy = policy;
return this;
}
@Override
public DebeziumEngine<SourceRecord> build() {
return new AsyncEmbeddedEngine(config, consumer, handler, classLoader, clock, completionCallback, connectorCallback, offsetCommitPolicy, null);
public DebeziumEngine<R> build() {
if (converterBuilder != null) {
headerConverter = converterBuilder.headerConverter();
recordConverter = converterBuilder.toFormat(headerConverter);
}
return new AsyncEmbeddedEngine(config, consumer, handler, classLoader, clock, completionCallback, connectorCallback, offsetCommitPolicy, headerConverter,
recordConverter);
}
}
@ -840,6 +895,34 @@ public void handleBatch(final List<SourceRecord> records, final DebeziumEngine.R
};
}
/**
* Build the {@link DebeziumEngine.ChangeConsumer} from provided custom {@link Consumer} which convert records to requested format before passing them
* to the custom {@link Consumer}.
*
* @param consumer {@link Consumer} provided by the user.
* @return {@link DebeziumEngine.ChangeConsumer} which use user-provided {@link Consumer} for processing the Debezium records.
*/
private static ChangeConsumer buildConvertingChangeConsumer(Consumer consumer, Function<SourceRecord, ?> recordConverter) {
return new DebeziumEngine.ChangeConsumer<SourceRecord>() {
@Override
public void handleBatch(final List<SourceRecord> records, final DebeziumEngine.RecordCommitter<SourceRecord> committer) throws InterruptedException {
for (SourceRecord record : records) {
try {
consumer.accept(recordConverter.apply(record));
committer.markProcessed(record);
}
catch (StopEngineException ex) {
// Ensure that we mark the record as finished in this case.
committer.markProcessed(record);
throw ex;
}
}
committer.markBatchFinished();
}
};
}
/**
* Determines the size of the thread pool which will be used for processing records. The value can be either number (provided as a {@code String} value) or
* a predefined placeholder from {@link ProcessingCores} enumeration. If the number of threads is provided as a number, it will be eventually limited to
@ -1083,4 +1166,37 @@ public DebeziumEngine.Offsets buildOffsets() {
return new DebeziumEngineCommon.SourceRecordOffsets();
}
}
/**
* Implementation of {@link DebeziumEngine.RecordCommitter} which convert records to {@link SourceRecord}s and pass them to {@link SourceRecordCommitter}.
* The implementation is not thread safe and the caller has to ensure it's used in thread safe manner.
*/
private class ConvertingRecordCommitter implements DebeziumEngine.RecordCommitter<R> {
private final SourceRecordCommitter delegate;
ConvertingRecordCommitter(final EngineSourceTask task) {
this.delegate = new SourceRecordCommitter(task);
}
@Override
public void markProcessed(R record) throws InterruptedException {
delegate.markProcessed(sourceConverter.apply(record));
}
@Override
public void markBatchFinished() throws InterruptedException {
delegate.markBatchFinished();
}
@Override
public void markProcessed(R record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException {
delegate.markProcessed(sourceConverter.apply(record), sourceOffsets);
}
@Override
public DebeziumEngine.Offsets buildOffsets() {
return delegate.buildOffsets();
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.embedded.async;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.format.KeyValueChangeEventFormat;
import io.debezium.engine.format.KeyValueHeaderChangeEventFormat;
import io.debezium.engine.format.SerializationFormat;
/**
* Implementation of {@link DebeziumEngine.BuilderFactory} for {@link AsyncEmbeddedEngine}.
*
* @author vjuranek
*/
public class ConvertingAsyncEngineBuilderFactory implements DebeziumEngine.BuilderFactory {
@Override
public <T, V extends SerializationFormat<T>> DebeziumEngine.Builder<RecordChangeEvent<T>> builder(ChangeEventFormat<V> format) {
return new AsyncEmbeddedEngine.AsyncEngineBuilder<>(format);
}
@Override
public <S, T, K extends SerializationFormat<S>, V extends SerializationFormat<T>> DebeziumEngine.Builder<ChangeEvent<S, T>> builder(
KeyValueChangeEventFormat<K, V> format) {
return new AsyncEmbeddedEngine.AsyncEngineBuilder<>(format);
}
public <S, T, U, K extends SerializationFormat<S>, V extends SerializationFormat<T>, H extends SerializationFormat<U>> DebeziumEngine.Builder<ChangeEvent<S, T>> builder(
KeyValueHeaderChangeEventFormat<K, V, H> format) {
return new AsyncEmbeddedEngine.AsyncEngineBuilder<>(format);
}
}

View File

@ -15,6 +15,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.engine.DebeziumEngine;
/**
* {@link RecordProcessor} which transforms and converts the records in parallel. Records are passed to the user-provided {@link Consumer} in arbitrary order, once
* they are processed. This processor should be used when user provides only custom {@link Consumer}, records should be converted and passed to the consumer in
@ -25,10 +27,12 @@
public class ParallelSmtAndConvertAsyncConsumerProcessor<R> extends AbstractRecordProcessor<R> {
private static final Logger LOGGER = LoggerFactory.getLogger(ParallelSmtAndConvertAsyncConsumerProcessor.class);
final DebeziumEngine.RecordCommitter committer;
final Consumer<R> consumer;
final Function<SourceRecord, R> convertor;
ParallelSmtAndConvertAsyncConsumerProcessor(final Consumer<R> consumer, final Function<SourceRecord, R> convertor) {
ParallelSmtAndConvertAsyncConsumerProcessor(DebeziumEngine.RecordCommitter committer, final Consumer<R> consumer, final Function<SourceRecord, R> convertor) {
this.committer = committer;
this.consumer = consumer;
this.convertor = convertor;
}

View File

@ -25,10 +25,13 @@
public class ParallelSmtAndConvertBatchProcessor<R> extends AbstractRecordProcessor<R> {
private static final Logger LOGGER = LoggerFactory.getLogger(ParallelSmtAndConvertBatchProcessor.class);
final DebeziumEngine.RecordCommitter committer;
final DebeziumEngine.ChangeConsumer<R> userHandler;
final Function<SourceRecord, R> convertor;
ParallelSmtAndConvertBatchProcessor(final DebeziumEngine.ChangeConsumer<R> userHandler, final Function<SourceRecord, R> convertor) {
ParallelSmtAndConvertBatchProcessor(final DebeziumEngine.RecordCommitter committer, final DebeziumEngine.ChangeConsumer<R> userHandler,
final Function<SourceRecord, R> convertor) {
this.committer = committer;
this.userHandler = userHandler;
this.convertor = convertor;
}

View File

@ -15,6 +15,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.engine.DebeziumEngine;
/**
* {@link RecordProcessor} which transforms and converts the records in parallel. Converted records are passed to the user-provided {@link Consumer}.
* This processor should be used when user provides only custom {@link Consumer}, records should be converted and passed to the consumer in the same order as they
@ -25,10 +27,12 @@
public class ParallelSmtAndConvertConsumerProcessor<R> extends AbstractRecordProcessor<R> {
private static final Logger LOGGER = LoggerFactory.getLogger(ParallelSmtAndConvertConsumerProcessor.class);
final DebeziumEngine.RecordCommitter committer;
final Consumer<R> consumer;
final Function<SourceRecord, R> convertor;
ParallelSmtAndConvertConsumerProcessor(final Consumer<R> consumer, final Function<SourceRecord, R> convertor) {
ParallelSmtAndConvertConsumerProcessor(final DebeziumEngine.RecordCommitter committer, final Consumer<R> consumer, final Function<SourceRecord, R> convertor) {
this.committer = committer;
this.consumer = consumer;
this.convertor = convertor;
}

View File

@ -14,6 +14,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.engine.DebeziumEngine;
/**
* {@link RecordProcessor} which transforms the records in parallel. Records are passed to the user-provided {@link Consumer} in arbitrary order, once they are
* processed. This processor should be used when user provides only custom {@link Consumer} and records should be passed without converting to the consumer in the same
@ -21,12 +23,14 @@
*
* @author vjuranek
*/
public class ParallelSmtAsyncConsumerProcessor<R> extends AbstractRecordProcessor<R> {
public class ParallelSmtAsyncConsumerProcessor extends AbstractRecordProcessor<SourceRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(ParallelSmtAsyncConsumerProcessor.class);
final DebeziumEngine.RecordCommitter committer;
final Consumer<SourceRecord> consumer;
ParallelSmtAsyncConsumerProcessor(final Consumer<SourceRecord> consumer) {
ParallelSmtAsyncConsumerProcessor(final DebeziumEngine.RecordCommitter committer, final Consumer<SourceRecord> consumer) {
this.committer = committer;
this.consumer = consumer;
}

View File

@ -21,12 +21,14 @@
*
* @author vjuranek
*/
public class ParallelSmtBatchProcessor<R> extends AbstractRecordProcessor<R> {
public class ParallelSmtBatchProcessor extends AbstractRecordProcessor<SourceRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(ParallelSmtBatchProcessor.class);
final DebeziumEngine.RecordCommitter committer;
final DebeziumEngine.ChangeConsumer<SourceRecord> userHandler;
ParallelSmtBatchProcessor(final DebeziumEngine.ChangeConsumer<SourceRecord> userHandler) {
ParallelSmtBatchProcessor(final DebeziumEngine.RecordCommitter committer, final DebeziumEngine.ChangeConsumer<SourceRecord> userHandler) {
this.committer = committer;
this.userHandler = userHandler;
}

View File

@ -14,6 +14,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.engine.DebeziumEngine;
/**
* {@link RecordProcessor} which transforms the records in parallel. Records are passed to the user-provided {@link Consumer}.
* This processor should be used when user provides only custom {@link Consumer} and records should be passed without converting to the consumer in the same
@ -21,12 +23,14 @@
*
* @author vjuranek
*/
public class ParallelSmtConsumerProcessor<R> extends AbstractRecordProcessor<R> {
public class ParallelSmtConsumerProcessor extends AbstractRecordProcessor<SourceRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(ParallelSmtConsumerProcessor.class);
final DebeziumEngine.RecordCommitter committer;
final Consumer<SourceRecord> consumer;
ParallelSmtConsumerProcessor(final Consumer<SourceRecord> consumer) {
ParallelSmtConsumerProcessor(final DebeziumEngine.RecordCommitter committer, final Consumer<SourceRecord> consumer) {
this.committer = committer;
this.consumer = consumer;
}

View File

@ -29,9 +29,8 @@ public interface RecordProcessor<R> {
*
* @param recordService {@link ExecutorService} which allows to run processing of individual records in parallel.
* @param transformations chain of transformations to be applied on every individual record.
* @param committer implementation of {@link DebeziumEngine.RecordCommitter} responsible for committing individual records as well as batches.
*/
void initialize(ExecutorService recordService, Transformations transformations, DebeziumEngine.RecordCommitter committer);
void initialize(ExecutorService recordService, Transformations transformations);
/**
* Processes a batch of records provided by the source connector.