diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbChangeEventSourceFactory.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbChangeEventSourceFactory.java index 5cadc8df4..7f2a04436 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbChangeEventSourceFactory.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbChangeEventSourceFactory.java @@ -19,7 +19,7 @@ * * @author Chris Cranford */ -public class MongoDbChangeEventSourceFactory implements ChangeEventSourceFactory { +public class MongoDbChangeEventSourceFactory implements ChangeEventSourceFactory { private final MongoDbConnectorConfig configuration; private final ErrorHandler errorHandler; @@ -39,7 +39,7 @@ public MongoDbChangeEventSourceFactory(MongoDbConnectorConfig configuration, Err } @Override - public SnapshotChangeEventSource getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) { + public SnapshotChangeEventSource getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) { return new MongoDbSnapshotChangeEventSource( configuration, taskContext, @@ -51,7 +51,7 @@ public SnapshotChangeEventSource getSnapshotChangeEventSou } @Override - public StreamingChangeEventSource getStreamingChangeEventSource() { + public StreamingChangeEventSource getStreamingChangeEventSource() { return new MongoDbStreamingChangeEventSource( configuration, taskContext, diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorTask.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorTask.java index c2e19056e..c06a460b6 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorTask.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorTask.java @@ -6,6 +6,7 @@ package io.debezium.connector.mongodb; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -44,7 +45,7 @@ * @author Randall Hauch */ @ThreadSafe -public final class MongoDbConnectorTask extends BaseSourceTask { +public final class MongoDbConnectorTask extends BaseSourceTask { private static final String CONTEXT_NAME = "mongodb-connector-task"; @@ -63,7 +64,7 @@ public String version() { } @Override - public ChangeEventSourceCoordinator start(Configuration config) { + public ChangeEventSourceCoordinator start(Configuration config) { final MongoDbConnectorConfig connectorConfig = new MongoDbConnectorConfig(config); final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(); @@ -74,7 +75,7 @@ public ChangeEventSourceCoordinator start(Configuration co this.schema = new MongoDbSchema(taskContext.filters(), taskContext.topicSelector(), structSchema); final ReplicaSets replicaSets = getReplicaSets(config); - final MongoDbOffsetContext previousOffsets = getPreviousOffsets(connectorConfig, replicaSets); + final MongoDbOffsetContext previousOffset = getPreviousOffset(connectorConfig, replicaSets); final Clock clock = Clock.system(); PreviousContext previousLogContext = taskContext.configureLoggingContext(taskName); @@ -103,8 +104,8 @@ public ChangeEventSourceCoordinator start(Configuration co metadataProvider, schemaNameAdjuster); - ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator<>( - previousOffsets, + ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator<>( + Collections.singletonMap(new MongoDbPartition(), previousOffset), errorHandler, MongoDbConnector.class, connectorConfig, @@ -152,7 +153,7 @@ protected Iterable getAllConfigurationFields() { return MongoDbConnectorConfig.ALL_FIELDS; } - private MongoDbOffsetContext getPreviousOffsets(MongoDbConnectorConfig connectorConfig, ReplicaSets replicaSets) { + private MongoDbOffsetContext getPreviousOffset(MongoDbConnectorConfig connectorConfig, ReplicaSets replicaSets) { MongoDbOffsetContext.Loader loader = new MongoDbOffsetContext.Loader(connectorConfig, replicaSets); Collection> partitions = loader.getPartitions(); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbPartition.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbPartition.java new file mode 100644 index 000000000..1528b0c10 --- /dev/null +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbPartition.java @@ -0,0 +1,28 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mongodb; + +import java.util.Map; + +import io.debezium.connector.common.Partition; + +public class MongoDbPartition implements Partition { + + @Override + public Map getSourcePartition() { + throw new UnsupportedOperationException("Currently unsupported by the MongoDB connector"); + } + + @Override + public boolean equals(Object obj) { + throw new UnsupportedOperationException("Currently unsupported by the MongoDB connector"); + } + + @Override + public int hashCode() { + throw new UnsupportedOperationException("Currently unsupported by the MongoDB connector"); + } +} diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java index 958f864e7..0ff550d11 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java @@ -49,7 +49,7 @@ * * @author Chris Cranford */ -public class MongoDbSnapshotChangeEventSource extends AbstractSnapshotChangeEventSource { +public class MongoDbSnapshotChangeEventSource extends AbstractSnapshotChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSnapshotChangeEventSource.class); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java index 336569fb4..b3eec396c 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java @@ -47,7 +47,7 @@ * * @author Chris Cranford */ -public class MongoDbStreamingChangeEventSource implements StreamingChangeEventSource { +public class MongoDbStreamingChangeEventSource implements StreamingChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbStreamingChangeEventSource.class); @@ -79,7 +79,8 @@ public MongoDbStreamingChangeEventSource(MongoDbConnectorConfig connectorConfig, } @Override - public void execute(ChangeEventSourceContext context, MongoDbOffsetContext offsetContext) throws InterruptedException { + public void execute(ChangeEventSourceContext context, MongoDbPartition partition, MongoDbOffsetContext offsetContext) + throws InterruptedException { final List validReplicaSets = replicaSets.validReplicaSets(); if (offsetContext == null) { diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeEventSourceFactory.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeEventSourceFactory.java index 0930ff27a..830c9d266 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeEventSourceFactory.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeEventSourceFactory.java @@ -25,7 +25,7 @@ import io.debezium.schema.DataCollectionId; import io.debezium.util.Clock; -public class MySqlChangeEventSourceFactory implements ChangeEventSourceFactory { +public class MySqlChangeEventSourceFactory implements ChangeEventSourceFactory { private final MySqlConnectorConfig configuration; private final MySqlConnection connection; @@ -57,7 +57,7 @@ public MySqlChangeEventSourceFactory(MySqlConnectorConfig configuration, MySqlCo } @Override - public SnapshotChangeEventSource getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) { + public SnapshotChangeEventSource getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) { return new MySqlSnapshotChangeEventSource(configuration, connection, taskContext.getSchema(), dispatcher, clock, (MySqlSnapshotChangeEventSourceMetrics) snapshotProgressListener, record -> modifyAndFlushLastRecord(record)); } @@ -68,7 +68,7 @@ private void modifyAndFlushLastRecord(Function modif } @Override - public StreamingChangeEventSource getStreamingChangeEventSource() { + public StreamingChangeEventSource getStreamingChangeEventSource() { queue.disableBuffering(); return new MySqlStreamingChangeEventSource( configuration, diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java index 624dd5129..2aa50669a 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java @@ -7,6 +7,7 @@ import java.sql.SQLException; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import org.apache.kafka.connect.source.SourceRecord; @@ -41,7 +42,7 @@ * @author Jiri Pechanec * */ -public class MySqlConnectorTask extends BaseSourceTask { +public class MySqlConnectorTask extends BaseSourceTask { private static final Logger LOGGER = LoggerFactory.getLogger(MySqlConnectorTask.class); private static final String CONTEXT_NAME = "mysql-connector-task"; @@ -58,7 +59,7 @@ public String version() { } @Override - public ChangeEventSourceCoordinator start(Configuration config) { + public ChangeEventSourceCoordinator start(Configuration config) { final Clock clock = Clock.system(); final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig( config.edit() @@ -82,10 +83,8 @@ public ChangeEventSourceCoordinator start(Configuration conf validateBinlogConfiguration(connectorConfig); - MySqlOffsetContext previousOffset = getPreviousOffset(new MySqlOffsetContext.Loader(connectorConfig)); - if (previousOffset == null) { - LOGGER.info("No previous offset found"); - } + Map previousOffsets = getPreviousOffsets(new MySqlPartition.Provider(connectorConfig), + new MySqlOffsetContext.Loader(connectorConfig)); final boolean tableIdCaseInsensitive = connection.isTableIdCaseSensitive(); @@ -100,6 +99,8 @@ public ChangeEventSourceCoordinator start(Configuration conf throw new DebeziumException(e); } + MySqlOffsetContext previousOffset = getTheOnlyOffset(previousOffsets); + validateAndLoadDatabaseHistory(connectorConfig, previousOffset, schema); LOGGER.info("Reconnecting after finishing schema recovery"); @@ -113,7 +114,8 @@ public ChangeEventSourceCoordinator start(Configuration conf // If the binlog position is not available it is necessary to reexecute snapshot if (validateSnapshotFeasibility(connectorConfig, previousOffset)) { - previousOffset = null; + MySqlPartition partition = getTheOnlyPartition(previousOffsets); + previousOffsets.put(partition, null); } taskContext = new MySqlTaskContext(connectorConfig, schema); @@ -147,8 +149,8 @@ public ChangeEventSourceCoordinator start(Configuration conf final MySqlStreamingChangeEventSourceMetrics streamingMetrics = new MySqlStreamingChangeEventSourceMetrics(taskContext, queue, metadataProvider); - ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator<>( - previousOffset, + ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator<>( + previousOffsets, errorHandler, MySqlConnector.class, connectorConfig, @@ -234,8 +236,7 @@ private void validateBinlogConfiguration(MySqlConnectorConfig config) { } /** - * Determine whether the binlog position as set on the {@link MySqlTaskContext#source() SourceInfo} is available in the - * server. + * Determine whether the binlog position as set on the {@link MySqlOffsetContext} is available in the server. * * @return {@code true} if the server has the binlog coordinates, or {@code false} otherwise */ diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java index 6f894f573..f54772f21 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java @@ -194,11 +194,6 @@ public Loader(MySqlConnectorConfig connectorConfig) { this.connectorConfig = connectorConfig; } - @Override - public Map getPartition() { - return Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName()); - } - @Override public MySqlOffsetContext load(Map offset) { boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)) || "true".equals(offset.get(SourceInfo.SNAPSHOT_KEY)); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlPartition.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlPartition.java new file mode 100644 index 000000000..a8c914d09 --- /dev/null +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlPartition.java @@ -0,0 +1,59 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mysql; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import io.debezium.connector.common.Partition; +import io.debezium.util.Collect; + +public class MySqlPartition implements Partition { + private static final String SERVER_PARTITION_KEY = "server"; + + private final String serverName; + + public MySqlPartition(String serverName) { + this.serverName = serverName; + } + + @Override + public Map getSourcePartition() { + return Collect.hashMapOf(SERVER_PARTITION_KEY, serverName); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final MySqlPartition other = (MySqlPartition) obj; + return Objects.equals(serverName, other.serverName); + } + + @Override + public int hashCode() { + return serverName.hashCode(); + } + + static class Provider implements Partition.Provider { + private final MySqlConnectorConfig connectorConfig; + + Provider(MySqlConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; + } + + @Override + public Set getPartitions() { + return Collections.singleton(new MySqlPartition(connectorConfig.getLogicalName())); + } + } +} diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java index 3f7ed6bfa..78c971c10 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java @@ -44,7 +44,7 @@ import io.debezium.util.Collect; import io.debezium.util.Strings; -public class MySqlSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { +public class MySqlSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSnapshotChangeEventSource.class); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index 236c49a0d..b589aff06 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -87,7 +87,7 @@ * * @author Jiri Pechanec */ -public class MySqlStreamingChangeEventSource implements StreamingChangeEventSource { +public class MySqlStreamingChangeEventSource implements StreamingChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(MySqlStreamingChangeEventSource.class); @@ -793,7 +793,7 @@ private SSLMode sslModeFor(SecureConnectionMode mode) { } @Override - public void execute(ChangeEventSourceContext context, MySqlOffsetContext offsetContext) throws InterruptedException { + public void execute(ChangeEventSourceContext context, MySqlPartition partition, MySqlOffsetContext offsetContext) throws InterruptedException { if (!connectorConfig.getSnapshotMode().shouldStream()) { LOGGER.info("Streaming is disabled for snapshot mode {}", connectorConfig.getSnapshotMode()); return; diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/MySqlConnectorTask.java index c5a32c5b6..40439a3ac 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/MySqlConnectorTask.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/MySqlConnectorTask.java @@ -32,6 +32,7 @@ import io.debezium.connector.mysql.MySqlConnectorConfig; import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode; import io.debezium.connector.mysql.MySqlOffsetContext; +import io.debezium.connector.mysql.MySqlPartition; import io.debezium.pipeline.ChangeEventSourceCoordinator; import io.debezium.schema.TopicSelector; import io.debezium.util.Collect; @@ -45,7 +46,7 @@ * @author Randall Hauch */ @NotThreadSafe -public final class MySqlConnectorTask extends BaseSourceTask { +public final class MySqlConnectorTask extends BaseSourceTask { private final Logger logger = LoggerFactory.getLogger(getClass()); private volatile MySqlTaskContext taskContext; @@ -67,7 +68,7 @@ public String version() { } @Override - public ChangeEventSourceCoordinator start(Configuration config) { + public ChangeEventSourceCoordinator start(Configuration config) { final String serverName = config.getString(MySqlConnectorConfig.SERVER_NAME); PreviousContext prevLoggingContext = LoggingContext.forConnector(Module.contextName(), serverName, "task"); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index 90e0dbb55..fae57b380 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -674,11 +674,13 @@ private void shouldConsumeAllEventsFromDatabaseUsingSnapshotByField(Field dbIncl stopConnector(); // Read the last committed offsets, and verify the binlog coordinates ... + final String serverName = config.getString(MySqlConnectorConfig.SERVER_NAME); final MySqlOffsetContext.Loader loader = new MySqlOffsetContext.Loader(new MySqlConnectorConfig(Configuration.create() - .with(MySqlConnectorConfig.SERVER_NAME, config.getString(MySqlConnectorConfig.SERVER_NAME)) + .with(MySqlConnectorConfig.SERVER_NAME, serverName) .build())); - Map lastCommittedOffset = readLastCommittedOffset(config, loader.getPartition()); - final MySqlOffsetContext offsetContext = (MySqlOffsetContext) loader.load(lastCommittedOffset); + final Map partition = new MySqlPartition(serverName).getSourcePartition(); + Map lastCommittedOffset = readLastCommittedOffset(config, partition); + final MySqlOffsetContext offsetContext = loader.load(lastCommittedOffset); final SourceInfo persistedOffsetSource = offsetContext.getSource(); Testing.print("Position before inserts: " + positionBeforeInserts); Testing.print("Position after inserts: " + positionAfterInserts); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlPartitionTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlPartitionTest.java new file mode 100644 index 000000000..3d58dd636 --- /dev/null +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlPartitionTest.java @@ -0,0 +1,21 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mysql; + +import io.debezium.connector.common.AbstractPartitionTest; + +public class MySqlPartitionTest extends AbstractPartitionTest { + + @Override + protected MySqlPartition createPartition1() { + return new MySqlPartition("server1"); + } + + @Override + protected MySqlPartition createPartition2() { + return new MySqlPartition("server2"); + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeEventSourceFactory.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeEventSourceFactory.java index a02ad3e26..f1fa20e46 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeEventSourceFactory.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleChangeEventSourceFactory.java @@ -15,7 +15,7 @@ import io.debezium.relational.TableId; import io.debezium.util.Clock; -public class OracleChangeEventSourceFactory implements ChangeEventSourceFactory { +public class OracleChangeEventSourceFactory implements ChangeEventSourceFactory { private final OracleConnectorConfig configuration; private final OracleConnection jdbcConnection; @@ -43,13 +43,13 @@ public OracleChangeEventSourceFactory(OracleConnectorConfig configuration, Oracl } @Override - public SnapshotChangeEventSource getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) { + public SnapshotChangeEventSource getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) { return new OracleSnapshotChangeEventSource(configuration, jdbcConnection, schema, dispatcher, clock, snapshotProgressListener); } @Override - public StreamingChangeEventSource getStreamingChangeEventSource() { + public StreamingChangeEventSource getStreamingChangeEventSource() { return configuration.getAdapter().getSource( jdbcConnection, dispatcher, diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java index b9fc48e70..67388706e 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java @@ -7,6 +7,7 @@ import java.sql.SQLException; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import org.apache.kafka.connect.source.SourceRecord; @@ -27,7 +28,7 @@ import io.debezium.util.Clock; import io.debezium.util.SchemaNameAdjuster; -public class OracleConnectorTask extends BaseSourceTask { +public class OracleConnectorTask extends BaseSourceTask { private static final Logger LOGGER = LoggerFactory.getLogger(OracleConnectorTask.class); private static final String CONTEXT_NAME = "oracle-connector-task"; @@ -44,7 +45,7 @@ public String version() { } @Override - public ChangeEventSourceCoordinator start(Configuration config) { + public ChangeEventSourceCoordinator start(Configuration config) { OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config); TopicSelector topicSelector = OracleTopicSelector.defaultSelector(connectorConfig); SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(); @@ -56,7 +57,9 @@ public ChangeEventSourceCoordinator start(Configuration con TableNameCaseSensitivity tableNameCaseSensitivity = connectorConfig.getAdapter().getTableNameCaseSensitivity(jdbcConnection); this.schema = new OracleDatabaseSchema(connectorConfig, valueConverters, schemaNameAdjuster, topicSelector, tableNameCaseSensitivity); this.schema.initializeStorage(); - OracleOffsetContext previousOffset = getPreviousOffset(connectorConfig.getAdapter().getOffsetContextLoader()); + Map previousOffsets = getPreviousOffsets(new OraclePartition.Provider(connectorConfig), + connectorConfig.getAdapter().getOffsetContextLoader()); + OracleOffsetContext previousOffset = getTheOnlyOffset(previousOffsets); if (previousOffset != null) { schema.recover(previousOffset); @@ -91,8 +94,8 @@ public ChangeEventSourceCoordinator start(Configuration con final OracleStreamingChangeEventSourceMetrics streamingMetrics = new OracleStreamingChangeEventSourceMetrics(taskContext, queue, metadataProvider, connectorConfig); - ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator<>( - previousOffset, + ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator<>( + previousOffsets, errorHandler, OracleConnector.class, connectorConfig, diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OraclePartition.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OraclePartition.java new file mode 100644 index 000000000..30504006f --- /dev/null +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OraclePartition.java @@ -0,0 +1,59 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import io.debezium.connector.common.Partition; +import io.debezium.util.Collect; + +public class OraclePartition implements Partition { + private static final String SERVER_PARTITION_KEY = "server"; + + private final String serverName; + + public OraclePartition(String serverName) { + this.serverName = serverName; + } + + @Override + public Map getSourcePartition() { + return Collect.hashMapOf(SERVER_PARTITION_KEY, serverName); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final OraclePartition other = (OraclePartition) obj; + return Objects.equals(serverName, other.serverName); + } + + @Override + public int hashCode() { + return serverName.hashCode(); + } + + static class Provider implements Partition.Provider { + private final OracleConnectorConfig connectorConfig; + + Provider(OracleConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; + } + + @Override + public Set getPartitions() { + return Collections.singleton(new OraclePartition(connectorConfig.getLogicalName())); + } + } +} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java index ff36161cf..5610ead6f 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java @@ -32,7 +32,7 @@ * * @author Gunnar Morling */ -public class OracleSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { +public class OracleSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(OracleSnapshotChangeEventSource.class); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/StreamingAdapter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/StreamingAdapter.java index 3a9ad1d5c..1da25d08e 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/StreamingAdapter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/StreamingAdapter.java @@ -48,10 +48,13 @@ enum TableNameCaseSensitivity { OffsetContext.Loader getOffsetContextLoader(); - StreamingChangeEventSource getSource(OracleConnection connection, EventDispatcher dispatcher, - ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema schema, - OracleTaskContext taskContext, Configuration jdbcConfig, - OracleStreamingChangeEventSourceMetrics streamingMetrics); + StreamingChangeEventSource getSource(OracleConnection connection, + EventDispatcher dispatcher, + ErrorHandler errorHandler, Clock clock, + OracleDatabaseSchema schema, + OracleTaskContext taskContext, + Configuration jdbcConfig, + OracleStreamingChangeEventSourceMetrics streamingMetrics); /** * Returns whether table names are case sensitive. diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java index a0b5c2f20..12afc279d 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerAdapter.java @@ -11,6 +11,7 @@ import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.OracleOffsetContext; +import io.debezium.connector.oracle.OraclePartition; import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics; import io.debezium.connector.oracle.OracleTaskContext; import io.debezium.document.Document; @@ -54,14 +55,14 @@ public OffsetContext.Loader getOffsetContextLoader() { } @Override - public StreamingChangeEventSource getSource(OracleConnection connection, - EventDispatcher dispatcher, - ErrorHandler errorHandler, - Clock clock, - OracleDatabaseSchema schema, - OracleTaskContext taskContext, - Configuration jdbcConfig, - OracleStreamingChangeEventSourceMetrics streamingMetrics) { + public StreamingChangeEventSource getSource(OracleConnection connection, + EventDispatcher dispatcher, + ErrorHandler errorHandler, + Clock clock, + OracleDatabaseSchema schema, + OracleTaskContext taskContext, + Configuration jdbcConfig, + OracleStreamingChangeEventSourceMetrics streamingMetrics) { return new LogMinerStreamingChangeEventSource( connectorConfig, connection, diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerOracleOffsetContextLoader.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerOracleOffsetContextLoader.java index 7ea04e615..88c94f75d 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerOracleOffsetContextLoader.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerOracleOffsetContextLoader.java @@ -5,7 +5,6 @@ */ package io.debezium.connector.oracle.logminer; -import java.util.Collections; import java.util.Map; import io.debezium.connector.oracle.OracleConnectorConfig; @@ -26,11 +25,6 @@ public LogMinerOracleOffsetContextLoader(OracleConnectorConfig connectorConfig) this.connectorConfig = connectorConfig; } - @Override - public Map getPartition() { - return Collections.singletonMap(OracleOffsetContext.SERVER_PARTITION_KEY, connectorConfig.getLogicalName()); - } - @Override public OracleOffsetContext load(Map offset) { boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index a1d42a7dd..aa3a4f716 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -45,6 +45,7 @@ import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.OracleOffsetContext; +import io.debezium.connector.oracle.OraclePartition; import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics; import io.debezium.connector.oracle.Scn; import io.debezium.jdbc.JdbcConfiguration; @@ -60,7 +61,7 @@ * A {@link StreamingChangeEventSource} based on Oracle's LogMiner utility. * The event handler loop is executed in a separate executor. */ -public class LogMinerStreamingChangeEventSource implements StreamingChangeEventSource { +public class LogMinerStreamingChangeEventSource implements StreamingChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerStreamingChangeEventSource.class); @@ -115,7 +116,7 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig, * change event source context */ @Override - public void execute(ChangeEventSourceContext context, OracleOffsetContext offsetContext) { + public void execute(ChangeEventSourceContext context, OraclePartition partition, OracleOffsetContext offsetContext) { try (TransactionalBuffer transactionalBuffer = new TransactionalBuffer(connectorConfig, schema, clock, errorHandler, streamingMetrics)) { try { startScn = offsetContext.getScn(); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamAdapter.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamAdapter.java index b4348c2bc..f8561e692 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamAdapter.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamAdapter.java @@ -11,6 +11,7 @@ import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.OracleOffsetContext; +import io.debezium.connector.oracle.OraclePartition; import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics; import io.debezium.connector.oracle.OracleTaskContext; import io.debezium.connector.oracle.Scn; @@ -65,14 +66,14 @@ public OffsetContext.Loader getOffsetContextLoader() { } @Override - public StreamingChangeEventSource getSource(OracleConnection connection, - EventDispatcher dispatcher, - ErrorHandler errorHandler, - Clock clock, - OracleDatabaseSchema schema, - OracleTaskContext taskContext, - Configuration jdbcConfig, - OracleStreamingChangeEventSourceMetrics streamingMetrics) { + public StreamingChangeEventSource getSource(OracleConnection connection, + EventDispatcher dispatcher, + ErrorHandler errorHandler, + Clock clock, + OracleDatabaseSchema schema, + OracleTaskContext taskContext, + Configuration jdbcConfig, + OracleStreamingChangeEventSourceMetrics streamingMetrics) { return new XstreamStreamingChangeEventSource( connectorConfig, connection, diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamOracleOffsetContextLoader.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamOracleOffsetContextLoader.java index a7beb8a51..78e012280 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamOracleOffsetContextLoader.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XStreamOracleOffsetContextLoader.java @@ -5,7 +5,6 @@ */ package io.debezium.connector.oracle.xstream; -import java.util.Collections; import java.util.Map; import io.debezium.connector.oracle.OracleConnectorConfig; @@ -28,11 +27,6 @@ public XStreamOracleOffsetContextLoader(OracleConnectorConfig connectorConfig) { this.connectorConfig = connectorConfig; } - @Override - public Map getPartition() { - return Collections.singletonMap(OracleOffsetContext.SERVER_PARTITION_KEY, connectorConfig.getLogicalName()); - } - @Override public OracleOffsetContext load(Map offset) { boolean snapshot = Boolean.TRUE.equals(offset.get(SourceInfo.SNAPSHOT_KEY)); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.java index 32b67431b..cae7855ac 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.java @@ -17,6 +17,7 @@ import io.debezium.connector.oracle.OracleDatabaseSchema; import io.debezium.connector.oracle.OracleDatabaseVersion; import io.debezium.connector.oracle.OracleOffsetContext; +import io.debezium.connector.oracle.OraclePartition; import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics; import io.debezium.connector.oracle.Scn; import io.debezium.connector.oracle.SourceInfo; @@ -39,7 +40,7 @@ * * @author Gunnar Morling */ -public class XstreamStreamingChangeEventSource implements StreamingChangeEventSource { +public class XstreamStreamingChangeEventSource implements StreamingChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(XstreamStreamingChangeEventSource.class); @@ -78,7 +79,8 @@ public XstreamStreamingChangeEventSource(OracleConnectorConfig connectorConfig, } @Override - public void execute(ChangeEventSourceContext context, OracleOffsetContext offsetContext) throws InterruptedException { + public void execute(ChangeEventSourceContext context, OraclePartition partition, OracleOffsetContext offsetContext) + throws InterruptedException { LcrEventHandler eventHandler = new LcrEventHandler(connectorConfig, errorHandler, dispatcher, clock, schema, offsetContext, TableNameCaseSensitivity.INSENSITIVE.equals(connectorConfig.getAdapter().getTableNameCaseSensitivity(jdbcConnection)), this, diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OraclePartitionTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OraclePartitionTest.java new file mode 100644 index 000000000..09bc66862 --- /dev/null +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OraclePartitionTest.java @@ -0,0 +1,21 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.oracle; + +import io.debezium.connector.common.AbstractPartitionTest; + +public class OraclePartitionTest extends AbstractPartitionTest { + + @Override + protected OraclePartition createPartition1() { + return new OraclePartition("server1"); + } + + @Override + protected OraclePartition createPartition2() { + return new OraclePartition("server2"); + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java index 14f997e1b..d2f53dfe7 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java @@ -6,6 +6,7 @@ package io.debezium.connector.postgresql; import java.sql.SQLException; +import java.util.Map; import org.apache.kafka.connect.source.SourceConnector; import org.slf4j.Logger; @@ -28,28 +29,32 @@ * Coordinates one or more {@link ChangeEventSource}s and executes them in order. Extends the base * {@link ChangeEventSourceCoordinator} to support a pre-snapshot catch up streaming phase. */ -public class PostgresChangeEventSourceCoordinator extends ChangeEventSourceCoordinator { +public class PostgresChangeEventSourceCoordinator extends ChangeEventSourceCoordinator { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresChangeEventSourceCoordinator.class); private final Snapshotter snapshotter; private final SlotState slotInfo; - public PostgresChangeEventSourceCoordinator(PostgresOffsetContext previousOffset, ErrorHandler errorHandler, + public PostgresChangeEventSourceCoordinator(Map previousOffsets, + ErrorHandler errorHandler, Class connectorType, CommonConnectorConfig connectorConfig, PostgresChangeEventSourceFactory changeEventSourceFactory, ChangeEventSourceMetricsFactory changeEventSourceMetricsFactory, EventDispatcher eventDispatcher, DatabaseSchema schema, Snapshotter snapshotter, SlotState slotInfo) { - super(previousOffset, errorHandler, connectorType, connectorConfig, changeEventSourceFactory, changeEventSourceMetricsFactory, eventDispatcher, schema); + super(previousOffsets, errorHandler, connectorType, connectorConfig, changeEventSourceFactory, + changeEventSourceMetricsFactory, eventDispatcher, schema); this.snapshotter = snapshotter; this.slotInfo = slotInfo; } @Override - protected CatchUpStreamingResult executeCatchUpStreaming(PostgresOffsetContext previousOffset, ChangeEventSourceContext context, - SnapshotChangeEventSource snapshotSource) + protected CatchUpStreamingResult executeCatchUpStreaming(ChangeEventSourceContext context, + SnapshotChangeEventSource snapshotSource, + PostgresPartition partition, + PostgresOffsetContext previousOffset) throws InterruptedException { if (previousOffset != null && !snapshotter.shouldStreamEventsStartingFromSnapshot() && slotInfo != null) { try { @@ -61,7 +66,7 @@ protected CatchUpStreamingResult executeCatchUpStreaming(PostgresOffsetContext p } LOGGER.info("Previous connector state exists and will stream events until {} then perform snapshot", previousOffset.getStreamingStoppingLsn()); - streamEvents(previousOffset, context); + streamEvents(context, partition, previousOffset); return new CatchUpStreamingResult(true); } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceFactory.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceFactory.java index 7f631eb0c..b065bfac3 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceFactory.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceFactory.java @@ -25,7 +25,7 @@ import io.debezium.schema.DataCollectionId; import io.debezium.util.Clock; -public class PostgresChangeEventSourceFactory implements ChangeEventSourceFactory { +public class PostgresChangeEventSourceFactory implements ChangeEventSourceFactory { private final PostgresConnectorConfig configuration; private final PostgresConnection jdbcConnection; @@ -57,7 +57,7 @@ public PostgresChangeEventSourceFactory(PostgresConnectorConfig configuration, S } @Override - public SnapshotChangeEventSource getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) { + public SnapshotChangeEventSource getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) { return new PostgresSnapshotChangeEventSource( configuration, snapshotter, @@ -71,7 +71,7 @@ public SnapshotChangeEventSource getSnapshotChangeEventSo } @Override - public StreamingChangeEventSource getStreamingChangeEventSource() { + public StreamingChangeEventSource getStreamingChangeEventSource() { return new PostgresStreamingChangeEventSource( configuration, snapshotter, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index 5ab58f432..eac676e23 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -11,6 +11,7 @@ import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import org.apache.kafka.connect.errors.ConnectException; @@ -49,7 +50,7 @@ * * @author Horia Chiorean (hchiorea@redhat.com) */ -public class PostgresConnectorTask extends BaseSourceTask { +public class PostgresConnectorTask extends BaseSourceTask { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConnectorTask.class); private static final String CONTEXT_NAME = "postgres-connector-task"; @@ -61,7 +62,7 @@ public class PostgresConnectorTask extends BaseSourceTask private volatile PostgresSchema schema; @Override - public ChangeEventSourceCoordinator start(Configuration config) { + public ChangeEventSourceCoordinator start(Configuration config) { final PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(config); final TopicSelector topicSelector = PostgresTopicSelector.create(connectorConfig); final Snapshotter snapshotter = connectorConfig.getSnapshotter(); @@ -93,8 +94,10 @@ public ChangeEventSourceCoordinator start(Configuration c schema = new PostgresSchema(connectorConfig, typeRegistry, topicSelector, valueConverterBuilder.build(typeRegistry)); this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicSelector); - final PostgresOffsetContext previousOffset = getPreviousOffset(new PostgresOffsetContext.Loader(connectorConfig)); + final Map previousOffsets = getPreviousOffsets( + new PostgresPartition.Provider(connectorConfig), new PostgresOffsetContext.Loader(connectorConfig)); final Clock clock = Clock.system(); + final PostgresOffsetContext previousOffset = getTheOnlyOffset(previousOffsets); LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME); try { @@ -197,8 +200,8 @@ public ChangeEventSourceCoordinator start(Configuration c schemaNameAdjuster, jdbcConnection); - ChangeEventSourceCoordinator coordinator = new PostgresChangeEventSourceCoordinator( - previousOffset, + ChangeEventSourceCoordinator coordinator = new PostgresChangeEventSourceCoordinator( + previousOffsets, errorHandler, PostgresConnector.class, connectorConfig, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java index 5cb343b1e..87525cc58 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java @@ -195,11 +195,6 @@ public Loader(PostgresConnectorConfig connectorConfig) { this.connectorConfig = connectorConfig; } - @Override - public Map getPartition() { - return Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName()); - } - private Long readOptionalLong(Map offset, String key) { final Object obj = offset.get(key); return (obj == null) ? null : ((Number) obj).longValue(); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java new file mode 100644 index 000000000..edcd03532 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java @@ -0,0 +1,59 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import io.debezium.connector.common.Partition; +import io.debezium.util.Collect; + +public class PostgresPartition implements Partition { + private static final String SERVER_PARTITION_KEY = "server"; + + private final String serverName; + + public PostgresPartition(String serverName) { + this.serverName = serverName; + } + + @Override + public Map getSourcePartition() { + return Collect.hashMapOf(SERVER_PARTITION_KEY, serverName); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final PostgresPartition other = (PostgresPartition) obj; + return Objects.equals(serverName, other.serverName); + } + + @Override + public int hashCode() { + return serverName.hashCode(); + } + + static class Provider implements Partition.Provider { + private final PostgresConnectorConfig connectorConfig; + + Provider(PostgresConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; + } + + @Override + public Set getPartitions() { + return Collections.singleton(new PostgresPartition(connectorConfig.getLogicalName())); + } + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java index cc6a91f7d..9fc2639e2 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java @@ -29,7 +29,7 @@ import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; import io.debezium.util.Clock; -public class PostgresSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { +public class PostgresSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSnapshotChangeEventSource.class); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index 940dd7636..ee6208cf5 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -35,7 +35,7 @@ * * @author Horia Chiorean (hchiorea@redhat.com), Jiri Pechanec */ -public class PostgresStreamingChangeEventSource implements StreamingChangeEventSource { +public class PostgresStreamingChangeEventSource implements StreamingChangeEventSource { private static final String KEEP_ALIVE_THREAD_NAME = "keep-alive"; @@ -86,7 +86,8 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfi } @Override - public void execute(ChangeEventSourceContext context, PostgresOffsetContext offsetContext) throws InterruptedException { + public void execute(ChangeEventSourceContext context, PostgresPartition partition, PostgresOffsetContext offsetContext) + throws InterruptedException { if (!snapshotter.shouldStream()) { LOGGER.info("Streaming is not enabled in correct configuration"); return; diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java new file mode 100644 index 000000000..d96917719 --- /dev/null +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java @@ -0,0 +1,21 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql; + +import io.debezium.connector.common.AbstractPartitionTest; + +public class PostgresPartitionTest extends AbstractPartitionTest { + + @Override + protected PostgresPartition createPartition1() { + return new PostgresPartition("server1"); + } + + @Override + protected PostgresPartition createPartition2() { + return new PostgresPartition("server2"); + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceFactory.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceFactory.java index af3c35146..2400237d6 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceFactory.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceFactory.java @@ -20,7 +20,7 @@ import io.debezium.schema.DataCollectionId; import io.debezium.util.Clock; -public class SqlServerChangeEventSourceFactory implements ChangeEventSourceFactory { +public class SqlServerChangeEventSourceFactory implements ChangeEventSourceFactory { private final SqlServerConnectorConfig configuration; private final SqlServerConnection dataConnection; @@ -42,13 +42,13 @@ public SqlServerChangeEventSourceFactory(SqlServerConnectorConfig configuration, } @Override - public SnapshotChangeEventSource getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) { + public SnapshotChangeEventSource getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener) { return new SqlServerSnapshotChangeEventSource(configuration, dataConnection, schema, dispatcher, clock, snapshotProgressListener); } @Override - public StreamingChangeEventSource getStreamingChangeEventSource() { + public StreamingChangeEventSource getStreamingChangeEventSource() { return new SqlServerStreamingChangeEventSource( configuration, dataConnection, diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java index 40ce9cf38..994c43fe1 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java @@ -7,6 +7,7 @@ import java.sql.SQLException; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import org.apache.kafka.connect.errors.ConnectException; @@ -37,7 +38,7 @@ * @author Jiri Pechanec * */ -public class SqlServerConnectorTask extends BaseSourceTask { +public class SqlServerConnectorTask extends BaseSourceTask { private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerConnectorTask.class); private static final String CONTEXT_NAME = "sql-server-connector-task"; @@ -55,7 +56,7 @@ public String version() { } @Override - public ChangeEventSourceCoordinator start(Configuration config) { + public ChangeEventSourceCoordinator start(Configuration config) { final Clock clock = Clock.system(); final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(config); final TopicSelector topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig); @@ -85,7 +86,11 @@ public ChangeEventSourceCoordinator start(Configuration this.schema = new SqlServerDatabaseSchema(connectorConfig, valueConverters, topicSelector, schemaNameAdjuster); this.schema.initializeStorage(); - final SqlServerOffsetContext previousOffset = getPreviousOffset(new SqlServerOffsetContext.Loader(connectorConfig)); + Map offsets = getPreviousOffsets( + new SqlServerPartition.Provider(connectorConfig), + new SqlServerOffsetContext.Loader(connectorConfig)); + SqlServerOffsetContext previousOffset = getTheOnlyOffset(offsets); + if (previousOffset != null) { schema.recover(previousOffset); } @@ -115,8 +120,8 @@ public ChangeEventSourceCoordinator start(Configuration metadataProvider, schemaNameAdjuster); - ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator<>( - previousOffset, + ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator<>( + offsets, errorHandler, SqlServerConnector.class, connectorConfig, diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java index 17fb9275b..184c7abba 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java @@ -148,11 +148,6 @@ public Loader(SqlServerConnectorConfig connectorConfig) { this.connectorConfig = connectorConfig; } - @Override - public Map getPartition() { - return Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName()); - } - @Override public SqlServerOffsetContext load(Map offset) { final Lsn changeLsn = Lsn.valueOf((String) offset.get(SourceInfo.CHANGE_LSN_KEY)); diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerPartition.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerPartition.java new file mode 100644 index 000000000..6a0c204b2 --- /dev/null +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerPartition.java @@ -0,0 +1,59 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import io.debezium.connector.common.Partition; +import io.debezium.util.Collect; + +public class SqlServerPartition implements Partition { + private static final String SERVER_PARTITION_KEY = "server"; + + private final String serverName; + + public SqlServerPartition(String serverName) { + this.serverName = serverName; + } + + @Override + public Map getSourcePartition() { + return Collect.hashMapOf(SERVER_PARTITION_KEY, serverName); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final SqlServerPartition other = (SqlServerPartition) obj; + return Objects.equals(serverName, other.serverName); + } + + @Override + public int hashCode() { + return serverName.hashCode(); + } + + static class Provider implements Partition.Provider { + private final SqlServerConnectorConfig connectorConfig; + + Provider(SqlServerConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; + } + + @Override + public Set getPartitions() { + return Collections.singleton(new SqlServerPartition(connectorConfig.getLogicalName())); + } + } +} diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java index a05afc807..68909cf9e 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java @@ -30,7 +30,7 @@ import io.debezium.schema.SchemaChangeEvent.SchemaChangeEventType; import io.debezium.util.Clock; -public class SqlServerSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { +public class SqlServerSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerSnapshotChangeEventSource.class); diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java index b2b7da551..392339e20 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.java @@ -56,7 +56,7 @@ * * @author Jiri Pechanec */ -public class SqlServerStreamingChangeEventSource implements StreamingChangeEventSource { +public class SqlServerStreamingChangeEventSource implements StreamingChangeEventSource { private static final Pattern MISSING_CDC_FUNCTION_CHANGES_ERROR = Pattern.compile("Invalid object name 'cdc.fn_cdc_get_all_changes_(.*)'\\."); @@ -107,7 +107,7 @@ public SqlServerStreamingChangeEventSource(SqlServerConnectorConfig connectorCon } @Override - public void execute(ChangeEventSourceContext context, SqlServerOffsetContext offsetContext) throws InterruptedException { + public void execute(ChangeEventSourceContext context, SqlServerPartition partition, SqlServerOffsetContext offsetContext) throws InterruptedException { if (connectorConfig.getSnapshotMode().equals(SnapshotMode.INITIAL_ONLY)) { LOGGER.info("Streaming is not enabled in current configuration"); return; diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerPartitionTest.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerPartitionTest.java new file mode 100644 index 000000000..56007b5bb --- /dev/null +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerPartitionTest.java @@ -0,0 +1,21 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.sqlserver; + +import io.debezium.connector.common.AbstractPartitionTest; + +public class SqlServerPartitionTest extends AbstractPartitionTest { + + @Override + protected SqlServerPartition createPartition1() { + return new SqlServerPartition("server1"); + } + + @Override + protected SqlServerPartition createPartition2() { + return new SqlServerPartition("server2"); + } +} diff --git a/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java b/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java index af00af99f..5f39e9873 100644 --- a/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java +++ b/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java @@ -11,6 +11,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; @@ -40,7 +41,7 @@ * * @author Gunnar Morling */ -public abstract class BaseSourceTask extends SourceTask { +public abstract class BaseSourceTask

extends SourceTask { private static final Logger LOGGER = LoggerFactory.getLogger(BaseSourceTask.class); private static final long INITIAL_POLL_PERIOD_IN_MILLIS = TimeUnit.SECONDS.toMillis(5); @@ -69,7 +70,7 @@ protected static enum State { * The change event source coordinator for those connectors adhering to the new * framework structure, {@code null} for legacy-style connectors. */ - private ChangeEventSourceCoordinator coordinator; + private ChangeEventSourceCoordinator coordinator; /** * The latest offset that has been acknowledged by the Kafka producer. Will be @@ -142,7 +143,7 @@ public final void start(Map props) { * the task configuration; implementations should wrap it in a dedicated implementation of * {@link CommonConnectorConfig} and work with typed access to configuration properties that way */ - protected abstract ChangeEventSourceCoordinator start(Configuration config); + protected abstract ChangeEventSourceCoordinator start(Configuration config); @Override public final List poll() throws InterruptedException { @@ -297,28 +298,58 @@ public void commit() throws InterruptedException { protected abstract Iterable getAllConfigurationFields(); /** - * Loads the connector's persistent offset (if present) via the given loader. + * Loads the connector's persistent offsets (if present) via the given loader. */ - protected O getPreviousOffset(OffsetContext.Loader loader) { - Map partition = loader.getPartition(); + protected Map getPreviousOffsets(Partition.Provider

provider, OffsetContext.Loader loader) { + Set

partitions = provider.getPartitions(); + OffsetReader> reader = new OffsetReader<>( + context.offsetStorageReader(), loader); + Map offsets = reader.offsets(partitions); - if (lastOffset != null) { - O offsetContext = loader.load(lastOffset); - LOGGER.info("Found previous offset after restart {}", offsetContext); - return offsetContext; + boolean found = false; + for (P partition : partitions) { + O offset = offsets.get(partition); + + if (offset != null) { + found = true; + LOGGER.info("Found previous partition offset {}: {}", partition, offset); + } } - Map previousOffset = context.offsetStorageReader() - .offsets(Collections.singleton(partition)) - .get(partition); + if (!found) { + LOGGER.info("No previous offsets found"); + } - if (previousOffset != null) { - O offsetContext = loader.load(previousOffset); - LOGGER.info("Found previous offset {}", offsetContext); - return offsetContext; + return offsets; + } + + /** + * Returns the offset of the only partition that the task is configured to use. + * + * This method is meant to be used only by the connectors that do not implement handling + * multiple partitions per task. + */ + protected P getTheOnlyPartition(Map offsets) { + if (offsets.size() != 1) { + throw new DebeziumException("The task must be configured to use exactly one partition, " + + offsets.size() + " found"); } - else { - return null; + + return offsets.entrySet().iterator().next().getKey(); + } + + /** + * Returns the offset of the only partition that the task is configured to use. + * + * This method is meant to be used only by the connectors that do not implement handling + * multiple partitions per task. + */ + protected O getTheOnlyOffset(Map offsets) { + if (offsets.size() != 1) { + throw new DebeziumException("The task must be configured to use exactly one partition, " + + offsets.size() + " found"); } + + return offsets.entrySet().iterator().next().getValue(); } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java index 6cfc13b2b..4740056b0 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java @@ -13,6 +13,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,6 +22,7 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.connector.base.ChangeEventQueueMetrics; import io.debezium.connector.common.CdcSourceTaskContext; +import io.debezium.connector.common.Partition; import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics; import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics; import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory; @@ -45,7 +47,7 @@ * @author Gunnar Morling */ @ThreadSafe -public class ChangeEventSourceCoordinator { +public class ChangeEventSourceCoordinator

{ private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventSourceCoordinator.class); @@ -54,26 +56,26 @@ public class ChangeEventSourceCoordinator { */ public static final Duration SHUTDOWN_WAIT_TIMEOUT = Duration.ofSeconds(90); - private final O previousOffset; + private final Map previousOffsets; private final ErrorHandler errorHandler; - private final ChangeEventSourceFactory changeEventSourceFactory; + private final ChangeEventSourceFactory changeEventSourceFactory; private final ChangeEventSourceMetricsFactory changeEventSourceMetricsFactory; private final ExecutorService executor; private final EventDispatcher eventDispatcher; private final DatabaseSchema schema; private volatile boolean running; - private volatile StreamingChangeEventSource streamingSource; + private volatile StreamingChangeEventSource streamingSource; private final ReentrantLock commitOffsetLock = new ReentrantLock(); private SnapshotChangeEventSourceMetrics snapshotMetrics; private StreamingChangeEventSourceMetrics streamingMetrics; - public ChangeEventSourceCoordinator(O previousOffset, ErrorHandler errorHandler, Class connectorType, + public ChangeEventSourceCoordinator(Map previousOffsets, ErrorHandler errorHandler, Class connectorType, CommonConnectorConfig connectorConfig, - ChangeEventSourceFactory changeEventSourceFactory, + ChangeEventSourceFactory changeEventSourceFactory, ChangeEventSourceMetricsFactory changeEventSourceMetricsFactory, EventDispatcher eventDispatcher, DatabaseSchema schema) { - this.previousOffset = previousOffset; + this.previousOffsets = previousOffsets; this.errorHandler = errorHandler; this.changeEventSourceFactory = changeEventSourceFactory; this.changeEventSourceMetricsFactory = changeEventSourceMetricsFactory; @@ -84,6 +86,16 @@ public ChangeEventSourceCoordinator(O previousOffset, ErrorHandler errorHandler, public synchronized void start(CdcSourceTaskContext taskContext, ChangeEventQueueMetrics changeEventQueueMetrics, EventMetadataProvider metadataProvider) { + if (previousOffsets.size() != 1) { + throw new ConnectException("The coordinator must be provided with exactly one partition, " + + previousOffsets.size() + " found"); + } + + Map.Entry entry = previousOffsets.entrySet().iterator().next(); + + final P partition = entry.getKey(); + final O previousOffset = entry.getValue(); + AtomicReference previousLogContext = new AtomicReference<>(); try { this.snapshotMetrics = changeEventSourceMetricsFactory.getSnapshotMetrics(taskContext, changeEventQueueMetrics, metadataProvider); @@ -101,8 +113,8 @@ public synchronized void start(CdcSourceTaskContext taskContext, ChangeEventQueu ChangeEventSourceContext context = new ChangeEventSourceContextImpl(); LOGGER.info("Context created"); - SnapshotChangeEventSource snapshotSource = changeEventSourceFactory.getSnapshotChangeEventSource(snapshotMetrics); - CatchUpStreamingResult catchUpStreamingResult = executeCatchUpStreaming(previousOffset, context, snapshotSource); + SnapshotChangeEventSource snapshotSource = changeEventSourceFactory.getSnapshotChangeEventSource(snapshotMetrics); + CatchUpStreamingResult catchUpStreamingResult = executeCatchUpStreaming(context, snapshotSource, partition, previousOffset); if (catchUpStreamingResult.performedCatchUpStreaming) { streamingConnected(false); commitOffsetLock.lock(); @@ -110,7 +122,7 @@ public synchronized void start(CdcSourceTaskContext taskContext, ChangeEventQueu commitOffsetLock.unlock(); } eventDispatcher.setEventListener(snapshotMetrics); - SnapshotResult snapshotResult = snapshotSource.execute(context, previousOffset); + SnapshotResult snapshotResult = snapshotSource.execute(context, partition, previousOffset); LOGGER.info("Snapshot ended with {}", snapshotResult); if (snapshotResult.getStatus() == SnapshotResultStatus.COMPLETED || schema.tableInformationComplete()) { @@ -119,7 +131,7 @@ public synchronized void start(CdcSourceTaskContext taskContext, ChangeEventQueu if (running && snapshotResult.isCompletedOrSkipped()) { previousLogContext.set(taskContext.configureLoggingContext("streaming")); - streamEvents(snapshotResult.getOffset(), context); + streamEvents(context, partition, snapshotResult.getOffset()); } } catch (InterruptedException e) { @@ -141,13 +153,14 @@ public synchronized void start(CdcSourceTaskContext taskContext, ChangeEventQueu } } - protected CatchUpStreamingResult executeCatchUpStreaming(O previousOffset, ChangeEventSourceContext context, - SnapshotChangeEventSource snapshotSource) + protected CatchUpStreamingResult executeCatchUpStreaming(ChangeEventSourceContext context, + SnapshotChangeEventSource snapshotSource, + P partition, O previousOffset) throws InterruptedException { return new CatchUpStreamingResult(false); } - protected void streamEvents(O offsetContext, ChangeEventSourceContext context) throws InterruptedException { + protected void streamEvents(ChangeEventSourceContext context, P partition, O offsetContext) throws InterruptedException { streamingSource = changeEventSourceFactory.getStreamingChangeEventSource(); final Optional> incrementalSnapshotChangeEventSource = changeEventSourceFactory .getIncrementalSnapshotChangeEventSource(offsetContext, snapshotMetrics, snapshotMetrics); @@ -156,7 +169,7 @@ protected void streamEvents(O offsetContext, ChangeEventSourceContext context) t streamingConnected(true); LOGGER.info("Starting streaming"); incrementalSnapshotChangeEventSource.ifPresent(x -> x.init(offsetContext)); - streamingSource.execute(context, offsetContext); + streamingSource.execute(context, partition, offsetContext); LOGGER.info("Finished streaming"); } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/AbstractSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/AbstractSnapshotChangeEventSource.java index fb03b794c..2dd792371 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/AbstractSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/AbstractSnapshotChangeEventSource.java @@ -16,6 +16,7 @@ import io.debezium.DebeziumException; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.ConfigurationDefaults; +import io.debezium.connector.common.Partition; import io.debezium.pipeline.source.spi.SnapshotChangeEventSource; import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.spi.OffsetContext; @@ -31,7 +32,7 @@ * * @author Chris Cranford */ -public abstract class AbstractSnapshotChangeEventSource implements SnapshotChangeEventSource { +public abstract class AbstractSnapshotChangeEventSource

implements SnapshotChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSnapshotChangeEventSource.class); @@ -44,7 +45,7 @@ public AbstractSnapshotChangeEventSource(CommonConnectorConfig connectorConfig, } @Override - public SnapshotResult execute(ChangeEventSourceContext context, O previousOffset) throws InterruptedException { + public SnapshotResult execute(ChangeEventSourceContext context, P partition, O previousOffset) throws InterruptedException { SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset); if (snapshottingTask.shouldSkipSnapshot()) { LOGGER.debug("Skipping snapshotting"); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeEventSourceFactory.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeEventSourceFactory.java index 30b7e7173..26d58c6b1 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeEventSourceFactory.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/ChangeEventSourceFactory.java @@ -7,6 +7,7 @@ import java.util.Optional; +import io.debezium.connector.common.Partition; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.schema.DataCollectionId; @@ -16,7 +17,7 @@ * * @author Gunnar Morling */ -public interface ChangeEventSourceFactory { +public interface ChangeEventSourceFactory

{ /** * Returns a snapshot change event source that may emit change events for schema and/or data changes. Depending on @@ -30,12 +31,12 @@ public interface ChangeEventSourceFactory { * * @return A snapshot change event source */ - SnapshotChangeEventSource getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener); + SnapshotChangeEventSource getSnapshotChangeEventSource(SnapshotProgressListener snapshotProgressListener); /** * Returns a streaming change event source that starts streaming at the given offset. */ - StreamingChangeEventSource getStreamingChangeEventSource(); + StreamingChangeEventSource getStreamingChangeEventSource(); /** * Returns and incremental snapshot change event source that can run in parallel with streaming diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/SnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/SnapshotChangeEventSource.java index 17edb70b6..dd8d6f22e 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/SnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/SnapshotChangeEventSource.java @@ -5,6 +5,7 @@ */ package io.debezium.pipeline.source.spi; +import io.debezium.connector.common.Partition; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.SnapshotResult; @@ -14,7 +15,7 @@ * * @author Gunnar Morling */ -public interface SnapshotChangeEventSource extends ChangeEventSource { +public interface SnapshotChangeEventSource

extends ChangeEventSource { /** * Executes this source. Implementations should regularly check via the given context if they should stop. If that's @@ -23,11 +24,13 @@ public interface SnapshotChangeEventSource extends Chan * * @param context * contextual information for this source's execution + * @param partition + * the source partition from which the snapshot should be taken * @param previousOffset * previous offset restored from Kafka * @return an indicator to the position at which the snapshot was taken * @throws InterruptedException * in case the snapshot was aborted before completion */ - SnapshotResult execute(ChangeEventSourceContext context, O previousOffset) throws InterruptedException; + SnapshotResult execute(ChangeEventSourceContext context, P partition, O previousOffset) throws InterruptedException; } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/StreamingChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/StreamingChangeEventSource.java index 1c9447dde..2d67641e8 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/StreamingChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/StreamingChangeEventSource.java @@ -7,6 +7,7 @@ import java.util.Map; +import io.debezium.connector.common.Partition; import io.debezium.pipeline.spi.OffsetContext; /** @@ -14,7 +15,7 @@ * * @author Gunnar Morling */ -public interface StreamingChangeEventSource extends ChangeEventSource { +public interface StreamingChangeEventSource

extends ChangeEventSource { /** * Executes this source. Implementations should regularly check via the given context if they should stop. If that's @@ -23,12 +24,14 @@ public interface StreamingChangeEventSource extends Cha * * @param context * contextual information for this source's execution + * @param partition + * the source partition from which the changes should be streamed * @param offsetContext * @return an indicator to the position at which the snapshot was taken * @throws InterruptedException * in case the snapshot was aborted before completion */ - void execute(ChangeEventSourceContext context, O offsetContext) throws InterruptedException; + void execute(ChangeEventSourceContext context, P partition, O offsetContext) throws InterruptedException; /** * Commits the given offset with the source database. Used by some connectors diff --git a/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java b/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java index c220788c6..d735f3367 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java @@ -30,8 +30,6 @@ public interface OffsetContext { * Implementations load a connector-specific offset context based on the offset values stored in Kafka. */ interface Loader { - Map getPartition(); - O load(Map offset); } diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java index a815f4f2f..5fada81c1 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import io.debezium.DebeziumException; +import io.debezium.connector.common.Partition; import io.debezium.jdbc.JdbcConnection; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.EventDispatcher.SnapshotReceiver; @@ -50,7 +51,7 @@ * * @author Gunnar Morling */ -public abstract class RelationalSnapshotChangeEventSource extends AbstractSnapshotChangeEventSource { +public abstract class RelationalSnapshotChangeEventSource

extends AbstractSnapshotChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(RelationalSnapshotChangeEventSource.class);