diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/strategy/AbstractConnectorConnection.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/strategy/AbstractConnectorConnection.java index 6fee48aa3..967a2217e 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/strategy/AbstractConnectorConnection.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/strategy/AbstractConnectorConnection.java @@ -28,6 +28,7 @@ import io.debezium.connector.mysql.MySqlSystemVariables; import io.debezium.jdbc.JdbcConnection; import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.pipeline.spi.Partition; import io.debezium.relational.Column; import io.debezium.relational.Table; import io.debezium.relational.TableId; @@ -491,7 +492,7 @@ protected String getSessionVariableForSslVersion() { protected abstract GtidSet createGtidSet(String gtids); - public boolean validateLogPosition(OffsetContext offset, CommonConnectorConfig config) { + public boolean validateLogPosition(Partition partition, OffsetContext offset, CommonConnectorConfig config) { final String gtidSet = ((MySqlOffsetContext) offset).gtidSet(); final String binlogFilename = ((MySqlOffsetContext) offset).getSource().binlogFilename(); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java index e616259ec..7f08b1dd2 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnection.java @@ -39,6 +39,7 @@ import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConnection; import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.pipeline.spi.Partition; import io.debezium.relational.Attribute; import io.debezium.relational.Column; import io.debezium.relational.ColumnEditor; @@ -419,7 +420,7 @@ public Optional getFirstScnInLogs(Duration archiveLogRetention, String arch return Optional.of(Scn.valueOf(oldestScn)); } - public boolean validateLogPosition(OffsetContext offset, CommonConnectorConfig config) { + public boolean validateLogPosition(Partition partition, OffsetContext offset, CommonConnectorConfig config) { final Duration archiveLogRetention = ((OracleConnectorConfig) config).getArchiveLogRetention(); final String archiveDestinationName = ((OracleConnectorConfig) config).getArchiveLogDestinationName(); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java index 94b5c4dd1..ebc67b37d 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java @@ -48,6 +48,7 @@ import io.debezium.pipeline.source.snapshot.incremental.ChunkQueryBuilder; import io.debezium.pipeline.source.snapshot.incremental.RowValueConstructorChunkQueryBuilder; import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.pipeline.spi.Partition; import io.debezium.relational.Column; import io.debezium.relational.ColumnEditor; import io.debezium.relational.RelationalDatabaseConnectorConfig; @@ -826,7 +827,7 @@ public TableId createTableId(String databaseName, String schemaName, String tabl return new TableId(null, schemaName, tableName); } - public boolean validateLogPosition(OffsetContext offset, CommonConnectorConfig config) { + public boolean validateLogPosition(Partition partition, OffsetContext offset, CommonConnectorConfig config) { final Lsn storedLsn = ((PostgresOffsetContext) offset).lastCommitLsn(); final String slotName = ((PostgresConnectorConfig) config).slotName(); diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 536e2c930..5cc2508de 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -45,6 +45,7 @@ import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConnection; import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.pipeline.spi.Partition; import io.debezium.relational.Column; import io.debezium.relational.Table; import io.debezium.relational.TableId; @@ -109,6 +110,7 @@ public class SqlServerConnection extends JdbcConnection { " FROM ordered_change_tables WHERE ct_sequence = 1"; private static final String GET_NEW_CHANGE_TABLES = "SELECT * FROM [#db].cdc.change_tables WHERE start_lsn BETWEEN ? AND ?"; + private static final String GET_MIN_LSN_FROM_ALL_CHANGE_TABLES = "select min(start_lsn) from [#db].cdc.change_tables"; private static final String OPENING_QUOTING_CHARACTER = "["; private static final String CLOSING_QUOTING_CHARACTER = "]"; @@ -735,13 +737,14 @@ public Optional getCurrentTimestamp() throws SQLException { rs -> rs.next() ? Optional.of(rs.getObject(1, OffsetDateTime.class).toInstant()) : Optional.empty()); } - public boolean validateLogPosition(OffsetContext offset, CommonConnectorConfig config) { + public boolean validateLogPosition(Partition partition, OffsetContext offset, CommonConnectorConfig config) { final Lsn storedLsn = ((SqlServerOffsetContext) offset).getChangePosition().getCommitLsn(); - String oldestFirstChangeQuery = "SELECT TOP 1 [Current LSN] FROM sys.fn_dblog (NULL, NULL) ORDER BY [Current LSN] ASC"; + final String oldestFirstChangeQuery = replaceDatabaseNamePlaceholder(GET_MIN_LSN_FROM_ALL_CHANGE_TABLES, ((SqlServerPartition) partition).getDatabaseName()); try { + final String oldestScn = singleOptionalValue(oldestFirstChangeQuery, rs -> rs.getString(1)); if (oldestScn == null) { diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java index ef63124a3..7472f6aab 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java @@ -105,7 +105,7 @@ public ChangeEventSourceCoordinator final SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class); - validateAndLoadSchemaHistory(connectorConfig, metadataConnection::validateLogPosition, offsets, schema, + validateAndLoadSchemaHistory(connectorConfig, dataConnection::validateLogPosition, offsets, schema, snapshotterService.getSnapshotter()); taskContext = new SqlServerTaskContext(connectorConfig, schema); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index a73a94451..e0a8a4e4f 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -3260,8 +3260,11 @@ public void shouldOnlyCaptureTableSchemaForIncluded() throws Exception { } private void purgeDatabaseLogs() throws SQLException { - connection.execute("ALTER DATABASE testDB1 SET RECOVERY SIMPLE"); - connection.execute("DBCC SHRINKFILE (testDB1, 1)"); + + TestHelper.disableTableCdc(connection, "tablea"); + TestHelper.disableTableCdc(connection, "tableb"); + + TestHelper.enableTableCdc(connection, "tablea"); } private void shouldStopRetriableRestartsAtConfiguredMaximum(SqlRunnable scenario) throws Exception { diff --git a/debezium-core/src/main/java/io/debezium/function/LogPositionValidator.java b/debezium-core/src/main/java/io/debezium/function/LogPositionValidator.java index c764eca95..958e21bdf 100644 --- a/debezium-core/src/main/java/io/debezium/function/LogPositionValidator.java +++ b/debezium-core/src/main/java/io/debezium/function/LogPositionValidator.java @@ -7,14 +7,16 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.pipeline.spi.Partition; @FunctionalInterface public interface LogPositionValidator { /** * Validate the stored offset with the position available in the db log. + * @param partition The current stored partition. * @param offsetContext The current stored offset. * @param config Connector configuration. */ - boolean validate(OffsetContext offsetContext, CommonConnectorConfig config); + boolean validate(Partition partition, OffsetContext offsetContext, CommonConnectorConfig config); }