DBZ-2220 Fix test failure - SqlServerConnectorIT#verifyOffsets

This commit is contained in:
Chris Cranford 2020-07-01 14:25:42 -04:00 committed by Jiri Pechanec
parent 61aa659116
commit 4b64791a01

View File

@ -13,9 +13,13 @@
import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -675,21 +679,56 @@ public void verifyOffsets() throws Exception {
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.build();
final List<Integer> expectedIds = new ArrayList<>();
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START + i;
connection.execute(
"INSERT INTO tablea VALUES(" + id + ", 'a')");
connection.execute(
"INSERT INTO tableb VALUES(" + id + ", 'b')");
expectedIds.add(id);
}
for (int i = 0; !connection.getMaxLsn().isAvailable(); i++) {
if (i == 30) {
org.junit.Assert.fail("Initial changes not written to CDC structures");
final String tableaCT = connection.getNameOfChangeTable("tablea");
final String tablebCT = connection.getNameOfChangeTable("tableb");
Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
// Wait for max lsn to be available
if (!connection.getMaxLsn().isAvailable()) {
return false;
}
Testing.debug("Waiting for initial changes to be propagated to CDC structures");
Thread.sleep(1000);
}
// verify pre-snapshot inserts have succeeded
Map<String, Boolean> resultMap = new HashMap<>();
connection.listOfChangeTables().forEach(ct -> {
final String tableName = ct.getChangeTableId().table();
if (tableName.endsWith("dbo_" + tableaCT) || tableName.endsWith("dbo_" + tablebCT)) {
try {
final Lsn minLsn = connection.getMinLsn(tableName);
final Lsn maxLsn = connection.getMaxLsn();
SqlServerChangeTable[] tables = Collections.singletonList(ct).toArray(new SqlServerChangeTable[]{});
final List<Integer> ids = new ArrayList<>();
connection.getChangesForTables(tables, minLsn, maxLsn, resultsets -> {
final ResultSet rs = resultsets[0];
while (rs.next()) {
ids.add(rs.getInt("id"));
}
});
if (ids.equals(expectedIds)) {
resultMap.put(tableName, true);
}
else {
resultMap.put(tableName, false);
}
}
catch (Exception e) {
org.junit.Assert.fail("Failed to fetch changes for table " + tableName + ": " + e.getMessage());
}
}
});
return resultMap.values().stream().filter(v -> !v).count() == 0;
});
start(SqlServerConnector.class, config);
assertConnectorIsRunning();