[ci] Close Kafka Connect TestContainer Log Watcher

This commit is contained in:
Chris Cranford 2023-03-31 09:07:20 -04:00
parent d561c9573b
commit c6c8ed17d8

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.jdbc.junit.jupiter.e2e.source;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
@ -115,25 +116,29 @@ public void waitUntilDeleted() {
private void waitUntil(String message, Runnable doBeforeWait) {
final WaitingConsumer wait = new WaitingConsumer();
FrameConsumerResultCallback callback = new FrameConsumerResultCallback();
callback.addConsumer(OutputFrame.OutputType.STDOUT, wait);
try (FrameConsumerResultCallback callback = new FrameConsumerResultCallback()) {
callback.addConsumer(OutputFrame.OutputType.STDOUT, wait);
try (LogContainerCmd command = connect.getDockerClient().logContainerCmd(connect.getContainerId())) {
command.withFollowStream(true).withTail(0).withStdOut(true).exec(callback);
if (doBeforeWait != null) {
try (LogContainerCmd command = connect.getDockerClient().logContainerCmd(connect.getContainerId())) {
command.withFollowStream(true).withTail(0).withStdOut(true).exec(callback);
if (doBeforeWait != null) {
try {
doBeforeWait.run();
}
catch (Exception e) {
throw new IllegalStateException("WaitUntil callback failed", e);
}
}
try {
doBeforeWait.run();
wait.waitUntil(f -> f.getUtf8String().contains(message), 20, TimeUnit.SECONDS);
}
catch (Exception e) {
throw new IllegalStateException("WaitUntil callback failed", e);
catch (TimeoutException e) {
throw new IllegalStateException("Failed to wait for '" + message + "'", e);
}
}
try {
wait.waitUntil(f -> f.getUtf8String().contains(message), 20, TimeUnit.SECONDS);
}
catch (TimeoutException e) {
throw new IllegalStateException("Failed to wait for '" + message + "'", e);
}
}
catch (IOException e) {
throw new RuntimeException("Wait failed for message '" + message + "'", e);
}
}