DBZ-7813: waitForAvailableRecords using Awaitility

This commit is contained in:
Lars M Johansson 2024-06-05 09:12:02 +02:00 committed by Jiri Pechanec
parent 430b8125a8
commit 145c42e700

View File

@ -6,6 +6,7 @@
package io.debezium.embedded;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.pollinterval.FibonacciPollInterval.fibonacci;
import static org.junit.Assert.fail;
import java.io.IOException;
@ -951,17 +952,27 @@ protected int consumeAvailableRecords(Consumer<SourceRecord> recordConsumer) {
* @return {@code true} if records are available, or {@code false} if the timeout occurred and no records are available
*/
protected boolean waitForAvailableRecords(long timeout, TimeUnit unit) {
assertThat(timeout).isGreaterThanOrEqualTo(0);
long now = System.currentTimeMillis();
long stop = now + unit.toMillis(timeout);
while (System.currentTimeMillis() < stop) {
if (!consumedLines.isEmpty()) {
break;
assertThat(timeout).isNotNegative();
try {
Awaitility.await()
.alias("Records were not available on time")
.pollInterval(fibonacci(unit))
.atMost(timeout, unit)
.until(() -> !consumedLines.isEmpty());
}
catch (ConditionTimeoutException ignore) {
// IGNORE
}
return !consumedLines.isEmpty();
}
/**
* Wait for a maximum amount of time until the first record is available.
*/
protected boolean waitForAvailableRecords() {
return waitForAvailableRecords(waitTimeForRecords() * 30L, TimeUnit.SECONDS);
}
/**
* Disable record validation using Avro converter.
*/