DBZ-2362 add configurable restart wait time

This commit is contained in:
Cory Harper 2020-07-16 09:34:47 -06:00 committed by Gunnar Morling
parent e70708a6ad
commit 724fc7ac98
5 changed files with 39 additions and 4 deletions

View File

@ -238,6 +238,17 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
public static final long DEFAULT_POLL_INTERVAL_MILLIS = 500;
public static final String DATABASE_CONFIG_PREFIX = "database.";
private static final String CONVERTER_TYPE_SUFFIX = ".type";
public static final long DEFAULT_RETRIABLE_RESTART_WAIT = 10000L;
public static final Field RETRIABLE_RESTART_WAIT = Field.create("retriable.restart.connector.wait.ms")
.withDisplayName("Retriable restart wait (ms)")
.withType(Type.LONG)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDefault(DEFAULT_RETRIABLE_RESTART_WAIT)
.withDescription(
"Time to wait before restarting connector after retriable exception occurs. Defaults to " + DEFAULT_RETRIABLE_RESTART_WAIT + "ms.")
.withValidation(Field::isPositiveLong);
public static final Field TOMBSTONES_ON_DELETE = Field.create("tombstones.on.delete")
.withDisplayName("Change the behaviour of Debezium with regards to delete operations")
@ -367,7 +378,8 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
PROVIDE_TRANSACTION_METADATA,
SKIPPED_OPERATIONS,
SNAPSHOT_DELAY_MS,
SNAPSHOT_FETCH_SIZE)
SNAPSHOT_FETCH_SIZE,
RETRIABLE_RESTART_WAIT)
.events(
CUSTOM_CONVERTERS,
SANITIZE_FIELD_NAMES,
@ -385,6 +397,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) {
private final String logicalName;
private final String heartbeatTopicsPrefix;
private final Duration snapshotDelayMs;
private final Duration retriableRestartWait;
private final int snapshotFetchSize;
private final SourceInfoStructMaker<? extends AbstractSourceInfo> sourceInfoStructMaker;
private final boolean sanitizeFieldNames;
@ -402,6 +415,7 @@ protected CommonConnectorConfig(Configuration config, String logicalName, int de
this.logicalName = logicalName;
this.heartbeatTopicsPrefix = config.getString(Heartbeat.HEARTBEAT_TOPICS_PREFIX);
this.snapshotDelayMs = Duration.ofMillis(config.getLong(SNAPSHOT_DELAY_MS));
this.retriableRestartWait = Duration.ofMillis(config.getLong(RETRIABLE_RESTART_WAIT));
this.snapshotFetchSize = config.getInteger(SNAPSHOT_FETCH_SIZE, defaultSnapshotFetchSize);
this.sourceInfoStructMaker = getSourceInfoStructMaker(Version.parse(config.getString(SOURCE_STRUCT_MAKER_VERSION)));
this.sanitizeFieldNames = config.getBoolean(SANITIZE_FIELD_NAMES) || isUsingAvroConverter(config);
@ -447,6 +461,10 @@ public String getHeartbeatTopicsPrefix() {
return heartbeatTopicsPrefix;
}
public Duration getRetriableRestartWait() {
return retriableRestartWait;
}
public Duration getSnapshotDelay() {
return snapshotDelayMs;
}

View File

@ -71,6 +71,8 @@ protected static enum State {
*/
private volatile Map<String, ?> lastOffset;
private Duration retriableRestartWait;
@Override
public final void start(Map<String, String> props) {
if (context == null) {
@ -87,6 +89,9 @@ public final void start(Map<String, String> props) {
this.props = props;
Configuration config = Configuration.from(props);
retriableRestartWait = config.getDuration(CommonConnectorConfig.RETRIABLE_RESTART_WAIT, ChronoUnit.MILLIS);
// need to reset the delay or you only get one delayed restart
restartDelay = null;
if (!config.validateAndRecord(getAllConfigurationFields(), LOGGER::error)) {
throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
}
@ -181,7 +186,7 @@ private void stop(boolean restart) {
}
if (restart) {
LOGGER.warn("Going to restart connector after 10 sec. after a retriable exception");
LOGGER.warn("Going to restart connector after {} sec. after a retriable exception", retriableRestartWait.getSeconds());
}
else {
LOGGER.info("Stopping down connector");
@ -201,7 +206,7 @@ private void stop(boolean restart) {
doStop();
if (restart && restartDelay == null) {
restartDelay = ElapsedTimeStrategy.constant(Clock.system(), 10_000);
restartDelay = ElapsedTimeStrategy.constant(Clock.system(), retriableRestartWait.toMillis());
restartDelay.hasElapsed();
}
}

View File

@ -943,6 +943,10 @@ By default, no operations are skipped.
See {link-prefix}:{link-mongodb-connector}#mongodb-transaction-metadata[Transaction Metadata] for additional details.
|[[mongodb-property-retriable-restart-connector-wait-ms]]<<mongodb-property-retriable-restart-connector-wait-ms, `retriable.restart{zwsp}.connector.wait.ms`>> +
|10000 (10 seconds)
|The number of milli-seconds to wait before restarting a connector after a retriable error occurs.
|===
[[mongodb-fault-tolerance]]

View File

@ -2070,6 +2070,10 @@ See {link-prefix}:{link-postgresql-connector}#postgresql-toasted-values[Toasted
See {link-prefix}:{link-postgresql-connector}#postgresql-transaction-metadata[Transaction Metadata] for additional details.
|[[postgresql-property-retriable-restart-connector-wait-ms]]<<postgresql-property-retriable-restart-connector-wait-ms, `retriable.restart{zwsp}.connector.wait.ms`>> +
|10000 (10 seconds)
|The number of milli-seconds to wait before restarting a connector after a retriable error occurs.
|===
The connector also supports _pass-through_ configuration properties that are used when creating the Kafka producer and consumer.

View File

@ -1502,7 +1502,7 @@ twice - once in initial snapshot and once in streaming phase. Nonetheless, that
data mirroring.
For `read_uncommitted` there are no data consistency guarantees at all (some data might be lost or corrupted).
|[[sqlserver-property-source-timestamp-mode]]<<sqlserver-property-source-timestamp-mode>>, `source.timestamp.mode`
|[[sqlserver-property-source-timestamp-mode]]<<sqlserver-property-source-timestamp-mode, `source.timestamp.mode`>>
|_commit_
|String representing the criteria of the attached timestamp within the source record (ts_ms).
`commit` will set the source timestamp to the instant where the record was committed in the database (default and current behavior).
@ -1596,6 +1596,10 @@ Possible values include "Z", "UTC", offset values like "+02:00", short zone ids
See {link-prefix}:{link-sqlserver-connector}#sqlserver-transaction-metadata[Transaction Metadata] for additional details.
|[[sqlserver-property-retriable-restart-connector-wait-ms]]<<sqlserver-property-retriable-restart-connector-wait-ms, `retriable.restart{zwsp}.connector.wait.ms`>> +
|10000 (10 seconds)
|The number of milli-seconds to wait before restarting a connector after a retriable error occurs.
|===
The connector also supports _pass-through_ configuration properties that are used when creating the Kafka producer and consumer. Specifically, all connector configuration properties that begin with the `database.history.producer.` prefix are used (without the prefix) when creating the Kafka producer that writes to the database history, and all those that begin with the prefix `database.history.consumer.` are used (without the prefix) when creating the Kafka consumer that reads the database history upon connector startup.