DBZ-773 Moving management of table schemas to RelationalDatabaseSchema

Gunnar Morling 2018-06-27 16:38:40 +02:00
parent 142e68e060
commit bbfbdf6fab
20 changed files with 309 additions and 194 deletions

View File

@@ -8,8 +8,6 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.function.Predicate;
 import org.apache.kafka.connect.data.Schema;
@@ -26,6 +24,7 @@
 import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
 import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
 import io.debezium.jdbc.TemporalPrecisionMode;
+import io.debezium.relational.RelationalDatabaseSchema;
 import io.debezium.relational.SystemVariables;
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
@@ -61,20 +60,15 @@
  * @author Randall Hauch
  */
 @NotThreadSafe
-public class MySqlSchema {
+public class MySqlSchema extends RelationalDatabaseSchema {
-    private final Logger logger = LoggerFactory.getLogger(getClass());
-    private final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(logger);
+    private final static Logger logger = LoggerFactory.getLogger(MySqlSchema.class);
     private final Set<String> ignoredQueryStatements = Collect.unmodifiableSet("BEGIN", "END", "FLUSH PRIVILEGES");
     private final DdlParser ddlParser;
-    private final TopicSelector topicSelector;
-    private final SchemasByTableId tableSchemaByTableId;
     private final Filters filters;
     private final DatabaseHistory dbHistory;
-    private final TableSchemaBuilder schemaBuilder;
     private final DdlChanges ddlChanges;
-    private final String serverName;
-    private final String schemaPrefix;
     private final HistoryRecordComparator historyComparator;
     private Tables tables;
     private final boolean skipUnparseableDDL;
@@ -91,39 +85,28 @@ public class MySqlSchema {
      *            may be null if not needed
      * @param tableIdCaseInsensitive true if table lookup ignores letter case
      */
-    public MySqlSchema(MySqlConnectorConfig configuration, String serverName, Predicate<String> gtidFilter, boolean tableIdCaseInsensitive, TopicSelector topicSelector) {
+    public MySqlSchema(MySqlConnectorConfig configuration, String serverName, Predicate<String> gtidFilter, boolean tableIdCaseInsensitive, MySqlTopicSelector topicSelector) {
+        super(
+                serverName,
+                topicSelector,
+                new Filters(configuration.getConfig()).tableFilter(),
+                new Filters(configuration.getConfig()).columnFilter(),
+                new Filters(configuration.getConfig()).columnMappers(),
+                new TableSchemaBuilder(
+                        getValueConverters(configuration.getConfig()), SchemaNameAdjuster.create(logger), SourceInfo.SCHEMA),
+                tableIdCaseInsensitive
+        );
         Configuration config = configuration.getConfig();
         this.filters = new Filters(config);
         this.tables = new Tables(tableIdCaseInsensitive);
-        this.topicSelector = topicSelector;
         this.tableIdCaseInsensitive = tableIdCaseInsensitive;
-        // Use MySQL-specific converters and schemas for values ...
-        String timePrecisionModeStr = config.getString(MySqlConnectorConfig.TIME_PRECISION_MODE);
-        TemporalPrecisionMode timePrecisionMode = TemporalPrecisionMode.parse(timePrecisionModeStr);
-        String decimalHandlingModeStr = config.getString(MySqlConnectorConfig.DECIMAL_HANDLING_MODE);
-        DecimalHandlingMode decimalHandlingMode = DecimalHandlingMode.parse(decimalHandlingModeStr);
-        DecimalMode decimalMode = decimalHandlingMode.asDecimalMode();
-        String bigIntUnsignedHandlingModeStr = config.getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
-        BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode = BigIntUnsignedHandlingMode.parse(bigIntUnsignedHandlingModeStr);
-        BigIntUnsignedMode bigIntUnsignedMode = bigIntUnsignedHandlingMode.asBigIntUnsignedMode();
-        MySqlValueConverters valueConverters = new MySqlValueConverters(decimalMode, timePrecisionMode, bigIntUnsignedMode);
-        this.schemaBuilder = new TableSchemaBuilder(valueConverters, schemaNameAdjuster, SourceInfo.SCHEMA);
-        this.ddlParser = configuration.getDdlParsingMode().getNewParserInstance(valueConverters);
+        this.ddlParser = configuration.getDdlParsingMode().getNewParserInstance(getValueConverters(config));
         this.ddlChanges = this.ddlParser.getDdlChanges();
-        // Set up the server name and schema prefix ...
-        if (serverName != null) serverName = serverName.trim();
-        this.serverName = serverName;
-        if (this.serverName == null || serverName.isEmpty()) {
-            this.schemaPrefix = "";
-        }
-        else {
-            this.schemaPrefix = serverName.endsWith(".") ? serverName : serverName + ".";
-        }
         // Create and configure the database history ...
         this.dbHistory = config.getInstance(MySqlConnectorConfig.DATABASE_HISTORY, DatabaseHistory.class);
         if (this.dbHistory == null) {
@@ -147,10 +130,26 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
         this.dbHistory.configure(dbHistoryConfig, historyComparator); // validates
         this.skipUnparseableDDL = dbHistoryConfig.getBoolean(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS);
-        tableSchemaByTableId = new SchemasByTableId(tableIdCaseInsensitive);
         this.storeOnlyMonitoredTablesDdl = dbHistoryConfig.getBoolean(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL);
     }

+    private static MySqlValueConverters getValueConverters(Configuration config) {
+        // Use MySQL-specific converters and schemas for values ...
+        String timePrecisionModeStr = config.getString(MySqlConnectorConfig.TIME_PRECISION_MODE);
+        TemporalPrecisionMode timePrecisionMode = TemporalPrecisionMode.parse(timePrecisionModeStr);
+        String decimalHandlingModeStr = config.getString(MySqlConnectorConfig.DECIMAL_HANDLING_MODE);
+        DecimalHandlingMode decimalHandlingMode = DecimalHandlingMode.parse(decimalHandlingModeStr);
+        DecimalMode decimalMode = decimalHandlingMode.asDecimalMode();
+        String bigIntUnsignedHandlingModeStr = config.getString(MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE);
+        BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode = BigIntUnsignedHandlingMode.parse(bigIntUnsignedHandlingModeStr);
+        BigIntUnsignedMode bigIntUnsignedMode = bigIntUnsignedHandlingMode.asBigIntUnsignedMode();
+        return new MySqlValueConverters(decimalMode, timePrecisionMode, bigIntUnsignedMode);
+    }
+
     protected HistoryRecordComparator historyComparator() {
         return this.historyComparator;
     }
@@ -212,25 +211,11 @@ public String[] monitoredTablesAsStringArray() {
      * @return the current table definition, or null if there is no table with the given identifier, if the identifier is null,
      *         or if the table has been excluded by the filters
      */
+    @Override
     public Table tableFor(TableId id) {
         return isTableMonitored(id) ? tables.forTable(id) : null;
     }

-    /**
-     * Get the {@link TableSchema Schema information} for the table with the given identifier, if that table exists and is
-     * included by the {@link #filters() filter}.
-     * <p>
-     * Note that the {@link Schema} will not contain any columns that have been {@link MySqlConnectorConfig#COLUMN_BLACKLIST
-     * filtered out}.
-     *
-     * @param id the fully-qualified table identifier; may be null
-     * @return the schema information, or null if there is no table with the given identifier, if the identifier is null,
-     *         or if the table has been excluded by the filters
-     */
-    public TableSchema schemaFor(TableId id) {
-        return isTableMonitored(id) ? tableSchemaByTableId.get(id) : null;
-    }

     /**
      * Decide whether events should be captured for a given table
      *
@@ -309,12 +294,11 @@ public void intializeHistoryStorage() {
      * Discard any currently-cached schemas and rebuild them using the filters.
      */
     protected void refreshSchemas() {
-        tableSchemaByTableId.clear();
+        clearSchemas();
         // Create TableSchema instances for any existing table ...
         this.tables.tableIds().forEach(id -> {
             Table table = this.tables.forTable(id);
-            TableSchema schema = schemaBuilder.create(schemaPrefix, getEnvelopeSchemaName(table), table, filters.columnFilter(), filters.columnMappers());
-            tableSchemaByTableId.put(id, schema);
+            buildAndRegisterSchema(table);
         });
     }
@@ -401,50 +385,12 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
         changes.forEach(tableId -> {
             Table table = tables.forTable(tableId);
             if (table == null) { // removed
-                tableSchemaByTableId.remove(tableId);
-            } else {
-                TableSchema schema = schemaBuilder.create(schemaPrefix, getEnvelopeSchemaName(table), table, filters.columnFilter(), filters.columnMappers());
-                tableSchemaByTableId.put(tableId, schema);
+                removeSchema(tableId);
+            }
+            else {
+                buildAndRegisterSchema(table);
             }
         });
         return true;
     }

-    private String getEnvelopeSchemaName(Table table) {
-        return topicSelector.getTopic(table.id()) + ".Envelope";
-    }
-
-    /**
-     * A map of schemas by table id. Table names are stored lower-case if required as per the config.
-     */
-    private static class SchemasByTableId {
-
-        private final boolean tableIdCaseInsensitive;
-        private final ConcurrentMap<TableId, TableSchema> values;
-
-        public SchemasByTableId(boolean tableIdCaseInsensitive) {
-            this.tableIdCaseInsensitive = tableIdCaseInsensitive;
-            this.values = new ConcurrentHashMap<>();
-        }
-
-        public void clear() {
-            values.clear();
-        }
-
-        public TableSchema remove(TableId tableId) {
-            return values.remove(toLowerCaseIfNeeded(tableId));
-        }
-
-        public TableSchema get(TableId tableId) {
-            return values.get(toLowerCaseIfNeeded(tableId));
-        }
-
-        public TableSchema put(TableId tableId, TableSchema updated) {
-            return values.put(toLowerCaseIfNeeded(tableId), updated);
-        }
-
-        private TableId toLowerCaseIfNeeded(TableId tableId) {
-            return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
-        }
-    }
 }
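
The constructor reshuffle above follows from a Java constraint: super(...) must be the first statement in a constructor, so the MySQL value-converter setup can no longer run as instance code before delegating to RelationalDatabaseSchema and instead becomes the static getValueConverters(Configuration) helper. A minimal sketch of the pattern, with hypothetical names:

    abstract class Base {
        private final Object converters;

        Base(Object converters) {
            this.converters = converters;
        }
    }

    class Sub extends Base {
        Sub(String config) {
            // Arguments to super(...) must be computed without touching instance state,
            // which rules out instance fields and instance methods here ...
            super(createConverters(config));
        }

        // ... so the derivation lives in a static helper instead.
        private static Object createConverters(String config) {
            return "converters for " + config; // stand-in for real converter construction
        }
    }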

View File

@@ -40,7 +40,7 @@ public final class MySqlTaskContext extends CdcSourceTaskContext {
     private final MySqlConnectorConfig connectorConfig;
     private final SourceInfo source;
     private final MySqlSchema dbSchema;
-    private final TopicSelector topicSelector;
+    private final MySqlTopicSelector topicSelector;
     private final RecordMakers recordProcessor;
     private final Predicate<String> gtidSourceFilter;
     private final Predicate<String> ddlFilter;
@@ -62,7 +62,7 @@ public MySqlTaskContext(Configuration config, Boolean tableIdCaseInsensitive) {
         this.connectionContext = new MySqlJdbcContext(config);

         // Set up the topic selector ...
-        this.topicSelector = TopicSelector.defaultSelector(serverName(), getHeartbeatTopicsPrefix());
+        this.topicSelector = MySqlTopicSelector.defaultSelector(serverName(), getHeartbeatTopicsPrefix());

         // Set up the source information ...
         this.source = new SourceInfo();
@@ -107,7 +107,7 @@ public String connectorName() {
         return config.getString("name");
     }

-    public TopicSelector topicSelector() {
+    public MySqlTopicSelector topicSelector() {
         return topicSelector;
     }

View File

@@ -7,6 +7,7 @@
 import io.debezium.annotation.ThreadSafe;
 import io.debezium.relational.TableId;
+import io.debezium.schema.TopicSelector;

 /**
  * A function that determines the name of topics for data and metadata.
@@ -14,7 +15,7 @@
  * @author Randall Hauch
  */
 @ThreadSafe
-public interface TopicSelector {
+public interface MySqlTopicSelector extends TopicSelector<TableId> {
     /**
      * Get the default topic selector logic, which uses a '.' delimiter character when needed.
      *
@@ -24,7 +25,7 @@ public interface TopicSelector {
      *            {@code delimiter}
      * @return the topic selector; never null
      */
-    static TopicSelector defaultSelector(String prefix, String heartbeatPrefix) {
+    static MySqlTopicSelector defaultSelector(String prefix, String heartbeatPrefix) {
         return defaultSelector(prefix, heartbeatPrefix, ".");
     }
@@ -38,8 +39,8 @@ static TopicSelector defaultSelector(String prefix, String heartbeatPrefix) {
      * @param delimiter the string delineating the server, database, and table names; may not be null
      * @return the topic selector; never null
      */
-    static TopicSelector defaultSelector(String prefix, String heartbeatPrefix, String delimiter) {
-        return new TopicSelector() {
+    static MySqlTopicSelector defaultSelector(String prefix, String heartbeatPrefix, String delimiter) {
+        return new MySqlTopicSelector() {
             /**
              * Get the name of the topic for the given server, database, and table names. This method returns
              * "{@code <serverName>}".
@@ -84,7 +85,8 @@ public String getHeartbeatTopic() {
      * @param tableId the identifier of the table; may not be null
      * @return the topic name; never null
      */
-    default String getTopic(TableId tableId) {
+    @Override
+    default String topicNameFor(TableId tableId) {
         return getTopic(tableId.catalog(),tableId.table());
     }
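
With the rename, the MySQL-specific factory methods stay put, but the interface now plugs into the shared contract via topicNameFor(TableId), which delegates to the existing getTopic(catalog, table). A usage sketch with made-up values, mirroring the wiring in MySqlTaskContext:

    // Server name and table are hypothetical examples
    MySqlTopicSelector selector = MySqlTopicSelector.defaultSelector("dbserver1", "__debezium-heartbeat");
    TableId tableId = new TableId("inventory", null, "customers"); // MySQL uses the catalog as database name
    String topic = selector.topicNameFor(tableId);                 // expected: "dbserver1.inventory.customers"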

View File

@@ -37,7 +37,7 @@ public class RecordMakers {
     private final Logger logger = LoggerFactory.getLogger(getClass());
     private final MySqlSchema schema;
     private final SourceInfo source;
-    private final TopicSelector topicSelector;
+    private final MySqlTopicSelector topicSelector;
     private final boolean emitTombstoneOnDelete;
     private final Map<Long, Converter> convertersByTableNumber = new HashMap<>();
     private final Map<TableId, Long> tableNumbersByTableId = new HashMap<>();
@@ -53,7 +53,7 @@ public class RecordMakers {
      * @param source the connector's source information; may not be null
      * @param topicSelector the selector for topic names; may not be null
      */
-    public RecordMakers(MySqlSchema schema, SourceInfo source, TopicSelector topicSelector, boolean emitTombstoneOnDelete) {
+    public RecordMakers(MySqlSchema schema, SourceInfo source, MySqlTopicSelector topicSelector, boolean emitTombstoneOnDelete) {
         this.schema = schema;
         this.source = source;
         this.topicSelector = topicSelector;
@@ -177,7 +177,7 @@ public boolean assign(long tableNumber, TableId id) {
         TableSchema tableSchema = schema.schemaFor(id);
         if (tableSchema == null) return false;

-        String topicName = topicSelector.getTopic(id);
+        String topicName = topicSelector.topicNameFor(id);
         Envelope envelope = tableSchema.getEnvelopeSchema();

         // Generate this table's insert, update, and delete converters ...

View File

@@ -88,6 +88,6 @@ public MySqlSchema createSchemas() {
         String serverName = config.getString(MySqlConnectorConfig.SERVER_NAME);
         return new MySqlSchema(new MySqlConnectorConfig(config), serverName, null, false,
-                TopicSelector.defaultSelector(serverName, "__debezium-heartbeat"));
+                MySqlTopicSelector.defaultSelector(serverName, "__debezium-heartbeat"));
     }
 }

View File

@@ -61,7 +61,7 @@ public void start(Configuration config) {
         }

         // create the task context and schema...
-        TopicSelector topicSelector = TopicSelector.create(connectorConfig);
+        PostgresTopicSelector topicSelector = PostgresTopicSelector.create(connectorConfig);
         PostgresSchema schema = new PostgresSchema(connectorConfig, typeRegistry, topicSelector);
         this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicSelector);

View File

@@ -8,7 +8,6 @@
 import java.sql.SQLException;
 import java.time.ZoneOffset;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Predicate;
@@ -21,6 +20,7 @@
 import io.debezium.connector.postgresql.connection.PostgresConnection;
 import io.debezium.connector.postgresql.connection.ServerInfo;
 import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.RelationalDatabaseSchema;
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.relational.TableSchema;
@@ -37,49 +37,41 @@
  * @author Horia Chiorean
  */
 @NotThreadSafe
-public class PostgresSchema {
+public class PostgresSchema extends RelationalDatabaseSchema {

     protected final static String PUBLIC_SCHEMA_NAME = "public";
     private final static Logger LOGGER = LoggerFactory.getLogger(PostgresSchema.class);

-    private final Map<TableId, TableSchema> tableSchemaByTableId = new HashMap<>();
     private final Filters filters;
-    private final TableSchemaBuilder schemaBuilder;
-    private final String schemaPrefix;
     private final Tables tables;
     private final SchemaNameAdjuster schemaNameAdjuster;
-    private final PostgresValueConverter valueConverter;

     private Map<String, Integer> typeInfo;
     private final TypeRegistry typeRegistry;
-    private final TopicSelector topicSelector;

     /**
      * Create a schema component given the supplied {@link PostgresConnectorConfig Postgres connector configuration}.
      *
      * @param config the connector configuration, which is presumed to be valid
      */
-    protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegistry, TopicSelector topicSelector) {
+    protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegistry,
+            PostgresTopicSelector topicSelector) {
+        super(config.serverName(), topicSelector, new Filters(config).tableFilter(), new Filters(config).columnFilter(),
+                null, getTableSchemaBuilder(config, typeRegistry), false);
         this.filters = new Filters(config);
         this.tables = new Tables();
-        this.topicSelector = topicSelector;
-        this.valueConverter = new PostgresValueConverter(config.decimalHandlingMode(), config.temporalPrecisionMode(),
-                ZoneOffset.UTC, null, config.includeUnknownDatatypes(), typeRegistry);
         this.schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
-        this.schemaBuilder = new TableSchemaBuilder(valueConverter, this.schemaNameAdjuster, SourceInfo.SCHEMA);
-        // Set up the server name and schema prefix ...
-        String serverName = config.serverName();
-        if (serverName == null) {
-            schemaPrefix = "";
-        } else {
-            serverName = serverName.trim();
-            this.schemaPrefix = serverName.endsWith(".") || serverName.isEmpty() ? serverName : serverName + ".";
-        }
         this.typeRegistry = typeRegistry;
     }

+    private static TableSchemaBuilder getTableSchemaBuilder(PostgresConnectorConfig config, TypeRegistry typeRegistry) {
+        PostgresValueConverter valueConverter = new PostgresValueConverter(config.decimalHandlingMode(), config.temporalPrecisionMode(),
+                ZoneOffset.UTC, null, config.includeUnknownDatatypes(), typeRegistry);
+        return new TableSchemaBuilder(valueConverter, SchemaNameAdjuster.create(LOGGER), SourceInfo.SCHEMA);
+    }

     /**
      * Initializes the content for this schema by reading all the database information from the supplied connection.
      *
@@ -162,6 +154,7 @@ public Filters filters() {
      * @return the current table definition, or null if there is no table with the given identifier, if the identifier is null,
      *         or if the table has been excluded by the filters
      */
+    @Override
     public Table tableFor(TableId id) {
         return filters.tableFilter().test(id) ? tables.forTable(id) : null;
     }
@@ -170,10 +163,6 @@ protected String adjustSchemaName(String name) {
         return this.schemaNameAdjuster.adjust(name);
     }

-    protected TableSchema schemaFor(TableId id) {
-        return tableSchemaByTableId.get(id);
-    }

     protected boolean isFilteredOut(TableId id) {
         return !filters.tableFilter().test(id);
     }
@@ -194,7 +183,8 @@ protected Set<TableId> tables() {
      * Discard any currently-cached schemas and rebuild them using the filters.
      */
     protected void refreshSchemas() {
-        tableSchemaByTableId.clear();
+        clearSchemas();
         // Create TableSchema instances for any existing table ...
         this.tables.tableIds().forEach(this::refreshSchema);
     }
@@ -204,12 +194,8 @@ private void refreshSchema(TableId id) {
             LOGGER.debug("refreshing DB schema for table '{}'", id);
         }
         Table table = this.tables.forTable(id);
-        TableSchema schema = schemaBuilder.create(schemaPrefix, getEnvelopeSchemaName(table), table, filters.columnFilter(), null);
-        tableSchemaByTableId.put(id, schema);
-    }
-
-    private String getEnvelopeSchemaName(Table table) {
-        return topicSelector.topicNameFor(table.id()) + ".Envelope";
+        buildAndRegisterSchema(table);
     }

     protected static TableId parse(String table) {

View File

@@ -23,10 +23,10 @@
 public class PostgresTaskContext extends CdcSourceTaskContext {

     private final PostgresConnectorConfig config;
-    private final TopicSelector topicSelector;
+    private final PostgresTopicSelector topicSelector;
     private final PostgresSchema schema;

-    protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicSelector topicSelector) {
+    protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, PostgresTopicSelector topicSelector) {
         super("Postgres", config.serverName());
         this.config = config;
@@ -35,7 +35,7 @@ protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema sch
         this.schema = schema;
     }

-    protected TopicSelector topicSelector() {
+    protected PostgresTopicSelector topicSelector() {
         return topicSelector;
     }

View File

@@ -7,6 +7,7 @@
 package io.debezium.connector.postgresql;

 import io.debezium.relational.TableId;
+import io.debezium.schema.TopicSelector;

 /**
  * Generator of topic names for {@link io.debezium.relational.Table table ids} used by the Postgres connector to determine
@@ -14,16 +15,16 @@
  *
  * @author Horia Chiorean (hchiorea@redhat.com)
  */
-public interface TopicSelector {
+public interface PostgresTopicSelector extends TopicSelector<TableId> {

-    public static TopicSelector create(PostgresConnectorConfig config) {
+    public static PostgresTopicSelector create(PostgresConnectorConfig config) {
         PostgresConnectorConfig.TopicSelectionStrategy topicSelectionStrategy = config.topicSelectionStrategy();
         switch (topicSelectionStrategy) {
             case TOPIC_PER_SCHEMA:
-                return TopicSelector.topicPerSchema(config.serverName());
+                return topicPerSchema(config.serverName());
             case TOPIC_PER_TABLE:
-                return TopicSelector.topicPerTable(config.serverName());
+                return topicPerTable(config.serverName());
             default:
                 throw new IllegalArgumentException("Unknown topic selection strategy: " + topicSelectionStrategy);
         }
@@ -35,7 +36,7 @@ public static TopicSelector create(PostgresConnectorConfig config) {
      * @param prefix a prefix which will be prepended to the topic name
      * @return a {@link TopicSelector} instance, never {@code null}
      */
-    static TopicSelector topicPerTable(String prefix) {
+    static PostgresTopicSelector topicPerTable(String prefix) {
         return tableId -> String.join(".", prefix, tableId.schema(), tableId.table());
     }
@@ -45,15 +46,7 @@ static TopicSelector topicPerTable(String prefix) {
      * @param prefix a prefix which will be prepended to the topic name
      * @return a {@link TopicSelector} instance, never {@code null}
      */
-    static TopicSelector topicPerSchema(String prefix) {
+    static PostgresTopicSelector topicPerSchema(String prefix) {
         return tableId -> String.join(".", prefix, tableId.schema());
     }

-    /**
-     * Returns the name of the Kafka topic for a given table identifier
-     *
-     * @param tableId the table identifier, never {@code null}
-     * @return the name of the Kafka topic, never {@code null}
-     */
-    String topicNameFor(TableId tableId);
 }
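
Note that topicPerTable and topicPerSchema can keep returning lambdas: after this change the single abstract method is the inherited topicNameFor(TableId), so PostgresTopicSelector remains a functional interface. A sketch with a made-up prefix:

    // Equivalent to what topicPerTable("server1") returns
    PostgresTopicSelector perTable = tableId -> String.join(".", "server1", tableId.schema(), tableId.table());
    String topic = perTable.topicNameFor(new TableId(null, "public", "orders")); // "server1.public.orders"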

View File

@@ -55,7 +55,7 @@ protected PostgresSchema schema() {
         return taskContext.schema();
     }

-    protected TopicSelector topicSelector() {
+    protected PostgresTopicSelector topicSelector() {
         return taskContext.topicSelector();
     }

View File

@@ -70,7 +70,7 @@ public void shouldLoadSchemaForBuiltinPostgresTypes() throws Exception {
         TestHelper.executeDDL("postgres_create_tables.ddl");

         PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().build());
-        schema = new PostgresSchema(config, TestHelper.getTypeRegistry(), TopicSelector.create(config));
+        schema = new PostgresSchema(config, TestHelper.getTypeRegistry(), PostgresTopicSelector.create(config));

         try (PostgresConnection connection = TestHelper.create()) {
             schema.refresh(connection, false);
@@ -122,7 +122,7 @@ public void shouldLoadSchemaForExtensionPostgresTypes() throws Exception {
                 TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).build()
         );

-        schema = new PostgresSchema(config, TestHelper.getTypeRegistry(), TopicSelector.create(config));
+        schema = new PostgresSchema(config, TestHelper.getTypeRegistry(), PostgresTopicSelector.create(config));

         try (PostgresConnection connection = TestHelper.create()) {
             schema.refresh(connection, false);
@@ -137,7 +137,7 @@ public void shouldLoadSchemaForPostgisTypes() throws Exception {
         TestHelper.executeDDL("init_postgis.ddl");
         TestHelper.executeDDL("postgis_create_tables.ddl");
         PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().build());
-        schema = new PostgresSchema(config, TestHelper.getTypeRegistry(), TopicSelector.create(config));
+        schema = new PostgresSchema(config, TestHelper.getTypeRegistry(), PostgresTopicSelector.create(config));

         try (PostgresConnection connection = TestHelper.create()) {
             schema.refresh(connection, false);
@@ -165,7 +165,7 @@ public void shouldApplyFilters() throws Exception {
         TestHelper.execute(statements);
         PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().with(SCHEMA_BLACKLIST, "s1").build());
         final TypeRegistry typeRegistry = TestHelper.getTypeRegistry();
-        schema = new PostgresSchema(config, typeRegistry, TopicSelector.create(config));
+        schema = new PostgresSchema(config, typeRegistry, PostgresTopicSelector.create(config));
         try (PostgresConnection connection = TestHelper.create()) {
             schema.refresh(connection, false);
             assertTablesIncluded("s2.a", "s2.b");
@@ -173,14 +173,14 @@ public void shouldApplyFilters() throws Exception {
         }

         config = new PostgresConnectorConfig(TestHelper.defaultConfig().with(SCHEMA_BLACKLIST, "s.*").build());
-        schema = new PostgresSchema(config, typeRegistry, TopicSelector.create(config));
+        schema = new PostgresSchema(config, typeRegistry, PostgresTopicSelector.create(config));
         try (PostgresConnection connection = TestHelper.create()) {
             schema.refresh(connection, false);
             assertTablesExcluded("s1.a", "s2.a", "s1.b", "s2.b");
         }

         config = new PostgresConnectorConfig(TestHelper.defaultConfig().with(PostgresConnectorConfig.TABLE_BLACKLIST, "s1.A,s2.A").build());
-        schema = new PostgresSchema(config, typeRegistry, TopicSelector.create(config));
+        schema = new PostgresSchema(config, typeRegistry, PostgresTopicSelector.create(config));
         try (PostgresConnection connection = TestHelper.create()) {
             schema.refresh(connection, false);
             assertTablesIncluded("s1.b", "s2.b");
@@ -191,7 +191,7 @@ public void shouldApplyFilters() throws Exception {
                 .with(SCHEMA_BLACKLIST, "s2")
                 .with(PostgresConnectorConfig.TABLE_BLACKLIST, "s1.A")
                 .build());
-        schema = new PostgresSchema(config, typeRegistry, TopicSelector.create(config));
+        schema = new PostgresSchema(config, typeRegistry, PostgresTopicSelector.create(config));
         try (PostgresConnection connection = TestHelper.create()) {
             schema.refresh(connection, false);
             assertTablesIncluded("s1.b");
@@ -199,7 +199,7 @@ public void shouldApplyFilters() throws Exception {
         }

         config = new PostgresConnectorConfig(TestHelper.defaultConfig().with(PostgresConnectorConfig.COLUMN_BLACKLIST, ".*aa")
                 .build());
-        schema = new PostgresSchema(config, typeRegistry, TopicSelector.create(config));
+        schema = new PostgresSchema(config, typeRegistry, PostgresTopicSelector.create(config));
         try (PostgresConnection connection = TestHelper.create()) {
             schema.refresh(connection, false);
             assertColumnsExcluded("s1.a.aa", "s2.a.aa");
@@ -213,7 +213,7 @@ public void shouldDetectNewChangesAfterRefreshing() throws Exception {
                 "CREATE TABLE table1 (pk SERIAL, PRIMARY KEY(pk));";
         TestHelper.execute(statements);
         PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().build());
-        schema = new PostgresSchema(config, TestHelper.getTypeRegistry(), TopicSelector.create(config));
+        schema = new PostgresSchema(config, TestHelper.getTypeRegistry(), PostgresTopicSelector.create(config));
         try (PostgresConnection connection = TestHelper.create()) {
             schema.refresh(connection, false);

View File

@@ -47,7 +47,7 @@ public void before() throws Exception {
         PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
                 .build());
-        TopicSelector selector = TopicSelector.create(config);
+        PostgresTopicSelector selector = PostgresTopicSelector.create(config);
         context = new PostgresTaskContext(
                 config,
                 new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),
@@ -93,7 +93,7 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
         TestHelper.executeDDL("postgres_create_tables.ddl");

         PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().build());
-        TopicSelector selector = TopicSelector.create(config);
+        PostgresTopicSelector selector = PostgresTopicSelector.create(config);
         context = new PostgresTaskContext(
                 config,
                 new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),
@@ -183,7 +183,7 @@ public void shouldGenerateSnapshotsForDefaultDatatypesAdaptiveMicroseconds() thr
                 .with(PostgresConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS)
                 .build());

-        TopicSelector selector = TopicSelector.create(config);
+        PostgresTopicSelector selector = PostgresTopicSelector.create(config);
         context = new PostgresTaskContext(
                 config,
                 new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),
@@ -224,7 +224,7 @@ public void shouldGenerateSnapshotsForDecimalDatatypesUsingStringEncoding() thro
                 .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.STRING)
                 .build());

-        TopicSelector selector = TopicSelector.create(config);
+        PostgresTopicSelector selector = PostgresTopicSelector.create(config);
         context = new PostgresTaskContext(
                 config,
                 new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),

View File

@@ -639,7 +639,7 @@ public void shouldNotStartAfterStop() throws Exception {
     }

     private void setupRecordsProducer(PostgresConnectorConfig config) {
-        TopicSelector selector = TopicSelector.create(config);
+        PostgresTopicSelector selector = PostgresTopicSelector.create(config);

         PostgresTaskContext context = new PostgresTaskContext(
                 config,

View File

@@ -51,7 +51,7 @@ public void before(Configuration overrides) throws SQLException {
         TestHelper.dropAllSchemas();
         PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().with(overrides).build());
-        TopicSelector selector = TopicSelector.create(config);
+        PostgresTopicSelector selector = PostgresTopicSelector.create(config);
         context = new PostgresTaskContext(
                 config,
                 new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),

View File

@@ -24,6 +24,7 @@
 import io.debezium.schema.DataCollectionId;
 import io.debezium.schema.DataCollectionSchema;
 import io.debezium.schema.DatabaseSchema;
+import io.debezium.schema.HistorizedDatabaseSchema;
 import io.debezium.schema.SchemaChangeEvent;
 import io.debezium.schema.TopicSelector;
@@ -40,16 +41,20 @@ public class EventDispatcher<T extends DataCollectionId> {
     private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);

-    private final TopicSelector topicSelector;
-    private final DatabaseSchema schema;
+    private final TopicSelector<T> topicSelector;
+    private final DatabaseSchema<T> schema;
+    private final HistorizedDatabaseSchema<T> historizedSchema;
     private final ChangeEventQueue<Object> queue;
     private final DataCollectionFilter<T> filter;

-    public EventDispatcher(TopicSelector topicSelector, DatabaseSchema schema,
+    public EventDispatcher(TopicSelector<T> topicSelector, DatabaseSchema<T> schema,
             ChangeEventQueue<Object> queue,
             DataCollectionFilter<T> filter) {
         this.topicSelector = topicSelector;
         this.schema = schema;
+        this.historizedSchema = schema instanceof HistorizedDatabaseSchema
+                ? (HistorizedDatabaseSchema<T>) schema
+                : null;
         this.queue = queue;
         this.filter = filter;
     }
@@ -71,7 +76,7 @@ public void dispatchDataChangeEvent(T dataCollectionId, Supplier<ChangeRecordEmi
             return;
         }

-        DataCollectionSchema dataCollectionSchema = schema.getDataCollectionSchema(dataCollectionId);
+        DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);

         // TODO handle as per inconsistent schema info option
         if(dataCollectionSchema == null) {
@@ -95,11 +100,11 @@ public void dispatchSchemaChangeEvent(T dataCollectionId, Supplier<SchemaChangeE
     private final class ChangeRecordReceiver implements ChangeRecordEmitter.Receiver {

-        private final DataCollectionId dataCollectionId;
+        private final T dataCollectionId;
         private final ChangeEventCreator changeEventCreator;
         private final DataCollectionSchema dataCollectionSchema;

-        private ChangeRecordReceiver(DataCollectionId dataCollectionId, ChangeEventCreator changeEventCreator,
+        private ChangeRecordReceiver(T dataCollectionId, ChangeEventCreator changeEventCreator,
                 DataCollectionSchema dataCollectionSchema) {
             this.dataCollectionId = dataCollectionId;
             this.changeEventCreator = changeEventCreator;
@@ -144,7 +149,7 @@ private final class SchemaChangeEventReceiver implements SchemaChangeEventEmitte
         @Override
         public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedException {
-            schema.applySchemaChange(event);
+            historizedSchema.applySchemaChange(event);
         }
     }
 }
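
The dispatcher caches the HistorizedDatabaseSchema downcast once at construction, so historizedSchema is null for connectors whose schema is not historized; applySchemaChange would then fail with a NullPointerException if such a connector ever emitted a schema change event. A defensive variant of the receiver method, shown only to illustrate the trade-off (not what this commit does):

    // Hypothetical guarded version of SchemaChangeEventReceiver.schemaChangeEvent(...)
    public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedException {
        if (historizedSchema == null) {
            throw new IllegalStateException("Got a schema change event, but the schema is not historized: " + event);
        }
        historizedSchema.applySchemaChange(event);
    }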

View File

@@ -0,0 +1,21 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.relational;
+
+import java.util.function.Predicate;
+
+import io.debezium.relational.mapping.ColumnMappers;
+import io.debezium.schema.HistorizedDatabaseSchema;
+import io.debezium.schema.TopicSelector;
+
+public abstract class HistorizedRelationalDatabaseSchema extends RelationalDatabaseSchema
+        implements HistorizedDatabaseSchema<TableId> {
+
+    protected HistorizedRelationalDatabaseSchema(String serverName, TopicSelector<TableId> topicSelector,
+            Predicate<TableId> tableFilter, Predicate<ColumnId> columnFilter, ColumnMappers columnMappers,
+            TableSchemaBuilder schemaBuilder, boolean tableIdCaseInsensitive) {
+        super(serverName, topicSelector, tableFilter, columnFilter, columnMappers, schemaBuilder, tableIdCaseInsensitive);
+    }
+}

View File

@@ -5,9 +5,129 @@
  */
 package io.debezium.relational;

+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Predicate;
+
+import org.apache.kafka.connect.data.Schema;
+
+import io.debezium.relational.mapping.ColumnMappers;
 import io.debezium.schema.DatabaseSchema;
+import io.debezium.schema.TopicSelector;

-public interface RelationalDatabaseSchema extends DatabaseSchema {
-
-    Table getTable(TableId id);
+/**
+ * A {@link DatabaseSchema} of a relational database such as Postgres.
+ *
+ * @author Gunnar Morling
+ */
+public abstract class RelationalDatabaseSchema implements DatabaseSchema<TableId> {
+
+    private final TopicSelector<TableId> topicSelector;
+    private final TableSchemaBuilder schemaBuilder;
+    private final Predicate<TableId> tableFilter;
+    private final Predicate<ColumnId> columnFilter;
+    private final ColumnMappers columnMappers;
+
+    private final String schemaPrefix;
+    private final SchemasByTableId schemasByTableId;
+
+    protected RelationalDatabaseSchema(String serverName, TopicSelector<TableId> topicSelector,
+            Predicate<TableId> tableFilter, Predicate<ColumnId> columnFilter, ColumnMappers columnMappers,
+            TableSchemaBuilder schemaBuilder, boolean tableIdCaseInsensitive) {
+        this.topicSelector = topicSelector;
+        this.schemaBuilder = schemaBuilder;
+        this.tableFilter = tableFilter;
+        this.columnFilter = columnFilter;
+        this.columnMappers = columnMappers;
+
+        this.schemaPrefix = getSchemaPrefix(serverName);
+        this.schemasByTableId = new SchemasByTableId(tableIdCaseInsensitive);
+    }
+
+    private static String getSchemaPrefix(String serverName) {
+        if (serverName == null) {
+            return "";
+        }
+        else {
+            serverName = serverName.trim();
+            return serverName.endsWith(".") || serverName.isEmpty() ? serverName : serverName + ".";
+        }
+    }
+
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Get the {@link TableSchema Schema information} for the table with the given identifier, if that table exists and
+     * is included by the filter configuration.
+     * <p>
+     * Note that the {@link Schema} will not contain any columns that have been filtered out.
+     *
+     * @param id
+     *            the table identifier; may be null
+     * @return the schema information, or null if there is no table with the given identifier, if the identifier is
+     *         null, or if the table has been excluded by the filters
+     */
+    @Override
+    public TableSchema schemaFor(TableId id) {
+        return schemasByTableId.get(id);
+    }
+
+    // TODO have single implementation here
+    public abstract Table tableFor(TableId id);
+
+    protected void clearSchemas() {
+        schemasByTableId.clear();
+    }
+
+    protected void buildAndRegisterSchema(Table table) {
+        if (tableFilter.test(table.id())) {
+            TableSchema schema = schemaBuilder.create(schemaPrefix, getEnvelopeSchemaName(table), table, columnFilter, columnMappers);
+            schemasByTableId.put(table.id(), schema);
+        }
+    }
+
+    protected void removeSchema(TableId id) {
+        schemasByTableId.remove(id);
+    }
+
+    private String getEnvelopeSchemaName(Table table) {
+        return topicSelector.topicNameFor(table.id()) + ".Envelope";
+    }
+
+    /**
+     * A map of schemas by table id. Table names are stored lower-case if required as per the config.
+     */
+    private static class SchemasByTableId {
+
+        private final boolean tableIdCaseInsensitive;
+        private final ConcurrentMap<TableId, TableSchema> values;
+
+        public SchemasByTableId(boolean tableIdCaseInsensitive) {
+            this.tableIdCaseInsensitive = tableIdCaseInsensitive;
+            this.values = new ConcurrentHashMap<>();
+        }
+
+        public void clear() {
+            values.clear();
+        }
+
+        public TableSchema remove(TableId tableId) {
+            return values.remove(toLowerCaseIfNeeded(tableId));
+        }
+
+        public TableSchema get(TableId tableId) {
+            return values.get(toLowerCaseIfNeeded(tableId));
+        }
+
+        public TableSchema put(TableId tableId, TableSchema updated) {
+            return values.put(toLowerCaseIfNeeded(tableId), updated);
+        }
+
+        private TableId toLowerCaseIfNeeded(TableId tableId) {
+            return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
+        }
+    }
 }
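
The base class now owns the TableSchema cache and exposes it to subclasses through three protected hooks: clearSchemas(), buildAndRegisterSchema(Table), and removeSchema(TableId). A sketch of the expected refresh flow in a subclass, mirroring refreshSchemas() in MySqlSchema and PostgresSchema above (the tables field is assumed to be the subclass's Tables instance):

    protected void refreshSchemas() {
        clearSchemas();                        // drop every cached TableSchema
        for (TableId id : tables.tableIds()) {
            Table table = tables.forTable(id);
            buildAndRegisterSchema(table);     // rebuilt only if the table filter accepts the id
        }
    }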

View File

@@ -5,15 +5,17 @@
  */
 package io.debezium.schema;

-import io.debezium.pipeline.spi.OffsetContext;
-
-public interface DatabaseSchema {
-
-    void applySchemaChange(SchemaChangeEvent schemaChange);
-
-    void recover(OffsetContext offset);
+/**
+ * The schema of a database. Provides information about the structures of the tables (collections etc.) it contains.
+ *
+ * @author Gunnar Morling
+ *
+ * @param <I>
+ *            The type of {@link DataCollectionId} used by a given implementation
+ */
+public interface DatabaseSchema<I extends DataCollectionId> {

     void close();

-    DataCollectionSchema getDataCollectionSchema(DataCollectionId id);
+    DataCollectionSchema schemaFor(I id);
 }

View File

@@ -0,0 +1,24 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.schema;
+
+import io.debezium.pipeline.spi.OffsetContext;
+
+/**
+ * A database schema that is historized, i.e. it undergoes schema changes and can be recovered from a persistent schema
+ * history.
+ *
+ * @author Gunnar Morling
+ *
+ * @param <I>
+ *            The collection id type of this schema
+ */
+public interface HistorizedDatabaseSchema<I extends DataCollectionId> extends DatabaseSchema<I> {
+
+    void applySchemaChange(SchemaChangeEvent schemaChange);
+
+    void recover(OffsetContext offset);
+}
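
Splitting these two methods out of DatabaseSchema means only connectors with a persistent DDL history offer schema recovery. A sketch of the intended startup sequence, with an assumed method shape (not part of this commit):

    // Hypothetical task startup: restore the in-memory schema before streaming resumes
    void start(HistorizedDatabaseSchema<TableId> schema, OffsetContext offset) {
        schema.recover(offset); // replays recorded schema changes up to the stored offset
    }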

View File

@@ -5,7 +5,23 @@
  */
 package io.debezium.schema;

-public interface TopicSelector {
+/**
+ * Implementations return names for Kafka topics (data and meta-data).
+ *
+ * @author Randal Hauch
+ * @author Gunnar Morling
+ *
+ * @param <I>
+ *            The type of {@link DataCollectionId} used by a given implementation
+ */
+// TODO: further unify; do we actually need distinct implementations per backend?
+public interface TopicSelector<I extends DataCollectionId> {

-    String topicNameFor(DataCollectionId id);
+    /**
+     * Returns the name of the Kafka topic for a given data collection identifier
+     *
+     * @param id the data collection identifier, never {@code null}
+     * @return the name of the Kafka topic, never {@code null}
+     */
+    String topicNameFor(I id);
 }
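
The type parameter lets each connector keep its own collection-id type (TableId for the relational connectors) while shared code such as EventDispatcher stays agnostic, and since topicNameFor(I) is the only abstract method, a lambda still satisfies the interface. A minimal sketch with an arbitrary prefix:

    TopicSelector<TableId> selector = id -> "dbserver1." + id.schema() + "." + id.table();
    String topic = selector.topicNameFor(new TableId(null, "public", "orders")); // "dbserver1.public.orders"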