DBZ-7902: Add configurable delay after snapshotting, before starting streaming

This commit is contained in:
rajdangwal 2024-05-30 23:06:44 +05:30 committed by Jiri Pechanec
parent 5c35b7b061
commit e390275d91
4 changed files with 53 additions and 0 deletions

View File

@ -88,6 +88,9 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
if (previousOffsets.getOffsets().size() == 1) {
signalProcessor.setContext(snapshotResult.getOffset());
}
if (snapshotResult.isCompleted()) {
delayStreamingIfNeeded(context);
}
}
}

View File

@ -958,6 +958,16 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
+ "'warn' (the default) the value of column of event that conversion failed will be null and be logged with warn level; "
+ "'skip' the value of column of event that conversion failed will be null and be logged with debug level.");
public static final Field STREAMING_DELAY_MS = Field.create("streaming.delay.ms")
.withDisplayName("Streaming Delay (milliseconds)")
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 27))
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("A delay period after the snapshot is completed and the streaming begins, given in milliseconds. Defaults to 0 ms.")
.withDefault(0L)
.withValidation(Field::isNonNegativeLong);
public static final Field SNAPSHOT_LOCKING_MODE_CUSTOM_NAME = Field.create("snapshot.locking.mode.custom.name")
.withDisplayName("Snapshot Locking Mode Custom Name")
.withType(Type.STRING)
@ -1075,6 +1085,7 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
PROVIDE_TRANSACTION_METADATA,
SKIPPED_OPERATIONS,
SNAPSHOT_DELAY_MS,
STREAMING_DELAY_MS,
SNAPSHOT_MODE_TABLES,
SNAPSHOT_FETCH_SIZE,
SNAPSHOT_MAX_THREADS,
@ -1115,6 +1126,7 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
private final String heartbeatTopicsPrefix;
private final Duration heartbeatInterval;
private final Duration snapshotDelay;
private final Duration streamingDelay;
private final Duration retriableRestartWait;
private final int snapshotFetchSize;
private final int incrementalSnapshotChunkSize;
@ -1164,6 +1176,7 @@ protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSi
this.heartbeatTopicsPrefix = config.getString(Heartbeat.HEARTBEAT_TOPICS_PREFIX);
this.heartbeatInterval = config.getDuration(Heartbeat.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS);
this.snapshotDelay = Duration.ofMillis(config.getLong(SNAPSHOT_DELAY_MS));
this.streamingDelay = Duration.ofMillis(config.getLong(STREAMING_DELAY_MS));
this.retriableRestartWait = Duration.ofMillis(config.getLong(RETRIABLE_RESTART_WAIT));
this.snapshotFetchSize = config.getInteger(SNAPSHOT_FETCH_SIZE, defaultSnapshotFetchSize);
this.snapshotMaxThreads = config.getInteger(SNAPSHOT_MAX_THREADS);
@ -1295,6 +1308,10 @@ public Duration getSnapshotDelay() {
return snapshotDelay;
}
public Duration getStreamingDelay() {
return streamingDelay;
}
public int getSnapshotFetchSize() {
return snapshotFetchSize;
}

View File

@ -19,6 +19,9 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import io.debezium.config.ConfigurationDefaults;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -193,11 +196,37 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
LOGGER.debug("Snapshot result {}", snapshotResult);
if (running && snapshotResult.isCompletedOrSkipped()) {
if (snapshotResult.isCompleted()) {
delayStreamingIfNeeded(context);
}
previousLogContext.set(taskContext.configureLoggingContext("streaming", partition));
streamEvents(context, partition, snapshotResult.getOffset());
}
}
/**
* Delays streaming execution as per the {@link CommonConnectorConfig#STREAMING_DELAY_MS} parameter.
*/
protected void delayStreamingIfNeeded(ChangeEventSourceContext context) throws InterruptedException {
Duration streamingDelay = connectorConfig.getStreamingDelay();
if (streamingDelay.isZero() || streamingDelay.isNegative()) {
return;
}
Threads.Timer timer = Threads.timer(Clock.SYSTEM, streamingDelay);
Metronome metronome = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM);
while (!timer.expired()) {
if (!context.isRunning()) {
throw new InterruptedException("Interrupted while awaiting streaming delay");
}
LOGGER.info("The connector will wait for {}s before initiating streaming", timer.remaining().getSeconds());
metronome.pause();
}
}
public void doBlockingSnapshot(P partition, OffsetContext offsetContext, SnapshotConfiguration snapshotConfiguration) {
blockingSnapshotExecutor.submit(() -> {

View File

@ -31,6 +31,10 @@ public boolean isCompletedOrSkipped() {
return this.status == SnapshotResultStatus.SKIPPED || this.status == SnapshotResultStatus.COMPLETED;
}
public boolean isCompleted() {
return this.status == SnapshotResultStatus.COMPLETED;
}
public SnapshotResultStatus getStatus() {
return status;
}