diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceCoordinator.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceCoordinator.java index 7cbffa890..ff8d830a2 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceCoordinator.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerChangeEventSourceCoordinator.java @@ -88,6 +88,9 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps if (previousOffsets.getOffsets().size() == 1) { signalProcessor.setContext(snapshotResult.getOffset()); } + if (snapshotResult.isCompleted()) { + delayStreamingIfNeeded(context); + } } } diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index c6b252e53..9910da3a2 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -958,6 +958,16 @@ public static SnapshotQueryMode parse(String value, String defaultValue) { + "'warn' (the default) the value of column of event that conversion failed will be null and be logged with warn level; " + "'skip' the value of column of event that conversion failed will be null and be logged with debug level."); + public static final Field STREAMING_DELAY_MS = Field.create("streaming.delay.ms") + .withDisplayName("Streaming Delay (milliseconds)") + .withType(Type.LONG) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 27)) + .withWidth(Width.MEDIUM) + .withImportance(Importance.LOW) + .withDescription("A delay period after the snapshot is completed and the streaming begins, given in milliseconds. Defaults to 0 ms.") + .withDefault(0L) + .withValidation(Field::isNonNegativeLong); + public static final Field SNAPSHOT_LOCKING_MODE_CUSTOM_NAME = Field.create("snapshot.locking.mode.custom.name") .withDisplayName("Snapshot Locking Mode Custom Name") .withType(Type.STRING) @@ -1075,6 +1085,7 @@ public static SnapshotQueryMode parse(String value, String defaultValue) { PROVIDE_TRANSACTION_METADATA, SKIPPED_OPERATIONS, SNAPSHOT_DELAY_MS, + STREAMING_DELAY_MS, SNAPSHOT_MODE_TABLES, SNAPSHOT_FETCH_SIZE, SNAPSHOT_MAX_THREADS, @@ -1115,6 +1126,7 @@ public static SnapshotQueryMode parse(String value, String defaultValue) { private final String heartbeatTopicsPrefix; private final Duration heartbeatInterval; private final Duration snapshotDelay; + private final Duration streamingDelay; private final Duration retriableRestartWait; private final int snapshotFetchSize; private final int incrementalSnapshotChunkSize; @@ -1164,6 +1176,7 @@ protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSi this.heartbeatTopicsPrefix = config.getString(Heartbeat.HEARTBEAT_TOPICS_PREFIX); this.heartbeatInterval = config.getDuration(Heartbeat.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS); this.snapshotDelay = Duration.ofMillis(config.getLong(SNAPSHOT_DELAY_MS)); + this.streamingDelay = Duration.ofMillis(config.getLong(STREAMING_DELAY_MS)); this.retriableRestartWait = Duration.ofMillis(config.getLong(RETRIABLE_RESTART_WAIT)); this.snapshotFetchSize = config.getInteger(SNAPSHOT_FETCH_SIZE, defaultSnapshotFetchSize); this.snapshotMaxThreads = config.getInteger(SNAPSHOT_MAX_THREADS); @@ -1295,6 +1308,10 @@ public Duration getSnapshotDelay() { return snapshotDelay; } + public Duration getStreamingDelay() { + return streamingDelay; + } + public int getSnapshotFetchSize() { return snapshotFetchSize; } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java index 6ccb7dec1..9b8966b52 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java @@ -19,6 +19,9 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import io.debezium.config.ConfigurationDefaults; +import io.debezium.util.Clock; +import io.debezium.util.Metronome; import org.apache.kafka.connect.source.SourceConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -193,11 +196,37 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps LOGGER.debug("Snapshot result {}", snapshotResult); if (running && snapshotResult.isCompletedOrSkipped()) { + if (snapshotResult.isCompleted()) { + delayStreamingIfNeeded(context); + } previousLogContext.set(taskContext.configureLoggingContext("streaming", partition)); streamEvents(context, partition, snapshotResult.getOffset()); } } + /** + * Delays streaming execution as per the {@link CommonConnectorConfig#STREAMING_DELAY_MS} parameter. + */ + protected void delayStreamingIfNeeded(ChangeEventSourceContext context) throws InterruptedException { + Duration streamingDelay = connectorConfig.getStreamingDelay(); + + if (streamingDelay.isZero() || streamingDelay.isNegative()) { + return; + } + + Threads.Timer timer = Threads.timer(Clock.SYSTEM, streamingDelay); + Metronome metronome = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM); + + while (!timer.expired()) { + if (!context.isRunning()) { + throw new InterruptedException("Interrupted while awaiting streaming delay"); + } + + LOGGER.info("The connector will wait for {}s before initiating streaming", timer.remaining().getSeconds()); + metronome.pause(); + } + } + public void doBlockingSnapshot(P partition, OffsetContext offsetContext, SnapshotConfiguration snapshotConfiguration) { blockingSnapshotExecutor.submit(() -> { diff --git a/debezium-core/src/main/java/io/debezium/pipeline/spi/SnapshotResult.java b/debezium-core/src/main/java/io/debezium/pipeline/spi/SnapshotResult.java index 5b2b6617f..009413cd2 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/spi/SnapshotResult.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/spi/SnapshotResult.java @@ -31,6 +31,10 @@ public boolean isCompletedOrSkipped() { return this.status == SnapshotResultStatus.SKIPPED || this.status == SnapshotResultStatus.COMPLETED; } + public boolean isCompleted() { + return this.status == SnapshotResultStatus.COMPLETED; + } + public SnapshotResultStatus getStatus() { return status; }