DBZ-6421 Ability to recover from cursor which doesn't return resume token when there is no change document (e.g. in case of DocumentDB connection)

This commit is contained in:
jcechace 2023-05-09 17:32:33 +02:00 committed by Jiri Pechanec
parent 5dfe52dbc4
commit 2de3f0d9a4
5 changed files with 74 additions and 27 deletions

View File

@ -198,22 +198,24 @@ private boolean isSnapshotExpected(MongoDbPartition partition, ReplicaSet replic
} }
private boolean isValidResumeToken(MongoDbPartition partition, ReplicaSet replicaSet, BsonDocument token) { private boolean isValidResumeToken(MongoDbPartition partition, ReplicaSet replicaSet, BsonDocument token) {
try { if (token == null) {
try (MongoDbConnection mongo = connections.get(replicaSet, partition)) { return false;
return mongo.execute("Checking change stream", client -> { }
ChangeStreamIterable<BsonDocument> stream = client.watch(BsonDocument.class);
stream.resumeAfter(token);
try (var ignored = stream.cursor()) { try (MongoDbConnection mongo = connections.get(replicaSet, partition)) {
LOGGER.info("Valid resume token present for replica set '{}, so no snapshot will be performed'", replicaSet.replicaSetName()); return mongo.execute("Checking change stream", client -> {
return false; ChangeStreamIterable<BsonDocument> stream = client.watch(BsonDocument.class);
} stream.resumeAfter(token);
catch (MongoCommandException | MongoChangeStreamException e) {
LOGGER.info("Invalid resume token present for replica set '{}, snapshot will be performed'", replicaSet.replicaSetName()); try (var ignored = stream.cursor()) {
return true; LOGGER.info("Valid resume token present for replica set '{}, so no snapshot will be performed'", replicaSet.replicaSetName());
} return false;
}); }
} catch (MongoCommandException | MongoChangeStreamException e) {
LOGGER.info("Invalid resume token present for replica set '{}, snapshot will be performed'", replicaSet.replicaSetName());
return true;
}
});
} }
catch (InterruptedException e) { catch (InterruptedException e) {
throw new DebeziumException("Interrupted while creating snapshotting task", e); throw new DebeziumException("Interrupted while creating snapshotting task", e);
@ -237,6 +239,7 @@ private void initReplicaSetSnapshotStartOffsets(MongoDbSnapshotContext snapshotC
try (MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> cursor = stream.cursor()) { try (MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> cursor = stream.cursor()) {
rsOffsetCtx.initEvent(cursor); rsOffsetCtx.initEvent(cursor);
} }
rsOffsetCtx.initFromOpTimeIfNeeded(client);
}); });
} }

View File

@ -158,6 +158,10 @@ private void readChangeStream(MongoClient client, ReplicaSet replicaSet, ChangeE
doc.put("_data", new BsonString(rsOffsetContext.lastResumeToken())); doc.put("_data", new BsonString(rsOffsetContext.lastResumeToken()));
rsChangeStream.resumeAfter(doc); rsChangeStream.resumeAfter(doc);
} }
else if (rsOffsetContext.lastTimestamp() != null) {
LOGGER.info("Resuming streaming from operation time '{}'", rsOffsetContext.lastTimestamp());
rsChangeStream.startAtOperationTime(rsOffsetContext.lastTimestamp());
}
if (connectorConfig.getCursorMaxAwaitTime() > 0) { if (connectorConfig.getCursorMaxAwaitTime() > 0) {
rsChangeStream.maxAwaitTime(connectorConfig.getCursorMaxAwaitTime(), TimeUnit.MILLISECONDS); rsChangeStream.maxAwaitTime(connectorConfig.getCursorMaxAwaitTime(), TimeUnit.MILLISECONDS);

View File

@ -10,8 +10,13 @@
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.bson.BsonDocument; import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.client.MongoChangeStreamCursor; import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.debezium.annotation.ThreadSafe; import io.debezium.annotation.ThreadSafe;
@ -34,6 +39,8 @@
@ThreadSafe @ThreadSafe
public class ReplicaSetOffsetContext extends CommonOffsetContext<SourceInfo> { public class ReplicaSetOffsetContext extends CommonOffsetContext<SourceInfo> {
private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaSetOffsetContext.class);
private final MongoDbOffsetContext offsetContext; private final MongoDbOffsetContext offsetContext;
private final String replicaSetName; private final String replicaSetName;
private final IncrementalSnapshotContext<CollectionId> incrementalSnapshotContext; private final IncrementalSnapshotContext<CollectionId> incrementalSnapshotContext;
@ -106,6 +113,19 @@ public void initEvent(MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>
sourceInfo.initEvent(replicaSetName, cursor); sourceInfo.initEvent(replicaSetName, cursor);
} }
public void initFromOpTimeIfNeeded(MongoClient client) {
if (lastResumeToken() != null) {
return;
}
LOGGER.info("Initializing offset for replica-set {} from operation time", replicaSetName);
var database = client.getDatabase("admin");
var result = database.runCommand(new Document("ping", 1), BsonDocument.class);
var timestamp = result.getTimestamp("operationTime");
sourceInfo.noEvent(replicaSetName, timestamp);
}
public void noEvent(MongoChangeStreamCursor<?> cursor) { public void noEvent(MongoChangeStreamCursor<?> cursor) {
sourceInfo.noEvent(replicaSetName, cursor); sourceInfo.noEvent(replicaSetName, cursor);
} }
@ -123,6 +143,10 @@ public BsonDocument lastResumeTokenDoc() {
return (data == null) ? null : ResumeTokens.fromData(data); return (data == null) ? null : ResumeTokens.fromData(data);
} }
public BsonTimestamp lastTimestamp() {
return sourceInfo.lastTimestamp(replicaSetName);
}
@Override @Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() { public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return offsetContext.getIncrementalSnapshotContext(); return offsetContext.getIncrementalSnapshotContext();

View File

@ -124,11 +124,11 @@ public BsonTimestamp getTimestamp() {
} }
public int getTime() { public int getTime() {
return this.ts.getTime(); return (this.ts != null) ? this.ts.getTime() : 0;
} }
public int getInc() { public int getInc() {
return this.ts.getInc(); return (this.ts != null) ? this.ts.getInc() : -1;
} }
public SessionTransactionId getChangeStreamSessionTxnId() { public SessionTransactionId getChangeStreamSessionTxnId() {
@ -196,6 +196,11 @@ public String lastResumeToken(String replicaSetName) {
return existing != null ? existing.resumeToken : null; return existing != null ? existing.resumeToken : null;
} }
public BsonTimestamp lastTimestamp(String replicaSetName) {
Position existing = positionsByReplicaSetName.get(replicaSetName);
return existing != null ? existing.getTimestamp() : null;
}
/** /**
* Get the Kafka Connect detail about the source "offset" for the named database, which describes the given position in the * Get the Kafka Connect detail about the source "offset" for the named database, which describes the given position in the
* database where we have last read. If the database has not yet been seen, this records the starting position * database where we have last read. If the database has not yet been seen, this records the starting position
@ -268,9 +273,21 @@ public void noEvent(String replicaSetName, MongoChangeStreamCursor<?> cursor) {
String namespace = ""; String namespace = "";
long wallTime = 0L; long wallTime = 0L;
BsonTimestamp ts = ResumeTokens.getTimestamp(cursor.getResumeToken());
String resumeToken = ResumeTokens.getDataString(cursor.getResumeToken()); String resumeToken = ResumeTokens.getDataString(cursor.getResumeToken());
Position position = Position.changeStreamPosition(ts, resumeToken, null); Position position = Position.changeStreamPosition(null, resumeToken, null);
positionsByReplicaSetName.put(replicaSetName, position);
onEvent(replicaSetName, CollectionId.parse(replicaSetName, namespace), position, wallTime);
}
public void noEvent(String replicaSetName, BsonTimestamp timestamp) {
if (timestamp == null) {
return;
}
String namespace = "";
long wallTime = 0L;
Position position = Position.changeStreamPosition(timestamp, null, null);
positionsByReplicaSetName.put(replicaSetName, position); positionsByReplicaSetName.put(replicaSetName, position);
onEvent(replicaSetName, CollectionId.parse(replicaSetName, namespace), position, wallTime); onEvent(replicaSetName, CollectionId.parse(replicaSetName, namespace), position, wallTime);
@ -282,8 +299,7 @@ public void changeStreamEvent(String replicaSetName, ChangeStreamDocument<BsonDo
long wallTime = 0L; long wallTime = 0L;
if (changeStreamEvent != null) { if (changeStreamEvent != null) {
String resumeToken = ResumeTokens.getDataString(changeStreamEvent.getResumeToken()); String resumeToken = ResumeTokens.getDataString(changeStreamEvent.getResumeToken());
// > Decode timestamp from resume token to be consistent with other events BsonTimestamp ts = changeStreamEvent.getClusterTime();
BsonTimestamp ts = ResumeTokens.getTimestamp(changeStreamEvent.getResumeToken());
position = Position.changeStreamPosition(ts, resumeToken, MongoUtil.getChangeStreamSessionTransactionId(changeStreamEvent)); position = Position.changeStreamPosition(ts, resumeToken, MongoUtil.getChangeStreamSessionTransactionId(changeStreamEvent));
namespace = changeStreamEvent.getNamespace().getFullName(); namespace = changeStreamEvent.getNamespace().getFullName();
if (changeStreamEvent.getWallTime() != null) { if (changeStreamEvent.getWallTime() != null) {

View File

@ -122,16 +122,16 @@ public void assertSourceInfoContents(SourceInfo source,
assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(hasOffset); assertThat(source.hasOffset(REPLICA_SET_NAME)).isEqualTo(hasOffset);
Map<String, ?> offset = source.lastOffset(REPLICA_SET_NAME); Map<String, ?> offset = source.lastOffset(REPLICA_SET_NAME);
assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo(timestamp.getTime()); assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo((timestamp != null) ? timestamp.getTime() : 0);
assertThat(offset.get(SourceInfo.ORDER)).isEqualTo(timestamp.getInc()); assertThat(offset.get(SourceInfo.ORDER)).isEqualTo((timestamp != null) ? timestamp.getInc() : -1);
String resumeToken = source.lastResumeToken(REPLICA_SET_NAME); String resumeToken = source.lastResumeToken(REPLICA_SET_NAME);
assertThat(resumeToken).isEqualTo(resumeTokenData); assertThat(resumeToken).isEqualTo(resumeTokenData);
source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "test", "names"), 0L); source.collectionEvent(REPLICA_SET_NAME, new CollectionId(REPLICA_SET_NAME, "test", "names"), 0L);
Struct struct = source.struct(); Struct struct = source.struct();
assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo(timestamp.getTime() * 1000L); assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo((timestamp != null) ? timestamp.getTime() * 1000L : 0L);
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo(timestamp.getInc()); assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo((timestamp != null) ? timestamp.getInc() : -1);
assertThat(struct.getString(SourceInfo.DATABASE_NAME_KEY)).isEqualTo("test"); assertThat(struct.getString(SourceInfo.DATABASE_NAME_KEY)).isEqualTo("test");
assertThat(struct.getString(SourceInfo.COLLECTION)).isEqualTo("names"); assertThat(struct.getString(SourceInfo.COLLECTION)).isEqualTo("names");
assertThat(struct.getString(SourceInfo.REPLICA_SET_NAME)).isEqualTo(REPLICA_SET_NAME); assertThat(struct.getString(SourceInfo.REPLICA_SET_NAME)).isEqualTo(REPLICA_SET_NAME);
@ -168,7 +168,7 @@ public void shouldReturnRecordedOffsetForUsedReplicaName() {
@Test @Test
public void shouldReturnRecordedOffsetForUsedReplicaNameWithoutEvent() { public void shouldReturnRecordedOffsetForUsedReplicaNameWithoutEvent() {
var cursor = mockNoEventChangeStreamCursor(); var cursor = mockNoEventChangeStreamCursor();
assertSourceInfoContents(source, cursor, CURSOR_RESUME_TOKEN_DATA, CURSOR_TIMESTAMP, null); assertSourceInfoContents(source, cursor, CURSOR_RESUME_TOKEN_DATA, null, null);
} }
@Test @Test
@ -190,7 +190,7 @@ public void shouldReturnRecordedOffsetForUsedReplicaNameDuringInitialSyncWithout
source.startInitialSync(REPLICA_SET_NAME); source.startInitialSync(REPLICA_SET_NAME);
var cursor = mockNoEventChangeStreamCursor(); var cursor = mockNoEventChangeStreamCursor();
assertSourceInfoContents(source, cursor, CURSOR_RESUME_TOKEN_DATA, CURSOR_TIMESTAMP, "true"); assertSourceInfoContents(source, cursor, CURSOR_RESUME_TOKEN_DATA, null, "true");
} }
@Test @Test