DBZ-2193 replaced kafkaCluster loadBalancer with route, port forward local port is now chosen as first free random port.

This commit is contained in:
Martin Medek 2022-12-09 13:28:11 +01:00 committed by Ondrej Babec
parent ed99a88fdb
commit 4a9e2816d6
13 changed files with 154 additions and 84 deletions

View File

@ -11,8 +11,9 @@
import static org.awaitility.Awaitility.await;
import java.io.IOException;
import java.util.LinkedList;
import java.net.ServerSocket;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,6 +34,9 @@ public abstract class AbstractOcpDatabaseController<C extends DatabaseClient<?,
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractOcpDatabaseController.class);
private static final String FORWARDED_HOST = "localhost";
private static final int MAX_PORT_SEARCH_ATTEMPTS = 20;
private static final int MIN_PORT = 32768;
private static final int MAX_PORT = 60999;
protected final OpenShiftClient ocp;
protected final String project;
@ -40,7 +44,9 @@ public abstract class AbstractOcpDatabaseController<C extends DatabaseClient<?,
protected Deployment deployment;
protected String name;
protected List<Service> services;
protected List<PortForward> portForwards = new LinkedList<>();
protected PortForward portForward;
private int localPort;
public AbstractOcpDatabaseController(
Deployment deployment, List<Service> services, OpenShiftClient ocp) {
@ -61,7 +67,10 @@ private Service getService() {
}
@Override
public void reload() throws InterruptedException {
public void reload() throws InterruptedException, IOException {
if (!isRunningFromOcp()) {
closeDatabasePortForwards();
}
LOGGER.info("Removing all pods of '" + name + "' deployment in namespace '" + project + "'");
ocp.apps().deployments().inNamespace(project).withName(name).scale(0);
await()
@ -80,16 +89,12 @@ public String getDatabaseHostname() {
@Override
public int getDatabasePort() {
return getService().getSpec().getPorts().stream()
.filter(p -> p.getName().equals("db"))
.findAny()
.get().getPort();
return getOriginalDatabasePort();
}
@Override
public String getPublicDatabaseHostname() {
if (isRunningFromOcp()) {
LOGGER.info("Running from OCP, using internal database hostname");
return getDatabaseHostname();
}
return FORWARDED_HOST;
@ -97,37 +102,76 @@ public String getPublicDatabaseHostname() {
@Override
public int getPublicDatabasePort() {
return getDatabasePort();
if (isRunningFromOcp()) {
return getDatabasePort();
}
return localPort;
}
@Override
public void initialize() throws InterruptedException, IOException {
if (!isRunningFromOcp()) {
forwardDatabasePorts();
}
}
@Override
public void forwardDatabasePorts() {
String dbName = deployment.getMetadata().getLabels().get("app");
ServiceResource<Service> serviceResource = ocp.services().inNamespace(project).withName(dbName);
if (portForward != null) {
LOGGER.warn("Calling port forward when forward already on " + getOriginalDatabasePort() + "->" + localPort);
return;
}
String serviceName = getService().getMetadata().getName();
ServiceResource<Service> serviceResource = ocp.services().inNamespace(project).withName(serviceName);
int dbPort = getOriginalDatabasePort();
localPort = getAvailablePort();
serviceResource.get().getSpec().getPorts().forEach(port -> {
int servicePort = port.getPort();
PortForward forward = serviceResource
.portForward(servicePort, servicePort);
LOGGER.info("Forwarding ports " + dbPort + "->" + localPort + " on service: " + serviceName);
for (Throwable e : forward.getClientThrowables()) {
LOGGER.error("Client error when forwarding DB port " + servicePort, e);
}
PortForward forward = serviceResource
.portForward(dbPort, localPort);
for (Throwable e : forward.getServerThrowables()) {
LOGGER.error("Server error when forwarding DB port" + servicePort, e);
}
portForwards.add(forward);
});
for (Throwable e : forward.getClientThrowables()) {
LOGGER.error("Client error when forwarding DB port " + deployment, e);
}
LOGGER.info("Forwarding ports on service: " + dbName);
for (Throwable e : forward.getServerThrowables()) {
LOGGER.error("Server error when forwarding DB port" + dbPort, e);
}
portForward = forward;
}
@Override
public void closeDatabasePortForwards() throws IOException {
LOGGER.info("Closing port forwards");
for (PortForward portForward : portForwards) {
portForward.close();
portForward.close();
portForward = null;
}
private int getOriginalDatabasePort() {
return getService().getSpec().getPorts().stream()
.filter(p -> p.getName().equals("db"))
.findAny()
.get().getPort();
}
private int getAvailablePort() {
for (int i = 0; i < MAX_PORT_SEARCH_ATTEMPTS; i++) {
int portNum = ThreadLocalRandom.current().nextInt(MIN_PORT, MAX_PORT);
if (isLocalPortFree(portNum)) {
return portNum;
}
}
throw new IllegalStateException("Couldn't find free port for forwarding");
}
private boolean isLocalPortFree(int port) {
try {
new ServerSocket(port).close();
return true;
}
catch (IOException e) {
return false;
}
}
}

View File

@ -48,7 +48,7 @@ public interface DatabaseController<C extends DatabaseClient<?, ?>> {
*
* @throws InterruptedException on timing issue
*/
void reload() throws InterruptedException;
void reload() throws InterruptedException, IOException;
/**
* Database initialisation

View File

@ -7,6 +7,7 @@
import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_DB2_DBZ_PASSWORD;
import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_DB2_DBZ_USERNAME;
import static io.debezium.testing.system.tools.OpenShiftUtils.isRunningFromOcp;
import java.io.IOException;
import java.sql.Connection;
@ -37,7 +38,9 @@ public OcpDB2Controller(Deployment deployment, List<Service> services, OpenShift
@Override
public void initialize() throws IOException {
forwardDatabasePorts();
if (!isRunningFromOcp()) {
forwardDatabasePorts();
}
LOGGER.info("Waiting until DB2 instance is ready");
SqlDatabaseClient client = getDatabaseClient(DATABASE_DB2_DBZ_USERNAME, DATABASE_DB2_DBZ_PASSWORD);
try (Connection connection = client.connectWithRetries()) {
@ -47,7 +50,6 @@ public void initialize() throws IOException {
LOGGER.error(e.getMessage());
throw new RuntimeException(e);
}
closeDatabasePortForwards();
}
@Override

View File

@ -5,6 +5,7 @@
*/
package io.debezium.testing.system.tools.databases.mongodb;
import static io.debezium.testing.system.tools.OpenShiftUtils.isRunningFromOcp;
import static io.debezium.testing.system.tools.WaitConditions.scaled;
import java.util.List;
@ -43,6 +44,9 @@ public String getPublicDatabaseUrl() {
}
public void initialize() throws InterruptedException {
if (!isRunningFromOcp()) {
forwardDatabasePorts();
}
Pod pod = ocp.pods().inNamespace(project).withLabel("deployment", name).list().getItems().get(0);
String svcName = deployment.getMetadata().getName();
CountDownLatch latch = new CountDownLatch(1);
@ -56,7 +60,6 @@ public void initialize() throws InterruptedException {
LOGGER.info("Waiting until database is initialized");
latch.await(scaled(1), TimeUnit.MINUTES);
}
}
public MongoDatabaseClient getDatabaseClient(String username, String password) {

View File

@ -8,6 +8,7 @@
import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_ORACLE_PASSWORD;
import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_ORACLE_PDBNAME;
import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_ORACLE_USERNAME;
import static io.debezium.testing.system.tools.OpenShiftUtils.isRunningFromOcp;
import java.net.URISyntaxException;
import java.nio.file.Path;
@ -47,6 +48,9 @@ public OcpOracleController(Deployment deployment, List<Service> services, OpenSh
}
public void initialize() throws InterruptedException {
if (!isRunningFromOcp()) {
forwardDatabasePorts();
}
Pod pod = ocp.pods().inNamespace(project).withLabel("deployment", name).list().getItems().get(0);
LOGGER.info("Uploading inventory.sql to " + DB_INIT_SCRIPT_PATH_CONTAINER);
ocp.pods().inNamespace(project).withName(pod.getMetadata().getName())

View File

@ -6,6 +6,7 @@
package io.debezium.testing.system.tools.databases.sqlserver;
import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_SQLSERVER_SA_PASSWORD;
import static io.debezium.testing.system.tools.OpenShiftUtils.isRunningFromOcp;
import java.net.URISyntaxException;
import java.nio.file.Path;
@ -55,6 +56,9 @@ public String getPublicDatabaseUrl() {
}
public void initialize() throws InterruptedException {
if (!isRunningFromOcp()) {
forwardDatabasePorts();
}
Pod pod = ocp.pods().inNamespace(project).withLabel("deployment", name).list().getItems().get(0);
ocp.pods().inNamespace(project).withName(pod.getMetadata().getName())
.file(DB_INIT_SCRIPT_PATH_CONTAINER)

View File

@ -10,6 +10,7 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import java.io.IOException;
import java.util.Properties;
/**
@ -46,7 +47,7 @@ public interface KafkaController {
/**
* @return default kafka consumer configuration
*/
default Properties getDefaultConsumerProperties() {
default Properties getDefaultConsumerProperties() throws IOException {
Properties consumerProps = new Properties();
consumerProps.put(BOOTSTRAP_SERVERS_CONFIG, getPublicBootstrapAddress());
consumerProps.put(GROUP_ID_CONFIG, "DEBEZIUM_IT_01");

View File

@ -7,15 +7,32 @@
import static io.debezium.testing.system.tools.OpenShiftUtils.isRunningFromOcp;
import static io.debezium.testing.system.tools.WaitConditions.scaled;
import static io.debezium.testing.system.tools.kafka.builders.FabricKafkaConnectBuilder.KAFKA_CERT_FILENAME;
import static io.debezium.testing.system.tools.kafka.builders.FabricKafkaConnectBuilder.KAFKA_CERT_SECRET;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Base64;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.testing.system.tools.WaitConditions;
import io.debezium.testing.system.tools.YAML;
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.openshift.client.OpenShiftClient;
@ -125,4 +142,38 @@ private NonNamespaceOperation<Kafka, KafkaList, Resource<Kafka>> kafkaOperation(
return Crds.kafkaOperation(ocp).inNamespace(project);
}
@Override
public Properties getDefaultConsumerProperties() throws IOException {
Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.put(BOOTSTRAP_SERVERS_CONFIG, getPublicBootstrapAddress());
kafkaConsumerProps.put(GROUP_ID_CONFIG, "DEBEZIUM_IT_01");
kafkaConsumerProps.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaConsumerProps.put(ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaConsumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getKafkaCaCertificate().getAbsolutePath());
kafkaConsumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
kafkaConsumerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
return kafkaConsumerProps;
}
private File getKafkaCaCertificate() throws IOException {
// get kafka cluster ca secret
Secret secret = ocp.secrets().inNamespace(project).withName(KAFKA_CERT_SECRET).get();
if (secret == null) {
throw new IllegalStateException("Kafka cluster certificate secret not found");
}
// download and decode certificate
String cert = secret.getData().get(KAFKA_CERT_FILENAME);
byte[] decodedBytes = Base64.getDecoder().decode(cert);
cert = new String(decodedBytes);
// save to local file
File crtFile = Files.createTempFile("kafka-cert-", null).toFile();
try (BufferedWriter writer = new BufferedWriter(new FileWriter(crtFile, UTF_8))) {
writer.write(cert);
writer.flush();
}
return crtFile;
}
}

View File

@ -119,17 +119,18 @@ private static List<GenericKafkaListener> defaultKafkaListeners() {
.withTls(true)
.build();
GenericKafkaListener loadBalancerExternal = new GenericKafkaListenerBuilder()
GenericKafkaListener routeExternal = new GenericKafkaListenerBuilder()
.withName("external")
.withPort(9094)
.withType(KafkaListenerType.LOADBALANCER)
.withType(KafkaListenerType.ROUTE)
.withTls(true)
.build();
// External services not needed when running from inside OCP
if (isRunningFromOcp()) {
return Arrays.asList(plainInternal, tlsInternal);
}
return Arrays.asList(plainInternal, tlsInternal, loadBalancerExternal);
return Arrays.asList(plainInternal, tlsInternal, routeExternal);
}
private static Map<String, Object> defaultKafkaConfig() {

View File

@ -5,6 +5,8 @@
*/
package io.debezium.testing.system.tools.kafka.builders;
import static io.debezium.testing.system.tools.kafka.builders.FabricKafkaBuilder.DEFAULT_KAFKA_NAME;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -35,7 +37,9 @@
public class FabricKafkaConnectBuilder extends
FabricBuilderWrapper<FabricKafkaConnectBuilder, KafkaConnectBuilder, KafkaConnect> {
public static String DEFAULT_KC_NAME = "debezium-kafka-connect-cluster";
public static String DEFAULT_BOOSTRAP_SERVER = FabricKafkaBuilder.DEFAULT_KAFKA_NAME + "-kafka-bootstrap:9093";
public static String KAFKA_CERT_SECRET = DEFAULT_KAFKA_NAME + "-cluster-ca-cert";
public static String KAFKA_CERT_FILENAME = "ca.crt";
public static String DEFAULT_BOOSTRAP_SERVER = DEFAULT_KAFKA_NAME + "-kafka-bootstrap:9093";
protected FabricKafkaConnectBuilder(KafkaConnectBuilder builder) {
super(builder);
@ -209,8 +213,8 @@ private static ClientTls defaultTLS() {
return new ClientTlsBuilder()
.withTrustedCertificates(
new CertSecretSourceBuilder()
.withCertificate("ca.crt")
.withSecretName("debezium-kafka-cluster-cluster-ca-cert")
.withCertificate(KAFKA_CERT_FILENAME)
.withSecretName(KAFKA_CERT_SECRET)
.build())
.build();
}

View File

@ -5,6 +5,8 @@
*/
package io.debezium.testing.system.fixtures.kafka;
import java.io.IOException;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.testcontainers.containers.Network;
@ -33,7 +35,7 @@ public DockerKafka(@NotNull ExtensionContext.Store store) {
}
@Override
public void setup() {
public void setup() throws IOException {
DockerKafkaDeployer kafkaDeployer = new DockerKafkaDeployer.Builder()
.withNetwork(network)
.build();

View File

@ -5,16 +5,7 @@
*/
package io.debezium.testing.system.tests;
import static io.debezium.testing.system.tools.OpenShiftUtils.isRunningFromOcp;
import java.io.IOException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.tools.databases.PortForwardableDatabaseController;
import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController;
@ -24,18 +15,4 @@ public MongoConnectorTest(KafkaController kafkaController, KafkaConnectControlle
KafkaAssertions<?, ?> assertions) {
super(kafkaController, connectController, connectorConfig, assertions);
}
@BeforeEach
public void setUpPortForward(MongoDatabaseController dbController) {
if (!isRunningFromOcp()) {
((PortForwardableDatabaseController) dbController).forwardDatabasePorts();
}
}
@AfterEach
public void closePortForward(MongoDatabaseController dbController) throws IOException {
if (!isRunningFromOcp()) {
((PortForwardableDatabaseController) dbController).closeDatabasePortForwards();
}
}
}

View File

@ -5,16 +5,7 @@
*/
package io.debezium.testing.system.tests;
import static io.debezium.testing.system.tools.OpenShiftUtils.isRunningFromOcp;
import java.io.IOException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.tools.databases.PortForwardableDatabaseController;
import io.debezium.testing.system.tools.databases.SqlDatabaseController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController;
@ -24,18 +15,4 @@ public SqlConnectorTest(KafkaController kafkaController, KafkaConnectController
KafkaAssertions<?, ?> assertions) {
super(kafkaController, connectController, connectorConfig, assertions);
}
@BeforeEach
public void setUpPortForward(SqlDatabaseController dbController) {
if (!isRunningFromOcp()) {
((PortForwardableDatabaseController) dbController).forwardDatabasePorts();
}
}
@AfterEach
public void closePortForward(SqlDatabaseController dbController) throws IOException {
if (!isRunningFromOcp()) {
((PortForwardableDatabaseController) dbController).closeDatabasePortForwards();
}
}
}