DBZ-627 Unifying TopicSelector implementations

This commit is contained in:
Gunnar Morling 2018-07-17 23:13:46 +02:00 committed by Jiri Pechanec
parent 610da70cc5
commit 20db9299c5
15 changed files with 141 additions and 151 deletions

View File

@ -36,6 +36,7 @@
import io.debezium.relational.ddl.DdlParser; import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.DatabaseHistory; import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.HistoryRecordComparator; import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.schema.TopicSelector;
import io.debezium.text.MultipleParsingExceptions; import io.debezium.text.MultipleParsingExceptions;
import io.debezium.text.ParsingException; import io.debezium.text.ParsingException;
import io.debezium.util.Collect; import io.debezium.util.Collect;
@ -82,7 +83,7 @@ public class MySqlSchema extends RelationalDatabaseSchema {
* may be null if not needed * may be null if not needed
* @param tableIdCaseInsensitive true if table lookup ignores letter case * @param tableIdCaseInsensitive true if table lookup ignores letter case
*/ */
public MySqlSchema(MySqlConnectorConfig configuration, Predicate<String> gtidFilter, boolean tableIdCaseInsensitive, MySqlTopicSelector topicSelector) { public MySqlSchema(MySqlConnectorConfig configuration, Predicate<String> gtidFilter, boolean tableIdCaseInsensitive, TopicSelector<TableId> topicSelector) {
super( super(
configuration, configuration,
topicSelector, topicSelector,

View File

@ -20,8 +20,9 @@
import io.debezium.connector.common.CdcSourceTaskContext; import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode; import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.function.Predicates; import io.debezium.function.Predicates;
import io.debezium.heartbeat.Heartbeat; import io.debezium.relational.TableId;
import io.debezium.relational.history.DatabaseHistory; import io.debezium.relational.history.DatabaseHistory;
import io.debezium.schema.TopicSelector;
import io.debezium.util.LoggingContext; import io.debezium.util.LoggingContext;
import io.debezium.util.Strings; import io.debezium.util.Strings;
@ -40,7 +41,7 @@ public final class MySqlTaskContext extends CdcSourceTaskContext {
private final MySqlConnectorConfig connectorConfig; private final MySqlConnectorConfig connectorConfig;
private final SourceInfo source; private final SourceInfo source;
private final MySqlSchema dbSchema; private final MySqlSchema dbSchema;
private final MySqlTopicSelector topicSelector; private final TopicSelector<TableId> topicSelector;
private final RecordMakers recordProcessor; private final RecordMakers recordProcessor;
private final Predicate<String> gtidSourceFilter; private final Predicate<String> gtidSourceFilter;
private final Predicate<String> ddlFilter; private final Predicate<String> ddlFilter;
@ -62,7 +63,7 @@ public MySqlTaskContext(Configuration config, Boolean tableIdCaseInsensitive) {
this.connectionContext = new MySqlJdbcContext(config); this.connectionContext = new MySqlJdbcContext(config);
// Set up the topic selector ... // Set up the topic selector ...
this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig.getLogicalName(), getHeartbeatTopicsPrefix()); this.topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig.getLogicalName(), connectorConfig.getHeartbeatTopicsPrefix());
// Set up the source information ... // Set up the source information ...
this.source = new SourceInfo(); this.source = new SourceInfo();
@ -107,7 +108,7 @@ public String connectorName() {
return config.getString("name"); return config.getString("name");
} }
public MySqlTopicSelector topicSelector() { public TopicSelector<TableId> topicSelector() {
return topicSelector; return topicSelector;
} }
@ -246,10 +247,6 @@ public String getSnapshotSelectOverrides() {
return config.getString(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE); return config.getString(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE);
} }
public String getHeartbeatTopicsPrefix() {
return config.getString(Heartbeat.HEARTBEAT_TOPICS_PREFIX);
}
public Duration snapshotDelay() { public Duration snapshotDelay() {
return Duration.ofMillis(config.getLong(MySqlConnectorConfig.SNAPSHOT_DELAY_MS)); return Duration.ofMillis(config.getLong(MySqlConnectorConfig.SNAPSHOT_DELAY_MS));
} }

View File

@ -10,12 +10,13 @@
import io.debezium.schema.TopicSelector; import io.debezium.schema.TopicSelector;
/** /**
* A function that determines the name of topics for data and metadata. * Factory for this connector's {@link TopicSelector}.
* *
* @author Randall Hauch * @author Randall Hauch
*/ */
@ThreadSafe @ThreadSafe
public interface MySqlTopicSelector extends TopicSelector<TableId> { public class MySqlTopicSelector {
/** /**
* Get the default topic selector logic, which uses a '.' delimiter character when needed. * Get the default topic selector logic, which uses a '.' delimiter character when needed.
* *
@ -25,91 +26,8 @@ public interface MySqlTopicSelector extends TopicSelector<TableId> {
* {@code delimiter} * {@code delimiter}
* @return the topic selector; never null * @return the topic selector; never null
*/ */
static MySqlTopicSelector defaultSelector(String prefix, String heartbeatPrefix) { static TopicSelector<TableId> defaultSelector(String prefix, String heartbeatPrefix) {
return defaultSelector(prefix, heartbeatPrefix, "."); return TopicSelector.defaultSelector(prefix, heartbeatPrefix, ".",
(t, pref, delimiter) -> String.join(delimiter, pref, t.catalog(), t.table()));
} }
/**
* Get the default topic selector logic, which uses the supplied delimiter character when needed.
*
* @param prefix the name of the prefix to be used for all topics; may not be null and must not terminate in the
* {@code delimiter}
* @param heartbeatPrefix - a prefix that will be used for heartbeat topics. All heartbeat topics will start with this prefix and will use
* {@code delimiter} to separate the heartbeat prefix and the rest of the name
* @param delimiter the string delineating the server, database, and table names; may not be null
* @return the topic selector; never null
*/
static MySqlTopicSelector defaultSelector(String prefix, String heartbeatPrefix, String delimiter) {
return new MySqlTopicSelector() {
/**
* Get the name of the topic for the given server, database, and table names. This method returns
* "{@code <serverName>}".
*
* @return the topic name; never null
*/
@Override
public String getPrimaryTopic() {
return prefix;
}
/**
* Get the name of the topic for the given server name. This method returns
* "{@code <prefix>.<databaseName>.<tableName>}".
*
* @param databaseName the name of the database; may not be null
* @param tableName the name of the table; may not be null
* @return the topic name; never null
*/
@Override
public String getTopic(String databaseName, String tableName) {
return String.join(delimiter, prefix, databaseName, tableName);
}
/**
* Get the name of the heartbeat topic for the given server. This method returns
* "{@code <prefix>-heartbeat}".
*
* @return the topic name; never null
*/
@Override
public String getHeartbeatTopic() {
return String.join(delimiter, heartbeatPrefix, prefix);
}
};
}
/**
* Get the name of the topic for the given server name.
*
* @param tableId the identifier of the table; may not be null
* @return the topic name; never null
*/
@Override
default String topicNameFor(TableId tableId) {
return getTopic(tableId.catalog(),tableId.table());
}
/**
* Get the name of the topic for the given server name.
*
* @param databaseName the name of the database; may not be null
* @param tableName the name of the table; may not be null
* @return the topic name; never null
*/
String getTopic(String databaseName, String tableName);
/**
* Get the name of the primary topic.
*
* @return the topic name; never null
*/
String getPrimaryTopic();
/**
* Get the name of the heartbeat topic.
*
* @return the topic name; never null
*/
String getHeartbeatTopic();
} }

View File

@ -25,6 +25,7 @@
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema; import io.debezium.relational.TableSchema;
import io.debezium.relational.history.HistoryRecord.Fields; import io.debezium.relational.history.HistoryRecord.Fields;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster; import io.debezium.util.SchemaNameAdjuster;
/** /**
@ -37,7 +38,7 @@ public class RecordMakers {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
private final MySqlSchema schema; private final MySqlSchema schema;
private final SourceInfo source; private final SourceInfo source;
private final MySqlTopicSelector topicSelector; private final TopicSelector<TableId> topicSelector;
private final boolean emitTombstoneOnDelete; private final boolean emitTombstoneOnDelete;
private final Map<Long, Converter> convertersByTableNumber = new HashMap<>(); private final Map<Long, Converter> convertersByTableNumber = new HashMap<>();
private final Map<TableId, Long> tableNumbersByTableId = new HashMap<>(); private final Map<TableId, Long> tableNumbersByTableId = new HashMap<>();
@ -53,7 +54,7 @@ public class RecordMakers {
* @param source the connector's source information; may not be null * @param source the connector's source information; may not be null
* @param topicSelector the selector for topic names; may not be null * @param topicSelector the selector for topic names; may not be null
*/ */
public RecordMakers(MySqlSchema schema, SourceInfo source, MySqlTopicSelector topicSelector, boolean emitTombstoneOnDelete) { public RecordMakers(MySqlSchema schema, SourceInfo source, TopicSelector<TableId> topicSelector, boolean emitTombstoneOnDelete) {
this.schema = schema; this.schema = schema;
this.source = source; this.source = source;
this.topicSelector = topicSelector; this.topicSelector = topicSelector;

View File

@ -25,9 +25,11 @@
import io.debezium.connector.postgresql.connection.pgproto.PgProtoMessageDecoder; import io.debezium.connector.postgresql.connection.pgproto.PgProtoMessageDecoder;
import io.debezium.connector.postgresql.connection.wal2json.NonStreamingWal2JsonMessageDecoder; import io.debezium.connector.postgresql.connection.wal2json.NonStreamingWal2JsonMessageDecoder;
import io.debezium.connector.postgresql.connection.wal2json.StreamingWal2JsonMessageDecoder; import io.debezium.connector.postgresql.connection.wal2json.StreamingWal2JsonMessageDecoder;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode; import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.TableId;
/** /**
* The configuration properties for the {@link PostgresConnector} * The configuration properties for the {@link PostgresConnector}
@ -258,12 +260,24 @@ public enum TopicSelectionStrategy implements EnumeratedValue {
/** /**
* Create a topic for each distinct DB table * Create a topic for each distinct DB table
*/ */
TOPIC_PER_TABLE("topic_per_table"), TOPIC_PER_TABLE("topic_per_table") {
@Override
public String getTopicName(TableId tableId, String prefix, String delimiter) {
return String.join(delimiter, prefix, tableId.schema(), tableId.table());
}
},
/** /**
* Create a topic for an entire DB schema * Create a topic for an entire DB schema
*/ */
TOPIC_PER_SCHEMA("topic_per_schema"); TOPIC_PER_SCHEMA("topic_per_schema") {
@Override
public String getTopicName(TableId tableId, String prefix, String delimiter) {
return String.join(delimiter, prefix, tableId.schema());
}
};
private String value; private String value;
@ -272,6 +286,8 @@ public String getValue() {
return value; return value;
} }
public abstract String getTopicName(TableId tableId, String prefix, String delimiter);
TopicSelectionStrategy(String value) { TopicSelectionStrategy(String value) {
this.value = value; this.value = value;
} }
@ -692,7 +708,10 @@ public String getPostgresPluginName() {
public static Field.Set ALL_FIELDS = Field.setOf(PLUGIN_NAME, SLOT_NAME, DROP_SLOT_ON_STOP, public static Field.Set ALL_FIELDS = Field.setOf(PLUGIN_NAME, SLOT_NAME, DROP_SLOT_ON_STOP,
DATABASE_NAME, USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS, SERVER_NAME, DATABASE_NAME, USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS, SERVER_NAME,
TOPIC_SELECTION_STRATEGY, CommonConnectorConfig.MAX_BATCH_SIZE, TOPIC_SELECTION_STRATEGY, CommonConnectorConfig.MAX_BATCH_SIZE,
CommonConnectorConfig.MAX_QUEUE_SIZE, CommonConnectorConfig.POLL_INTERVAL_MS, SCHEMA_WHITELIST, CommonConnectorConfig.MAX_QUEUE_SIZE, CommonConnectorConfig.POLL_INTERVAL_MS,
Heartbeat.HEARTBEAT_INTERVAL,
Heartbeat.HEARTBEAT_TOPICS_PREFIX,
SCHEMA_WHITELIST,
SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST, SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,
COLUMN_BLACKLIST, SNAPSHOT_MODE, COLUMN_BLACKLIST, SNAPSHOT_MODE,
TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
@ -837,7 +856,8 @@ protected static ConfigDef configDef() {
DROP_SLOT_ON_STOP, SSL_SOCKET_FACTORY, STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE); DROP_SLOT_ON_STOP, SSL_SOCKET_FACTORY, STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE);
Field.group(config, "Events", SCHEMA_WHITELIST, SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST, Field.group(config, "Events", SCHEMA_WHITELIST, SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,
COLUMN_BLACKLIST, INCLUDE_UNKNOWN_DATATYPES, SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, COLUMN_BLACKLIST, INCLUDE_UNKNOWN_DATATYPES, SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE,
CommonConnectorConfig.TOMBSTONES_ON_DELETE); CommonConnectorConfig.TOMBSTONES_ON_DELETE, Heartbeat.HEARTBEAT_INTERVAL,
Heartbeat.HEARTBEAT_TOPICS_PREFIX);
Field.group(config, "Connector", TOPIC_SELECTION_STRATEGY, CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.MAX_QUEUE_SIZE, Field.group(config, "Connector", TOPIC_SELECTION_STRATEGY, CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.MAX_QUEUE_SIZE,
SNAPSHOT_MODE, SNAPSHOT_LOCK_TIMEOUT_MS, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, ROWS_FETCH_SIZE); SNAPSHOT_MODE, SNAPSHOT_LOCK_TIMEOUT_MS, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, ROWS_FETCH_SIZE);
return config; return config;

View File

@ -22,6 +22,8 @@
import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask; import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.postgresql.connection.PostgresConnection; import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.LoggingContext; import io.debezium.util.LoggingContext;
/** /**
@ -61,7 +63,7 @@ public void start(Configuration config) {
} }
// create the task context and schema... // create the task context and schema...
PostgresTopicSelector topicSelector = PostgresTopicSelector.create(connectorConfig); TopicSelector<TableId> topicSelector = PostgresTopicSelector.create(connectorConfig);
PostgresSchema schema = new PostgresSchema(connectorConfig, typeRegistry, topicSelector); PostgresSchema schema = new PostgresSchema(connectorConfig, typeRegistry, topicSelector);
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicSelector); this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicSelector);

View File

@ -23,6 +23,7 @@
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.relational.TableSchemaBuilder; import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables; import io.debezium.relational.Tables;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster; import io.debezium.util.SchemaNameAdjuster;
/** /**
@ -50,7 +51,7 @@ public class PostgresSchema extends RelationalDatabaseSchema {
* @param config the connector configuration, which is presumed to be valid * @param config the connector configuration, which is presumed to be valid
*/ */
protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegistry, protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegistry,
PostgresTopicSelector topicSelector) { TopicSelector<TableId> topicSelector) {
super(config, topicSelector, new Filters(config).tableFilter(), super(config, topicSelector, new Filters(config).tableFilter(),
new Filters(config).columnFilter(), getTableSchemaBuilder(config, typeRegistry), false); new Filters(config).columnFilter(), getTableSchemaBuilder(config, typeRegistry), false);

View File

@ -12,6 +12,8 @@
import io.debezium.connector.common.CdcSourceTaskContext; import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.postgresql.connection.PostgresConnection; import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection; import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
/** /**
* The context of a {@link PostgresConnectorTask}. This deals with most of the brunt of reading various configuration options * The context of a {@link PostgresConnectorTask}. This deals with most of the brunt of reading various configuration options
@ -23,10 +25,10 @@
public class PostgresTaskContext extends CdcSourceTaskContext { public class PostgresTaskContext extends CdcSourceTaskContext {
private final PostgresConnectorConfig config; private final PostgresConnectorConfig config;
private final PostgresTopicSelector topicSelector; private final TopicSelector<TableId> topicSelector;
private final PostgresSchema schema; private final PostgresSchema schema;
protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, PostgresTopicSelector topicSelector) { protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicSelector<TableId> topicSelector) {
super("Postgres", config.getLogicalName()); super("Postgres", config.getLogicalName());
this.config = config; this.config = config;
@ -35,7 +37,7 @@ protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema sch
this.schema = schema; this.schema = schema;
} }
protected PostgresTopicSelector topicSelector() { protected TopicSelector<TableId> topicSelector() {
return topicSelector; return topicSelector;
} }

View File

@ -6,47 +6,21 @@
package io.debezium.connector.postgresql; package io.debezium.connector.postgresql;
import io.debezium.connector.postgresql.PostgresConnectorConfig.TopicSelectionStrategy;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector; import io.debezium.schema.TopicSelector;
/** /**
* Generator of topic names for {@link io.debezium.relational.Table table ids} used by the Postgres connector to determine * Factory for this connector's {@link TopicSelector}.
* which Kafka topics contain which messages
* *
* @author Horia Chiorean (hchiorea@redhat.com) * @author Horia Chiorean (hchiorea@redhat.com)
*/ */
public interface PostgresTopicSelector extends TopicSelector<TableId> { public class PostgresTopicSelector {
public static PostgresTopicSelector create(PostgresConnectorConfig config) { public static TopicSelector<TableId> create(PostgresConnectorConfig connectorConfig) {
PostgresConnectorConfig.TopicSelectionStrategy topicSelectionStrategy = config.topicSelectionStrategy(); TopicSelectionStrategy topicSelectionStrategy = connectorConfig.topicSelectionStrategy();
switch (topicSelectionStrategy) { return TopicSelector.defaultSelector(connectorConfig,
case TOPIC_PER_SCHEMA: (id, prefix, delimiter) -> topicSelectionStrategy.getTopicName(id, prefix, delimiter));
return topicPerSchema(config.getLogicalName());
case TOPIC_PER_TABLE:
return topicPerTable(config.getLogicalName());
default:
throw new IllegalArgumentException("Unknown topic selection strategy: " + topicSelectionStrategy);
}
}
/**
* Generates a topic name for each table, based on the table schema, table name and a prefix
*
* @param prefix a prefix which will be prepended to the topic name
* @return a {@link TopicSelector} instance, never {@code null}
*/
static PostgresTopicSelector topicPerTable(String prefix) {
return tableId -> String.join(".", prefix, tableId.schema(), tableId.table());
}
/**
* Generates a topic name for each table, based on the table schema and a prefix
*
* @param prefix a prefix which will be prepended to the topic name
* @return a {@link TopicSelector} instance, never {@code null}
*/
static PostgresTopicSelector topicPerSchema(String prefix) {
return tableId -> String.join(".", prefix, tableId.schema());
} }
} }

View File

@ -12,6 +12,8 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.function.BlockingConsumer; import io.debezium.function.BlockingConsumer;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock; import io.debezium.util.Clock;
/** /**
@ -55,7 +57,7 @@ protected PostgresSchema schema() {
return taskContext.schema(); return taskContext.schema();
} }
protected PostgresTopicSelector topicSelector() { protected TopicSelector<TableId> topicSelector() {
return taskContext.topicSelector(); return taskContext.topicSelector();
} }

View File

@ -27,6 +27,8 @@
import io.debezium.data.VerifyRecord; import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor; import io.debezium.doc.FixFor;
import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
/** /**
* Integration test for {@link RecordsSnapshotProducerIT} * Integration test for {@link RecordsSnapshotProducerIT}
@ -47,7 +49,7 @@ public void before() throws Exception {
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig() PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.build()); .build());
PostgresTopicSelector selector = PostgresTopicSelector.create(config); TopicSelector<TableId> selector = PostgresTopicSelector.create(config);
context = new PostgresTaskContext( context = new PostgresTaskContext(
config, config,
new PostgresSchema(config, TestHelper.getTypeRegistry(), selector), new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),
@ -93,7 +95,7 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
TestHelper.executeDDL("postgres_create_tables.ddl"); TestHelper.executeDDL("postgres_create_tables.ddl");
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().build()); PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().build());
PostgresTopicSelector selector = PostgresTopicSelector.create(config); TopicSelector<TableId> selector = PostgresTopicSelector.create(config);
context = new PostgresTaskContext( context = new PostgresTaskContext(
config, config,
new PostgresSchema(config, TestHelper.getTypeRegistry(), selector), new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),
@ -183,7 +185,7 @@ public void shouldGenerateSnapshotsForDefaultDatatypesAdaptiveMicroseconds() thr
.with(PostgresConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS) .with(PostgresConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS)
.build()); .build());
PostgresTopicSelector selector = PostgresTopicSelector.create(config); TopicSelector<TableId> selector = PostgresTopicSelector.create(config);
context = new PostgresTaskContext( context = new PostgresTaskContext(
config, config,
new PostgresSchema(config, TestHelper.getTypeRegistry(), selector), new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),
@ -224,7 +226,7 @@ public void shouldGenerateSnapshotsForDecimalDatatypesUsingStringEncoding() thro
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.STRING) .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.STRING)
.build()); .build());
PostgresTopicSelector selector = PostgresTopicSelector.create(config); TopicSelector<TableId> selector = PostgresTopicSelector.create(config);
context = new PostgresTaskContext( context = new PostgresTaskContext(
config, config,
new PostgresSchema(config, TestHelper.getTypeRegistry(), selector), new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),

View File

@ -37,6 +37,7 @@
import io.debezium.junit.ConditionalFail; import io.debezium.junit.ConditionalFail;
import io.debezium.junit.ShouldFailWhen; import io.debezium.junit.ShouldFailWhen;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
/** /**
* Integration test for the {@link RecordsStreamProducer} class. This also tests indirectly the PG plugin functionality for * Integration test for the {@link RecordsStreamProducer} class. This also tests indirectly the PG plugin functionality for
@ -662,7 +663,7 @@ private void setupRecordsProducer(PostgresConnectorConfig config) {
recordsProducer.stop(); recordsProducer.stop();
} }
PostgresTopicSelector selector = PostgresTopicSelector.create(config); TopicSelector<TableId> selector = PostgresTopicSelector.create(config);
PostgresTaskContext context = new PostgresTaskContext( PostgresTaskContext context = new PostgresTaskContext(
config, config,

View File

@ -19,6 +19,8 @@
import org.junit.Test; import org.junit.Test;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
/** /**
* Integration test for {@link io.debezium.connector.postgresql.PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE} * Integration test for {@link io.debezium.connector.postgresql.PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE}
@ -51,7 +53,7 @@ public void before(Configuration overrides) throws SQLException {
TestHelper.dropAllSchemas(); TestHelper.dropAllSchemas();
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().with(overrides).build()); PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().with(overrides).build());
PostgresTopicSelector selector = PostgresTopicSelector.create(config); TopicSelector<TableId> selector = PostgresTopicSelector.create(config);
context = new PostgresTaskContext( context = new PostgresTaskContext(
config, config,
new PostgresSchema(config, TestHelper.getTypeRegistry(), selector), new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),

View File

@ -12,6 +12,8 @@
import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigDef.Width;
import io.debezium.heartbeat.Heartbeat;
/** /**
* Configuration options common to all Debezium connectors. * Configuration options common to all Debezium connectors.
* *
@ -68,6 +70,7 @@ public class CommonConnectorConfig {
private final int maxBatchSize; private final int maxBatchSize;
private final Duration pollInterval; private final Duration pollInterval;
private final String logicalName; private final String logicalName;
private final String heartbeatTopicsPrefix;
protected CommonConnectorConfig(Configuration config, String logicalName) { protected CommonConnectorConfig(Configuration config, String logicalName) {
this.config = config; this.config = config;
@ -76,6 +79,7 @@ protected CommonConnectorConfig(Configuration config, String logicalName) {
this.maxBatchSize = config.getInteger(MAX_BATCH_SIZE); this.maxBatchSize = config.getInteger(MAX_BATCH_SIZE);
this.pollInterval = config.getDuration(POLL_INTERVAL_MS, ChronoUnit.MILLIS); this.pollInterval = config.getDuration(POLL_INTERVAL_MS, ChronoUnit.MILLIS);
this.logicalName = logicalName; this.logicalName = logicalName;
this.heartbeatTopicsPrefix = config.getString(Heartbeat.HEARTBEAT_TOPICS_PREFIX);
} }
/** /**
@ -106,6 +110,11 @@ public String getLogicalName() {
return logicalName; return logicalName;
} }
public String getHeartbeatTopicsPrefix() {
return heartbeatTopicsPrefix;
}
private static int validateMaxQueueSize(Configuration config, Field field, Field.ValidationOutput problems) { private static int validateMaxQueueSize(Configuration config, Field field, Field.ValidationOutput problems) {
int maxQueueSize = config.getInteger(field); int maxQueueSize = config.getInteger(field);
int maxBatchSize = config.getInteger(MAX_BATCH_SIZE); int maxBatchSize = config.getInteger(MAX_BATCH_SIZE);

View File

@ -5,6 +5,8 @@
*/ */
package io.debezium.schema; package io.debezium.schema;
import io.debezium.config.CommonConnectorConfig;
/** /**
* Implementations return names for Kafka topics (data and meta-data). * Implementations return names for Kafka topics (data and meta-data).
* *
@ -14,8 +16,30 @@
* @param <I> * @param <I>
* The type of {@link DataCollectionId} used by a given implementation * The type of {@link DataCollectionId} used by a given implementation
*/ */
// TODO: further unify; do we actually need distinct implementations per backend? public class TopicSelector<I extends DataCollectionId> {
public interface TopicSelector<I extends DataCollectionId> {
private final String prefix;
private final String heartbeatPrefix;
private final String delimiter;
private final DataCollectionTopicNamer<I> dataCollectionTopicNamer;
private TopicSelector(String prefix, String heartbeatPrefix, String delimiter, DataCollectionTopicNamer<I> dataCollectionTopicNamer) {
this.prefix = prefix;
this.heartbeatPrefix = heartbeatPrefix;
this.delimiter = delimiter;
this.dataCollectionTopicNamer = dataCollectionTopicNamer;
}
public static <I extends DataCollectionId> TopicSelector<I> defaultSelector(String prefix, String heartbeatPrefix, String delimiter, DataCollectionTopicNamer<I> dataCollectionTopicNamer) {
return new TopicSelector<>(prefix, heartbeatPrefix, delimiter, dataCollectionTopicNamer);
}
public static <I extends DataCollectionId> TopicSelector<I> defaultSelector(CommonConnectorConfig connectorConfig, DataCollectionTopicNamer<I> dataCollectionTopicNamer) {
String prefix = connectorConfig.getLogicalName();
String heartbeatTopicsPrefix = connectorConfig.getHeartbeatTopicsPrefix();
String delimiter = ".";
return new TopicSelector<>(prefix, heartbeatTopicsPrefix, delimiter, dataCollectionTopicNamer);
}
/** /**
* Returns the name of the Kafka topic for a given data collection identifier * Returns the name of the Kafka topic for a given data collection identifier
@ -23,5 +47,39 @@ public interface TopicSelector<I extends DataCollectionId> {
* @param id the data collection identifier, never {@code null} * @param id the data collection identifier, never {@code null}
* @return the name of the Kafka topic, never {@code null} * @return the name of the Kafka topic, never {@code null}
*/ */
String topicNameFor(I id); public String topicNameFor(I id) {
return dataCollectionTopicNamer.topicNameFor(id, prefix, delimiter);
}
/**
* Get the name of the primary topic.
*
* @return the topic name; never null
*/
public String getPrimaryTopic() {
return prefix;
}
/**
* Get the name of the heartbeat topic.
*
* @return the topic name; never null
*/
/**
* Get the name of the heartbeat topic for the given server. This method returns
* "{@code <prefix>-heartbeat}".
*
* @return the topic name; never null
*/
public String getHeartbeatTopic() {
return String.join(delimiter, heartbeatPrefix, prefix);
}
/**
* Implementations determine the topic name corresponding to a given data collection.
*/
@FunctionalInterface
public interface DataCollectionTopicNamer<I extends DataCollectionId> {
String topicNameFor(I id, String prefix, String delimiter);
}
} }