From 895fd0b99f23df541082c1ad40a160e8403777cc Mon Sep 17 00:00:00 2001 From: Vojtech Juranek Date: Wed, 25 Jan 2023 15:53:13 +0100 Subject: [PATCH] DBZ-6035 Add function for adjusting SQL server polling interval Some tests fail randomly as they insert some records at the begging of the test, start Debezium and asserts that there are no records. However, as SQL server poll for the changes and copies them into CDC table not immediately but in specified interval (by default 5 secods), there can be a race condition when the entries inserted at the beginning of the test are copied into CDC table after starting Debezium and it resutls into the test failure as there are unexpected records. Add function for adjusting the polling interval and decrease polling interval in `IncrementalSnapshotIT` which fails very often. This doesn't fully fix the race condition, but decreases the changes the issue is hit. --- .../sqlserver/IncrementalSnapshotIT.java | 2 ++ .../connector/sqlserver/util/TestHelper.java | 16 ++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotIT.java index fdb764d0f..1116c2db5 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/IncrementalSnapshotIT.java @@ -23,6 +23,7 @@ import io.debezium.util.Testing; public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotWithSchemaChangesSupportTest { + private static final int POLLING_INTERVAL = 1; private SqlServerConnection connection; @@ -38,6 +39,7 @@ public void before() throws SQLException { "CREATE TABLE b (pk int primary key, aa int)", "CREATE TABLE debezium_signal (id varchar(64), type varchar(32), data varchar(2048))"); TestHelper.enableTableCdc(connection, "debezium_signal"); + TestHelper.adjustCdcPollingInterval(connection, POLLING_INTERVAL); initializeConnectorTestFramework(); Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java index 0b2100f01..db4d66393 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/util/TestHelper.java @@ -76,6 +76,7 @@ public class TestHelper { private static final String IS_CDC_TABLE_ENABLED = "SELECT COUNT(*) FROM sys.tables tb WHERE tb.is_tracked_by_cdc = 1 AND tb.name='#'"; private static final String ENABLE_TABLE_CDC_WITH_CUSTOM_CAPTURE = "EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'%s', @capture_instance = N'%s', @role_name = NULL, @supports_net_changes = 0, @captured_column_list = %s"; private static final String DISABLE_TABLE_CDC = "EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'#', @capture_instance = 'all'"; + private static final String ADJUST_CDC_POLLING_INTERVAL = "EXEC sys.sp_cdc_change_job @job_type = 'capture', @pollinginterval = #"; private static final String CDC_WRAPPERS_DML; /** @@ -420,6 +421,21 @@ public static void disableTableCdc(JdbcConnection connection, String name) throw connection.execute(disableCdcForTableStmt); } + /** + * Sets new polling interval in which SQL server should poll changes. + * + * SQL server polls new changes and copies them into CDC in predefined interval. + * By default, this interval is 5 seconds. For the tests it may be too long and test may need shorter interval. + * + * @param interval + * new CDC polling interval, in seconds + * @throws SQLException if anything unexpected fails + */ + public static void adjustCdcPollingInterval(JdbcConnection connection, int interval) throws SQLException { + String adjustCdcPollingIntervalStmt = ADJUST_CDC_POLLING_INTERVAL.replace(STATEMENTS_PLACEHOLDER, Integer.toString(interval)); + connection.execute(adjustCdcPollingIntervalStmt); + } + public static void waitForSnapshotToBeCompleted() { waitForDatabaseSnapshotToBeCompleted(TEST_DATABASE_1); }