DBZ-6035 Await for transactions to be propagated to CDC table

Lowering polling interval intorduced in previous commit doesn't fully
fix the issue and there is still small race condition. To fully fix it
add function for awaiting transaction to be propagate to CDC table and
await transaction in tests which randomly fails.
This commit is contained in:
Vojtech Juranek 2023-01-25 16:29:06 +01:00 committed by Jiri Pechanec
parent 895fd0b99f
commit f53237c249
3 changed files with 24 additions and 0 deletions

View File

@ -173,4 +173,9 @@ protected Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCaptur
.with(SqlServerConnectorConfig.INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, true)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl);
}
@Override
protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception {
TestHelper.waitForCdcTransactionPropagation(connection, TestHelper.TEST_DATABASE_1, expectedTransactions);
}
}

View File

@ -653,6 +653,20 @@ public static void waitForCdcRecord(SqlServerConnection connection, String table
}
}
public static void waitForCdcTransactionPropagation(SqlServerConnection connection, String dbName, int expectedTransactions) throws SQLException {
Awaitility.await().atMost(60, TimeUnit.SECONDS)
.pollDelay(1, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.until(() -> {
int transactions = connection.queryAndMap(String.format("SELECT COUNT(start_lsn) FROM [%s].cdc.lsn_time_mapping WHERE tran_id <> 0x00", dbName),
(rs) -> {
rs.next();
return rs.getInt(1);
});
return expectedTransactions == transactions;
});
}
public static String topicName(String databaseName, String tableName) {
return String.join(".", TEST_SERVER_NAME, databaseName, "dbo", tableName);
}

View File

@ -76,6 +76,9 @@ protected String signalTableNameSanitized() {
protected abstract Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl);
protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception {
}
protected String alterTableAddColumnStatement(String tableName) {
return "ALTER TABLE " + tableName + " add col3 int default 0";
}
@ -890,6 +893,7 @@ public void snapshotWithAdditionalCondition() throws Exception {
int expectedCount = 10, expectedValue = 12345678;
populateTable();
populateTableWithSpecificValue(2000, expectedCount, expectedValue);
waitForCdcTransactionPropagation(3);
final Configuration config = config().build();
startAndConsumeTillEnd(connectorClass(), config);
waitForConnectorToStart();
@ -931,6 +935,7 @@ public void snapshotWithAdditionalConditionWithRestart() throws Exception {
int expectedCount = 1000, expectedValue = 12345678;
populateTable();
populateTableWithSpecificValue(2000, expectedCount, expectedValue);
waitForCdcTransactionPropagation(3);
final Configuration config = config().build();
startAndConsumeTillEnd(connectorClass(), config);
waitForConnectorToStart();