DBZ-773 Moving management of Tables to RelationalDatabaseSchema

Gunnar Morling 2018-06-28 12:30:04 +02:00
parent bbfbdf6fab
commit c4c8cbc3ab
8 changed files with 66 additions and 82 deletions
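
In short: until now each connector schema (MySqlSchema, PostgresSchema) owned its own Tables instance; this commit moves that field into the common base class RelationalDatabaseSchema, which now answers tableIds() and tableFor(TableId) itself and hands the instance to subclasses through a protected tables() accessor. A condensed sketch of the resulting base-class shape, pieced together from the hunks below (the class name is invented and the real constructor takes more arguments; assumes debezium-core on the classpath):

import java.util.Set;
import java.util.function.Predicate;

import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;

public abstract class RelationalDatabaseSchemaSketch {

    private final Predicate<TableId> tableFilter;
    private final Tables tables;

    protected RelationalDatabaseSchemaSketch(Predicate<TableId> tableFilter, boolean tableIdCaseInsensitive) {
        this.tableFilter = tableFilter;
        this.tables = new Tables(tableIdCaseInsensitive); // single owner of the table registry
    }

    // ids of all known tables that pass the connector's filter configuration
    public Set<TableId> tableIds() {
        return tables.subset(tableFilter).tableIds();
    }

    // table meta-data, or null if the table is unknown or excluded by the filter
    public Table tableFor(TableId id) {
        return tableFilter.test(id) ? tables.forTable(id) : null;
    }

    // subclasses (MySqlSchema, PostgresSchema) mutate the registry only through this
    protected Tables tables() {
        return tables;
    }
}

Note the TODO carried in the real tableIds(): the filtered subset is recomputed on every call rather than maintained on insertion.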

View File

@@ -28,7 +28,6 @@
import io.debezium.relational.SystemVariables;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlChanges;
@@ -70,9 +69,7 @@ public class MySqlSchema extends RelationalDatabaseSchema {
private final DatabaseHistory dbHistory;
private final DdlChanges ddlChanges;
private final HistoryRecordComparator historyComparator;
private Tables tables;
private final boolean skipUnparseableDDL;
private final boolean tableIdCaseInsensitive;
private final boolean storeOnlyMonitoredTablesDdl;
/**
@@ -101,8 +98,6 @@ public MySqlSchema(MySqlConnectorConfig configuration, String serverName, Predic
Configuration config = configuration.getConfig();
this.filters = new Filters(config);
this.tables = new Tables(tableIdCaseInsensitive);
this.tableIdCaseInsensitive = tableIdCaseInsensitive;
this.ddlParser = configuration.getDdlParsingMode().getNewParserInstance(getValueConverters(config));
this.ddlChanges = this.ddlParser.getDdlChanges();
@@ -177,24 +172,13 @@ public Filters filters() {
return filters;
}
/**
* Get all of the table definitions for all database tables as defined by
* {@link #applyDdl(SourceInfo, String, String, DatabaseStatementStringConsumer) applied DDL statements}, excluding those
* filtered out by the {@link #filters() filters}.
*
* @return the table definitions; never null
*/
public Tables tables() {
return tables.subset(filters.tableFilter());
}
/**
* Get the names of all monitored tables across all databases, i.e. those whose change events are captured by Debezium.
*
* @return the array of table names
*/
public String[] monitoredTablesAsStringArray() {
final Collection<TableId> tables = tables().tableIds();
final Collection<TableId> tables = tableIds();
String[] ret = new String[tables.size()];
int i = 0;
for (TableId table: tables) {
@@ -203,19 +187,6 @@ public String[] monitoredTablesAsStringArray() {
return ret;
}
/**
* Get the {@link TableSchema Schema information} for the table with the given identifier, if that table exists and is
* included by the {@link #filters() filter}.
*
* @param id the fully-qualified table identifier; may be null
* @return the current table definition, or null if there is no table with the given identifier, if the identifier is null,
* or if the table has been excluded by the filters
*/
@Override
public Table tableFor(TableId id) {
return isTableMonitored(id) ? tables.forTable(id) : null;
}
/**
* Decide whether events should be captured for a given table
*
@@ -271,8 +242,8 @@ protected void appendCreateTableStatement(StringBuilder sb, Table table) {
* offset} at which the database schemas are to reflect; may not be null
*/
public void loadHistory(SourceInfo startingPoint) {
tables = new Tables(tableIdCaseInsensitive);
dbHistory.recover(startingPoint.partition(), startingPoint.offset(), tables, ddlParser);
tables().clear();
dbHistory.recover(startingPoint.partition(), startingPoint.offset(), tables(), ddlParser);
refreshSchemas();
}
@@ -296,8 +267,8 @@ public void intializeHistoryStorage() {
protected void refreshSchemas() {
clearSchemas();
// Create TableSchema instances for any existing table ...
this.tables.tableIds().forEach(id -> {
Table table = this.tables.forTable(id);
this.tableIds().forEach(id -> {
Table table = this.tableFor(id);
buildAndRegisterSchema(table);
});
}
@@ -325,7 +296,7 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
try {
this.ddlChanges.reset();
this.ddlParser.setCurrentSchema(databaseName);
this.ddlParser.parse(ddlStatements, tables);
this.ddlParser.parse(ddlStatements, tables());
} catch (ParsingException | MultipleParsingExceptions e) {
if (skipUnparseableDDL) {
logger.warn("Ignoring unparseable DDL statement '{}': {}", ddlStatements, e);
@@ -333,7 +304,7 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
throw e;
}
} finally {
changes = tables.drainChanges();
changes = tables().drainChanges();
// No need to send schema events or store DDL if no table has changed
// Note that, unlike with the DB history topic, we don't filter out non-whitelisted tables here
// (which writes to the public schema change topic); if required, a second option could be added
@@ -383,7 +354,7 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
// Figure out what changed ...
changes.forEach(tableId -> {
Table table = tables.forTable(tableId);
Table table = tableFor(tableId);
if (table == null) { // removed
removeSchema(tableId);
}

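One behavioral detail in the MySqlSchema hunks above: loadHistory() used to reset state by swapping in a brand-new Tables; with the registry now owned by the base class, it empties the shared instance in place instead, which is what the new Tables#clear() at the bottom of this commit exists for. The recovery sequence again, with comments added (field names as in MySqlSchema):

public void loadHistory(SourceInfo startingPoint) {
    tables().clear();                             // was: tables = new Tables(tableIdCaseInsensitive)
    dbHistory.recover(startingPoint.partition(),  // replay the recorded DDL history into the
            startingPoint.offset(),               // shared registry owned by the base class
            tables(), ddlParser);
    refreshSchemas();                             // then rebuild the TableSchema instances from it
}
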
View File

@@ -152,7 +152,7 @@ public void clear() {
public void regenerate() {
clear();
AtomicInteger nextTableNumber = new AtomicInteger(0);
Set<TableId> tableIds = schema.tables().tableIds();
Set<TableId> tableIds = schema.tableIds();
logger.debug("Regenerating converters for {} tables", tableIds.size());
tableIds.forEach(id -> {
assign(nextTableNumber.incrementAndGet(), id);

View File

@@ -409,7 +409,7 @@ protected void execute() {
schema.applyDdl(source, null, setSystemVariablesStatement, this::enqueueSchemaChanges);
// Add DROP TABLE statements for all tables that we knew about AND those tables found in the databases ...
List<TableId> allTableIds = new ArrayList<>(schema.tables().tableIds());
List<TableId> allTableIds = new ArrayList<>(schema.tableIds());
allTableIds.addAll(tableIds);
allTableIds.stream()
.filter(id -> isRunning()) // ignore all subsequent tables if this reader is stopped
@@ -418,7 +418,7 @@ protected void execute() {
this::enqueueSchemaChanges));
// Add a DROP DATABASE statement for each database that we no longer know about ...
schema.tables().tableIds().stream().map(TableId::catalog)
schema.tableIds().stream().map(TableId::catalog)
.filter(Predicates.not(readableDatabaseNames::contains))
.filter(id -> isRunning()) // ignore all subsequent tables if this reader is stopped
.forEach(missingDbName -> schema.applyDdl(source, missingDbName,

View File

@@ -171,7 +171,6 @@ public void shouldLoadSystemAndNonSystemTablesAndConsumeAllDatabases() throws In
protected void assertTableIncluded(String fullyQualifiedTableName) {
TableId tableId = TableId.parse(fullyQualifiedTableName);
assertThat(mysql.tables().forTable(tableId)).isNotNull();
TableSchema tableSchema = mysql.schemaFor(tableId);
assertThat(tableSchema).isNotNull();
assertThat(tableSchema.keySchema().name()).isEqualTo(SchemaNameAdjuster.validFullname(SERVER_NAME + "." + fullyQualifiedTableName + ".Key"));
@@ -180,15 +179,14 @@ protected void assertTableIncluded(String fullyQualifiedTableName) {
protected void assertTableExcluded(String fullyQualifiedTableName) {
TableId tableId = TableId.parse(fullyQualifiedTableName);
assertThat(mysql.tables().forTable(tableId)).isNull();
assertThat(mysql.schemaFor(tableId)).isNull();
}
protected void assertNoTablesExistForDatabase(String dbName) {
assertThat(mysql.tables().tableIds().stream().filter(id->id.catalog().equals(dbName)).count()).isEqualTo(0);
assertThat(mysql.tableIds().stream().filter(id->id.catalog().equals(dbName)).count()).isEqualTo(0);
}
protected void assertTablesExistForDatabase(String dbName) {
assertThat(mysql.tables().tableIds().stream().filter(id->id.catalog().equals(dbName)).count()).isGreaterThan(0);
assertThat(mysql.tableIds().stream().filter(id->id.catalog().equals(dbName)).count()).isGreaterThan(0);
}
protected void assertHistoryRecorded() {
@@ -196,22 +194,22 @@ protected void assertHistoryRecorded() {
duplicate.loadHistory(source);
// Make sure table is defined in each ...
assertThat(duplicate.tables()).isEqualTo(mysql.tables());
assertThat(duplicate.tableIds()).isEqualTo(mysql.tableIds());
for (int i = 0; i != 2; ++i) {
duplicate.tables().tableIds().forEach(tableId -> {
duplicate.tableIds().forEach(tableId -> {
TableSchema dupSchema = duplicate.schemaFor(tableId);
TableSchema schema = mysql.schemaFor(tableId);
assertThat(schema).isEqualTo(dupSchema);
Table dupTable = duplicate.tables().forTable(tableId);
Table table = mysql.tables().forTable(tableId);
Table dupTable = duplicate.tableFor(tableId);
Table table = mysql.tableFor(tableId);
assertThat(table).isEqualTo(dupTable);
});
mysql.tables().tableIds().forEach(tableId -> {
mysql.tableIds().forEach(tableId -> {
TableSchema dupSchema = duplicate.schemaFor(tableId);
TableSchema schema = mysql.schemaFor(tableId);
assertThat(schema).isEqualTo(dupSchema);
Table dupTable = duplicate.tables().forTable(tableId);
Table table = mysql.tables().forTable(tableId);
Table dupTable = duplicate.tableFor(tableId);
Table table = mysql.tableFor(tableId);
assertThat(table).isEqualTo(dupTable);
});
duplicate.refreshSchemas();

View File

@@ -9,7 +9,6 @@
import java.sql.SQLException;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.kafka.connect.data.Schema;
@@ -23,7 +22,6 @@
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
import io.debezium.util.SchemaNameAdjuster;
@@ -43,7 +41,6 @@ public class PostgresSchema extends RelationalDatabaseSchema {
private final static Logger LOGGER = LoggerFactory.getLogger(PostgresSchema.class);
private final Filters filters;
private final Tables tables;
private final SchemaNameAdjuster schemaNameAdjuster;
private Map<String, Integer> typeInfo;
@@ -60,7 +57,6 @@ protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegist
null, getTableSchemaBuilder(config, typeRegistry), false);
this.filters = new Filters(config);
this.tables = new Tables();
this.schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
this.typeRegistry = typeRegistry;
}
@@ -86,10 +82,10 @@ protected PostgresSchema refresh(PostgresConnection connection, boolean printRep
}
// read all the information from the DB
connection.readSchema(tables, null, null, filters.tableNameFilter(), null, true);
connection.readSchema(tables(), null, null, filters.tableNameFilter(), null, true);
if (printReplicaIdentityInfo) {
// print out all the replica identity info
tables.tableIds().forEach(tableId -> printReplicaIdentityInfo(connection, tableId));
tableIds().forEach(tableId -> printReplicaIdentityInfo(connection, tableId));
}
// and then refresh the schemas
refreshSchemas();
@@ -120,7 +116,7 @@ protected void refresh(PostgresConnection connection, TableId tableId) throws SQ
// we expect the refreshed table to be there
assert temp.size() == 1;
// overwrite (add or update) our view of the table
tables.overwriteTable(temp.forTable(tableId));
tables().overwriteTable(temp.forTable(tableId));
// and refresh the schema
refreshSchema(tableId);
}
@@ -132,7 +128,7 @@ protected void refresh(PostgresConnection connection, TableId tableId) throws SQ
*/
protected void refresh(Table table) {
// overwrite (add or update) our view of the table
tables.overwriteTable(table);
tables().overwriteTable(table);
// and refresh the schema
refreshSchema(table.id());
}
@@ -146,19 +142,6 @@ public Filters filters() {
return filters;
}
/**
* Get the {@link TableSchema Schema information} for the table with the given identifier, if that table exists and is
* included by the {@link #filters() filter}.
*
* @param id the fully-qualified table identifier; may be null
* @return the current table definition, or null if there is no table with the given identifier, if the identifier is null,
* or if the table has been excluded by the filters
*/
@Override
public Table tableFor(TableId id) {
return filters.tableFilter().test(id) ? tables.forTable(id) : null;
}
protected String adjustSchemaName(String name) {
return this.schemaNameAdjuster.adjust(name);
}
@@ -175,10 +158,6 @@ protected int columnTypeNameToJdbcTypeId(String localTypeName) {
return typeInfo.get(localTypeName);
}
protected Set<TableId> tables() {
return tables.tableIds();
}
/**
* Discard any currently-cached schemas and rebuild them using the filters.
*/
@@ -186,14 +165,14 @@ protected void refreshSchemas() {
clearSchemas();
// Create TableSchema instances for any existing table ...
this.tables.tableIds().forEach(this::refreshSchema);
tableIds().forEach(this::refreshSchema);
}
private void refreshSchema(TableId id) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("refreshing DB schema for table '{}'", id);
}
Table table = this.tables.forTable(id);
Table table = tableFor(id);
buildAndRegisterSchema(table);
}

View File

@@ -154,7 +154,7 @@ private void takeSnapshot(BlockingConsumer<ChangeEvent> consumer) {
// we're locking in SHARE UPDATE EXCLUSIVE MODE to avoid concurrent schema changes while we're taking the snapshot
// this does not prevent writes to the table, but prevents changes to the table's schema....
// DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted
schema.tables().forEach(tableId -> statements.append("LOCK TABLE ")
schema.tableIds().forEach(tableId -> statements.append("LOCK TABLE ")
.append(tableId.toDoubleQuotedString())
.append(" IN SHARE UPDATE EXCLUSIVE MODE;")
.append(lineSeparator));
@@ -178,7 +178,7 @@ private void takeSnapshot(BlockingConsumer<ChangeEvent> consumer) {
AtomicInteger rowsCounter = new AtomicInteger(0);
final Map<TableId, String> selectOverrides = getSnapshotSelectOverridesByTable();
for(TableId tableId : schema.tables()) {
for(TableId tableId : schema.tableIds()) {
if (schema.isFilteredOut(tableId)) {
logger.info("\t table '{}' is filtered out, ignoring", tableId);
continue;

View File

@@ -5,6 +5,7 @@
*/
package io.debezium.relational;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
@@ -30,6 +31,7 @@ public abstract class RelationalDatabaseSchema implements DatabaseSchema<TableId
private final String schemaPrefix;
private final SchemasByTableId schemasByTableId;
private final Tables tables;
protected RelationalDatabaseSchema(String serverName, TopicSelector<TableId> topicSelector,
Predicate<TableId> tableFilter, Predicate<ColumnId> columnFilter, ColumnMappers columnMappers,
@@ -43,6 +45,7 @@ protected RelationalDatabaseSchema(String serverName, TopicSelector<TableId> top
this.schemaPrefix = getSchemaPrefix(serverName);
this.schemasByTableId = new SchemasByTableId(tableIdCaseInsensitive);
this.tables = new Tables(tableIdCaseInsensitive);
}
private static String getSchemaPrefix(String serverName) {
@@ -59,6 +62,14 @@ private static String getSchemaPrefix(String serverName) {
public void close() {
}
/**
* Returns the set of table ids included in the current filter configuration.
*/
public Set<TableId> tableIds() {
// TODO that filtering should really be done once upon insertion
return tables.subset(tableFilter).tableIds();
}
/**
* Get the {@link TableSchema Schema information} for the table with the given identifier, if that table exists and
* is included by the filter configuration.
@@ -75,8 +86,21 @@ public TableSchema schemaFor(TableId id) {
return schemasByTableId.get(id);
}
// TODO have single implementation here
public abstract Table tableFor(TableId id);
/**
* Get the {@link Table} meta-data for the table with the given identifier, if that table exists and is
* included by the filter configuration.
*
* @param id the table identifier; may be null
* @return the current table definition, or null if there is no table with the given identifier, if the identifier is null,
* or if the table has been excluded by the filters
*/
public Table tableFor(TableId id) {
return tableFilter.test(id) ? tables.forTable(id) : null;
}
protected Tables tables() {
return tables;
}
protected void clearSchemas() {
schemasByTableId.clear();
@@ -97,6 +121,7 @@ private String getEnvelopeSchemaName(Table table) {
return topicSelector.topicNameFor(table.id()) + ".Envelope";
}
/**
* A map of schemas by table id. Table names are stored lower-case if required as per the config.
*/

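The contract of the two new public accessors, as a hypothetical caller would exercise it (class, method, and table names below are invented for illustration; only tableIds() and tableFor() come from the diff above):

import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;

class FilterContractExample {

    // 'schema' stands for any concrete subclass, e.g. MySqlSchema or PostgresSchema
    static void describe(RelationalDatabaseSchema schema) {
        TableId included = new TableId("inventory", null, "customers"); // assume the filter admits this id
        TableId excluded = new TableId("test_db", null, "scratch");     // assume the filter rejects this id

        System.out.println(schema.tableIds());     // only ids passing the filter; per the TODO above,
                                                   // the subset is recomputed on every call
        Table known = schema.tableFor(included);   // meta-data if the table exists, otherwise null
        assert schema.tableFor(excluded) == null;  // null even if the table is known internally
    }
}
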
View File

@@ -108,6 +108,13 @@ protected Tables(Tables other, boolean tableIdCaseInsensitive) {
this.tablesByTableId.putAll(other.tablesByTableId);
}
public void clear() {
lock.write(() -> {
tablesByTableId.clear();
changes.clear();
});
}
@Override
public Tables clone() {
return new Tables(this, tableIdCaseInsensitive);
@@ -404,6 +411,10 @@ Set<Map.Entry<TableId, TableImpl>> entrySet() {
return values.entrySet();
}
void clear() {
values.clear();
}
private TableId toLowerCaseIfNeeded(TableId tableId) {
return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
}
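
Finally, the new Tables#clear() empties both the table map and the recorded change set under the write lock, so callers like loadHistory() can reset state without allocating a replacement instance. A hypothetical standalone usage (Table.editor() is the existing builder in debezium-core; the table id is made up):

import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;

class TablesClearExample {
    public static void main(String[] args) {
        Tables tables = new Tables(false);                 // case-sensitive table ids
        tables.overwriteTable(Table.editor()
                .tableId(new TableId("db", null, "customers"))
                .create());

        assert tables.size() == 1;
        tables.clear();                                    // drops both the definitions and the change log
        assert tables.size() == 0 && tables.drainChanges().isEmpty();
    }
}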