DBZ-7024 Allow to override type into AbstractConnectorTest

This commit is contained in:
Vojtech Juranek 2024-01-07 22:51:35 +01:00 committed by Jiri Pechanec
parent 1eec31b3f3
commit 69bbed1fa3

View File

@ -404,7 +404,7 @@ public void connectorStopped() {
};
// Create the connector ...
EmbeddedEngine.Builder builder = new EmbeddedEngine.EngineBuilder();
DebeziumEngine.Builder builder = createEngineBuilder();
builder.using(config.asProperties())
.notifying(getConsumer(isStopRecord, recordArrivedListener, ignoreRecordsAfterStop))
.using(this.getClass().getClassLoader())
@ -413,7 +413,7 @@ public void connectorStopped() {
if (changeConsumer != null) {
builder.notifying(changeConsumer);
}
engine = new TestingEmbeddedEngine((EmbeddedEngine) builder.build());
engine = createEngine(builder);
// Submit the connector for asynchronous execution ...
assertThat(executor).isNull();
@ -435,6 +435,14 @@ public void connectorStopped() {
}
}
protected DebeziumEngine.Builder createEngineBuilder() {
return new EmbeddedEngine.EngineBuilder();
}
protected TestingDebeziumEngine createEngine(DebeziumEngine.Builder builder) {
return new TestingEmbeddedEngine((EmbeddedEngine) builder.build());
}
protected Consumer<SourceRecord> getConsumer(Predicate<SourceRecord> isStopRecord, Consumer<SourceRecord> recordArrivedListener, boolean ignoreRecordsAfterStop) {
return (record) -> {
if (isStopRecord != null && isStopRecord.test(record)) {