DBZ-7373 Zookeeper container not being deleted quick fix + added wait times
This commit is contained in:
parent
f5e50b8704
commit
ee0ea0e189
@ -13,6 +13,7 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.testing.system.tools.kafka.docker.KafkaContainer;
|
||||
import io.debezium.testing.system.tools.kafka.docker.ZookeeperContainer;
|
||||
|
||||
/**
|
||||
* This class provides control over Kafka instance deployed as DockerContainer
|
||||
@ -24,6 +25,7 @@ public class DockerKafkaController implements KafkaController {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(DockerKafkaController.class);
|
||||
|
||||
private final KafkaContainer container;
|
||||
private ZookeeperContainer zookeeperContainer;
|
||||
|
||||
public DockerKafkaController(KafkaContainer container) {
|
||||
this.container = container;
|
||||
@ -38,6 +40,10 @@ public String getPublicBootstrapAddress() {
|
||||
return container.getPublicBootstrapAddress();
|
||||
}
|
||||
|
||||
public void setZookeeperContainer(ZookeeperContainer zookeeperContainer) {
|
||||
this.zookeeperContainer = zookeeperContainer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getBootstrapAddress() {
|
||||
return container.getBootstrapAddress();
|
||||
@ -51,7 +57,8 @@ public String getTlsBootstrapAddress() {
|
||||
@Override
|
||||
public boolean undeploy() {
|
||||
container.stop();
|
||||
return container.isRunning();
|
||||
zookeeperContainer.stop();
|
||||
return zookeeperContainer.isRunning();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -34,7 +34,9 @@ public DockerKafkaController deploy() {
|
||||
container.withZookeeper(zookeeperContainer);
|
||||
|
||||
Startables.deepStart(Stream.of(zookeeperContainer, container)).join();
|
||||
return getController(container);
|
||||
DockerKafkaController controller = getController(container);
|
||||
controller.setZookeeperContainer(zookeeperContainer);
|
||||
return controller;
|
||||
}
|
||||
|
||||
public static class Builder
|
||||
|
@ -5,10 +5,13 @@
|
||||
*/
|
||||
package io.debezium.testing.system.tools.kafka.docker;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.Network;
|
||||
|
||||
import io.debezium.testing.system.tools.ConfigProperties;
|
||||
import io.debezium.testing.system.tools.WaitConditions;
|
||||
|
||||
public class KafkaConnectConainer extends GenericContainer<KafkaConnectConainer> {
|
||||
|
||||
@ -28,7 +31,7 @@ public KafkaConnectConainer() {
|
||||
}
|
||||
|
||||
private void defaultConfig() {
|
||||
withReuse(true);
|
||||
withReuse(false);
|
||||
withExposedPorts(KAFKA_CONNECT_API_PORT, KAFKA_JMX_PORT);
|
||||
addEnv("CONFIG_STORAGE_TOPIC", "connect_config");
|
||||
addEnv("OFFSET_STORAGE_TOPIC", "connect_offsets");
|
||||
@ -36,6 +39,7 @@ private void defaultConfig() {
|
||||
addEnv("JMXHOST", KAFKA_JMX_HOST);
|
||||
addEnv("JMXPORT", String.valueOf(KAFKA_JMX_PORT));
|
||||
withHttpMetrics();
|
||||
withStartupTimeout(Duration.ofMinutes(WaitConditions.scaled(1)));
|
||||
withCommand(KAFKA_CONNECT_COMMAND);
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,7 @@
|
||||
*/
|
||||
package io.debezium.testing.system.tools.kafka.docker;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
@ -13,6 +14,7 @@
|
||||
import com.github.dockerjava.api.command.InspectContainerResponse;
|
||||
|
||||
import io.debezium.testing.system.tools.ConfigProperties;
|
||||
import io.debezium.testing.system.tools.WaitConditions;
|
||||
|
||||
public class KafkaContainer extends GenericContainer<KafkaContainer> {
|
||||
|
||||
@ -42,6 +44,7 @@ private void defaultConfig() {
|
||||
withEnv("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://" + getPublicBootstrapAddress() + ",BROKER://" + getBootstrapAddress());
|
||||
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
|
||||
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
|
||||
withStartupTimeout(Duration.ofMinutes(WaitConditions.scaled(1)));
|
||||
}
|
||||
|
||||
public KafkaContainer withZookeeper(ZookeeperContainer zookeeper) {
|
||||
|
@ -5,9 +5,12 @@
|
||||
*/
|
||||
package io.debezium.testing.system.tools.kafka.docker;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
|
||||
import io.debezium.testing.system.tools.ConfigProperties;
|
||||
import io.debezium.testing.system.tools.WaitConditions;
|
||||
|
||||
public class ZookeeperContainer extends GenericContainer<ZookeeperContainer> {
|
||||
|
||||
@ -30,6 +33,7 @@ public String serverAddress() {
|
||||
private void defaultConfig() {
|
||||
withExposedPorts(ZOOKEEPER_PORT_CLIENT);
|
||||
withCommand(ZOOKEEPER_COMMAND);
|
||||
withStartupTimeout(Duration.ofMinutes(WaitConditions.scaled(1)));
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user