DBZ-7138 Additional Conditions for MongoDb Incremental Snapshot

ani-sha 2024-04-10 11:39:53 +05:30 committed by Jiri Pechanec
parent a52ab79f76
commit fbe31a77cd
3 changed files with 115 additions and 19 deletions

View File

@ -20,7 +20,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.data.Struct;
import org.bson.BsonBinarySubType;
@ -361,12 +363,20 @@ private void nextDataCollection(MongoDbPartition partition, OffsetContext offset
private Object[] readMaximumKey() throws InterruptedException {
final CollectionId collectionId = (CollectionId) currentCollection.id();
String additionalCondition = getAdditionalConditions();
final AtomicReference<Object> key = new AtomicReference<>();
mongo.execute("maximum key for '" + collectionId + "'", client -> {
final MongoDatabase database = client.getDatabase(collectionId.dbName());
final MongoCollection<Document> collection = database.getCollection(collectionId.name());
final Document lastDocument;
if (!additionalCondition.isEmpty()) {
Document condition = Document.parse(additionalCondition);
lastDocument = collection.find(condition).sort(new Document(DOCUMENT_ID, -1)).limit(1).first();
}
else {
lastDocument = collection.find().sort(new Document(DOCUMENT_ID, -1)).limit(1).first();
}
if (lastDocument != null) {
key.set(lastDocument.get(DOCUMENT_ID));
}
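For orientation, a minimal standalone sketch of the same condition-aware maximum-key lookup written against the plain MongoDB Java driver; the connection string, database, collection, and filter values are illustrative assumptions and are not part of the commit.

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class MaxKeyLookupSketch {
    public static void main(String[] args) {
        // Assumed local MongoDB instance; adjust the connection string as needed.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> collection = client.getDatabase("dbA").getCollection("c2");
            // The additional condition arrives from the signal as a JSON filter string.
            Document condition = Document.parse("{ aa: { $lt: 250 } }");
            // Highest _id among the documents matching the condition, mirroring readMaximumKey() above.
            Document last = collection.find(condition)
                    .sort(new Document("_id", -1))
                    .limit(1)
                    .first();
            System.out.println(last == null ? "no matching documents" : last.get("_id"));
        }
    }
}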
@ -384,29 +394,36 @@ public void addDataCollectionNamesToSnapshot(SignalPayload<MongoDbPartition> sig
final OffsetContext offsetContext = signalPayload.offsetContext;
final String correlationId = signalPayload.id;
if (!Strings.isNullOrEmpty(snapshotConfiguration.getSurrogateKey())) {
throw new UnsupportedOperationException("Surrogate key not supported for MongoDB");
}
context = (IncrementalSnapshotContext<CollectionId>) offsetContext.getIncrementalSnapshotContext();
final boolean shouldReadChunk = !context.snapshotRunning();
List<String> expandedDataCollectionIds = expandAndDedupeDataCollectionIds(snapshotConfiguration.getDataCollections());
LOGGER.trace("Configured data collections {}", snapshotConfiguration.getDataCollections());
LOGGER.trace("Expanded data collections {}", expandedDataCollectionIds);
if (expandedDataCollectionIds.size() > snapshotConfiguration.getDataCollections().size()) {
LOGGER.info("Data-collections to snapshot have been expanded from {} to {}", snapshotConfiguration.getDataCollections(), expandedDataCollectionIds);
}
final List<DataCollection<CollectionId>> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(correlationId, expandedDataCollectionIds,
snapshotConfiguration.getAdditionalConditions(), "");
if (shouldReadChunk) {
List<DataCollectionId> monitoredDataCollections = newDataCollectionIds.stream()
.map(DataCollection::getId).collect(Collectors.toList());
LOGGER.trace("Monitored data collections {}", newDataCollectionIds);
progressListener.snapshotStarted(partition);
notificationService.incrementalSnapshotNotificationService().notifyStarted(context, partition, offsetContext);
progressListener.monitoredDataCollectionsDetermined(partition, monitoredDataCollections);
readChunk(partition, offsetContext);
}
}
@ -469,6 +486,22 @@ public void stopSnapshot(MongoDbPartition partition, OffsetContext offsetContext
}
}
/**
* Expands the string-based list of data collection ids, when supplied as regular expressions, into a list of
* all matching explicit data collection ids, removing duplicates.
*/
private List<String> expandAndDedupeDataCollectionIds(List<String> collectionIds) {
return collectionIds
.stream()
.flatMap(dataId -> {
CollectionId collectionId = CollectionId.parse(dataId);
if (collectionId == null) {
return Stream.of(dataId);
}
return Pattern.compile(dataId).matcher(collectionId.identifier()).matches() ? Stream.of(collectionId.identifier()) : Stream.of(dataId);
}).distinct().collect(Collectors.toList());
}
/**
* Dispatches the data change events for the records of a single table.
*/
@ -483,7 +516,8 @@ private void createDataEventsForDataCollection(MongoDbPartition partition) throw
final MongoDatabase database = client.getDatabase(collectionId.dbName());
final MongoCollection<BsonDocument> collection = database.getCollection(collectionId.name(), BsonDocument.class);
Document predicate = constructQueryPredicate(context.chunkEndPosititon(), context.maximumKey().get(),
getAdditionalConditions());
LOGGER.debug("\t For collection '{}' using query: '{}', key: '{}', maximum key: '{}' to get all _id fields",
currentCollection.id(), predicate.toJson(), context.chunkEndPosititon(), context.maximumKey().get());
@ -551,7 +585,7 @@ protected Object[] addChunkToExecutor(final MongoCollection<BsonDocument> collec
}
private void queryChunk(MongoCollection<BsonDocument> collection, Object[] startKey, Object[] endKey) {
Document predicate = constructQueryPredicate(startKey, endKey, getAdditionalConditions());
LOGGER.debug("\t For collection chunk, '{}' using query: '{}', key: '{}', maximum key: '{}'", currentCollection.id(),
predicate.toJson(), startKey, endKey);
@ -573,7 +607,7 @@ private void queryChunk(MongoCollection<BsonDocument> collection, Object[] start
}
}
private Document constructQueryPredicate(Object[] startKey, Object[] endKey, String additionalConditions) {
final Document maxKeyPredicate = new Document();
final Document maxKeyOp = new Document();
@ -589,11 +623,20 @@ private Document constructQueryPredicate(Object[] startKey, Object[] endKey) {
final Document chunkEndOp = new Document();
chunkEndOp.put("$gt", startKey[0]);
chunkEndPredicate.put(DOCUMENT_ID, chunkEndOp);
if (!additionalConditions.isEmpty()) {
Document additionalConditionsPredicate = Document.parse(additionalConditions);
predicate = new Document("$and", Arrays.asList(chunkEndPredicate, maxKeyPredicate, additionalConditionsPredicate));
}
else {
predicate = new Document("$and", Arrays.asList(chunkEndPredicate, maxKeyPredicate));
}
}
return predicate;
}
private String getAdditionalConditions() {
// Strip the enclosing parentheses so that the additional condition is parsed correctly
return context.currentDataCollectionId().getAdditionalCondition().map(s -> s.substring(1, s.length() - 1)).orElse("");
}
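To make the shape of the chunk query concrete, here is a small standalone sketch of the $and predicate that constructQueryPredicate produces when an additional condition is present. The key values are hypothetical, and the inclusive $lte upper bound is an assumption about the part of the method not shown in this hunk.

import java.util.Arrays;
import org.bson.Document;

public class ChunkPredicateSketch {
    public static void main(String[] args) {
        // Hypothetical chunk boundaries; in the connector these come from startKey[0] and endKey[0].
        Object chunkStart = 1000;
        Object snapshotMax = 2000;
        Document chunkEndPredicate = new Document("_id", new Document("$gt", chunkStart));
        // Assumption: the elided part of the method bounds the maximum key inclusively.
        Document maxKeyPredicate = new Document("_id", new Document("$lte", snapshotMax));
        // The additional condition string coming from the signal, e.g. the filter used in the test below.
        Document additional = Document.parse("{ aa: { $lt: 250 } }");
        Document predicate = new Document("$and", Arrays.asList(chunkEndPredicate, maxKeyPredicate, additional));
        // Prints something like:
        // {"$and": [{"_id": {"$gt": 1000}}, {"_id": {"$lte": 2000}}, {"aa": {"$lt": 250}}]}
        System.out.println(predicate.toJson());
    }
}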
private void incrementTableRowsScanned(long rows) {

View File

@ -20,6 +20,7 @@
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@ -234,14 +235,27 @@ private void addTablesIdsToSnapshot(List<DataCollection<T>> dataCollectionIds) {
@SuppressWarnings("unchecked")
public List<DataCollection<T>> addDataCollectionNamesToSnapshot(String correlationId, List<String> dataCollectionIds, List<AdditionalCondition> additionalCondition,
String surrogateKey) {
LOGGER.trace("Adding data collection names {} to snapshot", dataCollectionIds);
final List<DataCollection<T>> newDataCollectionIds = dataCollectionIds.stream()
.map(buildDataCollection(additionalCondition, surrogateKey))
.filter(x -> x.getId() != null) // Remove collections with incorrectly formatted name
.collect(Collectors.toList());
addTablesIdsToSnapshot(newDataCollectionIds);
this.correlationId = correlationId;
return newDataCollectionIds;
}
private Function<String, DataCollection<T>> buildDataCollection(List<AdditionalCondition> additionalCondition, String surrogateKey) {
return expandedCollectionName -> {
String filter = additionalCondition.stream()
.filter(condition -> condition.getDataCollection().matcher(expandedCollectionName).matches())
.map(AdditionalCondition::getFilter)
.findFirst()
.orElse("");
return new DataCollection<T>((T) CollectionId.parse(expandedCollectionName), filter, surrogateKey);
};
}
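The filter lookup in buildDataCollection above can be illustrated with a small self-contained sketch that uses plain java.util.regex instead of the AdditionalCondition type; the class name, collection name, and filter value below are hypothetical.

import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class FilterSelectionSketch {
    public static void main(String[] args) {
        // Each entry pairs a data-collection pattern with its filter, mimicking the signal's
        // additional-conditions list (values are illustrative).
        List<Map.Entry<Pattern, String>> conditions = List.of(
                Map.entry(Pattern.compile("dbA\\.c2"), "{ aa: { $lt: 250 } }"));
        String expandedCollectionName = "dbA.c2";
        // Pick the filter of the first condition whose pattern matches the collection name,
        // falling back to an empty filter, as buildDataCollection does.
        String filter = conditions.stream()
                .filter(e -> e.getKey().matcher(expandedCollectionName).matches())
                .map(Map.Entry::getValue)
                .findFirst()
                .orElse("");
        System.out.println(filter);
    }
}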
@Override
public void stopSnapshot() {
this.dataCollectionsToSnapshot.clear();

View File

@ -228,6 +228,20 @@ protected void sendAdHocSnapshotStopSignal(String... dataCollectionIds) throws S
+ "}\"}") });
}
protected void sendAdHocSnapshotSignalWithAdditionalConditions(Map<String, String> additionalConditions, String... dataCollectionIds) throws SQLException {
final String conditions = additionalConditions.entrySet().stream()
.map(e -> String.format("{\"data-collection\": \"%s\", \"filter\": \"%s\"}", e.getKey(), e.getValue())).collect(
Collectors.joining(","));
final String dataCollectionIdsList = Arrays.stream(dataCollectionIds)
.map(x -> "\"" + x + "\"")
.collect(Collectors.joining(", "));
insertDocuments("dbA", "signals",
Document.parse("{\"type\": \"execute-snapshot\", \"payload\": {"
+ "\"type\": \"INCREMENTAL\","
+ "\"data-collections\": [" + dataCollectionIdsList + "],"
+ "\"additional-conditions\": [" + conditions + "]}}"));
}
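For the collection and filter used in the test below (dbA.c2 with { aa: { $lt: 250 } }), the helper assembles a signal document along these lines, reformatted here for readability:

{
  "type": "execute-snapshot",
  "payload": {
    "type": "INCREMENTAL",
    "data-collections": ["dbA.c2"],
    "additional-conditions": [
      { "data-collection": "dbA.c2", "filter": "{ aa: { $lt: 250 } }" }
    ]
  }
}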
protected void sendAdHocSnapshotSignal() throws SQLException {
sendAdHocSnapshotSignal(fullDataCollectionName());
}
@ -810,6 +824,31 @@ public void insertDeleteWatermarkingStrategy() throws Exception {
assertCloseEventCount(closeEventCount -> assertThat(closeEventCount).isZero());
}
@Test
public void executeIncrementalSnapshotWithAdditionalCondition() throws Exception {
// Testing.Print.enable();
final LogInterceptor interceptor = new LogInterceptor(MongoDbIncrementalSnapshotChangeEventSource.class);
populateDataCollection(dataCollectionNames().get(1));
startConnector();
waitForConnectorToStart();
waitForStreamingRunning("mongodb", "mongo1", getStreamingNamespace(), "0");
sendAdHocSnapshotSignalWithAdditionalConditions(
Map.of(fullDataCollectionNames().get(1), "{ aa: { $lt: 250 } }"),
fullDataCollectionNames().get(1));
final int expectedRecordCount = 250;
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount, topicNames().get(1));
for (int i = 0; i < expectedRecordCount; i++) {
assertThat(dbChanges).contains(entry(i + 1, i));
}
assertThat(interceptor.containsMessage("No data returned by the query, incremental snapshotting of table 'dbA.c2' finished")).isTrue();
assertCloseEventCount(closeEventCount -> assertThat(closeEventCount).isNotZero());
}
private void assertCloseEventCount(Consumer<Long> consumer) {
try (var client = TestHelper.connect(mongo)) {