diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java index 7472f6aab..5b2917ccf 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java @@ -14,6 +14,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.DebeziumException; import io.debezium.bean.StandardBeanNames; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; @@ -137,6 +138,14 @@ public ChangeEventSourceCoordinator connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new, metadataProvider, + connectorConfig.createHeartbeat( + topicNamingStrategy, + schemaNameAdjuster, + connectionFactory::newConnection, + exception -> { + final String sqlErrorId = exception.getMessage(); + throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception); + }), schemaNameAdjuster, signalProcessor); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 5e2d5badd..6f323de3c 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -65,6 +65,7 @@ import io.debezium.data.VerifyRecord; import io.debezium.doc.FixFor; import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.heartbeat.DatabaseHeartbeatImpl; import io.debezium.junit.ConditionalFail; import io.debezium.junit.Flaky; import io.debezium.junit.logging.LogInterceptor; @@ -3259,6 +3260,45 @@ public void shouldOnlyCaptureTableSchemaForIncluded() throws Exception { assertNoRecordsToConsume(); } + @Test + @FixFor("DBZ-7801") + public void shouldExecuteHeartbeatActionQuery() throws Exception { + try { + connection.execute("CREATE TABLE dbo.heartbeat (id int primary key, data DATETIME)"); + TestHelper.enableTableCdc(connection, "heartbeat"); + + connection.execute("INSERT INTO dbo.heartbeat (id, data) values (1, current_timestamp)"); + + Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA) + .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.heartbeat") + .with(SqlServerConnectorConfig.STORE_ONLY_CAPTURED_DATABASES_DDL, "true") + .with(DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY, "UPDATE dbo.heartbeat set data = current_timestamp") + .with(DatabaseHeartbeatImpl.HEARTBEAT_INTERVAL, 1000) + .build(); + + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + + waitForStreamingStarted(); + + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> { + final SourceRecords records = consumeRecordsByTopic(1); + final List heartbeatRecords = records.recordsForTopic("server1.testDB1.dbo.heartbeat"); + return heartbeatRecords != null && !heartbeatRecords.isEmpty(); + }); + + // stop connector and clean-up any potential residual heartbeat events + stopConnector((success) -> { + consumeAvailableRecords(r -> { + }); + }); + } + finally { + TestHelper.disableTableCdc(connection, "heartbeat"); + } + } + private void purgeDatabaseLogs() throws SQLException { TestHelper.disableTableCdc(connection, "tablea"); diff --git a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc index c6dfba7d1..a19865d8b 100644 --- a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc +++ b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc @@ -3061,6 +3061,16 @@ This may result in more change events to be re-sent after a connector restart. Set this parameter to `0` to not send heartbeat messages at all. + Disabled by default. +|[[sqlserver-property-heartbeat-action-query]]<> +|No default +|Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. + ++ +This is useful for keeping offsets from becoming stale when capturing changes from a low-traffic database. Create a heartbeat table in the low-traffic database, and set this property to a statement that inserts records into that table, for example: + ++ +`INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')` + ++ +This allows the connector to receive changes from the low-traffic database and acknowledge their LSNs, which prevents offsets from become stale. + |[[sqlserver-property-snapshot-delay-ms]]<> |No default |An interval in milli-seconds that the connector should wait before taking a snapshot after starting up; +