DBZ-7024 Increase task management timeout to 2min

Increase task management timeout to two minutes and make this option
internal. This timeout will be hopefully sufficient for most of the
deployments. If not, we will increase the timeout it make this option
public.
This commit is contained in:
Vojtech Juranek 2024-02-07 13:36:24 +01:00 committed by Jiri Pechanec
parent fc7381ad91
commit 40131c0531
2 changed files with 18 additions and 15 deletions

View File

@ -16,15 +16,6 @@ public interface AsyncEngineConfig extends EmbeddedEngineConfig {
int AVAILABLE_CORES = Runtime.getRuntime().availableProcessors();
/**
* An optional field that specifies the maximum amount of time to wait for a task lifecycle operation, i.e. for starting and stopping the task.
*/
Field TASK_MANAGEMENT_TIMEOUT_MS = Field.create("task.management.timeout.ms")
.withDescription("Time to wait for task's lifecycle management operations (starting and stopping), given in milliseconds. "
+ "Defaults to 5 seconds (5000 ms).")
.withDefault(5_000L)
.withValidation(Field::isPositiveInteger);
/**
* An optional field that specifies the number of threads to be used for processing CDC records.
*/
@ -70,13 +61,23 @@ public interface AsyncEngineConfig extends EmbeddedEngineConfig {
.withDefault(false)
.withValidation(Field::isBoolean);
/**
* An internal field that specifies the maximum amount of time to wait for a task lifecycle operation, i.e. for starting and stopping the task.
*/
Field TASK_MANAGEMENT_TIMEOUT_MS = Field.createInternal("task.management.timeout.ms")
.withDescription("Time to wait for task's lifecycle management operations (starting and stopping), given in milliseconds. "
+ "Defaults to 2 minutes (120_000 ms).")
.withDefault(120_000L)
.withValidation(Field::isPositiveInteger);
/**
* The array of all exposed fields.
*/
Field.Set ALL_FIELDS = EmbeddedEngineConfig.ALL_FIELDS.with(
TASK_MANAGEMENT_TIMEOUT_MS,
RECORD_PROCESSING_SHUTDOWN_TIMEOUT_MS,
RECORD_PROCESSING_THREADS,
RECORD_PROCESSING_SEQUENTIALLY,
RECORD_PROCESSING_WITH_SERIAL_CONSUMER);
RECORD_PROCESSING_WITH_SERIAL_CONSUMER,
// internal fields
TASK_MANAGEMENT_TIMEOUT_MS);
}

View File

@ -55,6 +55,8 @@ public class AsyncEmbeddedEngineTest {
private static final int NUMBER_OF_LINES = 10;
protected static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath("file-connector-offsets.txt").toAbsolutePath();
private static final Path TEST_FILE_PATH = Testing.Files.createTestingPath("file-connector-input.txt").toAbsolutePath();
// As the default TASK_MANAGEMENT_TIMEOUT_MS is too large and test would run too long, use shorter tim for tests.
private static final long TEST_TASK_MANAGEMENT_TIMEOUT_MS = 1_000;
protected static final AtomicBoolean isEngineRunning = new AtomicBoolean(false);
protected static final AtomicInteger runningTasks = new AtomicInteger(0);
@ -295,7 +297,7 @@ public void testCompletionCallbackCalledUponFailure() throws Exception {
engine.run();
});
callbackLatch.await((Long) AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS.defaultValue() + 1_000, TimeUnit.MILLISECONDS);
callbackLatch.await(TEST_TASK_MANAGEMENT_TIMEOUT_MS + 1000, TimeUnit.MILLISECONDS);
assertThat(callbackLatch.getCount()).isEqualTo(0);
}
@ -607,7 +609,7 @@ protected void stopEngine() {
protected void waitForEngineToStart() {
Awaitility.await()
.alias("Engine haven't started on time")
.pollInterval((Long) AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS.defaultValue() + 1_000, TimeUnit.MILLISECONDS)
.pollInterval(TEST_TASK_MANAGEMENT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.atMost(1, TimeUnit.SECONDS)
.until(() -> isEngineRunning.get());
}
@ -615,7 +617,7 @@ protected void waitForEngineToStart() {
protected void waitForEngineToStop() {
Awaitility.await()
.alias("Engine haven't stopped on time")
.pollInterval((Long) AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS.defaultValue() + 1_000, TimeUnit.MILLISECONDS)
.pollInterval(TEST_TASK_MANAGEMENT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.atMost(10, TimeUnit.SECONDS)
.until(() -> !isEngineRunning.get());
}
@ -624,7 +626,7 @@ protected void waitForTasksToStart(int minRunningTasks) {
Awaitility.await()
.alias("Engine haven't started on time")
.pollInterval(10, TimeUnit.MILLISECONDS)
.atMost((Long) AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS.defaultValue() + 1_000, TimeUnit.MILLISECONDS)
.atMost(TEST_TASK_MANAGEMENT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.until(() -> runningTasks.get() >= minRunningTasks);
}