DBZ-4180 Pluggable topic selector

This commit is contained in:
harveyyue 2022-05-01 17:49:24 +08:00 committed by Jiri Pechanec
parent 9b1853746a
commit 6ea7abc8fc
109 changed files with 1155 additions and 282 deletions

View File

@ -3,7 +3,9 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.schema;
package io.debezium.spi.schema;
import java.util.List;
/**
* Common contract for all identifiers of data collections (RDBMS tables, MongoDB collections etc.)
@ -18,4 +20,19 @@ public interface DataCollectionId {
* @return the collection's fully qualified identifier.
*/
String identifier();
/**
* Get all elements of the data collection.
*/
List<String> parts();
/**
* Get a database list including database, table name.
*/
List<String> databaseParts();
/**
* Get a schema list including schema, table name.
*/
List<String> schemaParts();
}

View File

@ -0,0 +1,69 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.spi.topic;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.common.annotation.Incubating;
import io.debezium.spi.schema.DataCollectionId;
/**
 * A contract defining how topic names are derived for the various kinds of events a
 * connector emits: data change, schema change, transaction metadata and heartbeat events.
 *
 * @param <I> the type of {@link DataCollectionId} this strategy resolves data-change topic names for
 * @author Harvey Yue
 */
@Incubating
public interface TopicNamingStrategy<I extends DataCollectionId> {
    Logger LOGGER = LoggerFactory.getLogger(TopicNamingStrategy.class);

    // Substituted for every character that is not legal in a Kafka topic name.
    String REPLACEMENT_CHAR = "_";

    void configure(Properties props);

    String dataChangeTopic(I id);

    String schemaChangeTopic();

    String heartbeatTopic();

    String transactionTopic();

    /**
     * Sanitizes the given topic name by replacing every character that is not legal in a
     * Kafka topic name with {@link #REPLACEMENT_CHAR}. Legal characters are {@code [a-zA-Z0-9._-]}.
     *
     * @see <a href="https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java">Kafka's topic name validation</a>
     */
    default String sanitizedTopicName(String topicName) {
        final StringBuilder sanitized = new StringBuilder(topicName.length());
        boolean replaced = false;
        for (final char current : topicName.toCharArray()) {
            final boolean legal = (current >= '0' && current <= '9')
                    || (current >= 'a' && current <= 'z')
                    || (current >= 'A' && current <= 'Z')
                    || current == '.' || current == '_' || current == '-';
            if (legal) {
                sanitized.append(current);
            }
            else {
                sanitized.append(REPLACEMENT_CHAR);
                replaced = true;
            }
        }
        if (!replaced) {
            // Nothing was substituted; hand back the original instance unchanged.
            return topicName;
        }
        final String result = sanitized.toString();
        LOGGER.warn("Topic '{}' name isn't a valid topic name, replacing it with '{}'.", topicName, result);
        return result;
    }
}

View File

@ -5,8 +5,12 @@
*/
package io.debezium.connector.mongodb;
import java.util.Collections;
import java.util.List;
import io.debezium.annotation.Immutable;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
/**
* A simple identifier for collections in a replica set.
@ -104,6 +108,21 @@ public String identifier() {
return replicaSetName + "." + dbName + "." + name;
}
@Override
// Ordered elements of the fully-qualified id: replica set name, database name, collection name.
public List<String> parts() {
return Collect.arrayListOf(replicaSetName, dbName, name);
}
@Override
// Database-scoped elements only: database name followed by collection name (replica set excluded).
public List<String> databaseParts() {
return Collect.arrayListOf(dbName, name);
}
@Override
// MongoDB has no schema concept, so there are no schema parts for a collection id.
public List<String> schemaParts() {
return Collections.emptyList();
}
@Override
public int hashCode() {
return name.hashCode();

View File

@ -16,7 +16,7 @@
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
/**

View File

@ -21,8 +21,8 @@
import io.debezium.data.Envelope;
import io.debezium.data.Envelope.FieldName;
import io.debezium.data.SchemaUtil;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.spi.schema.DataCollectionId;
/**
* Defines the Kafka Connect {@link Schema} functionality associated with a given mongodb collection, and which can

View File

@ -30,7 +30,8 @@
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.data.Envelope;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.spi.schema.DataCollectionId;
/**
* The configuration properties.
@ -479,6 +480,15 @@ public boolean isFullUpdate() {
.withImportance(Importance.LOW)
.withDescription("The maximum processing time in milliseconds to wait for the oplog cursor to process a single poll request");
public static final Field TOPIC_NAMING_STRATEGY = Field.create("topic.naming.strategy")
.withDisplayName("Topic naming strategy class")
.withType(Type.CLASS)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("The name of the TopicNamingStrategy class that should be used to determine the topic name " +
"for data change, schema change, transaction, heartbeat event etc.")
.withDefault(DefaultTopicNamingStrategy.class.getName());
private static final ConfigDefinition CONFIG_DEFINITION = CommonConnectorConfig.CONFIG_DEFINITION.edit()
.name("MongoDB")
.type(

View File

@ -75,7 +75,7 @@ public ChangeEventSourceCoordinator<MongoDbPartition, MongoDbOffsetContext> star
this.taskContext = new MongoDbTaskContext(config);
final Schema structSchema = connectorConfig.getSourceInfoStructMaker().schema();
this.schema = new MongoDbSchema(taskContext.filters(), taskContext.topicSelector(), structSchema, schemaNameAdjuster);
this.schema = new MongoDbSchema(taskContext.filters(), taskContext.topicNamingStrategy(), structSchema, schemaNameAdjuster);
final ReplicaSets replicaSets = getReplicaSets(config);
final MongoDbOffsetContext previousOffset = getPreviousOffset(connectorConfig, replicaSets);
@ -99,7 +99,7 @@ public ChangeEventSourceCoordinator<MongoDbPartition, MongoDbOffsetContext> star
final EventDispatcher<MongoDbPartition, CollectionId> dispatcher = new EventDispatcher<>(
connectorConfig,
taskContext.topicSelector(),
taskContext.topicNamingStrategy(),
schema,
queue,
taskContext.filters().collectionFilter()::test,

View File

@ -13,7 +13,7 @@
import io.debezium.data.Envelope;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
/**

View File

@ -35,7 +35,7 @@
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import io.debezium.util.Threads;

View File

@ -18,7 +18,7 @@
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* A context that facilitates the management of the current offsets across a set of mongodb replica sets.

View File

@ -21,7 +21,7 @@
import io.debezium.pipeline.txmetadata.TransactionMonitor;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.SchemaNameAdjuster;
/**
@ -53,16 +53,16 @@ public class MongoDbSchema implements DatabaseSchema<CollectionId> {
.build();
private final Filters filters;
private final TopicSelector<CollectionId> topicSelector;
private final TopicNamingStrategy<CollectionId> topicNamingStrategy;
private final Schema sourceSchema;
private final SchemaNameAdjuster adjuster;
private final ConcurrentMap<CollectionId, MongoDbCollectionSchema> collections = new ConcurrentHashMap<>();
private final JsonSerialization serialization = new JsonSerialization();
public MongoDbSchema(Filters filters, TopicSelector<CollectionId> topicSelector, Schema sourceSchema,
public MongoDbSchema(Filters filters, TopicNamingStrategy<CollectionId> topicNamingStrategy, Schema sourceSchema,
SchemaNameAdjuster schemaNameAdjuster) {
this.filters = filters;
this.topicSelector = topicSelector;
this.topicNamingStrategy = topicNamingStrategy;
this.sourceSchema = sourceSchema;
this.adjuster = schemaNameAdjuster;
}
@ -75,7 +75,7 @@ public void close() {
public DataCollectionSchema schemaFor(CollectionId collectionId) {
return collections.computeIfAbsent(collectionId, id -> {
final FieldFilter fieldFilter = filters.fieldFilterFor(id);
final String topicName = topicSelector.topicNameFor(id);
final String topicName = topicNamingStrategy.dataChangeTopic(id);
final Schema keySchema = SchemaBuilder.struct()
.name(adjuster.adjust(topicName + ".Key"))

View File

@ -10,7 +10,7 @@
import io.debezium.config.Configuration;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.mongodb.MongoDbConnectorConfig.CaptureMode;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.topic.TopicNamingStrategy;
/**
* @author Randall Hauch
@ -19,7 +19,7 @@ public class MongoDbTaskContext extends CdcSourceTaskContext {
private final Filters filters;
private final SourceInfo source;
private final TopicSelector<CollectionId> topicSelector;
private final TopicNamingStrategy topicNamingStrategy;
private final String serverName;
private final ConnectionContext connectionContext;
private final MongoDbConnectorConfig connectorConfig;
@ -30,17 +30,16 @@ public class MongoDbTaskContext extends CdcSourceTaskContext {
public MongoDbTaskContext(Configuration config) {
super(Module.contextName(), config.getString(MongoDbConnectorConfig.LOGICAL_NAME), Collections::emptySet);
final String serverName = config.getString(MongoDbConnectorConfig.LOGICAL_NAME);
this.filters = new Filters(config);
this.connectorConfig = new MongoDbConnectorConfig(config);
this.source = new SourceInfo(connectorConfig);
this.topicSelector = MongoDbTopicSelector.defaultSelector(serverName, connectorConfig.getHeartbeatTopicsPrefix());
this.topicNamingStrategy = connectorConfig.getTopicNamingStrategy(MongoDbConnectorConfig.TOPIC_NAMING_STRATEGY);
this.serverName = config.getString(MongoDbConnectorConfig.LOGICAL_NAME);
this.connectionContext = new ConnectionContext(config);
}
public TopicSelector<CollectionId> topicSelector() {
return topicSelector;
public TopicNamingStrategy<CollectionId> topicNamingStrategy() {
return topicNamingStrategy;
}
public Filters filters() {

View File

@ -19,7 +19,7 @@
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* An {@link OffsetContext} implementation that is specific to a single {@link ReplicaSet}.

View File

@ -35,7 +35,12 @@ protected static void assertConfigDefIsValid(Connector connector, io.debezium.co
assertThat(key.importance).isEqualTo(expected.importance());
assertThat(key.documentation).isEqualTo(expected.description());
assertThat(key.type).isEqualTo(expected.type());
assertThat(key.defaultValue).isEqualTo(expected.defaultValue());
if (expected.equals(MongoDbConnectorConfig.TOPIC_NAMING_STRATEGY)) {
assertThat(((Class<?>) key.defaultValue).getName()).isEqualTo((String) expected.defaultValue());
}
else {
assertThat(key.defaultValue).isEqualTo(expected.defaultValue());
}
assertThat(key.dependents).isEqualTo(expected.dependents());
assertThat(key.width).isNotNull();
assertThat(key.group).isNotNull();

View File

@ -44,7 +44,7 @@ public void shouldAlwaysProduceCollectionSchema() {
private static MongoDbSchema getSchema(Configuration config, MongoDbTaskContext taskContext) {
final MongoDbConnectorConfig connectorConfig = new MongoDbConnectorConfig(config);
return new MongoDbSchema(taskContext.filters(), taskContext.topicSelector(),
return new MongoDbSchema(taskContext.filters(), taskContext.topicNamingStrategy(),
connectorConfig.getSourceInfoStructMaker().schema(),
connectorConfig.schemaNameAdjustmentMode().createAdjuster());
}

View File

@ -15,6 +15,7 @@
import io.debezium.connector.mongodb.MongoDbConnectorConfig.SnapshotMode;
import io.debezium.doc.FixFor;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
@ -85,7 +86,7 @@ public void transactionMetadataWithCustomTopicName() throws Exception {
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.c1")
.with(MongoDbConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(MongoDbConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
.with(MongoDbConnectorConfig.TRANSACTION_TOPIC, "tx.of.${database.server.name}")
.with(AbstractTopicNamingStrategy.TOPIC_TRANSACTION, "tx.of.server")
.build();
context = new MongoDbTaskContext(config);
@ -111,7 +112,7 @@ public void transactionMetadataWithCustomTopicName() throws Exception {
// BEGIN, data, END, data
final SourceRecords records = consumeRecordsByTopic(1 + 6 + 1 + 1);
final List<SourceRecord> c1s = records.recordsForTopic("mongo1.dbA.c1");
final List<SourceRecord> txs = records.recordsForTopic("tx.of.mongo1");
final List<SourceRecord> txs = records.recordsForTopic("mongo1.tx.of.server");
assertThat(c1s).hasSize(7);
assertThat(txs).hasSize(2);

View File

@ -28,16 +28,15 @@
import io.debezium.config.Configuration;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.mongodb.CollectionId;
import io.debezium.connector.mongodb.Configurator;
import io.debezium.connector.mongodb.Filters;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbTopicSelector;
import io.debezium.connector.mongodb.SourceInfo;
import io.debezium.doc.FixFor;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenKafkaVersion;
import io.debezium.schema.TopicSelector;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.spi.topic.TopicNamingStrategy;
/**
* Unit test for {@link ExtractNewDocumentState}.
@ -50,7 +49,7 @@ public class ExtractNewDocumentStateTest {
private Filters filters;
private SourceInfo source;
private TopicSelector<CollectionId> topicSelector;
private TopicNamingStrategy topicNamingStrategy;
private List<SourceRecord> produced;
private ExtractNewDocumentState<SourceRecord> transformation;
@ -64,14 +63,15 @@ public class ExtractNewDocumentStateTest {
@Before
public void setup() {
filters = new Configurator().createFilters();
source = new SourceInfo(new MongoDbConnectorConfig(
MongoDbConnectorConfig connectorConfig = new MongoDbConnectorConfig(
Configuration.create()
.with(MongoDbConnectorConfig.LOGICAL_NAME, SERVER_NAME)
.build()));
topicSelector = MongoDbTopicSelector.defaultSelector(SERVER_NAME, "__debezium-heartbeat");
.build());
source = new SourceInfo(connectorConfig);
topicNamingStrategy = DefaultTopicNamingStrategy.create(connectorConfig);
produced = new ArrayList<>();
transformation = new ExtractNewDocumentState<SourceRecord>();
transformation = new ExtractNewDocumentState<>();
transformation.configure(Collections.singletonMap("array.encoding", "array"));
}

View File

@ -22,7 +22,7 @@
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;

View File

@ -34,6 +34,7 @@
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
import io.debezium.storage.kafka.history.KafkaStorageConfiguration;
import io.debezium.util.Collect;
@ -749,6 +750,15 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
+ DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "' string.")
.withDefault(KafkaDatabaseHistory.class.getName());
public static final Field TOPIC_NAMING_STRATEGY = Field.create("topic.naming.strategy")
.withDisplayName("Topic naming strategy class")
.withType(Type.CLASS)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("The name of the TopicNamingStrategy class that should be used to determine the topic name " +
"for data change, schema change, transaction, heartbeat event etc.")
.withDefault(DefaultTopicNamingStrategy.class.getName());
public static final Field INCLUDE_SQL_QUERY = Field.create("include.query")
.withDisplayName("Include original SQL query with in change events")
.withType(Type.BOOLEAN)

View File

@ -31,7 +31,7 @@
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.TableId;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import io.debezium.util.SchemaNameAdjuster;
@ -65,7 +65,7 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
config.edit()
.with(AbstractDatabaseHistory.INTERNAL_PREFER_DDL, true)
.build());
final TopicSelector<TableId> topicSelector = MySqlTopicSelector.defaultSelector(connectorConfig);
final TopicNamingStrategy topicNamingStrategy = connectorConfig.getTopicNamingStrategy(MySqlConnectorConfig.TOPIC_NAMING_STRATEGY);
final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
final MySqlValueConverters valueConverters = getValueConverters(connectorConfig);
@ -89,7 +89,7 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
final boolean tableIdCaseInsensitive = connection.isTableIdCaseSensitive();
this.schema = new MySqlDatabaseSchema(connectorConfig, valueConverters, topicSelector, schemaNameAdjuster, tableIdCaseInsensitive);
this.schema = new MySqlDatabaseSchema(connectorConfig, valueConverters, topicNamingStrategy, schemaNameAdjuster, tableIdCaseInsensitive);
LOGGER.info("Closing connection before starting schema recovery");
@ -138,7 +138,7 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
final Configuration heartbeatConfig = config;
final EventDispatcher<MySqlPartition, TableId> dispatcher = new EventDispatcher<>(
connectorConfig,
topicSelector,
topicNamingStrategy,
schema,
queue,
connectorConfig.getTableFilters().dataCollectionFilter(),
@ -146,7 +146,7 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
null,
metadataProvider,
connectorConfig.createHeartbeat(
topicSelector,
topicNamingStrategy,
schemaNameAdjuster,
() -> new MySqlConnection(new MySqlConnectionConfiguration(heartbeatConfig), connectorConfig.useCursorFetch()
? new MySqlBinaryProtocolFieldReader(connectorConfig)

View File

@ -42,7 +42,7 @@
import io.debezium.relational.ddl.DdlParserListener.TableIndexEvent;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.text.MultipleParsingExceptions;
import io.debezium.text.ParsingException;
import io.debezium.util.Collect;
@ -76,9 +76,9 @@ public class MySqlDatabaseSchema extends HistorizedRelationalDatabaseSchema {
* The DDL statements passed to the schema are parsed and a logical model of the database schema is created.
*
*/
public MySqlDatabaseSchema(MySqlConnectorConfig connectorConfig, MySqlValueConverters valueConverter, TopicSelector<TableId> topicSelector,
public MySqlDatabaseSchema(MySqlConnectorConfig connectorConfig, MySqlValueConverters valueConverter, TopicNamingStrategy<TableId> topicNamingStrategy,
SchemaNameAdjuster schemaNameAdjuster, boolean tableIdCaseInsensitive) {
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), connectorConfig.getColumnFilter(),
super(connectorConfig, topicNamingStrategy, connectorConfig.getTableFilters().dataCollectionFilter(), connectorConfig.getColumnFilter(),
new TableSchemaBuilder(
valueConverter,
new MySqlDefaultValueConverter(valueConverter),

View File

@ -14,7 +14,7 @@
import io.debezium.data.Envelope;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
class MySqlEventMetadataProvider implements EventMetadataProvider {

View File

@ -20,7 +20,7 @@
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
public class MySqlOffsetContext extends CommonOffsetContext<SourceInfo> {

View File

@ -22,8 +22,8 @@
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
/**

View File

@ -8,8 +8,6 @@
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
/**
* A state (context) associated with a MySQL task
@ -21,13 +19,11 @@ public class MySqlTaskContext extends CdcSourceTaskContext {
private final MySqlDatabaseSchema schema;
private final BinaryLogClient binaryLogClient;
private final TopicSelector<TableId> topicSelector;
public MySqlTaskContext(MySqlConnectorConfig config, MySqlDatabaseSchema schema) {
super(config.getContextName(), config.getLogicalName(), schema::tableIds);
this.schema = schema;
this.binaryLogClient = new BinaryLogClient(config.hostname(), config.port(), config.username(), config.password());
topicSelector = MySqlTopicSelector.defaultSelector(config);
}
public MySqlDatabaseSchema getSchema() {
@ -37,8 +33,4 @@ public MySqlDatabaseSchema getSchema() {
public BinaryLogClient getBinaryLogClient() {
return binaryLogClient;
}
public TopicSelector<TableId> getTopicSelector() {
return topicSelector;
}
}

View File

@ -29,7 +29,7 @@
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.pipeline.signal.ExecuteSnapshot;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
import io.debezium.util.Threads;

View File

@ -21,6 +21,7 @@
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -49,6 +50,7 @@
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.ddl.DdlParserListener.Event;
import io.debezium.relational.ddl.SimpleDdlParserListener;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.Collect;
import io.debezium.util.IoUtil;
@ -105,7 +107,7 @@ public void shouldProcessLargeColumn() {
public void shouldProcessMultipleSignedUnsignedForTable() {
String ddl = "create table if not exists tbl_signed_unsigned(\n"
+ "`id` bigint(20) ZEROFILL signed UNSIGNED signed ZEROFILL unsigned ZEROFILL NOT NULL AUTO_INCREMENT COMMENT 'ID',\n"
+ "c1 int signed unsigned,\n"
+ "c1 int signed unsigned default '',\n"
+ "c2 decimal(10, 2) SIGNED UNSIGNED ZEROFILL,\n"
+ "c3 float SIGNED ZEROFILL,\n"
+ "c4 double precision(18, 4) UNSIGNED SIGNED ZEROFILL,\n"
@ -3352,7 +3354,7 @@ private MysqlDdlParserWithSimpleTestListener(DdlChanges changesListener, boolean
}
private Schema getColumnSchema(Table table, String column) {
TableSchema schema = tableSchemaBuilder.create("test-1", "dummy", table, null, null, null);
TableSchema schema = tableSchemaBuilder.create(new DefaultTopicNamingStrategy(new Properties(), "test-1"), table, null, null, null);
return schema.getEnvelopeSchema().schema().field("after").schema().field(column).schema();
}
}

View File

@ -34,7 +34,8 @@ protected static void assertConfigDefIsValid(Connector connector, io.debezium.co
assertThat(key.importance).isEqualTo(expected.importance());
assertThat(key.documentation).isEqualTo(expected.description());
assertThat(key.type).isEqualTo(expected.type());
if (expected.equals(MySqlConnectorConfig.DATABASE_HISTORY) || expected.equals(MySqlConnectorConfig.JDBC_DRIVER)) {
if (expected.equals(MySqlConnectorConfig.DATABASE_HISTORY) || expected.equals(MySqlConnectorConfig.JDBC_DRIVER)
|| expected.equals(MySqlConnectorConfig.TOPIC_NAMING_STRATEGY)) {
assertThat(((Class<?>) key.defaultValue).getName()).isEqualTo((String) expected.defaultValue());
}
else if (!expected.equals(MySqlConnectorConfig.SERVER_ID)) {

View File

@ -28,6 +28,8 @@
import io.debezium.relational.TableSchema;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.text.ParsingException;
import io.debezium.util.IoUtil;
import io.debezium.util.SchemaNameAdjuster;
@ -65,7 +67,7 @@ private MySqlDatabaseSchema getSchema(Configuration config) {
return new MySqlDatabaseSchema(
connectorConfig,
mySqlValueConverters,
MySqlTopicSelector.defaultSelector(connectorConfig),
(TopicNamingStrategy) DefaultTopicNamingStrategy.create(connectorConfig),
SchemaNameAdjuster.create(),
false);
}

View File

@ -15,6 +15,7 @@
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@ -33,6 +34,7 @@
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.AbstractDdlParser;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.SchemaNameAdjuster;
@ -602,7 +604,7 @@ public void parseNumericAndDecimalToIntDefaultValue() {
assertThat(((MySqlAntlrDdlParser) parser).getParsingExceptionsFromWalker().size()).isEqualTo(0);
assertThat(tables.size()).isEqualTo(1);
TableSchema schema = tableSchemaBuilder.create("test", "dummy", table, null, null, null);
TableSchema schema = tableSchemaBuilder.create(new DefaultTopicNamingStrategy(new Properties(), "test"), table, null, null, null);
assertThat(getColumnSchema(schema, "c0").defaultValue()).isEqualTo((short) 10);
assertThat(getColumnSchema(schema, "c1").defaultValue()).isEqualTo(5);
assertThat(getColumnSchema(schema, "c2").defaultValue()).isEqualTo(0L);
@ -616,7 +618,7 @@ private Schema getColumnSchema(Table table, String column) {
}
private Schema getColumnSchema(Table table, String column, TableSchemaBuilder tableSchemaBuilder) {
TableSchema schema = tableSchemaBuilder.create("test", "dummy", table, null, null, null);
TableSchema schema = tableSchemaBuilder.create(new DefaultTopicNamingStrategy(new Properties(), "test"), table, null, null, null);
return schema.getEnvelopeSchema().schema().field("after").schema().field(column).schema();
}

View File

@ -0,0 +1,171 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.schema.DefaultRegexTopicNamingStrategy;
import io.debezium.util.Testing;
public class MySqlTopicNamingStrategyIT extends AbstractConnectorTest {
private static final String TABLE_NAME = "dbz4180";
private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-comment.txt")
.toAbsolutePath();
private final UniqueDatabase DATABASE = new UniqueDatabase("topic_strategy", "strategy_test")
.withDbHistoryPath(DB_HISTORY_PATH);
private Configuration config;
@Before
// Start each test from a clean slate: no running connector, a freshly initialized
// test database, and no leftover database history file from a previous run.
public void beforeEach() {
stopConnector();
DATABASE.createAndInitialize();
initializeConnectorTestFramework();
Testing.Files.delete(DB_HISTORY_PATH);
}
@After
// Stop the connector after each test; the finally block guarantees the history file
// is removed even if stopping the connector throws.
public void afterEach() {
try {
stopConnector();
}
finally {
Testing.Files.delete(DB_HISTORY_PATH);
}
}
@Test
@FixFor("DBZ-4180")
// Verifies that configuring a custom topic prefix and delimiter on the default naming
// strategy produces data-change topics of the form "<prefix>_<database>_<table>" and
// routes schema-change events to the bare prefix topic.
public void testSpecifyDelimiterAndPrefixStrategy() throws SQLException, InterruptedException {
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName(TABLE_NAME))
.with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, "true")
.with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "my_prefix")
.with(AbstractTopicNamingStrategy.TOPIC_DELIMITER, "_")
.build();
start(MySqlConnector.class, config);
SourceRecords records = consumeRecordsByTopic(100);
// Data-change topic uses the configured "_" delimiter instead of the default ".".
String expectedDataTopic = String.join("_", "my_prefix", DATABASE.getDatabaseName(), TABLE_NAME);
List<SourceRecord> dataChangeEvents = records.recordsForTopic(expectedDataTopic);
assertThat(dataChangeEvents.size()).isEqualTo(1);
// Schema-change events go to the prefix topic; the expected count of 10 matches the
// DDL statements produced by this fixture's snapshot — TODO confirm against strategy_test DDL.
String expectedSchemaTopic = "my_prefix";
List<SourceRecord> schemaChangeEvents = records.recordsForTopic(expectedSchemaTopic);
assertThat(schemaChangeEvents.size()).isEqualTo(10);
// insert data
try (final Connection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName()).connection()) {
conn.createStatement().execute("INSERT INTO dbz4180(a, b, c, d) VALUE (10.1, 10.2, 'strategy 1', 1290)");
}
// A streamed insert must land on the same custom-named topic as snapshot records.
dataChangeEvents = consumeRecordsByTopic(1).recordsForTopic(expectedDataTopic);
assertThat(dataChangeEvents.size()).isEqualTo(1);
SourceRecord sourceRecord = dataChangeEvents.get(0);
final Struct change = ((Struct) sourceRecord.value()).getStruct("after");
assertThat(change.getString("c")).isEqualTo("strategy 1");
stopConnector();
}
@Test
@FixFor("DBZ-4180")
// Verifies the regex-based naming strategy: events from two sharded tables
// (dbz_4180_00, dbz_4180_01) are rewritten by TOPIC_REGEX/TOPIC_REPLACEMENT so that
// both shards publish to a single logical topic ("...dbz_4180_all_shards").
public void testSpecifyByLogicalTableStrategy() throws SQLException, InterruptedException {
String tables = DATABASE.qualifiedTableName("dbz_4180_00") + "," + DATABASE.qualifiedTableName("dbz_4180_01");
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables)
.with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, "false")
.with(DefaultRegexTopicNamingStrategy.TOPIC_REGEX, "(.*)(dbz_4180)(.*)")
.with(DefaultRegexTopicNamingStrategy.TOPIC_REPLACEMENT, "$1$2_all_shards")
.with(MySqlConnectorConfig.TOPIC_NAMING_STRATEGY, "io.debezium.schema.DefaultRegexTopicNamingStrategy")
.build();
start(MySqlConnector.class, config);
assertConnectorIsRunning();
waitForSnapshotToBeCompleted("mysql", DATABASE.getServerName());
// insert data
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection conn = db.connect()) {
String shard0 = "INSERT INTO dbz_4180_00(a, b, c, d) VALUE (10.1, 10.2, 'shard 0', 10);";
String shard1 = "INSERT INTO dbz_4180_01(a, b, c, d) VALUE (10.1, 10.2, 'shard 1', 11);";
conn.execute(shard0, shard1);
}
}
// One insert per shard; both must be routed to the single merged topic.
String expectedTopic = DATABASE.topicForTable("dbz_4180_all_shards");
SourceRecords sourceRecords = consumeRecordsByTopic(100);
List<SourceRecord> records = sourceRecords.recordsForTopic(expectedTopic);
assertEquals(2, records.size());
stopConnector();
}
@Test
@FixFor("DBZ-4180")
public void testSpecifyTransactionStrategy() throws SQLException, InterruptedException {
    // Transaction metadata events must be routed to the configured TOPIC_TRANSACTION name,
    // prefixed with the connector's logical server name.
    config = DATABASE.defaultConfig()
            .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
            .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName(TABLE_NAME))
            .with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, "false")
            .with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, "true")
            .with(AbstractTopicNamingStrategy.TOPIC_TRANSACTION, "my_transaction")
            .build();
    start(MySqlConnector.class, config);
    assertConnectorIsRunning();
    waitForSnapshotToBeCompleted("mysql", DATABASE.getServerName());
    // Single-statement explicit transaction: produces exactly BEGIN + INSERT + END.
    try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
        try (JdbcConnection conn = db.connect()) {
            conn.setAutoCommit(false);
            conn.execute("INSERT INTO dbz4180(a, b, c, d) VALUE (10.1, 10.2, 'test transaction', 1290)");
            conn.commit();
        }
    }
    SourceRecords consumed = consumeRecordsByTopic(100);
    String txTopic = DATABASE.getServerName() + "." + "my_transaction";
    List<SourceRecord> txEvents = consumed.recordsForTopic(txTopic);
    // Only the BEGIN and END markers go to the transaction topic.
    assertEquals(2, txEvents.size());
    // All topics together: BEGIN + 1 INSERT + END.
    assertEquals(1 + 1 + 1, consumed.allRecordsInOrder().size());
    stopConnector();
}
}

View File

@ -0,0 +1,97 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import static org.fest.assertions.Assertions.assertThat;
import java.util.Properties;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.schema.DefaultRegexTopicNamingStrategy;
import io.debezium.schema.DefaultTopicNamingStrategy;
/**
 * Unit tests for the pluggable topic naming strategies ({@code DefaultTopicNamingStrategy}
 * and {@code DefaultRegexTopicNamingStrategy}) as used by the MySQL connector.
 */
public class MySqlTopicNamingStrategyTest {

    /**
     * Builds the minimal strategy configuration shared by every test: each strategy
     * requires the connector's logical name, which becomes the topic prefix by default.
     */
    private Properties newProps(String logicalName) {
        Properties props = new Properties();
        props.put(CommonConnectorConfig.LOGICAL_NAME, logicalName);
        return props;
    }

    @Test
    public void testDataChangeTopic() {
        final TableId tableId = TableId.parse("test_db.dbz_4180");
        final Properties props = newProps("mysql-server-1");
        props.put("topic.delimiter", ".");
        final DefaultTopicNamingStrategy defaultStrategy = new DefaultTopicNamingStrategy(props);
        String dataChangeTopic = defaultStrategy.dataChangeTopic(tableId);
        assertThat(dataChangeTopic).isEqualTo("mysql-server-1.test_db.dbz_4180");
        // Characters that are illegal in Kafka topic names ('#') are replaced with '_'.
        String sanitizedDataChangeTopic = defaultStrategy.dataChangeTopic(TableId.parse("test_db.dbz#4180#2"));
        assertThat(sanitizedDataChangeTopic).isEqualTo("mysql-server-1.test_db.dbz_4180_2");
        // An explicit topic.prefix overrides the logical name as the first segment.
        props.put("topic.prefix", "my_prefix");
        defaultStrategy.configure(props);
        String prefixDataChangeTopic = defaultStrategy.dataChangeTopic(tableId);
        assertThat(prefixDataChangeTopic).isEqualTo("my_prefix.test_db.dbz_4180");
        // The delimiter joining prefix/database/table is configurable as well.
        props.put("topic.delimiter", "_");
        defaultStrategy.configure(props);
        String delimiterDataChangeTopic = defaultStrategy.dataChangeTopic(tableId);
        assertThat(delimiterDataChangeTopic).isEqualTo("my_prefix_test_db_dbz_4180");
    }

    @Test
    public void testSchemaChangeTopic() {
        final Properties props = newProps("mysql-server-1");
        final DefaultTopicNamingStrategy defaultStrategy = new DefaultTopicNamingStrategy(props);
        // Schema change events default to the logical server name alone ...
        String schemaChangeTopic = defaultStrategy.schemaChangeTopic();
        assertThat(schemaChangeTopic).isEqualTo("mysql-server-1");
        // ... and honor an explicit topic.prefix override.
        props.put("topic.prefix", "my_prefix");
        defaultStrategy.configure(props);
        String prefixSchemaChangeTopic = defaultStrategy.schemaChangeTopic();
        assertThat(prefixSchemaChangeTopic).isEqualTo("my_prefix");
    }

    @Test
    public void testTransactionTopic() {
        final DefaultTopicNamingStrategy mySqlStrategy = new DefaultTopicNamingStrategy(newProps("mysql-server-1"));
        // Transaction topic is "<logical name>.<default transaction suffix>".
        String transactionTopic = mySqlStrategy.transactionTopic();
        String expectedTopic = "mysql-server-1." + DefaultTopicNamingStrategy.DEFAULT_TRANSACTION_TOPIC;
        assertThat(transactionTopic).isEqualTo(expectedTopic);
    }

    @Test
    public void testHeartbeatTopic() {
        final DefaultTopicNamingStrategy mySqlStrategy = new DefaultTopicNamingStrategy(newProps("mysql-server-1"));
        // Heartbeat topic is "<default heartbeat prefix>.<logical name>".
        String heartbeatTopic = mySqlStrategy.heartbeatTopic();
        String expectedTopic = DefaultTopicNamingStrategy.DEFAULT_HEARTBEAT_TOPIC_PREFIX + ".mysql-server-1";
        assertThat(heartbeatTopic).isEqualTo(expectedTopic);
    }

    @Test
    public void testLogicTableTopic() {
        final TableId tableId = TableId.parse("test_db.dbz_4180_01");
        final Properties props = newProps("mysql-server-1");
        props.put("topic.delimiter", ".");
        props.put("topic.regex.enable", "true");
        props.put("topic.regex", "(.*)(dbz_4180|test)(.*)");
        props.put("topic.replacement", "$1$2_all_shards");
        final DefaultRegexTopicNamingStrategy byLogicalStrategy = new DefaultRegexTopicNamingStrategy(props);
        // The shard suffix "_01" is collapsed by the replacement into "_all_shards".
        String dataChangeTopic = byLogicalStrategy.dataChangeTopic(tableId);
        assertThat(dataChangeTopic).isEqualTo("mysql-server-1.test_db.dbz_4180_all_shards");
    }
}

View File

@ -55,6 +55,7 @@
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.Testing;
@ -501,7 +502,7 @@ public void testHeartbeatActionQueryExecuted() throws Exception {
config = simpleConfig()
.with(MySqlConnectorConfig.USER, "snapper")
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(Heartbeat.HEARTBEAT_TOPICS_PREFIX, HEARTBEAT_TOPIC_PREFIX_VALUE)
.with(AbstractTopicNamingStrategy.DEFAULT_HEARTBEAT_TOPIC_PREFIX, HEARTBEAT_TOPIC_PREFIX_VALUE)
.with(Heartbeat.HEARTBEAT_INTERVAL, "100")
.with(DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY_PROPERTY_NAME,
String.format("INSERT INTO %s.test_heartbeat_table (text) VALUES ('test_heartbeat');",

View File

@ -27,6 +27,7 @@
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
@ -115,7 +116,7 @@ public void shouldUseConfiguredTransactionTopicName() throws InterruptedExceptio
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(MySqlConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
.with(MySqlConnectorConfig.TRANSACTION_TOPIC, "tx.of.${database.server.name}")
.with(AbstractTopicNamingStrategy.TOPIC_TRANSACTION, "tx.of.server")
.build();
start(MySqlConnector.class, config);
@ -134,7 +135,7 @@ public void shouldUseConfiguredTransactionTopicName() throws InterruptedExceptio
// TX BEGIN + 4 changes + TX END
SourceRecords records = consumeRecordsByTopic(1 + 4 + 1);
List<SourceRecord> txnEvents = records.recordsForTopic("tx.of." + DATABASE.getServerName());
List<SourceRecord> txnEvents = records.recordsForTopic(DATABASE.getServerName() + ".tx.of.server");
assertThat(txnEvents).hasSize(2);
}
@ -145,7 +146,7 @@ public void shouldUseConfiguredTransactionTopicNameWithoutServerName() throws In
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(MySqlConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
.with(MySqlConnectorConfig.TRANSACTION_TOPIC, "mytransactions")
.with(AbstractTopicNamingStrategy.TOPIC_TRANSACTION, "mytransactions")
.build();
start(MySqlConnector.class, config);
@ -164,7 +165,7 @@ public void shouldUseConfiguredTransactionTopicNameWithoutServerName() throws In
// TX BEGIN + 4 changes + TX END
SourceRecords records = consumeRecordsByTopic(1 + 4 + 1);
List<SourceRecord> txnEvents = records.recordsForTopic("mytransactions");
List<SourceRecord> txnEvents = records.recordsForTopic(DATABASE.getServerName() + ".mytransactions");
assertThat(txnEvents).hasSize(2);
}

View File

@ -0,0 +1,28 @@
-- Fixture tables for the pluggable topic naming strategy tests (DBZ-4180).
-- `dbz4180` is consumed by the default-strategy tests; seeded with one row so the
-- snapshot phase already produces a data change event.
-- NOTE(review): the quoted numeric default `'100'` on the INT column may be deliberate,
-- to exercise Debezium's parsing of string-quoted numeric defaults — confirm before "fixing".
CREATE TABLE `dbz4180` (
id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
a NUMERIC(10, 2) NOT NULL DEFAULT 1.23,
b DECIMAL(10, 3) NOT NULL DEFAULT 2.321,
c VARCHAR(255) NULL DEFAULT 'default mysql strategy',
d INT NULL DEFAULT '100',
e DATETIME NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
INSERT INTO `dbz4180`(a, b, c, d)
VALUES (1.33, -2.111 , 'topic strategy', 99);
-- `dbz_4180_00` / `dbz_4180_01` are two identically-shaped "shard" tables; the regex
-- naming strategy tests route both onto a single merged topic (dbz_4180_all_shards).
CREATE TABLE `dbz_4180_00` (
id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
a NUMERIC(10, 2) NOT NULL DEFAULT 1.23,
b DECIMAL(10, 3) NOT NULL DEFAULT 2.321,
c VARCHAR(255) NULL DEFAULT 'shard 0',
d INT NULL DEFAULT '100',
e DATETIME NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
CREATE TABLE `dbz_4180_01` (
id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
a NUMERIC(10, 2) NOT NULL DEFAULT 1.23,
b DECIMAL(10, 3) NOT NULL DEFAULT 2.321,
c VARCHAR(255) NULL DEFAULT 'shard 1',
d INT NULL DEFAULT '100',
e DATETIME NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

View File

@ -17,7 +17,7 @@
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;

View File

@ -14,6 +14,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
@ -26,7 +27,7 @@
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Strings;
@ -50,7 +51,7 @@ public String version() {
@Override
public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(Configuration config) {
OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
TopicNamingStrategy topicNamingStrategy = connectorConfig.getTopicNamingStrategy(CommonConnectorConfig.TOPIC_NAMING_STRATEGY);
SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
JdbcConfiguration jdbcConfig = connectorConfig.getJdbcConfig();
@ -62,7 +63,7 @@ public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(
OracleDefaultValueConverter defaultValueConverter = new OracleDefaultValueConverter(valueConverters, jdbcConnection);
TableNameCaseSensitivity tableNameCaseSensitivity = connectorConfig.getAdapter().getTableNameCaseSensitivity(jdbcConnection);
this.schema = new OracleDatabaseSchema(connectorConfig, valueConverters, defaultValueConverter, schemaNameAdjuster,
topicSelector, tableNameCaseSensitivity);
topicNamingStrategy, tableNameCaseSensitivity);
Offsets<OraclePartition, OracleOffsetContext> previousOffsets = getPreviousOffsets(new OraclePartition.Provider(connectorConfig),
connectorConfig.getAdapter().getOffsetContextLoader());
@ -90,14 +91,14 @@ public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(
EventDispatcher<OraclePartition, TableId> dispatcher = new EventDispatcher<>(
connectorConfig,
topicSelector,
topicNamingStrategy,
schema,
queue,
connectorConfig.getTableFilters().dataCollectionFilter(),
DataChangeEvent::new,
metadataProvider,
connectorConfig.createHeartbeat(
topicSelector,
topicNamingStrategy,
schemaNameAdjuster,
() -> getHeartbeatConnection(connectorConfig, jdbcConfig),
exception -> {

View File

@ -25,7 +25,7 @@
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.SchemaNameAdjuster;
import oracle.jdbc.OracleTypes;
@ -47,8 +47,8 @@ public class OracleDatabaseSchema extends HistorizedRelationalDatabaseSchema {
public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, OracleValueConverters valueConverters,
DefaultValueConverter defaultValueConverter, SchemaNameAdjuster schemaNameAdjuster,
TopicSelector<TableId> topicSelector, TableNameCaseSensitivity tableNameCaseSensitivity) {
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(),
TopicNamingStrategy<TableId> topicNamingStrategy, TableNameCaseSensitivity tableNameCaseSensitivity) {
super(connectorConfig, topicNamingStrategy, connectorConfig.getTableFilters().dataCollectionFilter(),
connectorConfig.getColumnFilter(),
new TableSchemaBuilder(
valueConverters,

View File

@ -13,7 +13,7 @@
import io.debezium.data.Envelope;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
class OracleEventMetadataProvider implements EventMetadataProvider {

View File

@ -19,7 +19,7 @@
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
public class OracleOffsetContext extends CommonOffsetContext<SourceInfo> {

View File

@ -19,6 +19,8 @@
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
public class OracleConnectorTest {
OracleConnector connector;
@ -66,7 +68,7 @@ protected static void assertConfigDefIsValid(Connector connector, io.debezium.co
assertThat(key.importance).isEqualTo(expected.importance());
assertThat(key.documentation).isEqualTo(expected.description());
assertThat(key.type).isEqualTo(expected.type());
if (expected.equals(OracleConnectorConfig.DATABASE_HISTORY)) {
if (expected.equals(OracleConnectorConfig.DATABASE_HISTORY) || expected.equals(CommonConnectorConfig.TOPIC_NAMING_STRATEGY)) {
assertThat(((Class<?>) key.defaultValue).getName()).isEqualTo((String) expected.defaultValue());
}
assertThat(key.dependents).isEqualTo(expected.dependents());

View File

@ -27,7 +27,6 @@
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleDefaultValueConverter;
import io.debezium.connector.oracle.OracleTopicSelector;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.StreamingAdapter.TableNameCaseSensitivity;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
@ -35,8 +34,8 @@
import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.schema.SchemaTopicNamingStrategy;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Strings;
@ -293,10 +292,10 @@ private OracleDatabaseSchema createSchema(OracleConnectorConfig connectorConfig)
OracleDefaultValueConverter defaultValueConverter = new OracleDefaultValueConverter(converters, connection);
TableNameCaseSensitivity tableNameSensitivity = connectorConfig.getAdapter().getTableNameCaseSensitivity(connection);
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
TopicNamingStrategy topicNamingStrategy = SchemaTopicNamingStrategy.create(connectorConfig);
SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
return new OracleDatabaseSchema(connectorConfig, converters, defaultValueConverter, schemaNameAdjuster, topicSelector, tableNameSensitivity);
return new OracleDatabaseSchema(connectorConfig, converters, defaultValueConverter, schemaNameAdjuster, topicNamingStrategy, tableNameSensitivity);
}
private String getPdbPredicate(OracleConnectorConfig config) {

View File

@ -36,7 +36,6 @@
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.connector.oracle.OracleTopicSelector;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.StreamingAdapter.TableNameCaseSensitivity;
@ -51,7 +50,8 @@
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.schema.SchemaTopicNamingStrategy;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.SchemaNameAdjuster;
/**
@ -273,7 +273,7 @@ public void testAbandonTransactionHavingAnotherOne() throws Exception {
private OracleDatabaseSchema createOracleDatabaseSchema() throws Exception {
final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(getConfig().build());
final TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
final TopicNamingStrategy topicNamingStrategy = SchemaTopicNamingStrategy.create(connectorConfig);
final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
final OracleValueConverters converters = new OracleValueConverters(connectorConfig, connection);
final OracleDefaultValueConverter defaultValueConverter = new OracleDefaultValueConverter(converters, connection);
@ -283,7 +283,7 @@ private OracleDatabaseSchema createOracleDatabaseSchema() throws Exception {
converters,
defaultValueConverter,
schemaNameAdjuster,
topicSelector,
topicNamingStrategy,
sensitivity);
Table table = Table.editor()

View File

@ -20,7 +20,7 @@
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;

View File

@ -19,6 +19,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
@ -36,7 +37,7 @@
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
import io.debezium.util.Metronome;
@ -61,7 +62,7 @@ public class PostgresConnectorTask extends BaseSourceTask<PostgresPartition, Pos
@Override
public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> start(Configuration config) {
final PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(config);
final TopicSelector<TableId> topicSelector = PostgresTopicSelector.create(connectorConfig);
final TopicNamingStrategy topicNamingStrategy = connectorConfig.getTopicNamingStrategy(CommonConnectorConfig.TOPIC_NAMING_STRATEGY);
final Snapshotter snapshotter = connectorConfig.getSnapshotter();
final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
@ -92,8 +93,8 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
final TypeRegistry typeRegistry = jdbcConnection.getTypeRegistry();
final PostgresDefaultValueConverter defaultValueConverter = jdbcConnection.getDefaultValueConverter();
schema = new PostgresSchema(connectorConfig, typeRegistry, defaultValueConverter, topicSelector, valueConverterBuilder.build(typeRegistry));
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicSelector);
schema = new PostgresSchema(connectorConfig, typeRegistry, defaultValueConverter, topicNamingStrategy, valueConverterBuilder.build(typeRegistry));
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy);
final Offsets<PostgresPartition, PostgresOffsetContext> previousOffsets = getPreviousOffsets(
new PostgresPartition.Provider(connectorConfig, config), new PostgresOffsetContext.Loader(connectorConfig));
final Clock clock = Clock.system();
@ -169,7 +170,7 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
final PostgresEventDispatcher<TableId> dispatcher = new PostgresEventDispatcher<>(
connectorConfig,
topicSelector,
topicNamingStrategy,
schema,
queue,
connectorConfig.getTableFilters().dataCollectionFilter(),
@ -177,7 +178,7 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
PostgresChangeRecordEmitter::updateSchema,
metadataProvider,
connectorConfig.createHeartbeat(
topicSelector,
topicNamingStrategy,
schemaNameAdjuster,
() -> new PostgresConnection(connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_GENERAL),
exception -> {

View File

@ -21,9 +21,9 @@
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.SchemaNameAdjuster;
/**
@ -37,27 +37,27 @@ public class PostgresEventDispatcher<T extends DataCollectionId> extends EventDi
private final LogicalDecodingMessageMonitor logicalDecodingMessageMonitor;
private final LogicalDecodingMessageFilter messageFilter;
public PostgresEventDispatcher(PostgresConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
public PostgresEventDispatcher(PostgresConnectorConfig connectorConfig, TopicNamingStrategy<T> topicNamingStrategy,
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider, SchemaNameAdjuster schemaNameAdjuster) {
this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider,
connectorConfig.createHeartbeat(topicSelector, schemaNameAdjuster, null, null), schemaNameAdjuster, null);
this(connectorConfig, topicNamingStrategy, schema, queue, filter, changeEventCreator, null, metadataProvider,
connectorConfig.createHeartbeat(topicNamingStrategy, schemaNameAdjuster, null, null), schemaNameAdjuster, null);
}
public PostgresEventDispatcher(PostgresConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
public PostgresEventDispatcher(PostgresConnectorConfig connectorConfig, TopicNamingStrategy<T> topicNamingStrategy,
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider,
Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster) {
this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider,
this(connectorConfig, topicNamingStrategy, schema, queue, filter, changeEventCreator, null, metadataProvider,
heartbeat, schemaNameAdjuster, null);
}
public PostgresEventDispatcher(PostgresConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
public PostgresEventDispatcher(PostgresConnectorConfig connectorConfig, TopicNamingStrategy<T> topicNamingStrategy,
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator, InconsistentSchemaHandler<PostgresPartition, T> inconsistentSchemaHandler,
EventMetadataProvider metadataProvider, Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster,
JdbcConnection jdbcConnection) {
super(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, inconsistentSchemaHandler, metadataProvider,
super(connectorConfig, topicNamingStrategy, schema, queue, filter, changeEventCreator, inconsistentSchemaHandler, metadataProvider,
heartbeat, schemaNameAdjuster);
this.queue = queue;
this.logicalDecodingMessageMonitor = new LogicalDecodingMessageMonitor(connectorConfig, this::enqueueLogicalDecodingMessage);

View File

@ -13,7 +13,7 @@
import io.debezium.data.Envelope;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.time.Conversions;
import io.debezium.util.Collect;

View File

@ -25,7 +25,7 @@
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.time.Conversions;
import io.debezium.util.Clock;

View File

@ -28,7 +28,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.topic.TopicNamingStrategy;
/**
* Component that records the schema information for the {@link PostgresConnector}. The schema information contains
@ -56,8 +56,8 @@ public class PostgresSchema extends RelationalDatabaseSchema {
* @param config the connector configuration, which is presumed to be valid
*/
protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegistry, PostgresDefaultValueConverter defaultValueConverter,
TopicSelector<TableId> topicSelector, PostgresValueConverter valueConverter) {
super(config, topicSelector, config.getTableFilters().dataCollectionFilter(),
TopicNamingStrategy<TableId> topicNamingStrategy, PostgresValueConverter valueConverter) {
super(config, topicNamingStrategy, config.getTableFilters().dataCollectionFilter(),
config.getColumnFilter(), getTableSchemaBuilder(config, valueConverter, defaultValueConverter),
false, config.getKeyMapper());

View File

@ -18,7 +18,7 @@
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
@ -34,26 +34,26 @@ public class PostgresTaskContext extends CdcSourceTaskContext {
protected final static Logger LOGGER = LoggerFactory.getLogger(PostgresTaskContext.class);
private final PostgresConnectorConfig config;
private final TopicSelector<TableId> topicSelector;
private final TopicNamingStrategy<TableId> topicNamingStrategy;
private final PostgresSchema schema;
private ElapsedTimeStrategy refreshXmin;
private Long lastXmin;
protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicSelector<TableId> topicSelector) {
protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicNamingStrategy<TableId> topicNamingStrategy) {
super(config.getContextName(), config.getLogicalName(), Collections::emptySet);
this.config = config;
if (config.xminFetchInterval().toMillis() > 0) {
this.refreshXmin = ElapsedTimeStrategy.constant(Clock.SYSTEM, config.xminFetchInterval().toMillis());
}
this.topicSelector = topicSelector;
this.topicNamingStrategy = topicNamingStrategy;
assert schema != null;
this.schema = schema;
}
protected TopicSelector<TableId> topicSelector() {
return topicSelector;
protected TopicNamingStrategy<TableId> topicNamingStrategy() {
return topicNamingStrategy;
}
protected PostgresSchema schema() {

View File

@ -13,7 +13,7 @@
import io.debezium.function.BlockingConsumer;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
/**
@ -57,8 +57,8 @@ protected PostgresSchema schema() {
return taskContext.schema();
}
protected TopicSelector<TableId> topicSelector() {
return taskContext.topicSelector();
protected TopicNamingStrategy<TableId> topicNamingStrategy() {
return taskContext.topicNamingStrategy();
}
protected Clock clock() {

View File

@ -16,6 +16,8 @@
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.doc.FixFor;
import io.debezium.schema.SchemaTopicNamingStrategy;
import io.debezium.spi.topic.TopicNamingStrategy;
/**
* Integration test for {@link PostgresConnectorTask} class.
@ -50,7 +52,7 @@ public void retryOnFailureToCreateConnection() throws Exception {
config,
null,
null,
PostgresTopicSelector.create(config), null)), true, 3, Duration.ofSeconds(2));
(TopicNamingStrategy) SchemaTopicNamingStrategy.create(config), null)), true, 3, Duration.ofSeconds(2));
// Verify retry happened for 10 seconds
long endTime = System.currentTimeMillis();

View File

@ -40,6 +40,8 @@
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.SchemaTopicNamingStrategy;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Throwables;
/**
@ -85,7 +87,7 @@ private TestHelper() {
*
* @param slotName the name of the logical decoding slot
* @param dropOnClose true if the slot should be dropped upon close
* @param connectorConfig customized connector configuration
* @param config customized connector configuration
* @return the PostgresConnection instance; never null
* @throws SQLException if there is a problem obtaining a replication connection
*/
@ -244,7 +246,7 @@ public static PostgresSchema getSchema(PostgresConnectorConfig config, TypeRegis
config,
typeRegistry,
TestHelper.getDefaultValueConverter(),
PostgresTopicSelector.create(config),
(TopicNamingStrategy) SchemaTopicNamingStrategy.create(config),
getPostgresValueConverter(typeRegistry, config));
}

View File

@ -17,7 +17,7 @@
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;

View File

@ -13,6 +13,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
@ -26,7 +27,7 @@
import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import io.debezium.util.SchemaNameAdjuster;
@ -65,19 +66,21 @@ public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext>
.build();
final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(config);
final TopicSelector<TableId> topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig);
final boolean multiPartitionMode = connectorConfig.isMultiPartitionModeEnabled();
final TopicNamingStrategy topicNamingStrategy = connectorConfig.getTopicNamingStrategy(CommonConnectorConfig.TOPIC_NAMING_STRATEGY, multiPartitionMode);
final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjustmentMode().createAdjuster();
final SqlServerValueConverters valueConverters = new SqlServerValueConverters(connectorConfig.getDecimalMode(),
connectorConfig.getTemporalPrecisionMode(), connectorConfig.binaryHandlingMode());
dataConnection = new SqlServerConnection(connectorConfig.getJdbcConfig(), connectorConfig.getSourceTimestampMode(), valueConverters,
() -> getClass().getClassLoader(),
connectorConfig.getSkippedOperations(), connectorConfig.isMultiPartitionModeEnabled(), connectorConfig.getOptionRecompile());
connectorConfig.getSkippedOperations(), multiPartitionMode, connectorConfig.getOptionRecompile());
metadataConnection = new SqlServerConnection(connectorConfig.getJdbcConfig(), connectorConfig.getSourceTimestampMode(), valueConverters,
() -> getClass().getClassLoader(),
connectorConfig.getSkippedOperations(), connectorConfig.isMultiPartitionModeEnabled());
connectorConfig.getSkippedOperations(), multiPartitionMode);
this.schema = new SqlServerDatabaseSchema(connectorConfig, metadataConnection.getDefaultValueConverter(), valueConverters, topicSelector, schemaNameAdjuster);
this.schema = new SqlServerDatabaseSchema(connectorConfig, metadataConnection.getDefaultValueConverter(), valueConverters, topicNamingStrategy,
schemaNameAdjuster);
this.schema.initializeStorage();
Offsets<SqlServerPartition, SqlServerOffsetContext> offsets = getPreviousOffsets(
@ -103,7 +106,7 @@ public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext>
final EventDispatcher<SqlServerPartition, TableId> dispatcher = new EventDispatcher<>(
connectorConfig,
topicSelector,
topicNamingStrategy,
schema,
queue,
connectorConfig.getTableFilters().dataCollectionFilter(),
@ -117,7 +120,7 @@ public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext>
SqlServerConnector.class,
connectorConfig,
new SqlServerChangeEventSourceFactory(connectorConfig, dataConnection, metadataConnection, errorHandler, dispatcher, clock, schema),
createChangeEventSourceMetricsFactory(connectorConfig.isMultiPartitionModeEnabled(), offsets),
createChangeEventSourceMetricsFactory(multiPartitionMode, offsets),
dispatcher,
schema,
clock);

View File

@ -17,7 +17,7 @@
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.SchemaNameAdjuster;
/**
@ -30,9 +30,9 @@ public class SqlServerDatabaseSchema extends HistorizedRelationalDatabaseSchema
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerDatabaseSchema.class);
public SqlServerDatabaseSchema(SqlServerConnectorConfig connectorConfig, SqlServerDefaultValueConverter defaultValueConverter,
ValueConverterProvider valueConverter, TopicSelector<TableId> topicSelector,
ValueConverterProvider valueConverter, TopicNamingStrategy<TableId> topicNamingStrategy,
SchemaNameAdjuster schemaNameAdjuster) {
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), connectorConfig.getColumnFilter(),
super(connectorConfig, topicNamingStrategy, connectorConfig.getTableFilters().dataCollectionFilter(), connectorConfig.getColumnFilter(),
new TableSchemaBuilder(
valueConverter,
defaultValueConverter,

View File

@ -13,7 +13,7 @@
import io.debezium.data.Envelope;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
class SqlServerEventMetadataProvider implements EventMetadataProvider {

View File

@ -17,7 +17,7 @@
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
public class SqlServerOffsetContext extends CommonOffsetContext<SourceInfo> {

View File

@ -16,7 +16,7 @@
import io.debezium.pipeline.meters.CommonEventMeter;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* Base implementation of partition-scoped multi-partition SQL Server connector metrics.

View File

@ -21,7 +21,7 @@
import io.debezium.pipeline.ConnectorEvent;
import io.debezium.pipeline.metrics.ChangeEventSourceMetrics;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
/**

View File

@ -12,7 +12,7 @@
import io.debezium.pipeline.meters.SnapshotMeter;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
class SqlServerSnapshotPartitionMetrics extends AbstractSqlServerPartitionMetrics
implements SqlServerSnapshotPartitionMetricsMXBean {

View File

@ -13,7 +13,7 @@
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
class SqlServerSnapshotTaskMetrics extends AbstractSqlServerTaskMetrics<SqlServerSnapshotPartitionMetrics>

View File

@ -14,7 +14,7 @@
import io.debezium.pipeline.meters.StreamingMeter;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
class SqlServerStreamingPartitionMetrics extends AbstractSqlServerPartitionMetrics
implements SqlServerStreamingPartitionMetricsMXBean {

View File

@ -15,6 +15,7 @@
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Schema;
@ -34,6 +35,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.schema.SchemaTopicNamingStrategy;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Testing;
@ -480,7 +482,8 @@ private void assertColumnHasNotDefaultValue(Table table, String columnName) {
}
private void assertColumnHasDefaultValue(Table table, String columnName, Object expectedValue, TableSchemaBuilder tableSchemaBuilder) {
TableSchema schema = tableSchemaBuilder.create("test", "dummy", table, null, null, null);
TableSchema schema = tableSchemaBuilder.create(new SchemaTopicNamingStrategy(new Properties(), "test", tableSchemaBuilder.isMultiPartitionMode()), table, null,
null, null);
Schema columnSchema = schema.getEnvelopeSchema().schema().field("after").schema().field(columnName).schema();
Column column = table.columnWithName(columnName);

View File

@ -19,6 +19,8 @@
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.CommonConnectorConfig;
public class SqlServerConnectorTest {
SqlServerConnector connector;
@ -66,7 +68,7 @@ protected static void assertConfigDefIsValid(Connector connector, io.debezium.co
assertThat(key.importance).isEqualTo(expected.importance());
assertThat(key.documentation).isEqualTo(expected.description());
assertThat(key.type).isEqualTo(expected.type());
if (expected.equals(SqlServerConnectorConfig.DATABASE_HISTORY)) {
if (expected.equals(SqlServerConnectorConfig.DATABASE_HISTORY) || expected.equals(CommonConnectorConfig.TOPIC_NAMING_STRATEGY)) {
assertThat(((Class<?>) key.defaultValue).getName()).isEqualTo((String) expected.defaultValue());
}
assertThat(key.dependents).isEqualTo(expected.dependents());

View File

@ -12,6 +12,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
@ -24,6 +25,7 @@
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,10 +39,11 @@
import io.debezium.heartbeat.HeartbeatErrorHandler;
import io.debezium.heartbeat.HeartbeatImpl;
import io.debezium.relational.CustomConverterRegistry;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.schema.SchemaTopicNamingStrategy;
import io.debezium.spi.converter.ConvertedField;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Strings;
@ -51,6 +54,8 @@
*/
public abstract class CommonConnectorConfig {
public static final String TASK_ID = "task.id";
public static final String LOGICAL_NAME = "logical.name";
public static final String MULTI_PARTITION_MODE = "multi.partition.mode";
private static final Logger LOGGER = LoggerFactory.getLogger(CommonConnectorConfig.class);
@ -517,15 +522,15 @@ public static SchemaNameAdjustmentMode parse(String value) {
.withImportance(Importance.MEDIUM)
.withDescription("The name of the data collection that is used to send signals/commands to Debezium. Signaling is disabled when not set.");
public static final Field TRANSACTION_TOPIC = Field.create("transaction.topic")
.withDisplayName("Transaction topic name")
public static final Field TOPIC_NAMING_STRATEGY = Field.create("topic.naming.strategy")
.withDisplayName("Topic naming strategy class")
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 21))
.withType(Type.STRING)
.withType(Type.CLASS)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDefault("${database.server.name}.transaction")
.withDescription(
"The name of the transaction metadata topic. The placeholder ${database.server.name} can be used for referring to the connector's logical name; defaults to ${database.server.name}.transaction.");
.withDescription("The name of the TopicNamingStrategy class that should be used to determine the topic name " +
"for data change, schema change, transaction, heartbeat event etc.")
.withDefault(SchemaTopicNamingStrategy.class.getName());
public static final Field CUSTOM_RETRIABLE_EXCEPTION = Field.createInternal("custom.retriable.exception")
.withDisplayName("Regular expression to match the exception message.")
@ -560,7 +565,7 @@ public static SchemaNameAdjustmentMode parse(String value) {
Heartbeat.HEARTBEAT_INTERVAL,
Heartbeat.HEARTBEAT_TOPICS_PREFIX,
SIGNAL_DATA_COLLECTION,
TRANSACTION_TOPIC)
TOPIC_NAMING_STRATEGY)
.create();
private final Configuration config;
@ -588,7 +593,6 @@ public static SchemaNameAdjustmentMode parse(String value) {
private final SchemaNameAdjustmentMode schemaNameAdjustmentMode;
private final String signalingDataCollection;
private final EnumSet<Operation> skippedOperations;
private final String transactionTopic;
private final String taskId;
protected CommonConnectorConfig(Configuration config, String logicalName, int defaultSnapshotFetchSize) {
@ -617,7 +621,6 @@ protected CommonConnectorConfig(Configuration config, String logicalName, int de
this.binaryHandlingMode = BinaryHandlingMode.parse(config.getString(BINARY_HANDLING_MODE));
this.signalingDataCollection = config.getString(SIGNAL_DATA_COLLECTION);
this.skippedOperations = determineSkippedOperations(config);
this.transactionTopic = config.getString(TRANSACTION_TOPIC).replace("${database.server.name}", logicalName);
this.taskId = config.getString(TASK_ID);
}
@ -720,13 +723,6 @@ public CustomConverterRegistry customConverterRegistry() {
return customConverterRegistry;
}
/**
* Returns the name to be used for the connector's TX metadata topic.
*/
public String getTransactionTopic() {
return transactionTopic;
}
/**
* Whether a particular connector supports an optimized way for implementing operation skipping, or not.
*/
@ -742,6 +738,25 @@ public boolean isIncrementalSnapshotSchemaChangesEnabled() {
return supportsSchemaChangesDuringIncrementalSnapshot() && incrementalSnapshotAllowSchemaChanges;
}
/**
 * Resolves the configured {@link TopicNamingStrategy} with multi-partition mode disabled.
 *
 * @param topicNamingStrategyField the configuration field holding the strategy class name
 * @return the instantiated topic naming strategy; never null
 */
@SuppressWarnings("unchecked")
public TopicNamingStrategy getTopicNamingStrategy(Field topicNamingStrategyField) {
    // Single-partition mode is the common case; connectors that support multiple
    // partitions call the two-argument overload directly.
    final boolean multiPartitionMode = false;
    return getTopicNamingStrategy(topicNamingStrategyField, multiPartitionMode);
}
/**
 * Resolves and instantiates the {@link TopicNamingStrategy} named by the given configuration field.
 * The connector's full configuration is passed to the strategy as a {@link Properties} object,
 * enriched with the connector's logical name and the multi-partition flag.
 *
 * @param topicNamingStrategyField the configuration field holding the strategy class name
 * @param multiPartitionMode whether the connector runs in multi-partition mode
 * @return the instantiated topic naming strategy; never null
 * @throws ConnectException if the configured strategy class cannot be instantiated
 */
@SuppressWarnings("unchecked")
public TopicNamingStrategy getTopicNamingStrategy(Field topicNamingStrategyField, boolean multiPartitionMode) {
    final String strategyClassName = config.getString(topicNamingStrategyField);
    // Hand the whole connector configuration to the strategy so it can configure itself.
    final Properties properties = config.asProperties();
    properties.put(LOGICAL_NAME, logicalName);
    // NOTE: stored as a Boolean via Hashtable.put(), not as a String property.
    properties.put(MULTI_PARTITION_MODE, multiPartitionMode);
    final TopicNamingStrategy strategy = config.getInstance(topicNamingStrategyField, TopicNamingStrategy.class, properties);
    if (strategy == null) {
        throw new ConnectException("Unable to instantiate the topic naming strategy class " + strategyClassName);
    }
    LOGGER.info("Loading the custom topic naming strategy plugin: {}", strategyClassName);
    return strategy;
}
@SuppressWarnings("unchecked")
private List<CustomConverter<SchemaBuilder, ConvertedField>> getCustomConverters() {
final String converterNameList = config.getString(CUSTOM_CONVERTERS);
@ -926,11 +941,11 @@ public String getTaskId() {
return taskId;
}
public Heartbeat createHeartbeat(TopicSelector<? extends DataCollectionId> topicSelector, SchemaNameAdjuster schemaNameAdjuster,
public Heartbeat createHeartbeat(TopicNamingStrategy topicNamingStrategy, SchemaNameAdjuster schemaNameAdjuster,
HeartbeatConnectionProvider connectionProvider, HeartbeatErrorHandler errorHandler) {
if (getHeartbeatInterval().isZero()) {
return Heartbeat.DEFAULT_NOOP_HEARTBEAT;
}
return new HeartbeatImpl(getHeartbeatInterval(), topicSelector.getHeartbeatTopic(), getLogicalName(), schemaNameAdjuster);
return new HeartbeatImpl(getHeartbeatInterval(), topicNamingStrategy.heartbeatTopic(), getLogicalName(), schemaNameAdjuster);
}
}

View File

@ -1511,6 +1511,20 @@ default <T> T getInstance(Field field, Class<T> clazz, Configuration configurati
return Instantiator.getInstance(getString(field), () -> getClass().getClassLoader(), configuration);
}
/**
 * Get an instance of the class given by the value in the configuration associated with the given field.
 * The instance is created via {@link Instantiator#getInstanceWithProperties}, i.e. using a constructor
 * that accepts a single {@link Properties} argument.
 *
 * @param field the field for the configuration property
 * @param clazz the Class of which the resulting object is expected to be an instance of; may not be null
 *            (used only for compile-time inference of {@code T}; it is not consulted at runtime)
 * @param props the {@link Properties} object that is passed as a parameter to the constructor
 * @return the new instance, or null if there is no such key-value pair in the configuration or if there is a key-value
 *         configuration but the value could not be converted to an existing class with a {@link Properties}-argument constructor
 */
default <T> T getInstance(Field field, Class<T> clazz, Properties props) {
    return Instantiator.getInstanceWithProperties(getString(field), () -> getClass().getClassLoader(), props);
}
/**
* Return a new {@link Configuration} that contains only the subset of keys that match the given prefix.
* If desired, the keys in the resulting Configuration will have the prefix (plus any terminating "{@code .}" character if

View File

@ -13,7 +13,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;

View File

@ -0,0 +1,67 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.heartbeat;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.SchemaNameAdjuster;
/**
* A factory for creating the appropriate {@link Heartbeat} implementation based on the connector
* type and its configured properties.
*
* @author Chris Cranford
*/
public class HeartbeatFactory<T extends DataCollectionId> {

    // Connector configuration the heartbeat settings (interval, action query) are read from.
    private final CommonConnectorConfig connectorConfig;
    // Supplies the topic that heartbeat records are emitted to.
    private final TopicNamingStrategy<T> topicNamingStrategy;
    private final SchemaNameAdjuster schemaNameAdjuster;
    // May be null when no database-backed heartbeat is required.
    private final HeartbeatConnectionProvider connectionProvider;
    // May be null when no database-backed heartbeat is required.
    private final HeartbeatErrorHandler errorHandler;

    /**
     * Creates a factory without a connection provider or error handler; suitable when
     * only the basic {@link HeartbeatImpl} can be produced.
     */
    public HeartbeatFactory(CommonConnectorConfig connectorConfig, TopicNamingStrategy<T> topicNamingStrategy, SchemaNameAdjuster schemaNameAdjuster) {
        this(connectorConfig, topicNamingStrategy, schemaNameAdjuster, null, null);
    }

    /**
     * Creates a fully-configured factory.
     */
    public HeartbeatFactory(CommonConnectorConfig connectorConfig, TopicNamingStrategy<T> topicNamingStrategy, SchemaNameAdjuster schemaNameAdjuster,
                            HeartbeatConnectionProvider connectionProvider, HeartbeatErrorHandler errorHandler) {
        this.connectorConfig = connectorConfig;
        this.topicNamingStrategy = topicNamingStrategy;
        this.schemaNameAdjuster = schemaNameAdjuster;
        this.connectionProvider = connectionProvider;
        this.errorHandler = errorHandler;
    }

    /**
     * Builds the heartbeat matching the connector's configuration: a no-op heartbeat when the
     * interval is zero, a database-backed heartbeat when a relational connector declares a
     * heartbeat action query, and the plain Kafka heartbeat otherwise.
     */
    public Heartbeat createHeartbeat() {
        if (connectorConfig.getHeartbeatInterval().isZero()) {
            return Heartbeat.DEFAULT_NOOP_HEARTBEAT;
        }
        if (connectorConfig instanceof RelationalDatabaseConnectorConfig) {
            final RelationalDatabaseConnectorConfig relationalConfig = (RelationalDatabaseConnectorConfig) connectorConfig;
            final String actionQuery = relationalConfig.getHeartbeatActionQuery();
            if (actionQuery != null) {
                return new DatabaseHeartbeatImpl(
                        connectorConfig.getHeartbeatInterval(),
                        topicNamingStrategy.heartbeatTopic(),
                        connectorConfig.getLogicalName(),
                        connectionProvider.get(),
                        actionQuery,
                        errorHandler,
                        schemaNameAdjuster);
            }
        }
        return new HeartbeatImpl(
                connectorConfig.getHeartbeatInterval(),
                topicNamingStrategy.heartbeatTopic(),
                connectorConfig.getLogicalName(),
                schemaNameAdjuster);
    }
}

View File

@ -36,8 +36,8 @@
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.pipeline.spi.SnapshotResult.SnapshotResultStatus;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.LoggingContext;
import io.debezium.util.Threads;

View File

@ -41,12 +41,12 @@
import io.debezium.relational.history.ConnectTableChangeSerializer;
import io.debezium.relational.history.HistoryRecord.Fields;
import io.debezium.schema.DataCollectionFilters.DataCollectionFilter;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.SchemaNameAdjuster;
/**
@ -62,7 +62,7 @@ public class EventDispatcher<P extends Partition, T extends DataCollectionId> im
private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);
private final TopicSelector<T> topicSelector;
private final TopicNamingStrategy<T> topicNamingStrategy;
private final DatabaseSchema<T> schema;
private final HistorizedDatabaseSchema<T> historizedSchema;
private final ChangeEventQueue<DataChangeEvent> queue;
@ -88,28 +88,28 @@ public class EventDispatcher<P extends Partition, T extends DataCollectionId> im
*/
private final StreamingChangeRecordReceiver streamingReceiver;
public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
public EventDispatcher(CommonConnectorConfig connectorConfig, TopicNamingStrategy<T> topicNamingStrategy,
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider, SchemaNameAdjuster schemaNameAdjuster) {
this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider,
connectorConfig.createHeartbeat(topicSelector, schemaNameAdjuster, null, null), schemaNameAdjuster);
this(connectorConfig, topicNamingStrategy, schema, queue, filter, changeEventCreator, null, metadataProvider,
connectorConfig.createHeartbeat(topicNamingStrategy, schemaNameAdjuster, null, null), schemaNameAdjuster);
}
public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
public EventDispatcher(CommonConnectorConfig connectorConfig, TopicNamingStrategy<T> topicNamingStrategy,
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider,
Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster) {
this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, null, metadataProvider,
this(connectorConfig, topicNamingStrategy, schema, queue, filter, changeEventCreator, null, metadataProvider,
heartbeat, schemaNameAdjuster);
}
public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector,
public EventDispatcher(CommonConnectorConfig connectorConfig, TopicNamingStrategy<T> topicNamingStrategy,
DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilter<T> filter,
ChangeEventCreator changeEventCreator, InconsistentSchemaHandler<P, T> inconsistentSchemaHandler,
EventMetadataProvider metadataProvider, Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster) {
this.tableChangesSerializer = new ConnectTableChangeSerializer(schemaNameAdjuster);
this.connectorConfig = connectorConfig;
this.topicSelector = topicSelector;
this.topicNamingStrategy = topicNamingStrategy;
this.schema = schema;
this.historizedSchema = schema.isHistorized() ? (HistorizedDatabaseSchema<T>) schema : null;
this.queue = queue;
@ -122,7 +122,7 @@ public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> t
this.neverSkip = connectorConfig.supportsOperationFiltering() || this.skippedOperations.isEmpty();
this.transactionMonitor = new TransactionMonitor(connectorConfig, metadataProvider, schemaNameAdjuster,
this::enqueueTransactionMessage);
this::enqueueTransactionMessage, topicNamingStrategy.transactionTopic());
this.signal = new Signal<>(connectorConfig, this);
this.heartbeat = heartbeat;
@ -392,7 +392,7 @@ public void changeRecord(P partition,
// Truncate events must have null key schema as they are sent to table topics without keys
Schema keySchema = (key == null && operation == Operation.TRUNCATE) ? null
: dataCollectionSchema.keySchema();
String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
String topicName = topicNamingStrategy.dataChangeTopic((T) dataCollectionSchema.id());
SourceRecord record = new SourceRecord(partition.getSourcePartition(),
offsetContext.getOffset(),
@ -442,7 +442,7 @@ public void changeRecord(P partition,
}
Schema keySchema = dataCollectionSchema.keySchema();
String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
String topicName = topicNamingStrategy.dataChangeTopic((T) dataCollectionSchema.id());
// the record is produced lazily, so to have the correct offset as per the pre/post completion callbacks
bufferedEvent = () -> {
@ -499,7 +499,7 @@ public void changeRecord(P partition,
LOGGER.trace("Received change record for {} operation on key {}", operation, key);
Schema keySchema = dataCollectionSchema.keySchema();
String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
String topicName = topicNamingStrategy.dataChangeTopic((T) dataCollectionSchema.id());
SourceRecord record = new SourceRecord(
partition.getSourcePartition(),
@ -541,7 +541,7 @@ public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedExcepti
historizedSchema.applySchemaChange(event);
if (connectorConfig.isSchemaChangesHistoryEnabled()) {
final String topicName = topicSelector.getPrimaryTopic();
final String topicName = topicNamingStrategy.schemaChangeTopic();
final Integer partition = 0;
final Struct key = schemaChangeRecordKey(event);
final Struct value = schemaChangeRecordValue(event);

View File

@ -14,7 +14,7 @@
import io.debezium.pipeline.metrics.traits.CommonEventMetricsMXBean;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
/**

View File

@ -18,7 +18,7 @@
import io.debezium.annotation.ThreadSafe;
import io.debezium.pipeline.metrics.traits.SnapshotMetricsMXBean;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
/**

View File

@ -19,7 +19,7 @@
import io.debezium.pipeline.metrics.traits.StreamingMetricsMXBean;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* Carries streaming metrics.

View File

@ -15,7 +15,7 @@
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* The default implementation of metrics related to the snapshot phase of a connector.

View File

@ -19,7 +19,7 @@
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* The default implementation of metrics related to the streaming phase of a connector.

View File

@ -20,7 +20,7 @@
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* Base for metrics implementations.

View File

@ -16,7 +16,7 @@
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.signal.Signal.Payload;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* The action to trigger an ad-hoc snapshot.

View File

@ -17,8 +17,8 @@
import io.debezium.relational.history.JsonTableChangeSerializer;
import io.debezium.relational.history.TableChanges;
import io.debezium.relational.history.TableChanges.TableChangeType;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.spi.schema.DataCollectionId;
public class SchemaChanges<P extends Partition> implements Signal.Action<P> {

View File

@ -25,7 +25,7 @@
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* The class responsible for processing of signals delivered to Debezium via a dedicated signaling table.

View File

@ -15,7 +15,7 @@
import io.debezium.document.Document;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* The action to stop an ad-hoc snapshot.

View File

@ -22,7 +22,7 @@
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Threads;

View File

@ -44,8 +44,8 @@
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;

View File

@ -12,7 +12,7 @@
import io.debezium.pipeline.signal.Signal;
import io.debezium.pipeline.signal.Signal.Payload;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
public class CloseIncrementalSnapshotWindow<P extends Partition> implements Signal.Action<P> {

View File

@ -9,7 +9,7 @@
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* A Contract t

View File

@ -18,8 +18,8 @@
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
@NotThreadSafe

View File

@ -10,7 +10,7 @@
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* A factory for creating {@link ChangeEventSource}s specific to one database.

View File

@ -12,7 +12,7 @@
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* A class invoked by {@link EventDispatcher} whenever an event is available for processing.

View File

@ -11,7 +11,7 @@
import org.apache.kafka.connect.data.Struct;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* An interface implemented by each connector that enables metrics metadata to be extracted

View File

@ -7,7 +7,7 @@
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* Invoked whenever an important event or change of state happens during the snapshot phase.

View File

@ -16,7 +16,7 @@
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.pipeline.txmetadata.TransactionMonitor;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* Keeps track of the current offset within the source DB's change stream. This reflects in the offset as committed to

View File

@ -11,7 +11,7 @@
import java.util.Map.Entry;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
/**
* The context holds internal state necessary for book-keeping of events in active transaction.

View File

@ -27,7 +27,7 @@
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.SchemaNameAdjuster;
/**
@ -80,7 +80,8 @@ public class TransactionMonitor {
private final CommonConnectorConfig connectorConfig;
public TransactionMonitor(CommonConnectorConfig connectorConfig, EventMetadataProvider eventMetadataProvider,
SchemaNameAdjuster schemaNameAdjuster, BlockingConsumer<SourceRecord> sender) {
SchemaNameAdjuster schemaNameAdjuster, BlockingConsumer<SourceRecord> sender,
String topicName) {
Objects.requireNonNull(eventMetadataProvider);
transactionKeySchema = SchemaBuilder.struct()
@ -97,7 +98,7 @@ public TransactionMonitor(CommonConnectorConfig connectorConfig, EventMetadataPr
.field(DEBEZIUM_TRANSACTION_TS_MS, Schema.INT64_SCHEMA)
.build();
this.topicName = connectorConfig.getTransactionTopic();
this.topicName = topicName;
this.eventMetadataProvider = eventMetadataProvider;
this.sender = sender;
this.connectorConfig = connectorConfig;

View File

@ -18,7 +18,7 @@
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.topic.TopicNamingStrategy;
/**
* A {@link DatabaseSchema} or a relational database which has a schema history, that can be recovered to the current
@ -33,10 +33,10 @@ public abstract class HistorizedRelationalDatabaseSchema extends RelationalDatab
protected final DatabaseHistory databaseHistory;
private boolean recoveredTables;
protected HistorizedRelationalDatabaseSchema(HistorizedRelationalDatabaseConnectorConfig config, TopicSelector<TableId> topicSelector,
protected HistorizedRelationalDatabaseSchema(HistorizedRelationalDatabaseConnectorConfig config, TopicNamingStrategy<TableId> topicNamingStrategy,
TableFilter tableFilter, ColumnNameFilter columnFilter, TableSchemaBuilder schemaBuilder,
boolean tableIdCaseInsensitive, KeyMapper customKeysMapper) {
super(config, topicSelector, tableFilter, columnFilter, schemaBuilder, tableIdCaseInsensitive, customKeysMapper);
super(config, topicNamingStrategy, tableFilter, columnFilter, schemaBuilder, tableIdCaseInsensitive, customKeysMapper);
this.databaseHistory = config.getDatabaseHistory();
this.databaseHistory.start();

View File

@ -38,8 +38,7 @@
import io.debezium.relational.Tables.ColumnNameFilterFactory;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Strings;
@ -690,19 +689,19 @@ public Map<TableId, String> getSnapshotSelectOverridesByTable() {
}
@Override
public Heartbeat createHeartbeat(TopicSelector<? extends DataCollectionId> topicSelector, SchemaNameAdjuster schemaNameAdjuster,
public Heartbeat createHeartbeat(TopicNamingStrategy topicNamingStrategy, SchemaNameAdjuster schemaNameAdjuster,
HeartbeatConnectionProvider connectionProvider, HeartbeatErrorHandler errorHandler) {
if (!Strings.isNullOrBlank(getHeartbeatActionQuery()) && !getHeartbeatInterval().isZero()) {
return new DatabaseHeartbeatImpl(
getHeartbeatInterval(),
topicSelector.getHeartbeatTopic(),
topicNamingStrategy.heartbeatTopic(),
getLogicalName(),
connectionProvider.get(),
getHeartbeatActionQuery(),
errorHandler,
schemaNameAdjuster);
}
return super.createHeartbeat(topicSelector, schemaNameAdjuster, connectionProvider, errorHandler);
return super.createHeartbeat(topicNamingStrategy, schemaNameAdjuster, connectionProvider, errorHandler);
}
private static int validateSchemaExcludeList(Configuration config, Field field, Field.ValidationOutput problems) {

View File

@ -13,13 +13,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.data.Envelope;
import io.debezium.relational.Key.KeyMapper;
import io.debezium.relational.Tables.ColumnNameFilter;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.mapping.ColumnMappers;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.TopicSelector;
import io.debezium.spi.topic.TopicNamingStrategy;
/**
* A {@link DatabaseSchema} of a relational database such as Postgres. Provides information about the physical structure
@ -30,43 +29,31 @@
public abstract class RelationalDatabaseSchema implements DatabaseSchema<TableId> {
private final static Logger LOG = LoggerFactory.getLogger(RelationalDatabaseSchema.class);
private final TopicSelector<TableId> topicSelector;
private final TopicNamingStrategy<TableId> topicNamingStrategy;
private final TableSchemaBuilder schemaBuilder;
private final TableFilter tableFilter;
private final ColumnNameFilter columnFilter;
private final ColumnMappers columnMappers;
private final KeyMapper customKeysMapper;
private final String schemaPrefix;
private final SchemasByTableId schemasByTableId;
private final Tables tables;
protected RelationalDatabaseSchema(RelationalDatabaseConnectorConfig config, TopicSelector<TableId> topicSelector,
protected RelationalDatabaseSchema(RelationalDatabaseConnectorConfig config, TopicNamingStrategy<TableId> topicNamingStrategy,
TableFilter tableFilter, ColumnNameFilter columnFilter, TableSchemaBuilder schemaBuilder,
boolean tableIdCaseInsensitive, KeyMapper customKeysMapper) {
this.topicSelector = topicSelector;
this.topicNamingStrategy = topicNamingStrategy;
this.schemaBuilder = schemaBuilder;
this.tableFilter = tableFilter;
this.columnFilter = columnFilter;
this.columnMappers = ColumnMappers.create(config);
this.customKeysMapper = customKeysMapper;
this.schemaPrefix = getSchemaPrefix(config.getLogicalName());
this.schemasByTableId = new SchemasByTableId(tableIdCaseInsensitive);
this.tables = new Tables(tableIdCaseInsensitive);
}
private static String getSchemaPrefix(String serverName) {
if (serverName == null) {
return "";
}
else {
serverName = serverName.trim();
return serverName.endsWith(".") || serverName.isEmpty() ? serverName : serverName + ".";
}
}
@Override
public void close() {
}
@ -132,7 +119,7 @@ protected void clearSchemas() {
*/
protected void buildAndRegisterSchema(Table table) {
if (tableFilter.isIncluded(table.id())) {
TableSchema schema = schemaBuilder.create(schemaPrefix, getEnvelopeSchemaName(table), table, columnFilter, columnMappers, customKeysMapper);
TableSchema schema = schemaBuilder.create(topicNamingStrategy, table, columnFilter, columnMappers, customKeysMapper);
schemasByTableId.put(table.id(), schema);
}
}
@ -141,10 +128,6 @@ protected void removeSchema(TableId id) {
schemasByTableId.remove(id);
}
private String getEnvelopeSchemaName(Table table) {
return Envelope.schemaName(topicSelector.topicNameFor(table.id()));
}
/**
* A map of schemas by table id. Table names are stored lower-case if required as per the config.
*/

View File

@ -5,9 +5,12 @@
*/
package io.debezium.relational;
import java.util.List;
import io.debezium.annotation.Immutable;
import io.debezium.relational.Selectors.TableIdToStringMapper;
import io.debezium.schema.DataCollectionId;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
/**
* Unique identifier for a database table.
@ -182,6 +185,21 @@ public String identifier() {
return id;
}
@Override
public List<String> parts() {
    // All identifier parts in order: catalog, schema, table.
    return Collect.arrayListOf(catalogName, schemaName, tableName);
}
@Override
public List<String> databaseParts() {
    // Database-oriented view of the identifier: catalog (database) and table only; the schema part is omitted.
    return Collect.arrayListOf(catalogName, tableName);
}
@Override
public List<String> schemaParts() {
    // Schema-oriented view of the identifier: schema and table only; the catalog part is omitted.
    return Collect.arrayListOf(schemaName, tableName);
}
@Override
public int compareTo(TableId that) {
if (this == that) {

View File

@ -33,8 +33,8 @@
import io.debezium.relational.mapping.ColumnMappers;
import io.debezium.schema.FieldNameSelector;
import io.debezium.schema.FieldNameSelector.FieldNamer;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Strings;
/**
* Builder that constructs {@link TableSchema} instances for {@link Table} definitions.
@ -112,24 +112,18 @@ public TableSchemaBuilder(ValueConverterProvider valueConverterProvider,
* <p>
* This is equivalent to calling {@code create(table,false)}.
*
* @param schemaPrefix the prefix added to the table identifier to construct the schema names; may be null if there is no
* prefix
* @param envelopSchemaName the name of the schema of the built table's envelope
* @param topicNamingStrategy the topic naming strategy
* @param table the table definition; may not be null
* @param filter the filter that specifies whether columns in the table should be included; may be null if all columns
* are to be included
* @param mappers the mapping functions for columns; may be null if none of the columns are to be mapped to different values
* @return the table schema that can be used for sending rows of data for this table to Kafka Connect; never null
*/
public TableSchema create(String schemaPrefix, String envelopSchemaName, Table table, ColumnNameFilter filter, ColumnMappers mappers, KeyMapper keysMapper) {
if (schemaPrefix == null) {
schemaPrefix = "";
}
public TableSchema create(TopicNamingStrategy topicNamingStrategy, Table table, ColumnNameFilter filter, ColumnMappers mappers, KeyMapper keysMapper) {
// Build the schemas ...
final TableId tableId = table.id();
final String tableIdStr = tableSchemaName(tableId);
final String schemaNamePrefix = schemaPrefix + tableIdStr;
final String schemaNamePrefix = topicNamingStrategy.dataChangeTopic(tableId);
final String envelopSchemaName = Envelope.schemaName(schemaNamePrefix);
LOGGER.debug("Mapping table '{}' to schemas under '{}'", tableId, schemaNamePrefix);
SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Value"));
SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Key"));
@ -171,28 +165,8 @@ public TableSchema create(String schemaPrefix, String envelopSchemaName, Table t
return new TableSchema(tableId, keySchema, keyGenerator, envelope, valSchema, valueGenerator);
}
/**
* Returns the type schema name for the given table.
*/
private String tableSchemaName(TableId tableId) {
if (Strings.isNullOrEmpty(tableId.catalog())) {
if (Strings.isNullOrEmpty(tableId.schema())) {
return tableId.table();
}
else {
return tableId.schema() + "." + tableId.table();
}
}
else if (Strings.isNullOrEmpty(tableId.schema())) {
return tableId.catalog() + "." + tableId.table();
}
else if (multiPartitionMode) {
return tableId.catalog() + "." + tableId.schema() + "." + tableId.table();
}
// When both catalog and schema is present then only schema is used
else {
return tableId.schema() + "." + tableId.table();
}
public boolean isMultiPartitionMode() {
return multiPartitionMode;
}
/**

View File

@ -0,0 +1,144 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.schema;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.BoundedConcurrentHashMap;
import io.debezium.util.Strings;
/**
* An abstract implementation of {@link io.debezium.spi.topic.TopicNamingStrategy}
*
* @author Harvey Yue
*/
public abstract class AbstractTopicNamingStrategy<I extends DataCollectionId> implements TopicNamingStrategy<I> {
protected static final String LOGIC_NAME_PLACEHOLDER = "${logical.name}";
public static final String DEFAULT_HEARTBEAT_TOPIC_PREFIX = "__debezium-heartbeat";
public static final String DEFAULT_TRANSACTION_TOPIC = "transaction";
public static final Field TOPIC_DELIMITER = Field.create("topic.delimiter")
.withDisplayName("Topic delimiter")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(".")
.withDescription("Specify the delimiter for topic name.");
public static final Field TOPIC_PREFIX = Field.create("topic.prefix")
.withDisplayName("Topic prefix")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(LOGIC_NAME_PLACEHOLDER)
.withDescription("The name of the prefix to be used for all topics, the placeholder " + LOGIC_NAME_PLACEHOLDER +
"can be used for referring to the connector's logical name as default value.");
public static final Field TOPIC_CACHE_SIZE = Field.create("topic.cache.size")
.withDisplayName("Topic cache size")
.withType(ConfigDef.Type.INT)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(10000)
.withDescription("The size used for holding the topic names in bounded concurrent hash map. The cache " +
"will help to determine the topic name corresponding to a given data collection");
public static final Field TOPIC_HEARTBEAT_PREFIX = Field.create("topic.heartbeat.prefix")
.withDisplayName("Prefix name of heartbeat topic")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(DEFAULT_HEARTBEAT_TOPIC_PREFIX)
.withDescription("Specify the heartbeat topic name. Defaults to " +
DEFAULT_HEARTBEAT_TOPIC_PREFIX + "." + LOGIC_NAME_PLACEHOLDER);
public static final Field TOPIC_TRANSACTION = Field.create("topic.transaction")
.withDisplayName("Transaction topic name")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(DEFAULT_TRANSACTION_TOPIC)
.withDescription("Specify the transaction topic name. Defaults to " +
LOGIC_NAME_PLACEHOLDER + "." + DEFAULT_TRANSACTION_TOPIC);
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTopicNamingStrategy.class);
protected BoundedConcurrentHashMap<I, String> topicNames;
protected String logicalName;
protected String delimiter;
protected String prefix;
protected String transaction;
protected String heartbeatPrefix;
public AbstractTopicNamingStrategy(Properties props) {
this.logicalName = props.getProperty(RelationalDatabaseConnectorConfig.LOGICAL_NAME);
assert logicalName != null;
this.configure(props);
}
public AbstractTopicNamingStrategy(Properties props, String logicalName) {
this.logicalName = logicalName;
this.configure(props);
}
@Override
public void configure(Properties props) {
Configuration config = Configuration.from(props);
final Field.Set configFields = Field.setOf(
TOPIC_PREFIX,
TOPIC_DELIMITER,
TOPIC_CACHE_SIZE,
TOPIC_TRANSACTION,
TOPIC_HEARTBEAT_PREFIX);
if (!config.validateAndRecord(configFields, LOGGER::error)) {
throw new ConnectException("Unable to validate config.");
}
topicNames = new BoundedConcurrentHashMap<>(
config.getInteger(TOPIC_CACHE_SIZE),
10,
BoundedConcurrentHashMap.Eviction.LRU);
delimiter = config.getString(TOPIC_DELIMITER);
prefix = config.getString(TOPIC_PREFIX).replace(LOGIC_NAME_PLACEHOLDER, logicalName);
heartbeatPrefix = config.getString(TOPIC_HEARTBEAT_PREFIX);
transaction = config.getString(TOPIC_TRANSACTION);
}
@Override
public abstract String dataChangeTopic(I id);
@Override
public String schemaChangeTopic() {
return prefix;
}
@Override
public String heartbeatTopic() {
return String.join(delimiter, heartbeatPrefix, prefix);
}
@Override
public String transactionTopic() {
return String.join(delimiter, prefix, transaction);
}
protected String mkString(List<String> data, String delimiter) {
return data.stream().filter(f -> !Strings.isNullOrBlank(f)).collect(Collectors.joining(delimiter));
}
}

View File

@ -5,6 +5,8 @@
*/
package io.debezium.schema;
import io.debezium.spi.schema.DataCollectionId;
/**
* Provides factory methods for obtaining {@link DataCollectionFilter} instances as per the current connector configuration.
*

View File

@ -8,6 +8,7 @@
import org.apache.kafka.connect.data.Schema;
import io.debezium.data.Envelope;
import io.debezium.spi.schema.DataCollectionId;
public interface DataCollectionSchema {

View File

@ -5,6 +5,8 @@
*/
package io.debezium.schema;
import io.debezium.spi.schema.DataCollectionId;
/**
* The schema of a database. Provides information about the structures of the tables (collections etc.) it contains.
*

View File

@ -0,0 +1,115 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.schema;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.config.ConfigDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
import io.debezium.util.Strings;
/**
 * A topic naming strategy that routes data change topics through a regex/replacement pair:
 * the default topic name ({@code prefix.databaseName.tableName}) is matched against
 * {@code topic.regex} and, when it matches, rewritten using {@code topic.replacement}.
 * Resolved names are cached per data collection.
 */
public class DefaultRegexTopicNamingStrategy extends AbstractTopicNamingStrategy<DataCollectionId> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRegexTopicNamingStrategy.class);

    public static final Field TOPIC_REGEX = Field.create("topic.regex")
            .withDisplayName("Topic regex")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.LOW)
            .required()
            .withValidation(Field::isRegex)
            .withDescription("The regex used for extracting the name of the logical table from the original topic name.");

    public static final Field TOPIC_REPLACEMENT = Field.create("topic.replacement")
            .withDisplayName("Topic replacement")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.LONG)
            .withImportance(ConfigDef.Importance.LOW)
            .required()
            .withValidation(DefaultRegexTopicNamingStrategy::validateTopicReplacement)
            .withDescription("The replacement string used in conjunction with " + TOPIC_REGEX.name() +
                    ". This will be used to create the new topic name.");

    /**
     * If TOPIC_REGEX has a value that is really a regex, then the TOPIC_REPLACEMENT must be a non-empty value.
     */
    private static int validateTopicReplacement(Configuration config, Field field, Field.ValidationOutput problems) {
        String topicRegex = config.getString(TOPIC_REGEX);
        if (topicRegex != null) {
            topicRegex = topicRegex.trim();
        }

        String topicReplacement = config.getString(TOPIC_REPLACEMENT);
        if (topicReplacement != null) {
            topicReplacement = topicReplacement.trim();
        }

        if (!Strings.isNullOrEmpty(topicRegex) && Strings.isNullOrEmpty(topicReplacement)) {
            problems.accept(
                    TOPIC_REPLACEMENT,
                    null,
                    String.format("%s must be non-empty if %s is set.",
                            TOPIC_REPLACEMENT.name(),
                            TOPIC_REGEX.name()));
            return 1;
        }
        return 0;
    }

    private Pattern topicRegex;
    private String topicReplacement;

    public DefaultRegexTopicNamingStrategy(Properties props) {
        super(props);
    }

    @Override
    public void configure(Properties props) {
        super.configure(props);
        Configuration config = Configuration.from(props);
        // TOPIC_REGEX is validated by Field::isRegex, so compiling the pattern here is safe.
        topicRegex = Pattern.compile(config.getString(TOPIC_REGEX));
        topicReplacement = config.getString(TOPIC_REPLACEMENT);
    }

    @Override
    public String dataChangeTopic(DataCollectionId id) {
        String oldTopic = mkString(Collect.arrayListOf(prefix, id.databaseParts()), delimiter);
        return determineNewTopic(id, sanitizedTopicName(oldTopic));
    }

    /**
     * Determine the new topic name.
     *
     * @param tableId the table id
     * @param oldTopic the name of the old topic
     * @return return the new topic name, if the regex applies. Otherwise, return original topic.
     */
    private String determineNewTopic(DataCollectionId tableId, String oldTopic) {
        String newTopic = topicNames.get(tableId);
        if (newTopic == null) {
            newTopic = oldTopic;
            final Matcher matcher = topicRegex.matcher(oldTopic);
            if (matcher.matches()) {
                newTopic = matcher.replaceFirst(topicReplacement);
                if (newTopic.isEmpty()) {
                    LOGGER.warn("Routing regex returned an empty topic name, propagating original topic");
                    // Fall back to the original topic so that an empty replacement never
                    // produces an invalid (empty) topic name, as the warning promises.
                    newTopic = oldTopic;
                }
            }
            // Cache the decision so the regex is applied at most once per data collection.
            topicNames.put(tableId, newTopic);
        }
        return newTopic;
    }
}

View File

@ -0,0 +1,33 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.schema;
import java.util.Properties;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
/**
 * Default topic naming strategy: the data change topic for a collection is
 * {@code prefix.databaseName.tableName}, sanitized to legal Kafka topic characters
 * and cached per data collection.
 */
public class DefaultTopicNamingStrategy extends AbstractTopicNamingStrategy<DataCollectionId> {

    public DefaultTopicNamingStrategy(Properties props) {
        super(props);
    }

    public DefaultTopicNamingStrategy(Properties props, String logicalName) {
        super(props, logicalName);
    }

    /**
     * Factory method deriving the strategy from the connector configuration.
     */
    public static DefaultTopicNamingStrategy create(CommonConnectorConfig config) {
        return new DefaultTopicNamingStrategy(config.getConfig().asProperties(), config.getLogicalName());
    }

    @Override
    public String dataChangeTopic(DataCollectionId id) {
        // Compute lazily on first request for this collection; later calls hit the cache.
        return topicNames.computeIfAbsent(id, collectionId -> {
            String candidate = mkString(Collect.arrayListOf(prefix, collectionId.databaseParts()), delimiter);
            return sanitizedTopicName(candidate);
        });
    }
}

View File

@ -11,6 +11,7 @@
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.TableId;
import io.debezium.spi.schema.DataCollectionId;
/**
* A database schema that is historized, i.e. it undergoes schema changes and can be recovered from a persistent schema

View File

@ -0,0 +1,48 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.schema;
import java.util.Properties;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;
public class SchemaTopicNamingStrategy extends AbstractTopicNamingStrategy<DataCollectionId> {
private final boolean multiPartitionMode;
public SchemaTopicNamingStrategy(Properties props) {
super(props);
this.multiPartitionMode = props.get(CommonConnectorConfig.MULTI_PARTITION_MODE) == null ? false
: Boolean.parseBoolean(props.get(CommonConnectorConfig.MULTI_PARTITION_MODE).toString());
}
public SchemaTopicNamingStrategy(Properties props, String logicalName, boolean multiPartitionMode) {
super(props, logicalName);
this.multiPartitionMode = multiPartitionMode;
}
public static SchemaTopicNamingStrategy create(CommonConnectorConfig config) {
return create(config, false);
}
public static SchemaTopicNamingStrategy create(CommonConnectorConfig config, boolean multiPartitionMode) {
return new SchemaTopicNamingStrategy(config.getConfig().asProperties(), config.getLogicalName(), multiPartitionMode);
}
@Override
public String dataChangeTopic(DataCollectionId id) {
String topicName;
if (multiPartitionMode) {
topicName = mkString(Collect.arrayListOf(prefix, id.parts()), delimiter);
}
else {
topicName = mkString(Collect.arrayListOf(prefix, id.schemaParts()), delimiter);
}
return topicNames.computeIfAbsent(id, t -> sanitizedTopicName(topicName));
}
}

View File

@ -10,6 +10,7 @@
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.BoundedConcurrentHashMap;
import io.debezium.util.BoundedConcurrentHashMap.Eviction;

View File

@ -119,6 +119,13 @@ public static <T> List<T> arrayListOf(T first, T... additional) {
return result;
}
/**
 * Create a new mutable list containing {@code first} followed by all elements of
 * {@code additional}, in order.
 *
 * @param first the first element; may be null
 * @param additional the remaining elements; may not be null
 * @return a new mutable {@link ArrayList} of size {@code additional.size() + 1}; never null
 */
public static <T> List<T> arrayListOf(T first, List<T> additional) {
    // Presize to avoid intermediate resizes; the final size is known up front.
    List<T> result = new ArrayList<>(additional.size() + 1);
    result.add(first);
    result.addAll(additional);
    return result;
}
public static <T> List<T> arrayListOf(Iterable<T> values) {
List<T> result = new ArrayList<>();
values.forEach((value) -> result.add(value));

View File

@ -12,6 +12,7 @@
import java.nio.ByteBuffer;
import java.sql.Types;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
@ -30,6 +31,9 @@
import io.debezium.relational.Key.CustomKeyMapper;
import io.debezium.relational.Key.KeyMapper;
import io.debezium.relational.mapping.ColumnMappers;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.schema.SchemaTopicNamingStrategy;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.time.Date;
import io.debezium.util.SchemaNameAdjuster;
@ -56,6 +60,7 @@ public class TableSchemaBuilderTest {
private Column c10;
private TableSchema schema;
private TopicNamingStrategy topicNamingStrategy;
private SchemaNameAdjuster adjuster;
private final CustomConverterRegistry customConverterRegistry = new CustomConverterRegistry(null);
@ -64,6 +69,7 @@ public void beforeEach() {
adjuster = SchemaNameAdjuster.create("_", (original, replacement, conflict) -> {
fail("Should not have come across an invalid schema name");
});
topicNamingStrategy = new SchemaTopicNamingStrategy(new Properties(), prefix, false);
schema = null;
table = Table.editor()
.tableId(id)
@ -142,14 +148,14 @@ public void checkPreconditions() {
public void shouldFailToBuildTableSchemaFromNullTable() {
new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", null, null, null, null);
.create(topicNamingStrategy, null, null, null, null);
}
@Test
public void shouldBuildTableSchemaFromTable() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null);
.create(topicNamingStrategy, table, null, null, null);
assertThat(schema).isNotNull();
}
@ -159,7 +165,7 @@ public void shouldBuildCorrectSchemaNames() {
// table id with catalog and schema
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null);
.create(topicNamingStrategy, table, null, null, null);
assertThat(schema).isNotNull();
assertThat(schema.keySchema().name()).isEqualTo("schema.table.Key");
assertThat(schema.valueSchema().name()).isEqualTo("schema.table.Value");
@ -171,7 +177,7 @@ public void shouldBuildCorrectSchemaNames() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null);
.create(new DefaultTopicNamingStrategy(new Properties(), prefix), table, null, null, null);
assertThat(schema).isNotNull();
assertThat(schema.keySchema().name()).isEqualTo("testDb.testTable.Key");
@ -184,7 +190,7 @@ public void shouldBuildCorrectSchemaNames() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null);
.create(topicNamingStrategy, table, null, null, null);
assertThat(schema).isNotNull();
assertThat(schema.keySchema().name()).isEqualTo("testSchema.testTable.Key");
@ -197,7 +203,7 @@ public void shouldBuildCorrectSchemaNames() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null);
.create(topicNamingStrategy, table, null, null, null);
assertThat(schema).isNotNull();
assertThat(schema.keySchema().name()).isEqualTo("testTable.Key");
@ -210,7 +216,7 @@ public void shouldBuildCorrectSchemaNamesInMultiPartitionMode() {
// table id with catalog and schema
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, true)
.create(prefix, "sometopic", table, null, null, null);
.create(new SchemaTopicNamingStrategy(new Properties(), prefix, true), table, null, null, null);
assertThat(schema).isNotNull();
assertThat(schema.keySchema().name()).isEqualTo("catalog.schema.table.Key");
assertThat(schema.valueSchema().name()).isEqualTo("catalog.schema.table.Value");
@ -221,7 +227,7 @@ public void shouldBuildTableSchemaFromTableWithoutPrimaryKey() {
table = table.edit().setPrimaryKeyNames().create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null);
.create(topicNamingStrategy, table, null, null, null);
assertThat(schema).isNotNull();
// Check the keys ...
assertThat(schema.keySchema()).isNull();
@ -282,7 +288,7 @@ public void shouldSanitizeFieldNamesAndBuildTableSchemaFromTableWithoutPrimaryKe
table = table.edit().setPrimaryKeyNames().create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), true, false)
.create(prefix, "sometopic", table, null, null, null);
.create(topicNamingStrategy, table, null, null, null);
assertThat(schema).isNotNull();
// Check the keys ...
assertThat(schema.keySchema()).isNull();
@ -350,7 +356,7 @@ public void shouldSanitizeFieldNamesAndValidateSerialization() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), true, false)
.create(prefix, "sometopic", table, null, null, null);
.create(topicNamingStrategy, table, null, null, null);
Struct key = (Struct) schema.keyFromColumnData(keyData);
Struct value = schema.valueFromColumnData(data);
@ -369,7 +375,7 @@ public void shouldBuildTableSchemaFromTableWithCustomKey() {
table = table.edit().setPrimaryKeyNames().create();
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3", null));
.create(topicNamingStrategy, table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3", null));
assertThat(schema).isNotNull();
Schema keys = schema.keySchema();
assertThat(keys).isNotNull();
@ -383,7 +389,7 @@ public void shouldBuildTableSchemaFromTableWithCustomKey() {
public void shouldOverrideIdentityKey() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3", null));
.create(topicNamingStrategy, table, null, null, CustomKeyMapper.getInstance("(.*).table:C2,C3", null));
assertThat(schema).isNotNull();
Schema keys = schema.keySchema();
assertThat(keys).isNotNull();
@ -398,7 +404,7 @@ public void shouldOverrideIdentityKey() {
public void shouldFallbackToIdentyKeyWhenCustomMapperIsNull() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, null);
.create(topicNamingStrategy, table, null, null, null);
assertThat(schema).isNotNull();
Schema keys = schema.keySchema();
assertThat(keys).isNotNull();
@ -430,7 +436,7 @@ public void customKeyMapperShouldMapMultipleTables() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table, null, null, keyMapper);
.create(topicNamingStrategy, table, null, null, keyMapper);
assertThat(schema).isNotNull();
Schema keys = schema.keySchema();
@ -442,7 +448,7 @@ public void customKeyMapperShouldMapMultipleTables() {
TableSchema schema2 = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table2, null, null, keyMapper);
.create(topicNamingStrategy, table2, null, null, keyMapper);
assertThat(schema2).isNotNull();
Schema key2 = schema2.keySchema();
@ -470,7 +476,7 @@ public void defaultKeyMapperShouldOrderKeyColumnsBasedOnPrimaryKeyColumnNamesOrd
TableSchema schema2 = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table2, null, null, null);
.create(topicNamingStrategy, table2, null, null, null);
Schema key2 = schema2.keySchema();
assertThat(key2).isNotNull();
@ -502,7 +508,7 @@ public void mapperConvertersShouldLeaveEmptyDatesAsZero() {
schema = new TableSchemaBuilder(new JdbcValueConverters(), null, adjuster, customConverterRegistry,
SchemaBuilder.struct().build(), false, false)
.create(prefix, "sometopic", table2, null, mappers, null);
.create(topicNamingStrategy, table2, null, mappers, null);
Struct value = schema.valueFromColumnData(data);
assertThat(value.get("C1")).isEqualTo(0);