diff --git a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/junit/jupiter/e2e/source/Source.java b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/junit/jupiter/e2e/source/Source.java index 54f3fe8a4..0d0eaff19 100644 --- a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/junit/jupiter/e2e/source/Source.java +++ b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/junit/jupiter/e2e/source/Source.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.jdbc.junit.jupiter.e2e.source; +import java.io.IOException; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; @@ -115,25 +116,29 @@ public void waitUntilDeleted() { private void waitUntil(String message, Runnable doBeforeWait) { final WaitingConsumer wait = new WaitingConsumer(); - FrameConsumerResultCallback callback = new FrameConsumerResultCallback(); - callback.addConsumer(OutputFrame.OutputType.STDOUT, wait); + try (FrameConsumerResultCallback callback = new FrameConsumerResultCallback()) { + callback.addConsumer(OutputFrame.OutputType.STDOUT, wait); - try (LogContainerCmd command = connect.getDockerClient().logContainerCmd(connect.getContainerId())) { - command.withFollowStream(true).withTail(0).withStdOut(true).exec(callback); - if (doBeforeWait != null) { + try (LogContainerCmd command = connect.getDockerClient().logContainerCmd(connect.getContainerId())) { + command.withFollowStream(true).withTail(0).withStdOut(true).exec(callback); + if (doBeforeWait != null) { + try { + doBeforeWait.run(); + } + catch (Exception e) { + throw new IllegalStateException("WaitUntil callback failed", e); + } + } try { - doBeforeWait.run(); + wait.waitUntil(f -> f.getUtf8String().contains(message), 20, TimeUnit.SECONDS); } - catch (Exception e) { - throw new IllegalStateException("WaitUntil callback failed", e); + catch (TimeoutException e) { + throw new IllegalStateException("Failed to wait for '" + message + "'", e); } } - try { - wait.waitUntil(f -> f.getUtf8String().contains(message), 20, TimeUnit.SECONDS); - } - catch (TimeoutException e) { - throw new IllegalStateException("Failed to wait for '" + message + "'", e); - } + } + catch (IOException e) { + throw new RuntimeException("Wait failed for message '" + message + "'", e); } }