diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerMetricsIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerMetricsIT.java index 772d78416..e883cc984 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerMetricsIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerMetricsIT.java @@ -57,8 +57,6 @@ protected Configuration.Builder noSnapshot(Configuration.Builder config) { @Override protected void executeInsertStatements() throws Exception { connection.execute("INSERT INTO tablea VALUES('a')", "INSERT INTO tablea VALUES('b')"); - TestHelper.enableTableCdc(connection, "tablea"); - TestHelper.waitForEnabledCdc(connection, "tablea"); } @Override @@ -83,21 +81,24 @@ protected String task() { @Override protected String database() { - return "testDB1"; + return TEST_DATABASE_1; } private SqlServerConnection connection; @Before - public void before() throws SQLException { + public void before() throws Exception { + // Testing.Print.enable(); TestHelper.createTestDatabase(); connection = TestHelper.testConnection(); connection.execute( "CREATE TABLE tablea (id int IDENTITY(1,1) primary key, cola varchar(30))"); TestHelper.enableTableCdc(connection, "tablea"); - TestHelper.adjustCdcPollingInterval(connection, 1); initializeConnectorTestFramework(); Testing.Files.delete(SCHEMA_HISTORY_PATH); + + // Be sure the agent is running + TestHelper.waitForMaxLsnAvailable(connection, TEST_DATABASE_1); } @After @@ -139,9 +140,14 @@ public void testSnapshotAndStreamingMetrics() throws Exception { assertConnectorIsRunning(); assertSnapshotMetrics(); - // For SQL Server we have two more since when the streaming will start from an empty offset - // it will take the last commited transaction in the log and so also the initial inserts will be streamed. - assertStreamingMetrics(false, expectedEvents() + 2); + + consumeRecords(2); + + TestHelper.disableTableCdc(connection, "tablea"); + TestHelper.enableTableCdc(connection, "tablea"); + TestHelper.waitForEnabledCdc(connection, "tablea"); + + assertStreamingMetrics(false, expectedEvents()); } @Test @@ -156,8 +162,13 @@ public void testSnapshotAndStreamingWithCustomMetrics() throws Exception { start(x -> x.with(CommonConnectorConfig.CUSTOM_METRIC_TAGS, "env=test,bu=bigdata")); assertSnapshotWithCustomMetrics(customMetricTags); - // For SQL Server we have two more since when the streaming will start from an empty offset - // it will take the last commited transaction in the log and so also the initial inserts will be streamed. - assertStreamingWithCustomMetrics(customMetricTags, expectedEvents() + 2); + + consumeRecords(2); + + TestHelper.disableTableCdc(connection, "tablea"); + TestHelper.enableTableCdc(connection, "tablea"); + TestHelper.waitForEnabledCdc(connection, "tablea"); + + assertStreamingWithCustomMetrics(customMetricTags, expectedEvents()); } } diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractMetricsTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractMetricsTest.java index eed617e19..96daf9756 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractMetricsTest.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractMetricsTest.java @@ -255,7 +255,7 @@ protected void assertStreamingMetrics(boolean checkAdvancedMetrics, long expecte // Insert new records and wait for them to become available executeInsertStatements(); - // Testing.Print.enable(); + consumeRecordsByTopic((int) expectedEvents); Thread.sleep(Duration.ofSeconds(2).toMillis());