DBZ-2975: Add partition awareness to source task components

Co-authored-by: Mike Kamornikov <mikekamornikov@gmail.com>
Authored by Sergei Morozov on 2021-06-21 14:44:38 -07:00; committed by Gunnar Morling
parent 0c0c4f5e33
commit db105baeba
49 changed files with 571 additions and 168 deletions
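
The commit threads a partition type through the whole source-task pipeline: BaseSourceTask, ChangeEventSourceCoordinator, the ChangeEventSourceFactory, SnapshotChangeEventSource, and StreamingChangeEventSource SPIs, and each connector's task, so that offsets are read and keyed per Partition rather than through a single loader-supplied partition map. The Partition interface itself is not included in this view; the following is a minimal sketch of the contract implied by the diffs below, not the actual io.debezium.connector.common.Partition source:

import java.util.Map;
import java.util.Set;

public interface Partition {

    // The key under which this partition's offsets are stored in Kafka Connect's offset store
    Map<String, String> getSourcePartition();

    // Enumerates the partitions a single task instance is responsible for
    interface Provider<P extends Partition> {
        Set<P> getPartitions();
    }
}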

File: MongoDbChangeEventSourceFactory.java

@ -19,7 +19,7 @@
*
* @author Chris Cranford
*/
public class MongoDbChangeEventSourceFactory implements ChangeEventSourceFactory<MongoDbOffsetContext> {
public class MongoDbChangeEventSourceFactory implements ChangeEventSourceFactory<MongoDbPartition, MongoDbOffsetContext> {
private final MongoDbConnectorConfig configuration;
private final ErrorHandler errorHandler;
@ -39,7 +39,7 @@ public MongoDbChangeEventSourceFactory(MongoDbConnectorConfig configuration, Err
}
@Override
public SnapshotChangeEventSource<MongoDbOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) {
public SnapshotChangeEventSource<MongoDbPartition, MongoDbOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) {
return new MongoDbSnapshotChangeEventSource(
configuration,
taskContext,
@ -51,7 +51,7 @@ public SnapshotChangeEventSource<MongoDbOffsetContext> getSnapshotChangeEventSou
}
@Override
public StreamingChangeEventSource<MongoDbOffsetContext> getStreamingChangeEventSource() {
public StreamingChangeEventSource<MongoDbPartition, MongoDbOffsetContext> getStreamingChangeEventSource() {
return new MongoDbStreamingChangeEventSource(
configuration,
taskContext,

File: MongoDbConnectorTask.java

@ -6,6 +6,7 @@
package io.debezium.connector.mongodb;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -44,7 +45,7 @@
* @author Randall Hauch
*/
@ThreadSafe
public final class MongoDbConnectorTask extends BaseSourceTask<MongoDbOffsetContext> {
public final class MongoDbConnectorTask extends BaseSourceTask<MongoDbPartition, MongoDbOffsetContext> {
private static final String CONTEXT_NAME = "mongodb-connector-task";
@ -63,7 +64,7 @@ public String version() {
}
@Override
public ChangeEventSourceCoordinator<MongoDbOffsetContext> start(Configuration config) {
public ChangeEventSourceCoordinator<MongoDbPartition, MongoDbOffsetContext> start(Configuration config) {
final MongoDbConnectorConfig connectorConfig = new MongoDbConnectorConfig(config);
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
@ -74,7 +75,7 @@ public ChangeEventSourceCoordinator<MongoDbOffsetContext> start(Configuration co
this.schema = new MongoDbSchema(taskContext.filters(), taskContext.topicSelector(), structSchema);
final ReplicaSets replicaSets = getReplicaSets(config);
final MongoDbOffsetContext previousOffsets = getPreviousOffsets(connectorConfig, replicaSets);
final MongoDbOffsetContext previousOffset = getPreviousOffset(connectorConfig, replicaSets);
final Clock clock = Clock.system();
PreviousContext previousLogContext = taskContext.configureLoggingContext(taskName);
@ -103,8 +104,8 @@ public ChangeEventSourceCoordinator<MongoDbOffsetContext> start(Configuration co
metadataProvider,
schemaNameAdjuster);
ChangeEventSourceCoordinator<MongoDbOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
previousOffsets,
ChangeEventSourceCoordinator<MongoDbPartition, MongoDbOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
Collections.singletonMap(new MongoDbPartition(), previousOffset),
errorHandler,
MongoDbConnector.class,
connectorConfig,
@ -152,7 +153,7 @@ protected Iterable<Field> getAllConfigurationFields() {
return MongoDbConnectorConfig.ALL_FIELDS;
}
private MongoDbOffsetContext getPreviousOffsets(MongoDbConnectorConfig connectorConfig, ReplicaSets replicaSets) {
private MongoDbOffsetContext getPreviousOffset(MongoDbConnectorConfig connectorConfig, ReplicaSets replicaSets) {
MongoDbOffsetContext.Loader loader = new MongoDbOffsetContext.Loader(connectorConfig, replicaSets);
Collection<Map<String, String>> partitions = loader.getPartitions();

File: MongoDbPartition.java (new)

@ -0,0 +1,28 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import java.util.Map;
import io.debezium.connector.common.Partition;
public class MongoDbPartition implements Partition {
@Override
public Map<String, String> getSourcePartition() {
throw new UnsupportedOperationException("Currently unsupported by the MongoDB connector");
}
@Override
public boolean equals(Object obj) {
throw new UnsupportedOperationException("Currently unsupported by the MongoDB connector");
}
@Override
public int hashCode() {
throw new UnsupportedOperationException("Currently unsupported by the MongoDB connector");
}
}
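
Unlike the relational connectors below, MongoDB only receives a placeholder for now: MongoDbConnectorTask wraps a single stub instance in Collections.singletonMap, and ChangeEventSourceCoordinator only iterates that map's entry set, so the throwing equals() and hashCode() are never reached. A hypothetical illustration of why the stub is safe under exactly that usage:

// Hypothetical illustration, not part of the commit. Neither singletonMap()
// construction nor entrySet() iteration calls equals()/hashCode() on the key;
// a lookup such as offsets.containsKey(new MongoDbPartition()) would throw.
static void illustrateStubSafety(MongoDbOffsetContext previousOffset) {
    Map<MongoDbPartition, MongoDbOffsetContext> offsets =
            Collections.singletonMap(new MongoDbPartition(), previousOffset);
    for (Map.Entry<MongoDbPartition, MongoDbOffsetContext> entry : offsets.entrySet()) {
        // iterated exactly as the coordinator does; no hashing involved
    }
}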

File: MongoDbSnapshotChangeEventSource.java

@ -49,7 +49,7 @@
*
* @author Chris Cranford
*/
public class MongoDbSnapshotChangeEventSource extends AbstractSnapshotChangeEventSource<MongoDbOffsetContext> {
public class MongoDbSnapshotChangeEventSource extends AbstractSnapshotChangeEventSource<MongoDbPartition, MongoDbOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSnapshotChangeEventSource.class);

File: MongoDbStreamingChangeEventSource.java

@ -47,7 +47,7 @@
*
* @author Chris Cranford
*/
public class MongoDbStreamingChangeEventSource implements StreamingChangeEventSource<MongoDbOffsetContext> {
public class MongoDbStreamingChangeEventSource implements StreamingChangeEventSource<MongoDbPartition, MongoDbOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbStreamingChangeEventSource.class);
@ -79,7 +79,8 @@ public MongoDbStreamingChangeEventSource(MongoDbConnectorConfig connectorConfig,
}
@Override
public void execute(ChangeEventSourceContext context, MongoDbOffsetContext offsetContext) throws InterruptedException {
public void execute(ChangeEventSourceContext context, MongoDbPartition partition, MongoDbOffsetContext offsetContext)
throws InterruptedException {
final List<ReplicaSet> validReplicaSets = replicaSets.validReplicaSets();
if (offsetContext == null) {

File: MySqlChangeEventSourceFactory.java

@ -25,7 +25,7 @@
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Clock;
public class MySqlChangeEventSourceFactory implements ChangeEventSourceFactory<MySqlOffsetContext> {
public class MySqlChangeEventSourceFactory implements ChangeEventSourceFactory<MySqlPartition, MySqlOffsetContext> {
private final MySqlConnectorConfig configuration;
private final MySqlConnection connection;
@ -57,7 +57,7 @@ public MySqlChangeEventSourceFactory(MySqlConnectorConfig configuration, MySqlCo
}
@Override
public SnapshotChangeEventSource<MySqlOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) {
public SnapshotChangeEventSource<MySqlPartition, MySqlOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) {
return new MySqlSnapshotChangeEventSource(configuration, connection, taskContext.getSchema(), dispatcher, clock,
(MySqlSnapshotChangeEventSourceMetrics) snapshotProgressListener, record -> modifyAndFlushLastRecord(record));
}
@ -68,7 +68,7 @@ private void modifyAndFlushLastRecord(Function<SourceRecord, SourceRecord> modif
}
@Override
public StreamingChangeEventSource<MySqlOffsetContext> getStreamingChangeEventSource() {
public StreamingChangeEventSource<MySqlPartition, MySqlOffsetContext> getStreamingChangeEventSource() {
queue.disableBuffering();
return new MySqlStreamingChangeEventSource(
configuration,

File: MySqlConnectorTask.java

@ -7,6 +7,7 @@
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;
@ -41,7 +42,7 @@
* @author Jiri Pechanec
*
*/
public class MySqlConnectorTask extends BaseSourceTask<MySqlOffsetContext> {
public class MySqlConnectorTask extends BaseSourceTask<MySqlPartition, MySqlOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlConnectorTask.class);
private static final String CONTEXT_NAME = "mysql-connector-task";
@ -58,7 +59,7 @@ public String version() {
}
@Override
public ChangeEventSourceCoordinator<MySqlOffsetContext> start(Configuration config) {
public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Configuration config) {
final Clock clock = Clock.system();
final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(
config.edit()
@ -82,10 +83,8 @@ public ChangeEventSourceCoordinator<MySqlOffsetContext> start(Configuration conf
validateBinlogConfiguration(connectorConfig);
MySqlOffsetContext previousOffset = getPreviousOffset(new MySqlOffsetContext.Loader(connectorConfig));
if (previousOffset == null) {
LOGGER.info("No previous offset found");
}
Map<MySqlPartition, MySqlOffsetContext> previousOffsets = getPreviousOffsets(new MySqlPartition.Provider(connectorConfig),
new MySqlOffsetContext.Loader(connectorConfig));
final boolean tableIdCaseInsensitive = connection.isTableIdCaseSensitive();
@ -100,6 +99,8 @@ public ChangeEventSourceCoordinator<MySqlOffsetContext> start(Configuration conf
throw new DebeziumException(e);
}
MySqlOffsetContext previousOffset = getTheOnlyOffset(previousOffsets);
validateAndLoadDatabaseHistory(connectorConfig, previousOffset, schema);
LOGGER.info("Reconnecting after finishing schema recovery");
@ -113,7 +114,8 @@ public ChangeEventSourceCoordinator<MySqlOffsetContext> start(Configuration conf
// If the binlog position is not available it is necessary to reexecute snapshot
if (validateSnapshotFeasibility(connectorConfig, previousOffset)) {
previousOffset = null;
MySqlPartition partition = getTheOnlyPartition(previousOffsets);
previousOffsets.put(partition, null);
}
taskContext = new MySqlTaskContext(connectorConfig, schema);
@ -147,8 +149,8 @@ public ChangeEventSourceCoordinator<MySqlOffsetContext> start(Configuration conf
final MySqlStreamingChangeEventSourceMetrics streamingMetrics = new MySqlStreamingChangeEventSourceMetrics(taskContext, queue, metadataProvider);
ChangeEventSourceCoordinator<MySqlOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
previousOffset,
ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
previousOffsets,
errorHandler,
MySqlConnector.class,
connectorConfig,
@ -234,8 +236,7 @@ private void validateBinlogConfiguration(MySqlConnectorConfig config) {
}
/**
* Determine whether the binlog position as set on the {@link MySqlTaskContext#source() SourceInfo} is available in the
* server.
* Determine whether the binlog position as set on the {@link MySqlOffsetContext} is available in the server.
*
* @return {@code true} if the server has the binlog coordinates, or {@code false} otherwise
*/

File: MySqlOffsetContext.java

@ -194,11 +194,6 @@ public Loader(MySqlConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}
@Override
public Map<String, ?> getPartition() {
return Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
}
@Override
public MySqlOffsetContext load(Map<String, ?> offset) {
boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)) || "true".equals(offset.get(SourceInfo.SNAPSHOT_KEY));

File: MySqlPartition.java (new)

@ -0,0 +1,59 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import io.debezium.connector.common.Partition;
import io.debezium.util.Collect;
public class MySqlPartition implements Partition {
private static final String SERVER_PARTITION_KEY = "server";
private final String serverName;
public MySqlPartition(String serverName) {
this.serverName = serverName;
}
@Override
public Map<String, String> getSourcePartition() {
return Collect.hashMapOf(SERVER_PARTITION_KEY, serverName);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final MySqlPartition other = (MySqlPartition) obj;
return Objects.equals(serverName, other.serverName);
}
@Override
public int hashCode() {
return serverName.hashCode();
}
static class Provider implements Partition.Provider<MySqlPartition> {
private final MySqlConnectorConfig connectorConfig;
Provider(MySqlConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}
@Override
public Set<MySqlPartition> getPartitions() {
return Collections.singleton(new MySqlPartition(connectorConfig.getLogicalName()));
}
}
}
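
The value-based equals() and hashCode() are what make keying offsets by partition work: a freshly constructed MySqlPartition for the same logical server must find the offsets stored under an earlier instance. A quick hypothetical demonstration (the offset value is made up):

import java.util.HashMap;
import java.util.Map;

import io.debezium.connector.mysql.MySqlPartition;

public class MySqlPartitionEqualityDemo {
    public static void main(String[] args) {
        Map<MySqlPartition, String> offsets = new HashMap<>();
        offsets.put(new MySqlPartition("server1"), "made-up binlog position");

        // A distinct instance with the same server name hits the same entry
        System.out.println(offsets.get(new MySqlPartition("server1"))); // made-up binlog position
        System.out.println(offsets.get(new MySqlPartition("server2"))); // null
    }
}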

File: MySqlSnapshotChangeEventSource.java

@ -44,7 +44,7 @@
import io.debezium.util.Collect;
import io.debezium.util.Strings;
public class MySqlSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource<MySqlOffsetContext> {
public class MySqlSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource<MySqlPartition, MySqlOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSnapshotChangeEventSource.class);

File: MySqlStreamingChangeEventSource.java

@ -87,7 +87,7 @@
*
* @author Jiri Pechanec
*/
public class MySqlStreamingChangeEventSource implements StreamingChangeEventSource<MySqlOffsetContext> {
public class MySqlStreamingChangeEventSource implements StreamingChangeEventSource<MySqlPartition, MySqlOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlStreamingChangeEventSource.class);
@ -793,7 +793,7 @@ private SSLMode sslModeFor(SecureConnectionMode mode) {
}
@Override
public void execute(ChangeEventSourceContext context, MySqlOffsetContext offsetContext) throws InterruptedException {
public void execute(ChangeEventSourceContext context, MySqlPartition partition, MySqlOffsetContext offsetContext) throws InterruptedException {
if (!connectorConfig.getSnapshotMode().shouldStream()) {
LOGGER.info("Streaming is disabled for snapshot mode {}", connectorConfig.getSnapshotMode());
return;

File: MySqlConnectorTask.java (legacy implementation)

@ -32,6 +32,7 @@
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
@ -45,7 +46,7 @@
* @author Randall Hauch
*/
@NotThreadSafe
public final class MySqlConnectorTask extends BaseSourceTask<MySqlOffsetContext> {
public final class MySqlConnectorTask extends BaseSourceTask<MySqlPartition, MySqlOffsetContext> {
private final Logger logger = LoggerFactory.getLogger(getClass());
private volatile MySqlTaskContext taskContext;
@ -67,7 +68,7 @@ public String version() {
}
@Override
public ChangeEventSourceCoordinator<MySqlOffsetContext> start(Configuration config) {
public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Configuration config) {
final String serverName = config.getString(MySqlConnectorConfig.SERVER_NAME);
PreviousContext prevLoggingContext = LoggingContext.forConnector(Module.contextName(), serverName, "task");

File: MySQL connector integration test

@ -674,11 +674,13 @@ private void shouldConsumeAllEventsFromDatabaseUsingSnapshotByField(Field dbIncl
stopConnector();
// Read the last committed offsets, and verify the binlog coordinates ...
final String serverName = config.getString(MySqlConnectorConfig.SERVER_NAME);
final MySqlOffsetContext.Loader loader = new MySqlOffsetContext.Loader(new MySqlConnectorConfig(Configuration.create()
.with(MySqlConnectorConfig.SERVER_NAME, config.getString(MySqlConnectorConfig.SERVER_NAME))
.with(MySqlConnectorConfig.SERVER_NAME, serverName)
.build()));
Map<String, ?> lastCommittedOffset = readLastCommittedOffset(config, loader.getPartition());
final MySqlOffsetContext offsetContext = (MySqlOffsetContext) loader.load(lastCommittedOffset);
final Map<String, String> partition = new MySqlPartition(serverName).getSourcePartition();
Map<String, ?> lastCommittedOffset = readLastCommittedOffset(config, partition);
final MySqlOffsetContext offsetContext = loader.load(lastCommittedOffset);
final SourceInfo persistedOffsetSource = offsetContext.getSource();
Testing.print("Position before inserts: " + positionBeforeInserts);
Testing.print("Position after inserts: " + positionAfterInserts);

File: MySqlPartitionTest.java (new)

@ -0,0 +1,21 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import io.debezium.connector.common.AbstractPartitionTest;
public class MySqlPartitionTest extends AbstractPartitionTest<MySqlPartition> {
@Override
protected MySqlPartition createPartition1() {
return new MySqlPartition("server1");
}
@Override
protected MySqlPartition createPartition2() {
return new MySqlPartition("server2");
}
}
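
AbstractPartitionTest itself is not shown in this commit view; judging from the subclasses here and in the other connectors, it presumably exercises the equality contract roughly as follows (a sketch under that assumption, not the actual base class):

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import org.junit.Test;

import io.debezium.connector.common.Partition;

public abstract class AbstractPartitionTestSketch<P extends Partition> {

    protected abstract P createPartition1();

    protected abstract P createPartition2();

    @Test
    public void equalPartitionsShouldBeEqualAndHashAlike() {
        assertEquals(createPartition1(), createPartition1());
        assertEquals(createPartition1().hashCode(), createPartition1().hashCode());
    }

    @Test
    public void differentPartitionsShouldNotBeEqual() {
        assertNotEquals(createPartition1(), createPartition2());
    }
}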

File: OracleChangeEventSourceFactory.java

@ -15,7 +15,7 @@
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
public class OracleChangeEventSourceFactory implements ChangeEventSourceFactory<OracleOffsetContext> {
public class OracleChangeEventSourceFactory implements ChangeEventSourceFactory<OraclePartition, OracleOffsetContext> {
private final OracleConnectorConfig configuration;
private final OracleConnection jdbcConnection;
@ -43,13 +43,13 @@ public OracleChangeEventSourceFactory(OracleConnectorConfig configuration, Oracl
}
@Override
public SnapshotChangeEventSource<OracleOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) {
public SnapshotChangeEventSource<OraclePartition, OracleOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) {
return new OracleSnapshotChangeEventSource(configuration, jdbcConnection,
schema, dispatcher, clock, snapshotProgressListener);
}
@Override
public StreamingChangeEventSource<OracleOffsetContext> getStreamingChangeEventSource() {
public StreamingChangeEventSource<OraclePartition, OracleOffsetContext> getStreamingChangeEventSource() {
return configuration.getAdapter().getSource(
jdbcConnection,
dispatcher,

File: OracleConnectorTask.java

@ -7,6 +7,7 @@
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;
@ -27,7 +28,7 @@
import io.debezium.util.Clock;
import io.debezium.util.SchemaNameAdjuster;
public class OracleConnectorTask extends BaseSourceTask<OracleOffsetContext> {
public class OracleConnectorTask extends BaseSourceTask<OraclePartition, OracleOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(OracleConnectorTask.class);
private static final String CONTEXT_NAME = "oracle-connector-task";
@ -44,7 +45,7 @@ public String version() {
}
@Override
public ChangeEventSourceCoordinator<OracleOffsetContext> start(Configuration config) {
public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(Configuration config) {
OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
@ -56,7 +57,9 @@ public ChangeEventSourceCoordinator<OracleOffsetContext> start(Configuration con
TableNameCaseSensitivity tableNameCaseSensitivity = connectorConfig.getAdapter().getTableNameCaseSensitivity(jdbcConnection);
this.schema = new OracleDatabaseSchema(connectorConfig, valueConverters, schemaNameAdjuster, topicSelector, tableNameCaseSensitivity);
this.schema.initializeStorage();
OracleOffsetContext previousOffset = getPreviousOffset(connectorConfig.getAdapter().getOffsetContextLoader());
Map<OraclePartition, OracleOffsetContext> previousOffsets = getPreviousOffsets(new OraclePartition.Provider(connectorConfig),
connectorConfig.getAdapter().getOffsetContextLoader());
OracleOffsetContext previousOffset = getTheOnlyOffset(previousOffsets);
if (previousOffset != null) {
schema.recover(previousOffset);
@ -91,8 +94,8 @@ public ChangeEventSourceCoordinator<OracleOffsetContext> start(Configuration con
final OracleStreamingChangeEventSourceMetrics streamingMetrics = new OracleStreamingChangeEventSourceMetrics(taskContext, queue, metadataProvider,
connectorConfig);
ChangeEventSourceCoordinator<OracleOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
previousOffset,
ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
previousOffsets,
errorHandler,
OracleConnector.class,
connectorConfig,

File: OraclePartition.java (new)

@ -0,0 +1,59 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import io.debezium.connector.common.Partition;
import io.debezium.util.Collect;
public class OraclePartition implements Partition {
private static final String SERVER_PARTITION_KEY = "server";
private final String serverName;
public OraclePartition(String serverName) {
this.serverName = serverName;
}
@Override
public Map<String, String> getSourcePartition() {
return Collect.hashMapOf(SERVER_PARTITION_KEY, serverName);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final OraclePartition other = (OraclePartition) obj;
return Objects.equals(serverName, other.serverName);
}
@Override
public int hashCode() {
return serverName.hashCode();
}
static class Provider implements Partition.Provider<OraclePartition> {
private final OracleConnectorConfig connectorConfig;
Provider(OracleConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}
@Override
public Set<OraclePartition> getPartitions() {
return Collections.singleton(new OraclePartition(connectorConfig.getLogicalName()));
}
}
}

File: OracleSnapshotChangeEventSource.java

@ -32,7 +32,7 @@
*
* @author Gunnar Morling
*/
public class OracleSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource<OracleOffsetContext> {
public class OracleSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource<OraclePartition, OracleOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(OracleSnapshotChangeEventSource.class);

File: StreamingAdapter.java

@ -48,10 +48,13 @@ enum TableNameCaseSensitivity {
OffsetContext.Loader<OracleOffsetContext> getOffsetContextLoader();
StreamingChangeEventSource<OracleOffsetContext> getSource(OracleConnection connection, EventDispatcher<TableId> dispatcher,
ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema schema,
OracleTaskContext taskContext, Configuration jdbcConfig,
OracleStreamingChangeEventSourceMetrics streamingMetrics);
StreamingChangeEventSource<OraclePartition, OracleOffsetContext> getSource(OracleConnection connection,
EventDispatcher<TableId> dispatcher,
ErrorHandler errorHandler, Clock clock,
OracleDatabaseSchema schema,
OracleTaskContext taskContext,
Configuration jdbcConfig,
OracleStreamingChangeEventSourceMetrics streamingMetrics);
/**
* Returns whether table names are case sensitive.

File: LogMinerAdapter.java

@ -11,6 +11,7 @@
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.document.Document;
@ -54,14 +55,14 @@ public OffsetContext.Loader<OracleOffsetContext> getOffsetContextLoader() {
}
@Override
public StreamingChangeEventSource<OracleOffsetContext> getSource(OracleConnection connection,
EventDispatcher<TableId> dispatcher,
ErrorHandler errorHandler,
Clock clock,
OracleDatabaseSchema schema,
OracleTaskContext taskContext,
Configuration jdbcConfig,
OracleStreamingChangeEventSourceMetrics streamingMetrics) {
public StreamingChangeEventSource<OraclePartition, OracleOffsetContext> getSource(OracleConnection connection,
EventDispatcher<TableId> dispatcher,
ErrorHandler errorHandler,
Clock clock,
OracleDatabaseSchema schema,
OracleTaskContext taskContext,
Configuration jdbcConfig,
OracleStreamingChangeEventSourceMetrics streamingMetrics) {
return new LogMinerStreamingChangeEventSource(
connectorConfig,
connection,

File: LogMinerOracleOffsetContextLoader.java

@ -5,7 +5,6 @@
*/
package io.debezium.connector.oracle.logminer;
import java.util.Collections;
import java.util.Map;
import io.debezium.connector.oracle.OracleConnectorConfig;
@ -26,11 +25,6 @@ public LogMinerOracleOffsetContextLoader(OracleConnectorConfig connectorConfig)
this.connectorConfig = connectorConfig;
}
@Override
public Map<String, ?> getPartition() {
return Collections.singletonMap(OracleOffsetContext.SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
}
@Override
public OracleOffsetContext load(Map<String, ?> offset) {
boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY));

File: LogMinerStreamingChangeEventSource.java

@ -45,6 +45,7 @@
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Scn;
import io.debezium.jdbc.JdbcConfiguration;
@ -60,7 +61,7 @@
* A {@link StreamingChangeEventSource} based on Oracle's LogMiner utility.
* The event handler loop is executed in a separate executor.
*/
public class LogMinerStreamingChangeEventSource implements StreamingChangeEventSource<OracleOffsetContext> {
public class LogMinerStreamingChangeEventSource implements StreamingChangeEventSource<OraclePartition, OracleOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerStreamingChangeEventSource.class);
@ -115,7 +116,7 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
* change event source context
*/
@Override
public void execute(ChangeEventSourceContext context, OracleOffsetContext offsetContext) {
public void execute(ChangeEventSourceContext context, OraclePartition partition, OracleOffsetContext offsetContext) {
try (TransactionalBuffer transactionalBuffer = new TransactionalBuffer(connectorConfig, schema, clock, errorHandler, streamingMetrics)) {
try {
startScn = offsetContext.getScn();

File: XStreamAdapter.java

@ -11,6 +11,7 @@
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.connector.oracle.Scn;
@ -65,14 +66,14 @@ public OffsetContext.Loader<OracleOffsetContext> getOffsetContextLoader() {
}
@Override
public StreamingChangeEventSource<OracleOffsetContext> getSource(OracleConnection connection,
EventDispatcher<TableId> dispatcher,
ErrorHandler errorHandler,
Clock clock,
OracleDatabaseSchema schema,
OracleTaskContext taskContext,
Configuration jdbcConfig,
OracleStreamingChangeEventSourceMetrics streamingMetrics) {
public StreamingChangeEventSource<OraclePartition, OracleOffsetContext> getSource(OracleConnection connection,
EventDispatcher<TableId> dispatcher,
ErrorHandler errorHandler,
Clock clock,
OracleDatabaseSchema schema,
OracleTaskContext taskContext,
Configuration jdbcConfig,
OracleStreamingChangeEventSourceMetrics streamingMetrics) {
return new XstreamStreamingChangeEventSource(
connectorConfig,
connection,

File: XStreamOracleOffsetContextLoader.java

@ -5,7 +5,6 @@
*/
package io.debezium.connector.oracle.xstream;
import java.util.Collections;
import java.util.Map;
import io.debezium.connector.oracle.OracleConnectorConfig;
@ -28,11 +27,6 @@ public XStreamOracleOffsetContextLoader(OracleConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}
@Override
public Map<String, ?> getPartition() {
return Collections.singletonMap(OracleOffsetContext.SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
}
@Override
public OracleOffsetContext load(Map<String, ?> offset) {
boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY));

File: XstreamStreamingChangeEventSource.java

@ -17,6 +17,7 @@
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleDatabaseVersion;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.SourceInfo;
@ -39,7 +40,7 @@
*
* @author Gunnar Morling
*/
public class XstreamStreamingChangeEventSource implements StreamingChangeEventSource<OracleOffsetContext> {
public class XstreamStreamingChangeEventSource implements StreamingChangeEventSource<OraclePartition, OracleOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(XstreamStreamingChangeEventSource.class);
@ -78,7 +79,8 @@ public XstreamStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
}
@Override
public void execute(ChangeEventSourceContext context, OracleOffsetContext offsetContext) throws InterruptedException {
public void execute(ChangeEventSourceContext context, OraclePartition partition, OracleOffsetContext offsetContext)
throws InterruptedException {
LcrEventHandler eventHandler = new LcrEventHandler(connectorConfig, errorHandler, dispatcher, clock, schema, offsetContext,
TableNameCaseSensitivity.INSENSITIVE.equals(connectorConfig.getAdapter().getTableNameCaseSensitivity(jdbcConnection)), this,

File: OraclePartitionTest.java (new)

@ -0,0 +1,21 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle;
import io.debezium.connector.common.AbstractPartitionTest;
public class OraclePartitionTest extends AbstractPartitionTest<OraclePartition> {
@Override
protected OraclePartition createPartition1() {
return new OraclePartition("server1");
}
@Override
protected OraclePartition createPartition2() {
return new OraclePartition("server2");
}
}

File: PostgresChangeEventSourceCoordinator.java

@ -6,6 +6,7 @@
package io.debezium.connector.postgresql;
import java.sql.SQLException;
import java.util.Map;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
@ -28,28 +29,32 @@
* Coordinates one or more {@link ChangeEventSource}s and executes them in order. Extends the base
* {@link ChangeEventSourceCoordinator} to support a pre-snapshot catch up streaming phase.
*/
public class PostgresChangeEventSourceCoordinator extends ChangeEventSourceCoordinator<PostgresOffsetContext> {
public class PostgresChangeEventSourceCoordinator extends ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresChangeEventSourceCoordinator.class);
private final Snapshotter snapshotter;
private final SlotState slotInfo;
public PostgresChangeEventSourceCoordinator(PostgresOffsetContext previousOffset, ErrorHandler errorHandler,
public PostgresChangeEventSourceCoordinator(Map<PostgresPartition, PostgresOffsetContext> previousOffsets,
ErrorHandler errorHandler,
Class<? extends SourceConnector> connectorType,
CommonConnectorConfig connectorConfig,
PostgresChangeEventSourceFactory changeEventSourceFactory,
ChangeEventSourceMetricsFactory changeEventSourceMetricsFactory,
EventDispatcher<?> eventDispatcher, DatabaseSchema<?> schema,
Snapshotter snapshotter, SlotState slotInfo) {
super(previousOffset, errorHandler, connectorType, connectorConfig, changeEventSourceFactory, changeEventSourceMetricsFactory, eventDispatcher, schema);
super(previousOffsets, errorHandler, connectorType, connectorConfig, changeEventSourceFactory,
changeEventSourceMetricsFactory, eventDispatcher, schema);
this.snapshotter = snapshotter;
this.slotInfo = slotInfo;
}
@Override
protected CatchUpStreamingResult executeCatchUpStreaming(PostgresOffsetContext previousOffset, ChangeEventSourceContext context,
SnapshotChangeEventSource<PostgresOffsetContext> snapshotSource)
protected CatchUpStreamingResult executeCatchUpStreaming(ChangeEventSourceContext context,
SnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> snapshotSource,
PostgresPartition partition,
PostgresOffsetContext previousOffset)
throws InterruptedException {
if (previousOffset != null && !snapshotter.shouldStreamEventsStartingFromSnapshot() && slotInfo != null) {
try {
@ -61,7 +66,7 @@ protected CatchUpStreamingResult executeCatchUpStreaming(PostgresOffsetContext p
}
LOGGER.info("Previous connector state exists and will stream events until {} then perform snapshot",
previousOffset.getStreamingStoppingLsn());
streamEvents(previousOffset, context);
streamEvents(context, partition, previousOffset);
return new CatchUpStreamingResult(true);
}

File: PostgresChangeEventSourceFactory.java

@ -25,7 +25,7 @@
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Clock;
public class PostgresChangeEventSourceFactory implements ChangeEventSourceFactory<PostgresOffsetContext> {
public class PostgresChangeEventSourceFactory implements ChangeEventSourceFactory<PostgresPartition, PostgresOffsetContext> {
private final PostgresConnectorConfig configuration;
private final PostgresConnection jdbcConnection;
@ -57,7 +57,7 @@ public PostgresChangeEventSourceFactory(PostgresConnectorConfig configuration, S
}
@Override
public SnapshotChangeEventSource<PostgresOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) {
public SnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) {
return new PostgresSnapshotChangeEventSource(
configuration,
snapshotter,
@ -71,7 +71,7 @@ public SnapshotChangeEventSource<PostgresOffsetContext> getSnapshotChangeEventSo
}
@Override
public StreamingChangeEventSource<PostgresOffsetContext> getStreamingChangeEventSource() {
public StreamingChangeEventSource<PostgresPartition, PostgresOffsetContext> getStreamingChangeEventSource() {
return new PostgresStreamingChangeEventSource(
configuration,
snapshotter,

File: PostgresConnectorTask.java

@ -11,6 +11,7 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
@ -49,7 +50,7 @@
*
* @author Horia Chiorean (hchiorea@redhat.com)
*/
public class PostgresConnectorTask extends BaseSourceTask<PostgresOffsetContext> {
public class PostgresConnectorTask extends BaseSourceTask<PostgresPartition, PostgresOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConnectorTask.class);
private static final String CONTEXT_NAME = "postgres-connector-task";
@ -61,7 +62,7 @@ public class PostgresConnectorTask extends BaseSourceTask<PostgresOffsetContext>
private volatile PostgresSchema schema;
@Override
public ChangeEventSourceCoordinator<PostgresOffsetContext> start(Configuration config) {
public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> start(Configuration config) {
final PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(config);
final TopicSelector<TableId> topicSelector = PostgresTopicSelector.create(connectorConfig);
final Snapshotter snapshotter = connectorConfig.getSnapshotter();
@ -93,8 +94,10 @@ public ChangeEventSourceCoordinator<PostgresOffsetContext> start(Configuration c
schema = new PostgresSchema(connectorConfig, typeRegistry, topicSelector, valueConverterBuilder.build(typeRegistry));
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicSelector);
final PostgresOffsetContext previousOffset = getPreviousOffset(new PostgresOffsetContext.Loader(connectorConfig));
final Map<PostgresPartition, PostgresOffsetContext> previousOffsets = getPreviousOffsets(
new PostgresPartition.Provider(connectorConfig), new PostgresOffsetContext.Loader(connectorConfig));
final Clock clock = Clock.system();
final PostgresOffsetContext previousOffset = getTheOnlyOffset(previousOffsets);
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
try {
@ -197,8 +200,8 @@ public ChangeEventSourceCoordinator<PostgresOffsetContext> start(Configuration c
schemaNameAdjuster,
jdbcConnection);
ChangeEventSourceCoordinator<PostgresOffsetContext> coordinator = new PostgresChangeEventSourceCoordinator(
previousOffset,
ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> coordinator = new PostgresChangeEventSourceCoordinator(
previousOffsets,
errorHandler,
PostgresConnector.class,
connectorConfig,

File: PostgresOffsetContext.java

@ -195,11 +195,6 @@ public Loader(PostgresConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}
@Override
public Map<String, ?> getPartition() {
return Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
}
private Long readOptionalLong(Map<String, ?> offset, String key) {
final Object obj = offset.get(key);
return (obj == null) ? null : ((Number) obj).longValue();

File: PostgresPartition.java (new)

@ -0,0 +1,59 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import io.debezium.connector.common.Partition;
import io.debezium.util.Collect;
public class PostgresPartition implements Partition {
private static final String SERVER_PARTITION_KEY = "server";
private final String serverName;
public PostgresPartition(String serverName) {
this.serverName = serverName;
}
@Override
public Map<String, String> getSourcePartition() {
return Collect.hashMapOf(SERVER_PARTITION_KEY, serverName);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final PostgresPartition other = (PostgresPartition) obj;
return Objects.equals(serverName, other.serverName);
}
@Override
public int hashCode() {
return serverName.hashCode();
}
static class Provider implements Partition.Provider<PostgresPartition> {
private final PostgresConnectorConfig connectorConfig;
Provider(PostgresConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}
@Override
public Set<PostgresPartition> getPartitions() {
return Collections.singleton(new PostgresPartition(connectorConfig.getLogicalName()));
}
}
}

File: PostgresSnapshotChangeEventSource.java

@ -29,7 +29,7 @@
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.util.Clock;
public class PostgresSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource<PostgresOffsetContext> {
public class PostgresSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSnapshotChangeEventSource.class);

File: PostgresStreamingChangeEventSource.java

@ -35,7 +35,7 @@
*
* @author Horia Chiorean (hchiorea@redhat.com), Jiri Pechanec
*/
public class PostgresStreamingChangeEventSource implements StreamingChangeEventSource<PostgresOffsetContext> {
public class PostgresStreamingChangeEventSource implements StreamingChangeEventSource<PostgresPartition, PostgresOffsetContext> {
private static final String KEEP_ALIVE_THREAD_NAME = "keep-alive";
@ -86,7 +86,8 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfi
}
@Override
public void execute(ChangeEventSourceContext context, PostgresOffsetContext offsetContext) throws InterruptedException {
public void execute(ChangeEventSourceContext context, PostgresPartition partition, PostgresOffsetContext offsetContext)
throws InterruptedException {
if (!snapshotter.shouldStream()) {
LOGGER.info("Streaming is not enabled in correct configuration");
return;

File: PostgresPartitionTest.java (new)

@ -0,0 +1,21 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import io.debezium.connector.common.AbstractPartitionTest;
public class PostgresPartitionTest extends AbstractPartitionTest<PostgresPartition> {
@Override
protected PostgresPartition createPartition1() {
return new PostgresPartition("server1");
}
@Override
protected PostgresPartition createPartition2() {
return new PostgresPartition("server2");
}
}

File: SqlServerChangeEventSourceFactory.java

@ -20,7 +20,7 @@
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Clock;
public class SqlServerChangeEventSourceFactory implements ChangeEventSourceFactory<SqlServerOffsetContext> {
public class SqlServerChangeEventSourceFactory implements ChangeEventSourceFactory<SqlServerPartition, SqlServerOffsetContext> {
private final SqlServerConnectorConfig configuration;
private final SqlServerConnection dataConnection;
@ -42,13 +42,13 @@ public SqlServerChangeEventSourceFactory(SqlServerConnectorConfig configuration,
}
@Override
public SnapshotChangeEventSource<SqlServerOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) {
public SnapshotChangeEventSource<SqlServerPartition, SqlServerOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) {
return new SqlServerSnapshotChangeEventSource(configuration, dataConnection, schema, dispatcher, clock,
snapshotProgressListener);
}
@Override
public StreamingChangeEventSource<SqlServerOffsetContext> getStreamingChangeEventSource() {
public StreamingChangeEventSource<SqlServerPartition, SqlServerOffsetContext> getStreamingChangeEventSource() {
return new SqlServerStreamingChangeEventSource(
configuration,
dataConnection,

File: SqlServerConnectorTask.java

@ -7,6 +7,7 @@
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
@ -37,7 +38,7 @@
* @author Jiri Pechanec
*
*/
public class SqlServerConnectorTask extends BaseSourceTask<SqlServerOffsetContext> {
public class SqlServerConnectorTask extends BaseSourceTask<SqlServerPartition, SqlServerOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerConnectorTask.class);
private static final String CONTEXT_NAME = "sql-server-connector-task";
@ -55,7 +56,7 @@ public String version() {
}
@Override
public ChangeEventSourceCoordinator<SqlServerOffsetContext> start(Configuration config) {
public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext> start(Configuration config) {
final Clock clock = Clock.system();
final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(config);
final TopicSelector<TableId> topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig);
@ -85,7 +86,11 @@ public ChangeEventSourceCoordinator<SqlServerOffsetContext> start(Configuration
this.schema = new SqlServerDatabaseSchema(connectorConfig, valueConverters, topicSelector, schemaNameAdjuster);
this.schema.initializeStorage();
final SqlServerOffsetContext previousOffset = getPreviousOffset(new SqlServerOffsetContext.Loader(connectorConfig));
Map<SqlServerPartition, SqlServerOffsetContext> offsets = getPreviousOffsets(
new SqlServerPartition.Provider(connectorConfig),
new SqlServerOffsetContext.Loader(connectorConfig));
SqlServerOffsetContext previousOffset = getTheOnlyOffset(offsets);
if (previousOffset != null) {
schema.recover(previousOffset);
}
@ -115,8 +120,8 @@ public ChangeEventSourceCoordinator<SqlServerOffsetContext> start(Configuration
metadataProvider,
schemaNameAdjuster);
ChangeEventSourceCoordinator<SqlServerOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
previousOffset,
ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
offsets,
errorHandler,
SqlServerConnector.class,
connectorConfig,

File: SqlServerOffsetContext.java

@ -148,11 +148,6 @@ public Loader(SqlServerConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}
@Override
public Map<String, ?> getPartition() {
return Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
}
@Override
public SqlServerOffsetContext load(Map<String, ?> offset) {
final Lsn changeLsn = Lsn.valueOf((String) offset.get(SourceInfo.CHANGE_LSN_KEY));

File: SqlServerPartition.java (new)

@ -0,0 +1,59 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import io.debezium.connector.common.Partition;
import io.debezium.util.Collect;
public class SqlServerPartition implements Partition {
private static final String SERVER_PARTITION_KEY = "server";
private final String serverName;
public SqlServerPartition(String serverName) {
this.serverName = serverName;
}
@Override
public Map<String, String> getSourcePartition() {
return Collect.hashMapOf(SERVER_PARTITION_KEY, serverName);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final SqlServerPartition other = (SqlServerPartition) obj;
return Objects.equals(serverName, other.serverName);
}
@Override
public int hashCode() {
return serverName.hashCode();
}
static class Provider implements Partition.Provider<SqlServerPartition> {
private final SqlServerConnectorConfig connectorConfig;
Provider(SqlServerConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}
@Override
public Set<SqlServerPartition> getPartitions() {
return Collections.singleton(new SqlServerPartition(connectorConfig.getLogicalName()));
}
}
}

File: SqlServerSnapshotChangeEventSource.java

@ -30,7 +30,7 @@
import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType;
import io.debezium.util.Clock;
public class SqlServerSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource<SqlServerOffsetContext> {
public class SqlServerSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource<SqlServerPartition, SqlServerOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerSnapshotChangeEventSource.class);

File: SqlServerStreamingChangeEventSource.java

@ -56,7 +56,7 @@
*
* @author Jiri Pechanec
*/
public class SqlServerStreamingChangeEventSource implements StreamingChangeEventSource<SqlServerOffsetContext> {
public class SqlServerStreamingChangeEventSource implements StreamingChangeEventSource<SqlServerPartition, SqlServerOffsetContext> {
private static final Pattern MISSING_CDC_FUNCTION_CHANGES_ERROR = Pattern.compile("Invalid object name 'cdc.fn_cdc_get_all_changes_(.*)'\\.");
@ -107,7 +107,7 @@ public SqlServerStreamingChangeEventSource(SqlServerConnectorConfig connectorCon
}
@Override
public void execute(ChangeEventSourceContext context, SqlServerOffsetContext offsetContext) throws InterruptedException {
public void execute(ChangeEventSourceContext context, SqlServerPartition partition, SqlServerOffsetContext offsetContext) throws InterruptedException {
if (connectorConfig.getSnapshotMode().equals(SnapshotMode.INITIAL_ONLY)) {
LOGGER.info("Streaming is not enabled in current configuration");
return;

File: SqlServerPartitionTest.java (new)

@ -0,0 +1,21 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.sqlserver;
import io.debezium.connector.common.AbstractPartitionTest;
public class SqlServerPartitionTest extends AbstractPartitionTest<SqlServerPartition> {
@Override
protected SqlServerPartition createPartition1() {
return new SqlServerPartition("server1");
}
@Override
protected SqlServerPartition createPartition2() {
return new SqlServerPartition("server2");
}
}

File: BaseSourceTask.java

@ -11,6 +11,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
@ -40,7 +41,7 @@
*
* @author Gunnar Morling
*/
public abstract class BaseSourceTask<O extends OffsetContext> extends SourceTask {
public abstract class BaseSourceTask<P extends Partition, O extends OffsetContext> extends SourceTask {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseSourceTask.class);
private static final long INITIAL_POLL_PERIOD_IN_MILLIS = TimeUnit.SECONDS.toMillis(5);
@ -69,7 +70,7 @@ protected static enum State {
* The change event source coordinator for those connectors adhering to the new
* framework structure, {@code null} for legacy-style connectors.
*/
private ChangeEventSourceCoordinator<O> coordinator;
private ChangeEventSourceCoordinator<P, O> coordinator;
/**
* The latest offset that has been acknowledged by the Kafka producer. Will be
@ -142,7 +143,7 @@ public final void start(Map<String, String> props) {
* the task configuration; implementations should wrap it in a dedicated implementation of
* {@link CommonConnectorConfig} and work with typed access to configuration properties that way
*/
protected abstract ChangeEventSourceCoordinator<O> start(Configuration config);
protected abstract ChangeEventSourceCoordinator<P, O> start(Configuration config);
@Override
public final List<SourceRecord> poll() throws InterruptedException {
@ -297,28 +298,58 @@ public void commit() throws InterruptedException {
protected abstract Iterable<Field> getAllConfigurationFields();
/**
* Loads the connector's persistent offset (if present) via the given loader.
* Loads the connector's persistent offsets (if present) via the given loader.
*/
protected O getPreviousOffset(OffsetContext.Loader<O> loader) {
Map<String, ?> partition = loader.getPartition();
protected Map<P, O> getPreviousOffsets(Partition.Provider<P> provider, OffsetContext.Loader<O> loader) {
Set<P> partitions = provider.getPartitions();
OffsetReader<P, O, OffsetContext.Loader<O>> reader = new OffsetReader<>(
context.offsetStorageReader(), loader);
Map<P, O> offsets = reader.offsets(partitions);
if (lastOffset != null) {
O offsetContext = loader.load(lastOffset);
LOGGER.info("Found previous offset after restart {}", offsetContext);
return offsetContext;
boolean found = false;
for (P partition : partitions) {
O offset = offsets.get(partition);
if (offset != null) {
found = true;
LOGGER.info("Found previous partition offset {}: {}", partition, offset);
}
}
Map<String, Object> previousOffset = context.offsetStorageReader()
.offsets(Collections.singleton(partition))
.get(partition);
if (!found) {
LOGGER.info("No previous offsets found");
}
if (previousOffset != null) {
O offsetContext = loader.load(previousOffset);
LOGGER.info("Found previous offset {}", offsetContext);
return offsetContext;
}
else {
return null;
}
return offsets;
}
/**
* Returns the only partition that the task is configured to use.
*
* This method is meant to be used only by connectors that do not support
* multiple partitions per task.
*/
protected P getTheOnlyPartition(Map<P, O> offsets) {
if (offsets.size() != 1) {
throw new DebeziumException("The task must be configured to use exactly one partition, "
+ offsets.size() + " found");
}
return offsets.entrySet().iterator().next().getKey();
}
/**
* Returns the offset of the only partition that the task is configured to use.
*
* This method is meant to be used only by connectors that do not support
* multiple partitions per task.
*/
protected O getTheOnlyOffset(Map<P, O> offsets) {
if (offsets.size() != 1) {
throw new DebeziumException("The task must be configured to use exactly one partition, "
+ offsets.size() + " found");
}
return offsets.entrySet().iterator().next().getValue();
}
}
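
The OffsetReader used by getPreviousOffsets() is referenced but not included in this view. Conceptually it resolves each partition's source-partition map through Kafka Connect's OffsetStorageReader and feeds any stored offset to the loader; a rough sketch under that assumption (the real class may differ):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.connect.storage.OffsetStorageReader;

import io.debezium.connector.common.Partition;
import io.debezium.pipeline.spi.OffsetContext;

class OffsetReaderSketch<P extends Partition, O extends OffsetContext> {

    private final OffsetStorageReader storageReader;
    private final OffsetContext.Loader<O> loader;

    OffsetReaderSketch(OffsetStorageReader storageReader, OffsetContext.Loader<O> loader) {
        this.storageReader = storageReader;
        this.loader = loader;
    }

    Map<P, O> offsets(Set<P> partitions) {
        Map<P, O> result = new HashMap<>();
        for (P partition : partitions) {
            // Kafka Connect keys stored offsets by the source-partition map
            Map<String, Object> raw = storageReader.offset(partition.getSourcePartition());
            // null values are kept so callers can tell "no offset stored yet"
            result.put(partition, raw != null ? loader.load(raw) : null);
        }
        return result;
    }
}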

File: ChangeEventSourceCoordinator.java

@ -13,6 +13,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -21,6 +22,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.common.Partition;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory;
@ -45,7 +47,7 @@
* @author Gunnar Morling
*/
@ThreadSafe
public class ChangeEventSourceCoordinator<O extends OffsetContext> {
public class ChangeEventSourceCoordinator<P extends Partition, O extends OffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventSourceCoordinator.class);
@ -54,26 +56,26 @@ public class ChangeEventSourceCoordinator<O extends OffsetContext> {
*/
public static final Duration SHUTDOWN_WAIT_TIMEOUT = Duration.ofSeconds(90);
private final O previousOffset;
private final Map<P, O> previousOffsets;
private final ErrorHandler errorHandler;
private final ChangeEventSourceFactory<O> changeEventSourceFactory;
private final ChangeEventSourceFactory<P, O> changeEventSourceFactory;
private final ChangeEventSourceMetricsFactory changeEventSourceMetricsFactory;
private final ExecutorService executor;
private final EventDispatcher<?> eventDispatcher;
private final DatabaseSchema<?> schema;
private volatile boolean running;
private volatile StreamingChangeEventSource<O> streamingSource;
private volatile StreamingChangeEventSource<P, O> streamingSource;
private final ReentrantLock commitOffsetLock = new ReentrantLock();
private SnapshotChangeEventSourceMetrics snapshotMetrics;
private StreamingChangeEventSourceMetrics streamingMetrics;
public ChangeEventSourceCoordinator(O previousOffset, ErrorHandler errorHandler, Class<? extends SourceConnector> connectorType,
public ChangeEventSourceCoordinator(Map<P, O> previousOffsets, ErrorHandler errorHandler, Class<? extends SourceConnector> connectorType,
CommonConnectorConfig connectorConfig,
ChangeEventSourceFactory<O> changeEventSourceFactory,
ChangeEventSourceFactory<P, O> changeEventSourceFactory,
ChangeEventSourceMetricsFactory changeEventSourceMetricsFactory, EventDispatcher<?> eventDispatcher, DatabaseSchema<?> schema) {
this.previousOffset = previousOffset;
this.previousOffsets = previousOffsets;
this.errorHandler = errorHandler;
this.changeEventSourceFactory = changeEventSourceFactory;
this.changeEventSourceMetricsFactory = changeEventSourceMetricsFactory;
@ -84,6 +86,16 @@ public ChangeEventSourceCoordinator(O previousOffset, ErrorHandler errorHandler,
public synchronized void start(CdcSourceTaskContext taskContext, ChangeEventQueueMetrics changeEventQueueMetrics,
EventMetadataProvider metadataProvider) {
if (previousOffsets.size() != 1) {
throw new ConnectException("The coordinator must be provided with exactly one partition, "
+ previousOffsets.size() + " found");
}
Map.Entry<P, O> entry = previousOffsets.entrySet().iterator().next();
final P partition = entry.getKey();
final O previousOffset = entry.getValue();
AtomicReference<LoggingContext.PreviousContext> previousLogContext = new AtomicReference<>();
try {
this.snapshotMetrics = changeEventSourceMetricsFactory.getSnapshotMetrics(taskContext, changeEventQueueMetrics, metadataProvider);
@ -101,8 +113,8 @@ public synchronized void start(CdcSourceTaskContext taskContext, ChangeEventQueu
ChangeEventSourceContext context = new ChangeEventSourceContextImpl();
LOGGER.info("Context created");
SnapshotChangeEventSource<O> snapshotSource = changeEventSourceFactory.getSnapshotChangeEventSource(snapshotMetrics);
CatchUpStreamingResult catchUpStreamingResult = executeCatchUpStreaming(previousOffset, context, snapshotSource);
SnapshotChangeEventSource<P, O> snapshotSource = changeEventSourceFactory.getSnapshotChangeEventSource(snapshotMetrics);
CatchUpStreamingResult catchUpStreamingResult = executeCatchUpStreaming(context, snapshotSource, partition, previousOffset);
if (catchUpStreamingResult.performedCatchUpStreaming) {
streamingConnected(false);
commitOffsetLock.lock();
@ -110,7 +122,7 @@ public synchronized void start(CdcSourceTaskContext taskContext, ChangeEventQueu
commitOffsetLock.unlock();
}
eventDispatcher.setEventListener(snapshotMetrics);
SnapshotResult<O> snapshotResult = snapshotSource.execute(context, previousOffset);
SnapshotResult<O> snapshotResult = snapshotSource.execute(context, partition, previousOffset);
LOGGER.info("Snapshot ended with {}", snapshotResult);
if (snapshotResult.getStatus() == SnapshotResultStatus.COMPLETED || schema.tableInformationComplete()) {
@ -119,7 +131,7 @@ public synchronized void start(CdcSourceTaskContext taskContext, ChangeEventQueu
if (running && snapshotResult.isCompletedOrSkipped()) {
previousLogContext.set(taskContext.configureLoggingContext("streaming"));
streamEvents(snapshotResult.getOffset(), context);
streamEvents(context, partition, snapshotResult.getOffset());
}
}
catch (InterruptedException e) {
@ -141,13 +153,14 @@ public synchronized void start(CdcSourceTaskContext taskContext, ChangeEventQueu
}
}
protected CatchUpStreamingResult executeCatchUpStreaming(O previousOffset, ChangeEventSourceContext context,
SnapshotChangeEventSource<O> snapshotSource)
protected CatchUpStreamingResult executeCatchUpStreaming(ChangeEventSourceContext context,
SnapshotChangeEventSource<P, O> snapshotSource,
P partition, O previousOffset)
throws InterruptedException {
return new CatchUpStreamingResult(false);
}
protected void streamEvents(O offsetContext, ChangeEventSourceContext context) throws InterruptedException {
protected void streamEvents(ChangeEventSourceContext context, P partition, O offsetContext) throws InterruptedException {
streamingSource = changeEventSourceFactory.getStreamingChangeEventSource();
final Optional<IncrementalSnapshotChangeEventSource<? extends DataCollectionId>> incrementalSnapshotChangeEventSource = changeEventSourceFactory
.getIncrementalSnapshotChangeEventSource(offsetContext, snapshotMetrics, snapshotMetrics);
@ -156,7 +169,7 @@ protected void streamEvents(O offsetContext, ChangeEventSourceContext context) t
streamingConnected(true);
LOGGER.info("Starting streaming");
incrementalSnapshotChangeEventSource.ifPresent(x -> x.init(offsetContext));
streamingSource.execute(context, offsetContext);
streamingSource.execute(context, partition, offsetContext);
LOGGER.info("Finished streaming");
}
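
Given the one-partition check in start() above, each task currently wraps its single partition and restored offset in a singleton map when wiring up the coordinator. A minimal sketch of that call site follows; the My* types are hypothetical stand-ins for a concrete connector (only the constructor shape is taken from this diff):

    import java.util.Collections;

    import io.debezium.pipeline.ChangeEventSourceCoordinator;

    // Sketch only: MyPartition, MyOffsetContext and MyConnector are hypothetical.
    ChangeEventSourceCoordinator<MyPartition, MyOffsetContext> coordinator =
            new ChangeEventSourceCoordinator<>(
                    // Exactly one entry; start() throws ConnectException otherwise.
                    Collections.singletonMap(new MyPartition("myserver"), previousOffset),
                    errorHandler,
                    MyConnector.class,
                    connectorConfig,
                    changeEventSourceFactory,
                    changeEventSourceMetricsFactory,
                    eventDispatcher,
                    schema);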

View File

@ -16,6 +16,7 @@
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigurationDefaults;
import io.debezium.connector.common.Partition;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
@ -31,7 +32,7 @@
*
* @author Chris Cranford
*/
public abstract class AbstractSnapshotChangeEventSource<O extends OffsetContext> implements SnapshotChangeEventSource<O> {
public abstract class AbstractSnapshotChangeEventSource<P extends Partition, O extends OffsetContext> implements SnapshotChangeEventSource<P, O> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSnapshotChangeEventSource.class);
@ -44,7 +45,7 @@ public AbstractSnapshotChangeEventSource(CommonConnectorConfig connectorConfig,
}
@Override
public SnapshotResult<O> execute(ChangeEventSourceContext context, O previousOffset) throws InterruptedException {
public SnapshotResult<O> execute(ChangeEventSourceContext context, P partition, O previousOffset) throws InterruptedException {
SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset);
if (snapshottingTask.shouldSkipSnapshot()) {
LOGGER.debug("Skipping snapshotting");
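
The Partition bound added above comes from io.debezium.connector.common, whose definition is not part of this diff. Assuming it exposes the Kafka Connect source-partition map that OffsetContext.Loader.getPartition() used to return (see the loader change further down), a connector-specific implementation stays small; equals/hashCode matter because partitions serve as keys of the previousOffsets map:

    import java.util.Collections;
    import java.util.Map;

    import io.debezium.connector.common.Partition;

    // Sketch only, assuming Partition declares getSourcePartition(); the
    // interface itself is not shown in this diff.
    public class MyPartition implements Partition {

        private final String serverName;

        public MyPartition(String serverName) {
            this.serverName = serverName;
        }

        @Override
        public Map<String, String> getSourcePartition() {
            // The same map a loader previously returned from getPartition().
            return Collections.singletonMap("server", serverName);
        }

        @Override
        public int hashCode() {
            return serverName.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            return obj instanceof MyPartition
                    && serverName.equals(((MyPartition) obj).serverName);
        }
    }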

View File

@ -7,6 +7,7 @@
import java.util.Optional;
import io.debezium.connector.common.Partition;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
@ -16,7 +17,7 @@
*
* @author Gunnar Morling
*/
public interface ChangeEventSourceFactory<O extends OffsetContext> {
public interface ChangeEventSourceFactory<P extends Partition, O extends OffsetContext> {
/**
* Returns a snapshot change event source that may emit change events for schema and/or data changes. Depending on
@ -30,12 +31,12 @@ public interface ChangeEventSourceFactory<O extends OffsetContext> {
*
* @return A snapshot change event source
*/
SnapshotChangeEventSource<O> getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener);
SnapshotChangeEventSource<P, O> getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener);
/**
* Returns a streaming change event source that starts streaming at the given offset.
*/
StreamingChangeEventSource<O> getStreamingChangeEventSource();
StreamingChangeEventSource<P, O> getStreamingChangeEventSource();
/**
* Returns an incremental snapshot change event source that can run in parallel with streaming
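
A factory implementing the reparameterized interface simply threads both type arguments through to the sources it creates. A skeletal sketch with the hypothetical My* types (getIncrementalSnapshotChangeEventSource is assumed to keep its default implementation and is therefore omitted):

    import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
    import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
    import io.debezium.pipeline.source.spi.SnapshotProgressListener;
    import io.debezium.pipeline.source.spi.StreamingChangeEventSource;

    // Sketch only: the My* classes are hypothetical.
    public class MyChangeEventSourceFactory
            implements ChangeEventSourceFactory<MyPartition, MyOffsetContext> {

        @Override
        public SnapshotChangeEventSource<MyPartition, MyOffsetContext> getSnapshotChangeEventSource(
                SnapshotProgressListener snapshotProgressListener) {
            return new MySnapshotChangeEventSource();
        }

        @Override
        public StreamingChangeEventSource<MyPartition, MyOffsetContext> getStreamingChangeEventSource() {
            return new MyStreamingChangeEventSource();
        }
    }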

View File

@ -5,6 +5,7 @@
*/
package io.debezium.pipeline.source.spi;
import io.debezium.connector.common.Partition;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SnapshotResult;
@ -14,7 +15,7 @@
*
* @author Gunnar Morling
*/
public interface SnapshotChangeEventSource<O extends OffsetContext> extends ChangeEventSource {
public interface SnapshotChangeEventSource<P extends Partition, O extends OffsetContext> extends ChangeEventSource {
/**
* Executes this source. Implementations should regularly check via the given context if they should stop. If that's
@ -23,11 +24,13 @@ public interface SnapshotChangeEventSource<O extends OffsetContext> extends Chan
*
* @param context
* contextual information for this source's execution
* @param partition
* the source partition from which the snapshot should be taken
* @param previousOffset
* previous offset restored from Kafka
* @return an indicator of the position at which the snapshot was taken
* @throws InterruptedException
* in case the snapshot was aborted before completion
*/
SnapshotResult<O> execute(ChangeEventSourceContext context, O previousOffset) throws InterruptedException;
SnapshotResult<O> execute(ChangeEventSourceContext context, P partition, O previousOffset) throws InterruptedException;
}
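
On the snapshot side, implementations now receive the partition next to the restored offset. A deliberately trivial sketch that always skips, assuming the SnapshotResult.skipped factory method available elsewhere in this codebase:

    import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
    import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
    import io.debezium.pipeline.spi.SnapshotResult;

    // Sketch only: a snapshot source that skips and hands back the restored offset.
    public class MySnapshotChangeEventSource
            implements SnapshotChangeEventSource<MyPartition, MyOffsetContext> {

        @Override
        public SnapshotResult<MyOffsetContext> execute(ChangeEventSourceContext context,
                                                       MyPartition partition,
                                                       MyOffsetContext previousOffset)
                throws InterruptedException {
            // The partition tells the source which slice of the database to
            // snapshot; this stub ignores it and reports the snapshot as skipped.
            return SnapshotResult.skipped(previousOffset);
        }
    }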

View File

@ -7,6 +7,7 @@
import java.util.Map;
import io.debezium.connector.common.Partition;
import io.debezium.pipeline.spi.OffsetContext;
/**
@ -14,7 +15,7 @@
*
* @author Gunnar Morling
*/
public interface StreamingChangeEventSource<O extends OffsetContext> extends ChangeEventSource {
public interface StreamingChangeEventSource<P extends Partition, O extends OffsetContext> extends ChangeEventSource {
/**
* Executes this source. Implementations should regularly check via the given context if they should stop. If that's
@ -23,12 +24,14 @@ public interface StreamingChangeEventSource<O extends OffsetContext> extends Cha
*
* @param context
* contextual information for this source's execution
* @param partition
* the source partition from which the changes should be streamed
* @param offsetContext
* the offset context with the position from which to resume streaming
* @throws InterruptedException
* in case streaming was aborted before completion
*/
void execute(ChangeEventSourceContext context, O offsetContext) throws InterruptedException;
void execute(ChangeEventSourceContext context, P partition, O offsetContext) throws InterruptedException;
/**
* Commits the given offset with the source database. Used by some connectors
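
On the streaming side, the contract asks implementations to regularly poll the context's stop flag. A sketch of that loop shape (ChangeEventSourceContext.isRunning() exists in this codebase; the polling body is hypothetical, and commitOffset is assumed to keep its default no-op):

    import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
    import io.debezium.pipeline.source.spi.StreamingChangeEventSource;

    // Sketch only: a streaming source that honors the coordinator's stop signal.
    public class MyStreamingChangeEventSource
            implements StreamingChangeEventSource<MyPartition, MyOffsetContext> {

        @Override
        public void execute(ChangeEventSourceContext context,
                            MyPartition partition,
                            MyOffsetContext offsetContext)
                throws InterruptedException {
            while (context.isRunning()) {
                // Fetch changes for the given partition starting at the position
                // in offsetContext and dispatch them (connector-specific, elided).
                Thread.sleep(100); // stand-in for the real poll
            }
        }
    }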

View File

@ -30,8 +30,6 @@ public interface OffsetContext {
* Implementations load a connector-specific offset context based on the offset values stored in Kafka.
*/
interface Loader<O extends OffsetContext> {
Map<String, ?> getPartition();
O load(Map<String, ?> offset);
}
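
With getPartition() removed, a Loader no longer knows where its offset is stored; the task derives the lookup key from the Partition instead and hands the loader only the raw offset map. A sketch of that lookup against Kafka Connect's OffsetStorageReader (the helper class itself is hypothetical):

    import java.util.Map;

    import org.apache.kafka.connect.storage.OffsetStorageReader;

    import io.debezium.pipeline.spi.OffsetContext;

    // Sketch only: the partition now supplies the key that Loader.getPartition()
    // used to provide.
    public class OffsetLoadingSketch {

        static <O extends OffsetContext> O loadOffset(OffsetStorageReader reader,
                                                      MyPartition partition,
                                                      OffsetContext.Loader<O> loader) {
            Map<String, Object> offset = reader.offset(partition.getSourcePartition());
            // No stored offset means the connector starts from scratch.
            return offset != null ? loader.load(offset) : null;
        }
    }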

View File

@ -25,6 +25,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.connector.common.Partition;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.EventDispatcher.SnapshotReceiver;
@ -50,7 +51,7 @@
*
* @author Gunnar Morling
*/
public abstract class RelationalSnapshotChangeEventSource<O extends OffsetContext> extends AbstractSnapshotChangeEventSource<O> {
public abstract class RelationalSnapshotChangeEventSource<P extends Partition, O extends OffsetContext> extends AbstractSnapshotChangeEventSource<P, O> {
private static final Logger LOGGER = LoggerFactory.getLogger(RelationalSnapshotChangeEventSource.class);