DBZ-7568 Use default engine wait time in all async engine latches

This commit is contained in:
Vojtech Juranek 2024-02-27 15:21:42 +01:00 committed by Jiri Pechanec
parent 6928cd775c
commit 6cc68fdfe3

View File

@ -117,7 +117,7 @@ public void testEngineBasicLifecycle() throws Exception {
engine.run(); engine.run();
}); });
snapshotLatch.await(1, TimeUnit.SECONDS); snapshotLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
assertThat(snapshotLatch.getCount()).isEqualTo(0); assertThat(snapshotLatch.getCount()).isEqualTo(0);
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
@ -125,7 +125,7 @@ public void testEngineBasicLifecycle() throws Exception {
appendLinesToSource(NUMBER_OF_LINES); appendLinesToSource(NUMBER_OF_LINES);
Thread.sleep(10); Thread.sleep(10);
} }
allLatch.await(1, TimeUnit.SECONDS); allLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
assertThat(allLatch.getCount()).isEqualTo(0); assertThat(allLatch.getCount()).isEqualTo(0);
stopEngine(); stopEngine();
@ -164,7 +164,7 @@ public void testRunMultipleTasks() throws Exception {
Awaitility.await() Awaitility.await()
.alias("Haven't read all the records in time") .alias("Haven't read all the records in time")
.pollInterval(100, TimeUnit.MILLISECONDS) .pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(1, TimeUnit.SECONDS) .atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS)
.until(() -> recordsRead.get() == NUMBER_OF_TASKS * SimpleSourceConnector.DEFAULT_BATCH_COUNT); .until(() -> recordsRead.get() == NUMBER_OF_TASKS * SimpleSourceConnector.DEFAULT_BATCH_COUNT);
stopEngine(); stopEngine();
@ -201,14 +201,14 @@ public void testTasksAreStoppedIfSomeFailsToStart() {
Awaitility.await() Awaitility.await()
.alias("At least some tasks haven't stared on time") .alias("At least some tasks haven't stared on time")
.pollInterval(10, TimeUnit.MILLISECONDS) .pollInterval(10, TimeUnit.MILLISECONDS)
.atMost(1, TimeUnit.SECONDS) .atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS)
.until(() -> runningTasks.get() > 0); .until(() -> runningTasks.get() > 0);
// Once some tasks failed to start, all started tasks should be stopped. // Once some tasks failed to start, all started tasks should be stopped.
Awaitility.await() Awaitility.await()
.alias("Tasks haven't been stopped on time") .alias("Tasks haven't been stopped on time")
.pollInterval(10, TimeUnit.MILLISECONDS) .pollInterval(10, TimeUnit.MILLISECONDS)
.atMost(5, TimeUnit.SECONDS) .atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS)
// if task fails to start, we don't call task callback, and we call stop for all tasks no matter if they started successfully or not // if task fails to start, we don't call task callback, and we call stop for all tasks no matter if they started successfully or not
// therefore it is possible that number of running tasks become negative // therefore it is possible that number of running tasks become negative
.until(() -> runningTasks.get() <= 0); .until(() -> runningTasks.get() <= 0);
@ -259,11 +259,11 @@ public void testCompletionCallbackCalledUponSuccess() throws Exception {
// Add a few more lines, and then verify they are consumed ... // Add a few more lines, and then verify they are consumed ...
appendLinesToSource(NUMBER_OF_LINES); appendLinesToSource(NUMBER_OF_LINES);
recordsLatch.await(1, TimeUnit.SECONDS); recordsLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
assertThat(recordsSent.get()).isEqualTo(20); assertThat(recordsSent.get()).isEqualTo(20);
stopEngine(); stopEngine();
callbackLatch.await(100, TimeUnit.MILLISECONDS); callbackLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
assertThat(callbackLatch.getCount()).isEqualTo(0); assertThat(callbackLatch.getCount()).isEqualTo(0);
} }
@ -298,7 +298,7 @@ public void testCompletionCallbackCalledUponFailure() throws Exception {
engine.run(); engine.run();
}); });
callbackLatch.await(2 * AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); callbackLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
assertThat(callbackLatch.getCount()).isEqualTo(0); assertThat(callbackLatch.getCount()).isEqualTo(0);
} }
@ -328,7 +328,7 @@ public void testCannotStopWhileTasksAreStarting() throws Exception {
engine.run(); engine.run();
}); });
WaitInTaskStartTask.taskStartingLatch.await(100, TimeUnit.MILLISECONDS); WaitInTaskStartTask.taskStartingLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
Exception error = null; Exception error = null;
try { try {
@ -437,7 +437,7 @@ public void testExecuteSmt() throws Exception {
engine.run(); engine.run();
}); });
snapshotLatch.await(1, TimeUnit.SECONDS); snapshotLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
assertThat(snapshotLatch.getCount()).isEqualTo(0); assertThat(snapshotLatch.getCount()).isEqualTo(0);
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
@ -445,7 +445,7 @@ public void testExecuteSmt() throws Exception {
appendLinesToSource(NUMBER_OF_LINES); appendLinesToSource(NUMBER_OF_LINES);
Thread.sleep(10); Thread.sleep(10);
} }
allLatch.await(1, TimeUnit.SECONDS); allLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
assertThat(allLatch.getCount()).isEqualTo(0); assertThat(allLatch.getCount()).isEqualTo(0);
stopEngine(); stopEngine();
@ -479,7 +479,7 @@ public void testPollingIsRetriedUponFailure() throws Exception {
engine.run(); engine.run();
}); });
recordsLatch.await(5, TimeUnit.SECONDS); recordsLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
assertThat(recordsLatch.getCount()).isEqualTo(0); assertThat(recordsLatch.getCount()).isEqualTo(0);
stopEngine(); stopEngine();
@ -515,7 +515,7 @@ public void testConnectorFailsIfMaxRetriesExceeded() throws Exception {
engine.run(); engine.run();
}); });
recordsLatch.await(5, TimeUnit.SECONDS); recordsLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
// Engine should fail on record 7 as we have only one retry. // Engine should fail on record 7 as we have only one retry.
assertThat(recordsLatch.getCount()).isEqualTo(4); assertThat(recordsLatch.getCount()).isEqualTo(4);
@ -621,7 +621,7 @@ public void connectorStopped() {
LOGGER.info("Stopping engine"); LOGGER.info("Stopping engine");
engine.close(); engine.close();
// If assertThat(connectorCallbackCalled.get()).isTrue() in completion callback throws, we will time out here. // If assertThat(connectorCallbackCalled.get()).isTrue() in completion callback throws, we will time out here.
completionCallbackLatch.await(100, TimeUnit.MILLISECONDS); completionCallbackLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
assertThat(completionCallbackLatch.getCount()).isEqualTo(0); assertThat(completionCallbackLatch.getCount()).isEqualTo(0);
assertThat(connectorCallbackCalled.get()).isTrue(); assertThat(connectorCallbackCalled.get()).isTrue();
} }
@ -652,7 +652,7 @@ private void runEngineBasicLifecycleWithConsumer(final Properties props) throws
appendLinesToSource(NUMBER_OF_LINES); appendLinesToSource(NUMBER_OF_LINES);
Thread.sleep(10); Thread.sleep(10);
} }
allLatch.await(1, TimeUnit.SECONDS); allLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
assertThat(allLatch.getCount()).isEqualTo(0); assertThat(allLatch.getCount()).isEqualTo(0);
assertThat(interceptor.containsMessage("Using io.debezium.embedded.async.AsyncEmbeddedEngine$ParallelSmtConsumerProcessor processor")); assertThat(interceptor.containsMessage("Using io.debezium.embedded.async.AsyncEmbeddedEngine$ParallelSmtConsumerProcessor processor"));
@ -664,7 +664,7 @@ protected void stopEngine() {
try { try {
LOGGER.info("Stopping engine"); LOGGER.info("Stopping engine");
engine.close(); engine.close();
Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> !isEngineRunning.get()); Awaitility.await().atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS).until(() -> !isEngineRunning.get());
} }
catch (IOException e) { catch (IOException e) {
LOGGER.warn("Failed during engine stop", e); LOGGER.warn("Failed during engine stop", e);
@ -743,7 +743,7 @@ static class WaitInTaskStartTask extends SimpleSourceConnector.SimpleConnectorTa
public void start(Map<String, String> props) { public void start(Map<String, String> props) {
taskStartingLatch.countDown(); taskStartingLatch.countDown();
try { try {
continueLatch.await(1, TimeUnit.SECONDS); continueLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
} }
catch (InterruptedException e) { catch (InterruptedException e) {
throw new DebeziumException("Waiting for continuation of start was interrupted."); throw new DebeziumException("Waiting for continuation of start was interrupted.");