DBZ-8022 Cleaned up the pause/resume API of BufferingChangeStreamCursor

This commit is contained in:
Jakub Cechacek 2024-07-24 10:07:38 +02:00 committed by Chris Cranford
parent 6de3e9fd3b
commit d1f1792b69

View File

@ -135,8 +135,8 @@ public static final class EventFetcher<TResult> implements Runnable, Closeable {
private final Clock clock; private final Clock clock;
private int noMessageIterations = 0; private int noMessageIterations = 0;
private final Lock lock = new ReentrantLock(); private final Lock lock = new ReentrantLock();
private final Condition snapshotFinished = lock.newCondition(); private final Condition resumed = lock.newCondition();
private boolean paused; private volatile boolean paused;
public EventFetcher(ChangeStreamIterable<TResult> stream, public EventFetcher(ChangeStreamIterable<TResult> stream,
int capacity, int capacity,
@ -194,23 +194,33 @@ public void close() {
running.set(false); running.set(false);
} }
public void resumeStreaming() { public boolean isPaused() {
return paused;
}
public void pause() {
paused = true;
LOGGER.trace("Event buffering will now pause.");
}
public void resume() {
lock.lock(); lock.lock();
try { try {
snapshotFinished.signalAll(); paused = false;
LOGGER.trace("Streaming will now resume."); resumed.signalAll();
LOGGER.trace("Event buffering will now resume.");
} }
finally { finally {
lock.unlock(); lock.unlock();
} }
} }
public void waitSnapshotCompletion() throws InterruptedException { public void waitIfPaused() throws InterruptedException {
lock.lock(); lock.lock();
try { try {
while (paused) { while (paused) {
LOGGER.trace("Waiting for snapshot to be completed."); LOGGER.trace("Waiting until buffering is resumed.");
snapshotFinished.await(); resumed.await();
} }
} }
finally { finally {
@ -255,6 +265,7 @@ public void run() {
} }
catch (InterruptedException e) { catch (InterruptedException e) {
LOGGER.error("Fetcher thread interrupted", e); LOGGER.error("Fetcher thread interrupted", e);
Thread.currentThread().interrupt();
throw new DebeziumException("Fetcher thread interrupted", e); throw new DebeziumException("Fetcher thread interrupted", e);
} }
catch (Throwable e) { catch (Throwable e) {
@ -273,7 +284,7 @@ private void fetchEvents(MongoChangeStreamCursor<ChangeStreamDocument<TResult>>
while (isRunning()) { while (isRunning()) {
if (!repeat) { if (!repeat) {
if (paused) { if (paused) {
waitSnapshotCompletion(); waitIfPaused();
} }
var maybeEvent = fetchEvent(cursor); var maybeEvent = fetchEvent(cursor);
if (maybeEvent.isEmpty()) { if (maybeEvent.isEmpty()) {
@ -399,12 +410,15 @@ private ResumableChangeStreamEvent<TResult> pollWithDelay() {
} }
public void resume() { public void resume() {
fetcher.paused = false; fetcher.resume();
fetcher.resumeStreaming();
} }
public void pause() { public void pause() {
fetcher.paused = true; fetcher.pause();
}
public boolean isPaused() {
return fetcher.isPaused();
} }
@Override @Override