DBZ-6248 Expose EmbeddedEngine configuration fields

This commit is contained in:
Nir Levy 2023-03-23 18:13:43 +02:00 committed by Jiri Pechanec
parent c7e00d3881
commit 735933ce6b

View File

@ -207,7 +207,7 @@ public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
private static final int DEFAULT_ERROR_MAX_RETRIES = -1; private static final int DEFAULT_ERROR_MAX_RETRIES = -1;
private static final Field ERRORS_MAX_RETRIES = Field.create("errors.max.retries") public static final Field ERRORS_MAX_RETRIES = Field.create("errors.max.retries")
.withDisplayName("The maximum number of retries") .withDisplayName("The maximum number of retries")
.withType(Type.INT) .withType(Type.INT)
.withWidth(Width.SHORT) .withWidth(Width.SHORT)
@ -216,7 +216,7 @@ public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
.withValidation(Field::isInteger) .withValidation(Field::isInteger)
.withDescription("The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, > 0 = num of retries)."); .withDescription("The maximum number of retries on connection errors before failing (-1 = no limit, 0 = disabled, > 0 = num of retries).");
private static final Field ERRORS_RETRY_DELAY_INITIAL_MS = Field.create("errors.retry.delay.initial.ms") public static final Field ERRORS_RETRY_DELAY_INITIAL_MS = Field.create("errors.retry.delay.initial.ms")
.withDisplayName("Initial delay for retries") .withDisplayName("Initial delay for retries")
.withType(Type.INT) .withType(Type.INT)
.withWidth(Width.SHORT) .withWidth(Width.SHORT)
@ -226,7 +226,7 @@ public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
.withDescription("Initial delay (in ms) for retries when encountering connection errors." .withDescription("Initial delay (in ms) for retries when encountering connection errors."
+ " This value will be doubled upon every retry but won't exceed 'errors.retry.delay.max.ms'."); + " This value will be doubled upon every retry but won't exceed 'errors.retry.delay.max.ms'.");
private static final Field ERRORS_RETRY_DELAY_MAX_MS = Field.create("errors.retry.delay.max.ms") public static final Field ERRORS_RETRY_DELAY_MAX_MS = Field.create("errors.retry.delay.max.ms")
.withDisplayName("Max delay between retries") .withDisplayName("Max delay between retries")
.withType(Type.INT) .withType(Type.INT)
.withWidth(Width.SHORT) .withWidth(Width.SHORT)
@ -235,6 +235,14 @@ public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
.withValidation(Field::isPositiveInteger) .withValidation(Field::isPositiveInteger)
.withDescription("Max delay (in ms) between retries when encountering connection errors."); .withDescription("Max delay (in ms) between retries when encountering connection errors.");
public static final Field WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS = Field.create("debezium.embedded.shutdown.pause.before.interrupt.ms")
.withDisplayName("Time to wait to engine completion before interrupt")
.withType(Type.LONG)
.withDefault(Duration.ofMinutes(5).toMillis())
.withValidation(Field::isPositiveInteger)
.withDescription(String.format("How long we wait before forcefully stopping the connector thread when shutting down. " +
"Must be bigger than the time it takes two polling loops to finish ({} ms)", ChangeEventSourceCoordinator.SHUTDOWN_WAIT_TIMEOUT.toMillis() * 2));
/** /**
* The array of fields that are required by each connectors. * The array of fields that are required by each connectors.
*/ */
@ -247,15 +255,6 @@ public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
OFFSET_FLUSH_INTERVAL_MS, OFFSET_COMMIT_TIMEOUT_MS, OFFSET_FLUSH_INTERVAL_MS, OFFSET_COMMIT_TIMEOUT_MS,
ERRORS_MAX_RETRIES, ERRORS_RETRY_DELAY_INITIAL_MS, ERRORS_RETRY_DELAY_MAX_MS); ERRORS_MAX_RETRIES, ERRORS_RETRY_DELAY_INITIAL_MS, ERRORS_RETRY_DELAY_MAX_MS);
/**
* How long we wait before forcefully stopping the connector thread when
* shutting down. Must be longer than
* {@link ChangeEventSourceCoordinator#SHUTDOWN_WAIT_TIMEOUT} * 2.
*/
private static final Duration WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT = Duration.ofMinutes(5);
private static final String WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_PROP = "debezium.embedded.shutdown.pause.before.interrupt.ms";
public static final class BuilderImpl implements Builder { public static final class BuilderImpl implements Builder {
private Configuration config; private Configuration config;
private DebeziumEngine.ChangeConsumer<SourceRecord> handler; private DebeziumEngine.ChangeConsumer<SourceRecord> handler;
@ -1150,8 +1149,7 @@ public boolean stop() {
if (thread != null) { if (thread != null) {
try { try {
// Making sure the event source coordinator has enough time to shut down before forcefully stopping it // Making sure the event source coordinator has enough time to shut down before forcefully stopping it
Duration timeout = Duration.ofMillis(Long Duration timeout = Duration.ofMillis(config.getLong(WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS));
.valueOf(System.getProperty(WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_PROP, Long.toString(WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_DEFAULT.toMillis()))));
LOGGER.info("Waiting for {} for connector to stop", timeout); LOGGER.info("Waiting for {} for connector to stop", timeout);
latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS); latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
} }