DBZ-7496 Intorduce configurable async engine timeout in tests

The defaul async engine timeout to start and stop is 1 second, but it's
configurable via `debezium.test.engine.waittime` system property.
This commit is contained in:
Vojtech Juranek 2024-02-16 13:42:29 +01:00 committed by Jiri Pechanec
parent ecc4c096ab
commit 46fa5e79b9
2 changed files with 12 additions and 9 deletions

View File

@ -1220,7 +1220,7 @@ protected void storeOffsets(Configuration config, Map<Map<String, ?>, Map<String
public void waitForEngineShutdown() {
Awaitility.await()
.pollInterval(200, TimeUnit.MILLISECONDS)
.atMost(waitTimeForRecords() * 10L, TimeUnit.SECONDS)
.atMost(waitTimeForEngine() * 3, TimeUnit.MILLISECONDS)
.until(() -> !isEngineRunning.get());
}
@ -1267,6 +1267,10 @@ protected void assertRecordTransactionMetadata(SourceRecord record, String expec
assertThat(offset.get("transaction_id")).isEqualTo(expectedTxId);
}
public static int waitTimeForEngine() {
return Integer.parseInt(System.getProperty(TEST_PROPERTY_PREFIX + "engine.waittime", "1000"));
}
public static int waitTimeForRecords() {
return Integer.parseInt(System.getProperty(TEST_PROPERTY_PREFIX + "records.waittime", "2"));
}

View File

@ -37,6 +37,7 @@
import io.debezium.DebeziumException;
import io.debezium.connector.simple.SimpleSourceConnector;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.DebeziumEngineTestUtils;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.engine.DebeziumEngine;
@ -57,8 +58,6 @@ public class AsyncEmbeddedEngineTest {
private static final int NUMBER_OF_LINES = 10;
protected static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath("file-connector-offsets.txt").toAbsolutePath();
private static final Path TEST_FILE_PATH = Testing.Files.createTestingPath("file-connector-input.txt").toAbsolutePath();
// As the default TASK_MANAGEMENT_TIMEOUT_MS is too large and test would run too long, use shorter tim for tests.
private static final long TEST_TASK_MANAGEMENT_TIMEOUT_MS = 1_000;
protected static final AtomicBoolean isEngineRunning = new AtomicBoolean(false);
protected static final AtomicInteger runningTasks = new AtomicInteger(0);
@ -299,7 +298,7 @@ public void testCompletionCallbackCalledUponFailure() throws Exception {
engine.run();
});
callbackLatch.await(TEST_TASK_MANAGEMENT_TIMEOUT_MS + 1000, TimeUnit.MILLISECONDS);
callbackLatch.await(2 * AbstractConnectorTest.waitTimeForEngine(), TimeUnit.MILLISECONDS);
assertThat(callbackLatch.getCount()).isEqualTo(0);
}
@ -680,16 +679,16 @@ protected void stopEngine() {
protected void waitForEngineToStart() {
Awaitility.await()
.alias("Engine haven't started on time")
.pollInterval(TEST_TASK_MANAGEMENT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.atMost(5 * TEST_TASK_MANAGEMENT_TIMEOUT_MS, TimeUnit.SECONDS)
.pollInterval(10, TimeUnit.MILLISECONDS)
.atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS)
.until(() -> isEngineRunning.get());
}
protected void waitForEngineToStop() {
Awaitility.await()
.alias("Engine haven't stopped on time")
.pollInterval(TEST_TASK_MANAGEMENT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.atMost(5 * TEST_TASK_MANAGEMENT_TIMEOUT_MS, TimeUnit.SECONDS)
.pollInterval(10, TimeUnit.MILLISECONDS)
.atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS)
.until(() -> !isEngineRunning.get());
}
@ -697,7 +696,7 @@ protected void waitForTasksToStart(int minRunningTasks) {
Awaitility.await()
.alias("Engine haven't started on time")
.pollInterval(10, TimeUnit.MILLISECONDS)
.atMost(TEST_TASK_MANAGEMENT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.MILLISECONDS)
.until(() -> runningTasks.get() >= minRunningTasks);
}