From 93e8570fb6e8b6ed97979313f5670ec63de6b92d Mon Sep 17 00:00:00 2001 From: Jakub Cechacek Date: Mon, 27 Nov 2023 13:26:56 +0100 Subject: [PATCH] DBZ-7184 Implementation of buffering change stream cursor --- .../events/BufferingChangeStreamCursor.java | 337 ++++++++++++++++++ ...goDbStreamingChangeEventSourceMetrics.java | 4 +- .../java/io/debezium/util/DelayStrategy.java | 25 +- 3 files changed, 360 insertions(+), 6 deletions(-) create mode 100644 debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/events/BufferingChangeStreamCursor.java diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/events/BufferingChangeStreamCursor.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/events/BufferingChangeStreamCursor.java new file mode 100644 index 000000000..7a7518370 --- /dev/null +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/events/BufferingChangeStreamCursor.java @@ -0,0 +1,337 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mongodb.events; + +import java.io.Closeable; +import java.time.Duration; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.bson.BsonDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.mongodb.ServerAddress; +import com.mongodb.ServerCursor; +import com.mongodb.client.ChangeStreamIterable; +import com.mongodb.client.MongoChangeStreamCursor; +import com.mongodb.client.model.changestream.ChangeStreamDocument; + +import io.debezium.DebeziumException; +import io.debezium.annotation.Immutable; +import io.debezium.annotation.NotThreadSafe; +import io.debezium.connector.mongodb.MongoDbConnector; +import io.debezium.connector.mongodb.MongoDbTaskContext; +import io.debezium.connector.mongodb.metrics.MongoDbStreamingChangeEventSourceMetrics; +import io.debezium.util.Clock; +import io.debezium.util.DelayStrategy; +import io.debezium.util.Threads; + +/** + * An implementation of {@link MongoChangeStreamCursor} which immediately starts consuming available events into a buffer. + *

+ * Internally this cursor starts a {@link EventFetcher} as a separate thread on provided executor. + * Although the implementation is internally thread safe the cursors is not meant to be accessed concurrently from multiple threads. + * + * @param the type of documents the cursor contains + */ +@NotThreadSafe +public class BufferingChangeStreamCursor implements MongoChangeStreamCursor> { + + private static final Logger LOGGER = LoggerFactory.getLogger(BufferingChangeStreamCursor.class); + public static final int THROTTLE_NO_MESSAGE_BEFORE_PAUSE = 5; + + private final EventFetcher fetcher; + private final ExecutorService executor; + private final DelayStrategy throttler; + + private BsonDocument lastResumeToken = null; + + /** + * Combination of change stream event and resume token + * + * @param the type of change stream document + */ + @Immutable + public static final class ResumableChangeStreamEvent { + public final Optional> document; + /** + * When {@link #document} is present this field corresponds to {@link ChangeStreamDocument#getResumeToken()} + * Otherwise the value corresponds to the value returned by associated {@link MongoChangeStreamCursor#getResumeToken()} + */ + public final BsonDocument resumeToken; + + /** + * Creates resumable event + * + * @param document change stream event + * @param cursor cursors that produced the event + */ + public ResumableChangeStreamEvent(ChangeStreamDocument document, MongoChangeStreamCursor> cursor) { + this.document = Optional.ofNullable(document); + this.resumeToken = this.document + .map(ChangeStreamDocument::getResumeToken) + .orElseGet(cursor::getResumeToken); + } + + public boolean isEmpty() { + return document.isEmpty(); + } + + public boolean hasDocument() { + return document.isPresent(); + } + + @Override + public String toString() { + return document + .map(ChangeStreamDocument::toString) + .orElseGet(resumeToken::toString); + } + } + + /** + * Runnable responsible for fetching events from {@link ChangeStreamIterable} and buffering them in provided queue; + *

+ * This utilises standard cursors returned by {@link ChangeStreamIterable#cursor()} + * + * @param + */ + public static final class EventFetcher implements Runnable, Closeable { + + private final ChangeStreamIterable stream; + private final Semaphore capacity; + private final Queue> queue; + private final DelayStrategy throttler; + private final AtomicBoolean running; + private final AtomicReference>> cursorRef; + private final MongoDbStreamingChangeEventSourceMetrics metrics; + private final Clock clock; + private int noMessageIterations = 0; + + public EventFetcher(ChangeStreamIterable stream, + int capacity, + MongoDbStreamingChangeEventSourceMetrics metrics, + Clock clock, + DelayStrategy throttler) { + this.stream = stream; + this.capacity = new Semaphore(capacity); + this.metrics = metrics; + this.clock = clock; + this.throttler = throttler; + this.running = new AtomicBoolean(false); + this.cursorRef = new AtomicReference<>(null); + this.queue = new ConcurrentLinkedQueue<>(); + } + + public EventFetcher(ChangeStreamIterable stream, + int capacity, + MongoDbStreamingChangeEventSourceMetrics metrics, + Clock clock, + Duration throttleMaxSleep) { + this(stream, capacity, metrics, clock, DelayStrategy.constant(throttleMaxSleep)); + } + + /** + * Indicates whether event fetching is running and the internal cursor is open + * + * @return true if running, false otherwise + */ + public boolean isRunning() { + return running.get(); + } + + @Override + public void close() { + running.set(false); + } + + public ResumableChangeStreamEvent poll() { + var event = queue.poll(); + if (event != null) { + capacity.release(); + } + return event; + } + + public boolean isEmpty() { + return queue.isEmpty(); + } + + /** + * Depending on queue implementation this method may not be reliable + * By default see {@link ConcurrentLinkedQueue#size()} + * + * @return approximate number of elements in queue + */ + public int size() { + return queue.size(); + } + + @Override + public void run() { + try (MongoChangeStreamCursor> cursor = stream.cursor()) { + cursorRef.compareAndSet(null, cursor); + running.set(true); + noMessageIterations = 0; + fetchEvents(cursor); + } + catch (InterruptedException e) { + throw new DebeziumException("Fetcher thread interrupted", e); + } + finally { + cursorRef.set(null); + close(); + } + } + + private void fetchEvents(MongoChangeStreamCursor> cursor) throws InterruptedException { + while (isRunning()) { + var beforeEventPollTime = clock.currentTimeAsInstant(); + var event = new ResumableChangeStreamEvent<>(cursor.tryNext(), cursor); + event.document.ifPresent(doc -> LOGGER.trace("Polled Change Stream event: {}", event)); + metrics.onSourceEventPolled(event.document.orElse(null), clock, beforeEventPollTime); + + if (event.hasDocument()) { + enqueue(event); + } + else { + throttleIfNeeded(event); + } + } + } + + private void throttleIfNeeded(ResumableChangeStreamEvent event) throws InterruptedException { + noMessageIterations++; + if (noMessageIterations >= THROTTLE_NO_MESSAGE_BEFORE_PAUSE) { + enqueue(event); + noMessageIterations = 0; + throttler.sleepWhen(true); + } + } + + private void enqueue(ResumableChangeStreamEvent event) throws InterruptedException { + this.capacity.acquire(); + queue.offer(event); + } + } + + public static BufferingChangeStreamCursor fromIterable( + ChangeStreamIterable stream, + MongoDbTaskContext taskContext, + MongoDbStreamingChangeEventSourceMetrics metrics, + Clock clock) { + var config = taskContext.getConnectorConfig(); + + return new BufferingChangeStreamCursor<>( + new EventFetcher<>(stream, config.getMaxBatchSize(), metrics, clock, config.getPollInterval()), + Threads.newFixedThreadPool(MongoDbConnector.class, taskContext.serverName(), "replicator-buffer", 1), + config.getPollInterval()); + } + + /** + * Creates new prefetching cursor + * + * @param fetcher MongoDB change event fetcher + * @param executor executor used to dispatch buffering thread + * @param throttler throttling mechanism + */ + public BufferingChangeStreamCursor(EventFetcher fetcher, ExecutorService executor, DelayStrategy throttler) { + this.fetcher = fetcher; + this.executor = executor; + this.throttler = throttler; + } + + public BufferingChangeStreamCursor(EventFetcher fetcher, ExecutorService executor, Duration throttleMaxSleep) { + this(fetcher, executor, DelayStrategy.boundedExponential(Duration.ofMillis(1), throttleMaxSleep, 2)); + } + + public BufferingChangeStreamCursor start() { + executor.submit(fetcher); + return this; + } + + @Override + public ResumableChangeStreamEvent tryNext() { + var event = pollWithDelay(); + if (event != null) { + lastResumeToken = event.resumeToken; + } + return event; + } + + /** + * Returns next event in buffer. + * Not that unlike other Mongo implementation this method does not block + * + * @throws NoSuchElementException when no element is available + * @return event + */ + @Override + public ResumableChangeStreamEvent next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return tryNext(); + } + + /** + * Repeatedly polling fetcher for new event, exponentially waiting between polls until limit is reached + * + * @return event or null if not available within time limit + */ + private ResumableChangeStreamEvent pollWithDelay() { + boolean slept; + ResumableChangeStreamEvent event; + + do { + event = fetcher.poll(); + slept = throttler.sleepWhen(event == null); + } while (slept); + + return event; + } + + @Override + public boolean hasNext() { + return !fetcher.isEmpty(); + } + + /** + * See {@link EventFetcher#size()} + */ + @Override + public int available() { + return fetcher.size(); + } + + @Override + public BsonDocument getResumeToken() { + return lastResumeToken; + } + + @Override + public ServerCursor getServerCursor() { + return fetcher.cursorRef.get().getServerCursor(); + } + + @Override + public ServerAddress getServerAddress() { + return fetcher.cursorRef.get().getServerAddress(); + } + + @Override + public void close() { + fetcher.close(); + executor.shutdown(); + } +} diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/metrics/MongoDbStreamingChangeEventSourceMetrics.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/metrics/MongoDbStreamingChangeEventSourceMetrics.java index 5fe0692db..ef5bcdbd2 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/metrics/MongoDbStreamingChangeEventSourceMetrics.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/metrics/MongoDbStreamingChangeEventSourceMetrics.java @@ -9,8 +9,6 @@ import java.time.Instant; import java.util.concurrent.atomic.AtomicLong; -import org.bson.BsonDocument; - import com.mongodb.client.model.changestream.ChangeStreamDocument; import io.debezium.annotation.ThreadSafe; @@ -72,7 +70,7 @@ public long getNumberOfEmptyPolls() { return numberOfEmptyPolls.get(); } - public void onSourceEventPolled(ChangeStreamDocument event, Clock clock, Instant prePollTimestamp) { + public void onSourceEventPolled(ChangeStreamDocument event, Clock clock, Instant prePollTimestamp) { var now = clock.currentTimeAsInstant(); var duration = Duration.between(prePollTimestamp, now).toMillis(); diff --git a/debezium-core/src/main/java/io/debezium/util/DelayStrategy.java b/debezium-core/src/main/java/io/debezium/util/DelayStrategy.java index bb99cfe49..2dccbc066 100644 --- a/debezium-core/src/main/java/io/debezium/util/DelayStrategy.java +++ b/debezium-core/src/main/java/io/debezium/util/DelayStrategy.java @@ -114,16 +114,31 @@ static DelayStrategy exponential(Duration initialDelay, Duration maxDelay) { return exponential(initialDelay, maxDelay, 2.0); } + /** + * Same as {@link #exponential(Duration, Duration, double, boolean)} with {@code bounded} parameter set to {@code false} + */ + static DelayStrategy exponential(Duration initialDelay, Duration maxDelay, double backOffMultiplier) { + return exponential(initialDelay, maxDelay, backOffMultiplier, false); + } + + /** + * Same as {@link #exponential(Duration, Duration, double, boolean)} with {@code bounded} parameter set to {@code true} + */ + static DelayStrategy boundedExponential(Duration initialDelay, Duration maxDelay, double backOffMultiplier) { + return exponential(initialDelay, maxDelay, backOffMultiplier, true); + } + /** * Create a delay strategy that applies an exponentially-increasing delay as long as the criteria is met. As soon as - * the criteria is not met, the delay resets to zero. + * the criteria is not met, the delay resets to the intial value. * * @param initialDelay the initial delay; must be positive * @param maxDelay the maximum delay; must be greater than the initial delay * @param backOffMultiplier the factor by which the delay increases each pass + * @param bounded if true the delay resets also when maximum delay was reached * @return the strategy */ - static DelayStrategy exponential(Duration initialDelay, Duration maxDelay, double backOffMultiplier) { + static DelayStrategy exponential(Duration initialDelay, Duration maxDelay, double backOffMultiplier, boolean bounded) { final long initialDelayInMilliseconds = initialDelay.toMillis(); final long maxDelayInMilliseconds = maxDelay.toMillis(); if (backOffMultiplier <= 1.0) { @@ -140,7 +155,7 @@ static DelayStrategy exponential(Duration initialDelay, Duration maxDelay, doubl @Override public boolean sleepWhen(boolean criteria) { - if (!criteria) { + if (!criteria || maxDelayedReached()) { // Don't sleep ... previousDelay = 0; return false; @@ -162,6 +177,10 @@ public boolean sleepWhen(boolean criteria) { } return true; } + + private boolean maxDelayedReached() { + return bounded && previousDelay >= maxDelayInMilliseconds; + } }; } }