diff --git a/debezium-connector-postgres/pom.xml b/debezium-connector-postgres/pom.xml
index 75b5113d4..374f55577 100644
--- a/debezium-connector-postgres/pom.xml
+++ b/debezium-connector-postgres/pom.xml
@@ -90,6 +90,11 @@
junit
test
+
+ org.awaitility
+ awaitility
+ test
+
org.easytesting
fest-assert
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
index 11395866b..8ff2a9a12 100644
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java
@@ -32,6 +32,9 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
+import org.awaitility.Awaitility;
+import org.awaitility.Duration;
+import org.awaitility.core.ConditionTimeoutException;
import org.fest.assertions.Assertions;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -695,12 +698,11 @@ public void shouldRegularlyFlushLsn() throws InterruptedException, SQLException
assertThat(actualRecords.recordsForTopic(topicName("s1.a")).size()).isEqualTo(1);
// Wait max 2 seconds for LSN change
- for (int retry = 0; retry < 20; retry++) {
- final String confirmedflushLsn = getConfirmedFlushLsn(connection);
- if (flushLsn.add(confirmedflushLsn)) {
- break;
- }
- TimeUnit.MILLISECONDS.sleep(100);
+ try {
+ Awaitility.await().atMost(Duration.TWO_SECONDS).ignoreExceptions().until(() -> flushLsn.add(getConfirmedFlushLsn(connection)));
+ }
+ catch (ConditionTimeoutException e) {
+ // We do not require all flushes to succeed in time
}
}
}
diff --git a/pom.xml b/pom.xml
index b699fab2f..ca0a66764 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,6 +85,7 @@
1.4
1.21
2.13.0
+ 3.1.6
2.7
@@ -328,6 +329,11 @@
mockito-core
${version.mockito}
+
+ org.awaitility
+ awaitility
+ ${version.awaitility}
+