DBZ-1362 Switch to Awaitility implementation
This commit is contained in:
parent
5a5d969e99
commit
412ce20638
@ -90,6 +90,11 @@
|
|||||||
<artifactId>junit</artifactId>
|
<artifactId>junit</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.awaitility</groupId>
|
||||||
|
<artifactId>awaitility</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.easytesting</groupId>
|
<groupId>org.easytesting</groupId>
|
||||||
<artifactId>fest-assert</artifactId>
|
<artifactId>fest-assert</artifactId>
|
||||||
|
@ -32,6 +32,9 @@
|
|||||||
import org.apache.kafka.connect.errors.ConnectException;
|
import org.apache.kafka.connect.errors.ConnectException;
|
||||||
import org.apache.kafka.connect.errors.DataException;
|
import org.apache.kafka.connect.errors.DataException;
|
||||||
import org.apache.kafka.connect.source.SourceRecord;
|
import org.apache.kafka.connect.source.SourceRecord;
|
||||||
|
import org.awaitility.Awaitility;
|
||||||
|
import org.awaitility.Duration;
|
||||||
|
import org.awaitility.core.ConditionTimeoutException;
|
||||||
import org.fest.assertions.Assertions;
|
import org.fest.assertions.Assertions;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
@ -695,12 +698,11 @@ public void shouldRegularlyFlushLsn() throws InterruptedException, SQLException
|
|||||||
assertThat(actualRecords.recordsForTopic(topicName("s1.a")).size()).isEqualTo(1);
|
assertThat(actualRecords.recordsForTopic(topicName("s1.a")).size()).isEqualTo(1);
|
||||||
|
|
||||||
// Wait max 2 seconds for LSN change
|
// Wait max 2 seconds for LSN change
|
||||||
for (int retry = 0; retry < 20; retry++) {
|
try {
|
||||||
final String confirmedflushLsn = getConfirmedFlushLsn(connection);
|
Awaitility.await().atMost(Duration.TWO_SECONDS).ignoreExceptions().until(() -> flushLsn.add(getConfirmedFlushLsn(connection)));
|
||||||
if (flushLsn.add(confirmedflushLsn)) {
|
}
|
||||||
break;
|
catch (ConditionTimeoutException e) {
|
||||||
}
|
// We do not require all flushes to succeed in time
|
||||||
TimeUnit.MILLISECONDS.sleep(100);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
6
pom.xml
6
pom.xml
@ -85,6 +85,7 @@
|
|||||||
<version.fest>1.4</version.fest>
|
<version.fest>1.4</version.fest>
|
||||||
<version.jmh>1.21</version.jmh>
|
<version.jmh>1.21</version.jmh>
|
||||||
<version.mockito>2.13.0</version.mockito>
|
<version.mockito>2.13.0</version.mockito>
|
||||||
|
<version.awaitility>3.1.6</version.awaitility>
|
||||||
|
|
||||||
<!-- Maven Plugins -->
|
<!-- Maven Plugins -->
|
||||||
<version.resources.plugin>2.7</version.resources.plugin>
|
<version.resources.plugin>2.7</version.resources.plugin>
|
||||||
@ -328,6 +329,11 @@
|
|||||||
<artifactId>mockito-core</artifactId>
|
<artifactId>mockito-core</artifactId>
|
||||||
<version>${version.mockito}</version>
|
<version>${version.mockito}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.awaitility</groupId>
|
||||||
|
<artifactId>awaitility</artifactId>
|
||||||
|
<version>${version.awaitility}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- Debezium artifacts -->
|
<!-- Debezium artifacts -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
Loading…
Reference in New Issue
Block a user