diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java index 52df16836..aed6f3fa6 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java @@ -207,7 +207,7 @@ public final class EmbeddedEngine implements DebeziumEngine { private static final int DEFAULT_ERROR_MAX_RETRIES = -1; - private static final Field ERRORS_MAX_RETRIES = Field.create("errors.max.retries") + public static final Field ERRORS_MAX_RETRIES = Field.create("errors.max.retries") .withDisplayName("The maximum number of retries") .withType(Type.INT) .withWidth(Width.SHORT) @@ -216,7 +216,7 @@ public final class EmbeddedEngine implements DebeziumEngine { .withValidation(Field::isInteger) .withDescription("The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, > 0 = num of retries)."); - private static final Field ERRORS_RETRY_DELAY_INITIAL_MS = Field.create("errors.retry.delay.initial.ms") + public static final Field ERRORS_RETRY_DELAY_INITIAL_MS = Field.create("errors.retry.delay.initial.ms") .withDisplayName("Initial delay for retries") .withType(Type.INT) .withWidth(Width.SHORT) @@ -226,7 +226,7 @@ public final class EmbeddedEngine implements DebeziumEngine { .withDescription("Initial delay (in ms) for retries when encountering connection errors." + " This value will be doubled upon every retry but won't exceed 'errors.retry.delay.max.ms'."); - private static final Field ERRORS_RETRY_DELAY_MAX_MS = Field.create("errors.retry.delay.max.ms") + public static final Field ERRORS_RETRY_DELAY_MAX_MS = Field.create("errors.retry.delay.max.ms") .withDisplayName("Max delay between retries") .withType(Type.INT) .withWidth(Width.SHORT) @@ -235,6 +235,14 @@ public final class EmbeddedEngine implements DebeziumEngine { .withValidation(Field::isPositiveInteger) .withDescription("Max delay (in ms) between retries when encountering connection errors."); + public static final Field WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS = Field.create("debezium.embedded.shutdown.pause.before.interrupt.ms") + .withDisplayName("Time to wait to engine completion before interrupt") + .withType(Type.LONG) + .withDefault(Duration.ofMinutes(5).toMillis()) + .withValidation(Field::isPositiveInteger) + .withDescription(String.format("How long we wait before forcefully stopping the connector thread when shutting down. " + + "Must be bigger than the time it takes two polling loops to finish ({} ms)", ChangeEventSourceCoordinator.SHUTDOWN_WAIT_TIMEOUT.toMillis() * 2)); + /** * The array of fields that are required by each connectors. */ @@ -247,15 +255,6 @@ public final class EmbeddedEngine implements DebeziumEngine { OFFSET_FLUSH_INTERVAL_MS, OFFSET_COMMIT_TIMEOUT_MS, ERRORS_MAX_RETRIES, ERRORS_RETRY_DELAY_INITIAL_MS, ERRORS_RETRY_DELAY_MAX_MS); - /** - * How long we wait before forcefully stopping the connector thread when - * shutting down. Must be longer than - * {@link ChangeEventSourceCoordinator#SHUTDOWN_WAIT_TIMEOUT} * 2. - */ - private static final Duration WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT = Duration.ofMinutes(5); - - private static final String WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_PROP = "debezium.embedded.shutdown.pause.before.interrupt.ms"; - public static final class BuilderImpl implements Builder { private Configuration config; private DebeziumEngine.ChangeConsumer handler; @@ -1150,8 +1149,7 @@ public boolean stop() { if (thread != null) { try { // Making sure the event source coordinator has enough time to shut down before forcefully stopping it - Duration timeout = Duration.ofMillis(Long - .valueOf(System.getProperty(WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_PROP, Long.toString(WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT.toMillis())))); + Duration timeout = Duration.ofMillis(config.getLong(WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS)); LOGGER.info("Waiting for {} for connector to stop", timeout); latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS); }