From 9ad6f9c7db9cd3caa730d9a8a38949a55566bcae Mon Sep 17 00:00:00 2001 From: Jakub Cechacek Date: Wed, 29 Nov 2023 17:22:56 +0100 Subject: [PATCH] DBZ-7184 Streaming loop now always uses resumable event to get resume token --- .../MongoDbStreamingChangeEventSource.java | 9 +++--- .../mongodb/ReplicaSetOffsetContext.java | 5 ++-- .../connector/mongodb/SourceInfo.java | 30 +++++++++++++------ 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java index 6bc2b4c98..e1f6eb294 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java @@ -25,6 +25,7 @@ import io.debezium.connector.mongodb.connection.MongoDbConnection; import io.debezium.connector.mongodb.connection.ReplicaSet; import io.debezium.connector.mongodb.events.BufferingChangeStreamCursor; +import io.debezium.connector.mongodb.events.BufferingChangeStreamCursor.ResumableChangeStreamEvent; import io.debezium.connector.mongodb.events.SplitEventHandler; import io.debezium.connector.mongodb.metrics.MongoDbStreamingChangeEventSourceMetrics; import io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter; @@ -158,7 +159,7 @@ private void readChangeStream(MongoClient client, ReplicaSet replicaSet, ChangeE var result = resumableEvent.document .map(doc -> processChangeStreamDocument(doc, splitHandler, replicaSet, rsPartition, rsOffsetContext)) - .orElseGet(() -> errorHandled(() -> dispatchHeartbeatEvent(cursor, rsPartition, rsOffsetContext))); + .orElseGet(() -> errorHandled(() -> dispatchHeartbeatEvent(resumableEvent, rsPartition, rsOffsetContext))); if (result == StreamStatus.ERROR) { return; @@ -208,15 +209,15 @@ private void dispatchChangeEvent( } private void dispatchHeartbeatEvent( - BufferingChangeStreamCursor cursor, + ResumableChangeStreamEvent event, ReplicaSetPartition rsPartition, ReplicaSetOffsetContext rsOffsetContext) throws InterruptedException { LOGGER.trace("No Change Stream event arrived"); // Guard against `null` to be protective of issues like SERVER-63772, and situations called out in the Javadocs: // > resume token [...] can be null if the cursor has either not been iterated yet, or the cursor is closed. - if (cursor.getResumeToken() != null) { - rsOffsetContext.noEvent(cursor); + if (event.resumeToken != null) { + rsOffsetContext.noEvent(event); dispatcher.dispatchHeartbeatEvent(rsPartition, rsOffsetContext); } } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java index 9ea3cdee0..e4c4cc751 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java @@ -19,6 +19,7 @@ import io.debezium.annotation.ThreadSafe; import io.debezium.connector.mongodb.connection.ReplicaSet; +import io.debezium.connector.mongodb.events.BufferingChangeStreamCursor.ResumableChangeStreamEvent; import io.debezium.pipeline.CommonOffsetContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; @@ -119,8 +120,8 @@ public void initFromOpTimeIfNeeded(BsonTimestamp timestamp) { sourceInfo.noEvent(replicaSetName, timestamp); } - public void noEvent(MongoChangeStreamCursor cursor) { - sourceInfo.noEvent(replicaSetName, cursor); + public void noEvent(ResumableChangeStreamEvent event) { + sourceInfo.noEvent(replicaSetName, event); } public void changeStreamEvent(ChangeStreamDocument changeStreamEvent) { diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/SourceInfo.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/SourceInfo.java index 35edce2f9..6a57e20a3 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/SourceInfo.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/SourceInfo.java @@ -26,6 +26,7 @@ import io.debezium.annotation.NotThreadSafe; import io.debezium.connector.SnapshotRecord; import io.debezium.connector.common.BaseSourceInfo; +import io.debezium.connector.mongodb.events.BufferingChangeStreamCursor.ResumableChangeStreamEvent; import io.debezium.util.Collect; /** @@ -266,28 +267,39 @@ public void initEvent(String replicaSetName, MongoChangeStreamCursor event) { + if (event.resumeToken == null || event.hasDocument()) { + return; + } + noEvent(replicaSetName, ResumeTokens.getDataString(event.resumeToken)); + } + public void noEvent(String replicaSetName, MongoChangeStreamCursor cursor) { if (cursor == null || cursor.getResumeToken() == null) { return; } - - String namespace = ""; - long wallTime = 0L; - String resumeToken = ResumeTokens.getDataString(cursor.getResumeToken()); - Position position = Position.changeStreamPosition(null, resumeToken, null); - positionsByReplicaSetName.put(replicaSetName, position); - - onEvent(replicaSetName, CollectionId.parse(replicaSetName, namespace), position, wallTime); + noEvent(replicaSetName, ResumeTokens.getDataString(cursor.getResumeToken())); } public void noEvent(String replicaSetName, BsonTimestamp timestamp) { if (timestamp == null) { return; } + Position position = Position.changeStreamPosition(timestamp, null, null); + noEvent(replicaSetName, position); + } + private void noEvent(String replicaSetName, String resumeToken) { + if (resumeToken == null) { + return; + } + Position position = Position.changeStreamPosition(null, resumeToken, null); + noEvent(replicaSetName, position); + } + + private void noEvent(String replicaSetName, Position position) { String namespace = ""; long wallTime = 0L; - Position position = Position.changeStreamPosition(timestamp, null, null); positionsByReplicaSetName.put(replicaSetName, position); onEvent(replicaSetName, CollectionId.parse(replicaSetName, namespace), position, wallTime);