DBZ-2193 Created interface for the port forwardble database controller, moved the methods to be called explicitly in test setup and cleanup

Fixed mongo parameters in ConnectorFactories
This commit is contained in:
Martin Medek 2022-08-16 00:02:34 +02:00 committed by Ondrej Babec
parent e4f713e018
commit ed99a88fdb
20 changed files with 154 additions and 102 deletions

View File

@ -10,6 +10,8 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
@ -18,6 +20,8 @@
import io.debezium.testing.system.tools.OpenShiftUtils;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.PortForward;
import io.fabric8.kubernetes.client.dsl.ServiceResource;
import io.fabric8.openshift.client.OpenShiftClient;
/**
@ -25,15 +29,18 @@
* @author Jakub Cechacek
*/
public abstract class AbstractOcpDatabaseController<C extends DatabaseClient<?, ?>>
implements DatabaseController<C> {
implements DatabaseController<C>, PortForwardableDatabaseController {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractOcpDatabaseController.class);
private static final String FORWARDED_HOST = "localhost";
protected final OpenShiftClient ocp;
protected final String project;
protected final OpenShiftUtils ocpUtils;
protected Deployment deployment;
protected String name;
protected List<Service> services;
protected List<PortForward> portForwards = new LinkedList<>();
public AbstractOcpDatabaseController(
Deployment deployment, List<Service> services, OpenShiftClient ocp) {
@ -85,11 +92,42 @@ public String getPublicDatabaseHostname() {
LOGGER.info("Running from OCP, using internal database hostname");
return getDatabaseHostname();
}
return "localhost";
return FORWARDED_HOST;
}
@Override
public int getPublicDatabasePort() {
return getDatabasePort();
}
@Override
public void forwardDatabasePorts() {
String dbName = deployment.getMetadata().getLabels().get("app");
ServiceResource<Service> serviceResource = ocp.services().inNamespace(project).withName(dbName);
serviceResource.get().getSpec().getPorts().forEach(port -> {
int servicePort = port.getPort();
PortForward forward = serviceResource
.portForward(servicePort, servicePort);
for (Throwable e : forward.getClientThrowables()) {
LOGGER.error("Client error when forwarding DB port " + servicePort, e);
}
for (Throwable e : forward.getServerThrowables()) {
LOGGER.error("Server error when forwarding DB port" + servicePort, e);
}
portForwards.add(forward);
});
LOGGER.info("Forwarding ports on service: " + dbName);
}
@Override
public void closeDatabasePortForwards() throws IOException {
LOGGER.info("Closing port forwards");
for (PortForward portForward : portForwards) {
portForward.close();
}
}
}

View File

@ -20,7 +20,6 @@
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.dsl.ServiceResource;
import io.fabric8.openshift.client.OpenShiftClient;
/**
@ -80,13 +79,6 @@ public T deploy() {
this.services = svcs;
String dbName = deployment.getMetadata().getLabels().get("app");
ServiceResource<Service> serviceResource = ocp.services().inNamespace(project).withName(dbName);
int servicePort = serviceResource.get().getSpec().getPorts().get(0).getPort();
serviceResource
.portForward(servicePort, servicePort);
LOGGER.info("FORWARDING PORT " + servicePort + " ON " + dbName + " SERVICE");
return getController(deployment, services, ocp);
}

View File

@ -5,6 +5,8 @@
*/
package io.debezium.testing.system.tools.databases;
import java.io.IOException;
public interface DatabaseController<C extends DatabaseClient<?, ?>> {
/**
@ -52,7 +54,7 @@ public interface DatabaseController<C extends DatabaseClient<?, ?>> {
* Database initialisation
* @throws InterruptedException on timing issue
*/
default void initialize() throws InterruptedException {
default void initialize() throws InterruptedException, IOException {
// no-op
}

View File

@ -0,0 +1,14 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.tools.databases;
import java.io.IOException;
public interface PortForwardableDatabaseController {
void forwardDatabasePorts();
void closeDatabasePortForwards() throws IOException;
}

View File

@ -8,6 +8,7 @@
import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_DB2_DBZ_PASSWORD;
import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_DB2_DBZ_USERNAME;
import java.io.IOException;
import java.sql.Connection;
import java.util.List;
@ -35,7 +36,8 @@ public OcpDB2Controller(Deployment deployment, List<Service> services, OpenShift
}
@Override
public void initialize() {
public void initialize() throws IOException {
forwardDatabasePorts();
LOGGER.info("Waiting until DB2 instance is ready");
SqlDatabaseClient client = getDatabaseClient(DATABASE_DB2_DBZ_USERNAME, DATABASE_DB2_DBZ_PASSWORD);
try (Connection connection = client.connectWithRetries()) {
@ -45,6 +47,7 @@ public void initialize() {
LOGGER.error(e.getMessage());
throw new RuntimeException(e);
}
closeDatabasePortForwards();
}
@Override

View File

@ -18,7 +18,6 @@
public class OcpMongo extends OcpDatabaseFixture<MongoDatabaseController> {
public static final String DB_DEPLOYMENT_PATH = "/database-resources/mongodb/deployment.yaml";
public static final String DB_SERVICE_PATH_LB = "/database-resources/mongodb/service-lb.yaml";
public static final String DB_SERVICE_PATH = "/database-resources/mongodb/service.yaml";
public OcpMongo(ExtensionContext.Store store) {

View File

@ -92,7 +92,7 @@ public ConnectorConfigBuilder mongo(MongoDatabaseController controller, String c
int dbPort = controller.getDatabasePort();
return cb
.put("mongodb.name", cb.getDbServerName())
.put("topic.prefix", cb.getDbServerName())
.put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector")
.put("task.max", 1)
.put("mongodb.hosts", "rs0/" + dbHost + ":" + dbPort)

View File

@ -0,0 +1,41 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.tests;
import static io.debezium.testing.system.tools.OpenShiftUtils.isRunningFromOcp;
import java.io.IOException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.tools.databases.PortForwardableDatabaseController;
import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController;
public class MongoConnectorTest extends ConnectorTest {
public MongoConnectorTest(KafkaController kafkaController, KafkaConnectController connectController, ConnectorConfigBuilder connectorConfig,
KafkaAssertions<?, ?> assertions) {
super(kafkaController, connectController, connectorConfig, assertions);
}
@BeforeEach
public void setUpPortForward(MongoDatabaseController dbController) {
if (!isRunningFromOcp()) {
((PortForwardableDatabaseController) dbController).forwardDatabasePorts();
}
}
@AfterEach
public void closePortForward(MongoDatabaseController dbController) throws IOException {
if (!isRunningFromOcp()) {
((PortForwardableDatabaseController) dbController).closeDatabasePortForwards();
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.tests;
import static io.debezium.testing.system.tools.OpenShiftUtils.isRunningFromOcp;
import java.io.IOException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.tools.databases.PortForwardableDatabaseController;
import io.debezium.testing.system.tools.databases.SqlDatabaseController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController;
public class SqlConnectorTest extends ConnectorTest {
public SqlConnectorTest(KafkaController kafkaController, KafkaConnectController connectController, ConnectorConfigBuilder connectorConfig,
KafkaAssertions<?, ?> assertions) {
super(kafkaController, connectController, connectorConfig, assertions);
}
@BeforeEach
public void setUpPortForward(SqlDatabaseController dbController) {
if (!isRunningFromOcp()) {
((PortForwardableDatabaseController) dbController).forwardDatabasePorts();
}
}
@AfterEach
public void closePortForward(SqlDatabaseController dbController) throws IOException {
if (!isRunningFromOcp()) {
((PortForwardableDatabaseController) dbController).closeDatabasePortForwards();
}
}
}

View File

@ -16,7 +16,7 @@
import org.junit.jupiter.api.Test;
import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.tests.ConnectorTest;
import io.debezium.testing.system.tests.SqlConnectorTest;
import io.debezium.testing.system.tools.databases.SqlDatabaseClient;
import io.debezium.testing.system.tools.databases.SqlDatabaseController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
@ -27,7 +27,7 @@
import okhttp3.Request;
import okhttp3.Response;
public abstract class Db2Tests extends ConnectorTest {
public abstract class Db2Tests extends SqlConnectorTest {
public Db2Tests(
KafkaController kafkaController,

View File

@ -22,7 +22,7 @@
import com.mongodb.client.model.Updates;
import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.tests.ConnectorTest;
import io.debezium.testing.system.tests.MongoConnectorTest;
import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseClient;
import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
@ -33,7 +33,7 @@
import okhttp3.Request;
import okhttp3.Response;
public abstract class MongoTests extends ConnectorTest {
public abstract class MongoTests extends MongoConnectorTest {
public MongoTests(
KafkaController kafkaController,

View File

@ -16,7 +16,7 @@
import org.junit.jupiter.api.Test;
import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.tests.ConnectorTest;
import io.debezium.testing.system.tests.SqlConnectorTest;
import io.debezium.testing.system.tools.databases.SqlDatabaseClient;
import io.debezium.testing.system.tools.databases.SqlDatabaseController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
@ -27,7 +27,7 @@
import okhttp3.Request;
import okhttp3.Response;
public abstract class MySqlTests extends ConnectorTest {
public abstract class MySqlTests extends SqlConnectorTest {
public MySqlTests(
KafkaController kafkaController,

View File

@ -16,7 +16,7 @@
import org.junit.jupiter.api.Test;
import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.tests.ConnectorTest;
import io.debezium.testing.system.tests.SqlConnectorTest;
import io.debezium.testing.system.tools.databases.SqlDatabaseClient;
import io.debezium.testing.system.tools.databases.SqlDatabaseController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
@ -27,7 +27,7 @@
import okhttp3.Request;
import okhttp3.Response;
public abstract class OracleTests extends ConnectorTest {
public abstract class OracleTests extends SqlConnectorTest {
public OracleTests(
KafkaController kafkaController,

View File

@ -17,7 +17,7 @@
import org.junit.jupiter.api.Test;
import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.tests.ConnectorTest;
import io.debezium.testing.system.tests.SqlConnectorTest;
import io.debezium.testing.system.tools.databases.SqlDatabaseClient;
import io.debezium.testing.system.tools.databases.SqlDatabaseController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
@ -28,7 +28,7 @@
import okhttp3.Request;
import okhttp3.Response;
public abstract class PostgreSqlTests extends ConnectorTest {
public abstract class PostgreSqlTests extends SqlConnectorTest {
public PostgreSqlTests(
KafkaController kafkaController,

View File

@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
name: db2-lb
spec:
selector:
app: db2
deployment: db2
type: LoadBalancer
ports:
- name: db
port: 50000
targetPort: 50000

View File

@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
name: mongo-lb
spec:
selector:
app: mongo
deployment: mongo
type: LoadBalancer
ports:
- name: db
port: 27017
targetPort: 27017

View File

@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
name: mysql-lb
spec:
selector:
app: mysql
deployment: mysql
type: LoadBalancer
ports:
- name: db
port: 3306
targetPort: 3306

View File

@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
name: oracle-lb
spec:
selector:
app: oracle
deployment: oracle
type: LoadBalancer
ports:
- name: db
port: 1521
targetPort: 1521

View File

@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
name: postgresql-lb
spec:
selector:
app: postgresql
deployment: postgresql
type: LoadBalancer
ports:
- name: db
port: 5432
targetPort: 5432

View File

@ -1,13 +0,0 @@
apiVersion: v1
kind: Service
metadata:
name: sqlserver-lb
spec:
selector:
app: sqlserver
deployment: sqlserver
type: LoadBalancer
ports:
- name: db
port: 1433
targetPort: 1433