From 412ce206388fc47d400adee586faccf0826fe37d Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Thu, 27 Jun 2019 15:53:57 +0200 Subject: [PATCH] DBZ-1362 Switch to Awaitility implementation --- debezium-connector-postgres/pom.xml | 5 +++++ .../connector/postgresql/PostgresConnectorIT.java | 14 ++++++++------ pom.xml | 6 ++++++ 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/debezium-connector-postgres/pom.xml b/debezium-connector-postgres/pom.xml index 75b5113d4..374f55577 100644 --- a/debezium-connector-postgres/pom.xml +++ b/debezium-connector-postgres/pom.xml @@ -90,6 +90,11 @@ junit test + + org.awaitility + awaitility + test + org.easytesting fest-assert diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 11395866b..8ff2a9a12 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -32,6 +32,9 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.source.SourceRecord; +import org.awaitility.Awaitility; +import org.awaitility.Duration; +import org.awaitility.core.ConditionTimeoutException; import org.fest.assertions.Assertions; import org.junit.Before; import org.junit.BeforeClass; @@ -695,12 +698,11 @@ public void shouldRegularlyFlushLsn() throws InterruptedException, SQLException assertThat(actualRecords.recordsForTopic(topicName("s1.a")).size()).isEqualTo(1); // Wait max 2 seconds for LSN change - for (int retry = 0; retry < 20; retry++) { - final String confirmedflushLsn = getConfirmedFlushLsn(connection); - if (flushLsn.add(confirmedflushLsn)) { - break; - } - TimeUnit.MILLISECONDS.sleep(100); + try { + Awaitility.await().atMost(Duration.TWO_SECONDS).ignoreExceptions().until(() -> flushLsn.add(getConfirmedFlushLsn(connection))); + } + catch (ConditionTimeoutException e) { + // We do not require all flushes to succeed in time } } } diff --git a/pom.xml b/pom.xml index b699fab2f..ca0a66764 100644 --- a/pom.xml +++ b/pom.xml @@ -85,6 +85,7 @@ 1.4 1.21 2.13.0 + 3.1.6 2.7 @@ -328,6 +329,11 @@ mockito-core ${version.mockito} + + org.awaitility + awaitility + ${version.awaitility} +