diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java index 0e246971b..b0de68708 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java @@ -20,12 +20,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.mongodb.ReadPreference; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import io.debezium.DebeziumException; import io.debezium.annotation.NotThreadSafe; -import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary; +import io.debezium.connector.mongodb.ConnectionContext.MongoPreferredNode; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource; import io.debezium.pipeline.source.snapshot.incremental.CloseIncrementalSnapshotWindow; @@ -69,7 +70,7 @@ public class MongoDbIncrementalSnapshotChangeEventSource protected EventDispatcher dispatcher; protected IncrementalSnapshotContext context = null; protected final Map window = new LinkedHashMap<>(); - private MongoPrimary primary; + private MongoPreferredNode mongo; private CollectionId signallingCollectionId; public MongoDbIncrementalSnapshotChangeEventSource(MongoDbConnectorConfig config, @@ -175,10 +176,10 @@ protected void deduplicateWindow(DataCollectionId dataCollectionId, Object key) protected void emitWindowOpen() throws InterruptedException { final CollectionId collectionId = signallingCollectionId; final String id = context.currentChunkId() + "-open"; - primary.executeBlocking( + mongo.executeBlocking( "emit window open for chunk '" + context.currentChunkId() + "'", - primary -> { - final MongoDatabase database = primary.getDatabase(collectionId.dbName()); + client -> { + final MongoDatabase database = client.getDatabase(collectionId.dbName()); final MongoCollection collection = database.getCollection(collectionId.name()); LOGGER.trace("Emitting open window for chunk = '{}'", context.currentChunkId()); @@ -196,10 +197,10 @@ protected void emitWindowOpen() throws InterruptedException { protected void emitWindowClose() throws InterruptedException { final CollectionId collectionId = signallingCollectionId; final String id = context.currentChunkId() + "-close"; - primary.executeBlocking( + mongo.executeBlocking( "emit window close for chunk '" + context.currentChunkId() + "'", - primary -> { - final MongoDatabase database = primary.getDatabase(collectionId.dbName()); + client -> { + final MongoDatabase database = client.getDatabase(collectionId.dbName()); final MongoCollection collection = database.getCollection(collectionId.name()); LOGGER.trace("Emitting close window for chunk = '{}'", context.currentChunkId()); @@ -214,7 +215,7 @@ protected void emitWindowClose() throws InterruptedException { @Override @SuppressWarnings("unchecked") public void init(MongoDbPartition partition, OffsetContext offsetContext) { - primary = establishConnectionToPrimary(partition, replicaSets.all().get(0)); + mongo = establishConnection(partition, ReadPreference.primary(), replicaSets.all().get(0)); if (offsetContext == null) { LOGGER.info("Empty incremental snapshot change event source started, no action needed"); postIncrementalSnapshotCompleted(); @@ -315,8 +316,8 @@ private void nextDataCollection(MongoDbPartition partition) { private Object[] readMaximumKey() throws InterruptedException { final CollectionId collectionId = (CollectionId) currentCollection.id(); final AtomicReference key = new AtomicReference<>(); - primary.executeBlocking("maximum key for '" + collectionId + "'", primary -> { - final MongoDatabase database = primary.getDatabase(collectionId.dbName()); + mongo.executeBlocking("maximum key for '" + collectionId + "'", client -> { + final MongoDatabase database = client.getDatabase(collectionId.dbName()); final MongoCollection collection = database.getCollection(collectionId.name()); final Document lastDocument = collection.find().sort(new Document(DOCUMENT_ID, -1)).limit(1).first(); @@ -410,7 +411,7 @@ private void createDataEventsForDataCollection(MongoDbPartition partition) throw long exportStart = clock.currentTimeInMillis(); LOGGER.debug("Exporting data chunk from collection '{}' (total {} collections)", currentCollection.id(), context.dataCollectionsToBeSnapshottedCount()); - primary.executeBlocking("chunk query key for '" + currentCollection.id() + "'", primary -> { + mongo.executeBlocking("chunk query key for '" + currentCollection.id() + "'", primary -> { final MongoDatabase database = primary.getDatabase(collectionId.dbName()); final MongoCollection collection = database.getCollection(collectionId.name(), BsonDocument.class); @@ -530,8 +531,8 @@ public void processMessage(MongoDbPartition partition, DataCollectionId dataColl } } - private MongoPrimary establishConnectionToPrimary(MongoDbPartition partition, ReplicaSet replicaSet) { - return connectionContext.primaryFor(replicaSet, taskContext.filters(), (desc, error) -> { + private MongoPreferredNode establishConnection(MongoDbPartition partition, ReadPreference preference, ReplicaSet replicaSet) { + return connectionContext.preferredFor(replicaSet, preference, taskContext.filters(), (desc, error) -> { // propagate authorization failures if (error.getMessage() != null && error.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) { throw new ConnectException("Error while attempting to " + desc, error);