From 2b9236e3188b024be39a5d81402b42750f673bb4 Mon Sep 17 00:00:00 2001 From: ani-sha Date: Mon, 22 Jul 2024 15:18:04 +0530 Subject: [PATCH] DBZ-8022 Pass ChangeEventSourceContext to EventFetcher and reuse the methods --- .../MongoDbStreamingChangeEventSource.java | 8 +-- .../events/BufferingChangeStreamCursor.java | 71 ++++++------------- 2 files changed, 23 insertions(+), 56 deletions(-) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java index d3d761cbe..b6fb5b7ab 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java @@ -101,9 +101,9 @@ private void readChangeStream(MongoClient client, ChangeEventSourceContext conte final SplitEventHandler splitHandler = new SplitEventHandler<>(); final ChangeStreamIterable stream = initChangeStream(client, effectiveOffset); - try (var cursor = BufferingChangeStreamCursor.fromIterable(stream, taskContext, streamingMetrics, clock).start()) { + try (var cursor = BufferingChangeStreamCursor.fromIterable(stream, taskContext, context, streamingMetrics, clock).start()) { while (context.isRunning()) { - waitWhenStreamingPaused(context, cursor); + waitWhenStreamingPaused(context); var resumableEvent = cursor.tryNext(); if (resumableEvent == null) { continue; @@ -124,14 +124,12 @@ private void readChangeStream(MongoClient client, ChangeEventSourceContext conte } } - private void waitWhenStreamingPaused(ChangeEventSourceContext context, BufferingChangeStreamCursor cursor) { + private void waitWhenStreamingPaused(ChangeEventSourceContext context) { if (context.isPaused()) { errorHandled(() -> { LOGGER.info("Streaming will now pause"); context.streamingPaused(); - cursor.pause(); context.waitSnapshotCompletion(); - cursor.resume(); LOGGER.info("Streaming resumed"); }); } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/events/BufferingChangeStreamCursor.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/events/BufferingChangeStreamCursor.java index 82aa323d7..fda5eb748 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/events/BufferingChangeStreamCursor.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/events/BufferingChangeStreamCursor.java @@ -17,9 +17,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.bson.BsonDocument; import org.slf4j.Logger; @@ -37,6 +34,7 @@ import io.debezium.connector.mongodb.MongoDbConnector; import io.debezium.connector.mongodb.MongoDbTaskContext; import io.debezium.connector.mongodb.metrics.MongoDbStreamingChangeEventSourceMetrics; +import io.debezium.pipeline.source.spi.ChangeEventSource; import io.debezium.util.Clock; import io.debezium.util.DelayStrategy; import io.debezium.util.Threads; @@ -60,9 +58,6 @@ public class BufferingChangeStreamCursor implements MongoChangeStreamCu private final ExecutorService executor; private final DelayStrategy throttler; private BsonDocument lastResumeToken = null; - private final Lock lock = new ReentrantLock(); - private final Condition condition = lock.newCondition(); - private volatile boolean paused = false; /** * Combination of change stream event and resume token @@ -137,10 +132,12 @@ public static final class EventFetcher implements Runnable, Closeable { private final MongoDbStreamingChangeEventSourceMetrics metrics; private final Clock clock; private int noMessageIterations = 0; + private final ChangeEventSource.ChangeEventSourceContext context; public EventFetcher(ChangeStreamIterable stream, int capacity, MongoDbStreamingChangeEventSourceMetrics metrics, + ChangeEventSource.ChangeEventSourceContext context, Clock clock, DelayStrategy throttler) { this.stream = stream; @@ -152,14 +149,16 @@ public EventFetcher(ChangeStreamIterable stream, this.cursorRef = new AtomicReference<>(null); this.queue = new ConcurrentLinkedQueue<>(); this.error = new AtomicReference<>(null); + this.context = context; } public EventFetcher(ChangeStreamIterable stream, int capacity, MongoDbStreamingChangeEventSourceMetrics metrics, + ChangeEventSource.ChangeEventSourceContext context, Clock clock, Duration throttleMaxSleep) { - this(stream, capacity, metrics, clock, DelayStrategy.constant(throttleMaxSleep)); + this(stream, capacity, metrics, context, clock, DelayStrategy.constant(throttleMaxSleep)); } /** @@ -248,6 +247,7 @@ private void fetchEvents(MongoChangeStreamCursor> var repeat = false; while (isRunning()) { if (!repeat) { + waitWhenStreamingPaused(context); var maybeEvent = fetchEvent(cursor); if (maybeEvent.isEmpty()) { LOGGER.warn("Resume token not available on this poll"); @@ -273,6 +273,15 @@ private Optional> fetchEvent(MongoChangeStre .or(() -> Optional.ofNullable(cursor.getResumeToken()).map(ResumableChangeStreamEvent::new)); } + private void waitWhenStreamingPaused(ChangeEventSource.ChangeEventSourceContext context) throws InterruptedException { + if (context.isPaused()) { + LOGGER.info("Buffering change stream cursor paused"); + context.streamingPaused(); + context.waitSnapshotCompletion(); + LOGGER.info("Buffering change stream cursor resumed"); + } + } + private void throttleIfNeeded(ChangeStreamDocument document) { if (document == null) { noMessageIterations++; @@ -286,9 +295,6 @@ private void throttleIfNeeded(ChangeStreamDocument document) { private boolean enqueue(ResumableChangeStreamEvent event) throws InterruptedException { var available = this.capacity.tryAcquire(QUEUE_OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS); - if (!isRunning()) { - return false; - } if (!available) { LOGGER.warn("Unable to acquire buffer lock, buffer queue is likely full"); return false; @@ -301,12 +307,13 @@ private boolean enqueue(ResumableChangeStreamEvent event) throws Interr public static BufferingChangeStreamCursor fromIterable( ChangeStreamIterable stream, MongoDbTaskContext taskContext, + ChangeEventSource.ChangeEventSourceContext context, MongoDbStreamingChangeEventSourceMetrics metrics, Clock clock) { var config = taskContext.getConnectorConfig(); return new BufferingChangeStreamCursor<>( - new EventFetcher<>(stream, config.getMaxBatchSize(), metrics, clock, config.getPollInterval()), + new EventFetcher<>(stream, config.getMaxBatchSize(), metrics, context, clock, config.getPollInterval()), Threads.newFixedThreadPool(MongoDbConnector.class, taskContext.getServerName(), "replicator-fetcher", 1), config.getPollInterval()); } @@ -336,14 +343,7 @@ public BufferingChangeStreamCursor start() { @Override public ResumableChangeStreamEvent tryNext() { - ResumableChangeStreamEvent event = null; - try { - event = pollWithDelay(); - } - catch (Exception e) { - LOGGER.error("Error while polling for event:", e); - } - + var event = pollWithDelay(); if (event != null) { lastResumeToken = event.resumeToken; } @@ -370,19 +370,9 @@ public ResumableChangeStreamEvent next() { * * @return event or null if not available within time limit */ - private ResumableChangeStreamEvent pollWithDelay() throws InterruptedException { + private ResumableChangeStreamEvent pollWithDelay() { boolean slept; ResumableChangeStreamEvent event; - - lock.lock(); - try { - while (paused) { - condition.await(); - } - } - finally { - lock.unlock(); - } do { event = fetcher.poll(); slept = throttler.sleepWhen(event == null); @@ -391,27 +381,6 @@ private ResumableChangeStreamEvent pollWithDelay() throws InterruptedEx return event; } - public void pause() { - lock.lock(); - try { - paused = true; - } - finally { - lock.unlock(); - } - } - - public void resume() { - lock.lock(); - try { - paused = false; - condition.signalAll(); - } - finally { - lock.unlock(); - } - } - @Override public boolean hasNext() { return !fetcher.isEmpty();