DBZ-7146 Configurable inactivity pause duration in MongoDB connector

This commit is contained in:
Jakub Cechacek 2023-11-16 09:32:43 +01:00 committed by Jiri Pechanec
parent 529be44712
commit 17464f5fe2

View File

@ -5,7 +5,6 @@
*/
package io.debezium.connector.mongodb;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -37,7 +36,7 @@
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.DelayStrategy;
import io.debezium.util.Threads;
/**
@ -47,6 +46,8 @@ public class MongoDbStreamingChangeEventSource implements StreamingChangeEventSo
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbStreamingChangeEventSource.class);
private static final int THROTTLE_NO_MESSAGE_BEFORE_PAUSE = 5;
private final MongoDbConnectorConfig connectorConfig;
private final EventDispatcher<MongoDbPartition, CollectionId> dispatcher;
private final ErrorHandler errorHandler;
@ -176,13 +177,16 @@ else if (rsOffsetContext.lastTimestamp() != null) {
// In Replicator, this used cursor.hasNext() but this is a blocking call and I observed that this can
// delay the shutdown of the connector by up to 15 seconds or longer. By introducing a Metronome, we
// can respond to the stop request much faster and without much overhead.
Metronome pause = Metronome.sleeper(Duration.ofMillis(500), clock);
DelayStrategy pause = DelayStrategy.constant(connectorConfig.getPollInterval());
int noMessageIterations = 0;
while (context.isRunning()) {
// Use tryNext which will return null if no document is yet available from the cursor.
// In this situation if not document is available, we'll pause.
ChangeStreamDocument<BsonDocument> event = cursor.tryNext();
if (event != null) {
LOGGER.trace("Arrived Change Stream event: {}", event);
noMessageIterations = 0;
var split = event.getSplitEvent();
if (split != null) {
@ -233,6 +237,7 @@ else if (rsOffsetContext.lastTimestamp() != null) {
}
else {
// No event was returned, so trigger a heartbeat
noMessageIterations++;
try {
// Guard against `null` to be protective of issues like SERVER-63772, and situations called out in the Javadocs:
// > resume token [...] can be null if the cursor has either not been iterated yet, or the cursor is closed.
@ -248,7 +253,10 @@ else if (rsOffsetContext.lastTimestamp() != null) {
}
try {
pause.pause();
if (noMessageIterations >= THROTTLE_NO_MESSAGE_BEFORE_PAUSE) {
noMessageIterations = 0;
pause.sleepWhen(true);
}
if (context.isPaused()) {
LOGGER.info("Streaming will now pause");
context.streamingPaused();