diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index b6f0f3636..a6b936fd6 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -13,9 +13,13 @@ import static org.junit.Assert.assertNull; import java.io.IOException; +import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -675,21 +679,56 @@ public void verifyOffsets() throws Exception { .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .build(); + final List expectedIds = new ArrayList<>(); for (int i = 0; i < RECORDS_PER_TABLE; i++) { final int id = ID_START + i; connection.execute( "INSERT INTO tablea VALUES(" + id + ", 'a')"); connection.execute( "INSERT INTO tableb VALUES(" + id + ", 'b')"); + expectedIds.add(id); } - for (int i = 0; !connection.getMaxLsn().isAvailable(); i++) { - if (i == 30) { - org.junit.Assert.fail("Initial changes not written to CDC structures"); + final String tableaCT = connection.getNameOfChangeTable("tablea"); + final String tablebCT = connection.getNameOfChangeTable("tableb"); + + Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> { + // Wait for max lsn to be available + if (!connection.getMaxLsn().isAvailable()) { + return false; } - Testing.debug("Waiting for initial changes to be propagated to CDC structures"); - Thread.sleep(1000); - } + + // verify pre-snapshot inserts have succeeded + Map resultMap = new HashMap<>(); + connection.listOfChangeTables().forEach(ct -> { + final String tableName = ct.getChangeTableId().table(); + if (tableName.endsWith("dbo_" + tableaCT) || tableName.endsWith("dbo_" + tablebCT)) { + try { + final Lsn minLsn = connection.getMinLsn(tableName); + final Lsn maxLsn = connection.getMaxLsn(); + SqlServerChangeTable[] tables = Collections.singletonList(ct).toArray(new SqlServerChangeTable[]{}); + final List ids = new ArrayList<>(); + connection.getChangesForTables(tables, minLsn, maxLsn, resultsets -> { + final ResultSet rs = resultsets[0]; + while (rs.next()) { + ids.add(rs.getInt("id")); + } + }); + if (ids.equals(expectedIds)) { + resultMap.put(tableName, true); + } + else { + resultMap.put(tableName, false); + } + } + catch (Exception e) { + org.junit.Assert.fail("Failed to fetch changes for table " + tableName + ": " + e.getMessage()); + } + } + }); + return resultMap.values().stream().filter(v -> !v).count() == 0; + }); + start(SqlServerConnector.class, config); assertConnectorIsRunning();