diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresShutdownIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresShutdownIT.java index 224730532..d900c97ad 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresShutdownIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresShutdownIT.java @@ -6,8 +6,6 @@ package io.debezium.connector.postgresql; -import static org.junit.Assert.assertFalse; - import java.sql.SQLException; import java.util.concurrent.TimeUnit; @@ -140,10 +138,11 @@ public void shouldStopOnPostgresFastShutdown() throws Exception { .execInContainer("su", "-", "postgres", "-c", "/usr/lib/postgresql/11/bin/pg_ctl -m fast -D /var/lib/postgresql/data stop"); logger.info(result.toString()); - logger.info("Waiting for Postgres to shutdown..."); + logger.info("Waiting for Postgres to shut down..."); waitForPostgresShutdown(); - assertFalse(isStreamingRunning("postgres", TestHelper.TEST_SERVER)); + logger.info("Waiting for connector to shut down..."); + waitForConnectorShutdown("postgres", TestHelper.TEST_SERVER); } private void waitForPostgresShutdown() { @@ -152,5 +151,4 @@ private void waitForPostgresShutdown() { .atMost(60 * TestHelper.waitTimeForRecords(), TimeUnit.SECONDS) .until(() -> !postgresContainer.isRunning()); } - } diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index 675dacf8f..b6fc9aabc 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -1064,6 +1064,13 @@ public static void waitForStreamingRunning(String connector, String server, Stri .until(() -> isStreamingRunning(connector, server, contextName)); } + public static void waitForConnectorShutdown(String connector, String server) { + Awaitility.await() + .pollInterval(200, TimeUnit.MILLISECONDS) + .atMost(waitTimeForRecords() * 30, TimeUnit.SECONDS) + .until(() -> !isStreamingRunning(connector, server)); + } + public static boolean isStreamingRunning(String connector, String server) { return isStreamingRunning(connector, server, "streaming"); }