diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/events/BufferingChangeStreamCursor.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/events/BufferingChangeStreamCursor.java index 10f8054b1..3ac7e7e6a 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/events/BufferingChangeStreamCursor.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/events/BufferingChangeStreamCursor.java @@ -135,8 +135,8 @@ public static final class EventFetcher implements Runnable, Closeable { private final Clock clock; private int noMessageIterations = 0; private final Lock lock = new ReentrantLock(); - private final Condition snapshotFinished = lock.newCondition(); - private boolean paused; + private final Condition resumed = lock.newCondition(); + private volatile boolean paused; public EventFetcher(ChangeStreamIterable stream, int capacity, @@ -194,23 +194,33 @@ public void close() { running.set(false); } - public void resumeStreaming() { + public boolean isPaused() { + return paused; + } + + public void pause() { + paused = true; + LOGGER.trace("Event buffering will now pause."); + } + + public void resume() { lock.lock(); try { - snapshotFinished.signalAll(); - LOGGER.trace("Streaming will now resume."); + paused = false; + resumed.signalAll(); + LOGGER.trace("Event buffering will now resume."); } finally { lock.unlock(); } } - public void waitSnapshotCompletion() throws InterruptedException { + public void waitIfPaused() throws InterruptedException { lock.lock(); try { while (paused) { - LOGGER.trace("Waiting for snapshot to be completed."); - snapshotFinished.await(); + LOGGER.trace("Waiting until buffering is resumed."); + resumed.await(); } } finally { @@ -255,6 +265,7 @@ public void run() { } catch (InterruptedException e) { LOGGER.error("Fetcher thread interrupted", e); + Thread.currentThread().interrupt(); throw new DebeziumException("Fetcher thread interrupted", e); } catch (Throwable e) { @@ -273,7 +284,7 @@ private void fetchEvents(MongoChangeStreamCursor> while (isRunning()) { if (!repeat) { if (paused) { - waitSnapshotCompletion(); + waitIfPaused(); } var maybeEvent = fetchEvent(cursor); if (maybeEvent.isEmpty()) { @@ -399,12 +410,15 @@ private ResumableChangeStreamEvent pollWithDelay() { } public void resume() { - fetcher.paused = false; - fetcher.resumeStreaming(); + fetcher.resume(); } public void pause() { - fetcher.paused = true; + fetcher.pause(); + } + + public boolean isPaused() { + return fetcher.isPaused(); } @Override