diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java index 3e741aa5a..6bc2b4c98 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java @@ -17,6 +17,7 @@ import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.MongoClient; +import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.changestream.FullDocumentBeforeChange; @@ -27,6 +28,7 @@ import io.debezium.connector.mongodb.events.SplitEventHandler; import io.debezium.connector.mongodb.metrics.MongoDbStreamingChangeEventSourceMetrics; import io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter; +import io.debezium.function.BlockingRunnable; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; @@ -148,76 +150,90 @@ private void readChangeStream(MongoClient client, ReplicaSet replicaSet, ChangeE try (var cursor = BufferingChangeStreamCursor.fromIterable(rsChangeStream, taskContext, streamingMetrics, clock).start()) { while (context.isRunning()) { + waitWhenStreamingPaused(context); var resumableEvent = cursor.tryNext(); if (resumableEvent == null) { continue; } - if (resumableEvent.hasDocument()) { - LOGGER.trace("Arrived Change Stream event: {}", resumableEvent); - try { - var event = splitHandler.handle(resumableEvent); - if (event.isEmpty()) { - continue; - } - final var completeEvent = event.get(); + var result = resumableEvent.document + .map(doc -> processChangeStreamDocument(doc, splitHandler, replicaSet, rsPartition, rsOffsetContext)) + .orElseGet(() -> errorHandled(() -> dispatchHeartbeatEvent(cursor, rsPartition, rsOffsetContext))); - rsOffsetContext.changeStreamEvent(completeEvent); - CollectionId collectionId = new CollectionId( - replicaSet.replicaSetName(), - completeEvent.getNamespace().getDatabaseName(), - completeEvent.getNamespace().getCollectionName()); - - // Note that this will trigger a heartbeat request - dispatcher.dispatchDataChangeEvent( - rsPartition, - collectionId, - new MongoDbChangeRecordEmitter( - rsPartition, - rsOffsetContext, - clock, - completeEvent, connectorConfig)); - } - catch (Exception e) { - errorHandler.setProducerThrowable(e); - return; - } - } - else { - LOGGER.trace("No Change Stream event, triggering heartbeat"); - try { - // Guard against `null` to be protective of issues like SERVER-63772, and situations called out in the Javadocs: - // > resume token [...] can be null if the cursor has either not been iterated yet, or the cursor is closed. - if (cursor.getResumeToken() != null) { - rsOffsetContext.noEvent(cursor); - dispatcher.dispatchHeartbeatEvent(rsPartition, rsOffsetContext); - } - } - catch (InterruptedException e) { - LOGGER.info("Replicator thread is interrupted"); - Thread.currentThread().interrupt(); - return; - } - } - - try { - waitWhenStreamingPaused(context); - } - catch (InterruptedException e) { - LOGGER.info("Replicator thread is interrupted"); - Thread.currentThread().interrupt(); + if (result == StreamStatus.ERROR) { return; } } } } - private void waitWhenStreamingPaused(ChangeEventSourceContext context) throws InterruptedException { + private void waitWhenStreamingPaused(ChangeEventSourceContext context) { if (context.isPaused()) { - LOGGER.info("Streaming will now pause"); - context.streamingPaused(); - context.waitSnapshotCompletion(); - LOGGER.info("Streaming resumed"); + errorHandled(() -> { + LOGGER.info("Streaming will now pause"); + context.streamingPaused(); + context.waitSnapshotCompletion(); + LOGGER.info("Streaming resumed"); + }); + } + } + + private StreamStatus processChangeStreamDocument( + ChangeStreamDocument document, + SplitEventHandler splitHandler, + ReplicaSet replicaSet, + ReplicaSetPartition rsPartition, + ReplicaSetOffsetContext rsOffsetContext) { + LOGGER.trace("Arrived Change Stream event: {}", document); + return splitHandler + .handle(document) + .map(event -> errorHandled(() -> dispatchChangeEvent(event, replicaSet, rsPartition, rsOffsetContext))) + .orElse(StreamStatus.NEXT); + } + + private void dispatchChangeEvent( + ChangeStreamDocument event, + ReplicaSet replicaSet, + ReplicaSetPartition rsPartition, + ReplicaSetOffsetContext rsOffsetContext) + throws InterruptedException { + var collectionId = new CollectionId( + replicaSet.replicaSetName(), + event.getNamespace().getDatabaseName(), + event.getNamespace().getCollectionName()); + + var emitter = new MongoDbChangeRecordEmitter(rsPartition, rsOffsetContext, clock, event, connectorConfig); + rsOffsetContext.changeStreamEvent(event); + dispatcher.dispatchDataChangeEvent(rsPartition, collectionId, emitter); + } + + private void dispatchHeartbeatEvent( + BufferingChangeStreamCursor cursor, + ReplicaSetPartition rsPartition, + ReplicaSetOffsetContext rsOffsetContext) + throws InterruptedException { + LOGGER.trace("No Change Stream event arrived"); + // Guard against `null` to be protective of issues like SERVER-63772, and situations called out in the Javadocs: + // > resume token [...] can be null if the cursor has either not been iterated yet, or the cursor is closed. + if (cursor.getResumeToken() != null) { + rsOffsetContext.noEvent(cursor); + dispatcher.dispatchHeartbeatEvent(rsPartition, rsOffsetContext); + } + } + + private StreamStatus errorHandled(BlockingRunnable action) { + try { + action.run(); + return StreamStatus.DISPATCHED; + } + catch (InterruptedException e) { + LOGGER.info("Replicator thread is interrupted"); + Thread.currentThread().interrupt(); + return StreamStatus.ERROR; + } + catch (Exception e) { + errorHandler.setProducerThrowable(e); + return StreamStatus.ERROR; } } @@ -256,4 +272,22 @@ protected MongoDbOffsetContext emptyOffsets(MongoDbConnectorConfig connectorConf new TransactionContext(), new MongoDbIncrementalSnapshotContext<>(false)); } + + /** + * Indicates the status of event processing + */ + protected enum StreamStatus { + /** + * Event successfully dispatched + */ + DISPATCHED, + /** + * No event was dispatched and processing loop should advance to the next iteration immediately + */ + NEXT, + /** + * An error occurred and processing loop should be terminated + */ + ERROR, + } } diff --git a/debezium-core/src/main/java/io/debezium/function/BlockingRunnable.java b/debezium-core/src/main/java/io/debezium/function/BlockingRunnable.java new file mode 100644 index 000000000..a185ac7d8 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/function/BlockingRunnable.java @@ -0,0 +1,20 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.function; + +/** + * A variant of {@link Runnable} that can be blocked and interrupted. + */ +@FunctionalInterface +public interface BlockingRunnable { + + /** + * Performs this action. + * + * @throws InterruptedException if the calling thread is interrupted while blocking + */ + void run() throws InterruptedException; +}