From ee0ea0e18976d1440358ba2e696d3fc828595c5e Mon Sep 17 00:00:00 2001 From: Jiri Novotny Date: Mon, 22 Jan 2024 15:08:08 +0100 Subject: [PATCH] DBZ-7373 Zookeeper container not being deleted quick fix + added wait times --- .../system/tools/kafka/DockerKafkaController.java | 9 ++++++++- .../testing/system/tools/kafka/DockerKafkaDeployer.java | 4 +++- .../system/tools/kafka/docker/KafkaConnectConainer.java | 6 +++++- .../system/tools/kafka/docker/KafkaContainer.java | 3 +++ .../system/tools/kafka/docker/ZookeeperContainer.java | 4 ++++ 5 files changed, 23 insertions(+), 3 deletions(-) diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java index ddd8b68b5..d345a9e05 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaController.java @@ -13,6 +13,7 @@ import org.slf4j.LoggerFactory; import io.debezium.testing.system.tools.kafka.docker.KafkaContainer; +import io.debezium.testing.system.tools.kafka.docker.ZookeeperContainer; /** * This class provides control over Kafka instance deployed as DockerContainer @@ -24,6 +25,7 @@ public class DockerKafkaController implements KafkaController { private static final Logger LOGGER = LoggerFactory.getLogger(DockerKafkaController.class); private final KafkaContainer container; + private ZookeeperContainer zookeeperContainer; public DockerKafkaController(KafkaContainer container) { this.container = container; @@ -38,6 +40,10 @@ public String getPublicBootstrapAddress() { return container.getPublicBootstrapAddress(); } + public void setZookeeperContainer(ZookeeperContainer zookeeperContainer) { + this.zookeeperContainer = zookeeperContainer; + } + @Override public String getBootstrapAddress() { return container.getBootstrapAddress(); @@ -51,7 +57,8 @@ public String getTlsBootstrapAddress() { @Override public boolean undeploy() { container.stop(); - return container.isRunning(); + zookeeperContainer.stop(); + return zookeeperContainer.isRunning(); } @Override diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaDeployer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaDeployer.java index d18fa180b..0581a9b87 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaDeployer.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/DockerKafkaDeployer.java @@ -34,7 +34,9 @@ public DockerKafkaController deploy() { container.withZookeeper(zookeeperContainer); Startables.deepStart(Stream.of(zookeeperContainer, container)).join(); - return getController(container); + DockerKafkaController controller = getController(container); + controller.setZookeeperContainer(zookeeperContainer); + return controller; } public static class Builder diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaConnectConainer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaConnectConainer.java index 05cc651b4..2e27bfcea 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaConnectConainer.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaConnectConainer.java @@ -5,10 +5,13 @@ */ package io.debezium.testing.system.tools.kafka.docker; +import java.time.Duration; + import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import io.debezium.testing.system.tools.ConfigProperties; +import io.debezium.testing.system.tools.WaitConditions; public class KafkaConnectConainer extends GenericContainer { @@ -28,7 +31,7 @@ public KafkaConnectConainer() { } private void defaultConfig() { - withReuse(true); + withReuse(false); withExposedPorts(KAFKA_CONNECT_API_PORT, KAFKA_JMX_PORT); addEnv("CONFIG_STORAGE_TOPIC", "connect_config"); addEnv("OFFSET_STORAGE_TOPIC", "connect_offsets"); @@ -36,6 +39,7 @@ private void defaultConfig() { addEnv("JMXHOST", KAFKA_JMX_HOST); addEnv("JMXPORT", String.valueOf(KAFKA_JMX_PORT)); withHttpMetrics(); + withStartupTimeout(Duration.ofMinutes(WaitConditions.scaled(1))); withCommand(KAFKA_CONNECT_COMMAND); } diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaContainer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaContainer.java index 14c512214..3d0060095 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaContainer.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/KafkaContainer.java @@ -5,6 +5,7 @@ */ package io.debezium.testing.system.tools.kafka.docker; +import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; import org.testcontainers.containers.GenericContainer; @@ -13,6 +14,7 @@ import com.github.dockerjava.api.command.InspectContainerResponse; import io.debezium.testing.system.tools.ConfigProperties; +import io.debezium.testing.system.tools.WaitConditions; public class KafkaContainer extends GenericContainer { @@ -42,6 +44,7 @@ private void defaultConfig() { withEnv("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://" + getPublicBootstrapAddress() + ",BROKER://" + getBootstrapAddress()); withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); + withStartupTimeout(Duration.ofMinutes(WaitConditions.scaled(1))); } public KafkaContainer withZookeeper(ZookeeperContainer zookeeper) { diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/ZookeeperContainer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/ZookeeperContainer.java index 564708f01..b53971311 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/ZookeeperContainer.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/docker/ZookeeperContainer.java @@ -5,9 +5,12 @@ */ package io.debezium.testing.system.tools.kafka.docker; +import java.time.Duration; + import org.testcontainers.containers.GenericContainer; import io.debezium.testing.system.tools.ConfigProperties; +import io.debezium.testing.system.tools.WaitConditions; public class ZookeeperContainer extends GenericContainer { @@ -30,6 +33,7 @@ public String serverAddress() { private void defaultConfig() { withExposedPorts(ZOOKEEPER_PORT_CLIENT); withCommand(ZOOKEEPER_COMMAND); + withStartupTimeout(Duration.ofMinutes(WaitConditions.scaled(1))); } }