DBZ-8022 Pass ChangeEventSourceContext to EventFetcher and reuse the methods

This commit is contained in:
ani-sha 2024-07-22 15:18:04 +05:30 committed by Chris Cranford
parent 5380fe3a13
commit 2b9236e318
2 changed files with 23 additions and 56 deletions

View File

@ -101,9 +101,9 @@ private void readChangeStream(MongoClient client, ChangeEventSourceContext conte
final SplitEventHandler<BsonDocument> splitHandler = new SplitEventHandler<>();
final ChangeStreamIterable<BsonDocument> stream = initChangeStream(client, effectiveOffset);
try (var cursor = BufferingChangeStreamCursor.fromIterable(stream, taskContext, streamingMetrics, clock).start()) {
try (var cursor = BufferingChangeStreamCursor.fromIterable(stream, taskContext, context, streamingMetrics, clock).start()) {
while (context.isRunning()) {
waitWhenStreamingPaused(context, cursor);
waitWhenStreamingPaused(context);
var resumableEvent = cursor.tryNext();
if (resumableEvent == null) {
continue;
@ -124,14 +124,12 @@ private void readChangeStream(MongoClient client, ChangeEventSourceContext conte
}
}
private void waitWhenStreamingPaused(ChangeEventSourceContext context, BufferingChangeStreamCursor cursor) {
private void waitWhenStreamingPaused(ChangeEventSourceContext context) {
if (context.isPaused()) {
errorHandled(() -> {
LOGGER.info("Streaming will now pause");
context.streamingPaused();
cursor.pause();
context.waitSnapshotCompletion();
cursor.resume();
LOGGER.info("Streaming resumed");
});
}

View File

@ -17,9 +17,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.bson.BsonDocument;
import org.slf4j.Logger;
@ -37,6 +34,7 @@
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.metrics.MongoDbStreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
import io.debezium.util.Threads;
@ -60,9 +58,6 @@ public class BufferingChangeStreamCursor<TResult> implements MongoChangeStreamCu
private final ExecutorService executor;
private final DelayStrategy throttler;
private BsonDocument lastResumeToken = null;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private volatile boolean paused = false;
/**
* Combination of change stream event and resume token
@ -137,10 +132,12 @@ public static final class EventFetcher<TResult> implements Runnable, Closeable {
private final MongoDbStreamingChangeEventSourceMetrics metrics;
private final Clock clock;
private int noMessageIterations = 0;
private final ChangeEventSource.ChangeEventSourceContext context;
public EventFetcher(ChangeStreamIterable<TResult> stream,
int capacity,
MongoDbStreamingChangeEventSourceMetrics metrics,
ChangeEventSource.ChangeEventSourceContext context,
Clock clock,
DelayStrategy throttler) {
this.stream = stream;
@ -152,14 +149,16 @@ public EventFetcher(ChangeStreamIterable<TResult> stream,
this.cursorRef = new AtomicReference<>(null);
this.queue = new ConcurrentLinkedQueue<>();
this.error = new AtomicReference<>(null);
this.context = context;
}
public EventFetcher(ChangeStreamIterable<TResult> stream,
int capacity,
MongoDbStreamingChangeEventSourceMetrics metrics,
ChangeEventSource.ChangeEventSourceContext context,
Clock clock,
Duration throttleMaxSleep) {
this(stream, capacity, metrics, clock, DelayStrategy.constant(throttleMaxSleep));
this(stream, capacity, metrics, context, clock, DelayStrategy.constant(throttleMaxSleep));
}
/**
@ -248,6 +247,7 @@ private void fetchEvents(MongoChangeStreamCursor<ChangeStreamDocument<TResult>>
var repeat = false;
while (isRunning()) {
if (!repeat) {
waitWhenStreamingPaused(context);
var maybeEvent = fetchEvent(cursor);
if (maybeEvent.isEmpty()) {
LOGGER.warn("Resume token not available on this poll");
@ -273,6 +273,15 @@ private Optional<ResumableChangeStreamEvent<TResult>> fetchEvent(MongoChangeStre
.or(() -> Optional.ofNullable(cursor.getResumeToken()).map(ResumableChangeStreamEvent::new));
}
private void waitWhenStreamingPaused(ChangeEventSource.ChangeEventSourceContext context) throws InterruptedException {
if (context.isPaused()) {
LOGGER.info("Buffering change stream cursor paused");
context.streamingPaused();
context.waitSnapshotCompletion();
LOGGER.info("Buffering change stream cursor resumed");
}
}
private void throttleIfNeeded(ChangeStreamDocument<TResult> document) {
if (document == null) {
noMessageIterations++;
@ -286,9 +295,6 @@ private void throttleIfNeeded(ChangeStreamDocument<TResult> document) {
private boolean enqueue(ResumableChangeStreamEvent<TResult> event) throws InterruptedException {
var available = this.capacity.tryAcquire(QUEUE_OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (!isRunning()) {
return false;
}
if (!available) {
LOGGER.warn("Unable to acquire buffer lock, buffer queue is likely full");
return false;
@ -301,12 +307,13 @@ private boolean enqueue(ResumableChangeStreamEvent<TResult> event) throws Interr
public static <TResult> BufferingChangeStreamCursor<TResult> fromIterable(
ChangeStreamIterable<TResult> stream,
MongoDbTaskContext taskContext,
ChangeEventSource.ChangeEventSourceContext context,
MongoDbStreamingChangeEventSourceMetrics metrics,
Clock clock) {
var config = taskContext.getConnectorConfig();
return new BufferingChangeStreamCursor<>(
new EventFetcher<>(stream, config.getMaxBatchSize(), metrics, clock, config.getPollInterval()),
new EventFetcher<>(stream, config.getMaxBatchSize(), metrics, context, clock, config.getPollInterval()),
Threads.newFixedThreadPool(MongoDbConnector.class, taskContext.getServerName(), "replicator-fetcher", 1),
config.getPollInterval());
}
@ -336,14 +343,7 @@ public BufferingChangeStreamCursor<TResult> start() {
@Override
public ResumableChangeStreamEvent<TResult> tryNext() {
ResumableChangeStreamEvent<TResult> event = null;
try {
event = pollWithDelay();
}
catch (Exception e) {
LOGGER.error("Error while polling for event:", e);
}
var event = pollWithDelay();
if (event != null) {
lastResumeToken = event.resumeToken;
}
@ -370,19 +370,9 @@ public ResumableChangeStreamEvent<TResult> next() {
*
* @return event or null if not available within time limit
*/
private ResumableChangeStreamEvent<TResult> pollWithDelay() throws InterruptedException {
private ResumableChangeStreamEvent<TResult> pollWithDelay() {
boolean slept;
ResumableChangeStreamEvent<TResult> event;
lock.lock();
try {
while (paused) {
condition.await();
}
}
finally {
lock.unlock();
}
do {
event = fetcher.poll();
slept = throttler.sleepWhen(event == null);
@ -391,27 +381,6 @@ private ResumableChangeStreamEvent<TResult> pollWithDelay() throws InterruptedEx
return event;
}
public void pause() {
lock.lock();
try {
paused = true;
}
finally {
lock.unlock();
}
}
public void resume() {
lock.lock();
try {
paused = false;
condition.signalAll();
}
finally {
lock.unlock();
}
}
@Override
public boolean hasNext() {
return !fetcher.isEmpty();