From c34cf6920cd7576e900191e63df92083a9d88516 Mon Sep 17 00:00:00 2001 From: Vojtech Juranek Date: Mon, 6 Nov 2023 13:31:11 +0100 Subject: [PATCH] DBZ-7110 Remove deprecated EmbeddedEngine interface --- .../embedded/ConvertingEngineBuilder.java | 2 +- .../io/debezium/embedded/EmbeddedEngine.java | 121 +----------------- .../embedded/AbstractConnectorTest.java | 20 ++- .../embedded/ConnectorOutputTest.java | 3 +- .../connector/oracle/EndToEndPerf.java | 8 +- 5 files changed, 22 insertions(+), 132 deletions(-) diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilder.java b/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilder.java index c30bc3595..c0d13496a 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilder.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilder.java @@ -79,7 +79,7 @@ public class ConvertingEngineBuilder implements Builder { } ConvertingEngineBuilder(KeyValueHeaderChangeEventFormat format) { - this.delegate = EmbeddedEngine.create(); + this.delegate = new EmbeddedEngine.BuilderImpl(); this.formatKey = format.getKeyFormat(); this.formatValue = format.getValueFormat(); this.formatHeader = format.getHeaderFormat(); diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java index 41a00d9fd..8c2d0027d 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java @@ -84,7 +84,7 @@ @ThreadSafe public final class EmbeddedEngine implements DebeziumEngine, EmbeddedEngineConfig { - public static final class BuilderImpl implements Builder { + public static final class BuilderImpl implements Builder { private Configuration config; private DebeziumEngine.ChangeConsumer handler; private ClassLoader classLoader; @@ -93,12 +93,6 @@ public static final class BuilderImpl implements Builder { private DebeziumEngine.ConnectorCallback connectorCallback; private OffsetCommitPolicy offsetCommitPolicy = null; - @Override - public Builder using(Configuration config) { - this.config = config; - return this; - } - @Override public Builder using(Properties config) { this.config = Configuration.from(config); @@ -111,12 +105,6 @@ public Builder using(ClassLoader classLoader) { return this; } - @Override - public Builder using(Clock clock) { - this.clock = clock; - return this; - } - @Override public Builder using(DebeziumEngine.CompletionCallback completionCallback) { this.completionCallback = completionCallback; @@ -153,13 +141,14 @@ public Builder notifying(DebeziumEngine.ChangeConsumer handler) { @Override public Builder using(java.time.Clock clock) { - return using(new Clock() { + this.clock = new Clock() { @Override public long currentTimeInMillis() { return clock.millis(); } - }); + }; + return this; } @Override @@ -175,31 +164,6 @@ public EmbeddedEngine build() { return new EmbeddedEngine(config, classLoader, clock, handler, completionCallback, connectorCallback, offsetCommitPolicy); } - - // backward compatibility methods - @Override - public Builder using(CompletionCallback completionCallback) { - return using((DebeziumEngine.CompletionCallback) completionCallback); - } - - @Override - public Builder using(ConnectorCallback connectorCallback) { - return using((DebeziumEngine.ConnectorCallback) connectorCallback); - } - } - - /** - * A callback function to be notified when the connector completes. - */ - @Deprecated - public interface CompletionCallback extends DebeziumEngine.CompletionCallback { - } - - /** - * Callback function which informs users about the various stages a connector goes through during startup - */ - @Deprecated - public interface ConnectorCallback extends DebeziumEngine.ConnectorCallback { } /** @@ -308,25 +272,8 @@ public boolean hasError() { } } - /** - * Contract passed to {@link ChangeConsumer}s, allowing them to commit single records as they have been processed - * and to signal that offsets may be flushed eventually. - */ - @ThreadSafe - @Deprecated - public interface RecordCommitter extends DebeziumEngine.RecordCommitter { - } - - /** - * A contract invoked by the embedded engine when it has received a batch of change records to be processed. Allows - * to process multiple records in one go, acknowledging their processing once that's done. - */ - @Deprecated - public interface ChangeConsumer extends DebeziumEngine.ChangeConsumer { - } - - private static ChangeConsumer buildDefaultChangeConsumer(Consumer consumer) { - return new ChangeConsumer() { + private static ChangeConsumer buildDefaultChangeConsumer(Consumer consumer) { + return new DebeziumEngine.ChangeConsumer() { /** * the default implementation that is compatible with the old Consumer api. @@ -358,60 +305,6 @@ public void handleBatch(List records, DebeziumEngine.RecordCommitt }; } - /** - * A builder to set up and create {@link EmbeddedEngine} instances. - */ - @Deprecated - public interface Builder extends DebeziumEngine.Builder { - - /** - * Use the specified configuration for the connector. The configuration is assumed to already be valid. - * - * @param config the configuration - * @return this builder object so methods can be chained together; never null - */ - Builder using(Configuration config); - - /** - * Use the specified clock when needing to determine the current time. Passing null or not calling this - * method results in the connector using the {@link Clock#system() system clock}. - * - * @param clock the clock - * @return this builder object so methods can be chained together; never null - */ - Builder using(Clock clock); - - // backward compatibility methods - @Override - Builder notifying(Consumer consumer); - - @Override - Builder notifying(DebeziumEngine.ChangeConsumer handler); - - @Override - Builder using(ClassLoader classLoader); - - Builder using(CompletionCallback completionCallback); - - Builder using(ConnectorCallback connectorCallback); - - @Override - Builder using(OffsetCommitPolicy policy); - - @Override - EmbeddedEngine build(); - } - - /** - * Obtain a new {@link Builder} instance that can be used to construct runnable {@link EmbeddedEngine} instances. - * - * @return the new builder; never null - */ - @Deprecated - public static Builder create() { - return new BuilderImpl(); - } - private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedEngine.class); private final Configuration config; @@ -920,7 +813,7 @@ private int getErrorsMaxRetries() { * @return the new recordCommitter to be used for a given batch */ protected RecordCommitter buildRecordCommitter(OffsetStorageWriter offsetWriter, SourceTask task, Duration commitTimeout) { - return new RecordCommitter() { + return new DebeziumEngine.RecordCommitter() { @Override public synchronized void markProcessed(SourceRecord record) throws InterruptedException { diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index be2abd35a..dbbf8cc9a 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -69,8 +69,6 @@ import io.debezium.config.Configuration; import io.debezium.config.Instantiator; import io.debezium.data.VerifyRecord; -import io.debezium.embedded.EmbeddedEngine.CompletionCallback; -import io.debezium.embedded.EmbeddedEngine.ConnectorCallback; import io.debezium.embedded.EmbeddedEngine.EmbeddedConfig; import io.debezium.engine.DebeziumEngine; import io.debezium.function.BooleanConsumer; @@ -219,12 +217,12 @@ protected int getMaximumEnqueuedRecordCount() { } /** - * Create a {@link CompletionCallback} that logs when the engine fails to start the connector or when the connector + * Create a {@link DebeziumEngine.CompletionCallback} that logs when the engine fails to start the connector or when the connector * stops running after completing successfully or due to an error * - * @return the logging {@link CompletionCallback} + * @return the logging {@link DebeziumEngine.CompletionCallback} */ - protected CompletionCallback loggingCompletion() { + protected DebeziumEngine.CompletionCallback loggingCompletion() { return (success, msg, error) -> { if (success) { logger.info(msg); @@ -370,7 +368,7 @@ protected void start(Class connectorClass, Configurat .with(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0) .build(); latch = new CountDownLatch(1); - CompletionCallback wrapperCallback = (success, msg, error) -> { + DebeziumEngine.CompletionCallback wrapperCallback = (success, msg, error) -> { try { if (callback != null) { callback.handle(success, msg, error); @@ -385,7 +383,7 @@ protected void start(Class connectorClass, Configurat Testing.debug("Stopped connector"); }; - ConnectorCallback connectorCallback = new ConnectorCallback() { + DebeziumEngine.ConnectorCallback connectorCallback = new DebeziumEngine.ConnectorCallback() { @Override public void taskStarted() { // if this is called, it means a task has been started successfully so we can continue @@ -406,8 +404,8 @@ public void connectorStopped() { }; // Create the connector ... - EmbeddedEngine.Builder builder = EmbeddedEngine.create(); - builder.using(config) + EmbeddedEngine.Builder builder = new EmbeddedEngine.BuilderImpl(); + builder.using(config.asProperties()) .notifying(getConsumer(isStopRecord, recordArrivedListener, ignoreRecordsAfterStop)) .using(this.getClass().getClassLoader()) .using(wrapperCallback) @@ -415,7 +413,7 @@ public void connectorStopped() { if (changeConsumer != null) { builder.notifying(changeConsumer); } - engine = new TestingEmbeddedEngine(builder.build()); + engine = new TestingEmbeddedEngine((EmbeddedEngine) builder.build()); // Submit the connector for asynchronous execution ... assertThat(executor).isNull(); @@ -626,7 +624,7 @@ protected SourceRecords consumeRecordsByTopic(int numRecords) throws Interrupted * This is most useful in corner cases when there can be a duplicate records between snapshot * and streaming switch. * - * @param numRecords the number of records that should be consumed + * @param recordsToRead the number of records that should be consumed * @param tripCondition condition to satisfy to stop skipping records * @return the collector into which the records were captured; never null * @throws InterruptedException if the thread was interrupted while waiting for a record to be returned diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/ConnectorOutputTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/ConnectorOutputTest.java index 83233d5ef..491958641 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/ConnectorOutputTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/ConnectorOutputTest.java @@ -60,7 +60,6 @@ import io.debezium.document.Document; import io.debezium.document.DocumentReader; import io.debezium.document.Value; -import io.debezium.embedded.EmbeddedEngine.CompletionCallback; import io.debezium.embedded.EmbeddedEngine.CompletionResult; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.RecordChangeEvent; @@ -796,7 +795,7 @@ protected void runConnector(TestSpecification spec) { * @param spec the test specification * @param callback the function that should be called when the connector is stopped */ - protected void runConnector(TestSpecification spec, CompletionCallback callback) { + protected void runConnector(TestSpecification spec, DebeziumEngine.CompletionCallback callback) { PreviousContext preRunContext = LoggingContext.forConnector(getClass().getSimpleName(), "runner", spec.name()); final Configuration environmentConfig = Configuration.copy(spec.environment()).build(); final Configuration connectorConfig = spec.config(); diff --git a/debezium-microbenchmark-oracle/src/main/java/io/debezium/performance/connector/oracle/EndToEndPerf.java b/debezium-microbenchmark-oracle/src/main/java/io/debezium/performance/connector/oracle/EndToEndPerf.java index 1028b97ff..44a6f85aa 100644 --- a/debezium-microbenchmark-oracle/src/main/java/io/debezium/performance/connector/oracle/EndToEndPerf.java +++ b/debezium-microbenchmark-oracle/src/main/java/io/debezium/performance/connector/oracle/EndToEndPerf.java @@ -126,18 +126,18 @@ public void doSetup() { .build(); Consumer recordArrivedListener = this::processRecord; - this.engine = EmbeddedEngine.create() - .using(config) + this.engine = (EmbeddedEngine) new EmbeddedEngine.BuilderImpl() + .using(config.asProperties()) .notifying((record) -> { if (!engine.isRunning() || Thread.currentThread().isInterrupted()) { return; } - while (!consumedLines.offer(record)) { + while (!consumedLines.offer((SourceRecord) record)) { if (!engine.isRunning() || Thread.currentThread().isInterrupted()) { return; } } - recordArrivedListener.accept(record); + recordArrivedListener.accept((SourceRecord) record); }) .using(this.getClass().getClassLoader()) .build();