DBZ-7801 Add SQL Server heartbeat.action.query support

This commit is contained in:
Chris Cranford 2024-06-11 16:14:15 -04:00 committed by Jiri Pechanec
parent c05c2f51f9
commit c792a94e44
3 changed files with 59 additions and 0 deletions

View File

@ -14,6 +14,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
@ -137,6 +138,14 @@ public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext>
connectorConfig.getTableFilters().dataCollectionFilter(),
DataChangeEvent::new,
metadataProvider,
connectorConfig.createHeartbeat(
topicNamingStrategy,
schemaNameAdjuster,
connectionFactory::newConnection,
exception -> {
final String sqlErrorId = exception.getMessage();
throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
}),
schemaNameAdjuster,
signalProcessor);

View File

@ -65,6 +65,7 @@
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.heartbeat.DatabaseHeartbeatImpl;
import io.debezium.junit.ConditionalFail;
import io.debezium.junit.Flaky;
import io.debezium.junit.logging.LogInterceptor;
@ -3259,6 +3260,45 @@ public void shouldOnlyCaptureTableSchemaForIncluded() throws Exception {
assertNoRecordsToConsume();
}
@Test
@FixFor("DBZ-7801")
public void shouldExecuteHeartbeatActionQuery() throws Exception {
try {
connection.execute("CREATE TABLE dbo.heartbeat (id int primary key, data DATETIME)");
TestHelper.enableTableCdc(connection, "heartbeat");
connection.execute("INSERT INTO dbo.heartbeat (id, data) values (1, current_timestamp)");
Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA)
.with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.heartbeat")
.with(SqlServerConnectorConfig.STORE_ONLY_CAPTURED_DATABASES_DDL, "true")
.with(DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY, "UPDATE dbo.heartbeat set data = current_timestamp")
.with(DatabaseHeartbeatImpl.HEARTBEAT_INTERVAL, 1000)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
waitForStreamingStarted();
Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
final SourceRecords records = consumeRecordsByTopic(1);
final List<SourceRecord> heartbeatRecords = records.recordsForTopic("server1.testDB1.dbo.heartbeat");
return heartbeatRecords != null && !heartbeatRecords.isEmpty();
});
// stop connector and clean-up any potential residual heartbeat events
stopConnector((success) -> {
consumeAvailableRecords(r -> {
});
});
}
finally {
TestHelper.disableTableCdc(connection, "heartbeat");
}
}
private void purgeDatabaseLogs() throws SQLException {
TestHelper.disableTableCdc(connection, "tablea");

View File

@ -3061,6 +3061,16 @@ This may result in more change events to be re-sent after a connector restart.
Set this parameter to `0` to not send heartbeat messages at all. +
Disabled by default.
|[[sqlserver-property-heartbeat-action-query]]<<sqlserver-property-heartbeat-action-query, `+heartbeat.action.query+`>>
|No default
|Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. +
+
This is useful for keeping offsets from becoming stale when capturing changes from a low-traffic database. Create a heartbeat table in the low-traffic database, and set this property to a statement that inserts records into that table, for example: +
+
`INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')` +
+
This allows the connector to receive changes from the low-traffic database and acknowledge their LSNs, which prevents offsets from become stale.
|[[sqlserver-property-snapshot-delay-ms]]<<sqlserver-property-snapshot-delay-ms, `+snapshot.delay.ms+`>>
|No default
|An interval in milli-seconds that the connector should wait before taking a snapshot after starting up; +