From 03e305e8352ee71b21dd3beaa4e69c7311a48220 Mon Sep 17 00:00:00 2001 From: Jakub Cechacek Date: Mon, 27 Nov 2023 15:11:25 +0100 Subject: [PATCH] DBZ-7184 Extracted split event handling from streaming source --- .../MongoDbStreamingChangeEventSource.java | 93 +++--------------- .../mongodb/events/SplitEventHandler.java | 98 +++++++++++++++++++ 2 files changed, 111 insertions(+), 80 deletions(-) create mode 100644 debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/events/SplitEventHandler.java diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java index d87b77246..4b2a59d2a 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java @@ -5,14 +5,10 @@ */ package io.debezium.connector.mongodb; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; -import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import org.bson.BsonDocument; import org.bson.BsonString; @@ -26,10 +22,10 @@ import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.changestream.FullDocumentBeforeChange; -import io.debezium.DebeziumException; import io.debezium.connector.mongodb.connection.ConnectionContext; import io.debezium.connector.mongodb.connection.MongoDbConnection; import io.debezium.connector.mongodb.connection.ReplicaSet; +import io.debezium.connector.mongodb.events.SplitEventHandler; import io.debezium.connector.mongodb.metrics.MongoDbStreamingChangeEventSourceMetrics; import 
io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter; import io.debezium.pipeline.ErrorHandler; @@ -149,11 +145,9 @@ private void readChangeStream(MongoClient client, ReplicaSet replicaSet, ChangeE LOGGER.info("Reading change stream for '{}'", replicaSet); final ReplicaSetPartition rsPartition = effectiveOffset.getReplicaSetPartition(replicaSet); final ReplicaSetOffsetContext rsOffsetContext = effectiveOffset.getReplicaSetOffsetContext(replicaSet); + final SplitEventHandler splitHandler = new SplitEventHandler<>(); final ChangeStreamIterable rsChangeStream = initChangeStream(client, rsOffsetContext); - - final List> fragmentBuffer = new ArrayList<>(16); - try (MongoChangeStreamCursor> cursor = rsChangeStream.cursor()) { // In Replicator, this used cursor.hasNext() but this is a blocking call and I observed that this can // delay the shutdown of the connector by up to 15 seconds or longer. By introducing a Metronome, we @@ -171,39 +165,19 @@ private void readChangeStream(MongoClient client, ReplicaSet replicaSet, ChangeE if (event != null) { LOGGER.trace("Arrived Change Stream event: {}", event); noMessageIterations = 0; - var split = event.getSplitEvent(); - - if (split != null) { - var currentFragment = split.getFragment(); - var totalFragments = split.getOf(); - LOGGER.trace("Change Stream event is a fragment: {} of {}", currentFragment, totalFragments); - fragmentBuffer.add(event); - - // move to the next fragment if expected - if (currentFragment != totalFragments) { + try { + var maybeEvent = splitHandler.handle(event); + if (maybeEvent.isEmpty()) { continue; } + final var completeEvent = maybeEvent.get(); - // reconstruct the event - event = mergeEventFragments(fragmentBuffer); + rsOffsetContext.changeStreamEvent(maybeEvent.get()); + CollectionId collectionId = new CollectionId( + replicaSet.replicaSetName(), + completeEvent.getNamespace().getDatabaseName(), + completeEvent.getNamespace().getCollectionName()); - // clear the fragment buffer 
- fragmentBuffer.clear(); - } - - if (split == null && !fragmentBuffer.isEmpty()) { - LOGGER.error("Expected event fragment but a new event arrived"); - errorHandler.setProducerThrowable(new DebeziumException("Missing event fragment")); - return; - } - - rsOffsetContext.changeStreamEvent(event); - CollectionId collectionId = new CollectionId( - replicaSet.replicaSetName(), - event.getNamespace().getDatabaseName(), - event.getNamespace().getCollectionName()); - - try { // Note that this will trigger a heartbeat request dispatcher.dispatchDataChangeEvent( rsPartition, @@ -212,7 +186,7 @@ private void readChangeStream(MongoClient client, ReplicaSet replicaSet, ChangeE rsPartition, rsOffsetContext, clock, - event, connectorConfig)); + completeEvent, connectorConfig)); } catch (Exception e) { errorHandler.setProducerThrowable(e); @@ -280,7 +254,7 @@ else if (offsetContext.lastTimestamp() != null) { if (connectorConfig.getCursorMaxAwaitTime() > 0) { stream.maxAwaitTime(connectorConfig.getCursorMaxAwaitTime(), TimeUnit.MILLISECONDS); } - + return stream; } @@ -291,45 +265,4 @@ protected MongoDbOffsetContext emptyOffsets(MongoDbConnectorConfig connectorConf new TransactionContext(), new MongoDbIncrementalSnapshotContext<>(false)); } - - @SuppressWarnings("DuplicatedCode") - private static ChangeStreamDocument mergeEventFragments(List> events) { - var operationTypeString = firstOrNull(events, ChangeStreamDocument::getOperationTypeString); - var resumeToken = events.get(events.size() - 1).getResumeToken(); - var namespaceDocument = firstOrNull(events, ChangeStreamDocument::getNamespaceDocument); - var destinationNamespaceDocument = firstOrNull(events, ChangeStreamDocument::getDestinationNamespaceDocument); - var fullDocument = firstOrNull(events, ChangeStreamDocument::getFullDocument); - var fullDocumentBeforeChange = firstOrNull(events, ChangeStreamDocument::getFullDocumentBeforeChange); - var documentKey = firstOrNull(events, ChangeStreamDocument::getDocumentKey); - var 
clusterTime = firstOrNull(events, ChangeStreamDocument::getClusterTime); - var updateDescription = firstOrNull(events, ChangeStreamDocument::getUpdateDescription); - var txnNumber = firstOrNull(events, ChangeStreamDocument::getTxnNumber); - var lsid = firstOrNull(events, ChangeStreamDocument::getLsid); - var wallTime = firstOrNull(events, ChangeStreamDocument::getWallTime); - var extraElements = firstOrNull(events, ChangeStreamDocument::getExtraElements); - - return new ChangeStreamDocument<>( - operationTypeString, - resumeToken, - namespaceDocument, - destinationNamespaceDocument, - fullDocument, - fullDocumentBeforeChange, - documentKey, - clusterTime, - updateDescription, - txnNumber, - lsid, - wallTime, - null, - extraElements); - } - - private static T firstOrNull(Collection> events, Function, T> getter) { - return events.stream() - .map(getter) - .filter(Objects::nonNull) - .findFirst() - .orElse(null); - } } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/events/SplitEventHandler.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/events/SplitEventHandler.java new file mode 100644 index 000000000..0252c46f2 --- /dev/null +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/events/SplitEventHandler.java @@ -0,0 +1,98 @@ +/* + * Copyright Debezium Authors. 
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.connector.mongodb.events;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.mongodb.client.model.changestream.ChangeStreamDocument;
+
+import io.debezium.DebeziumException;
+
+/**
+ * Reassembles change stream events that arrive as a sequence of fragments.
+ * <p>
+ * Events that carry no split information are passed through unchanged. Events that do
+ * are buffered until the fragment whose number equals the announced total arrives, at
+ * which point all buffered fragments are merged into a single {@link ChangeStreamDocument}.
+ * <p>
+ * The handler keeps mutable buffer state and performs no synchronization, so an instance
+ * should be used for a single stream of events only.
+ *
+ * @param <TResult> type of the document carried by the change stream events
+ */
+public class SplitEventHandler<TResult> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(SplitEventHandler.class);
+
+    // Fragments of the event currently being reassembled, in arrival order.
+    final List<ChangeStreamDocument<TResult>> fragmentBuffer = new ArrayList<>(16);
+
+    /**
+     * Feeds one change stream event into the handler.
+     *
+     * @param event the event read from the change stream
+     * @return an empty {@link Optional} when {@code event} is a fragment other than the final one;
+     *         the merged event when {@code event} is the final fragment; {@code event} itself when
+     *         it is not split and no fragments are pending
+     * @throws DebeziumException if a non-split event arrives while fragments are still buffered
+     */
+    public Optional<ChangeStreamDocument<TResult>> handle(ChangeStreamDocument<TResult> event) {
+        var split = event.getSplitEvent();
+
+        if (split != null) {
+            var currentFragment = split.getFragment();
+            var totalFragments = split.getOf();
+            LOGGER.trace("Change Stream event is a fragment: {} of {}", currentFragment, totalFragments);
+            fragmentBuffer.add(event);
+
+            // not the final fragment
+            if (currentFragment != totalFragments) {
+                return Optional.empty();
+            }
+
+            // reconstruct the event and clear the buffer
+            var merged = mergeEventFragments(fragmentBuffer);
+            fragmentBuffer.clear();
+            return Optional.of(merged);
+        }
+
+        if (!fragmentBuffer.isEmpty()) {
+            LOGGER.error("Expected event fragment but a new event arrived");
+            // Drop the incomplete sequence so stale fragments can neither be merged into a
+            // later event nor make every subsequent call fail with the same error.
+            fragmentBuffer.clear();
+            throw new DebeziumException("Missing event fragment");
+        }
+
+        return Optional.of(event);
+    }
+
+    /**
+     * @return {@code true} when no fragments are currently buffered
+     */
+    public boolean isEmpty() {
+        return fragmentBuffer.isEmpty();
+    }
+
+    /**
+     * Builds a single event out of the given fragments.
+     * <p>
+     * Every field takes the first non-null value found across the fragments in list order,
+     * except the resume token, which is taken from the last fragment.
+     *
+     * @param events fragments of one event, in arrival order; must not be empty
+     * @return the merged event
+     */
+    private static <TResult> ChangeStreamDocument<TResult> mergeEventFragments(List<ChangeStreamDocument<TResult>> events) {
+        var operationTypeString = firstOrNull(events, ChangeStreamDocument::getOperationTypeString);
+        // NOTE(review): presumably the last fragment's token is used so that resuming from the
+        // merged event continues after the whole sequence - confirm.
+        var resumeToken = events.get(events.size() - 1).getResumeToken();
+        var namespaceDocument = firstOrNull(events, ChangeStreamDocument::getNamespaceDocument);
+        var destinationNamespaceDocument = firstOrNull(events, ChangeStreamDocument::getDestinationNamespaceDocument);
+        var fullDocument = firstOrNull(events, ChangeStreamDocument::getFullDocument);
+        var fullDocumentBeforeChange = firstOrNull(events, ChangeStreamDocument::getFullDocumentBeforeChange);
+        var documentKey = firstOrNull(events, ChangeStreamDocument::getDocumentKey);
+        var clusterTime = firstOrNull(events, ChangeStreamDocument::getClusterTime);
+        var updateDescription = firstOrNull(events, ChangeStreamDocument::getUpdateDescription);
+        var txnNumber = firstOrNull(events, ChangeStreamDocument::getTxnNumber);
+        var lsid = firstOrNull(events, ChangeStreamDocument::getLsid);
+        var wallTime = firstOrNull(events, ChangeStreamDocument::getWallTime);
+        var extraElements = firstOrNull(events, ChangeStreamDocument::getExtraElements);
+
+        return new ChangeStreamDocument<>(
+                operationTypeString,
+                resumeToken,
+                namespaceDocument,
+                destinationNamespaceDocument,
+                fullDocument,
+                fullDocumentBeforeChange,
+                documentKey,
+                clusterTime,
+                updateDescription,
+                txnNumber,
+                lsid,
+                wallTime,
+                // NOTE(review): assumed to be the split-event argument of the driver constructor,
+                // left null because the merged event is no longer a fragment - confirm.
+                null,
+                extraElements);
+    }
+
+    /**
+     * Applies {@code getter} to each event in iteration order and returns the first non-null result.
+     *
+     * @param events events to inspect
+     * @param getter accessor for the field of interest
+     * @return the first non-null value, or {@code null} when every event yields {@code null}
+     */
+    private static <TResult, T> T firstOrNull(Collection<ChangeStreamDocument<TResult>> events, Function<ChangeStreamDocument<TResult>, T> getter) {
+        return events.stream()
+                .map(getter)
+                .filter(Objects::nonNull)
+                .findFirst()
+                .orElse(null);
+    }
+}