DBZ-7765 Use different query for validating log position for SQLServer

This commit is contained in:
mfvitale 2024-04-10 11:25:40 +02:00 committed by Jiri Pechanec
parent 8d096e965f
commit 717888f6f6
7 changed files with 20 additions and 9 deletions

View File

@ -28,6 +28,7 @@
import io.debezium.connector.mysql.MySqlSystemVariables;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
@ -491,7 +492,7 @@ protected String getSessionVariableForSslVersion() {
protected abstract GtidSet createGtidSet(String gtids);
public boolean validateLogPosition(OffsetContext offset, CommonConnectorConfig config) {
public boolean validateLogPosition(Partition partition, OffsetContext offset, CommonConnectorConfig config) {
final String gtidSet = ((MySqlOffsetContext) offset).gtidSet();
final String binlogFilename = ((MySqlOffsetContext) offset).getSource().binlogFilename();

View File

@ -39,6 +39,7 @@
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Attribute;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
@ -419,7 +420,7 @@ public Optional<Scn> getFirstScnInLogs(Duration archiveLogRetention, String arch
return Optional.of(Scn.valueOf(oldestScn));
}
public boolean validateLogPosition(OffsetContext offset, CommonConnectorConfig config) {
public boolean validateLogPosition(Partition partition, OffsetContext offset, CommonConnectorConfig config) {
final Duration archiveLogRetention = ((OracleConnectorConfig) config).getArchiveLogRetention();
final String archiveDestinationName = ((OracleConnectorConfig) config).getArchiveLogDestinationName();

View File

@ -48,6 +48,7 @@
import io.debezium.pipeline.source.snapshot.incremental.ChunkQueryBuilder;
import io.debezium.pipeline.source.snapshot.incremental.RowValueConstructorChunkQueryBuilder;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
@ -826,7 +827,7 @@ public TableId createTableId(String databaseName, String schemaName, String tabl
return new TableId(null, schemaName, tableName);
}
public boolean validateLogPosition(OffsetContext offset, CommonConnectorConfig config) {
public boolean validateLogPosition(Partition partition, OffsetContext offset, CommonConnectorConfig config) {
final Lsn storedLsn = ((PostgresOffsetContext) offset).lastCommitLsn();
final String slotName = ((PostgresConnectorConfig) config).slotName();

View File

@ -45,6 +45,7 @@
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
@ -109,6 +110,7 @@ public class SqlServerConnection extends JdbcConnection {
" FROM ordered_change_tables WHERE ct_sequence = 1";
private static final String GET_NEW_CHANGE_TABLES = "SELECT * FROM [#db].cdc.change_tables WHERE start_lsn BETWEEN ? AND ?";
private static final String GET_MIN_LSN_FROM_ALL_CHANGE_TABLES = "select min(start_lsn) from [#db].cdc.change_tables";
private static final String OPENING_QUOTING_CHARACTER = "[";
private static final String CLOSING_QUOTING_CHARACTER = "]";
@ -735,13 +737,14 @@ public Optional<Instant> getCurrentTimestamp() throws SQLException {
rs -> rs.next() ? Optional.of(rs.getObject(1, OffsetDateTime.class).toInstant()) : Optional.empty());
}
public boolean validateLogPosition(OffsetContext offset, CommonConnectorConfig config) {
public boolean validateLogPosition(Partition partition, OffsetContext offset, CommonConnectorConfig config) {
final Lsn storedLsn = ((SqlServerOffsetContext) offset).getChangePosition().getCommitLsn();
String oldestFirstChangeQuery = "SELECT TOP 1 [Current LSN] FROM sys.fn_dblog (NULL, NULL) ORDER BY [Current LSN] ASC";
final String oldestFirstChangeQuery = replaceDatabaseNamePlaceholder(GET_MIN_LSN_FROM_ALL_CHANGE_TABLES, ((SqlServerPartition) partition).getDatabaseName());
try {
final String oldestScn = singleOptionalValue(oldestFirstChangeQuery, rs -> rs.getString(1));
if (oldestScn == null) {

View File

@ -105,7 +105,7 @@ public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext>
final SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class);
validateAndLoadSchemaHistory(connectorConfig, metadataConnection::validateLogPosition, offsets, schema,
validateAndLoadSchemaHistory(connectorConfig, dataConnection::validateLogPosition, offsets, schema,
snapshotterService.getSnapshotter());
taskContext = new SqlServerTaskContext(connectorConfig, schema);

View File

@ -3260,8 +3260,11 @@ public void shouldOnlyCaptureTableSchemaForIncluded() throws Exception {
}
private void purgeDatabaseLogs() throws SQLException {
connection.execute("ALTER DATABASE testDB1 SET RECOVERY SIMPLE");
connection.execute("DBCC SHRINKFILE (testDB1, 1)");
TestHelper.disableTableCdc(connection, "tablea");
TestHelper.disableTableCdc(connection, "tableb");
TestHelper.enableTableCdc(connection, "tablea");
}
private void shouldStopRetriableRestartsAtConfiguredMaximum(SqlRunnable scenario) throws Exception {

View File

@ -7,14 +7,16 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
@FunctionalInterface
public interface LogPositionValidator {
/**
* Validate the stored offset with the position available in the db log.
* @param partition The current stored partition.
* @param offsetContext The current stored offset.
* @param config Connector configuration.
*/
boolean validate(OffsetContext offsetContext, CommonConnectorConfig config);
boolean validate(Partition partition, OffsetContext offsetContext, CommonConnectorConfig config);
}