diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index 2c8916d0f..fad585496 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -6,6 +6,7 @@ package io.debezium.embedded; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.pollinterval.FibonacciPollInterval.fibonacci; import static org.junit.Assert.fail; import java.io.IOException; @@ -951,17 +952,27 @@ protected int consumeAvailableRecords(Consumer recordConsumer) { * @return {@code true} if records are available, or {@code false} if the timeout occurred and no records are available */ protected boolean waitForAvailableRecords(long timeout, TimeUnit unit) { - assertThat(timeout).isGreaterThanOrEqualTo(0); - long now = System.currentTimeMillis(); - long stop = now + unit.toMillis(timeout); - while (System.currentTimeMillis() < stop) { - if (!consumedLines.isEmpty()) { - break; - } + assertThat(timeout).isNotNegative(); + try { + Awaitility.await() + .alias("Records were not available on time") + .pollInterval(fibonacci(unit)) + .atMost(timeout, unit) + .until(() -> !consumedLines.isEmpty()); + } + catch (ConditionTimeoutException ignore) { + // IGNORE } return !consumedLines.isEmpty(); } + /** + * Wait for a maximum amount of time until the first record is available. + */ + protected boolean waitForAvailableRecords() { + return waitForAvailableRecords(waitTimeForRecords() * 30L, TimeUnit.SECONDS); + } + /** * Disable record validation using Avro converter. */