From bec109308d24428d039b49d21733f73ea52ea117 Mon Sep 17 00:00:00 2001 From: Jakub Cechacek Date: Wed, 21 Aug 2024 16:07:22 +0200 Subject: [PATCH] DBZ-8160 Updated tests to reflect engine's signal API changes --- .../io/debezium/connector/postgresql/SignalsIT.java | 4 ++-- .../async/AbstractAsyncEngineConnectorTest.java | 12 +++++------- .../embedded/async/TestingAsyncEmbeddedEngine.java | 5 +++++ 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SignalsIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SignalsIT.java index 0300c90c0..01789cbaf 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SignalsIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/SignalsIT.java @@ -31,8 +31,8 @@ import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; import io.debezium.connector.postgresql.spi.CustomActionProvider; import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest; +import io.debezium.engine.DebeziumEngine; import io.debezium.junit.logging.LogInterceptor; -import io.debezium.pipeline.signal.SignalRecord; import io.debezium.pipeline.signal.actions.Log; public class SignalsIT extends AbstractAsyncEngineConnectorTest { @@ -114,7 +114,7 @@ private void signalLog(boolean includingEscapedCharacter, boolean useSource) thr } else { expectedNumRecords = 1; - signaler.signal(new SignalRecord("1", "log", "{\"message\": \"Signal message at offset ''{}''\"}", null)); + getSignaler().signal(new DebeziumEngine.Signal("1", "log", "{\"message\": \"Signal message at offset ''{}''\"}", null)); } waitForAvailableRecords(800, TimeUnit.MILLISECONDS); diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/async/AbstractAsyncEngineConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/async/AbstractAsyncEngineConnectorTest.java index 4c99d74e2..069959e97 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/async/AbstractAsyncEngineConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/async/AbstractAsyncEngineConnectorTest.java @@ -8,7 +8,6 @@ import io.debezium.embedded.AbstractConnectorTest; import io.debezium.embedded.TestingDebeziumEngine; import io.debezium.engine.DebeziumEngine; -import io.debezium.pipeline.signal.SignalRecord; /** * Base class for testing connectors using {@link AsyncEmbeddedEngine}. @@ -17,18 +16,17 @@ */ public class AbstractAsyncEngineConnectorTest extends AbstractConnectorTest { - protected DebeziumEngine.Signaler signaler; - @Override protected DebeziumEngine.Builder createEngineBuilder() { - this.signaler = new AsyncEngineSignaler(); - - return new AsyncEmbeddedEngine.AsyncEngineBuilder() - .using(signaler); + return new AsyncEmbeddedEngine.AsyncEngineBuilder(); } @Override protected TestingDebeziumEngine createEngine(DebeziumEngine.Builder builder) { return new TestingAsyncEmbeddedEngine((AsyncEmbeddedEngine) builder.build()); } + + protected DebeziumEngine.Signaler getSignaler() { + return engine.getSignaler(); + } } diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/async/TestingAsyncEmbeddedEngine.java b/debezium-embedded/src/test/java/io/debezium/embedded/async/TestingAsyncEmbeddedEngine.java index d66f41c2a..850330c67 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/async/TestingAsyncEmbeddedEngine.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/async/TestingAsyncEmbeddedEngine.java @@ -36,4 +36,9 @@ public void close() throws IOException { public void runWithTask(Consumer consumer) { engine.runWithTask(consumer); } + + @Override + public Signaler getSignaler() { + return engine.getSignaler(); + } }