DBZ-7184 Extracted split event handling from streaming source

Authored by Jakub Cechacek on 2023-11-27 15:11:25 +01:00; committed by Jiri Pechanec
parent 3f0eeebf10
commit 03e305e835
2 changed files with 111 additions and 80 deletions
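In outline, the refactor moves the fragment buffering that previously lived inline in readChangeStream behind a single SplitEventHandler.handle() call: non-final fragments come back as Optional.empty() and are skipped, the final fragment comes back as the merged event, and a broken fragment sequence surfaces as a DebeziumException. A minimal sketch of the resulting consumer loop, assuming a hypothetical EventSink stand-in for the connector's offset update and dispatcher call (neither is part of this commit's new API):

import java.util.Optional;

import org.bson.BsonDocument;

import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;

import io.debezium.connector.mongodb.events.SplitEventHandler;

class StreamingLoopSketch {

    // Hypothetical stand-in for the connector's offset bookkeeping and dispatcher call.
    interface EventSink {
        void dispatch(ChangeStreamDocument<BsonDocument> completeEvent);
    }

    void consume(MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> cursor, EventSink sink) {
        var splitHandler = new SplitEventHandler<BsonDocument>();
        ChangeStreamDocument<BsonDocument> event;
        while ((event = cursor.tryNext()) != null) {
            // handle() buffers non-final fragments and throws DebeziumException on a missing fragment
            Optional<ChangeStreamDocument<BsonDocument>> maybeEvent = splitHandler.handle(event);
            if (maybeEvent.isEmpty()) {
                continue; // fragment buffered, wait for the rest
            }
            sink.dispatch(maybeEvent.get()); // complete (possibly merged) change stream event
        }
    }
}

The per-replica-set loop in the diff below follows the same shape, with rsOffsetContext.changeStreamEvent(...) and dispatcher.dispatchDataChangeEvent(...) in place of the sink.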

io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java

@@ -5,14 +5,10 @@
*/
package io.debezium.connector.mongodb;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.bson.BsonDocument;
import org.bson.BsonString;
@@ -26,10 +22,10 @@
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import io.debezium.DebeziumException;
import io.debezium.connector.mongodb.connection.ConnectionContext;
import io.debezium.connector.mongodb.connection.MongoDbConnection;
import io.debezium.connector.mongodb.connection.ReplicaSet;
import io.debezium.connector.mongodb.events.SplitEventHandler;
import io.debezium.connector.mongodb.metrics.MongoDbStreamingChangeEventSourceMetrics;
import io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter;
import io.debezium.pipeline.ErrorHandler;
@@ -149,11 +145,9 @@ private void readChangeStream(MongoClient client, ReplicaSet replicaSet, ChangeE
 LOGGER.info("Reading change stream for '{}'", replicaSet);
 final ReplicaSetPartition rsPartition = effectiveOffset.getReplicaSetPartition(replicaSet);
 final ReplicaSetOffsetContext rsOffsetContext = effectiveOffset.getReplicaSetOffsetContext(replicaSet);
+final SplitEventHandler<BsonDocument> splitHandler = new SplitEventHandler<>();
 final ChangeStreamIterable<BsonDocument> rsChangeStream = initChangeStream(client, rsOffsetContext);
-final List<ChangeStreamDocument<BsonDocument>> fragmentBuffer = new ArrayList<>(16);
 try (MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> cursor = rsChangeStream.cursor()) {
     // In Replicator, this used cursor.hasNext() but this is a blocking call and I observed that this can
     // delay the shutdown of the connector by up to 15 seconds or longer. By introducing a Metronome, we
@@ -171,39 +165,19 @@ private void readChangeStream(MongoClient client, ReplicaSet replicaSet, ChangeE
 if (event != null) {
     LOGGER.trace("Arrived Change Stream event: {}", event);
     noMessageIterations = 0;
-    var split = event.getSplitEvent();
-    if (split != null) {
-        var currentFragment = split.getFragment();
-        var totalFragments = split.getOf();
-        LOGGER.trace("Change Stream event is a fragment: {} of {}", currentFragment, totalFragments);
-        fragmentBuffer.add(event);
-        // move to the next fragment if expected
-        if (currentFragment != totalFragments) {
+    try {
+        var maybeEvent = splitHandler.handle(event);
+        if (maybeEvent.isEmpty()) {
             continue;
         }
+        final var completeEvent = maybeEvent.get();
-        // reconstruct the event
-        event = mergeEventFragments(fragmentBuffer);
+        rsOffsetContext.changeStreamEvent(maybeEvent.get());
+        CollectionId collectionId = new CollectionId(
+                replicaSet.replicaSetName(),
+                completeEvent.getNamespace().getDatabaseName(),
+                completeEvent.getNamespace().getCollectionName());
-        // clear the fragment buffer
-        fragmentBuffer.clear();
-    }
-    if (split == null && !fragmentBuffer.isEmpty()) {
-        LOGGER.error("Expected event fragment but a new event arrived");
-        errorHandler.setProducerThrowable(new DebeziumException("Missing event fragment"));
-        return;
-    }
-    rsOffsetContext.changeStreamEvent(event);
-    CollectionId collectionId = new CollectionId(
-            replicaSet.replicaSetName(),
-            event.getNamespace().getDatabaseName(),
-            event.getNamespace().getCollectionName());
-    try {
         // Note that this will trigger a heartbeat request
         dispatcher.dispatchDataChangeEvent(
                 rsPartition,
@@ -212,7 +186,7 @@ private void readChangeStream(MongoClient client, ReplicaSet replicaSet, ChangeE
                 rsPartition,
                 rsOffsetContext,
                 clock,
-                event, connectorConfig));
+                completeEvent, connectorConfig));
     }
     catch (Exception e) {
         errorHandler.setProducerThrowable(e);
@@ -280,7 +254,7 @@ else if (offsetContext.lastTimestamp() != null) {
if (connectorConfig.getCursorMaxAwaitTime() > 0) {
stream.maxAwaitTime(connectorConfig.getCursorMaxAwaitTime(), TimeUnit.MILLISECONDS);
}
return stream;
}
@@ -291,45 +265,4 @@ protected MongoDbOffsetContext emptyOffsets(MongoDbConnectorConfig connectorConf
new TransactionContext(),
new MongoDbIncrementalSnapshotContext<>(false));
}
@SuppressWarnings("DuplicatedCode")
private static ChangeStreamDocument<BsonDocument> mergeEventFragments(List<ChangeStreamDocument<BsonDocument>> events) {
var operationTypeString = firstOrNull(events, ChangeStreamDocument::getOperationTypeString);
var resumeToken = events.get(events.size() - 1).getResumeToken();
var namespaceDocument = firstOrNull(events, ChangeStreamDocument::getNamespaceDocument);
var destinationNamespaceDocument = firstOrNull(events, ChangeStreamDocument::getDestinationNamespaceDocument);
var fullDocument = firstOrNull(events, ChangeStreamDocument::getFullDocument);
var fullDocumentBeforeChange = firstOrNull(events, ChangeStreamDocument::getFullDocumentBeforeChange);
var documentKey = firstOrNull(events, ChangeStreamDocument::getDocumentKey);
var clusterTime = firstOrNull(events, ChangeStreamDocument::getClusterTime);
var updateDescription = firstOrNull(events, ChangeStreamDocument::getUpdateDescription);
var txnNumber = firstOrNull(events, ChangeStreamDocument::getTxnNumber);
var lsid = firstOrNull(events, ChangeStreamDocument::getLsid);
var wallTime = firstOrNull(events, ChangeStreamDocument::getWallTime);
var extraElements = firstOrNull(events, ChangeStreamDocument::getExtraElements);
return new ChangeStreamDocument<>(
operationTypeString,
resumeToken,
namespaceDocument,
destinationNamespaceDocument,
fullDocument,
fullDocumentBeforeChange,
documentKey,
clusterTime,
updateDescription,
txnNumber,
lsid,
wallTime,
null,
extraElements);
}
private static <T> T firstOrNull(Collection<ChangeStreamDocument<BsonDocument>> events, Function<ChangeStreamDocument<BsonDocument>, T> getter) {
return events.stream()
.map(getter)
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}
}
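The comment retained at the top of the cursor loop above mentions that the old Replicator code blocked in cursor.hasNext() and that a Metronome was introduced instead; the non-blocking counterpart on the driver's cursor is tryNext(), which returns null when no event is currently available. A rough sketch of that polling pattern, assuming Debezium's io.debezium.util.Metronome and Clock utilities and a caller-supplied running flag; the real loop also tracks noMessageIterations, as seen in the hunk above:

import java.time.Duration;
import java.util.function.BooleanSupplier;

import org.bson.BsonDocument;

import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;

import io.debezium.util.Clock;
import io.debezium.util.Metronome;

class NonBlockingPollSketch {

    void poll(MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> cursor, BooleanSupplier running)
            throws InterruptedException {
        // sleep a short, fixed interval between empty polls instead of blocking in hasNext()
        Metronome pause = Metronome.sleeper(Duration.ofMillis(500), Clock.SYSTEM);
        while (running.getAsBoolean()) {
            ChangeStreamDocument<BsonDocument> event = cursor.tryNext(); // non-blocking
            if (event == null) {
                pause.pause(); // gives the connector a chance to observe a stop request
                continue;
            }
            // ... hand the event to the split handler and dispatcher as in the diff above
        }
    }
}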

io/debezium/connector/mongodb/events/SplitEventHandler.java

@@ -0,0 +1,98 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.events;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.debezium.DebeziumException;
public class SplitEventHandler<TResult> {
private static final Logger LOGGER = LoggerFactory.getLogger(SplitEventHandler.class);
final List<ChangeStreamDocument<TResult>> fragmentBuffer = new ArrayList<>(16);
public Optional<ChangeStreamDocument<TResult>> handle(ChangeStreamDocument<TResult> event) {
var split = event.getSplitEvent();
if (split != null) {
var currentFragment = split.getFragment();
var totalFragments = split.getOf();
LOGGER.trace("Change Stream event is a fragment: {} of {}", currentFragment, totalFragments);
fragmentBuffer.add(event);
// not the final fragment
if (currentFragment != totalFragments) {
return Optional.empty();
}
// reconstruct the event and clear the buffer
var merged = mergeEventFragments(fragmentBuffer);
fragmentBuffer.clear();
return Optional.of(merged);
}
if (!fragmentBuffer.isEmpty()) {
LOGGER.error("Expected event fragment but a new event arrived");
throw new DebeziumException("Missing event fragment");
}
return Optional.of(event);
}
public boolean isEmpty() {
return fragmentBuffer.isEmpty();
}
private static <TResult> ChangeStreamDocument<TResult> mergeEventFragments(List<ChangeStreamDocument<TResult>> events) {
var operationTypeString = firstOrNull(events, ChangeStreamDocument::getOperationTypeString);
var resumeToken = events.get(events.size() - 1).getResumeToken();
var namespaceDocument = firstOrNull(events, ChangeStreamDocument::getNamespaceDocument);
var destinationNamespaceDocument = firstOrNull(events, ChangeStreamDocument::getDestinationNamespaceDocument);
var fullDocument = firstOrNull(events, ChangeStreamDocument::getFullDocument);
var fullDocumentBeforeChange = firstOrNull(events, ChangeStreamDocument::getFullDocumentBeforeChange);
var documentKey = firstOrNull(events, ChangeStreamDocument::getDocumentKey);
var clusterTime = firstOrNull(events, ChangeStreamDocument::getClusterTime);
var updateDescription = firstOrNull(events, ChangeStreamDocument::getUpdateDescription);
var txnNumber = firstOrNull(events, ChangeStreamDocument::getTxnNumber);
var lsid = firstOrNull(events, ChangeStreamDocument::getLsid);
var wallTime = firstOrNull(events, ChangeStreamDocument::getWallTime);
var extraElements = firstOrNull(events, ChangeStreamDocument::getExtraElements);
return new ChangeStreamDocument<TResult>(
operationTypeString,
resumeToken,
namespaceDocument,
destinationNamespaceDocument,
fullDocument,
fullDocumentBeforeChange,
documentKey,
clusterTime,
updateDescription,
txnNumber,
lsid,
wallTime,
null,
extraElements);
}
private static <TResult, T> T firstOrNull(Collection<ChangeStreamDocument<TResult>> events, Function<ChangeStreamDocument<TResult>, T> getter) {
return events.stream()
.map(getter)
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}
}
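As implemented above, handle() returns Optional.empty() for every non-final fragment, returns the merged document once the final fragment arrives, passes ordinary events through untouched, and throws a DebeziumException if an ordinary event shows up while fragments are still buffered. A hypothetical caller illustrating that contract (fragment1..fragment3 stand for the "1 of 3" through "3 of 3" pieces of one oversized event; building such documents by hand is omitted here):

import java.util.Optional;

import org.bson.BsonDocument;

import com.mongodb.client.model.changestream.ChangeStreamDocument;

import io.debezium.connector.mongodb.events.SplitEventHandler;

class SplitEventHandlerUsageSketch {

    // fragment1..fragment3 are assumed to be the fragments of a single split change stream event.
    ChangeStreamDocument<BsonDocument> reassemble(ChangeStreamDocument<BsonDocument> fragment1,
                                                  ChangeStreamDocument<BsonDocument> fragment2,
                                                  ChangeStreamDocument<BsonDocument> fragment3) {
        var handler = new SplitEventHandler<BsonDocument>();

        var first = handler.handle(fragment1);   // Optional.empty(): fragment buffered
        var second = handler.handle(fragment2);  // Optional.empty(): fragment buffered
        Optional<ChangeStreamDocument<BsonDocument>> merged = handler.handle(fragment3); // present: merged

        // The merged document takes its resume token from the last fragment and every other
        // field from the first fragment that carries a non-null value for it.
        return merged.orElseThrow();

        // Had an ordinary (non-fragment) event arrived while the buffer was non-empty, handle()
        // would have thrown DebeziumException("Missing event fragment") instead.
    }
}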