DBZ-5052 Add option to start embedded engine with custom change handler

Also make `consumedLines` instance variable available to subclasses to
they can use it in the change handler.
This commit is contained in:
Vojtech Juranek 2022-05-25 18:36:51 +02:00 committed by Jiri Pechanec
parent d6efe20826
commit d6447a91c9

View File

@ -99,7 +99,7 @@ public abstract class AbstractConnectorTest implements Testing {
private ExecutorService executor;
protected EmbeddedEngine engine;
private BlockingQueue<SourceRecord> consumedLines;
protected BlockingQueue<SourceRecord> consumedLines;
protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(5);
protected final Logger logger = LoggerFactory.getLogger(getClass());
private CountDownLatch latch;
@ -309,6 +309,19 @@ protected void start(Class<? extends SourceConnector> connectorClass, Configurat
}, true);
}
/**
* Start the connector using the supplied connector configuration.
*
* @param connectorClass the connector class; may not be null
* @param connectorConfig the configuration for the connector; may not be null
* @param changeConsumer {@link io.debezium.engine.DebeziumEngine.ChangeConsumer} invoked when a record arrives and is stored in the queue
*/
protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig,
DebeziumEngine.ChangeConsumer changeConsumer) {
start(connectorClass, connectorConfig, loggingCompletion(), null, x -> {
}, true, changeConsumer);
}
/**
* Start the connector using the supplied connector configuration.
*
@ -324,6 +337,25 @@ protected void start(Class<? extends SourceConnector> connectorClass, Configurat
protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig,
DebeziumEngine.CompletionCallback callback, Predicate<SourceRecord> isStopRecord,
Consumer<SourceRecord> recordArrivedListener, boolean ignoreRecordsAfterStop) {
start(connectorClass, connectorConfig, callback, isStopRecord, recordArrivedListener, ignoreRecordsAfterStop, null);
}
/**
* Start the connector using the supplied connector configuration.
*
* @param connectorClass the connector class; may not be null
* @param connectorConfig the configuration for the connector; may not be null
* @param isStopRecord the function that will be called to determine if the connector should be stopped before processing
* this record; may be null if not needed
* @param callback the function that will be called when the engine fails to start the connector or when the connector
* stops running after completing successfully or due to an error; may be null
* @param recordArrivedListener function invoked when a record arrives and is stored in the queue
* @param ignoreRecordsAfterStop {@code true} if records arriving after stop should be ignored
* @param changeConsumer {@link io.debezium.engine.DebeziumEngine.ChangeConsumer} invoked when a record arrives and is stored in the queue
*/
protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig,
DebeziumEngine.CompletionCallback callback, Predicate<SourceRecord> isStopRecord,
Consumer<SourceRecord> recordArrivedListener, boolean ignoreRecordsAfterStop, DebeziumEngine.ChangeConsumer changeConsumer) {
Configuration config = Configuration.copy(connectorConfig)
.with(EmbeddedEngine.ENGINE_NAME, "testing-connector")
.with(EmbeddedEngine.CONNECTOR_CLASS, connectorClass.getName())
@ -355,13 +387,16 @@ public void taskStarted() {
};
// Create the connector ...
engine = EmbeddedEngine.create()
.using(config)
EmbeddedEngine.Builder builder = EmbeddedEngine.create();
builder.using(config)
.notifying(getConsumer(isStopRecord, recordArrivedListener, ignoreRecordsAfterStop))
.using(this.getClass().getClassLoader())
.using(wrapperCallback)
.using(connectorCallback)
.build();
.using(connectorCallback);
if (changeConsumer != null) {
builder.notifying(changeConsumer);
}
engine = builder.build();
// Submit the connector for asynchronous execution ...
assertThat(executor).isNull();