DBZ-8166 Provide explicit engine type to avoid raw types

This commit is contained in:
Vojtech Juranek 2024-08-27 13:36:53 +02:00 committed by Vojtěch Juránek
parent be9c70d631
commit 17a3908403
4 changed files with 22 additions and 15 deletions

View File

@ -109,7 +109,7 @@ public abstract class AbstractConnectorTest implements Testing {
private static final String TEST_PROPERTY_PREFIX = "debezium.test."; private static final String TEST_PROPERTY_PREFIX = "debezium.test.";
private ExecutorService executor; private ExecutorService executor;
protected TestingDebeziumEngine engine; protected TestingDebeziumEngine<SourceRecord> engine;
protected BlockingQueue<SourceRecord> consumedLines; protected BlockingQueue<SourceRecord> consumedLines;
protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(10); protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(10);
protected final Logger logger = LoggerFactory.getLogger(getClass()); protected final Logger logger = LoggerFactory.getLogger(getClass());
@ -331,7 +331,7 @@ protected void start(Class<? extends SourceConnector> connectorClass, Configurat
* @param changeConsumer {@link io.debezium.engine.DebeziumEngine.ChangeConsumer} invoked when a record arrives and is stored in the queue * @param changeConsumer {@link io.debezium.engine.DebeziumEngine.ChangeConsumer} invoked when a record arrives and is stored in the queue
*/ */
protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig, protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig,
DebeziumEngine.ChangeConsumer changeConsumer) { DebeziumEngine.ChangeConsumer<SourceRecord> changeConsumer) {
start(connectorClass, connectorConfig, loggingCompletion(), null, x -> { start(connectorClass, connectorConfig, loggingCompletion(), null, x -> {
}, true, changeConsumer); }, true, changeConsumer);
} }
@ -369,7 +369,7 @@ protected void start(Class<? extends SourceConnector> connectorClass, Configurat
*/ */
protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig, protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig,
DebeziumEngine.CompletionCallback callback, Predicate<SourceRecord> isStopRecord, DebeziumEngine.CompletionCallback callback, Predicate<SourceRecord> isStopRecord,
Consumer<SourceRecord> recordArrivedListener, boolean ignoreRecordsAfterStop, DebeziumEngine.ChangeConsumer changeConsumer) { Consumer<SourceRecord> recordArrivedListener, boolean ignoreRecordsAfterStop, DebeziumEngine.ChangeConsumer<SourceRecord> changeConsumer) {
Configuration config = Configuration.copy(connectorConfig) Configuration config = Configuration.copy(connectorConfig)
.with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector") .with(EmbeddedEngineConfig.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngineConfig.CONNECTOR_CLASS, connectorClass.getName()) .with(EmbeddedEngineConfig.CONNECTOR_CLASS, connectorClass.getName())
@ -413,7 +413,7 @@ public void connectorStopped() {
}; };
// Create the connector ... // Create the connector ...
DebeziumEngine.Builder builder = createEngineBuilder(); DebeziumEngine.Builder<SourceRecord> builder = createEngineBuilder();
builder.using(config.asProperties()) builder.using(config.asProperties())
.notifying(getConsumer(isStopRecord, recordArrivedListener, ignoreRecordsAfterStop)) .notifying(getConsumer(isStopRecord, recordArrivedListener, ignoreRecordsAfterStop))
.using(this.getClass().getClassLoader()) .using(this.getClass().getClassLoader())
@ -444,11 +444,11 @@ public void connectorStopped() {
} }
} }
protected DebeziumEngine.Builder createEngineBuilder() { protected DebeziumEngine.Builder<SourceRecord> createEngineBuilder() {
return new EmbeddedEngine.EngineBuilder(); return new EmbeddedEngine.EngineBuilder();
} }
protected TestingDebeziumEngine createEngine(DebeziumEngine.Builder builder) { protected TestingDebeziumEngine<SourceRecord> createEngine(DebeziumEngine.Builder<SourceRecord> builder) {
return new TestingEmbeddedEngine((EmbeddedEngine) builder.build()); return new TestingEmbeddedEngine((EmbeddedEngine) builder.build());
} }

View File

@ -8,10 +8,13 @@
import java.io.IOException; import java.io.IOException;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
/** /**
* Implementation of {@link TestingDebeziumEngine} for {@link EmbeddedEngine}. * Implementation of {@link TestingDebeziumEngine} for {@link EmbeddedEngine}.
*/ */
public class TestingEmbeddedEngine implements TestingDebeziumEngine { public class TestingEmbeddedEngine implements TestingDebeziumEngine<SourceRecord> {
private final EmbeddedEngine engine; private final EmbeddedEngine engine;
@ -30,7 +33,7 @@ public void close() throws IOException {
} }
@Override @Override
public void runWithTask(Consumer consumer) { public void runWithTask(Consumer<SourceTask> consumer) {
engine.runWithTask(consumer); engine.runWithTask(consumer);
} }

View File

@ -5,6 +5,8 @@
*/ */
package io.debezium.embedded.async; package io.debezium.embedded.async;
import org.apache.kafka.connect.source.SourceRecord;
import io.debezium.embedded.AbstractConnectorTest; import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.TestingDebeziumEngine; import io.debezium.embedded.TestingDebeziumEngine;
import io.debezium.engine.DebeziumEngine; import io.debezium.engine.DebeziumEngine;
@ -17,13 +19,13 @@
public class AbstractAsyncEngineConnectorTest extends AbstractConnectorTest { public class AbstractAsyncEngineConnectorTest extends AbstractConnectorTest {
@Override @Override
protected DebeziumEngine.Builder createEngineBuilder() { protected DebeziumEngine.Builder<SourceRecord> createEngineBuilder() {
return new AsyncEmbeddedEngine.AsyncEngineBuilder(); return new AsyncEmbeddedEngine.AsyncEngineBuilder();
} }
@Override @Override
protected TestingDebeziumEngine createEngine(DebeziumEngine.Builder builder) { protected TestingDebeziumEngine<SourceRecord> createEngine(DebeziumEngine.Builder<SourceRecord> builder) {
return new TestingAsyncEmbeddedEngine((AsyncEmbeddedEngine) builder.build()); return new TestingAsyncEmbeddedEngine<SourceRecord>((AsyncEmbeddedEngine<SourceRecord>) builder.build());
} }
protected DebeziumEngine.Signaler getSignaler() { protected DebeziumEngine.Signaler getSignaler() {

View File

@ -8,6 +8,8 @@
import java.io.IOException; import java.io.IOException;
import java.util.function.Consumer; import java.util.function.Consumer;
import org.apache.kafka.connect.source.SourceTask;
import io.debezium.embedded.TestingDebeziumEngine; import io.debezium.embedded.TestingDebeziumEngine;
/** /**
@ -15,10 +17,10 @@
* *
* @author vjuranek * @author vjuranek
*/ */
public class TestingAsyncEmbeddedEngine implements TestingDebeziumEngine { public class TestingAsyncEmbeddedEngine<T> implements TestingDebeziumEngine<T> {
private final AsyncEmbeddedEngine engine; private final AsyncEmbeddedEngine<T> engine;
public TestingAsyncEmbeddedEngine(AsyncEmbeddedEngine engine) { public TestingAsyncEmbeddedEngine(AsyncEmbeddedEngine<T> engine) {
this.engine = engine; this.engine = engine;
} }
@ -33,7 +35,7 @@ public void close() throws IOException {
} }
@Override @Override
public void runWithTask(Consumer consumer) { public void runWithTask(Consumer<SourceTask> consumer) {
engine.runWithTask(consumer); engine.runWithTask(consumer);
} }