DBZ-627 Unifying TopicSelector implementations

This commit is contained in:
Gunnar Morling 2018-07-18 09:41:01 +02:00 committed by Jiri Pechanec
parent 9be74bcf35
commit a0f3aed63e
3 changed files with 11 additions and 17 deletions

View File

@ -16,6 +16,7 @@
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.document.Document;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
@ -106,7 +107,8 @@ public class OracleConnectorConfig extends RelationalDatabaseConnectorConfig {
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN,
CommonConnectorConfig.POLL_INTERVAL_MS,
CommonConnectorConfig.MAX_BATCH_SIZE,
CommonConnectorConfig.MAX_QUEUE_SIZE
CommonConnectorConfig.MAX_QUEUE_SIZE,
Heartbeat.HEARTBEAT_INTERVAL, Heartbeat.HEARTBEAT_TOPICS_PREFIX
);
private final String databaseName;
@ -129,7 +131,8 @@ public static ConfigDef configDef() {
Field.group(config, "Oracle", LOGICAL_NAME, DATABASE_NAME, PDB_NAME, XSTREAM_SERVER_NAME, SNAPSHOT_MODE);
Field.group(config, "Events", RelationalDatabaseConnectorConfig.TABLE_WHITELIST,
RelationalDatabaseConnectorConfig.TABLE_BLACKLIST,
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN,
Heartbeat.HEARTBEAT_INTERVAL, Heartbeat.HEARTBEAT_TOPICS_PREFIX
);
Field.group(config, "Connector", CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.MAX_QUEUE_SIZE);

View File

@ -26,6 +26,7 @@
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
import io.debezium.util.SchemaNameAdjuster;
@ -74,7 +75,7 @@ public void start(Configuration config) {
.build();
errorHandler = new ErrorHandler(OracleConnector.class, connectorConfig.getLogicalName(), queue, this::cleanupResources);
OracleTopicSelector topicSelector = OracleTopicSelector.defaultSelector(connectorConfig.getLogicalName());
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
Configuration jdbcConfig = config.subset("database.", true);

View File

@ -8,20 +8,10 @@
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
public class OracleTopicSelector implements TopicSelector<TableId> {
public class OracleTopicSelector {
private final String prefix;
private OracleTopicSelector(String prefix) {
this.prefix = prefix;
}
public static OracleTopicSelector defaultSelector(String prefix) {
return new OracleTopicSelector(prefix);
}
@Override
public String topicNameFor(TableId tableId) {
return String.join(".", prefix, tableId.schema(), tableId.table());
public static TopicSelector<TableId> defaultSelector(OracleConnectorConfig connectorConfig) {
return TopicSelector.defaultSelector(connectorConfig,
(tableId, prefix, delimiter) -> String.join(delimiter, prefix, tableId.schema(), tableId.table()));
}
}