DBZ-7184 Streaming loop now always uses resumable event to get resume token

This commit is contained in:
Jakub Cechacek 2023-11-29 17:22:56 +01:00 committed by Jiri Pechanec
parent 8a5007a738
commit 9ad6f9c7db
3 changed files with 29 additions and 15 deletions

View File

@ -25,6 +25,7 @@
import io.debezium.connector.mongodb.connection.MongoDbConnection;
import io.debezium.connector.mongodb.connection.ReplicaSet;
import io.debezium.connector.mongodb.events.BufferingChangeStreamCursor;
import io.debezium.connector.mongodb.events.BufferingChangeStreamCursor.ResumableChangeStreamEvent;
import io.debezium.connector.mongodb.events.SplitEventHandler;
import io.debezium.connector.mongodb.metrics.MongoDbStreamingChangeEventSourceMetrics;
import io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter;
@ -158,7 +159,7 @@ private void readChangeStream(MongoClient client, ReplicaSet replicaSet, ChangeE
var result = resumableEvent.document
.map(doc -> processChangeStreamDocument(doc, splitHandler, replicaSet, rsPartition, rsOffsetContext))
.orElseGet(() -> errorHandled(() -> dispatchHeartbeatEvent(cursor, rsPartition, rsOffsetContext)));
.orElseGet(() -> errorHandled(() -> dispatchHeartbeatEvent(resumableEvent, rsPartition, rsOffsetContext)));
if (result == StreamStatus.ERROR) {
return;
@ -208,15 +209,15 @@ private void dispatchChangeEvent(
}
private void dispatchHeartbeatEvent(
BufferingChangeStreamCursor<BsonDocument> cursor,
ResumableChangeStreamEvent<BsonDocument> event,
ReplicaSetPartition rsPartition,
ReplicaSetOffsetContext rsOffsetContext)
throws InterruptedException {
LOGGER.trace("No Change Stream event arrived");
// Guard against `null` to be protective of issues like SERVER-63772, and situations called out in the Javadocs:
// > resume token [...] can be null if the cursor has either not been iterated yet, or the cursor is closed.
if (cursor.getResumeToken() != null) {
rsOffsetContext.noEvent(cursor);
if (event.resumeToken != null) {
rsOffsetContext.noEvent(event);
dispatcher.dispatchHeartbeatEvent(rsPartition, rsOffsetContext);
}
}

View File

@ -19,6 +19,7 @@
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.mongodb.connection.ReplicaSet;
import io.debezium.connector.mongodb.events.BufferingChangeStreamCursor.ResumableChangeStreamEvent;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
@ -119,8 +120,8 @@ public void initFromOpTimeIfNeeded(BsonTimestamp timestamp) {
sourceInfo.noEvent(replicaSetName, timestamp);
}
public void noEvent(MongoChangeStreamCursor<?> cursor) {
sourceInfo.noEvent(replicaSetName, cursor);
public void noEvent(ResumableChangeStreamEvent<BsonDocument> event) {
sourceInfo.noEvent(replicaSetName, event);
}
public void changeStreamEvent(ChangeStreamDocument<BsonDocument> changeStreamEvent) {

View File

@ -26,6 +26,7 @@
import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.connector.mongodb.events.BufferingChangeStreamCursor.ResumableChangeStreamEvent;
import io.debezium.util.Collect;
/**
@ -266,28 +267,39 @@ public void initEvent(String replicaSetName, MongoChangeStreamCursor<ChangeStrea
}
}
public void noEvent(String replicaSetName, ResumableChangeStreamEvent<BsonDocument> event) {
if (event.resumeToken == null || event.hasDocument()) {
return;
}
noEvent(replicaSetName, ResumeTokens.getDataString(event.resumeToken));
}
public void noEvent(String replicaSetName, MongoChangeStreamCursor<?> cursor) {
if (cursor == null || cursor.getResumeToken() == null) {
return;
}
String namespace = "";
long wallTime = 0L;
String resumeToken = ResumeTokens.getDataString(cursor.getResumeToken());
Position position = Position.changeStreamPosition(null, resumeToken, null);
positionsByReplicaSetName.put(replicaSetName, position);
onEvent(replicaSetName, CollectionId.parse(replicaSetName, namespace), position, wallTime);
noEvent(replicaSetName, ResumeTokens.getDataString(cursor.getResumeToken()));
}
public void noEvent(String replicaSetName, BsonTimestamp timestamp) {
if (timestamp == null) {
return;
}
Position position = Position.changeStreamPosition(timestamp, null, null);
noEvent(replicaSetName, position);
}
private void noEvent(String replicaSetName, String resumeToken) {
if (resumeToken == null) {
return;
}
Position position = Position.changeStreamPosition(null, resumeToken, null);
noEvent(replicaSetName, position);
}
private void noEvent(String replicaSetName, Position position) {
String namespace = "";
long wallTime = 0L;
Position position = Position.changeStreamPosition(timestamp, null, null);
positionsByReplicaSetName.put(replicaSetName, position);
onEvent(replicaSetName, CollectionId.parse(replicaSetName, namespace), position, wallTime);