diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index 4ff307c3c..8d32c0b69 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -99,7 +99,7 @@ public abstract class AbstractConnectorTest implements Testing { private ExecutorService executor; protected EmbeddedEngine engine; - private BlockingQueue consumedLines; + protected BlockingQueue consumedLines; protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(5); protected final Logger logger = LoggerFactory.getLogger(getClass()); private CountDownLatch latch; @@ -309,6 +309,19 @@ protected void start(Class connectorClass, Configurat }, true); } + /** + * Start the connector using the supplied connector configuration. + * + * @param connectorClass the connector class; may not be null + * @param connectorConfig the configuration for the connector; may not be null + * @param changeConsumer {@link io.debezium.engine.DebeziumEngine.ChangeConsumer} invoked when a record arrives and is stored in the queue + */ + protected void start(Class connectorClass, Configuration connectorConfig, + DebeziumEngine.ChangeConsumer changeConsumer) { + start(connectorClass, connectorConfig, loggingCompletion(), null, x -> { + }, true, changeConsumer); + } + /** * Start the connector using the supplied connector configuration. * @@ -324,6 +337,25 @@ protected void start(Class connectorClass, Configurat protected void start(Class connectorClass, Configuration connectorConfig, DebeziumEngine.CompletionCallback callback, Predicate isStopRecord, Consumer recordArrivedListener, boolean ignoreRecordsAfterStop) { + start(connectorClass, connectorConfig, callback, isStopRecord, recordArrivedListener, ignoreRecordsAfterStop, null); + } + + /** + * Start the connector using the supplied connector configuration. + * + * @param connectorClass the connector class; may not be null + * @param connectorConfig the configuration for the connector; may not be null + * @param isStopRecord the function that will be called to determine if the connector should be stopped before processing + * this record; may be null if not needed + * @param callback the function that will be called when the engine fails to start the connector or when the connector + * stops running after completing successfully or due to an error; may be null + * @param recordArrivedListener function invoked when a record arrives and is stored in the queue + * @param ignoreRecordsAfterStop {@code true} if records arriving after stop should be ignored + * @param changeConsumer {@link io.debezium.engine.DebeziumEngine.ChangeConsumer} invoked when a record arrives and is stored in the queue + */ + protected void start(Class connectorClass, Configuration connectorConfig, + DebeziumEngine.CompletionCallback callback, Predicate isStopRecord, + Consumer recordArrivedListener, boolean ignoreRecordsAfterStop, DebeziumEngine.ChangeConsumer changeConsumer) { Configuration config = Configuration.copy(connectorConfig) .with(EmbeddedEngine.ENGINE_NAME, "testing-connector") .with(EmbeddedEngine.CONNECTOR_CLASS, connectorClass.getName()) @@ -355,13 +387,16 @@ public void taskStarted() { }; // Create the connector ... - engine = EmbeddedEngine.create() - .using(config) + EmbeddedEngine.Builder builder = EmbeddedEngine.create(); + builder.using(config) .notifying(getConsumer(isStopRecord, recordArrivedListener, ignoreRecordsAfterStop)) .using(this.getClass().getClassLoader()) .using(wrapperCallback) - .using(connectorCallback) - .build(); + .using(connectorCallback); + if (changeConsumer != null) { + builder.notifying(changeConsumer); + } + engine = builder.build(); // Submit the connector for asynchronous execution ... assertThat(executor).isNull();