DBZ-8022 Correctly handle pausing of fetcher thread

This commit is contained in:
ani-sha 2024-07-23 19:29:36 +05:30 committed by Chris Cranford
parent 2b9236e318
commit 6de3e9fd3b
2 changed files with 49 additions and 21 deletions

View File

@ -101,9 +101,9 @@ private void readChangeStream(MongoClient client, ChangeEventSourceContext conte
final SplitEventHandler<BsonDocument> splitHandler = new SplitEventHandler<>(); final SplitEventHandler<BsonDocument> splitHandler = new SplitEventHandler<>();
final ChangeStreamIterable<BsonDocument> stream = initChangeStream(client, effectiveOffset); final ChangeStreamIterable<BsonDocument> stream = initChangeStream(client, effectiveOffset);
try (var cursor = BufferingChangeStreamCursor.fromIterable(stream, taskContext, context, streamingMetrics, clock).start()) { try (var cursor = BufferingChangeStreamCursor.fromIterable(stream, taskContext, streamingMetrics, clock).start()) {
while (context.isRunning()) { while (context.isRunning()) {
waitWhenStreamingPaused(context); waitWhenStreamingPaused(context, cursor);
var resumableEvent = cursor.tryNext(); var resumableEvent = cursor.tryNext();
if (resumableEvent == null) { if (resumableEvent == null) {
continue; continue;
@ -124,12 +124,14 @@ private void readChangeStream(MongoClient client, ChangeEventSourceContext conte
} }
} }
private void waitWhenStreamingPaused(ChangeEventSourceContext context) { private void waitWhenStreamingPaused(ChangeEventSourceContext context, BufferingChangeStreamCursor cursor) {
if (context.isPaused()) { if (context.isPaused()) {
errorHandled(() -> { errorHandled(() -> {
LOGGER.info("Streaming will now pause"); LOGGER.info("Streaming will now pause");
cursor.pause();
context.streamingPaused(); context.streamingPaused();
context.waitSnapshotCompletion(); context.waitSnapshotCompletion();
cursor.resume();
LOGGER.info("Streaming resumed"); LOGGER.info("Streaming resumed");
}); });
} }

View File

@ -17,6 +17,9 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.bson.BsonDocument; import org.bson.BsonDocument;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -34,7 +37,6 @@
import io.debezium.connector.mongodb.MongoDbConnector; import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbTaskContext; import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.metrics.MongoDbStreamingChangeEventSourceMetrics; import io.debezium.connector.mongodb.metrics.MongoDbStreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.util.Clock; import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy; import io.debezium.util.DelayStrategy;
import io.debezium.util.Threads; import io.debezium.util.Threads;
@ -132,12 +134,13 @@ public static final class EventFetcher<TResult> implements Runnable, Closeable {
private final MongoDbStreamingChangeEventSourceMetrics metrics; private final MongoDbStreamingChangeEventSourceMetrics metrics;
private final Clock clock; private final Clock clock;
private int noMessageIterations = 0; private int noMessageIterations = 0;
private final ChangeEventSource.ChangeEventSourceContext context; private final Lock lock = new ReentrantLock();
private final Condition snapshotFinished = lock.newCondition();
private boolean paused;
public EventFetcher(ChangeStreamIterable<TResult> stream, public EventFetcher(ChangeStreamIterable<TResult> stream,
int capacity, int capacity,
MongoDbStreamingChangeEventSourceMetrics metrics, MongoDbStreamingChangeEventSourceMetrics metrics,
ChangeEventSource.ChangeEventSourceContext context,
Clock clock, Clock clock,
DelayStrategy throttler) { DelayStrategy throttler) {
this.stream = stream; this.stream = stream;
@ -149,16 +152,14 @@ public EventFetcher(ChangeStreamIterable<TResult> stream,
this.cursorRef = new AtomicReference<>(null); this.cursorRef = new AtomicReference<>(null);
this.queue = new ConcurrentLinkedQueue<>(); this.queue = new ConcurrentLinkedQueue<>();
this.error = new AtomicReference<>(null); this.error = new AtomicReference<>(null);
this.context = context;
} }
public EventFetcher(ChangeStreamIterable<TResult> stream, public EventFetcher(ChangeStreamIterable<TResult> stream,
int capacity, int capacity,
MongoDbStreamingChangeEventSourceMetrics metrics, MongoDbStreamingChangeEventSourceMetrics metrics,
ChangeEventSource.ChangeEventSourceContext context,
Clock clock, Clock clock,
Duration throttleMaxSleep) { Duration throttleMaxSleep) {
this(stream, capacity, metrics, context, clock, DelayStrategy.constant(throttleMaxSleep)); this(stream, capacity, metrics, clock, DelayStrategy.constant(throttleMaxSleep));
} }
/** /**
@ -193,6 +194,30 @@ public void close() {
running.set(false); running.set(false);
} }
public void resumeStreaming() {
lock.lock();
try {
snapshotFinished.signalAll();
LOGGER.trace("Streaming will now resume.");
}
finally {
lock.unlock();
}
}
public void waitSnapshotCompletion() throws InterruptedException {
lock.lock();
try {
while (paused) {
LOGGER.trace("Waiting for snapshot to be completed.");
snapshotFinished.await();
}
}
finally {
lock.unlock();
}
}
public ResumableChangeStreamEvent<TResult> poll() { public ResumableChangeStreamEvent<TResult> poll() {
var event = queue.poll(); var event = queue.poll();
if (event == null) { if (event == null) {
@ -247,7 +272,9 @@ private void fetchEvents(MongoChangeStreamCursor<ChangeStreamDocument<TResult>>
var repeat = false; var repeat = false;
while (isRunning()) { while (isRunning()) {
if (!repeat) { if (!repeat) {
waitWhenStreamingPaused(context); if (paused) {
waitSnapshotCompletion();
}
var maybeEvent = fetchEvent(cursor); var maybeEvent = fetchEvent(cursor);
if (maybeEvent.isEmpty()) { if (maybeEvent.isEmpty()) {
LOGGER.warn("Resume token not available on this poll"); LOGGER.warn("Resume token not available on this poll");
@ -273,15 +300,6 @@ private Optional<ResumableChangeStreamEvent<TResult>> fetchEvent(MongoChangeStre
.or(() -> Optional.ofNullable(cursor.getResumeToken()).map(ResumableChangeStreamEvent::new)); .or(() -> Optional.ofNullable(cursor.getResumeToken()).map(ResumableChangeStreamEvent::new));
} }
private void waitWhenStreamingPaused(ChangeEventSource.ChangeEventSourceContext context) throws InterruptedException {
if (context.isPaused()) {
LOGGER.info("Buffering change stream cursor paused");
context.streamingPaused();
context.waitSnapshotCompletion();
LOGGER.info("Buffering change stream cursor resumed");
}
}
private void throttleIfNeeded(ChangeStreamDocument<TResult> document) { private void throttleIfNeeded(ChangeStreamDocument<TResult> document) {
if (document == null) { if (document == null) {
noMessageIterations++; noMessageIterations++;
@ -307,13 +325,12 @@ private boolean enqueue(ResumableChangeStreamEvent<TResult> event) throws Interr
public static <TResult> BufferingChangeStreamCursor<TResult> fromIterable( public static <TResult> BufferingChangeStreamCursor<TResult> fromIterable(
ChangeStreamIterable<TResult> stream, ChangeStreamIterable<TResult> stream,
MongoDbTaskContext taskContext, MongoDbTaskContext taskContext,
ChangeEventSource.ChangeEventSourceContext context,
MongoDbStreamingChangeEventSourceMetrics metrics, MongoDbStreamingChangeEventSourceMetrics metrics,
Clock clock) { Clock clock) {
var config = taskContext.getConnectorConfig(); var config = taskContext.getConnectorConfig();
return new BufferingChangeStreamCursor<>( return new BufferingChangeStreamCursor<>(
new EventFetcher<>(stream, config.getMaxBatchSize(), metrics, context, clock, config.getPollInterval()), new EventFetcher<>(stream, config.getMaxBatchSize(), metrics, clock, config.getPollInterval()),
Threads.newFixedThreadPool(MongoDbConnector.class, taskContext.getServerName(), "replicator-fetcher", 1), Threads.newFixedThreadPool(MongoDbConnector.class, taskContext.getServerName(), "replicator-fetcher", 1),
config.getPollInterval()); config.getPollInterval());
} }
@ -381,6 +398,15 @@ private ResumableChangeStreamEvent<TResult> pollWithDelay() {
return event; return event;
} }
public void resume() {
fetcher.paused = false;
fetcher.resumeStreaming();
}
public void pause() {
fetcher.paused = true;
}
@Override @Override
public boolean hasNext() { public boolean hasNext() {
return !fetcher.isEmpty(); return !fetcher.isEmpty();