DBZ-4783: Manage change tables per partition

This commit is contained in:
Mike Kamornikov 2021-10-27 11:11:01 -07:00 committed by Gunnar Morling
parent 062c433a7e
commit 37a67d5dcf
2 changed files with 22 additions and 17 deletions

View File

@ -10,6 +10,7 @@
import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -42,7 +43,7 @@ public class SqlServerSnapshotChangeEventSource extends RelationalSnapshotChange
private final SqlServerConnectorConfig connectorConfig;
private final SqlServerConnection jdbcConnection;
private final SqlServerDatabaseSchema sqlServerDatabaseSchema;
private Map<TableId, SqlServerChangeTable> changeTables;
private final Map<SqlServerPartition, Map<TableId, SqlServerChangeTable>> changeTablesByPartition = new HashMap<>();
public SqlServerSnapshotChangeEventSource(SqlServerConnectorConfig connectorConfig, SqlServerConnection jdbcConnection,
SqlServerDatabaseSchema schema, EventDispatcher<SqlServerPartition, TableId> dispatcher, Clock clock,
@ -179,6 +180,15 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
.map(TableId::schema)
.collect(Collectors.toSet());
// Save changeTables for sql select later.
final Map<TableId, SqlServerChangeTable> changeTables = jdbcConnection
.getChangeTables(snapshotContext.partition.getDatabaseName())
.stream()
.collect(Collectors.toMap(SqlServerChangeTable::getSourceTableId, changeTable -> changeTable,
(changeTable1, changeTable2) -> changeTable1.getStartLsn().compareTo(changeTable2.getStartLsn()) > 0
? changeTable1
: changeTable2));
// reading info only for the schemas we're interested in as per the set of captured tables;
// while the passed table name filter alone would skip all non-included tables, reading the schema
// would take much longer that way
@ -196,13 +206,6 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
null,
false);
// Save changeTables for sql select later.
changeTables = jdbcConnection.getChangeTables(snapshotContext.partition.getDatabaseName()).stream()
.collect(Collectors.toMap(SqlServerChangeTable::getSourceTableId, changeTable -> changeTable,
(changeTable1, changeTable2) -> changeTable1.getStartLsn().compareTo(changeTable2.getStartLsn()) > 0
? changeTable1
: changeTable2));
// Update table schemas to only include columns that are also included in the cdc tables.
changeTables.forEach((tableId, sqlServerChangeTable) -> {
Table sourceTable = snapshotContext.tables.forTable(tableId);
@ -219,6 +222,8 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
}
});
}
changeTablesByPartition.put(snapshotContext.partition, changeTables);
}
@Override
@ -266,18 +271,18 @@ protected Optional<String> getSnapshotSelect(RelationalSnapshotContext<SqlServer
/**
* Expands an overridden snapshot select statement by substituting the prepared column list
* for every match of {@code SELECT_ALL_PATTERN}.
*
* The column list is the result of {@code getPreparedColumnNames(...)} for the table registered
* under {@code tableId} in {@code sqlServerDatabaseSchema}, joined with {@code ", "}.
*
* NOTE(review): {@link String#replaceAll(String, String)} gives special meaning to '$' and
* backslash characters in the replacement string; if a prepared column name can contain either,
* the replacement may need {@code Matcher.quoteReplacement} - confirm against the quoting rules.
*
* @param snapshotContext the snapshot context; its {@code partition} is passed on when preparing column names
* @param overriddenSelect the user-overridden select statement to expand
* @param tableId the identifier of the table whose columns are substituted
* @return the select statement with each pattern match replaced by the joined column names
*/
@Override
protected String enhanceOverriddenSelect(RelationalSnapshotContext<SqlServerPartition, SqlServerOffsetContext> snapshotContext,
String overriddenSelect, TableId tableId) {
String snapshotSelectColumns = getPreparedColumnNames(sqlServerDatabaseSchema.tableFor(tableId)).stream()
String snapshotSelectColumns = getPreparedColumnNames(snapshotContext.partition, sqlServerDatabaseSchema.tableFor(tableId)).stream()
.collect(Collectors.joining(", "));
return overriddenSelect.replaceAll(SELECT_ALL_PATTERN.pattern(), snapshotSelectColumns);
}
/**
* SQL Server specific column filter: delegates the decision to {@code filterChangeTableColumns}.
*
* @param partition the partition forwarded to {@code filterChangeTableColumns}
* @param tableId the table that owns the column
* @param columnName the column name being considered
* @return the result of {@code filterChangeTableColumns} for the given arguments
*/
@Override
protected boolean additionalColumnFilter(TableId tableId, String columnName) {
return filterChangeTableColumns(tableId, columnName);
protected boolean additionalColumnFilter(SqlServerPartition partition, TableId tableId, String columnName) {
return filterChangeTableColumns(partition, tableId, columnName);
}
private boolean filterChangeTableColumns(TableId tableId, String columnName) {
SqlServerChangeTable changeTable = changeTables.get(tableId);
private boolean filterChangeTableColumns(SqlServerPartition partition, TableId tableId, String columnName) {
SqlServerChangeTable changeTable = changeTablesByPartition.get(partition).get(tableId);
if (changeTable != null) {
return changeTable.getCapturedColumns().contains(columnName);
}

View File

@ -441,7 +441,7 @@ private Optional<String> determineSnapshotSelect(RelationalSnapshotContext<P, O>
return Optional.of(enhanceOverriddenSelect(snapshotContext, overriddenSelect, tableId));
}
List<String> columns = getPreparedColumnNames(schema.tableFor(tableId));
List<String> columns = getPreparedColumnNames(snapshotContext.partition, schema.tableFor(tableId));
return getSnapshotSelect(snapshotContext, tableId, columns);
}
@ -453,10 +453,10 @@ private Optional<String> determineSnapshotSelect(RelationalSnapshotContext<P, O>
*
* @return list of snapshot select columns
*/
protected List<String> getPreparedColumnNames(Table table) {
protected List<String> getPreparedColumnNames(P partition, Table table) {
List<String> columnNames = table.retrieveColumnNames()
.stream()
.filter(columnName -> additionalColumnFilter(table.id(), columnName))
.filter(columnName -> additionalColumnFilter(partition, table.id(), columnName))
.filter(columnName -> connectorConfig.getColumnFilter().matches(table.id().catalog(), table.id().schema(), table.id().table(), columnName))
.map(columnName -> jdbcConnection.quotedColumnIdString(columnName))
.collect(Collectors.toList());
@ -476,7 +476,7 @@ protected List<String> getPreparedColumnNames(Table table) {
/**
* Additional filter handling for preparing column names for snapshot select.
* It is applied to each column name of a table by {@code getPreparedColumnNames}; subclasses
* may override it to exclude columns. This default implementation keeps every column.
*
* @param partition the partition passed through from {@code getPreparedColumnNames}
* @param tableId the identifier of the table that owns the column
* @param columnName the column name being considered
* @return {@code true} if the column should be kept; this default always returns {@code true}
*/
protected boolean additionalColumnFilter(TableId tableId, String columnName) {
protected boolean additionalColumnFilter(P partition, TableId tableId, String columnName) {
return true;
}