DBZ-7184 Resumable events are now guaranteed to contain resume token

This commit is contained in:
Jakub Cechacek 2023-11-29 18:29:21 +01:00 committed by Jiri Pechanec
parent 9ad6f9c7db
commit 88a64bf3d5
2 changed files with 44 additions and 27 deletions

View File

@ -214,13 +214,9 @@ private void dispatchHeartbeatEvent(
ReplicaSetOffsetContext rsOffsetContext)
throws InterruptedException {
LOGGER.trace("No Change Stream event arrived");
// Guard against `null` to be protective of issues like SERVER-63772, and situations called out in the Javadocs:
// > resume token [...] can be null if the cursor has either not been iterated yet, or the cursor is closed.
if (event.resumeToken != null) {
rsOffsetContext.noEvent(event);
dispatcher.dispatchHeartbeatEvent(rsPartition, rsOffsetContext);
}
}
private StreamStatus errorHandled(BlockingRunnable action) {
try {

View File

@ -8,6 +8,7 @@
import java.io.Closeable;
import java.time.Duration;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -71,16 +72,25 @@ public static final class ResumableChangeStreamEvent<TResult> {
public final BsonDocument resumeToken;
/**
* Creates resumable event
* Creates resumable event from document
*
* @param document change stream event
* @param cursor cursors that produced the event
*/
public ResumableChangeStreamEvent(ChangeStreamDocument<TResult> document, MongoChangeStreamCursor<ChangeStreamDocument<TResult>> cursor) {
this.document = Optional.ofNullable(document);
this.resumeToken = this.document
.map(ChangeStreamDocument::getResumeToken)
.orElseGet(cursor::getResumeToken);
public ResumableChangeStreamEvent(ChangeStreamDocument<TResult> document) {
Objects.requireNonNull(document);
this.document = Optional.of(document);
this.resumeToken = document.getResumeToken();
}
/**
* Creates resumable event from resume token
*
* @param resumeToken resume token
*/
public ResumableChangeStreamEvent(BsonDocument resumeToken) {
Objects.requireNonNull(resumeToken);
this.document = Optional.empty();
this.resumeToken = resumeToken;
}
public boolean isEmpty() {
@ -196,26 +206,37 @@ public void run() {
private void fetchEvents(MongoChangeStreamCursor<ChangeStreamDocument<TResult>> cursor) throws InterruptedException {
while (isRunning()) {
var maybeEvent = fetchEvent(cursor);
if (maybeEvent.isEmpty()) {
LOGGER.warn("Resume token not available on this poll");
continue;
}
enqueue(maybeEvent.get());
}
}
private Optional<ResumableChangeStreamEvent<TResult>> fetchEvent(MongoChangeStreamCursor<ChangeStreamDocument<TResult>> cursor) {
var beforeEventPollTime = clock.currentTimeAsInstant();
var event = new ResumableChangeStreamEvent<>(cursor.tryNext(), cursor);
event.document.ifPresent(doc -> LOGGER.trace("Polled Change Stream event: {}", event));
metrics.onSourceEventPolled(event.document.orElse(null), clock, beforeEventPollTime);
var document = cursor.tryNext();
metrics.onSourceEventPolled(document, clock, beforeEventPollTime);
throttleIfNeeded(document);
if (event.hasDocument()) {
enqueue(event);
}
else {
throttleIfNeeded(event);
}
}
// Only create resumable event if we have either document or cursor resume token
// Cursor resume token may be `null` in case of issues like SERVER-63772, and situations called out in the Javadocs:
// > resume token [...] can be null if the cursor has either not been iterated yet, or the cursor is closed.
return Optional.<ResumableChangeStreamEvent<TResult>> empty()
.or(() -> Optional.ofNullable(document).map(ResumableChangeStreamEvent::new))
.or(() -> Optional.ofNullable(cursor.getResumeToken()).map(ResumableChangeStreamEvent::new));
}
private void throttleIfNeeded(ResumableChangeStreamEvent<TResult> event) throws InterruptedException {
private void throttleIfNeeded(ChangeStreamDocument<TResult> document) {
if (document == null) {
noMessageIterations++;
}
if (noMessageIterations >= THROTTLE_NO_MESSAGE_BEFORE_PAUSE) {
enqueue(event);
noMessageIterations = 0;
LOGGER.debug("Sleeping after {} empty polls", noMessageIterations);
throttler.sleepWhen(true);
noMessageIterations = 0;
}
}