From 724fc7ac98433a211afdc60e55579877e67170ee Mon Sep 17 00:00:00 2001 From: Cory Harper Date: Thu, 16 Jul 2020 09:34:47 -0600 Subject: [PATCH] DBZ-2362 add configurable restart wait time --- .../config/CommonConnectorConfig.java | 20 ++++++++++++++++++- .../connector/common/BaseSourceTask.java | 9 +++++++-- .../ROOT/pages/connectors/mongodb.adoc | 4 ++++ .../ROOT/pages/connectors/postgresql.adoc | 4 ++++ .../ROOT/pages/connectors/sqlserver.adoc | 6 +++++- 5 files changed, 39 insertions(+), 4 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index 8ce66afa4..6104601b1 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -238,6 +238,17 @@ public static BinaryHandlingMode parse(String value, String defaultValue) { public static final long DEFAULT_POLL_INTERVAL_MILLIS = 500; public static final String DATABASE_CONFIG_PREFIX = "database."; private static final String CONVERTER_TYPE_SUFFIX = ".type"; + public static final long DEFAULT_RETRIABLE_RESTART_WAIT = 10000L; + + public static final Field RETRIABLE_RESTART_WAIT = Field.create("retriable.restart.connector.wait.ms") + .withDisplayName("Retriable restart wait (ms)") + .withType(Type.LONG) + .withWidth(Width.MEDIUM) + .withImportance(Importance.LOW) + .withDefault(DEFAULT_RETRIABLE_RESTART_WAIT) + .withDescription( + "Time to wait before restarting connector after retriable exception occurs. Defaults to " + DEFAULT_RETRIABLE_RESTART_WAIT + "ms.") + .withValidation(Field::isPositiveLong); public static final Field TOMBSTONES_ON_DELETE = Field.create("tombstones.on.delete") .withDisplayName("Change the behaviour of Debezium with regards to delete operations") @@ -367,7 +378,8 @@ public static BinaryHandlingMode parse(String value, String defaultValue) { PROVIDE_TRANSACTION_METADATA, SKIPPED_OPERATIONS, SNAPSHOT_DELAY_MS, - SNAPSHOT_FETCH_SIZE) + SNAPSHOT_FETCH_SIZE, + RETRIABLE_RESTART_WAIT) .events( CUSTOM_CONVERTERS, SANITIZE_FIELD_NAMES, @@ -385,6 +397,7 @@ public static BinaryHandlingMode parse(String value, String defaultValue) { private final String logicalName; private final String heartbeatTopicsPrefix; private final Duration snapshotDelayMs; + private final Duration retriableRestartWait; private final int snapshotFetchSize; private final SourceInfoStructMaker sourceInfoStructMaker; private final boolean sanitizeFieldNames; @@ -402,6 +415,7 @@ protected CommonConnectorConfig(Configuration config, String logicalName, int de this.logicalName = logicalName; this.heartbeatTopicsPrefix = config.getString(Heartbeat.HEARTBEAT_TOPICS_PREFIX); this.snapshotDelayMs = Duration.ofMillis(config.getLong(SNAPSHOT_DELAY_MS)); + this.retriableRestartWait = Duration.ofMillis(config.getLong(RETRIABLE_RESTART_WAIT)); this.snapshotFetchSize = config.getInteger(SNAPSHOT_FETCH_SIZE, defaultSnapshotFetchSize); this.sourceInfoStructMaker = getSourceInfoStructMaker(Version.parse(config.getString(SOURCE_STRUCT_MAKER_VERSION))); this.sanitizeFieldNames = config.getBoolean(SANITIZE_FIELD_NAMES) || isUsingAvroConverter(config); @@ -447,6 +461,10 @@ public String getHeartbeatTopicsPrefix() { return heartbeatTopicsPrefix; } + public Duration getRetriableRestartWait() { + return retriableRestartWait; + } + public Duration getSnapshotDelay() { return snapshotDelayMs; } diff --git a/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java b/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java index 2537e4c34..7d9e2977d 100644 --- a/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java +++ b/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceTask.java @@ -71,6 +71,8 @@ protected static enum State { */ private volatile Map lastOffset; + private Duration retriableRestartWait; + @Override public final void start(Map props) { if (context == null) { @@ -87,6 +89,9 @@ public final void start(Map props) { this.props = props; Configuration config = Configuration.from(props); + retriableRestartWait = config.getDuration(CommonConnectorConfig.RETRIABLE_RESTART_WAIT, ChronoUnit.MILLIS); + // need to reset the delay or you only get one delayed restart + restartDelay = null; if (!config.validateAndRecord(getAllConfigurationFields(), LOGGER::error)) { throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details"); } @@ -181,7 +186,7 @@ private void stop(boolean restart) { } if (restart) { - LOGGER.warn("Going to restart connector after 10 sec. after a retriable exception"); + LOGGER.warn("Going to restart connector after {} sec. after a retriable exception", retriableRestartWait.getSeconds()); } else { LOGGER.info("Stopping down connector"); @@ -201,7 +206,7 @@ private void stop(boolean restart) { doStop(); if (restart && restartDelay == null) { - restartDelay = ElapsedTimeStrategy.constant(Clock.system(), 10_000); + restartDelay = ElapsedTimeStrategy.constant(Clock.system(), retriableRestartWait.toMillis()); restartDelay.hasElapsed(); } } diff --git a/documentation/modules/ROOT/pages/connectors/mongodb.adoc b/documentation/modules/ROOT/pages/connectors/mongodb.adoc index 1cbfa740a..fec85b1fb 100644 --- a/documentation/modules/ROOT/pages/connectors/mongodb.adoc +++ b/documentation/modules/ROOT/pages/connectors/mongodb.adoc @@ -943,6 +943,10 @@ By default, no operations are skipped. See {link-prefix}:{link-mongodb-connector}#mongodb-transaction-metadata[Transaction Metadata] for additional details. +|[[mongodb-property-retriable-restart-connector-wait-ms]]<> + +|10000 (10 seconds) +|The number of milli-seconds to wait before restarting a connector after a retriable error occurs. + |=== [[mongodb-fault-tolerance]] diff --git a/documentation/modules/ROOT/pages/connectors/postgresql.adoc b/documentation/modules/ROOT/pages/connectors/postgresql.adoc index e703bcd87..1aa8c5cf8 100644 --- a/documentation/modules/ROOT/pages/connectors/postgresql.adoc +++ b/documentation/modules/ROOT/pages/connectors/postgresql.adoc @@ -2070,6 +2070,10 @@ See {link-prefix}:{link-postgresql-connector}#postgresql-toasted-values[Toasted See {link-prefix}:{link-postgresql-connector}#postgresql-transaction-metadata[Transaction Metadata] for additional details. +|[[postgresql-property-retriable-restart-connector-wait-ms]]<> + +|10000 (10 seconds) +|The number of milli-seconds to wait before restarting a connector after a retriable error occurs. + |=== The connector also supports _pass-through_ configuration properties that are used when creating the Kafka producer and consumer. diff --git a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc index c472a0ce3..daa1db73b 100644 --- a/documentation/modules/ROOT/pages/connectors/sqlserver.adoc +++ b/documentation/modules/ROOT/pages/connectors/sqlserver.adoc @@ -1502,7 +1502,7 @@ twice - once in initial snapshot and once in streaming phase. Nonetheless, that data mirroring. For `read_uncommitted` there are no data consistency guarantees at all (some data might be lost or corrupted). -|[[sqlserver-property-source-timestamp-mode]]<>, `source.timestamp.mode` +|[[sqlserver-property-source-timestamp-mode]]<> |_commit_ |String representing the criteria of the attached timestamp within the source record (ts_ms). `commit` will set the source timestamp to the instant where the record was committed in the database (default and current behavior). @@ -1596,6 +1596,10 @@ Possible values include "Z", "UTC", offset values like "+02:00", short zone ids See {link-prefix}:{link-sqlserver-connector}#sqlserver-transaction-metadata[Transaction Metadata] for additional details. +|[[sqlserver-property-retriable-restart-connector-wait-ms]]<> + +|10000 (10 seconds) +|The number of milli-seconds to wait before restarting a connector after a retriable error occurs. + |=== The connector also supports _pass-through_ configuration properties that are used when creating the Kafka producer and consumer. Specifically, all connector configuration properties that begin with the `database.history.producer.` prefix are used (without the prefix) when creating the Kafka producer that writes to the database history, and all those that begin with the prefix `database.history.consumer.` are used (without the prefix) when creating the Kafka consumer that reads the database history upon connector startup.