diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/async/AsyncEmbeddedEngineTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/async/AsyncEmbeddedEngineTest.java index 26cb923ad..09fcf0e5e 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/async/AsyncEmbeddedEngineTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/async/AsyncEmbeddedEngineTest.java @@ -117,7 +117,7 @@ public void testEngineBasicLifecycle() throws Exception { engine.run(); }); - snapshotLatch.await(1, TimeUnit.SECONDS); + snapshotLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); assertThat(snapshotLatch.getCount()).isEqualTo(0); for (int i = 0; i < 5; i++) { @@ -125,7 +125,7 @@ public void testEngineBasicLifecycle() throws Exception { appendLinesToSource(NUMBER_OF_LINES); Thread.sleep(10); } - allLatch.await(1, TimeUnit.SECONDS); + allLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); assertThat(allLatch.getCount()).isEqualTo(0); stopEngine(); @@ -164,7 +164,7 @@ public void testRunMultipleTasks() throws Exception { Awaitility.await() .alias("Haven't read all the records in time") .pollInterval(100, TimeUnit.MILLISECONDS) - .atMost(1, TimeUnit.SECONDS) + .atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS) .until(() -> recordsRead.get() == NUMBER_OF_TASKS * SimpleSourceConnector.DEFAULT_BATCH_COUNT); stopEngine(); @@ -201,14 +201,14 @@ public void testTasksAreStoppedIfSomeFailsToStart() { Awaitility.await() .alias("At least some tasks haven't stared on time") .pollInterval(10, TimeUnit.MILLISECONDS) - .atMost(1, TimeUnit.SECONDS) + .atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS) .until(() -> runningTasks.get() > 0); // Once some tasks failed to start, all started tasks should be stopped. Awaitility.await() .alias("Tasks haven't been stopped on time") .pollInterval(10, TimeUnit.MILLISECONDS) - .atMost(5, TimeUnit.SECONDS) + .atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS) // if task fails to start, we don't call task callback, and we call stop for all tasks no matter if they started successfully or not // therefore it is possible that number of running tasks become negative .until(() -> runningTasks.get() <= 0); @@ -259,11 +259,11 @@ public void testCompletionCallbackCalledUponSuccess() throws Exception { // Add a few more lines, and then verify they are consumed ... appendLinesToSource(NUMBER_OF_LINES); - recordsLatch.await(1, TimeUnit.SECONDS); + recordsLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); assertThat(recordsSent.get()).isEqualTo(20); stopEngine(); - callbackLatch.await(100, TimeUnit.MILLISECONDS); + callbackLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); assertThat(callbackLatch.getCount()).isEqualTo(0); } @@ -298,7 +298,7 @@ public void testCompletionCallbackCalledUponFailure() throws Exception { engine.run(); }); - callbackLatch.await(2 * AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); + callbackLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); assertThat(callbackLatch.getCount()).isEqualTo(0); } @@ -328,7 +328,7 @@ public void testCannotStopWhileTasksAreStarting() throws Exception { engine.run(); }); - WaitInTaskStartTask.taskStartingLatch.await(100, TimeUnit.MILLISECONDS); + WaitInTaskStartTask.taskStartingLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); Exception error = null; try { @@ -437,7 +437,7 @@ public void testExecuteSmt() throws Exception { engine.run(); }); - snapshotLatch.await(1, TimeUnit.SECONDS); + snapshotLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); assertThat(snapshotLatch.getCount()).isEqualTo(0); for (int i = 0; i < 5; i++) { @@ -445,7 +445,7 @@ public void testExecuteSmt() throws Exception { appendLinesToSource(NUMBER_OF_LINES); Thread.sleep(10); } - allLatch.await(1, TimeUnit.SECONDS); + allLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); assertThat(allLatch.getCount()).isEqualTo(0); stopEngine(); @@ -479,7 +479,7 @@ public void testPollingIsRetriedUponFailure() throws Exception { engine.run(); }); - recordsLatch.await(5, TimeUnit.SECONDS); + recordsLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); assertThat(recordsLatch.getCount()).isEqualTo(0); stopEngine(); @@ -515,7 +515,7 @@ public void testConnectorFailsIfMaxRetriesExceeded() throws Exception { engine.run(); }); - recordsLatch.await(5, TimeUnit.SECONDS); + recordsLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); // Engine should fail on record 7 as we have only one retry. assertThat(recordsLatch.getCount()).isEqualTo(4); @@ -621,7 +621,7 @@ public void connectorStopped() { LOGGER.info("Stopping engine"); engine.close(); // If assertThat(connectorCallbackCalled.get()).isTrue() in completion callback throws, we will time out here. - completionCallbackLatch.await(100, TimeUnit.MILLISECONDS); + completionCallbackLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); assertThat(completionCallbackLatch.getCount()).isEqualTo(0); assertThat(connectorCallbackCalled.get()).isTrue(); } @@ -652,7 +652,7 @@ private void runEngineBasicLifecycleWithConsumer(final Properties props) throws appendLinesToSource(NUMBER_OF_LINES); Thread.sleep(10); } - allLatch.await(1, TimeUnit.SECONDS); + allLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); assertThat(allLatch.getCount()).isEqualTo(0); assertThat(interceptor.containsMessage("Using io.debezium.embedded.async.AsyncEmbeddedEngine$ParallelSmtConsumerProcessor processor")); @@ -664,7 +664,7 @@ protected void stopEngine() { try { LOGGER.info("Stopping engine"); engine.close(); - Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> !isEngineRunning.get()); + Awaitility.await().atMost(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS).until(() -> !isEngineRunning.get()); } catch (IOException e) { LOGGER.warn("Failed during engine stop", e); @@ -743,7 +743,7 @@ static class WaitInTaskStartTask extends SimpleSourceConnector.SimpleConnectorTa public void start(Map props) { taskStartingLatch.countDown(); try { - continueLatch.await(1, TimeUnit.SECONDS); + continueLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); } catch (InterruptedException e) { throw new DebeziumException("Waiting for continuation of start was interrupted.");