DBZ-4339 MongoDbIncrementalSnapshotChangeEventSource uses MongoPreferredNode

This commit is contained in:
jcechace 2022-11-21 11:01:29 +01:00 committed by Jiri Pechanec
parent 27cea0e5e6
commit 3966a4099a

View File

@ -20,12 +20,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import io.debezium.DebeziumException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary;
import io.debezium.connector.mongodb.ConnectionContext.MongoPreferredNode;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.CloseIncrementalSnapshotWindow;
@ -69,7 +70,7 @@ public class MongoDbIncrementalSnapshotChangeEventSource
protected EventDispatcher<MongoDbPartition, CollectionId> dispatcher;
protected IncrementalSnapshotContext<CollectionId> context = null;
protected final Map<Struct, Object[]> window = new LinkedHashMap<>();
private MongoPrimary primary;
private MongoPreferredNode mongo;
private CollectionId signallingCollectionId;
public MongoDbIncrementalSnapshotChangeEventSource(MongoDbConnectorConfig config,
@ -175,10 +176,10 @@ protected void deduplicateWindow(DataCollectionId dataCollectionId, Object key)
protected void emitWindowOpen() throws InterruptedException {
final CollectionId collectionId = signallingCollectionId;
final String id = context.currentChunkId() + "-open";
primary.executeBlocking(
mongo.executeBlocking(
"emit window open for chunk '" + context.currentChunkId() + "'",
primary -> {
final MongoDatabase database = primary.getDatabase(collectionId.dbName());
client -> {
final MongoDatabase database = client.getDatabase(collectionId.dbName());
final MongoCollection<Document> collection = database.getCollection(collectionId.name());
LOGGER.trace("Emitting open window for chunk = '{}'", context.currentChunkId());
@ -196,10 +197,10 @@ protected void emitWindowOpen() throws InterruptedException {
protected void emitWindowClose() throws InterruptedException {
final CollectionId collectionId = signallingCollectionId;
final String id = context.currentChunkId() + "-close";
primary.executeBlocking(
mongo.executeBlocking(
"emit window close for chunk '" + context.currentChunkId() + "'",
primary -> {
final MongoDatabase database = primary.getDatabase(collectionId.dbName());
client -> {
final MongoDatabase database = client.getDatabase(collectionId.dbName());
final MongoCollection<Document> collection = database.getCollection(collectionId.name());
LOGGER.trace("Emitting close window for chunk = '{}'", context.currentChunkId());
@ -214,7 +215,7 @@ protected void emitWindowClose() throws InterruptedException {
@Override
@SuppressWarnings("unchecked")
public void init(MongoDbPartition partition, OffsetContext offsetContext) {
primary = establishConnectionToPrimary(partition, replicaSets.all().get(0));
mongo = establishConnection(partition, ReadPreference.primary(), replicaSets.all().get(0));
if (offsetContext == null) {
LOGGER.info("Empty incremental snapshot change event source started, no action needed");
postIncrementalSnapshotCompleted();
@ -315,8 +316,8 @@ private void nextDataCollection(MongoDbPartition partition) {
private Object[] readMaximumKey() throws InterruptedException {
final CollectionId collectionId = (CollectionId) currentCollection.id();
final AtomicReference<Object> key = new AtomicReference<>();
primary.executeBlocking("maximum key for '" + collectionId + "'", primary -> {
final MongoDatabase database = primary.getDatabase(collectionId.dbName());
mongo.executeBlocking("maximum key for '" + collectionId + "'", client -> {
final MongoDatabase database = client.getDatabase(collectionId.dbName());
final MongoCollection<Document> collection = database.getCollection(collectionId.name());
final Document lastDocument = collection.find().sort(new Document(DOCUMENT_ID, -1)).limit(1).first();
@ -410,7 +411,7 @@ private void createDataEventsForDataCollection(MongoDbPartition partition) throw
long exportStart = clock.currentTimeInMillis();
LOGGER.debug("Exporting data chunk from collection '{}' (total {} collections)", currentCollection.id(), context.dataCollectionsToBeSnapshottedCount());
primary.executeBlocking("chunk query key for '" + currentCollection.id() + "'", primary -> {
mongo.executeBlocking("chunk query key for '" + currentCollection.id() + "'", primary -> {
final MongoDatabase database = primary.getDatabase(collectionId.dbName());
final MongoCollection<BsonDocument> collection = database.getCollection(collectionId.name(), BsonDocument.class);
@ -530,8 +531,8 @@ public void processMessage(MongoDbPartition partition, DataCollectionId dataColl
}
}
private MongoPrimary establishConnectionToPrimary(MongoDbPartition partition, ReplicaSet replicaSet) {
return connectionContext.primaryFor(replicaSet, taskContext.filters(), (desc, error) -> {
private MongoPreferredNode establishConnection(MongoDbPartition partition, ReadPreference preference, ReplicaSet replicaSet) {
return connectionContext.preferredFor(replicaSet, preference, taskContext.filters(), (desc, error) -> {
// propagate authorization failures
if (error.getMessage() != null && error.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) {
throw new ConnectException("Error while attempting to " + desc, error);