DBZ-812 Table removed during streaming
This commit is contained in:
parent
cfc5acc8da
commit
25bc23a14e
@ -1,3 +1,8 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.sqlserver;
|
||||
|
||||
import io.debezium.relational.TableId;
|
||||
|
@ -49,6 +49,7 @@ public class SqlServerConnection extends JdbcConnection {
|
||||
+ "EXEC sys.sp_cdc_disable_db";
|
||||
private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0)\n"
|
||||
+ "EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0";
|
||||
private static final String DISABLE_TABLE_CDC = "EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'#', @capture_instance = 'all'";
|
||||
private static final String CDC_WRAPPERS_DML;
|
||||
private static final String GET_MAX_LSN = "SELECT sys.fn_cdc_get_max_lsn()";
|
||||
private static final String LOCK_TABLE = "SELECT * FROM # WITH (TABLOCKX)";
|
||||
@ -129,6 +130,19 @@ public void enableTableCdc(String name) throws SQLException {
|
||||
execute(enableCdcForTableStmt, generateWrapperFunctionsStmts);
|
||||
}
|
||||
|
||||
/**
|
||||
* Disables CDC for a table for which it was enabled before.
|
||||
*
|
||||
* @param name
|
||||
* the name of the table, may not be {@code null}
|
||||
* @throws SQLException if anything unexpected fails
|
||||
*/
|
||||
public void disableTableCdc(String name) throws SQLException {
|
||||
Objects.requireNonNull(name);
|
||||
String disableCdcForTableStmt = DISABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, name);
|
||||
execute(disableCdcForTableStmt);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the current largest log sequence number
|
||||
*/
|
||||
@ -322,7 +336,7 @@ public Table getTableSchemaFromTable(ChangeTable changeTable) throws SQLExceptio
|
||||
cols = rs.next() ? readTableColumns(rs, changeTable.getTableId(), null).stream().map(ColumnEditor::create).collect(Collectors.toList()) : Collections.emptyList();
|
||||
}
|
||||
|
||||
List<String> pkColumnNames = Collections.singletonList("id");
|
||||
List<String> pkColumnNames = readPrimaryKeyNames(metadata, changeTable.getTableId());
|
||||
Collections.sort(cols);
|
||||
return Table.editor()
|
||||
.tableId(changeTable.getTableId())
|
||||
@ -330,4 +344,10 @@ public Table getTableSchemaFromTable(ChangeTable changeTable) throws SQLExceptio
|
||||
.setPrimaryKeyNames(pkColumnNames)
|
||||
.create();
|
||||
}
|
||||
|
||||
public synchronized void rollback() throws SQLException {
|
||||
if (isConnected()) {
|
||||
connection().rollback();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -91,6 +91,12 @@ public void start(Configuration config) {
|
||||
.subset("database.", true);
|
||||
|
||||
jdbcConnection = new SqlServerConnection(jdbcConfig);
|
||||
try {
|
||||
jdbcConnection.setAutoCommit(false);
|
||||
}
|
||||
catch (SQLException e) {
|
||||
throw new ConnectException(e);
|
||||
}
|
||||
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
|
||||
|
||||
this.schema = new SqlServerDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);
|
||||
|
@ -8,10 +8,13 @@
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
@ -38,6 +41,8 @@ public class SqlServerStreamingChangeEventSource implements StreamingChangeEvent
|
||||
private static final int COL_OPERATION = 3;
|
||||
private static final int COL_DATA = 5;
|
||||
|
||||
private static final Pattern MISSING_CDC_FUNCTION_CHANGES_ERROR = Pattern.compile("Invalid object name 'cdc.fn_cdc_get_all_changes_(.*)'\\.");
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerStreamingChangeEventSource.class);
|
||||
|
||||
private final SqlServerConnection connection;
|
||||
@ -94,6 +99,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
|
||||
if (!connection.listOfNewChangeTables(fromLsn, currentMaxLsn).isEmpty()) {
|
||||
tablesSlot.set(getCdcTablesToQuery());
|
||||
}
|
||||
try {
|
||||
connection.getChangesForTables(tablesSlot.get(), fromLsn, currentMaxLsn, resultSets -> {
|
||||
|
||||
final int tableCount = resultSets.length;
|
||||
@ -163,6 +169,12 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
|
||||
}
|
||||
});
|
||||
lastProcessedLsn = currentMaxLsn;
|
||||
// Terminate the transaction otherwise CDC could not be disabled for tables
|
||||
connection.rollback();
|
||||
}
|
||||
catch (SQLException e) {
|
||||
tablesSlot.set(processErrorFromChangeTableQuery(e, tablesSlot.get()));
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
@ -170,6 +182,18 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
|
||||
}
|
||||
}
|
||||
|
||||
private ChangeTable[] processErrorFromChangeTableQuery(SQLException exception, ChangeTable[] currentChangeTables) throws Exception {
|
||||
final Matcher m = MISSING_CDC_FUNCTION_CHANGES_ERROR.matcher(exception.getMessage());
|
||||
if (m.matches()) {
|
||||
final String captureName = m.group(1);
|
||||
LOGGER.info("Table is no longer captured with capture instance {}", captureName);
|
||||
return Arrays.asList(currentChangeTables).stream()
|
||||
.filter(x -> !x.getCaptureInstance().equals(captureName))
|
||||
.collect(Collectors.toList()).toArray(new ChangeTable[0]);
|
||||
}
|
||||
throw exception;
|
||||
}
|
||||
|
||||
private ChangeTable[] getCdcTablesToQuery() throws SQLException, InterruptedException {
|
||||
final Set<ChangeTable> cdcEnabledTables = connection.listOfChangeTables();
|
||||
final Set<ChangeTable> newTables = new HashSet<>();
|
||||
@ -236,6 +260,10 @@ public Object[] getData() throws SQLException {
|
||||
public boolean next() throws SQLException {
|
||||
completed = !resultSet.next();
|
||||
currentChangeLsn = completed ? Lsn.NULL : Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN));
|
||||
if (completed) {
|
||||
LOGGER.trace("Closing result set of change table for table {}", tableId);
|
||||
resultSet.close();
|
||||
}
|
||||
return !completed;
|
||||
}
|
||||
|
||||
|
@ -102,6 +102,50 @@ public void addTable() throws Exception {
|
||||
Assertions.assertThat(records.recordsForTopic("server1.dbo.tabled")).hasSize(RECORDS_PER_TABLE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void removeTable() throws Exception {
|
||||
final int RECORDS_PER_TABLE = 5;
|
||||
final int TABLES = 2;
|
||||
final int ID_START_1 = 10;
|
||||
final int ID_START_2 = 100;
|
||||
final Configuration config = TestHelper.defaultConfig()
|
||||
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
|
||||
.build();
|
||||
|
||||
start(SqlServerConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
||||
final int id = ID_START_1 + i;
|
||||
connection.execute(
|
||||
"INSERT INTO tablea VALUES(" + id + ", 'a')"
|
||||
);
|
||||
connection.execute(
|
||||
"INSERT INTO tableb VALUES(" + id + ", 'b')"
|
||||
);
|
||||
}
|
||||
|
||||
SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
|
||||
Assertions.assertThat(records.recordsForTopic("server1.dbo.tablea")).hasSize(RECORDS_PER_TABLE);
|
||||
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE);
|
||||
|
||||
// Disable CDC for a table
|
||||
connection.disableTableCdc("tableb");
|
||||
|
||||
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
||||
final int id = ID_START_2 + i;
|
||||
connection.execute(
|
||||
"INSERT INTO tablea VALUES(" + id + ", 'a2')"
|
||||
);
|
||||
connection.execute(
|
||||
"INSERT INTO tableb VALUES(" + id + ", 'b2')"
|
||||
);
|
||||
}
|
||||
records = consumeRecordsByTopic(RECORDS_PER_TABLE);
|
||||
Assertions.assertThat(records.recordsForTopic("server1.dbo.tablea")).hasSize(RECORDS_PER_TABLE);
|
||||
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).isNullOrEmpty();
|
||||
}
|
||||
|
||||
private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
|
||||
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user