diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/ChangeTable.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/ChangeTable.java index a805691c3..c76b9febf 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/ChangeTable.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/ChangeTable.java @@ -1,3 +1,8 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ package io.debezium.connector.sqlserver; import io.debezium.relational.TableId; diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java index 69b8eeff4..3b4ca06b5 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnection.java @@ -49,6 +49,7 @@ public class SqlServerConnection extends JdbcConnection { + "EXEC sys.sp_cdc_disable_db"; private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0)\n" + "EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0"; + private static final String DISABLE_TABLE_CDC = "EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'#', @capture_instance = 'all'"; private static final String CDC_WRAPPERS_DML; private static final String GET_MAX_LSN = "SELECT sys.fn_cdc_get_max_lsn()"; private static final String LOCK_TABLE = "SELECT * FROM # WITH (TABLOCKX)"; @@ -129,6 +130,19 @@ public void enableTableCdc(String name) throws SQLException { execute(enableCdcForTableStmt, generateWrapperFunctionsStmts); } + /** + * Disables CDC for a table for which it was enabled before. + * + * @param name + * the name of the table, may not be {@code null} + * @throws SQLException if anything unexpected fails + */ + public void disableTableCdc(String name) throws SQLException { + Objects.requireNonNull(name); + String disableCdcForTableStmt = DISABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, name); + execute(disableCdcForTableStmt); + } + /** * @return the current largest log sequence number */ @@ -322,7 +336,7 @@ public Table getTableSchemaFromTable(ChangeTable changeTable) throws SQLExceptio cols = rs.next() ? readTableColumns(rs, changeTable.getTableId(), null).stream().map(ColumnEditor::create).collect(Collectors.toList()) : Collections.emptyList(); } - List pkColumnNames = Collections.singletonList("id"); + List pkColumnNames = readPrimaryKeyNames(metadata, changeTable.getTableId()); Collections.sort(cols); return Table.editor() .tableId(changeTable.getTableId()) @@ -330,4 +344,10 @@ public Table getTableSchemaFromTable(ChangeTable changeTable) throws SQLExceptio .setPrimaryKeyNames(pkColumnNames) .create(); } + + public synchronized void rollback() throws SQLException { + if (isConnected()) { + connection().rollback(); + } + } } diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java index f10f35258..8e09b6dae 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java @@ -91,6 +91,12 @@ public void start(Configuration config) { .subset("database.", true); jdbcConnection = new SqlServerConnection(jdbcConfig); + try { + jdbcConnection.setAutoCommit(false); + } + catch (SQLException e) { + throw new ConnectException(e); + } final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER); this.schema = new SqlServerDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection); diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index a6f5f1e7f..209633eef 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -8,10 +8,13 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.time.Duration; +import java.util.Arrays; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.kafka.connect.errors.ConnectException; @@ -38,6 +41,8 @@ public class SqlServerStreamingChangeEventSource implements StreamingChangeEvent private static final int COL_OPERATION = 3; private static final int COL_DATA = 5; + private static final Pattern MISSING_CDC_FUNCTION_CHANGES_ERROR = Pattern.compile("Invalid object name 'cdc.fn_cdc_get_all_changes_(.*)'\\."); + private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerStreamingChangeEventSource.class); private final SqlServerConnection connection; @@ -94,75 +99,82 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio if (!connection.listOfNewChangeTables(fromLsn, currentMaxLsn).isEmpty()) { tablesSlot.set(getCdcTablesToQuery()); } - connection.getChangesForTables(tablesSlot.get(), fromLsn, currentMaxLsn, resultSets -> { + try { + connection.getChangesForTables(tablesSlot.get(), fromLsn, currentMaxLsn, resultSets -> { - final int tableCount = resultSets.length; - final ChangeTablePointer[] changeTables = new ChangeTablePointer[tableCount]; - final ChangeTable[] tables = tablesSlot.get(); + final int tableCount = resultSets.length; + final ChangeTablePointer[] changeTables = new ChangeTablePointer[tableCount]; + final ChangeTable[] tables = tablesSlot.get(); - for (int i = 0; i < tableCount; i++) { - changeTables[i] = new ChangeTablePointer(tables[i].getTableId(), resultSets[i]); - changeTables[i].next(); - } - - for (;;) { - ChangeTablePointer tableSmallestLsn = null; for (int i = 0; i < tableCount; i++) { - final ChangeTablePointer changeTable = changeTables[i]; - if (changeTable.isCompleted()) { + changeTables[i] = new ChangeTablePointer(tables[i].getTableId(), resultSets[i]); + changeTables[i].next(); + } + + for (;;) { + ChangeTablePointer tableSmallestLsn = null; + for (int i = 0; i < tableCount; i++) { + final ChangeTablePointer changeTable = changeTables[i]; + if (changeTable.isCompleted()) { + continue; + } + if (tableSmallestLsn == null || changeTable.compareTo(tableSmallestLsn) < 0) { + tableSmallestLsn = changeTable; + } + } + if (tableSmallestLsn == null) { + // No more LSNs available + break; + } + + if (tableSmallestLsn.getRowLsn().compareTo(lastProcessedLsnOnStart) <= 0) { + LOGGER.info("Skipping change {} as its LSN is smaller than the last recorded LSN {}", tableSmallestLsn, lastProcessedLsnOnStart); + tableSmallestLsn.next(); continue; } - if (tableSmallestLsn == null || changeTable.compareTo(tableSmallestLsn) < 0) { - tableSmallestLsn = changeTable; - } - } - if (tableSmallestLsn == null) { - // No more LSNs available - break; - } + LOGGER.trace("Processing change {}", tableSmallestLsn); + final TableId tableId = tableSmallestLsn.getTableId(); + final Lsn commitLsn = tableSmallestLsn.getCommitLsn(); + final Lsn rowLsn = tableSmallestLsn.getRowLsn(); + final int operation = tableSmallestLsn.getOperation(); + final Object[] data = tableSmallestLsn.getData(); - if (tableSmallestLsn.getRowLsn().compareTo(lastProcessedLsnOnStart) <= 0) { - LOGGER.info("Skipping change {} as its LSN is smaller than the last recorded LSN {}", tableSmallestLsn, lastProcessedLsnOnStart); + // UPDATE consists of two consecutive events, first event contains + // the row before it was updated and the second the row after + // it was updated + if (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) { + if (!tableSmallestLsn.next() || tableSmallestLsn.getOperation() != SqlServerChangeRecordEmitter.OP_UPDATE_AFTER) { + throw new IllegalStateException("The update before event at " + rowLsn + " for table " + tableId + " was not followed by after event.\n Please report this as a bug together with a events around given LSN."); + } + } + final Object[] dataNext = (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) ? tableSmallestLsn.getData() : null; + + offsetContext.setChangeLsn(rowLsn); + offsetContext.setCommitLsn(commitLsn); + offsetContext.setSourceTime(connection.timestampOfLsn(commitLsn)); + + dispatcher + .dispatchDataChangeEvent( + tableId, + new SqlServerChangeRecordEmitter( + offsetContext, + operation, + data, + dataNext, + schema.tableFor(tableId), + clock + ) + ); tableSmallestLsn.next(); - continue; } - LOGGER.trace("Processing change {}", tableSmallestLsn); - final TableId tableId = tableSmallestLsn.getTableId(); - final Lsn commitLsn = tableSmallestLsn.getCommitLsn(); - final Lsn rowLsn = tableSmallestLsn.getRowLsn(); - final int operation = tableSmallestLsn.getOperation(); - final Object[] data = tableSmallestLsn.getData(); - - // UPDATE consists of two consecutive events, first event contains - // the row before it was updated and the second the row after - // it was updated - if (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) { - if (!tableSmallestLsn.next() || tableSmallestLsn.getOperation() != SqlServerChangeRecordEmitter.OP_UPDATE_AFTER) { - throw new IllegalStateException("The update before event at " + rowLsn + " for table " + tableId + " was not followed by after event.\n Please report this as a bug together with a events around given LSN."); - } - } - final Object[] dataNext = (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) ? tableSmallestLsn.getData() : null; - - offsetContext.setChangeLsn(rowLsn); - offsetContext.setCommitLsn(commitLsn); - offsetContext.setSourceTime(connection.timestampOfLsn(commitLsn)); - - dispatcher - .dispatchDataChangeEvent( - tableId, - new SqlServerChangeRecordEmitter( - offsetContext, - operation, - data, - dataNext, - schema.tableFor(tableId), - clock - ) - ); - tableSmallestLsn.next(); - } - }); - lastProcessedLsn = currentMaxLsn; + }); + lastProcessedLsn = currentMaxLsn; + // Terminate the transaction otherwise CDC could not be disabled for tables + connection.rollback(); + } + catch (SQLException e) { + tablesSlot.set(processErrorFromChangeTableQuery(e, tablesSlot.get())); + } } } catch (Exception e) { @@ -170,6 +182,18 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio } } + private ChangeTable[] processErrorFromChangeTableQuery(SQLException exception, ChangeTable[] currentChangeTables) throws Exception { + final Matcher m = MISSING_CDC_FUNCTION_CHANGES_ERROR.matcher(exception.getMessage()); + if (m.matches()) { + final String captureName = m.group(1); + LOGGER.info("Table is no longer captured with capture instance {}", captureName); + return Arrays.asList(currentChangeTables).stream() + .filter(x -> !x.getCaptureInstance().equals(captureName)) + .collect(Collectors.toList()).toArray(new ChangeTable[0]); + } + throw exception; + } + private ChangeTable[] getCdcTablesToQuery() throws SQLException, InterruptedException { final Set cdcEnabledTables = connection.listOfChangeTables(); final Set newTables = new HashSet<>(); @@ -236,6 +260,10 @@ public Object[] getData() throws SQLException { public boolean next() throws SQLException { completed = !resultSet.next(); currentChangeLsn = completed ? Lsn.NULL : Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN)); + if (completed) { + LOGGER.trace("Closing result set of change table for table {}", tableId); + resultSet.close(); + } return !completed; } diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java index bcc802024..2ace22f6b 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.java @@ -102,6 +102,50 @@ public void addTable() throws Exception { Assertions.assertThat(records.recordsForTopic("server1.dbo.tabled")).hasSize(RECORDS_PER_TABLE); } + @Test + public void removeTable() throws Exception { + final int RECORDS_PER_TABLE = 5; + final int TABLES = 2; + final int ID_START_1 = 10; + final int ID_START_2 = 100; + final Configuration config = TestHelper.defaultConfig() + .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY) + .build(); + + start(SqlServerConnector.class, config); + assertConnectorIsRunning(); + + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final int id = ID_START_1 + i; + connection.execute( + "INSERT INTO tablea VALUES(" + id + ", 'a')" + ); + connection.execute( + "INSERT INTO tableb VALUES(" + id + ", 'b')" + ); + } + + SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES); + Assertions.assertThat(records.recordsForTopic("server1.dbo.tablea")).hasSize(RECORDS_PER_TABLE); + Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE); + + // Disable CDC for a table + connection.disableTableCdc("tableb"); + + for (int i = 0; i < RECORDS_PER_TABLE; i++) { + final int id = ID_START_2 + i; + connection.execute( + "INSERT INTO tablea VALUES(" + id + ", 'a2')" + ); + connection.execute( + "INSERT INTO tableb VALUES(" + id + ", 'b2')" + ); + } + records = consumeRecordsByTopic(RECORDS_PER_TABLE); + Assertions.assertThat(records.recordsForTopic("server1.dbo.tablea")).hasSize(RECORDS_PER_TABLE); + Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).isNullOrEmpty(); + } + private void assertRecord(Struct record, List expected) { expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record)); }