From 4a9e2816d689d7ffb821581e5863641593a84fc7 Mon Sep 17 00:00:00 2001 From: Martin Medek Date: Fri, 9 Dec 2022 13:28:11 +0100 Subject: [PATCH] DBZ-2193 replaced kafkaCluster loadBalancer with route, port forward local port is now chosen as first free random port. --- .../AbstractOcpDatabaseController.java | 96 ++++++++++++++----- .../tools/databases/DatabaseController.java | 2 +- .../tools/databases/db2/OcpDB2Controller.java | 6 +- .../databases/mongodb/OcpMongoController.java | 5 +- .../databases/oracle/OcpOracleController.java | 4 + .../sqlserver/OcpSqlServerController.java | 4 + .../system/tools/kafka/KafkaController.java | 3 +- .../tools/kafka/OcpKafkaController.java | 51 ++++++++++ .../kafka/builders/FabricKafkaBuilder.java | 7 +- .../builders/FabricKafkaConnectBuilder.java | 10 +- .../system/fixtures/kafka/DockerKafka.java | 4 +- .../system/tests/MongoConnectorTest.java | 23 ----- .../system/tests/SqlConnectorTest.java | 23 ----- 13 files changed, 154 insertions(+), 84 deletions(-) diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/AbstractOcpDatabaseController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/AbstractOcpDatabaseController.java index 3a723ba1e..680069e20 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/AbstractOcpDatabaseController.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/AbstractOcpDatabaseController.java @@ -11,8 +11,9 @@ import static org.awaitility.Awaitility.await; import java.io.IOException; -import java.util.LinkedList; +import java.net.ServerSocket; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +34,9 @@ public abstract class AbstractOcpDatabaseController services; - protected List portForwards = new LinkedList<>(); + protected PortForward portForward; + + private int localPort; public AbstractOcpDatabaseController( Deployment deployment, List services, OpenShiftClient ocp) { @@ -61,7 +67,10 @@ private Service getService() { } @Override - public void reload() throws InterruptedException { + public void reload() throws InterruptedException, IOException { + if (!isRunningFromOcp()) { + closeDatabasePortForwards(); + } LOGGER.info("Removing all pods of '" + name + "' deployment in namespace '" + project + "'"); ocp.apps().deployments().inNamespace(project).withName(name).scale(0); await() @@ -80,16 +89,12 @@ public String getDatabaseHostname() { @Override public int getDatabasePort() { - return getService().getSpec().getPorts().stream() - .filter(p -> p.getName().equals("db")) - .findAny() - .get().getPort(); + return getOriginalDatabasePort(); } @Override public String getPublicDatabaseHostname() { if (isRunningFromOcp()) { - LOGGER.info("Running from OCP, using internal database hostname"); return getDatabaseHostname(); } return FORWARDED_HOST; @@ -97,37 +102,76 @@ public String getPublicDatabaseHostname() { @Override public int getPublicDatabasePort() { - return getDatabasePort(); + if (isRunningFromOcp()) { + return getDatabasePort(); + } + return localPort; + } + + @Override + public void initialize() throws InterruptedException, IOException { + if (!isRunningFromOcp()) { + forwardDatabasePorts(); + } } @Override public void forwardDatabasePorts() { - String dbName = deployment.getMetadata().getLabels().get("app"); - ServiceResource serviceResource = ocp.services().inNamespace(project).withName(dbName); + if (portForward != null) { + LOGGER.warn("Calling port forward when forward already on " + getOriginalDatabasePort() + "->" + localPort); + return; + } + String serviceName = getService().getMetadata().getName(); + ServiceResource serviceResource = ocp.services().inNamespace(project).withName(serviceName); + int dbPort = getOriginalDatabasePort(); + localPort = getAvailablePort(); - serviceResource.get().getSpec().getPorts().forEach(port -> { - int servicePort = port.getPort(); - PortForward forward = serviceResource - .portForward(servicePort, servicePort); + LOGGER.info("Forwarding ports " + dbPort + "->" + localPort + " on service: " + serviceName); - for (Throwable e : forward.getClientThrowables()) { - LOGGER.error("Client error when forwarding DB port " + servicePort, e); - } + PortForward forward = serviceResource + .portForward(dbPort, localPort); - for (Throwable e : forward.getServerThrowables()) { - LOGGER.error("Server error when forwarding DB port" + servicePort, e); - } - portForwards.add(forward); - }); + for (Throwable e : forward.getClientThrowables()) { + LOGGER.error("Client error when forwarding DB port " + deployment, e); + } - LOGGER.info("Forwarding ports on service: " + dbName); + for (Throwable e : forward.getServerThrowables()) { + LOGGER.error("Server error when forwarding DB port" + dbPort, e); + } + portForward = forward; } @Override public void closeDatabasePortForwards() throws IOException { LOGGER.info("Closing port forwards"); - for (PortForward portForward : portForwards) { - portForward.close(); + portForward.close(); + portForward = null; + } + + private int getOriginalDatabasePort() { + return getService().getSpec().getPorts().stream() + .filter(p -> p.getName().equals("db")) + .findAny() + .get().getPort(); + } + + private int getAvailablePort() { + for (int i = 0; i < MAX_PORT_SEARCH_ATTEMPTS; i++) { + int portNum = ThreadLocalRandom.current().nextInt(MIN_PORT, MAX_PORT); + if (isLocalPortFree(portNum)) { + return portNum; + } + } + throw new IllegalStateException("Couldn't find free port for forwarding"); + } + + private boolean isLocalPortFree(int port) { + try { + new ServerSocket(port).close(); + return true; + } + catch (IOException e) { + return false; } } } diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/DatabaseController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/DatabaseController.java index 450440e44..edabce9ef 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/DatabaseController.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/DatabaseController.java @@ -48,7 +48,7 @@ public interface DatabaseController> { * * @throws InterruptedException on timing issue */ - void reload() throws InterruptedException; + void reload() throws InterruptedException, IOException; /** * Database initialisation diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/db2/OcpDB2Controller.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/db2/OcpDB2Controller.java index 2999f60e0..816d1409b 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/db2/OcpDB2Controller.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/db2/OcpDB2Controller.java @@ -7,6 +7,7 @@ import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_DB2_DBZ_PASSWORD; import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_DB2_DBZ_USERNAME; +import static io.debezium.testing.system.tools.OpenShiftUtils.isRunningFromOcp; import java.io.IOException; import java.sql.Connection; @@ -37,7 +38,9 @@ public OcpDB2Controller(Deployment deployment, List services, OpenShift @Override public void initialize() throws IOException { - forwardDatabasePorts(); + if (!isRunningFromOcp()) { + forwardDatabasePorts(); + } LOGGER.info("Waiting until DB2 instance is ready"); SqlDatabaseClient client = getDatabaseClient(DATABASE_DB2_DBZ_USERNAME, DATABASE_DB2_DBZ_PASSWORD); try (Connection connection = client.connectWithRetries()) { @@ -47,7 +50,6 @@ public void initialize() throws IOException { LOGGER.error(e.getMessage()); throw new RuntimeException(e); } - closeDatabasePortForwards(); } @Override diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/OcpMongoController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/OcpMongoController.java index 9d143b8f7..8ec2eadeb 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/OcpMongoController.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/mongodb/OcpMongoController.java @@ -5,6 +5,7 @@ */ package io.debezium.testing.system.tools.databases.mongodb; +import static io.debezium.testing.system.tools.OpenShiftUtils.isRunningFromOcp; import static io.debezium.testing.system.tools.WaitConditions.scaled; import java.util.List; @@ -43,6 +44,9 @@ public String getPublicDatabaseUrl() { } public void initialize() throws InterruptedException { + if (!isRunningFromOcp()) { + forwardDatabasePorts(); + } Pod pod = ocp.pods().inNamespace(project).withLabel("deployment", name).list().getItems().get(0); String svcName = deployment.getMetadata().getName(); CountDownLatch latch = new CountDownLatch(1); @@ -56,7 +60,6 @@ public void initialize() throws InterruptedException { LOGGER.info("Waiting until database is initialized"); latch.await(scaled(1), TimeUnit.MINUTES); } - } public MongoDatabaseClient getDatabaseClient(String username, String password) { diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/oracle/OcpOracleController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/oracle/OcpOracleController.java index c2273e032..a0bad4397 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/oracle/OcpOracleController.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/oracle/OcpOracleController.java @@ -8,6 +8,7 @@ import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_ORACLE_PASSWORD; import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_ORACLE_PDBNAME; import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_ORACLE_USERNAME; +import static io.debezium.testing.system.tools.OpenShiftUtils.isRunningFromOcp; import java.net.URISyntaxException; import java.nio.file.Path; @@ -47,6 +48,9 @@ public OcpOracleController(Deployment deployment, List services, OpenSh } public void initialize() throws InterruptedException { + if (!isRunningFromOcp()) { + forwardDatabasePorts(); + } Pod pod = ocp.pods().inNamespace(project).withLabel("deployment", name).list().getItems().get(0); LOGGER.info("Uploading inventory.sql to " + DB_INIT_SCRIPT_PATH_CONTAINER); ocp.pods().inNamespace(project).withName(pod.getMetadata().getName()) diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/sqlserver/OcpSqlServerController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/sqlserver/OcpSqlServerController.java index 9c2b14436..5401bda51 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/sqlserver/OcpSqlServerController.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/databases/sqlserver/OcpSqlServerController.java @@ -6,6 +6,7 @@ package io.debezium.testing.system.tools.databases.sqlserver; import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_SQLSERVER_SA_PASSWORD; +import static io.debezium.testing.system.tools.OpenShiftUtils.isRunningFromOcp; import java.net.URISyntaxException; import java.nio.file.Path; @@ -55,6 +56,9 @@ public String getPublicDatabaseUrl() { } public void initialize() throws InterruptedException { + if (!isRunningFromOcp()) { + forwardDatabasePorts(); + } Pod pod = ocp.pods().inNamespace(project).withLabel("deployment", name).list().getItems().get(0); ocp.pods().inNamespace(project).withName(pod.getMetadata().getName()) .file(DB_INIT_SCRIPT_PATH_CONTAINER) diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/KafkaController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/KafkaController.java index b39358a0b..728f2e748 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/KafkaController.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/KafkaController.java @@ -10,6 +10,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import java.io.IOException; import java.util.Properties; /** @@ -46,7 +47,7 @@ public interface KafkaController { /** * @return default kafka consumer configuration */ - default Properties getDefaultConsumerProperties() { + default Properties getDefaultConsumerProperties() throws IOException { Properties consumerProps = new Properties(); consumerProps.put(BOOTSTRAP_SERVERS_CONFIG, getPublicBootstrapAddress()); consumerProps.put(GROUP_ID_CONFIG, "DEBEZIUM_IT_01"); diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaController.java index 0f1aa686c..86cb6c928 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaController.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaController.java @@ -7,15 +7,32 @@ import static io.debezium.testing.system.tools.OpenShiftUtils.isRunningFromOcp; import static io.debezium.testing.system.tools.WaitConditions.scaled; +import static io.debezium.testing.system.tools.kafka.builders.FabricKafkaConnectBuilder.KAFKA_CERT_FILENAME; +import static io.debezium.testing.system.tools.kafka.builders.FabricKafkaConnectBuilder.KAFKA_CERT_SECRET; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Base64; import java.util.List; +import java.util.Properties; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SslConfigs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.debezium.testing.system.tools.WaitConditions; import io.debezium.testing.system.tools.YAML; +import io.fabric8.kubernetes.api.model.Secret; import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.openshift.client.OpenShiftClient; @@ -125,4 +142,38 @@ private NonNamespaceOperation> kafkaOperation( return Crds.kafkaOperation(ocp).inNamespace(project); } + @Override + public Properties getDefaultConsumerProperties() throws IOException { + Properties kafkaConsumerProps = new Properties(); + kafkaConsumerProps.put(BOOTSTRAP_SERVERS_CONFIG, getPublicBootstrapAddress()); + kafkaConsumerProps.put(GROUP_ID_CONFIG, "DEBEZIUM_IT_01"); + kafkaConsumerProps.put(AUTO_OFFSET_RESET_CONFIG, "earliest"); + kafkaConsumerProps.put(ENABLE_AUTO_COMMIT_CONFIG, false); + + kafkaConsumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getKafkaCaCertificate().getAbsolutePath()); + kafkaConsumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + kafkaConsumerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); + return kafkaConsumerProps; + } + + private File getKafkaCaCertificate() throws IOException { + // get kafka cluster ca secret + Secret secret = ocp.secrets().inNamespace(project).withName(KAFKA_CERT_SECRET).get(); + if (secret == null) { + throw new IllegalStateException("Kafka cluster certificate secret not found"); + } + + // download and decode certificate + String cert = secret.getData().get(KAFKA_CERT_FILENAME); + byte[] decodedBytes = Base64.getDecoder().decode(cert); + cert = new String(decodedBytes); + + // save to local file + File crtFile = Files.createTempFile("kafka-cert-", null).toFile(); + try (BufferedWriter writer = new BufferedWriter(new FileWriter(crtFile, UTF_8))) { + writer.write(cert); + writer.flush(); + } + return crtFile; + } } diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaBuilder.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaBuilder.java index 335a2a44a..bac44784b 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaBuilder.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaBuilder.java @@ -119,17 +119,18 @@ private static List defaultKafkaListeners() { .withTls(true) .build(); - GenericKafkaListener loadBalancerExternal = new GenericKafkaListenerBuilder() + GenericKafkaListener routeExternal = new GenericKafkaListenerBuilder() .withName("external") .withPort(9094) - .withType(KafkaListenerType.LOADBALANCER) + .withType(KafkaListenerType.ROUTE) + .withTls(true) .build(); // External services not needed when running from inside OCP if (isRunningFromOcp()) { return Arrays.asList(plainInternal, tlsInternal); } - return Arrays.asList(plainInternal, tlsInternal, loadBalancerExternal); + return Arrays.asList(plainInternal, tlsInternal, routeExternal); } private static Map defaultKafkaConfig() { diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaConnectBuilder.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaConnectBuilder.java index 5025e88a8..d42168a62 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaConnectBuilder.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaConnectBuilder.java @@ -5,6 +5,8 @@ */ package io.debezium.testing.system.tools.kafka.builders; +import static io.debezium.testing.system.tools.kafka.builders.FabricKafkaBuilder.DEFAULT_KAFKA_NAME; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -35,7 +37,9 @@ public class FabricKafkaConnectBuilder extends FabricBuilderWrapper { public static String DEFAULT_KC_NAME = "debezium-kafka-connect-cluster"; - public static String DEFAULT_BOOSTRAP_SERVER = FabricKafkaBuilder.DEFAULT_KAFKA_NAME + "-kafka-bootstrap:9093"; + public static String KAFKA_CERT_SECRET = DEFAULT_KAFKA_NAME + "-cluster-ca-cert"; + public static String KAFKA_CERT_FILENAME = "ca.crt"; + public static String DEFAULT_BOOSTRAP_SERVER = DEFAULT_KAFKA_NAME + "-kafka-bootstrap:9093"; protected FabricKafkaConnectBuilder(KafkaConnectBuilder builder) { super(builder); @@ -209,8 +213,8 @@ private static ClientTls defaultTLS() { return new ClientTlsBuilder() .withTrustedCertificates( new CertSecretSourceBuilder() - .withCertificate("ca.crt") - .withSecretName("debezium-kafka-cluster-cluster-ca-cert") + .withCertificate(KAFKA_CERT_FILENAME) + .withSecretName(KAFKA_CERT_SECRET) .build()) .build(); } diff --git a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/kafka/DockerKafka.java b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/kafka/DockerKafka.java index a206589a0..9c0115105 100644 --- a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/kafka/DockerKafka.java +++ b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/kafka/DockerKafka.java @@ -5,6 +5,8 @@ */ package io.debezium.testing.system.fixtures.kafka; +import java.io.IOException; + import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.extension.ExtensionContext; import org.testcontainers.containers.Network; @@ -33,7 +35,7 @@ public DockerKafka(@NotNull ExtensionContext.Store store) { } @Override - public void setup() { + public void setup() throws IOException { DockerKafkaDeployer kafkaDeployer = new DockerKafkaDeployer.Builder() .withNetwork(network) .build(); diff --git a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/MongoConnectorTest.java b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/MongoConnectorTest.java index cf08a1cee..266473a90 100644 --- a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/MongoConnectorTest.java +++ b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/MongoConnectorTest.java @@ -5,16 +5,7 @@ */ package io.debezium.testing.system.tests; -import static io.debezium.testing.system.tools.OpenShiftUtils.isRunningFromOcp; - -import java.io.IOException; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; - import io.debezium.testing.system.assertions.KafkaAssertions; -import io.debezium.testing.system.tools.databases.PortForwardableDatabaseController; -import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseController; import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder; import io.debezium.testing.system.tools.kafka.KafkaConnectController; import io.debezium.testing.system.tools.kafka.KafkaController; @@ -24,18 +15,4 @@ public MongoConnectorTest(KafkaController kafkaController, KafkaConnectControlle KafkaAssertions assertions) { super(kafkaController, connectController, connectorConfig, assertions); } - - @BeforeEach - public void setUpPortForward(MongoDatabaseController dbController) { - if (!isRunningFromOcp()) { - ((PortForwardableDatabaseController) dbController).forwardDatabasePorts(); - } - } - - @AfterEach - public void closePortForward(MongoDatabaseController dbController) throws IOException { - if (!isRunningFromOcp()) { - ((PortForwardableDatabaseController) dbController).closeDatabasePortForwards(); - } - } } diff --git a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/SqlConnectorTest.java b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/SqlConnectorTest.java index 5eafa0378..c0d90ae3b 100644 --- a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/SqlConnectorTest.java +++ b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/SqlConnectorTest.java @@ -5,16 +5,7 @@ */ package io.debezium.testing.system.tests; -import static io.debezium.testing.system.tools.OpenShiftUtils.isRunningFromOcp; - -import java.io.IOException; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; - import io.debezium.testing.system.assertions.KafkaAssertions; -import io.debezium.testing.system.tools.databases.PortForwardableDatabaseController; -import io.debezium.testing.system.tools.databases.SqlDatabaseController; import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder; import io.debezium.testing.system.tools.kafka.KafkaConnectController; import io.debezium.testing.system.tools.kafka.KafkaController; @@ -24,18 +15,4 @@ public SqlConnectorTest(KafkaController kafkaController, KafkaConnectController KafkaAssertions assertions) { super(kafkaController, connectController, connectorConfig, assertions); } - - @BeforeEach - public void setUpPortForward(SqlDatabaseController dbController) { - if (!isRunningFromOcp()) { - ((PortForwardableDatabaseController) dbController).forwardDatabasePorts(); - } - } - - @AfterEach - public void closePortForward(SqlDatabaseController dbController) throws IOException { - if (!isRunningFromOcp()) { - ((PortForwardableDatabaseController) dbController).closeDatabasePortForwards(); - } - } }