DBZ-4339 Preferring secondaries to perform snapshot

This commit is contained in:
jcechace 2022-11-19 22:24:26 +01:00 committed by Jiri Pechanec
parent 44ae4697d7
commit 27cea0e5e6

View File

@ -27,12 +27,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary;
import io.debezium.connector.mongodb.ConnectionContext.MongoPreferredNode;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.EventDispatcher.SnapshotReceiver;
@ -181,17 +182,17 @@ protected SnapshottingTask getSnapshottingTask(MongoDbPartition partition, Mongo
final MongoDbOffsetContext offsetContext = (MongoDbOffsetContext) previousOffset;
try {
replicaSets.onEachReplicaSet(replicaSet -> {
MongoPrimary primary = null;
MongoPreferredNode mongo = null;
try {
primary = establishConnectionToPrimary(partition, replicaSet);
mongo = establishConnection(partition, replicaSet, ReadPreference.primaryPreferred());
final ReplicaSetOffsetContext rsOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet);
if (primary != null && isSnapshotExpected(primary, rsOffsetContext)) {
if (mongo != null && isSnapshotExpected(mongo, rsOffsetContext)) {
replicaSetSnapshots.add(replicaSet);
}
}
finally {
if (primary != null) {
primary.stop();
if (mongo != null) {
mongo.stop();
}
}
});
@ -210,35 +211,35 @@ protected SnapshotContext<MongoDbPartition, MongoDbOffsetContext> prepare(MongoD
}
/**
 * Snapshots a single replica set: opens a connection, generates the snapshot data events
 * through {@code createDataEvents}, and stops the connection in a {@code finally} block
 * so it is released even when event creation throws.
 *
 * NOTE(review): this span is shown as a rendered diff without +/- markers. The lines using
 * {@code MongoPrimary primaryClient} / {@code establishConnectionToPrimary} appear to be the
 * pre-change variant and the lines using {@code MongoPreferredNode mongo} /
 * {@code establishConnection} the post-change variant; only one of each pair exists in the
 * real file.
 *
 * @param sourceContext context handed through to {@code createDataEvents}
 * @param ctx snapshot context; its {@code partition} is used to establish the connection
 * @param replicaSet the replica set to snapshot
 * @throws InterruptedException declared by this method; {@code createDataEvents} is the
 *         visible call that declares it
 */
private void snapshotReplicaSet(ChangeEventSourceContext sourceContext, MongoDbSnapshotContext ctx, ReplicaSet replicaSet) throws InterruptedException {
MongoPrimary primaryClient = null;
MongoPreferredNode mongo = null;
try {
primaryClient = establishConnectionToPrimary(ctx.partition, replicaSet);
if (primaryClient != null) {
createDataEvents(sourceContext, ctx, replicaSet, primaryClient);
// Post-change variant: the snapshot read is requested with secondaryPreferred,
// whereas the other establishConnection calls in this diff use primaryPreferred.
mongo = establishConnection(ctx.partition, replicaSet, ReadPreference.secondaryPreferred());
// A null connection is tolerated: no events are created and nothing is stopped.
if (mongo != null) {
createDataEvents(sourceContext, ctx, replicaSet, mongo);
}
}
finally {
if (primaryClient != null) {
primaryClient.stop();
// Stop only if the connection was actually established.
if (mongo != null) {
mongo.stop();
}
}
}
/**
 * Obtains a connection to the given replica set from {@code connectionContext}, installing
 * an error handler that turns every reported error into a {@link ConnectException}.
 *
 * NOTE(review): this span is shown as a rendered diff without +/- markers. The
 * {@code establishConnectionToPrimary} / {@code primaryFor} lines appear to be the pre-change
 * variant and the {@code establishConnection} / {@code preferredFor} lines (which add the
 * {@code preference} parameter) the post-change variant; likewise only one of the two
 * {@code LOGGER.error} lines exists in the real file.
 *
 * @param partition partition used when dispatching the {@code DisconnectEvent}
 * @param replicaSet the replica set to connect to
 * @param preference read preference passed to {@code connectionContext.preferredFor}
 * @return the connection wrapper returned by {@code connectionContext}; callers in this
 *         file null-check the result
 */
private MongoPrimary establishConnectionToPrimary(MongoDbPartition partition, ReplicaSet replicaSet) {
return connectionContext.primaryFor(replicaSet, taskContext.filters(), (desc, error) -> {
private MongoPreferredNode establishConnection(MongoDbPartition partition, ReplicaSet replicaSet, ReadPreference preference) {
return connectionContext.preferredFor(replicaSet, preference, taskContext.filters(), (desc, error) -> {
// propagate authorization failures
// (rethrown directly, without dispatching a DisconnectEvent or logging)
if (error.getMessage() != null && error.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) {
throw new ConnectException("Error while attempting to " + desc, error);
}
else {
// Any other failure: dispatch a disconnect event, log it, then fail with the same exception type.
dispatcher.dispatchConnectorEvent(partition, new DisconnectEvent());
// The first log line below has a single {} placeholder for two message arguments,
// so error.getMessage() would not be rendered; the second line adds the missing placeholder.
LOGGER.error("Error while attempting to {}: ", desc, error.getMessage(), error);
LOGGER.error("Error while attempting to {}: {}", desc, error.getMessage(), error);
throw new ConnectException("Error while attempting to " + desc, error);
}
});
}
private boolean isSnapshotExpected(MongoPrimary primaryClient, ReplicaSetOffsetContext offsetContext) {
private boolean isSnapshotExpected(MongoPreferredNode mongo, ReplicaSetOffsetContext offsetContext) {
boolean performSnapshot = true;
if (offsetContext.hasOffset()) {
if (LOGGER.isInfoEnabled()) {
@ -255,8 +256,8 @@ private boolean isSnapshotExpected(MongoPrimary primaryClient, ReplicaSetOffsetC
// same options as other connectors and this is where when_needed functionality would go.
// There is no ongoing snapshot, so look to see if our last recorded offset still exists in the oplog.
BsonTimestamp lastRecordedTs = offsetContext.lastOffsetTimestamp();
BsonTimestamp firstAvailableTs = primaryClient.execute("get oplog position", primary -> {
return SourceInfo.extractEventTimestamp(MongoUtil.getOplogEntry(primary, 1, LOGGER));
BsonTimestamp firstAvailableTs = mongo.execute("get oplog position", client -> {
return SourceInfo.extractEventTimestamp(MongoUtil.getOplogEntry(client, 1, LOGGER));
});
if (firstAvailableTs == null) {
@ -289,16 +290,16 @@ protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets
final Map<ReplicaSet, BsonDocument> positions = new LinkedHashMap<>();
replicaSets.onEachReplicaSet(replicaSet -> {
LOGGER.info("Determine Snapshot Offset for replica-set {}", replicaSet.replicaSetName());
MongoPrimary primaryClient = establishConnectionToPrimary(ctx.partition, replicaSet);
if (primaryClient != null) {
MongoPreferredNode mongo = establishConnection(ctx.partition, replicaSet, ReadPreference.primaryPreferred());
if (mongo != null) {
try {
primaryClient.execute("get oplog position", primary -> {
positions.put(replicaSet, MongoUtil.getOplogEntry(primary, -1, LOGGER));
mongo.execute("get oplog position", client -> {
positions.put(replicaSet, MongoUtil.getOplogEntry(client, -1, LOGGER));
});
}
finally {
LOGGER.info("Stopping primary client");
primaryClient.stop();
mongo.stop();
}
}
});
@ -308,12 +309,12 @@ protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets
}
private void createDataEvents(ChangeEventSourceContext sourceContext, MongoDbSnapshotContext snapshotContext, ReplicaSet replicaSet,
MongoPrimary primaryClient)
MongoPreferredNode mongo)
throws InterruptedException {
SnapshotReceiver<MongoDbPartition> snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver();
snapshotContext.offset.preSnapshotStart();
createDataEventsForReplicaSet(sourceContext, snapshotContext, snapshotReceiver, replicaSet, primaryClient);
createDataEventsForReplicaSet(sourceContext, snapshotContext, snapshotReceiver, replicaSet, mongo);
snapshotContext.offset.preSnapshotCompletion();
snapshotReceiver.completeSnapshot();
@ -326,7 +327,7 @@ private void createDataEvents(ChangeEventSourceContext sourceContext, MongoDbSna
private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContext,
MongoDbSnapshotContext snapshotContext,
SnapshotReceiver<MongoDbPartition> snapshotReceiver,
ReplicaSet replicaSet, MongoPrimary primaryClient)
ReplicaSet replicaSet, MongoPreferredNode mongo)
throws InterruptedException {
final String rsName = replicaSet.replicaSetName();
@ -339,7 +340,7 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex
LOGGER.info("Beginning snapshot of '{}' at {}", rsName, rsOffsetContext.getOffset());
final List<CollectionId> collections = determineDataCollectionsToBeSnapshotted(primaryClient.collections()).collect(Collectors.toList());
final List<CollectionId> collections = determineDataCollectionsToBeSnapshotted(mongo.collections()).collect(Collectors.toList());
snapshotProgressListener.monitoredDataCollectionsDetermined(snapshotContext.partition, collections);
if (connectorConfig.getSnapshotMaxThreads() > 1) {
// Since multiple snapshot threads are to be used, create a thread pool and initiate the snapshot.
@ -377,7 +378,7 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex
snapshotReceiver,
replicaSet,
id,
primaryClient);
mongo);
}
}
catch (InterruptedException e) {
@ -422,7 +423,7 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex
snapshotReceiver,
replicaSet,
collectionId,
primaryClient);
mongo);
}
}
@ -432,14 +433,14 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex
private void createDataEventsForCollection(ChangeEventSourceContext sourceContext,
MongoDbSnapshotContext snapshotContext,
SnapshotReceiver<MongoDbPartition> snapshotReceiver,
ReplicaSet replicaSet, CollectionId collectionId, MongoPrimary primaryClient)
ReplicaSet replicaSet, CollectionId collectionId, MongoPreferredNode mongo)
throws InterruptedException {
long exportStart = clock.currentTimeInMillis();
LOGGER.info("\t Exporting data for collection '{}'", collectionId);
primaryClient.executeBlocking("sync '" + collectionId + "'", primary -> {
final MongoDatabase database = primary.getDatabase(collectionId.dbName());
mongo.executeBlocking("sync '" + collectionId + "'", client -> {
final MongoDatabase database = client.getDatabase(collectionId.dbName());
final MongoCollection<BsonDocument> collection = database.getCollection(collectionId.name(), BsonDocument.class);
final int batchSize = taskContext.getConnectorConfig().getSnapshotFetchSize();