diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotIT.java index 1116c2db5..16ff8c9d4 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotIT.java @@ -173,4 +173,9 @@ protected Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCaptur .with(SqlServerConnectorConfig.INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, true) .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl); } + + @Override + protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception { + TestHelper.waitForCdcTransactionPropagation(connection, TestHelper.TEST_DATABASE_1, expectedTransactions); + } } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java index db4d66393..b35d0a425 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java @@ -653,6 +653,20 @@ public static void waitForCdcRecord(SqlServerConnection connection, String table } } + public static void waitForCdcTransactionPropagation(SqlServerConnection connection, String dbName, int expectedTransactions) throws SQLException { + Awaitility.await().atMost(60, TimeUnit.SECONDS) + .pollDelay(1, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .until(() -> { + int transactions = connection.queryAndMap(String.format("SELECT COUNT(start_lsn) FROM [%s].cdc.lsn_time_mapping WHERE tran_id <> 0x00", dbName), + (rs) -> { + rs.next(); + return rs.getInt(1); + }); + return expectedTransactions == transactions; + }); + } + public static String topicName(String databaseName, String tableName) { return String.join(".", TEST_SERVER_NAME, databaseName, "dbo", tableName); } diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java index e2ed14adb..0e257634c 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java @@ -76,6 +76,9 @@ protected String signalTableNameSanitized() { protected abstract Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl); + protected void waitForCdcTransactionPropagation(int expectedTransactions) throws Exception { + } + protected String alterTableAddColumnStatement(String tableName) { return "ALTER TABLE " + tableName + " add col3 int default 0"; } @@ -890,6 +893,7 @@ public void snapshotWithAdditionalCondition() throws Exception { int expectedCount = 10, expectedValue = 12345678; populateTable(); populateTableWithSpecificValue(2000, expectedCount, expectedValue); + waitForCdcTransactionPropagation(3); final Configuration config = config().build(); startAndConsumeTillEnd(connectorClass(), config); waitForConnectorToStart(); @@ -931,6 +935,7 @@ public void snapshotWithAdditionalConditionWithRestart() throws Exception { int expectedCount = 1000, expectedValue = 12345678; populateTable(); populateTableWithSpecificValue(2000, expectedCount, expectedValue); + waitForCdcTransactionPropagation(3); final Configuration config = config().build(); startAndConsumeTillEnd(connectorClass(), config); waitForConnectorToStart();