From 17a390840359a8a2eef7d8719bf22f1ee9fc4f95 Mon Sep 17 00:00:00 2001 From: Vojtech Juranek Date: Tue, 27 Aug 2024 13:36:53 +0200 Subject: [PATCH] DBZ-8166 Provide explicit engine type to avoid raw types --- .../io/debezium/embedded/AbstractConnectorTest.java | 12 ++++++------ .../io/debezium/embedded/TestingEmbeddedEngine.java | 7 +++++-- .../async/AbstractAsyncEngineConnectorTest.java | 8 +++++--- .../embedded/async/TestingAsyncEmbeddedEngine.java | 10 ++++++---- 4 files changed, 22 insertions(+), 15 deletions(-) diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index b923b8c85..810c49b6c 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -109,7 +109,7 @@ public abstract class AbstractConnectorTest implements Testing { private static final String TEST_PROPERTY_PREFIX = "debezium.test."; private ExecutorService executor; - protected TestingDebeziumEngine engine; + protected TestingDebeziumEngine engine; protected BlockingQueue consumedLines; protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(10); protected final Logger logger = LoggerFactory.getLogger(getClass()); @@ -331,7 +331,7 @@ protected void start(Class connectorClass, Configurat * @param changeConsumer {@link io.debezium.engine.DebeziumEngine.ChangeConsumer} invoked when a record arrives and is stored in the queue */ protected void start(Class connectorClass, Configuration connectorConfig, - DebeziumEngine.ChangeConsumer changeConsumer) { + DebeziumEngine.ChangeConsumer changeConsumer) { start(connectorClass, connectorConfig, loggingCompletion(), null, x -> { }, true, changeConsumer); } @@ -369,7 +369,7 @@ protected void start(Class connectorClass, Configurat */ protected void start(Class connectorClass, Configuration connectorConfig, DebeziumEngine.CompletionCallback callback, Predicate isStopRecord, - Consumer recordArrivedListener, boolean ignoreRecordsAfterStop, DebeziumEngine.ChangeConsumer changeConsumer) { + Consumer recordArrivedListener, boolean ignoreRecordsAfterStop, DebeziumEngine.ChangeConsumer changeConsumer) { Configuration config = Configuration.copy(connectorConfig) .with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector") .with(EmbeddedEngineConfig.CONNECTOR_CLASS, connectorClass.getName()) @@ -413,7 +413,7 @@ public void connectorStopped() { }; // Create the connector ... - DebeziumEngine.Builder builder = createEngineBuilder(); + DebeziumEngine.Builder builder = createEngineBuilder(); builder.using(config.asProperties()) .notifying(getConsumer(isStopRecord, recordArrivedListener, ignoreRecordsAfterStop)) .using(this.getClass().getClassLoader()) @@ -444,11 +444,11 @@ public void connectorStopped() { } } - protected DebeziumEngine.Builder createEngineBuilder() { + protected DebeziumEngine.Builder createEngineBuilder() { return new EmbeddedEngine.EngineBuilder(); } - protected TestingDebeziumEngine createEngine(DebeziumEngine.Builder builder) { + protected TestingDebeziumEngine createEngine(DebeziumEngine.Builder builder) { return new TestingEmbeddedEngine((EmbeddedEngine) builder.build()); } diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/TestingEmbeddedEngine.java b/debezium-embedded/src/test/java/io/debezium/embedded/TestingEmbeddedEngine.java index 7da54e3ee..bef61cf25 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/TestingEmbeddedEngine.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/TestingEmbeddedEngine.java @@ -8,10 +8,13 @@ import java.io.IOException; import java.util.function.Consumer; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; + /** * Implementation of {@link TestingDebeziumEngine} for {@link EmbeddedEngine}. */ -public class TestingEmbeddedEngine implements TestingDebeziumEngine { +public class TestingEmbeddedEngine implements TestingDebeziumEngine { private final EmbeddedEngine engine; @@ -30,7 +33,7 @@ public void close() throws IOException { } @Override - public void runWithTask(Consumer consumer) { + public void runWithTask(Consumer consumer) { engine.runWithTask(consumer); } diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/async/AbstractAsyncEngineConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/async/AbstractAsyncEngineConnectorTest.java index 069959e97..8a455fedf 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/async/AbstractAsyncEngineConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/async/AbstractAsyncEngineConnectorTest.java @@ -5,6 +5,8 @@ */ package io.debezium.embedded.async; +import org.apache.kafka.connect.source.SourceRecord; + import io.debezium.embedded.AbstractConnectorTest; import io.debezium.embedded.TestingDebeziumEngine; import io.debezium.engine.DebeziumEngine; @@ -17,13 +19,13 @@ public class AbstractAsyncEngineConnectorTest extends AbstractConnectorTest { @Override - protected DebeziumEngine.Builder createEngineBuilder() { + protected DebeziumEngine.Builder createEngineBuilder() { return new AsyncEmbeddedEngine.AsyncEngineBuilder(); } @Override - protected TestingDebeziumEngine createEngine(DebeziumEngine.Builder builder) { - return new TestingAsyncEmbeddedEngine((AsyncEmbeddedEngine) builder.build()); + protected TestingDebeziumEngine createEngine(DebeziumEngine.Builder builder) { + return new TestingAsyncEmbeddedEngine((AsyncEmbeddedEngine) builder.build()); } protected DebeziumEngine.Signaler getSignaler() { diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/async/TestingAsyncEmbeddedEngine.java b/debezium-embedded/src/test/java/io/debezium/embedded/async/TestingAsyncEmbeddedEngine.java index 850330c67..e36d85c7c 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/async/TestingAsyncEmbeddedEngine.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/async/TestingAsyncEmbeddedEngine.java @@ -8,6 +8,8 @@ import java.io.IOException; import java.util.function.Consumer; +import org.apache.kafka.connect.source.SourceTask; + import io.debezium.embedded.TestingDebeziumEngine; /** @@ -15,10 +17,10 @@ * * @author vjuranek */ -public class TestingAsyncEmbeddedEngine implements TestingDebeziumEngine { - private final AsyncEmbeddedEngine engine; +public class TestingAsyncEmbeddedEngine implements TestingDebeziumEngine { + private final AsyncEmbeddedEngine engine; - public TestingAsyncEmbeddedEngine(AsyncEmbeddedEngine engine) { + public TestingAsyncEmbeddedEngine(AsyncEmbeddedEngine engine) { this.engine = engine; } @@ -33,7 +35,7 @@ public void close() throws IOException { } @Override - public void runWithTask(Consumer consumer) { + public void runWithTask(Consumer consumer) { engine.runWithTask(consumer); }