diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt
index 0742a4df9..0c260571e 100644
--- a/COPYRIGHT.txt
+++ b/COPYRIGHT.txt
@@ -516,6 +516,7 @@ Yoann Rodière
 Yohei Yoshimuta
 Yossi Shirizli
 Yuan Zhang
+Yue Wang
 Yuiham Chan
 Zheng Wang
 Zongwen Li
diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java
index 1db16e91c..01633f622 100644
--- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java
+++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java
@@ -10,10 +10,14 @@
 import static io.debezium.pipeline.notification.IncrementalSnapshotNotificationService.TableScanCompletionStatus.SUCCEEDED;
 import static io.debezium.pipeline.notification.IncrementalSnapshotNotificationService.TableScanCompletionStatus.UNKNOWN_SCHEMA;
 
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -25,6 +29,7 @@
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Projections;
 
 import io.debezium.DebeziumException;
 import io.debezium.annotation.NotThreadSafe;
@@ -74,13 +79,15 @@ public class MongoDbIncrementalSnapshotChangeEventSource
     protected EventDispatcher dispatcher;
     protected IncrementalSnapshotContext context = null;
 
-    protected final Map<Struct, Object[]> window = new LinkedHashMap<>();
+    protected final Map<Struct, Object[]> window = new ConcurrentHashMap<>();
     private MongoDbConnection mongo;
     private final CollectionId signallingCollectionId;
 
     protected final NotificationService notificationService;
 
+    private final ExecutorService incrementalSnapshotThreadPool;
+
     public MongoDbIncrementalSnapshotChangeEventSource(MongoDbConnectorConfig config,
                                                        MongoDbConnection.ChangeEventSourceConnectionFactory connections,
                                                        ReplicaSets replicaSets,
                                                        EventDispatcher dispatcher,
@@ -100,6 +107,8 @@ public MongoDbIncrementalSnapshotChangeEventSource(MongoDbConnectorConfig config
         this.signallingCollectionId = connectorConfig.getSignalingDataCollectionId() == null ?
null : CollectionId.parse("UNUSED", connectorConfig.getSignalingDataCollectionId()); this.notificationService = notificationService; + this.incrementalSnapshotThreadPool = Threads.newFixedThreadPool(MongoDbConnector.class, config.getConnectorName(), + "incremental-snapshot-" + replicaSets.all().get(0).replicaSetName(), connectorConfig.getSnapshotMaxThreads()); } @Override @@ -512,52 +521,49 @@ private void createDataEventsForDataCollection(MongoDbPartition partition) throw LOGGER.debug("Exporting data chunk from collection '{}' (total {} collections)", currentCollection.id(), context.dataCollectionsToBeSnapshottedCount()); mongo.execute("chunk query key for '" + currentCollection.id() + "'", client -> { + final int threads = connectorConfig.getSnapshotMaxThreads(); + final int chunkSize = connectorConfig.getIncrementalSnapshotChunkSize(); final MongoDatabase database = client.getDatabase(collectionId.dbName()); final MongoCollection collection = database.getCollection(collectionId.name(), BsonDocument.class); - final Document maxKeyPredicate = new Document(); - final Document maxKeyOp = new Document(); - maxKeyOp.put("$lte", context.maximumKey().get()[0]); - maxKeyPredicate.put(DOCUMENT_ID, maxKeyOp); - - Document predicate = maxKeyPredicate; - - if (context.chunkEndPosititon() != null) { - final Document chunkEndPredicate = new Document(); - final Document chunkEndOp = new Document(); - chunkEndOp.put("$gt", context.chunkEndPosititon()[0]); - chunkEndPredicate.put(DOCUMENT_ID, chunkEndOp); - predicate = new Document(); - predicate.put("$and", Arrays.asList(chunkEndPredicate, maxKeyPredicate)); - } - - LOGGER.debug("\t For collection '{}' using query: '{}', key: '{}', maximum key: '{}'", currentCollection.id(), - predicate.toJson(), context.chunkEndPosititon(), context.maximumKey().get()); + Document predicate = constructQueryPredicate(context.chunkEndPosititon(), context.maximumKey().get()); + LOGGER.debug("\t For collection '{}' using query: '{}', key: '{}', maximum key: '{}' to get all _id fields", + currentCollection.id(), predicate.toJson(), context.chunkEndPosititon(), context.maximumKey().get()); long rows = 0; - Timer logTimer = getTableScanLogTimer(); - Object[] lastRow = null; Object[] firstRow = null; + List> futureList = new ArrayList<>(); + Object[] lastChunkKey = context.chunkEndPosititon(); - for (BsonDocument doc : collection.find(predicate).sort(new Document(DOCUMENT_ID, 1)) - .limit(connectorConfig.getIncrementalSnapshotChunkSize())) { + for (BsonDocument doc : collection.find(predicate).sort(new Document(DOCUMENT_ID, 1)).projection(Projections.include(DOCUMENT_ID)) + .limit(chunkSize * threads)) { rows++; final Object[] row = new Object[]{ doc }; if (firstRow == null) { firstRow = row; } - final Struct keyStruct = currentCollection.keyFromDocumentSnapshot(doc); - window.put(keyStruct, row); - if (logTimer.expired()) { - long stop = clock.currentTimeInMillis(); - LOGGER.debug("\t Exported {} records for collection '{}' after {}", rows, currentCollection.id(), - Strings.duration(stop - exportStart)); - logTimer = getTableScanLogTimer(); - } - lastRow = row; + + if (rows % chunkSize == 0) { + lastChunkKey = addChunkToExecutor(collection, lastRow, futureList, lastChunkKey); + } } + + // in case the last iteration doesn't have enough data, do it once again for the rest of the rows + if (rows % chunkSize != 0) { + addChunkToExecutor(collection, lastRow, futureList, lastChunkKey); + } + + try { + for (Future future : futureList) { + future.get(); // Wait for the tasks to 
+                }
+            }
+            catch (ExecutionException e) {
+                throw new DebeziumException("Error while processing chunk", e);
+            }
+
             final Object[] firstKey = keyFromRow(firstRow);
             final Object[] lastKey = keyFromRow(lastRow);
             if (context.isNonInitialChunk()) {
@@ -577,6 +583,62 @@ private void createDataEventsForDataCollection(MongoDbPartition partition) throw
         });
     }
 
+    protected Object[] addChunkToExecutor(final MongoCollection<BsonDocument> collection, Object[] lastRow,
+                                          List<Future<?>> futureList, Object[] lastChunkKey) {
+        final Object[] chunkStartKey = lastChunkKey;
+        final Object[] chunkEndKey = keyFromRow(lastRow);
+        futureList.add(this.incrementalSnapshotThreadPool.submit(() -> {
+            queryChunk(collection, chunkStartKey, chunkEndKey);
+        }));
+        return chunkEndKey;
+    }
+
+    private void queryChunk(MongoCollection<BsonDocument> collection, Object[] startKey, Object[] endKey) {
+        Document predicate = constructQueryPredicate(startKey, endKey);
+        LOGGER.debug("\t For collection chunk, '{}' using query: '{}', key: '{}', maximum key: '{}'", currentCollection.id(),
+                predicate.toJson(), startKey, endKey);
+
+        long rows = 0;
+        long exportStart = clock.currentTimeInMillis();
+        Timer logTimer = getTableScanLogTimer();
+
+        for (BsonDocument doc : collection.find(predicate).sort(new Document(DOCUMENT_ID, 1))) {
+            rows++;
+            final Object[] row = new Object[]{ doc };
+            final Struct keyStruct = currentCollection.keyFromDocumentSnapshot(doc);
+            window.put(keyStruct, row);
+            if (logTimer.expired()) {
+                long stop = clock.currentTimeInMillis();
+                LOGGER.debug("\t Exported {} records for collection '{}' after {}", rows, currentCollection.id(),
+                        Strings.duration(stop - exportStart));
+                logTimer = getTableScanLogTimer();
+            }
+        }
+    }
+
+    private Document constructQueryPredicate(Object[] startKey, Object[] endKey) {
+        final Document maxKeyPredicate = new Document();
+        final Document maxKeyOp = new Document();
+
+        if (endKey != null) {
+            maxKeyOp.put("$lte", endKey[0]);
+            maxKeyPredicate.put(DOCUMENT_ID, maxKeyOp);
+        }
+
+        Document predicate = maxKeyPredicate;
+
+        if (startKey != null) {
+            final Document chunkEndPredicate = new Document();
+            final Document chunkEndOp = new Document();
+            chunkEndOp.put("$gt", startKey[0]);
+            chunkEndPredicate.put(DOCUMENT_ID, chunkEndOp);
+            predicate = new Document();
+            predicate.put("$and", Arrays.asList(chunkEndPredicate, maxKeyPredicate));
+        }
+
+        return predicate;
+    }
+
     private void incrementTableRowsScanned(long rows) {
         totalRowsScanned += rows;
         // TODO This metric is not provided by MongoDB
diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MultiThreadIncrementalSnapshotIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MultiThreadIncrementalSnapshotIT.java
new file mode 100644
index 000000000..9e2cbf1de
--- /dev/null
+++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MultiThreadIncrementalSnapshotIT.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.connector.mongodb;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.entry;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+import org.junit.Test;
+
+import io.debezium.config.Configuration;
+
+/**
+ * Test to verify multi-thread incremental snapshotting for MongoDB.
+ *
+ * @author Yue Wang
+ */
+public class MultiThreadIncrementalSnapshotIT extends IncrementalSnapshotIT {
+
+    protected static final int ROW_COUNT = 1_000;
+    private static final int INCREMENTAL_SNAPSHOT_THREADS = 7; // use a prime number in tests to cover the cases where the last chunk is less than the chunk size.
+
+    @Override
+    protected Configuration.Builder config() {
+        Configuration.Builder builder = super.config();
+        return builder.with(MongoDbConnectorConfig.SNAPSHOT_MAX_THREADS, INCREMENTAL_SNAPSHOT_THREADS);
+    }
+
+    @Test
+    public void multiThreadingSnapshot() throws Exception {
+        // Testing.Print.enable();
+
+        populateDataCollection();
+        startConnector();
+
+        sendAdHocSnapshotSignal();
+
+        final int expectedRecordCount = ROW_COUNT;
+        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
+        for (int i = 0; i < expectedRecordCount; i++) {
+            assertThat(dbChanges).contains(entry(i + 1, i));
+        }
+    }
+
+    @Test
+    public void multiThreadSnapshotWithRestart() throws Exception {
+        // Testing.Print.enable();
+
+        populateDataCollection();
+        startAndConsumeTillEnd(connectorClass(), config().build());
+        waitForConnectorToStart();
+
+        waitForAvailableRecords(1, TimeUnit.SECONDS);
+        // there shouldn't be any snapshot records
+        assertNoRecordsToConsume();
+
+        sendAdHocSnapshotSignal();
+
+        final int expectedRecordCount = ROW_COUNT;
+        final AtomicInteger recordCounter = new AtomicInteger();
+        final AtomicBoolean restarted = new AtomicBoolean();
+        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount, x -> true,
+                x -> {
+                    if (recordCounter.addAndGet(x.size()) > 50 && !restarted.get()) {
+                        stopConnector();
+                        assertConnectorNotRunning();
+
+                        // restart connector with different threads count to make sure there is still no data loss.
+                        start(connectorClass(), super.config().with(MongoDbConnectorConfig.SNAPSHOT_MAX_THREADS, INCREMENTAL_SNAPSHOT_THREADS + 2).build());
+                        waitForConnectorToStart();
+                        restarted.set(true);
+                    }
+                });
+        for (int i = 0; i < expectedRecordCount; i++) {
+            assertThat(dbChanges).contains(entry(i + 1, i));
+        }
+    }
+}
diff --git a/jenkins-jobs/scripts/config/Aliases.txt b/jenkins-jobs/scripts/config/Aliases.txt
index 87d8cb7b4..ebba5a793 100644
--- a/jenkins-jobs/scripts/config/Aliases.txt
+++ b/jenkins-jobs/scripts/config/Aliases.txt
@@ -212,6 +212,7 @@ umachi,Hidetomi Umaki
 sclarkson-zoomcare,Stephen Clarkson
 gongchanghua,Gong Chang Hua
 angsdey2,Angshuman Dey
+xidui,Yue Wang
 jehrenzweig-pi,Jesse Ehrenzweig
 TechIsCool,David Beck
 cjmencias,Christian Jacob Mencias
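
Reviewer note (not part of the patch): for readers who want the pattern in isolation, the sketch below reproduces the two-pass approach this change introduces, using only the plain MongoDB Java driver. The first pass reads just the _id values of the next chunk-size x thread-count documents in ascending order and records every chunk-size-th value as a boundary; the second pass queries each (_id > start, _id <= end] range on a fixed thread pool and blocks on the futures, mirroring addChunkToExecutor/queryChunk/constructQueryPredicate above. The connection string, database and collection names, and the chunkSize/threads constants are placeholders, and the sketch merely counts documents where the connector would buffer them in its window map and dispatch events.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.bson.Document;
    import org.bson.conversions.Bson;

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.Filters;
    import com.mongodb.client.model.Projections;
    import com.mongodb.client.model.Sorts;

    public class ChunkedParallelScanSketch {

        public static void main(String[] args) throws Exception {
            // Placeholder connection string, database and collection names -- illustration only.
            try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
                MongoCollection<Document> collection = client.getDatabase("inventory").getCollection("customers");

                final int chunkSize = 1024; // stands in for incremental.snapshot.chunk.size
                final int threads = 4;      // stands in for snapshot.max.threads
                ExecutorService pool = Executors.newFixedThreadPool(threads);

                // Pass 1: read only the _id values of the next chunkSize * threads documents, sorted ascending,
                // and remember every chunkSize-th _id as a chunk boundary.
                List<Object> boundaries = new ArrayList<>();
                Object lastId = null;
                long n = 0;
                for (Document doc : collection.find().projection(Projections.include("_id")).sort(Sorts.ascending("_id"))
                        .limit(chunkSize * threads)) {
                    lastId = doc.get("_id");
                    if (++n % chunkSize == 0) {
                        boundaries.add(lastId);
                    }
                }
                if (n % chunkSize != 0 && lastId != null) {
                    boundaries.add(lastId); // the final, possibly smaller chunk
                }

                // Pass 2: fetch each (_id > start, _id <= end] range on its own thread.
                List<Future<Long>> futures = new ArrayList<>();
                Object start = null;
                for (Object end : boundaries) {
                    final Object s = start;
                    final Object e = end;
                    futures.add(pool.submit(() -> {
                        Bson filter = (s == null) ? Filters.lte("_id", e) : Filters.and(Filters.gt("_id", s), Filters.lte("_id", e));
                        long count = 0;
                        for (Document doc : collection.find(filter).sort(Sorts.ascending("_id"))) {
                            count++; // a real consumer would buffer the document instead of just counting it
                        }
                        return count;
                    }));
                    start = end;
                }
                for (Future<Long> f : futures) {
                    System.out.println("chunk rows: " + f.get()); // block until every chunk has completed
                }
                pool.shutdown();
            }
        }
    }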
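On the user side, the new parallelism is driven by the two existing settings that the patch reads: snapshot.max.threads (thread-pool size) and incremental.snapshot.chunk.size (rows per chunk); each pass of the incremental snapshot now loads chunk size x threads _id values and fans them out as chunks. Below is a minimal, assumed example of wiring those two properties together with Debezium's Configuration builder; the connection string and topic prefix are placeholders, not values taken from this change.

    import io.debezium.config.Configuration;

    public class SnapshotTuningExample {

        public static void main(String[] args) {
            // Illustrative values only; connection string and topic prefix are placeholders.
            Configuration config = Configuration.create()
                    .with("connector.class", "io.debezium.connector.mongodb.MongoDbConnector")
                    .with("mongodb.connection.string", "mongodb://localhost:27017/?replicaSet=rs0")
                    .with("topic.prefix", "dbserver1")
                    // one pass of the incremental snapshot loads chunk size * threads _id values
                    .with("incremental.snapshot.chunk.size", "1024")
                    .with("snapshot.max.threads", "4")
                    .build();

            System.out.println(config.asProperties());
        }
    }

With these values, a single pass covers 4096 documents split into four chunks that are snapshotted concurrently; the restart test above deliberately changes the thread count between runs to show that the chunking remains correct across restarts.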