DBZ-7184 buffer lock is now acquired with timeout and reattempted on failure
This commit is contained in:
parent
88a64bf3d5
commit
355041dbaf
@ -14,6 +14,7 @@
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@ -118,6 +119,8 @@ public String toString() {
|
||||
*/
|
||||
public static final class EventFetcher<TResult> implements Runnable, Closeable {
|
||||
|
||||
public static final long QUEUE_OFFER_TIMEOUT_MS = 100;
|
||||
|
||||
private final ChangeStreamIterable<TResult> stream;
|
||||
private final Semaphore capacity;
|
||||
private final Queue<ResumableChangeStreamEvent<TResult>> queue;
|
||||
@ -205,13 +208,18 @@ public void run() {
|
||||
}
|
||||
|
||||
private void fetchEvents(MongoChangeStreamCursor<ChangeStreamDocument<TResult>> cursor) throws InterruptedException {
|
||||
ResumableChangeStreamEvent<TResult> lastEvent = null;
|
||||
var repeat = false;
|
||||
while (isRunning()) {
|
||||
var maybeEvent = fetchEvent(cursor);
|
||||
if (maybeEvent.isEmpty()) {
|
||||
LOGGER.warn("Resume token not available on this poll");
|
||||
continue;
|
||||
if (!repeat) {
|
||||
var maybeEvent = fetchEvent(cursor);
|
||||
if (maybeEvent.isEmpty()) {
|
||||
LOGGER.warn("Resume token not available on this poll");
|
||||
continue;
|
||||
}
|
||||
lastEvent = maybeEvent.get();
|
||||
}
|
||||
enqueue(maybeEvent.get());
|
||||
repeat = !enqueue(lastEvent);
|
||||
}
|
||||
}
|
||||
|
||||
@ -240,9 +248,14 @@ private void throttleIfNeeded(ChangeStreamDocument<TResult> document) {
|
||||
}
|
||||
}
|
||||
|
||||
private void enqueue(ResumableChangeStreamEvent<TResult> event) throws InterruptedException {
|
||||
this.capacity.acquire();
|
||||
queue.offer(event);
|
||||
private boolean enqueue(ResumableChangeStreamEvent<TResult> event) throws InterruptedException {
|
||||
var available = this.capacity.tryAcquire(QUEUE_OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS);
|
||||
if (!available) {
|
||||
LOGGER.warn("Unable to acquire buffer lock, buffer queue is likely full");
|
||||
return false;
|
||||
}
|
||||
// always true
|
||||
return queue.offer(event);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user