DBZ-601 Snapshot delay implementation

This commit is contained in:
Peter Goransson 2018-05-29 11:54:11 -04:00 committed by Gunnar Morling
parent 4111ddbcaf
commit f387f72474
5 changed files with 85 additions and 4 deletions

View File

@ -29,10 +29,12 @@ public class BlockingReader implements Reader {
private final Metronome metronome; private final Metronome metronome;
private final String name; private final String name;
private final String runningLogMessage;
public BlockingReader(String name) { public BlockingReader(String name, String runningLogMessage) {
this.name = name; this.name = name;
this.metronome = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM); this.metronome = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM);
this.runningLogMessage = runningLogMessage;
} }
@ -65,7 +67,7 @@ public void uponCompletion(Runnable handler) {
@Override @Override
public void start() { public void start() {
state.set(State.RUNNING); state.set(State.RUNNING);
logger.info("Connector has completed all of its work but will continue in the running state. It can be shut down at any time."); logger.info(runningLogMessage);
} }
@Override @Override

View File

@ -894,6 +894,15 @@ public static EventProcessingFailureHandlingMode parse(String value) {
"The value of those properties is the select statement to use when retrieving data from the specific table during snapshotting. " + "The value of those properties is the select statement to use when retrieving data from the specific table during snapshotting. " +
"A possible use case for large append-only tables is setting a specific point where to start (resume) snapshotting, in case a previous snapshotting was interrupted."); "A possible use case for large append-only tables is setting a specific point where to start (resume) snapshotting, in case a previous snapshotting was interrupted.");
public static final Field SNAPSHOT_DELAY_MS = Field.createInternal("snapshot.delay.ms")
.withDisplayName("Snapshot Delay (milliseconds)")
.withType(Type.LONG)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("The number of milliseconds to delay before a snapshot will begin.")
.withDefault(0L)
.withValidation(Field::isNonNegativeLong);
/** /**
* Method that generates a Field for specifying that string columns whose names match a set of regular expressions should * Method that generates a Field for specifying that string columns whose names match a set of regular expressions should
* have their values truncated to be no longer than the specified number of characters. * have their values truncated to be no longer than the specified number of characters.
@ -947,6 +956,7 @@ public static final Field MASK_COLUMN(int length) {
BIGINT_UNSIGNED_HANDLING_MODE, BIGINT_UNSIGNED_HANDLING_MODE,
EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE, EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE,
INCONSISTENT_SCHEMA_HANDLING_MODE, INCONSISTENT_SCHEMA_HANDLING_MODE,
SNAPSHOT_DELAY_MS,
CommonConnectorConfig.TOMBSTONES_ON_DELETE); CommonConnectorConfig.TOMBSTONES_ON_DELETE);
/** /**
@ -1003,7 +1013,7 @@ protected static ConfigDef configDef() {
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, KEEP_ALIVE_INTERVAL_MS, CommonConnectorConfig.MAX_QUEUE_SIZE, Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, KEEP_ALIVE_INTERVAL_MS, CommonConnectorConfig.MAX_QUEUE_SIZE,
CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.POLL_INTERVAL_MS,
SNAPSHOT_MODE, SNAPSHOT_LOCKING_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, SNAPSHOT_MODE, SNAPSHOT_LOCKING_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
BIGINT_UNSIGNED_HANDLING_MODE); BIGINT_UNSIGNED_HANDLING_MODE, SNAPSHOT_DELAY_MS);
return config; return config;
} }

View File

@ -165,11 +165,16 @@ public synchronized void start(Configuration config) {
// We're supposed to start with a snapshot, so set that up ... // We're supposed to start with a snapshot, so set that up ...
SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext); SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext);
if (snapshotEventsAreInserts) snapshotReader.generateInsertEvents(); if (snapshotEventsAreInserts) snapshotReader.generateInsertEvents();
if (!taskContext.snapshotDelay().isZero()) {
// Adding a timed blocking reader to delay the snapshot, can help to avoid initial rebalancing interruptions
chainedReaderBuilder.addReader(new TimedBlockingReader("timed-blocker", taskContext.snapshotDelay()));
}
chainedReaderBuilder.addReader(snapshotReader); chainedReaderBuilder.addReader(snapshotReader);
if (taskContext.isInitialSnapshotOnly()) { if (taskContext.isInitialSnapshotOnly()) {
logger.warn("This connector will only perform a snapshot, and will stop after that completes."); logger.warn("This connector will only perform a snapshot, and will stop after that completes.");
chainedReaderBuilder.addReader(new BlockingReader("blocker")); chainedReaderBuilder.addReader(new BlockingReader("blocker", "Connector has completed all of its work but will continue in the running state. It can be shut down at any time."));
chainedReaderBuilder.completionMessage("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate."); chainedReaderBuilder.completionMessage("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
} else { } else {
if (!rowBinlogEnabled) { if (!rowBinlogEnabled) {

View File

@ -5,6 +5,7 @@
*/ */
package io.debezium.connector.mysql; package io.debezium.connector.mysql;
import java.time.Duration;
import java.util.Map; import java.util.Map;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -253,6 +254,10 @@ public String getHeartbeatTopicsPrefix() {
return config.getString(Heartbeat.HEARTBEAT_TOPICS_PREFIX); return config.getString(Heartbeat.HEARTBEAT_TOPICS_PREFIX);
} }
public Duration snapshotDelay() {
return Duration.ofMillis(config.getLong(MySqlConnectorConfig.SNAPSHOT_DELAY_MS));
}
public void start() { public void start() {
connectionContext.start(); connectionContext.start();
// Start the MySQL database history, which simply starts up resources but does not recover the history to a specific point // Start the MySQL database history, which simply starts up resources but does not recover the history to a specific point

View File

@ -0,0 +1,59 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import java.time.Duration;
import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.util.Clock;
import io.debezium.util.Threads;
import io.debezium.util.Threads.Timer;
/**
* A component that blocks doing nothing for a specified period of time or until the connector task is stopped
*
* @author Peter Goransson
*/
public class TimedBlockingReader extends BlockingReader {
protected final Logger logger = LoggerFactory.getLogger(getClass());
private final Duration timeout;
private Timer timer;
/**
* @param name Name of the reader
* @param timeout Duration of time until this TimedBlockingReader should stop
*/
public TimedBlockingReader(String name, Duration timeout) {
super(name, "The connector will wait for " + timeout.toMillis() + " ms before proceeding");
this.timeout = timeout;
}
@Override
public void start() {
super.start();
this.timer = Threads.timer(Clock.SYSTEM, timeout);
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
super.poll();
// Stop when we've reached the timeout threshold
if (timer != null && timer.expired()) {
stop();
}
return null;
}
}