DBZ-6518 use multiple threads to query chunks

Yue Wang 2023-09-12 10:30:38 -07:00 committed by Jiri Pechanec
parent 0d920b8ba2
commit 199bf775ed
4 changed files with 183 additions and 33 deletions
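
For readers skimming the diff below, here is a minimal, self-contained sketch of the pattern this commit applies: a coordinator scans only the chunk boundary keys, hands each (startKey, endKey] range to a fixed-size ExecutorService, and waits on the returned Futures before finishing the window. All names in the sketch (ParallelChunkScanSketch, queryChunk, the integer ids standing in for MongoDB _id values) are illustrative only and are not Debezium or MongoDB driver APIs.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelChunkScanSketch {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        final int chunkSize = 10;
        final int threads = 4;
        final ExecutorService pool = Executors.newFixedThreadPool(threads);
        // Shared, thread-safe "window" that the parallel chunk queries write into.
        final Map<Integer, String> window = new ConcurrentHashMap<>();

        // Pretend these are the sorted _id values returned by the boundary query.
        final List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 95; i++) {
            ids.add(i);
        }

        final List<Future<?>> futures = new ArrayList<>();
        Integer chunkStart = null; // exclusive lower bound of the current chunk
        for (int i = 0; i < ids.size(); i++) {
            // Every chunkSize rows (and once more for the trailing partial chunk),
            // submit a task that fetches the (chunkStart, chunkEnd] range.
            if ((i + 1) % chunkSize == 0 || i == ids.size() - 1) {
                final Integer start = chunkStart;
                final Integer end = ids.get(i);
                futures.add(pool.submit(() -> queryChunk(start, end, window)));
                chunkStart = end;
            }
        }

        for (Future<?> future : futures) {
            future.get(); // propagate failures and wait for all chunks to complete
        }
        pool.shutdown();
        System.out.println("Fetched " + window.size() + " documents");
    }

    // Stand-in for the per-chunk query: fetch every document with start < _id <= end.
    private static void queryChunk(Integer start, Integer end, Map<Integer, String> window) {
        int from = (start == null) ? 1 : start + 1;
        for (int id = from; id <= end; id++) {
            window.put(id, "document-" + id);
        }
    }
}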


@@ -516,6 +516,7 @@ Yoann Rodière
 Yohei Yoshimuta
 Yossi Shirizli
 Yuan Zhang
+Yue Wang
 Yuiham Chan
 Zheng Wang
 Zongwen Li


@@ -10,10 +10,14 @@
 import static io.debezium.pipeline.notification.IncrementalSnapshotNotificationService.TableScanCompletionStatus.SUCCEEDED;
 import static io.debezium.pipeline.notification.IncrementalSnapshotNotificationService.TableScanCompletionStatus.UNKNOWN_SCHEMA;

+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -25,6 +29,7 @@
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Projections;

 import io.debezium.DebeziumException;
 import io.debezium.annotation.NotThreadSafe;
@@ -74,13 +79,15 @@ public class MongoDbIncrementalSnapshotChangeEventSource
     protected EventDispatcher<MongoDbPartition, CollectionId> dispatcher;
     protected IncrementalSnapshotContext<CollectionId> context = null;
-    protected final Map<Struct, Object[]> window = new LinkedHashMap<>();
+    protected final Map<Struct, Object[]> window = new ConcurrentHashMap<>();
     private MongoDbConnection mongo;
     private final CollectionId signallingCollectionId;
     protected final NotificationService<MongoDbPartition, ? extends OffsetContext> notificationService;
+    private final ExecutorService incrementalSnapshotThreadPool;

     public MongoDbIncrementalSnapshotChangeEventSource(MongoDbConnectorConfig config,
                                                        MongoDbConnection.ChangeEventSourceConnectionFactory connections, ReplicaSets replicaSets,
                                                        EventDispatcher<MongoDbPartition, CollectionId> dispatcher,
@@ -100,6 +107,8 @@ public MongoDbIncrementalSnapshotChangeEventSource(MongoDbConnectorConfig config
         this.signallingCollectionId = connectorConfig.getSignalingDataCollectionId() == null ? null
                 : CollectionId.parse("UNUSED", connectorConfig.getSignalingDataCollectionId());
         this.notificationService = notificationService;
+        this.incrementalSnapshotThreadPool = Threads.newFixedThreadPool(MongoDbConnector.class, config.getConnectorName(),
+                "incremental-snapshot-" + replicaSets.all().get(0).replicaSetName(), connectorConfig.getSnapshotMaxThreads());
     }

     @Override
@@ -512,52 +521,49 @@ private void createDataEventsForDataCollection(MongoDbPartition partition) throw
         LOGGER.debug("Exporting data chunk from collection '{}' (total {} collections)", currentCollection.id(), context.dataCollectionsToBeSnapshottedCount());
         mongo.execute("chunk query key for '" + currentCollection.id() + "'", client -> {
+            final int threads = connectorConfig.getSnapshotMaxThreads();
+            final int chunkSize = connectorConfig.getIncrementalSnapshotChunkSize();
             final MongoDatabase database = client.getDatabase(collectionId.dbName());
             final MongoCollection<BsonDocument> collection = database.getCollection(collectionId.name(), BsonDocument.class);
-            final Document maxKeyPredicate = new Document();
-            final Document maxKeyOp = new Document();
-            maxKeyOp.put("$lte", context.maximumKey().get()[0]);
-            maxKeyPredicate.put(DOCUMENT_ID, maxKeyOp);
-            Document predicate = maxKeyPredicate;
-            if (context.chunkEndPosititon() != null) {
-                final Document chunkEndPredicate = new Document();
-                final Document chunkEndOp = new Document();
-                chunkEndOp.put("$gt", context.chunkEndPosititon()[0]);
-                chunkEndPredicate.put(DOCUMENT_ID, chunkEndOp);
-                predicate = new Document();
-                predicate.put("$and", Arrays.asList(chunkEndPredicate, maxKeyPredicate));
-            }
-            LOGGER.debug("\t For collection '{}' using query: '{}', key: '{}', maximum key: '{}'", currentCollection.id(),
-                    predicate.toJson(), context.chunkEndPosititon(), context.maximumKey().get());
+            Document predicate = constructQueryPredicate(context.chunkEndPosititon(), context.maximumKey().get());
+            LOGGER.debug("\t For collection '{}' using query: '{}', key: '{}', maximum key: '{}' to get all _id fields",
+                    currentCollection.id(), predicate.toJson(), context.chunkEndPosititon(), context.maximumKey().get());
             long rows = 0;
-            Timer logTimer = getTableScanLogTimer();
             Object[] lastRow = null;
             Object[] firstRow = null;
+            List<Future<?>> futureList = new ArrayList<>();
+            Object[] lastChunkKey = context.chunkEndPosititon();
-            for (BsonDocument doc : collection.find(predicate).sort(new Document(DOCUMENT_ID, 1))
-                    .limit(connectorConfig.getIncrementalSnapshotChunkSize())) {
+            for (BsonDocument doc : collection.find(predicate).sort(new Document(DOCUMENT_ID, 1)).projection(Projections.include(DOCUMENT_ID))
+                    .limit(chunkSize * threads)) {
                 rows++;
                 final Object[] row = new Object[]{ doc };
                 if (firstRow == null) {
                     firstRow = row;
                 }
-                final Struct keyStruct = currentCollection.keyFromDocumentSnapshot(doc);
-                window.put(keyStruct, row);
-                if (logTimer.expired()) {
-                    long stop = clock.currentTimeInMillis();
-                    LOGGER.debug("\t Exported {} records for collection '{}' after {}", rows, currentCollection.id(),
-                            Strings.duration(stop - exportStart));
-                    logTimer = getTableScanLogTimer();
-                }
                 lastRow = row;
+                if (rows % chunkSize == 0) {
+                    lastChunkKey = addChunkToExecutor(collection, lastRow, futureList, lastChunkKey);
+                }
             }
+            // in case the last iteration doesn't have enough data, do it once again for the rest of the rows
+            if (rows % chunkSize != 0) {
+                addChunkToExecutor(collection, lastRow, futureList, lastChunkKey);
+            }
+            try {
+                for (Future<?> future : futureList) {
+                    future.get(); // Wait for the tasks to complete
+                }
+            }
+            catch (ExecutionException e) {
+                throw new DebeziumException("Error while processing chunk", e);
+            }
             final Object[] firstKey = keyFromRow(firstRow);
             final Object[] lastKey = keyFromRow(lastRow);
             if (context.isNonInitialChunk()) {
@@ -577,6 +583,62 @@ private void createDataEventsForDataCollection(MongoDbPartition partition) throw
         });
     }

+    protected Object[] addChunkToExecutor(final MongoCollection<BsonDocument> collection, Object[] lastRow,
+                                          List<Future<?>> futureList, Object[] lastChunkKey) {
+        final Object[] chunkStartKey = lastChunkKey;
+        final Object[] chunkEndKey = keyFromRow(lastRow);
+        futureList.add(this.incrementalSnapshotThreadPool.submit(() -> {
+            queryChunk(collection, chunkStartKey, chunkEndKey);
+        }));
+        return chunkEndKey;
+    }
+
+    private void queryChunk(MongoCollection<BsonDocument> collection, Object[] startKey, Object[] endKey) {
+        Document predicate = constructQueryPredicate(startKey, endKey);
+        LOGGER.debug("\t For collection chunk, '{}' using query: '{}', key: '{}', maximum key: '{}'", currentCollection.id(),
+                predicate.toJson(), startKey, endKey);
+        long rows = 0;
+        long exportStart = clock.currentTimeInMillis();
+        Timer logTimer = getTableScanLogTimer();
+        for (BsonDocument doc : collection.find(predicate).sort(new Document(DOCUMENT_ID, 1))) {
+            rows++;
+            final Object[] row = new Object[]{ doc };
+            final Struct keyStruct = currentCollection.keyFromDocumentSnapshot(doc);
+            window.put(keyStruct, row);
+            if (logTimer.expired()) {
+                long stop = clock.currentTimeInMillis();
+                LOGGER.debug("\t Exported {} records for collection '{}' after {}", rows, currentCollection.id(),
+                        Strings.duration(stop - exportStart));
+                logTimer = getTableScanLogTimer();
+            }
+        }
+    }
+
+    private Document constructQueryPredicate(Object[] startKey, Object[] endKey) {
+        final Document maxKeyPredicate = new Document();
+        final Document maxKeyOp = new Document();
+        if (endKey != null) {
+            maxKeyOp.put("$lte", endKey[0]);
+            maxKeyPredicate.put(DOCUMENT_ID, maxKeyOp);
+        }
+        Document predicate = maxKeyPredicate;
+        if (startKey != null) {
+            final Document chunkEndPredicate = new Document();
+            final Document chunkEndOp = new Document();
+            chunkEndOp.put("$gt", startKey[0]);
+            chunkEndPredicate.put(DOCUMENT_ID, chunkEndOp);
+            predicate = new Document();
+            predicate.put("$and", Arrays.asList(chunkEndPredicate, maxKeyPredicate));
+        }
+        return predicate;
+    }
+
     private void incrementTableRowsScanned(long rows) {
         totalRowsScanned += rows;
         // TODO This metric is not provided by MongoDB
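
As an aside, the predicate produced by constructQueryPredicate(startKey, endKey) above is simply an _id range filter over the half-open interval (startKey, endKey]. A rough equivalent using the MongoDB Java driver's Filters helpers, shown only for illustration (it is not part of the commit, and it assumes endKey is non-null, as it is at both call sites here), would be:

import com.mongodb.client.model.Filters;

import org.bson.conversions.Bson;

// { "_id": { "$lte": endKey } } when startKey is null, otherwise
// { "$and": [ { "_id": { "$gt": startKey } }, { "_id": { "$lte": endKey } } ] }
public class ChunkPredicateSketch {
    static Bson chunkFilter(Object startKey, Object endKey) {
        Bson upperBound = Filters.lte("_id", endKey);
        return startKey == null ? upperBound : Filters.and(Filters.gt("_id", startKey), upperBound);
    }
}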


@@ -0,0 +1,86 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.connector.mongodb;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+import org.junit.Test;
+
+import io.debezium.config.Configuration;
+
+/**
+ * Test to verify multi-thread incremental snapshotting for MongoDB.
+ *
+ * @author Yue Wang
+ */
+public class MultiThreadIncrementalSnapshotIT extends IncrementalSnapshotIT {
+
+    protected static final int ROW_COUNT = 1_000;
+    private static final int INCREMENTAL_SNAPSHOT_THREADS = 7; // use a prime number in tests to cover the cases where the last chunk is less than the chunk size.
+
+    @Override
+    protected Configuration.Builder config() {
+        Configuration.Builder builder = super.config();
+        return builder.with(MongoDbConnectorConfig.SNAPSHOT_MAX_THREADS, INCREMENTAL_SNAPSHOT_THREADS);
+    }
+
+    @Test
+    public void multiThreadingSnapshot() throws Exception {
+        // Testing.Print.enable();
+
+        populateDataCollection();
+        startConnector();
+
+        sendAdHocSnapshotSignal();
+
+        final int expectedRecordCount = ROW_COUNT;
+        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
+        for (int i = 0; i < expectedRecordCount; i++) {
+            assertThat(dbChanges).contains(entry(i + 1, i));
+        }
+    }
+
+    @Test
+    public void multiThreadSnapshotWithRestart() throws Exception {
+        // Testing.Print.enable();
+
+        populateDataCollection();
+        startAndConsumeTillEnd(connectorClass(), config().build());
+        waitForConnectorToStart();
+
+        waitForAvailableRecords(1, TimeUnit.SECONDS);
+        // there shouldn't be any snapshot records
+        assertNoRecordsToConsume();
+
+        sendAdHocSnapshotSignal();
+
+        final int expectedRecordCount = ROW_COUNT;
+        final AtomicInteger recordCounter = new AtomicInteger();
+        final AtomicBoolean restarted = new AtomicBoolean();
+        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount, x -> true,
+                x -> {
+                    if (recordCounter.addAndGet(x.size()) > 50 && !restarted.get()) {
+                        stopConnector();
+                        assertConnectorNotRunning();
+
+                        // restart connector with different threads count to make sure there is still no data loss.
+                        start(connectorClass(), super.config().with(MongoDbConnectorConfig.SNAPSHOT_MAX_THREADS, INCREMENTAL_SNAPSHOT_THREADS + 2).build());
+                        waitForConnectorToStart();
+                        restarted.set(true);
+                    }
+                });
+        for (int i = 0; i < expectedRecordCount; i++) {
+            assertThat(dbChanges).contains(entry(i + 1, i));
+        }
+    }
+}


@@ -212,6 +212,7 @@ umachi,Hidetomi Umaki
 sclarkson-zoomcare,Stephen Clarkson
 gongchanghua,Gong Chang Hua
 angsdey2,Angshuman Dey
+xidui,Yue Wang
 jehrenzweig-pi,Jesse Ehrenzweig
 TechIsCool,David Beck
 cjmencias,Christian Jacob Mencias