DBZ-6035 Add function for adjusting SQL server polling interval
Some tests fail randomly as they insert some records at the begging of the test, start Debezium and asserts that there are no records. However, as SQL server poll for the changes and copies them into CDC table not immediately but in specified interval (by default 5 secods), there can be a race condition when the entries inserted at the beginning of the test are copied into CDC table after starting Debezium and it resutls into the test failure as there are unexpected records. Add function for adjusting the polling interval and decrease polling interval in `IncrementalSnapshotIT` which fails very often. This doesn't fully fix the race condition, but decreases the changes the issue is hit.
This commit is contained in:
parent
9ee39691bf
commit
895fd0b99f
@ -23,6 +23,7 @@
|
||||
import io.debezium.util.Testing;
|
||||
|
||||
public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotWithSchemaChangesSupportTest<SqlServerConnector> {
|
||||
private static final int POLLING_INTERVAL = 1;
|
||||
|
||||
private SqlServerConnection connection;
|
||||
|
||||
@ -38,6 +39,7 @@ public void before() throws SQLException {
|
||||
"CREATE TABLE b (pk int primary key, aa int)",
|
||||
"CREATE TABLE debezium_signal (id varchar(64), type varchar(32), data varchar(2048))");
|
||||
TestHelper.enableTableCdc(connection, "debezium_signal");
|
||||
TestHelper.adjustCdcPollingInterval(connection, POLLING_INTERVAL);
|
||||
|
||||
initializeConnectorTestFramework();
|
||||
Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
|
||||
|
@ -76,6 +76,7 @@ public class TestHelper {
|
||||
private static final String IS_CDC_TABLE_ENABLED = "SELECT COUNT(*) FROM sys.tables tb WHERE tb.is_tracked_by_cdc = 1 AND tb.name='#'";
|
||||
private static final String ENABLE_TABLE_CDC_WITH_CUSTOM_CAPTURE = "EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'%s', @capture_instance = N'%s', @role_name = NULL, @supports_net_changes = 0, @captured_column_list = %s";
|
||||
private static final String DISABLE_TABLE_CDC = "EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'#', @capture_instance = 'all'";
|
||||
private static final String ADJUST_CDC_POLLING_INTERVAL = "EXEC sys.sp_cdc_change_job @job_type = 'capture', @pollinginterval = #";
|
||||
private static final String CDC_WRAPPERS_DML;
|
||||
|
||||
/**
|
||||
@ -420,6 +421,21 @@ public static void disableTableCdc(JdbcConnection connection, String name) throw
|
||||
connection.execute(disableCdcForTableStmt);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets new polling interval in which SQL server should poll changes.
|
||||
*
|
||||
* SQL server polls new changes and copies them into CDC in predefined interval.
|
||||
* By default, this interval is 5 seconds. For the tests it may be too long and test may need shorter interval.
|
||||
*
|
||||
* @param interval
|
||||
* new CDC polling interval, in seconds
|
||||
* @throws SQLException if anything unexpected fails
|
||||
*/
|
||||
public static void adjustCdcPollingInterval(JdbcConnection connection, int interval) throws SQLException {
|
||||
String adjustCdcPollingIntervalStmt = ADJUST_CDC_POLLING_INTERVAL.replace(STATEMENTS_PLACEHOLDER, Integer.toString(interval));
|
||||
connection.execute(adjustCdcPollingIntervalStmt);
|
||||
}
|
||||
|
||||
public static void waitForSnapshotToBeCompleted() {
|
||||
waitForDatabaseSnapshotToBeCompleted(TEST_DATABASE_1);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user