diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java index 008181288..9842e5ac0 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java @@ -13,7 +13,7 @@ import io.debezium.engine.Header; import io.debezium.engine.RecordChangeEvent; -class EmbeddedEngineChangeEvent implements ChangeEvent, RecordChangeEvent { +public class EmbeddedEngineChangeEvent implements ChangeEvent, RecordChangeEvent { private final K key; private final V value; diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/async/AbstractRecordProcessor.java b/debezium-embedded/src/main/java/io/debezium/embedded/async/AbstractRecordProcessor.java index cdda32772..2e3d0fb01 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/async/AbstractRecordProcessor.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/async/AbstractRecordProcessor.java @@ -11,7 +11,6 @@ import org.apache.kafka.connect.source.SourceRecord; import io.debezium.embedded.Transformations; -import io.debezium.engine.DebeziumEngine; /** * Abstract implementation of {@link RecordProcessor}, which provides implementation of processor initialization, while the record processing implementation @@ -20,13 +19,11 @@ public abstract class AbstractRecordProcessor implements RecordProcessor { protected ExecutorService recordService; protected Transformations transformations; - protected DebeziumEngine.RecordCommitter committer; @Override - public void initialize(final ExecutorService recordService, final Transformations transformations, final DebeziumEngine.RecordCommitter committer) { + public void initialize(final ExecutorService recordService, final Transformations transformations) { this.recordService = recordService; this.transformations = transformations; - this.committer = committer; } @Override diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java b/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java index a40e4ffd1..222f5cabb 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java @@ -42,6 +42,7 @@ import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.FileOffsetBackingStore; +import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.KafkaOffsetBackingStore; import org.apache.kafka.connect.storage.MemoryOffsetBackingStore; import org.apache.kafka.connect.storage.OffsetBackingStore; @@ -56,13 +57,18 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; import io.debezium.config.Instantiator; +import io.debezium.embedded.ConverterBuilder; import io.debezium.embedded.DebeziumEngineCommon; +import io.debezium.embedded.EmbeddedEngineChangeEvent; import io.debezium.embedded.EmbeddedEngineConfig; import io.debezium.embedded.EmbeddedWorkerConfig; import io.debezium.embedded.KafkaConnectUtil; import io.debezium.embedded.Transformations; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.StopEngineException; +import io.debezium.engine.format.ChangeEventFormat; +import io.debezium.engine.format.KeyValueChangeEventFormat; +import io.debezium.engine.format.KeyValueHeaderChangeEventFormat; import io.debezium.engine.source.EngineSourceConnector; import io.debezium.engine.source.EngineSourceConnectorContext; import io.debezium.engine.source.EngineSourceTask; @@ -94,7 +100,9 @@ public final class AsyncEmbeddedEngine implements DebeziumEngine, AsyncEng private final OffsetCommitPolicy offsetCommitPolicy; private final EngineSourceConnector connector; private final Transformations transformations; + private final HeaderConverter headerConverter; private final Function recordConverter; + private final Function sourceConverter; private final AtomicReference state = new AtomicReference<>(State.CREATING); // state must be changed only via setEngineState() method private final List tasks = new ArrayList<>(); @@ -110,6 +118,7 @@ private AsyncEmbeddedEngine(Properties config, DebeziumEngine.CompletionCallback completionCallback, DebeziumEngine.ConnectorCallback connectorCallback, OffsetCommitPolicy offsetCommitPolicy, + HeaderConverter headerConverter, Function recordConverter) { this.config = Configuration.from(Objects.requireNonNull(config, "A connector configuration must be specified.")); @@ -119,7 +128,9 @@ private AsyncEmbeddedEngine(Properties config, this.clock = clock == null ? io.debezium.util.Clock.system() : clock; this.completionCallback = completionCallback != null ? completionCallback : new DefaultCompletionCallback(); this.connectorCallback = Optional.ofNullable(connectorCallback); + this.headerConverter = headerConverter; this.recordConverter = recordConverter; + this.sourceConverter = (record) -> ((EmbeddedEngineChangeEvent) record).sourceRecord(); // Ensure either user ChangeConsumer or Consumer is provided and validate supported records ordering is provided when relevant. if (this.handler == null & this.consumer == null) { @@ -263,11 +274,19 @@ public void runWithTask(final Consumer consumer) { } /** - * Shuts down the engine. Currently, it's limited only to stopping the source connector. + * Shuts down the engine. Currently, it's limited only to closing header converter and stopping the source connector. * * @param stateBeforeStop {@link State} of the engine when the shutdown was requested. */ private void close(final State stateBeforeStop) { + if (headerConverter != null) { + try { + headerConverter.close(); + } + catch (IOException e) { + LOGGER.warn("Failed to close header converter: ", e); + } + } stopConnector(tasks, stateBeforeStop); } @@ -397,8 +416,8 @@ private void runTasksPolling(final List tasks) final ExecutorCompletionService taskCompletionService = new ExecutorCompletionService(taskService); final String processorClassName = selectRecordProcessor(); for (EngineSourceTask task : tasks) { - final RecordProcessor processor = createRecordProcessor(processorClassName); - processor.initialize(recordService, transformations, new SourceRecordCommitter(task)); + final RecordProcessor processor = createRecordProcessor(processorClassName, task); + processor.initialize(recordService, transformations); pollingFutures.add(taskCompletionService.submit(new PollRecords(task, processor, state))); } @@ -456,24 +475,24 @@ private String selectRecordProcessor() { * * @return {@link RecordProcessor} instance which will be used for processing the records. */ - private RecordProcessor createRecordProcessor(String processorClassName) { + private RecordProcessor createRecordProcessor(String processorClassName, EngineSourceTask task) { if (ParallelSmtBatchProcessor.class.getName().equals(processorClassName)) { - return new ParallelSmtBatchProcessor((DebeziumEngine.ChangeConsumer) handler); + return new ParallelSmtBatchProcessor(new SourceRecordCommitter(task), (DebeziumEngine.ChangeConsumer) handler); } if (ParallelSmtAndConvertBatchProcessor.class.getName().equals(processorClassName)) { - return new ParallelSmtAndConvertBatchProcessor(handler, recordConverter); + return new ParallelSmtAndConvertBatchProcessor(new ConvertingRecordCommitter(task), handler, recordConverter); } if (ParallelSmtConsumerProcessor.class.getName().equals(processorClassName)) { - return new ParallelSmtConsumerProcessor((Consumer) consumer); + return new ParallelSmtConsumerProcessor(new SourceRecordCommitter(task), (Consumer) consumer); } if (ParallelSmtAndConvertConsumerProcessor.class.getName().equals(processorClassName)) { - return new ParallelSmtAndConvertConsumerProcessor(consumer, recordConverter); + return new ParallelSmtAndConvertConsumerProcessor(new SourceRecordCommitter(task), consumer, recordConverter); } if (ParallelSmtAsyncConsumerProcessor.class.getName().equals(processorClassName)) { - return new ParallelSmtAsyncConsumerProcessor((Consumer) consumer); + return new ParallelSmtAsyncConsumerProcessor(new SourceRecordCommitter(task), (Consumer) consumer); } if (ParallelSmtAndConvertAsyncConsumerProcessor.class.getName().equals(processorClassName)) { - return new ParallelSmtAndConvertAsyncConsumerProcessor(consumer, recordConverter); + return new ParallelSmtAndConvertAsyncConsumerProcessor(new SourceRecordCommitter(task), consumer, recordConverter); } throw new IllegalStateException("Unable to create RecordProcessor instance, this should never happen."); } @@ -729,29 +748,57 @@ private static boolean commitOffsets(final OffsetStorageWriter offsetWriter, fin /** * Implementation of {@link DebeziumEngine.Builder} which creates {@link AsyncEmbeddedEngine}. */ - public static final class AsyncEngineBuilder implements DebeziumEngine.Builder { + public static final class AsyncEngineBuilder implements DebeziumEngine.Builder { private Properties config; - private Consumer consumer; - private DebeziumEngine.ChangeConsumer handler; + private Consumer consumer; + private DebeziumEngine.ChangeConsumer handler; private ClassLoader classLoader; private io.debezium.util.Clock clock; private DebeziumEngine.CompletionCallback completionCallback; private DebeziumEngine.ConnectorCallback connectorCallback; private OffsetCommitPolicy offsetCommitPolicy = null; + private HeaderConverter headerConverter; + private Function recordConverter; + private ConverterBuilder converterBuilder; + + AsyncEngineBuilder() { + this((KeyValueHeaderChangeEventFormat) null); + } + + AsyncEngineBuilder(ChangeEventFormat format) { + this(KeyValueHeaderChangeEventFormat.of(null, format.getValueFormat(), null)); + } + + AsyncEngineBuilder(KeyValueChangeEventFormat format) { + this(format instanceof KeyValueHeaderChangeEventFormat ? (KeyValueHeaderChangeEventFormat) format + : KeyValueHeaderChangeEventFormat.of(format.getKeyFormat(), format.getValueFormat(), null)); + } + + AsyncEngineBuilder(KeyValueHeaderChangeEventFormat format) { + if (format != null) { + this.converterBuilder = new ConverterBuilder(); + this.converterBuilder.using(format); + } + } @Override - public Builder notifying(final Consumer consumer) { + public Builder notifying(final Consumer consumer) { this.consumer = consumer; if (config.contains(AsyncEngineConfig.RECORD_PROCESSING_WITH_SERIAL_CONSUMER.name()) && config.getProperty(AsyncEngineConfig.RECORD_PROCESSING_WITH_SERIAL_CONSUMER.name()).equalsIgnoreCase("true")) { - this.handler = buildDefaultChangeConsumer(consumer); + if (recordConverter == null) { + this.handler = buildDefaultChangeConsumer((Consumer) consumer); + } + else { + this.handler = buildConvertingChangeConsumer(consumer, recordConverter); + } } return this; } @Override - public Builder notifying(final ChangeConsumer handler) { + public Builder notifying(final ChangeConsumer handler) { this.handler = handler; if (!config.contains(CommonConnectorConfig.TOMBSTONES_ON_DELETE.name()) && !handler.supportsTombstoneEvents()) { LOGGER.info("Consumer doesn't support tombstone events, setting '{}' to false.", CommonConnectorConfig.TOMBSTONES_ON_DELETE.name()); @@ -761,44 +808,52 @@ public Builder notifying(final ChangeConsumer handle } @Override - public Builder using(final Properties config) { + public Builder using(final Properties config) { this.config = config; + if (converterBuilder != null) { + converterBuilder.using(config); + } return this; } @Override - public Builder using(final ClassLoader classLoader) { + public Builder using(final ClassLoader classLoader) { this.classLoader = classLoader; return this; } @Override - public Builder using(final Clock clock) { + public Builder using(final Clock clock) { this.clock = clock::millis; return this; } @Override - public Builder using(final CompletionCallback completionCallback) { + public Builder using(final CompletionCallback completionCallback) { this.completionCallback = completionCallback; return this; } @Override - public Builder using(final ConnectorCallback connectorCallback) { + public Builder using(final ConnectorCallback connectorCallback) { this.connectorCallback = connectorCallback; return this; } @Override - public Builder using(final OffsetCommitPolicy policy) { + public Builder using(final OffsetCommitPolicy policy) { this.offsetCommitPolicy = policy; return this; } @Override - public DebeziumEngine build() { - return new AsyncEmbeddedEngine(config, consumer, handler, classLoader, clock, completionCallback, connectorCallback, offsetCommitPolicy, null); + public DebeziumEngine build() { + if (converterBuilder != null) { + headerConverter = converterBuilder.headerConverter(); + recordConverter = converterBuilder.toFormat(headerConverter); + } + return new AsyncEmbeddedEngine(config, consumer, handler, classLoader, clock, completionCallback, connectorCallback, offsetCommitPolicy, headerConverter, + recordConverter); } } @@ -840,6 +895,34 @@ public void handleBatch(final List records, final DebeziumEngine.R }; } + /** + * Build the {@link DebeziumEngine.ChangeConsumer} from provided custom {@link Consumer} which convert records to requested format before passing them + * to the custom {@link Consumer}. + * + * @param consumer {@link Consumer} provided by the user. + * @return {@link DebeziumEngine.ChangeConsumer} which use user-provided {@link Consumer} for processing the Debezium records. + */ + private static ChangeConsumer buildConvertingChangeConsumer(Consumer consumer, Function recordConverter) { + return new DebeziumEngine.ChangeConsumer() { + + @Override + public void handleBatch(final List records, final DebeziumEngine.RecordCommitter committer) throws InterruptedException { + for (SourceRecord record : records) { + try { + consumer.accept(recordConverter.apply(record)); + committer.markProcessed(record); + } + catch (StopEngineException ex) { + // Ensure that we mark the record as finished in this case. + committer.markProcessed(record); + throw ex; + } + } + committer.markBatchFinished(); + } + }; + } + /** * Determines the size of the thread pool which will be used for processing records. The value can be either number (provided as a {@code String} value) or * a predefined placeholder from {@link ProcessingCores} enumeration. If the number of threads is provided as a number, it will be eventually limited to @@ -1083,4 +1166,37 @@ public DebeziumEngine.Offsets buildOffsets() { return new DebeziumEngineCommon.SourceRecordOffsets(); } } + + /** + * Implementation of {@link DebeziumEngine.RecordCommitter} which convert records to {@link SourceRecord}s and pass them to {@link SourceRecordCommitter}. + * The implementation is not thread safe and the caller has to ensure it's used in thread safe manner. + */ + private class ConvertingRecordCommitter implements DebeziumEngine.RecordCommitter { + + private final SourceRecordCommitter delegate; + + ConvertingRecordCommitter(final EngineSourceTask task) { + this.delegate = new SourceRecordCommitter(task); + } + + @Override + public void markProcessed(R record) throws InterruptedException { + delegate.markProcessed(sourceConverter.apply(record)); + } + + @Override + public void markBatchFinished() throws InterruptedException { + delegate.markBatchFinished(); + } + + @Override + public void markProcessed(R record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException { + delegate.markProcessed(sourceConverter.apply(record), sourceOffsets); + } + + @Override + public DebeziumEngine.Offsets buildOffsets() { + return delegate.buildOffsets(); + } + } } diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/async/ConvertingAsyncEngineBuilderFactory.java b/debezium-embedded/src/main/java/io/debezium/embedded/async/ConvertingAsyncEngineBuilderFactory.java new file mode 100644 index 000000000..15bfbde95 --- /dev/null +++ b/debezium-embedded/src/main/java/io/debezium/embedded/async/ConvertingAsyncEngineBuilderFactory.java @@ -0,0 +1,38 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.embedded.async; + +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.RecordChangeEvent; +import io.debezium.engine.format.ChangeEventFormat; +import io.debezium.engine.format.KeyValueChangeEventFormat; +import io.debezium.engine.format.KeyValueHeaderChangeEventFormat; +import io.debezium.engine.format.SerializationFormat; + +/** + * Implementation of {@link DebeziumEngine.BuilderFactory} for {@link AsyncEmbeddedEngine}. + * + * @author vjuranek + */ +public class ConvertingAsyncEngineBuilderFactory implements DebeziumEngine.BuilderFactory { + + @Override + public > DebeziumEngine.Builder> builder(ChangeEventFormat format) { + return new AsyncEmbeddedEngine.AsyncEngineBuilder<>(format); + } + + @Override + public , V extends SerializationFormat> DebeziumEngine.Builder> builder( + KeyValueChangeEventFormat format) { + return new AsyncEmbeddedEngine.AsyncEngineBuilder<>(format); + } + + public , V extends SerializationFormat, H extends SerializationFormat> DebeziumEngine.Builder> builder( + KeyValueHeaderChangeEventFormat format) { + return new AsyncEmbeddedEngine.AsyncEngineBuilder<>(format); + } +} diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtAndConvertAsyncConsumerProcessor.java b/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtAndConvertAsyncConsumerProcessor.java index 504cbf633..b8432d911 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtAndConvertAsyncConsumerProcessor.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtAndConvertAsyncConsumerProcessor.java @@ -15,6 +15,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.engine.DebeziumEngine; + /** * {@link RecordProcessor} which transforms and converts the records in parallel. Records are passed to the user-provided {@link Consumer} in arbitrary order, once * they are processed. This processor should be used when user provides only custom {@link Consumer}, records should be converted and passed to the consumer in @@ -25,10 +27,12 @@ public class ParallelSmtAndConvertAsyncConsumerProcessor extends AbstractRecordProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(ParallelSmtAndConvertAsyncConsumerProcessor.class); + final DebeziumEngine.RecordCommitter committer; final Consumer consumer; final Function convertor; - ParallelSmtAndConvertAsyncConsumerProcessor(final Consumer consumer, final Function convertor) { + ParallelSmtAndConvertAsyncConsumerProcessor(DebeziumEngine.RecordCommitter committer, final Consumer consumer, final Function convertor) { + this.committer = committer; this.consumer = consumer; this.convertor = convertor; } @@ -49,4 +53,4 @@ public void processRecords(final List records) throws Exception { LOGGER.trace("Marking batch as finished."); committer.markBatchFinished(); } -} \ No newline at end of file +} diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtAndConvertBatchProcessor.java b/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtAndConvertBatchProcessor.java index 48b36e703..3028beb25 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtAndConvertBatchProcessor.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtAndConvertBatchProcessor.java @@ -25,10 +25,13 @@ public class ParallelSmtAndConvertBatchProcessor extends AbstractRecordProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(ParallelSmtAndConvertBatchProcessor.class); + final DebeziumEngine.RecordCommitter committer; final DebeziumEngine.ChangeConsumer userHandler; final Function convertor; - ParallelSmtAndConvertBatchProcessor(final DebeziumEngine.ChangeConsumer userHandler, final Function convertor) { + ParallelSmtAndConvertBatchProcessor(final DebeziumEngine.RecordCommitter committer, final DebeziumEngine.ChangeConsumer userHandler, + final Function convertor) { + this.committer = committer; this.userHandler = userHandler; this.convertor = convertor; } diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtAndConvertConsumerProcessor.java b/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtAndConvertConsumerProcessor.java index 50912244e..b63993f9e 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtAndConvertConsumerProcessor.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtAndConvertConsumerProcessor.java @@ -15,6 +15,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.engine.DebeziumEngine; + /** * {@link RecordProcessor} which transforms and converts the records in parallel. Converted records are passed to the user-provided {@link Consumer}. * This processor should be used when user provides only custom {@link Consumer}, records should be converted and passed to the consumer in the same order as they @@ -25,10 +27,12 @@ public class ParallelSmtAndConvertConsumerProcessor extends AbstractRecordProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(ParallelSmtAndConvertConsumerProcessor.class); + final DebeziumEngine.RecordCommitter committer; final Consumer consumer; final Function convertor; - ParallelSmtAndConvertConsumerProcessor(final Consumer consumer, final Function convertor) { + ParallelSmtAndConvertConsumerProcessor(final DebeziumEngine.RecordCommitter committer, final Consumer consumer, final Function convertor) { + this.committer = committer; this.consumer = consumer; this.convertor = convertor; } @@ -54,4 +58,4 @@ public void processRecords(final List records) throws Exception { LOGGER.trace("Marking batch as finished."); committer.markBatchFinished(); } -} \ No newline at end of file +} diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtAsyncConsumerProcessor.java b/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtAsyncConsumerProcessor.java index 16dba5590..ca45635ed 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtAsyncConsumerProcessor.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtAsyncConsumerProcessor.java @@ -14,6 +14,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.engine.DebeziumEngine; + /** * {@link RecordProcessor} which transforms the records in parallel. Records are passed to the user-provided {@link Consumer} in arbitrary order, once they are * processed. This processor should be used when user provides only custom {@link Consumer} and records should be passed without converting to the consumer in the same @@ -21,12 +23,14 @@ * * @author vjuranek */ -public class ParallelSmtAsyncConsumerProcessor extends AbstractRecordProcessor { +public class ParallelSmtAsyncConsumerProcessor extends AbstractRecordProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(ParallelSmtAsyncConsumerProcessor.class); + final DebeziumEngine.RecordCommitter committer; final Consumer consumer; - ParallelSmtAsyncConsumerProcessor(final Consumer consumer) { + ParallelSmtAsyncConsumerProcessor(final DebeziumEngine.RecordCommitter committer, final Consumer consumer) { + this.committer = committer; this.consumer = consumer; } @@ -45,4 +49,4 @@ public void processRecords(final List records) throws Exception { LOGGER.trace("Marking batch as finished."); committer.markBatchFinished(); } -} \ No newline at end of file +} diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtBatchProcessor.java b/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtBatchProcessor.java index 696ec11c4..930c769fd 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtBatchProcessor.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtBatchProcessor.java @@ -21,12 +21,14 @@ * * @author vjuranek */ -public class ParallelSmtBatchProcessor extends AbstractRecordProcessor { +public class ParallelSmtBatchProcessor extends AbstractRecordProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(ParallelSmtBatchProcessor.class); + final DebeziumEngine.RecordCommitter committer; final DebeziumEngine.ChangeConsumer userHandler; - ParallelSmtBatchProcessor(final DebeziumEngine.ChangeConsumer userHandler) { + ParallelSmtBatchProcessor(final DebeziumEngine.RecordCommitter committer, final DebeziumEngine.ChangeConsumer userHandler) { + this.committer = committer; this.userHandler = userHandler; } diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtConsumerProcessor.java b/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtConsumerProcessor.java index bb640f014..2b657cda1 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtConsumerProcessor.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/async/ParallelSmtConsumerProcessor.java @@ -14,6 +14,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.engine.DebeziumEngine; + /** * {@link RecordProcessor} which transforms the records in parallel. Records are passed to the user-provided {@link Consumer}. * This processor should be used when user provides only custom {@link Consumer} and records should be passed without converting to the consumer in the same @@ -21,12 +23,14 @@ * * @author vjuranek */ -public class ParallelSmtConsumerProcessor extends AbstractRecordProcessor { +public class ParallelSmtConsumerProcessor extends AbstractRecordProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(ParallelSmtConsumerProcessor.class); + final DebeziumEngine.RecordCommitter committer; final Consumer consumer; - ParallelSmtConsumerProcessor(final Consumer consumer) { + ParallelSmtConsumerProcessor(final DebeziumEngine.RecordCommitter committer, final Consumer consumer) { + this.committer = committer; this.consumer = consumer; } @@ -51,4 +55,4 @@ public void processRecords(final List records) throws Exception { LOGGER.trace("Marking batch as finished."); committer.markBatchFinished(); } -} \ No newline at end of file +} diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/async/RecordProcessor.java b/debezium-embedded/src/main/java/io/debezium/embedded/async/RecordProcessor.java index 02912d829..e12b0248e 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/async/RecordProcessor.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/async/RecordProcessor.java @@ -29,9 +29,8 @@ public interface RecordProcessor { * * @param recordService {@link ExecutorService} which allows to run processing of individual records in parallel. * @param transformations chain of transformations to be applied on every individual record. - * @param committer implementation of {@link DebeziumEngine.RecordCommitter} responsible for committing individual records as well as batches. */ - void initialize(ExecutorService recordService, Transformations transformations, DebeziumEngine.RecordCommitter committer); + void initialize(ExecutorService recordService, Transformations transformations); /** * Processes a batch of records provided by the source connector.