DBZ-1401 Use awaitility

This commit is contained in:
Jiri Pechanec 2019-10-08 06:18:16 +02:00 committed by Gunnar Morling
parent 55b11d60e6
commit 4fdf1f8556
2 changed files with 28 additions and 8 deletions

View File

@ -78,6 +78,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easytesting</groupId>
<artifactId>fest-assert</artifactId>

View File

@ -7,7 +7,9 @@
package io.debezium.connector.sqlserver;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
@ -39,7 +41,6 @@ public class TablesWithoutPrimaryKeyIT extends AbstractConnectorTest {
@Before
public void before() throws SQLException {
Testing.Print.enable();
TestHelper.createTestDatabase();
initializeConnectorTestFramework();
@ -88,6 +89,10 @@ public void shouldProcessFromStreaming() throws Exception {
);
TestHelper.enableTableCdc(connection, "init");
waitForDisabledCdc(connection, "t1");
waitForDisabledCdc(connection, "t2");
waitForDisabledCdc(connection, "t3");
start(SqlServerConnector.class, TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.build()
@ -101,15 +106,15 @@ public void shouldProcessFromStreaming() throws Exception {
Testing.Print.enable();
TestHelper.enableTableCdc(connection, "t1");
waitForEnbledCdc(connection, "t1");
waitForEnabledCdc(connection, "t1");
connection.execute("INSERT INTO t1 VALUES (1,10);");
TestHelper.enableTableCdc(connection, "t2");
waitForEnbledCdc(connection, "t2");
waitForEnabledCdc(connection, "t2");
connection.execute("INSERT INTO t2 VALUES (2,20);");
TestHelper.enableTableCdc(connection, "t3");
waitForEnbledCdc(connection, "t3");
waitForEnabledCdc(connection, "t3");
connection.execute("INSERT INTO t3 VALUES (3,30);");
final int expectedRecordsCount = 1 + 1 + 1;
@ -122,9 +127,19 @@ public void shouldProcessFromStreaming() throws Exception {
Assertions.assertThat(records.recordsForTopic("server1.dbo.t3").get(0).keySchema()).isNull();
}
private void waitForEnbledCdc(SqlServerConnection connection, String table) throws SQLException, InterruptedException {
while (!TestHelper.isCdcEnabled(connection, table)) {
Thread.sleep(100);
}
private void waitForEnabledCdc(SqlServerConnection connection, String table) throws SQLException, InterruptedException {
Awaitility
.await("CDC " + table)
.atMost(1, TimeUnit.MINUTES)
.pollInterval(100, TimeUnit.MILLISECONDS)
.until(() -> TestHelper.isCdcEnabled(connection, table));
}
private void waitForDisabledCdc(SqlServerConnection connection, String table) throws SQLException, InterruptedException {
Awaitility
.await("CDC " + table)
.atMost(1, TimeUnit.MINUTES)
.pollInterval(100, TimeUnit.MILLISECONDS)
.until(() -> !TestHelper.isCdcEnabled(connection, table));
}
}