DBZ-7601 Stabilize SqlServerMetricsIT tests

This commit is contained in:
mfvitale 2024-08-13 10:54:27 +02:00 committed by Jiri Pechanec
parent f3d43a8755
commit 1f0a61fb30
2 changed files with 23 additions and 12 deletions

View File

@ -57,8 +57,6 @@ protected Configuration.Builder noSnapshot(Configuration.Builder config) {
@Override @Override
protected void executeInsertStatements() throws Exception { protected void executeInsertStatements() throws Exception {
connection.execute("INSERT INTO tablea VALUES('a')", "INSERT INTO tablea VALUES('b')"); connection.execute("INSERT INTO tablea VALUES('a')", "INSERT INTO tablea VALUES('b')");
TestHelper.enableTableCdc(connection, "tablea");
TestHelper.waitForEnabledCdc(connection, "tablea");
} }
@Override @Override
@ -83,21 +81,24 @@ protected String task() {
@Override @Override
protected String database() { protected String database() {
return "testDB1"; return TEST_DATABASE_1;
} }
private SqlServerConnection connection; private SqlServerConnection connection;
@Before @Before
public void before() throws SQLException { public void before() throws Exception {
// Testing.Print.enable();
TestHelper.createTestDatabase(); TestHelper.createTestDatabase();
connection = TestHelper.testConnection(); connection = TestHelper.testConnection();
connection.execute( connection.execute(
"CREATE TABLE tablea (id int IDENTITY(1,1) primary key, cola varchar(30))"); "CREATE TABLE tablea (id int IDENTITY(1,1) primary key, cola varchar(30))");
TestHelper.enableTableCdc(connection, "tablea"); TestHelper.enableTableCdc(connection, "tablea");
TestHelper.adjustCdcPollingInterval(connection, 1);
initializeConnectorTestFramework(); initializeConnectorTestFramework();
Testing.Files.delete(SCHEMA_HISTORY_PATH); Testing.Files.delete(SCHEMA_HISTORY_PATH);
// Be sure the agent is running
TestHelper.waitForMaxLsnAvailable(connection, TEST_DATABASE_1);
} }
@After @After
@ -139,9 +140,14 @@ public void testSnapshotAndStreamingMetrics() throws Exception {
assertConnectorIsRunning(); assertConnectorIsRunning();
assertSnapshotMetrics(); assertSnapshotMetrics();
// For SQL Server we have two more since when the streaming will start from an empty offset
// it will take the last commited transaction in the log and so also the initial inserts will be streamed. consumeRecords(2);
assertStreamingMetrics(false, expectedEvents() + 2);
TestHelper.disableTableCdc(connection, "tablea");
TestHelper.enableTableCdc(connection, "tablea");
TestHelper.waitForEnabledCdc(connection, "tablea");
assertStreamingMetrics(false, expectedEvents());
} }
@Test @Test
@ -156,8 +162,13 @@ public void testSnapshotAndStreamingWithCustomMetrics() throws Exception {
start(x -> x.with(CommonConnectorConfig.CUSTOM_METRIC_TAGS, "env=test,bu=bigdata")); start(x -> x.with(CommonConnectorConfig.CUSTOM_METRIC_TAGS, "env=test,bu=bigdata"));
assertSnapshotWithCustomMetrics(customMetricTags); assertSnapshotWithCustomMetrics(customMetricTags);
// For SQL Server we have two more since when the streaming will start from an empty offset
// it will take the last commited transaction in the log and so also the initial inserts will be streamed. consumeRecords(2);
assertStreamingWithCustomMetrics(customMetricTags, expectedEvents() + 2);
TestHelper.disableTableCdc(connection, "tablea");
TestHelper.enableTableCdc(connection, "tablea");
TestHelper.waitForEnabledCdc(connection, "tablea");
assertStreamingWithCustomMetrics(customMetricTags, expectedEvents());
} }
} }

View File

@ -255,7 +255,7 @@ protected void assertStreamingMetrics(boolean checkAdvancedMetrics, long expecte
// Insert new records and wait for them to become available // Insert new records and wait for them to become available
executeInsertStatements(); executeInsertStatements();
// Testing.Print.enable();
consumeRecordsByTopic((int) expectedEvents); consumeRecordsByTopic((int) expectedEvents);
Thread.sleep(Duration.ofSeconds(2).toMillis()); Thread.sleep(Duration.ofSeconds(2).toMillis());