DBZ-812 Table removed during streaming

This commit is contained in:
Jiri Pechanec 2018-10-04 09:10:22 +02:00
parent 61ff10a771
commit a912d43a51
5 changed files with 165 additions and 62 deletions

View File

@ -1,3 +1,8 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import io.debezium.relational.TableId;

View File

@ -49,6 +49,7 @@ public class SqlServerConnection extends JdbcConnection {
+ "EXEC sys.sp_cdc_disable_db";
private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0)\n"
+ "EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0";
private static final String DISABLE_TABLE_CDC = "EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'#', @capture_instance = 'all'";
private static final String CDC_WRAPPERS_DML;
private static final String GET_MAX_LSN = "SELECT sys.fn_cdc_get_max_lsn()";
private static final String LOCK_TABLE = "SELECT * FROM # WITH (TABLOCKX)";
@ -129,6 +130,19 @@ public void enableTableCdc(String name) throws SQLException {
execute(enableCdcForTableStmt, generateWrapperFunctionsStmts);
}
/**
* Disables CDC for a table for which it was enabled before.
*
* @param name
* the name of the table, may not be {@code null}
* @throws SQLException if anything unexpected fails
*/
public void disableTableCdc(String name) throws SQLException {
Objects.requireNonNull(name);
String disableCdcForTableStmt = DISABLE_TABLE_CDC.replace(STATEMENTS_PLACEHOLDER, name);
execute(disableCdcForTableStmt);
}
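// Illustrative sketch only, not part of this commit: how the enable/disable helpers can be combined
// to reproduce DBZ-812. The table name "tableb" and the jdbcConfig variable are hypothetical here;
// the connector-side handling of the resulting missing-function error is shown further below.
SqlServerConnection admin = new SqlServerConnection(jdbcConfig);
admin.enableTableCdc("tableb");    // '#' placeholders are substituted with the table name
// ... run the connector and stream a few changes ...
admin.disableTableCdc("tableb");   // later polls hit the dropped cdc.fn_cdc_get_all_changes_* function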
/**
* @return the current largest log sequence number
*/
@ -322,7 +336,7 @@ public Table getTableSchemaFromTable(ChangeTable changeTable) throws SQLExceptio
cols = rs.next() ? readTableColumns(rs, changeTable.getTableId(), null).stream().map(ColumnEditor::create).collect(Collectors.toList()) : Collections.emptyList();
}
List<String> pkColumnNames = Collections.singletonList("id");
List<String> pkColumnNames = readPrimaryKeyNames(metadata, changeTable.getTableId());
Collections.sort(cols);
return Table.editor()
.tableId(changeTable.getTableId())
@ -330,4 +344,10 @@ public Table getTableSchemaFromTable(ChangeTable changeTable) throws SQLExceptio
.setPrimaryKeyNames(pkColumnNames)
.create();
}
public synchronized void rollback() throws SQLException {
if (isConnected()) {
connection().rollback();
}
}
}

View File

@ -91,6 +91,12 @@ public void start(Configuration config) {
.subset("database.", true);
jdbcConnection = new SqlServerConnection(jdbcConfig);
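// Switch the connection to manual commit: the streaming loop now ends each polling cycle with an
// explicit rollback (see SqlServerStreamingChangeEventSource) so that no open transaction is left
// behind that would prevent disabling CDC on a captured table.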
try {
jdbcConnection.setAutoCommit(false);
}
catch (SQLException e) {
throw new ConnectException(e);
}
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
this.schema = new SqlServerDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);

View File

@ -8,10 +8,13 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
@ -38,6 +41,8 @@ public class SqlServerStreamingChangeEventSource implements StreamingChangeEvent
private static final int COL_OPERATION = 3;
private static final int COL_DATA = 5;
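// Message raised by SQL Server when the cdc.fn_cdc_get_all_changes_<capture instance> function no
// longer exists, i.e. CDC was disabled for the table while the connector was streaming from it.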
private static final Pattern MISSING_CDC_FUNCTION_CHANGES_ERROR = Pattern.compile("Invalid object name 'cdc.fn_cdc_get_all_changes_(.*)'\\.");
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerStreamingChangeEventSource.class);
private final SqlServerConnection connection;
@ -94,75 +99,82 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
if (!connection.listOfNewChangeTables(fromLsn, currentMaxLsn).isEmpty()) {
tablesSlot.set(getCdcTablesToQuery());
}
try {
    connection.getChangesForTables(tablesSlot.get(), fromLsn, currentMaxLsn, resultSets -> {
        final int tableCount = resultSets.length;
        final ChangeTablePointer[] changeTables = new ChangeTablePointer[tableCount];
        final ChangeTable[] tables = tablesSlot.get();

        for (int i = 0; i < tableCount; i++) {
            changeTables[i] = new ChangeTablePointer(tables[i].getTableId(), resultSets[i]);
            changeTables[i].next();
        }

        for (;;) {
            ChangeTablePointer tableSmallestLsn = null;
            for (int i = 0; i < tableCount; i++) {
                final ChangeTablePointer changeTable = changeTables[i];
                if (changeTable.isCompleted()) {
                    continue;
                }
                if (tableSmallestLsn == null || changeTable.compareTo(tableSmallestLsn) < 0) {
                    tableSmallestLsn = changeTable;
                }
            }
            if (tableSmallestLsn == null) {
                // No more LSNs available
                break;
            }
            if (tableSmallestLsn.getRowLsn().compareTo(lastProcessedLsnOnStart) <= 0) {
                LOGGER.info("Skipping change {} as its LSN is smaller than the last recorded LSN {}", tableSmallestLsn, lastProcessedLsnOnStart);
                tableSmallestLsn.next();
                continue;
            }
            LOGGER.trace("Processing change {}", tableSmallestLsn);
            final TableId tableId = tableSmallestLsn.getTableId();
            final Lsn commitLsn = tableSmallestLsn.getCommitLsn();
            final Lsn rowLsn = tableSmallestLsn.getRowLsn();
            final int operation = tableSmallestLsn.getOperation();
            final Object[] data = tableSmallestLsn.getData();

            // UPDATE consists of two consecutive events, first event contains
            // the row before it was updated and the second the row after
            // it was updated
            if (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) {
                if (!tableSmallestLsn.next() || tableSmallestLsn.getOperation() != SqlServerChangeRecordEmitter.OP_UPDATE_AFTER) {
                    throw new IllegalStateException("The update before event at " + rowLsn + " for table " + tableId + " was not followed by after event.\n Please report this as a bug together with a events around given LSN.");
                }
            }
            final Object[] dataNext = (operation == SqlServerChangeRecordEmitter.OP_UPDATE_BEFORE) ? tableSmallestLsn.getData() : null;

            offsetContext.setChangeLsn(rowLsn);
            offsetContext.setCommitLsn(commitLsn);
            offsetContext.setSourceTime(connection.timestampOfLsn(commitLsn));

            dispatcher
                    .dispatchDataChangeEvent(
                            tableId,
                            new SqlServerChangeRecordEmitter(
                                    offsetContext,
                                    operation,
                                    data,
                                    dataNext,
                                    schema.tableFor(tableId),
                                    clock
                            )
            );
            tableSmallestLsn.next();
        }
    });
    lastProcessedLsn = currentMaxLsn;
    // Terminate the transaction otherwise CDC could not be disabled for tables
    connection.rollback();
}
catch (SQLException e) {
    tablesSlot.set(processErrorFromChangeTableQuery(e, tablesSlot.get()));
}
}
}
catch (Exception e) {
@ -170,6 +182,18 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
}
}
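// Invoked from the catch block around getChangesForTables(): if the SQLException was caused by a
// dropped cdc.fn_cdc_get_all_changes_* function, the affected capture instance is removed from the
// set of queried change tables; any other exception is rethrown.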
private ChangeTable[] processErrorFromChangeTableQuery(SQLException exception, ChangeTable[] currentChangeTables) throws Exception {
final Matcher m = MISSING_CDC_FUNCTION_CHANGES_ERROR.matcher(exception.getMessage());
if (m.matches()) {
final String captureName = m.group(1);
LOGGER.info("Table is no longer captured with capture instance {}", captureName);
return Arrays.asList(currentChangeTables).stream()
.filter(x -> !x.getCaptureInstance().equals(captureName))
.collect(Collectors.toList()).toArray(new ChangeTable[0]);
}
throw exception;
}
private ChangeTable[] getCdcTablesToQuery() throws SQLException, InterruptedException {
final Set<ChangeTable> cdcEnabledTables = connection.listOfChangeTables();
final Set<ChangeTable> newTables = new HashSet<>();
@ -236,6 +260,10 @@ public Object[] getData() throws SQLException {
public boolean next() throws SQLException {
completed = !resultSet.next();
currentChangeLsn = completed ? Lsn.NULL : Lsn.valueOf(resultSet.getBytes(COL_ROW_LSN));
if (completed) {
LOGGER.trace("Closing result set of change table for table {}", tableId);
resultSet.close();
}
return !completed;
}

View File

@ -102,6 +102,50 @@ public void addTable() throws Exception {
Assertions.assertThat(records.recordsForTopic("server1.dbo.tabled")).hasSize(RECORDS_PER_TABLE);
}
@Test
public void removeTable() throws Exception {
final int RECORDS_PER_TABLE = 5;
final int TABLES = 2;
final int ID_START_1 = 10;
final int ID_START_2 = 100;
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_SCHEMA_ONLY)
.build();
start(SqlServerConnector.class, config);
assertConnectorIsRunning();
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_1 + i;
connection.execute(
"INSERT INTO tablea VALUES(" + id + ", 'a')"
);
connection.execute(
"INSERT INTO tableb VALUES(" + id + ", 'b')"
);
}
SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tablea")).hasSize(RECORDS_PER_TABLE);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).hasSize(RECORDS_PER_TABLE);
// Disable CDC for a table
connection.disableTableCdc("tableb");
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
final int id = ID_START_2 + i;
connection.execute(
"INSERT INTO tablea VALUES(" + id + ", 'a2')"
);
connection.execute(
"INSERT INTO tableb VALUES(" + id + ", 'b2')"
);
}
records = consumeRecordsByTopic(RECORDS_PER_TABLE);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tablea")).hasSize(RECORDS_PER_TABLE);
Assertions.assertThat(records.recordsForTopic("server1.dbo.tableb")).isNullOrEmpty();
}
private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
}