DBZ-7260 Adjusted snapshot threads to reflect RS connection mode removal

This commit is contained in:
Jakub Cechacek 2023-12-14 14:48:42 +01:00
parent 1c413db228
commit 2aa072830a

View File

@ -6,7 +6,6 @@
package io.debezium.connector.mongodb;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -79,7 +78,6 @@ public class MongoDbSnapshotChangeEventSource extends AbstractSnapshotChangeEven
private final Clock clock;
private final SnapshotProgressListener<MongoDbPartition> snapshotProgressListener;
private final ErrorHandler errorHandler;
private final AtomicBoolean aborted = new AtomicBoolean(false);
public MongoDbSnapshotChangeEventSource(MongoDbConnectorConfig connectorConfig, MongoDbTaskContext taskContext,
MongoDbConnection.ChangeEventSourceConnectionFactory connections,
@ -288,88 +286,68 @@ private void createDataEvents(ChangeEventSourceContext sourceContext,
final List<CollectionId> collections = determineDataCollectionsToBeSnapshotted(mongo.collections(), dataCollectionPattern)
.collect(Collectors.toList());
snapshotProgressListener.monitoredDataCollectionsDetermined(snapshotContext.partition, collections);
if (connectorConfig.getSnapshotMaxThreads() > 1) {
// Since multiple snapshot threads are to be used, create a thread pool and initiate the snapshot.
// The current thread will wait until the snapshot threads either have completed or an error occurred.
final int numThreads = Math.min(collections.size(), connectorConfig.getSnapshotMaxThreads());
final Queue<CollectionId> collectionsToCopy = new ConcurrentLinkedQueue<>(collections);
final String snapshotThreadName = "snapshot-" + (replicaSet.hasReplicaSetName() ? replicaSet.replicaSetName() : "main");
final ExecutorService snapshotThreads = Threads.newFixedThreadPool(MongoDbConnector.class, taskContext.serverName(),
snapshotThreadName, connectorConfig.getSnapshotMaxThreads());
final CountDownLatch latch = new CountDownLatch(numThreads);
final AtomicBoolean aborted = new AtomicBoolean(false);
final AtomicInteger threadCounter = new AtomicInteger(0);
// Since multiple snapshot threads are to be used, create a thread pool and initiate the snapshot.
// The current thread will wait until the snapshot threads either have completed or an error occurred.
final int numThreads = Math.min(collections.size(), connectorConfig.getSnapshotMaxThreads());
final Queue<CollectionId> collectionsToCopy = new ConcurrentLinkedQueue<>(collections);
LOGGER.info("Preparing to use {} thread(s) to snapshot {} collection(s): {}", numThreads, collections.size(),
Strings.join(", ", collections));
final String snapshotThreadName = "snapshot-" + (replicaSet.hasReplicaSetName() ? replicaSet.replicaSetName() : "main");
for (int i = 0; i < numThreads; ++i) {
snapshotThreads.submit(() -> {
taskContext.configureLoggingContext(replicaSet.replicaSetName() + "-snapshot" + threadCounter.incrementAndGet());
try {
CollectionId id = null;
while (!aborted.get() && (id = collectionsToCopy.poll()) != null) {
if (!sourceContext.isRunning()) {
throw new InterruptedException("Interrupted while snapshotting replica set " + replicaSet.replicaSetName());
}
LOGGER.info("Creating snapshot worker pool with {} worker thread(s)", numThreads);
final ExecutorService executorService = Threads.newFixedThreadPool(MongoDbConnector.class, taskContext.serverName(), snapshotThreadName,
connectorConfig.getSnapshotMaxThreads());
final CountDownLatch latch = new CountDownLatch(numThreads);
final AtomicBoolean aborted = new AtomicBoolean(false);
final AtomicInteger threadCounter = new AtomicInteger(0);
if (collectionsToCopy.isEmpty()) {
snapshotContext.lastCollection = true;
}
LOGGER.info("Preparing to use {} thread(s) to snapshot {} collection(s): {}", numThreads, collections.size(),
Strings.join(", ", collections));
createDataEventsForCollection(
sourceContext,
snapshotContext,
snapshotReceiver,
id,
mongo, snapshottingTask.getFilterQueries());
for (int i = 0; i < numThreads; ++i) {
executorService.submit(() -> {
taskContext.configureLoggingContext(replicaSet.replicaSetName() + "-snapshot" + threadCounter.incrementAndGet());
try {
CollectionId id = null;
while (!aborted.get() && (id = collectionsToCopy.poll()) != null) {
if (!sourceContext.isRunning()) {
throw new InterruptedException("Interrupted while snapshotting replica set " + replicaSet.replicaSetName());
}
}
catch (Throwable t) {
LOGGER.error("Snapshot for replica set {} failed", replicaSet.replicaSetName(), t);
errorHandler.setProducerThrowable(t);
aborted.set(true);
}
finally {
latch.countDown();
}
});
}
// wait for all copy threads to finish
try {
latch.await();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
aborted.set(true);
}
if (collectionsToCopy.isEmpty()) {
snapshotContext.lastCollection = true;
}
snapshotThreads.shutdown();
createDataEventsForCollection(
sourceContext,
snapshotContext,
snapshotReceiver,
id,
mongo, snapshottingTask.getFilterQueries());
}
}
catch (Throwable t) {
// Do nothing so that this thread is stopped
LOGGER.error("Snapshot failed", t);
errorHandler.setProducerThrowable(t);
aborted.set(true);
}
finally {
latch.countDown();
}
});
}
else {
// Only 1 thread should be used for snapshotting collections.
// In this use case since the replica-set snapshot is already in a separate thread, there is not
// a real reason to spawn additional threads but instead just run within the current thread.
for (Iterator<CollectionId> it = collections.iterator(); it.hasNext();) {
final CollectionId collectionId = it.next();
if (!sourceContext.isRunning()) {
throw new InterruptedException("Interrupted while snapshotting replica set " + replicaSet.replicaSetName());
}
if (!it.hasNext()) {
snapshotContext.lastCollection = true;
}
createDataEventsForCollection(
sourceContext,
snapshotContext,
snapshotReceiver,
collectionId,
mongo, snapshottingTask.getFilterQueries());
}
// wait for all copy threads to finish
try {
latch.await();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
aborted.set(true);
}
finally {
executorService.shutdown();
}
offsetContext.stopReplicaSetSnapshot(replicaSet.replicaSetName());