From 7ad4701253d337f5e606d0de921f18d350f713ee Mon Sep 17 00:00:00 2001
From: Jakub Cechacek
Date: Sat, 16 Dec 2023 23:05:38 +0100
Subject: [PATCH] DBZ-7260 Further elimination of RS references, mainly from
 SourceInfo

---
 .../mongodb/MongoDbOffsetContext.java              |  43 +++--
 .../MongoDbSnapshotChangeEventSource.java          |   9 +-
 .../MongoDbStreamingChangeEventSource.java         |   2 +-
 .../connector/mongodb/MongoDbTaskContext.java      |   8 +-
 .../connector/mongodb/SourceInfo.java              | 172 ++++++------------
 .../connector/mongodb/AbstractMongoIT.java         |  13 +-
 .../connector/mongodb/SourceInfoTest.java          |  49 ++---
 7 files changed, 108 insertions(+), 188 deletions(-)

diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java
index e1e3978b6..f4ea64a76 100644
--- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java
+++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java
@@ -38,29 +38,28 @@ public class MongoDbOffsetContext extends CommonOffsetContext<SourceInfo> {
 
     private final TransactionContext transactionContext;
     private final IncrementalSnapshotContext<CollectionId> incrementalSnapshotContext;
-    private final String replicaSetName;
 
     public MongoDbOffsetContext(MongoDbTaskContext taskContext, SourceInfo sourceInfo, TransactionContext transactionContext,
                                 IncrementalSnapshotContext<CollectionId> incrementalSnapshotContext) {
         super(sourceInfo);
-        this.replicaSetName = ConnectionStrings.replicaSetName(taskContext.getConnectionString());
         this.transactionContext = transactionContext;
         this.incrementalSnapshotContext = incrementalSnapshotContext;
     }
 
-    void startReplicaSetSnapshot() {
-        sourceInfo.startInitialSync(replicaSetName);
+    void startInitialSync() {
+        sourceInfo.startInitialSync();
     }
 
-    void stopReplicaSetSnapshot() {
-        sourceInfo.stopInitialSync(replicaSetName);
+    void stopInitialSync() {
+        sourceInfo.stopInitialSync();
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public Map<String, ?> getOffset() {
-        Map<String, Object> offsets = (Map<String, Object>) sourceInfo.lastOffset(replicaSetName);
-        return isSnapshotOngoing() ? offsets : incrementalSnapshotContext.store(transactionContext.store(offsets));
+        Map<String, Object> offsets = (Map<String, Object>) sourceInfo.lastOffset();
+
+        return isSnapshotRunning() ? offsets : incrementalSnapshotContext.store(transactionContext.store(offsets));
     }
 
     @Override
@@ -74,7 +73,7 @@ public boolean isSnapshotRunning() {
     }
 
     public boolean isSnapshotOngoing() {
-        return sourceInfo.isInitialSyncOngoing(replicaSetName);
+        return sourceInfo.isSnapshotRunning();
     }
 
     @Override
@@ -103,45 +102,45 @@ public void event(DataCollectionId collectionId, Instant timestamp) {
     }
 
     public void readEvent(CollectionId collectionId, Instant timestamp) {
-        sourceInfo.collectionEvent(replicaSetName, collectionId, 0L);
-        sourceInfo.lastOffset(replicaSetName);
+        sourceInfo.collectionEvent(collectionId, 0L);
+        sourceInfo.lastOffset();
     }
 
     public void initEvent(MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> cursor) {
-        sourceInfo.initEvent(replicaSetName, cursor);
+        sourceInfo.initEvent(cursor);
     }
 
     public void initFromOpTimeIfNeeded(BsonTimestamp timestamp) {
         if (lastResumeToken() != null) {
             return;
         }
-        LOGGER.info("Initializing offset for replica-set {} from operation time", replicaSetName);
-        sourceInfo.noEvent(replicaSetName, timestamp);
+        LOGGER.info("Initializing offset from operation time");
+        sourceInfo.noEvent(timestamp);
     }
 
     public void noEvent(BufferingChangeStreamCursor.ResumableChangeStreamEvent<BsonDocument> event) {
-        sourceInfo.noEvent(replicaSetName, event);
+        sourceInfo.noEvent(event);
     }
 
     public void changeStreamEvent(ChangeStreamDocument<BsonDocument> changeStreamEvent) {
-        sourceInfo.changeStreamEvent(replicaSetName, changeStreamEvent);
+        sourceInfo.changeStreamEvent(changeStreamEvent);
     }
 
     public String lastResumeToken() {
-        return sourceInfo.lastResumeToken(replicaSetName);
+        return sourceInfo.lastResumeToken();
     }
 
     public BsonDocument lastResumeTokenDoc() {
-        final String data = sourceInfo.lastResumeToken(replicaSetName);
+        final String data = sourceInfo.lastResumeToken();
         return (data == null) ? null : ResumeTokens.fromData(data);
     }
 
     public BsonTimestamp lastTimestamp() {
-        return sourceInfo.lastTimestamp(replicaSetName);
+        return sourceInfo.lastTimestamp();
     }
 
     public boolean hasOffset() {
-        return sourceInfo.hasOffset(replicaSetName);
+        return sourceInfo.hasOffset();
     }
 
     public static class Loader implements OffsetContext.Loader<MongoDbOffsetContext> {
@@ -151,14 +150,14 @@ public static class Loader implements OffsetContext.Loader<MongoDbOffsetContext>
         private final String replicaSetName;
 
         public Loader(MongoDbTaskContext taskContext) {
-            this.sourceInfo = new SourceInfo(taskContext.getConnectorConfig());
             this.replicaSetName = ConnectionStrings.replicaSetName(taskContext.getConnectionString());
+            this.sourceInfo = new SourceInfo(taskContext.getConnectorConfig(), replicaSetName);
             this.taskContext = taskContext;
         }
 
         @Override
         public MongoDbOffsetContext load(Map<String, ?> offset) {
-            sourceInfo.setOffsetFor(replicaSetName, offset);
+            sourceInfo.setOffset(offset);
             return new MongoDbOffsetContext(taskContext, sourceInfo, new TransactionContext(), MongoDbIncrementalSnapshotContext.load(offset, false));
         }
     }
diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java
index 2688a2526..16380fa15 100644
--- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java
+++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java
@@ -37,6 +37,7 @@
 
 import io.debezium.DebeziumException;
 import io.debezium.connector.SnapshotRecord;
+import io.debezium.connector.mongodb.connection.ConnectionStrings;
 import io.debezium.connector.mongodb.connection.MongoDbConnection;
 import io.debezium.connector.mongodb.recordemitter.MongoDbSnapshotRecordEmitter;
 import io.debezium.connector.mongodb.snapshot.MongoDbIncrementalSnapshotContext;
@@ -176,7 +177,7 @@ private boolean isSnapshotExpected(MongoDbPartition partition, MongoDbOffsetContext offsetContext) {
             return true;
         }
 
-        if (offsetContext.isSnapshotOngoing()) {
+        if (offsetContext.isSnapshotRunning()) {
             // The latest snapshot was not completed, so restart it
             LOGGER.info("The previous snapshot was incomplete, so restarting the snapshot");
             return true;
@@ -217,7 +218,7 @@ private void initSnapshotStartOffsets(MongoDbSnapshotContext snapshotCtx) {
         LOGGER.info("Initializing empty Offset context");
         snapshotCtx.offset = new MongoDbOffsetContext(
                 taskContext,
-                new SourceInfo(connectorConfig),
+                new SourceInfo(connectorConfig, ConnectionStrings.replicaSetName(taskContext.getConnectionString())),
                 new TransactionContext(),
                 new MongoDbIncrementalSnapshotContext<>(false));
     }
@@ -243,7 +244,7 @@ private void createDataEvents(ChangeEventSourceContext sourceContext,
                                   SnapshottingTask snapshottingTask)
             throws InterruptedException {
         snapshotContext.lastCollection = false;
-        snapshotContext.offset.startReplicaSetSnapshot();
+        snapshotContext.offset.startInitialSync();
 
         LOGGER.info("Beginning snapshot at {}", snapshotContext.offset.getOffset());
 
@@ -314,7 +315,7 @@ private void createDataEvents(ChangeEventSourceContext sourceContext,
             executorService.shutdown();
         }
 
-        snapshotContext.offset.stopReplicaSetSnapshot();
+        snapshotContext.offset.stopInitialSync();
     }
 
     @Override
diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java
index 5bb702fae..ac2af46bb 100644
--- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java
+++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java
@@ -217,7 +217,7 @@ protected MongoDbOffsetContext emptyOffsets(MongoDbConnectorConfig connectorConfig) {
         LOGGER.info("Initializing empty Offset context");
         return new MongoDbOffsetContext(
                 taskContext,
-                new SourceInfo(connectorConfig),
+                new SourceInfo(connectorConfig, ConnectionStrings.replicaSetName(taskContext.getConnectionString())),
                 new TransactionContext(),
                 new MongoDbIncrementalSnapshotContext<>(false));
     }
diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbTaskContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbTaskContext.java
index a6b670eef..fb4e53bbc 100644
--- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbTaskContext.java
+++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbTaskContext.java
@@ -14,6 +14,7 @@
 import io.debezium.connector.common.CdcSourceTaskContext;
 import io.debezium.connector.mongodb.MongoDbConnectorConfig.CaptureMode;
 import io.debezium.connector.mongodb.connection.ConnectionContext;
+import io.debezium.connector.mongodb.connection.ConnectionStrings;
 import io.debezium.connector.mongodb.connection.MongoDbConnection;
 import io.debezium.spi.topic.TopicNamingStrategy;
 
@@ -39,12 +40,12 @@ public MongoDbTaskContext(Configuration config) {
                 new MongoDbConnectorConfig(config).getCustomMetricTags(),
                 Collections::emptySet);
 
+        this.connectionContext = new ConnectionContext(config);
         this.filters = new Filters(config);
         this.connectorConfig = new MongoDbConnectorConfig(config);
-        this.source = new SourceInfo(connectorConfig);
+        this.source = new SourceInfo(connectorConfig, ConnectionStrings.replicaSetName(getConnectionString()));
         this.topicNamingStrategy = connectorConfig.getTopicNamingStrategy(MongoDbConnectorConfig.TOPIC_NAMING_STRATEGY);
         this.serverName = config.getString(CommonConnectorConfig.TOPIC_PREFIX);
-        this.connectionContext = new ConnectionContext(config);
     }
 
     public TopicNamingStrategy topicNamingStrategy() {
@@ -83,7 +84,8 @@ public CaptureMode getCaptureMode() {
     }
 
     public ConnectionString getConnectionString() {
-        return connectorConfig.getTaskConnectionString().orElseThrow();
+        return connectorConfig.getTaskConnectionString()
+                .orElse(connectionContext.connectionString());
     }
 
     public MongoDbConnection connect(MongoDbConnection.ErrorHandler errorHandler) {
diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/SourceInfo.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/SourceInfo.java
index 6a57e20a3..cdd97ca8c 100644
--- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/SourceInfo.java
+++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/SourceInfo.java
@@ -6,12 +6,8 @@
 package io.debezium.connector.mongodb;
 
 import java.time.Instant;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.kafka.connect.data.Struct;
 import org.bson.BsonDocument;
@@ -36,7 +32,7 @@
  * finds a particular record. As the connector uses the Kafka Connect framework to process each record in a partition, Kafka
 * Connect keeps track of the most recent offset for that partition.
 *
- * The {@link #partition(String) source partition} information identifies the particular MongoDB replica set and the connector's + * The {@link #partition() source partition} information identifies the particular MongoDB replica set and the connector's * logical name of the MongoDB server. A JSON-like representation of the source partition for a database named "customers" hosted * in a MongoDB replica set named "myMongoServer" is as follows: * @@ -48,7 +44,7 @@ * * *

- * The {@link #lastOffset(String) source offset} information describes the position within a particular partition of each record. + * The {@link #lastOffset() source offset} information describes the position within a particular partition of each record. * Since each event in MongoDB's oplog is identified by a {@link BSONTimestamp} that tracks the time and the order of the * event for that particular time (e.g., multiple events that occur at the same time will have unique orders), the offset * includes the BSONTimetamp representation. (The event's {@code h} field is the unique ID for the operation, so this is also @@ -72,7 +68,6 @@ public final class SourceInfo extends BaseSourceInfo { private static final String RESUME_TOKEN = "resume_token"; - public static final String SERVER_ID_KEY = "server_id"; public static final String REPLICA_SET_NAME = "rs"; public static final String TIMESTAMP = "sec"; @@ -85,14 +80,10 @@ public final class SourceInfo extends BaseSourceInfo { public static final String WALL_TIME = "wallTime"; // Change Stream fields - private static final BsonTimestamp INITIAL_TIMESTAMP = new BsonTimestamp(); private static final Position INITIAL_POSITION = new Position(INITIAL_TIMESTAMP, null, null); - - private final ConcurrentMap> sourcePartitionsByReplicaSetName = new ConcurrentHashMap<>(); - private final ConcurrentMap positionsByReplicaSetName = new ConcurrentHashMap<>(); - private final Set initialSyncReplicaSets = Collections.newSetFromMap(new ConcurrentHashMap<>()); - + private Position rPosition = null; + public boolean initialSync = false; private String replicaSetName; /** @@ -159,8 +150,9 @@ static final class SessionTransactionId { } } - public SourceInfo(MongoDbConnectorConfig connectorConfig) { + public SourceInfo(MongoDbConnectorConfig connectorConfig, String replicaSetName) { super(connectorConfig); + this.replicaSetName = replicaSetName; } CollectionId collectionId() { @@ -171,35 +163,12 @@ Position position() { return position; } - /** - * @return server id - */ - public String serverId() { - return serverName(); + public String lastResumeToken() { + return position != null ? position.resumeToken : null; } - /** - * Get the Kafka Connect detail about the source "partition" for the given database in the replica set. If the database is - * not known, this method records the new partition. - * - * @param replicaSetName the name of the replica set name for which the partition is to be obtained; may not be null - * @return the source partition information; never null - */ - public Map partition(String replicaSetName) { - if (replicaSetName == null) { - throw new IllegalArgumentException("Replica set name may not be null"); - } - return sourcePartitionsByReplicaSetName.computeIfAbsent(replicaSetName, rsName -> Collect.hashMapOf(SERVER_ID_KEY, serverName(), REPLICA_SET_NAME, rsName)); - } - - public String lastResumeToken(String replicaSetName) { - Position existing = positionsByReplicaSetName.get(replicaSetName); - return existing != null ? existing.resumeToken : null; - } - - public BsonTimestamp lastTimestamp(String replicaSetName) { - Position existing = positionsByReplicaSetName.get(replicaSetName); - return existing != null ? existing.getTimestamp() : null; + public BsonTimestamp lastTimestamp() { + return position != null ? position.getTimestamp() : null; } /** @@ -207,29 +176,27 @@ public BsonTimestamp lastTimestamp(String replicaSetName) { * database where we have last read. 
If the database has not yet been seen, this records the starting position * for that database. However, if there is a position for the database, the offset representation is returned. * - * @param replicaSetName the name of the replica set name for which the new offset is to be obtained; may not be null * @return a copy of the current offset for the database; never null */ - public Map lastOffset(String replicaSetName) { - Position existing = positionsByReplicaSetName.get(replicaSetName); - if (existing == null) { - existing = INITIAL_POSITION; + public Map lastOffset() { + if (position == null) { + position = INITIAL_POSITION; } - if (isInitialSyncOngoing(replicaSetName)) { + if (isSnapshotRunning()) { Map offset = Collect.hashMapOf( - TIMESTAMP, existing.getTime(), - ORDER, existing.getInc(), + TIMESTAMP, position.getTime(), + ORDER, position.getInc(), INITIAL_SYNC, true); - return addSessionTxnIdToOffset(existing, offset); + return addSessionTxnIdToOffset(position, offset); } Map offset = Collect.hashMapOf( - TIMESTAMP, existing.getTime(), - ORDER, existing.getInc()); - existing.getResumeToken().ifPresent(resumeToken -> offset.put(RESUME_TOKEN, resumeToken)); + TIMESTAMP, position.getTime(), + ORDER, position.getInc()); + position.getResumeToken().ifPresent(resumeToken -> offset.put(RESUME_TOKEN, resumeToken)); - return addSessionTxnIdToOffset(existing, offset); + return addSessionTxnIdToOffset(position, offset); } private Map addSessionTxnIdToOffset(Position position, Map offset) { @@ -241,71 +208,68 @@ public BsonTimestamp lastTimestamp(String replicaSetName) { } /** - * Get a {@link Struct} representation of the source {@link #partition(String) partition} and {@link #lastOffset(String) + * Get a {@link Struct} representation of the source {@link #partition() partition} and {@link #lastOffset() * offset} information where we have last read. The Struct complies with the {@link #schema} for the MongoDB connector. 
* - * @param replicaSetName the name of the replica set name for which the new offset is to be obtained; may not be null * @param collectionId the event's collection identifier; may not be null * @return the source partition and offset {@link Struct}; never null * @see #schema() */ - public void collectionEvent(String replicaSetName, CollectionId collectionId, long wallTime) { - onEvent(replicaSetName, collectionId, positionsByReplicaSetName.get(replicaSetName), wallTime); + public void collectionEvent(CollectionId collectionId, long wallTime) { + onEvent(collectionId, rPosition, wallTime); } - public void initEvent(String replicaSetName, MongoChangeStreamCursor> cursor) { + public void initEvent(MongoChangeStreamCursor> cursor) { if (cursor == null) { return; } ChangeStreamDocument result = cursor.tryNext(); if (result == null) { - noEvent(replicaSetName, cursor); + noEvent(cursor); } else { - changeStreamEvent(replicaSetName, result); + changeStreamEvent(result); } } - public void noEvent(String replicaSetName, ResumableChangeStreamEvent event) { + public void noEvent(ResumableChangeStreamEvent event) { if (event.resumeToken == null || event.hasDocument()) { return; } - noEvent(replicaSetName, ResumeTokens.getDataString(event.resumeToken)); + noEvent(ResumeTokens.getDataString(event.resumeToken)); } - public void noEvent(String replicaSetName, MongoChangeStreamCursor cursor) { + public void noEvent(MongoChangeStreamCursor cursor) { if (cursor == null || cursor.getResumeToken() == null) { return; } - noEvent(replicaSetName, ResumeTokens.getDataString(cursor.getResumeToken())); + noEvent(ResumeTokens.getDataString(cursor.getResumeToken())); } - public void noEvent(String replicaSetName, BsonTimestamp timestamp) { + public void noEvent(BsonTimestamp timestamp) { if (timestamp == null) { return; } Position position = Position.changeStreamPosition(timestamp, null, null); - noEvent(replicaSetName, position); + noEvent(position); } - private void noEvent(String replicaSetName, String resumeToken) { + private void noEvent(String resumeToken) { if (resumeToken == null) { return; } Position position = Position.changeStreamPosition(null, resumeToken, null); - noEvent(replicaSetName, position); + noEvent(position); } - private void noEvent(String replicaSetName, Position position) { + private void noEvent(Position position) { String namespace = ""; long wallTime = 0L; - positionsByReplicaSetName.put(replicaSetName, position); - - onEvent(replicaSetName, CollectionId.parse(replicaSetName, namespace), position, wallTime); + onEvent(CollectionId.parse(replicaSetName, namespace), position, wallTime); } - public void changeStreamEvent(String replicaSetName, ChangeStreamDocument changeStreamEvent) { + public void changeStreamEvent(ChangeStreamDocument changeStreamEvent) { Position position = INITIAL_POSITION; String namespace = ""; long wallTime = 0L; @@ -318,13 +282,11 @@ public void changeStreamEvent(String replicaSetName, ChangeStreamDocument sourceOffset) { - if (replicaSetName == null) { - throw new IllegalArgumentException("The replica set name may not be null"); - } + public boolean setOffset(Map sourceOffset) { if (sourceOffset == null) { return false; } @@ -371,60 +328,40 @@ public boolean setOffsetFor(String replicaSetName, Map sourceOffset) changeStreamTxnId = new SessionTransactionId(changeStreamLsid, changeStreamTxnNumber); } String resumeToken = stringOffsetValue(sourceOffset, RESUME_TOKEN); - Position position = new Position(new BsonTimestamp(time, order), changeStreamTxnId, resumeToken); - 
positionsByReplicaSetName.put(replicaSetName, position); + rPosition = new Position(new BsonTimestamp(time, order), changeStreamTxnId, resumeToken); return true; } - /** - * Set the source offset, as read from Kafka Connect, for the given replica set. This method does nothing if the supplied map - * is null. - * - * @param partition the partition information; may not be null - * @param sourceOffset the previously-recorded Kafka Connect source offset; may be null - * @return {@code true} if the offset was recorded, or {@code false} if the source offset is null - * @throws DebeziumException if any offset parameter values are missing, invalid, or of the wrong type - */ - public boolean setOffsetFor(Map partition, Map sourceOffset) { - String replicaSetName = partition.get(REPLICA_SET_NAME); - return setOffsetFor(replicaSetName, sourceOffset); - } - /** * Record that an initial sync has started for the given replica set. - * - * @param replicaSetName the name of the replica set; never null */ - public void startInitialSync(String replicaSetName) { - initialSyncReplicaSets.add(replicaSetName); + public void startInitialSync() { + this.initialSync = true; } /** * Record that an initial sync has stopped for the given replica set. - * - * @param replicaSetName the name of the replica set; never null */ - public void stopInitialSync(String replicaSetName) { - initialSyncReplicaSets.remove(replicaSetName); + public void stopInitialSync() { + this.initialSync = false; } /** * Determine if the initial sync for the given replica set is still ongoing. * - * @param replicaSetName the name of the replica set; never null * @return {@code true} if the initial sync for this replica is still ongoing or was not completed before restarting, or * {@code false} if there is currently no initial sync operation for this replica set */ - public boolean isInitialSyncOngoing(String replicaSetName) { - return initialSyncReplicaSets.contains(replicaSetName); + public boolean isInitialSyncOngoing() { + return initialSync; } /** * Returns whether any replica sets are still running a snapshot. */ public boolean isSnapshotRunning() { - return !initialSyncReplicaSets.isEmpty(); + return initialSync; } private static int intOffsetValue(Map values, String key) { @@ -482,7 +419,7 @@ protected Instant timestamp() { @Override public SnapshotRecord snapshot() { - return isInitialSyncOngoing(replicaSetName) ? SnapshotRecord.TRUE + return isSnapshotRunning() ? SnapshotRecord.TRUE : snapshotRecord == SnapshotRecord.INCREMENTAL ? 
SnapshotRecord.INCREMENTAL : SnapshotRecord.FALSE; } @@ -501,9 +438,6 @@ long wallTime() { @Override public String toString() { - return "SourceInfo [sourcePartitionsByReplicaSetName=" + sourcePartitionsByReplicaSetName - + ", positionsByReplicaSetName=" + positionsByReplicaSetName + ", initialSyncReplicaSets=" - + initialSyncReplicaSets + ", replicaSetName=" + replicaSetName + ", collectionId=" + collectionId - + ", position=" + position + "]"; + return "SourceInfo [initialSyncReplicaSets=" + initialSync + ", collectionId=" + collectionId + ", position=" + position + "]"; } } diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/AbstractMongoIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/AbstractMongoIT.java index 9b7edb125..53cf1cba4 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/AbstractMongoIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/AbstractMongoIT.java @@ -91,24 +91,23 @@ protected void reuseConfiguration(Configuration config) { */ private void initialize(boolean restartFromBeginning) { // Record the partition offsets (if there are some) ... - Map partition = null; Map offsetForPartition = null; var rsName = ConnectionStrings.replicaSetName(mongo.getConnectionString()); - if (!restartFromBeginning && context != null && mongo != null && context.source().hasOffset(rsName)) { - partition = context.source().partition(rsName); - offsetForPartition = context.source().lastOffset(rsName); + if (!restartFromBeginning && context != null && mongo != null && context.source().hasOffset()) { + offsetForPartition = context.source().lastOffset(); } context = new MongoDbTaskContext(config); assertThat(context.getConnectionContext().connectionSeed()).isNotEmpty(); // Restore Source position (if there are some) ... - if (partition != null) { - context.source().setOffsetFor(partition, offsetForPartition); + if (offsetForPartition != null) { + context.source().setOffset(offsetForPartition); } // Get a connection to the primary ... 
-        connection = context.getConnectionContext().connect(new ConnectionString(mongo.getConnectionString()), context.filters(), TestHelper.connectionErrorHandler(3));
+        var connectionString = new ConnectionString(TestHelper.connectionString(mongo));
+        connection = context.getConnectionContext().connect(connectionString, context.filters(), TestHelper.connectionErrorHandler(3));
     }
 }
diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/SourceInfoTest.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/SourceInfoTest.java
index 0262adeff..ad48fb39a 100644
--- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/SourceInfoTest.java
+++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/SourceInfoTest.java
@@ -63,7 +63,7 @@ private SourceInfo createSourceInfo() {
                 .with(CommonConnectorConfig.TOPIC_PREFIX, "serverX")
                 .build());
 
-        return new SourceInfo(config);
+        return new SourceInfo(config, REPLICA_SET_NAME);
     }
 
     @SuppressWarnings("unchecked")
@@ -107,8 +107,8 @@ public void assertSourceInfoContents(SourceInfo source,
                                         BsonTimestamp timestamp,
                                         String snapshot) {
         if (cursor != null) {
-            assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(false);
-            source.initEvent(REPLICA_SET_NAME, cursor);
+            assertThat(source.hasOffset()).isEqualTo(false);
+            source.initEvent(cursor);
         }
 
         assertSourceInfoContents(source, cursor != null, resumeTokenData, timestamp, snapshot);
@@ -119,16 +119,16 @@ public void assertSourceInfoContents(SourceInfo source,
                                         String resumeTokenData,
                                         BsonTimestamp timestamp,
                                         String snapshot) {
-        assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(hasOffset);
+        assertThat(source.hasOffset()).isEqualTo(hasOffset);
 
-        Map<String, ?> offset = source.lastOffset(REPLICA_SET_NAME);
+        Map<String, ?> offset = source.lastOffset();
         assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo((timestamp != null) ? timestamp.getTime() : 0);
         assertThat(offset.get(SourceInfo.ORDER)).isEqualTo((timestamp != null) ? timestamp.getInc() : -1);
 
-        String resumeToken = source.lastResumeToken(REPLICA_SET_NAME);
+        String resumeToken = source.lastResumeToken();
         assertThat(resumeToken).isEqualTo(resumeTokenData);
 
-        source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "test", "names"), 0L);
+        source.collectionEvent(new CollectionId(REPLICA_SET_NAME, "test", "names"), 0L);
         Struct struct = source.struct();
         assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo((timestamp != null) ? timestamp.getTime() * 1000L : 0L);
         assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo((timestamp != null) ? timestamp.getInc() : -1);
@@ -145,17 +145,16 @@ public void shouldSetAndReturnRecordedOffset() {
         assertSourceInfoContents(source, cursor, CHANGE_RESUME_TOKEN_DATA, CHANGE_TIMESTAMP, null);
 
         // Create a new source info and set the offset ...
-        Map<String, ?> offset = source.lastOffset(REPLICA_SET_NAME);
-        Map<String, String> partition = source.partition(REPLICA_SET_NAME);
+        Map<String, ?> offset = source.lastOffset();
 
         SourceInfo newSource = createSourceInfo();
-        newSource.setOffsetFor(partition, offset);
+        newSource.setOffset(offset);
 
         assertSourceInfoContents(newSource, true, CHANGE_RESUME_TOKEN_DATA, CHANGE_TIMESTAMP, null);
     }
 
     @Test
     public void shouldReturnOffsetForUnusedReplicaName() {
-        assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(false);
+        assertThat(source.hasOffset()).isEqualTo(false);
 
         assertSourceInfoContents(source, false, null, new BsonTimestamp(0), null);
     }
 
@@ -173,13 +172,13 @@ public void shouldReturnRecordedOffsetForUsedReplicaNameWithoutEvent() {
 
     @Test
     public void shouldReturnOffsetForUnusedReplicaNameDuringInitialSync() {
-        source.startInitialSync(REPLICA_SET_NAME);
+        source.startInitialSync();
 
         assertSourceInfoContents(source, false, null, new BsonTimestamp(0), "true");
     }
 
     @Test
     public void shouldReturnRecordedOffsetForUsedReplicaNameDuringInitialSync() {
-        source.startInitialSync(REPLICA_SET_NAME);
+        source.startInitialSync();
         var cursor = mockEventChangeStreamCursor();
 
         assertSourceInfoContents(source, cursor, CHANGE_RESUME_TOKEN_DATA, CHANGE_TIMESTAMP, "true");
@@ -187,46 +186,32 @@ public void shouldReturnRecordedOffsetForUsedReplicaNameDuringInitialSync() {
 
     @Test
     public void shouldReturnRecordedOffsetForUsedReplicaNameDuringInitialSyncWithoutEvent() {
-        source.startInitialSync(REPLICA_SET_NAME);
+        source.startInitialSync();
         var cursor = mockNoEventChangeStreamCursor();
 
         assertSourceInfoContents(source, cursor, CURSOR_RESUME_TOKEN_DATA, null, "true");
     }
 
-    @Test
-    public void shouldProducePartitionMap() {
-        partition = source.partition(REPLICA_SET_NAME);
-        assertThat(partition.get(SourceInfo.REPLICA_SET_NAME)).isEqualTo(REPLICA_SET_NAME);
-        assertThat(partition.get(SourceInfo.SERVER_ID_KEY)).isEqualTo("serverX");
-        assertThat(partition.size()).isEqualTo(2);
-    }
-
-    @Test
-    public void shouldReturnSamePartitionMapForSameReplicaName() {
-        partition = source.partition(REPLICA_SET_NAME);
-        assertThat(partition).isSameAs(source.partition(REPLICA_SET_NAME));
-    }
-
     @Test
     public void versionIsPresent() {
         var cursor = mockEventChangeStreamCursor();
-        source.initEvent(REPLICA_SET_NAME, cursor);
+        source.initEvent(cursor);
 
         assertThat(source.struct().getString(SourceInfo.DEBEZIUM_VERSION_KEY)).isEqualTo(Module.version());
     }
 
     @Test
     public void connectorIsPresent() {
         var cursor = mockEventChangeStreamCursor();
-        source.initEvent(REPLICA_SET_NAME, cursor);
+        source.initEvent(cursor);
 
         assertThat(source.struct().getString(SourceInfo.DEBEZIUM_CONNECTOR_KEY)).isEqualTo(Module.name());
     }
 
     @Test
     public void wallTimeIsPresent() {
         var cursor = mockEventChangeStreamCursor();
-        source.initEvent(REPLICA_SET_NAME, cursor);
+        source.initEvent(cursor);
 
         assertThat(source.struct().getInt64(SourceInfo.WALL_TIME)).isNull();
-        source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "test", "names"), 10L);
+        source.collectionEvent(new CollectionId(REPLICA_SET_NAME, "test", "names"), 10L);
 
         assertThat(source.struct().getInt64(SourceInfo.WALL_TIME)).isEqualTo(10L);
     }