DBZ-8160 Updated tests to reflect engine's signal API changes

This commit is contained in:
Jakub Cechacek 2024-08-21 16:07:22 +02:00 committed by Jiri Pechanec
parent f20d05fab9
commit bec109308d
3 changed files with 12 additions and 9 deletions

View File

@ -31,8 +31,8 @@
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.connector.postgresql.spi.CustomActionProvider; import io.debezium.connector.postgresql.spi.CustomActionProvider;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest; import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.engine.DebeziumEngine;
import io.debezium.junit.logging.LogInterceptor; import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.signal.SignalRecord;
import io.debezium.pipeline.signal.actions.Log; import io.debezium.pipeline.signal.actions.Log;
public class SignalsIT extends AbstractAsyncEngineConnectorTest { public class SignalsIT extends AbstractAsyncEngineConnectorTest {
@ -114,7 +114,7 @@ private void signalLog(boolean includingEscapedCharacter, boolean useSource) thr
} }
else { else {
expectedNumRecords = 1; expectedNumRecords = 1;
signaler.signal(new SignalRecord("1", "log", "{\"message\": \"Signal message at offset ''{}''\"}", null)); getSignaler().signal(new DebeziumEngine.Signal("1", "log", "{\"message\": \"Signal message at offset ''{}''\"}", null));
} }
waitForAvailableRecords(800, TimeUnit.MILLISECONDS); waitForAvailableRecords(800, TimeUnit.MILLISECONDS);

View File

@ -8,7 +8,6 @@
import io.debezium.embedded.AbstractConnectorTest; import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.TestingDebeziumEngine; import io.debezium.embedded.TestingDebeziumEngine;
import io.debezium.engine.DebeziumEngine; import io.debezium.engine.DebeziumEngine;
import io.debezium.pipeline.signal.SignalRecord;
/** /**
* Base class for testing connectors using {@link AsyncEmbeddedEngine}. * Base class for testing connectors using {@link AsyncEmbeddedEngine}.
@ -17,18 +16,17 @@
*/ */
public class AbstractAsyncEngineConnectorTest extends AbstractConnectorTest { public class AbstractAsyncEngineConnectorTest extends AbstractConnectorTest {
protected DebeziumEngine.Signaler<SignalRecord> signaler;
@Override @Override
protected DebeziumEngine.Builder createEngineBuilder() { protected DebeziumEngine.Builder createEngineBuilder() {
this.signaler = new AsyncEngineSignaler(); return new AsyncEmbeddedEngine.AsyncEngineBuilder();
return new AsyncEmbeddedEngine.AsyncEngineBuilder()
.using(signaler);
} }
@Override @Override
protected TestingDebeziumEngine createEngine(DebeziumEngine.Builder builder) { protected TestingDebeziumEngine createEngine(DebeziumEngine.Builder builder) {
return new TestingAsyncEmbeddedEngine((AsyncEmbeddedEngine) builder.build()); return new TestingAsyncEmbeddedEngine((AsyncEmbeddedEngine) builder.build());
} }
protected DebeziumEngine.Signaler getSignaler() {
return engine.getSignaler();
}
} }

View File

@ -36,4 +36,9 @@ public void close() throws IOException {
public void runWithTask(Consumer consumer) { public void runWithTask(Consumer consumer) {
engine.runWithTask(consumer); engine.runWithTask(consumer);
} }
@Override
public Signaler getSignaler() {
return engine.getSignaler();
}
} }