DBZ-7184 Refactored streaming loop of MongoDB connector

This commit is contained in:
Jakub Cechacek 2023-11-28 10:44:31 +01:00 committed by Jiri Pechanec
parent c7e87c5382
commit 8a5007a738
2 changed files with 111 additions and 57 deletions

View File

@ -17,6 +17,7 @@
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
@ -27,6 +28,7 @@
import io.debezium.connector.mongodb.events.SplitEventHandler;
import io.debezium.connector.mongodb.metrics.MongoDbStreamingChangeEventSourceMetrics;
import io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter;
import io.debezium.function.BlockingRunnable;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
@ -148,76 +150,90 @@ private void readChangeStream(MongoClient client, ReplicaSet replicaSet, ChangeE
try (var cursor = BufferingChangeStreamCursor.fromIterable(rsChangeStream, taskContext, streamingMetrics, clock).start()) {
while (context.isRunning()) {
waitWhenStreamingPaused(context);
var resumableEvent = cursor.tryNext();
if (resumableEvent == null) {
continue;
}
if (resumableEvent.hasDocument()) {
LOGGER.trace("Arrived Change Stream event: {}", resumableEvent);
try {
var event = splitHandler.handle(resumableEvent);
if (event.isEmpty()) {
continue;
}
final var completeEvent = event.get();
var result = resumableEvent.document
.map(doc -> processChangeStreamDocument(doc, splitHandler, replicaSet, rsPartition, rsOffsetContext))
.orElseGet(() -> errorHandled(() -> dispatchHeartbeatEvent(cursor, rsPartition, rsOffsetContext)));
rsOffsetContext.changeStreamEvent(completeEvent);
CollectionId collectionId = new CollectionId(
replicaSet.replicaSetName(),
completeEvent.getNamespace().getDatabaseName(),
completeEvent.getNamespace().getCollectionName());
// Note that this will trigger a heartbeat request
dispatcher.dispatchDataChangeEvent(
rsPartition,
collectionId,
new MongoDbChangeRecordEmitter(
rsPartition,
rsOffsetContext,
clock,
completeEvent, connectorConfig));
}
catch (Exception e) {
errorHandler.setProducerThrowable(e);
return;
}
}
else {
LOGGER.trace("No Change Stream event, triggering heartbeat");
try {
// Guard against `null` to be protective of issues like SERVER-63772, and situations called out in the Javadocs:
// > resume token [...] can be null if the cursor has either not been iterated yet, or the cursor is closed.
if (cursor.getResumeToken() != null) {
rsOffsetContext.noEvent(cursor);
dispatcher.dispatchHeartbeatEvent(rsPartition, rsOffsetContext);
}
}
catch (InterruptedException e) {
LOGGER.info("Replicator thread is interrupted");
Thread.currentThread().interrupt();
return;
}
}
try {
waitWhenStreamingPaused(context);
}
catch (InterruptedException e) {
LOGGER.info("Replicator thread is interrupted");
Thread.currentThread().interrupt();
if (result == StreamStatus.ERROR) {
return;
}
}
}
}
private void waitWhenStreamingPaused(ChangeEventSourceContext context) throws InterruptedException {
private void waitWhenStreamingPaused(ChangeEventSourceContext context) {
if (context.isPaused()) {
LOGGER.info("Streaming will now pause");
context.streamingPaused();
context.waitSnapshotCompletion();
LOGGER.info("Streaming resumed");
errorHandled(() -> {
LOGGER.info("Streaming will now pause");
context.streamingPaused();
context.waitSnapshotCompletion();
LOGGER.info("Streaming resumed");
});
}
}
private StreamStatus processChangeStreamDocument(
ChangeStreamDocument<BsonDocument> document,
SplitEventHandler<BsonDocument> splitHandler,
ReplicaSet replicaSet,
ReplicaSetPartition rsPartition,
ReplicaSetOffsetContext rsOffsetContext) {
LOGGER.trace("Arrived Change Stream event: {}", document);
return splitHandler
.handle(document)
.map(event -> errorHandled(() -> dispatchChangeEvent(event, replicaSet, rsPartition, rsOffsetContext)))
.orElse(StreamStatus.NEXT);
}
private void dispatchChangeEvent(
ChangeStreamDocument<BsonDocument> event,
ReplicaSet replicaSet,
ReplicaSetPartition rsPartition,
ReplicaSetOffsetContext rsOffsetContext)
throws InterruptedException {
var collectionId = new CollectionId(
replicaSet.replicaSetName(),
event.getNamespace().getDatabaseName(),
event.getNamespace().getCollectionName());
var emitter = new MongoDbChangeRecordEmitter(rsPartition, rsOffsetContext, clock, event, connectorConfig);
rsOffsetContext.changeStreamEvent(event);
dispatcher.dispatchDataChangeEvent(rsPartition, collectionId, emitter);
}
private void dispatchHeartbeatEvent(
BufferingChangeStreamCursor<BsonDocument> cursor,
ReplicaSetPartition rsPartition,
ReplicaSetOffsetContext rsOffsetContext)
throws InterruptedException {
LOGGER.trace("No Change Stream event arrived");
// Guard against `null` to be protective of issues like SERVER-63772, and situations called out in the Javadocs:
// > resume token [...] can be null if the cursor has either not been iterated yet, or the cursor is closed.
if (cursor.getResumeToken() != null) {
rsOffsetContext.noEvent(cursor);
dispatcher.dispatchHeartbeatEvent(rsPartition, rsOffsetContext);
}
}
private StreamStatus errorHandled(BlockingRunnable action) {
try {
action.run();
return StreamStatus.DISPATCHED;
}
catch (InterruptedException e) {
LOGGER.info("Replicator thread is interrupted");
Thread.currentThread().interrupt();
return StreamStatus.ERROR;
}
catch (Exception e) {
errorHandler.setProducerThrowable(e);
return StreamStatus.ERROR;
}
}
@ -256,4 +272,22 @@ protected MongoDbOffsetContext emptyOffsets(MongoDbConnectorConfig connectorConf
new TransactionContext(),
new MongoDbIncrementalSnapshotContext<>(false));
}
/**
* Indicates the status of event processing
*/
protected enum StreamStatus {
/**
* Event successfully dispatched
*/
DISPATCHED,
/**
* No event was dispatched and processing loop should advance to the next iteration immediately
*/
NEXT,
/**
* An error occurred and processing loop should be terminated
*/
ERROR,
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.function;
/**
* A variant of {@link Runnable} that can be blocked and interrupted.
*/
@FunctionalInterface
public interface BlockingRunnable {
/**
* Performs this action.
*
* @throws InterruptedException if the calling thread is interrupted while blocking
*/
void run() throws InterruptedException;
}