From 93a3a6c5bba53538b006d87af1b702669079b6d5 Mon Sep 17 00:00:00 2001 From: jcechace Date: Mon, 1 Jun 2020 19:07:24 +0200 Subject: [PATCH] DBZ-2132 tools refactoring in preparation for Mongo and SQL Server ocp tests --- .../debezium-testing-openshift/pom.xml | 60 ++++++++++++++++++- .../openshift/tools/databases/Commands.java | 40 +++++++++++++ .../tools/databases/DatabaseClient.java | 15 +++++ .../tools/databases/DatabaseController.java | 46 ++++++-------- .../tools/databases/DatabaseDeployer.java | 8 ++- .../tools/databases/SqlDatabaseClient.java | 52 ++++++++++++++++ .../databases/SqlDatabaseController.java | 38 ++++++++++++ .../databases/{ => mysql}/MySqlDeployer.java | 14 ++++- .../{ => postgresql}/PostgreSqlDeployer.java | 14 ++++- .../tools/kafka/ConnectorConfigBuilder.java | 6 +- .../tools/kafka/KafkaConnectController.java | 21 ++++++- .../testing/openshift/ConnectorTestBase.java | 14 ++++- .../openshift/mysql/MySqlConnectorIT.java | 47 +++++++-------- .../postgresql/PostgreSqlConnectorIT.java | 47 +++++++-------- .../openshift/resources/ConfigProperties.java | 19 ++++++ .../resources/ConnectorFactories.java | 30 ++++++++++ .../pipelines/openshift_pipeline.groovy | 10 +++- 17 files changed, 388 insertions(+), 93 deletions(-) create mode 100644 debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/Commands.java create mode 100644 debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseClient.java create mode 100644 debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/SqlDatabaseClient.java create mode 100644 debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/SqlDatabaseController.java rename debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/{ => mysql}/MySqlDeployer.java (51%) rename debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/{ => postgresql}/PostgreSqlDeployer.java (51%) diff --git a/debezium-testing/debezium-testing-openshift/pom.xml b/debezium-testing/debezium-testing-openshift/pom.xml index c875e6be0..747ec93fa 100644 --- a/debezium-testing/debezium-testing-openshift/pom.xml +++ b/debezium-testing/debezium-testing-openshift/pom.xml @@ -13,8 +13,10 @@ Debezium OpenShift integration test-suite - 4.6.4 - 0.16.2 + 4.10.1 + 1.14 + 1.20 + 0.18.0 5.5.1 0.5.1 3.11.1 @@ -27,6 +29,7 @@ debezium debezium-mysql debezium-postgresql + debezium-mongo true @@ -39,13 +42,31 @@ dbz debezium - + 5432 debezium debezium debezium debezium debezium + + + 5432 + sa + Debezium1$ + sa + Debezium$1 + TestDB + + + 27017 + admin + admin + debezium + dbz + inventory + admin + @@ -67,6 +88,19 @@ ${version.fabric8.client} + + commons-codec + commons-codec + ${version.commons.codec} + + + + org.apache.commons + commons-compress + ${version.commons.compress} + true + + org.apache.kafka kafka-clients @@ -164,6 +198,16 @@ org.postgresql postgresql + + + com.microsoft.sqlserver + mssql-jdbc + + + + org.mongodb + mongodb-driver + @@ -277,6 +321,7 @@ ${ocp.project.debezium} ${ocp.project.mysql} ${ocp.project.postgresql} + ${ocp.project.mongo} ${strimzi.operator.connectors} ${database.mysql.host} @@ -293,6 +338,15 @@ ${database.postgresql.dbz.username} ${database.postgresql.dbz.password} ${database.postgresql.dbname} + + ${database.mongo.host} + ${database.mongo.port} + ${database.mongo.username} + 
${database.mongo.password} + ${database.mongo.dbz.username} + ${database.mongo.dbz.password} + ${database.mongo.dbz.dbname} + ${database.mongo.dbz.login.dbname} diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/Commands.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/Commands.java new file mode 100644 index 000000000..551162d66 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/Commands.java @@ -0,0 +1,40 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.tools.databases; + +import java.util.Objects; +import java.util.function.Consumer; + +/** + * An adaptation of {@link Consumer} which allows for exceptions to pass through + * @author Jakub Cechacek + */ +@FunctionalInterface +public interface Commands { + + /** + * Performs this operation on the given argument. + * + * @param t the input argument + * @throws E possibly thrown exception + */ + void execute(T t) throws E; + + /** + * See {@link Consumer} + * @param after the operation to perform after this operation + * @return a composed {@code Consumer} that performs in sequence this + * operation followed by the {@code after} operation + * @throws NullPointerException if {@code after} is null + */ + default Commands andThen(Commands after) { + Objects.requireNonNull(after); + return (T t) -> { + execute(t); + after.execute(t); + }; + } +} \ No newline at end of file diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseClient.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseClient.java new file mode 100644 index 000000000..43250020b --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseClient.java @@ -0,0 +1,15 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.tools.databases; + +/** + * + * @author Jakub Cechacek + */ +public interface DatabaseClient { + + void execute(Commands commands) throws E; +} diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseController.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseController.java index 757859a0f..1fddd12b5 100644 --- a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseController.java +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseController.java @@ -5,10 +5,6 @@ */ package io.debezium.testing.openshift.tools.databases; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; import java.util.List; import java.util.concurrent.TimeUnit; @@ -16,6 +12,7 @@ import org.slf4j.LoggerFactory; import io.debezium.testing.openshift.tools.OpenShiftUtils; +import io.debezium.testing.openshift.tools.WaitConditions; import io.fabric8.kubernetes.api.model.LoadBalancerIngress; import io.fabric8.kubernetes.api.model.Service; import io.fabric8.kubernetes.api.model.apps.Deployment; @@ -25,16 +22,16 @@ * * @author Jakub Cechacek */ -public class DatabaseController { +public abstract class DatabaseController> { private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseController.class); - private final OpenShiftClient ocp; - private final String project; - private final String dbType; - private final OpenShiftUtils ocpUtils; - private Deployment deployment; - private String name; - private List services; + protected final OpenShiftClient ocp; + protected final String project; + protected final String dbType; + protected final OpenShiftUtils ocpUtils; + protected Deployment deployment; + protected String name; + protected List services; public DatabaseController(Deployment deployment, List services, String dbType, OpenShiftClient ocp) { this.deployment = deployment; @@ -55,30 +52,23 @@ public String getDatabaseUrl() { LoadBalancerIngress ingress = svc.getStatus().getLoadBalancer().getIngress().get(0); String hostname = ingress.getHostname(); Integer port = svc.getSpec().getPorts().stream().filter(p -> p.getName().equals("db")).findAny().get().getPort(); - - return "jdbc:" + dbType + "://" + hostname + ":" + port + "/"; + return constructDatabaseUrl(hostname, port); } - - public void executeStatement(String database, String username, String password, String sql) throws SQLException { - try (Connection con = DriverManager.getConnection(getDatabaseUrl(), username, password)) { - con.setCatalog(database); - Statement stmt = con.createStatement(); - stmt.execute(sql); - } - } - + public void reload() throws InterruptedException { LOGGER.info("Recreating all pods of '" + name + "' deployment in namespace '" + project + "'"); ocp.pods().inNamespace(project).withLabel("deployment", name).delete(); deployment = ocp.apps().deployments() .inNamespace(project) .withName(name) - .waitUntilCondition(this::deploymentAvailableCondition, 30, TimeUnit.SECONDS); + .waitUntilCondition(WaitConditions::deploymentAvailableCondition, 1, TimeUnit.MINUTES); LOGGER.info("Deployment '" + name + "' is available"); + initialize(); } - private 
boolean deploymentAvailableCondition(Deployment d) { - return d.getStatus() != null && - d.getStatus().getConditions().stream().anyMatch(c -> c.getType().equalsIgnoreCase("Available") && c.getStatus().equalsIgnoreCase("true")); - } + public abstract void initialize() throws InterruptedException; + + public abstract C getDatabaseClient(String username, String password); + + protected abstract String constructDatabaseUrl(String hostname, int port); } diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseDeployer.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseDeployer.java index b0d8e0d0f..94b4ca57b 100644 --- a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseDeployer.java +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseDeployer.java @@ -23,7 +23,7 @@ /** * @author Jakub Cechacek */ -public abstract class DatabaseDeployer { +public abstract class DatabaseDeployer { private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseDeployer.class); private final OpenShiftClient ocp; @@ -64,7 +64,7 @@ public T withServices(Collection services) { return getThis(); } - public DatabaseController deploy() { + public C deploy() { if (deployment == null) { throw new IllegalStateException("Deployment configuration not available"); } @@ -81,8 +81,10 @@ public DatabaseController deploy() { this.deployment = dep; this.services = svcs; - return new DatabaseController(dep, services, dbType, ocp); + return getController(dep, services, dbType, ocp); } public abstract T getThis(); + + public abstract C getController(Deployment deployment, List services, String dbType, OpenShiftClient ocp); } diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/SqlDatabaseClient.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/SqlDatabaseClient.java new file mode 100644 index 000000000..1ea31aaf8 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/SqlDatabaseClient.java @@ -0,0 +1,52 @@ +/* + * Copyright Debezium Authors. 
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.testing.openshift.tools.databases;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * @author Jakub Cechacek
+ */
+public class SqlDatabaseClient implements DatabaseClient<Connection, SQLException> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SqlDatabaseClient.class);
+
+    private String url;
+    private String username;
+    private String password;
+
+    public SqlDatabaseClient(String url, String username, String password) {
+        this.url = url;
+        this.username = username;
+        this.password = password;
+    }
+
+    public void execute(Commands<Connection, SQLException> commands) throws SQLException {
+        try (Connection con = DriverManager.getConnection(url, username, password)) {
+            commands.execute(con);
+        }
+    }
+
+    public void execute(String database, Commands<Connection, SQLException> commands) throws SQLException {
+        Commands<Connection, SQLException> withDatabase = con -> con.setCatalog(database);
+        execute(con -> withDatabase.andThen(commands).execute(con));
+    }
+
+    public void execute(String database, String command) throws SQLException {
+        LOGGER.info("Running SQL Command [{}]: {}", database, command);
+        execute(database, con -> {
+            Statement stmt = con.createStatement();
+            stmt.execute(command);
+        });
+    }
+}
diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/SqlDatabaseController.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/SqlDatabaseController.java
new file mode 100644
index 000000000..49fe59b6b
--- /dev/null
+++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/SqlDatabaseController.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright Debezium Authors.
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.tools.databases; + +import java.util.List; + +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.openshift.client.OpenShiftClient; + +/** + * + * @author Jakub Cechacek + */ +public class SqlDatabaseController extends DatabaseController { + + public SqlDatabaseController(Deployment deployment, List services, String dbType, OpenShiftClient ocp) { + super(deployment, services, dbType, ocp); + } + + protected String constructDatabaseUrl(String hostname, int port) { + return "jdbc:" + dbType + "://" + hostname + ":" + port + "/"; + } + + @Override + public void initialize() { + // no-op + } + + @Override + public SqlDatabaseClient getDatabaseClient(String username, String password) { + return new SqlDatabaseClient(getDatabaseUrl(), username, password); + } + +} diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/MySqlDeployer.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/mysql/MySqlDeployer.java similarity index 51% rename from debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/MySqlDeployer.java rename to debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/mysql/MySqlDeployer.java index 90fafd8ad..d7c630434 100644 --- a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/MySqlDeployer.java +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/mysql/MySqlDeployer.java @@ -3,17 +3,23 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.testing.openshift.tools.databases; +package io.debezium.testing.openshift.tools.databases.mysql; + +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.testing.openshift.tools.databases.DatabaseDeployer; +import io.debezium.testing.openshift.tools.databases.SqlDatabaseController; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.openshift.client.OpenShiftClient; /** * @author Jakub Cechacek */ -public class MySqlDeployer extends DatabaseDeployer { +public class MySqlDeployer extends DatabaseDeployer { private static final Logger LOGGER = LoggerFactory.getLogger(MySqlDeployer.class); public MySqlDeployer(OpenShiftClient ocp) { @@ -23,4 +29,8 @@ public MySqlDeployer(OpenShiftClient ocp) { public MySqlDeployer getThis() { return this; } + + public SqlDatabaseController getController(Deployment deployment, List services, String dbType, OpenShiftClient ocp) { + return new SqlDatabaseController(deployment, services, dbType, ocp); + } } diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/PostgreSqlDeployer.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/postgresql/PostgreSqlDeployer.java similarity index 51% rename from debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/PostgreSqlDeployer.java rename to 
debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/postgresql/PostgreSqlDeployer.java index 728400855..27bf63317 100644 --- a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/PostgreSqlDeployer.java +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/postgresql/PostgreSqlDeployer.java @@ -3,18 +3,24 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.testing.openshift.tools.databases; +package io.debezium.testing.openshift.tools.databases.postgresql; + +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.testing.openshift.tools.databases.DatabaseDeployer; +import io.debezium.testing.openshift.tools.databases.SqlDatabaseController; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.openshift.client.OpenShiftClient; /** * * @author Jakub Cechacek */ -public class PostgreSqlDeployer extends DatabaseDeployer { +public class PostgreSqlDeployer extends DatabaseDeployer { private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSqlDeployer.class); public PostgreSqlDeployer(OpenShiftClient ocp) { @@ -24,4 +30,8 @@ public PostgreSqlDeployer(OpenShiftClient ocp) { public PostgreSqlDeployer getThis() { return this; } + + public SqlDatabaseController getController(Deployment deployment, List services, String dbType, OpenShiftClient ocp) { + return new SqlDatabaseController(deployment, services, dbType, ocp); + } } diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/ConnectorConfigBuilder.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/ConnectorConfigBuilder.java index 08087fd48..e0430bbc3 100644 --- a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/ConnectorConfigBuilder.java +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/ConnectorConfigBuilder.java @@ -57,7 +57,11 @@ public KafkaConnector getCustomResource() { Map crConfig = new HashMap<>(config); KafkaConnectorBuilder connectorBuilder = new KafkaConnectorBuilder(); - return connectorBuilder.withNewSpec() + return connectorBuilder + .withNewMetadata() + .withLabels(new HashMap<>()) + .endMetadata() + .withNewSpec() .withClassName((String) crConfig.remove("connector.class")) .withTasksMax((Integer) crConfig.remove("task.max")) .withConfig(crConfig) diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/KafkaConnectController.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/KafkaConnectController.java index ae8118c08..b7c1cb2b8 100644 --- a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/KafkaConnectController.java +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/KafkaConnectController.java @@ -159,7 +159,7 @@ public Route exposeMetrics() { Service service = ocp.services().inNamespace(project).withName(nameSvc).get(); metricsRoute = ocpUtils - .createRoute(project, name, nameSvc, "prometheus", 
service.getMetadata().getLabels()); + .createRoute(project, name, nameSvc, "tcp-prometheus", service.getMetadata().getLabels()); httpUtils.awaitApi(getMetricsURL()); return metricsRoute; @@ -172,6 +172,7 @@ public Route exposeMetrics() { * @throws IOException or request error */ public void deployConnector(String name, ConnectorConfigBuilder config) throws IOException, InterruptedException { + LOGGER.info("Deploying connector " + name); if (useConnectorResources) { deployConnectorCr(name, config); } @@ -311,6 +312,24 @@ public void waitForPostgreSqlSnapshot(String connectorName) throws IOException { waitForSnapshot(connectorName, "debezium_postgres_connector_metrics_snapshotcompleted"); } + /** + * Waits until snapshot phase of given SQL Server connector completes + * @param connectorName connector name + * @throws IOException on metric request error + */ + public void waitForSqlServerSnapshot(String connectorName) throws IOException { + waitForSnapshot(connectorName, "debezium_sql_server_connector_metrics_snapshotcompleted"); + } + + /** + * Waits until snapshot phase of given MongoDB connector completes + * @param connectorName connector name + * @throws IOException on metric request error + */ + public void waitForMongoSnapshot(String connectorName) throws IOException { + waitForSnapshot(connectorName, "debezium_mongodb_connector_metrics_snapshotcompleted"); + } + /** * @return URL of Connect API endpoint */ diff --git a/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/ConnectorTestBase.java b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/ConnectorTestBase.java index f4f12c9ff..c816398d4 100644 --- a/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/ConnectorTestBase.java +++ b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/ConnectorTestBase.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; +import org.awaitility.core.ThrowingRunnable; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.slf4j.Logger; @@ -116,13 +117,24 @@ public static void teardown() { protected void assertTopicsExist(String... 
names) { try (Consumer consumer = new KafkaConsumer<>(KAFKA_CONSUMER_PROPS)) { - await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + await().atMost(2, TimeUnit.MINUTES).untilAsserted(() -> { Set topics = consumer.listTopics().keySet(); assertThat(topics).contains(names); }); } } + protected void awaitAssert(long timeout, TimeUnit unit, ThrowingRunnable assertion) { + await() + .pollDelay(2, TimeUnit.SECONDS) + .atMost(timeout, unit) + .untilAsserted(assertion); + } + + protected void awaitAssert(ThrowingRunnable assertion) { + awaitAssert(1, TimeUnit.MINUTES, assertion); + } + protected void assertRecordsCount(String topic, int count) { try (Consumer consumer = new KafkaConsumer<>(KAFKA_CONSUMER_PROPS)) { consumer.subscribe(Collections.singleton(topic)); diff --git a/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/mysql/MySqlConnectorIT.java b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/mysql/MySqlConnectorIT.java index cfb110749..71d430be7 100644 --- a/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/mysql/MySqlConnectorIT.java +++ b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/mysql/MySqlConnectorIT.java @@ -25,8 +25,9 @@ import io.debezium.testing.openshift.ConnectorTestBase; import io.debezium.testing.openshift.resources.ConfigProperties; import io.debezium.testing.openshift.resources.ConnectorFactories; -import io.debezium.testing.openshift.tools.databases.DatabaseController; -import io.debezium.testing.openshift.tools.databases.MySqlDeployer; +import io.debezium.testing.openshift.tools.databases.SqlDatabaseClient; +import io.debezium.testing.openshift.tools.databases.SqlDatabaseController; +import io.debezium.testing.openshift.tools.databases.mysql.MySqlDeployer; import io.debezium.testing.openshift.tools.kafka.ConnectorConfigBuilder; import okhttp3.OkHttpClient; @@ -48,7 +49,7 @@ public class MySqlConnectorIT extends ConnectorTestBase { public static final String CONNECTOR_NAME = "inventory-connector-mysql"; private static MySqlDeployer dbDeployer; - private static DatabaseController dbController; + private static SqlDatabaseController dbController; private static OkHttpClient httpClient = new OkHttpClient(); private static ConnectorFactories connectorFactories = new ConnectorFactories(); private static ConnectorConfigBuilder connectorConfig; @@ -76,6 +77,12 @@ public static void tearDownDatabase() throws IOException, InterruptedException { dbController.reload(); } + private void insertCustomer(String firstName, String lastName, String email) throws SQLException { + SqlDatabaseClient client = dbController.getDatabaseClient(DATABASE_MYSQL_USERNAME, DATABASE_MYSQL_PASSWORD); + String sql = "INSERT INTO customers VALUES (default, '" + firstName + "', '" + lastName + "', '" + email + "')"; + client.execute("inventory", sql); + } + @Test @Order(1) public void shouldHaveRegisteredConnector() { @@ -106,37 +113,31 @@ public void shouldCreateKafkaTopics() { @Order(3) public void shouldSnapshotChanges() throws IOException { kafkaConnectController.waitForMySqlSnapshot(connectorName); - assertRecordsCount(connectorName + ".inventory.customers", 4); + awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 4)); } @Test @Order(4) public void shouldStreamChanges() throws SQLException { - String sql = "INSERT INTO customers VALUES (default, 'Tom', 'Tester', 'tom@test.com')"; - 
dbController.executeStatement("inventory", DATABASE_MYSQL_USERNAME, DATABASE_MYSQL_PASSWORD, sql); - assertRecordsCount(connectorName + ".inventory.customers", 5); - assertRecordsContain(connectorName + ".inventory.customers", "tom@test.com"); + insertCustomer("Tom", "Tester", "tom@test.com"); + awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 5)); + awaitAssert(() -> assertRecordsContain(connectorName + ".inventory.customers", "tom@test.com")); } @Test @Order(5) public void shouldBeDown() throws SQLException, IOException { kafkaConnectController.undeployConnector(connectorName); - String sql = "INSERT INTO customers VALUES (default, 'Jerry', 'Tester', 'jerry@test.com')"; - dbController.executeStatement("inventory", DATABASE_MYSQL_USERNAME, DATABASE_MYSQL_PASSWORD, sql); - assertRecordsCount(connectorName + ".inventory.customers", 5); - + insertCustomer("Jerry", "Tester", "jerry@test.com"); + awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 5)); } @Test @Order(6) public void shouldResumeStreamingAfterRedeployment() throws IOException, InterruptedException { kafkaConnectController.deployConnector(connectorName, connectorConfig); - await() - .atMost(30, TimeUnit.SECONDS) - .pollInterval(5, TimeUnit.SECONDS) - .untilAsserted(() -> assertRecordsCount(connectorName + ".inventory.customers", 6)); - assertRecordsContain(connectorName + ".inventory.customers", "jerry@test.com"); + awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 6)); + awaitAssert(() -> assertRecordsContain(connectorName + ".inventory.customers", "jerry@test.com")); } @Test @@ -144,9 +145,8 @@ public void shouldResumeStreamingAfterRedeployment() throws IOException, Interru public void shouldBeDownAfterCrash() throws SQLException { operatorController.disable(); kafkaConnectController.destroy(); - String sql = "INSERT INTO customers VALUES (default, 'Nibbles', 'Tester', 'nibbles@test.com')"; - dbController.executeStatement("inventory", DATABASE_MYSQL_USERNAME, DATABASE_MYSQL_PASSWORD, sql); - assertRecordsCount(connectorName + ".inventory.customers", 6); + insertCustomer("Nibbles", "Tester", "nibbles@test.com"); + awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 6)); } @Test @@ -154,11 +154,8 @@ public void shouldBeDownAfterCrash() throws SQLException { public void shouldResumeStreamingAfterCrash() throws InterruptedException { operatorController.enable(); kafkaConnectController.waitForConnectCluster(); - await() - .atMost(30, TimeUnit.SECONDS) - .pollInterval(5, TimeUnit.SECONDS) - .untilAsserted(() -> assertMinimalRecordsCount(connectorName + ".inventory.customers", 7)); - assertRecordsContain(connectorName + ".inventory.customers", "nibbles@test.com"); + awaitAssert(() -> assertMinimalRecordsCount(connectorName + ".inventory.customers", 7)); + awaitAssert(() -> assertRecordsContain(connectorName + ".inventory.customers", "nibbles@test.com")); } } diff --git a/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/postgresql/PostgreSqlConnectorIT.java b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/postgresql/PostgreSqlConnectorIT.java index bbfbb01da..d5460ef96 100644 --- a/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/postgresql/PostgreSqlConnectorIT.java +++ b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/postgresql/PostgreSqlConnectorIT.java @@ -26,8 +26,9 @@ 
import io.debezium.testing.openshift.ConnectorTestBase; import io.debezium.testing.openshift.resources.ConfigProperties; import io.debezium.testing.openshift.resources.ConnectorFactories; -import io.debezium.testing.openshift.tools.databases.DatabaseController; -import io.debezium.testing.openshift.tools.databases.PostgreSqlDeployer; +import io.debezium.testing.openshift.tools.databases.SqlDatabaseClient; +import io.debezium.testing.openshift.tools.databases.SqlDatabaseController; +import io.debezium.testing.openshift.tools.databases.postgresql.PostgreSqlDeployer; import io.debezium.testing.openshift.tools.kafka.ConnectorConfigBuilder; import okhttp3.OkHttpClient; @@ -49,7 +50,7 @@ public class PostgreSqlConnectorIT extends ConnectorTestBase { public static final String CONNECTOR_NAME = "inventory-connector-postgresql"; private static PostgreSqlDeployer dbDeployer; - private static DatabaseController dbController; + private static SqlDatabaseController dbController; private static OkHttpClient httpClient = new OkHttpClient(); private static ConnectorFactories connectorFactories = new ConnectorFactories(); private static ConnectorConfigBuilder connectorConfig; @@ -79,6 +80,12 @@ public static void tearDownDatabase() throws IOException, InterruptedException { dbController.reload(); } + private void insertCustomer(String firstName, String lastName, String email) throws SQLException { + SqlDatabaseClient client = dbController.getDatabaseClient(DATABASE_POSTGRESQL_USERNAME, DATABASE_POSTGRESQL_PASSWORD); + String sql = "INSERT INTO inventory.customers VALUES (default, '" + firstName + "', '" + lastName + "', '" + email + "')"; + client.execute(DATABASE_POSTGRESQL_DBZ_DBNAME, sql); + } + @Test @Order(1) public void shouldHaveRegisteredConnector() { @@ -107,37 +114,31 @@ public void shouldCreateKafkaTopics() { @Order(3) public void shouldContainRecordsInCustomersTopic() throws IOException { kafkaConnectController.waitForPostgreSqlSnapshot(connectorName); - assertRecordsCount(connectorName + ".inventory.customers", 4); + awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 4)); } @Test @Order(4) public void shouldStreamChanges() throws SQLException { - String sql = "INSERT INTO inventory.customers VALUES (default, 'Tom', 'Tester', 'tom@test.com')"; - dbController.executeStatement(DATABASE_POSTGRESQL_DBZ_DBNAME, DATABASE_POSTGRESQL_USERNAME, DATABASE_POSTGRESQL_PASSWORD, sql); - assertRecordsCount(connectorName + ".inventory.customers", 5); - assertRecordsContain(connectorName + ".inventory.customers", "tom@test.com"); + insertCustomer("Tom", "Tester", "tom@test.com"); + awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 5)); + awaitAssert(() -> assertRecordsContain(connectorName + ".inventory.customers", "tom@test.com")); } @Test @Order(5) public void shouldBeDown() throws SQLException, IOException { kafkaConnectController.undeployConnector(connectorName); - String sql = "INSERT INTO inventory.customers VALUES (default, 'Jerry', 'Tester', 'jerry@test.com')"; - dbController.executeStatement(DATABASE_POSTGRESQL_DBZ_DBNAME, DATABASE_POSTGRESQL_USERNAME, DATABASE_POSTGRESQL_PASSWORD, sql); - assertRecordsCount(connectorName + ".inventory.customers", 5); - + insertCustomer("Jerry", "Tester", "jerry@test.com"); + awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 5)); } @Test @Order(6) public void shouldResumeStreamingAfterRedeployment() throws IOException, InterruptedException { 
kafkaConnectController.deployConnector(connectorName, connectorConfig); - await() - .atMost(30, TimeUnit.SECONDS) - .pollInterval(5, TimeUnit.SECONDS) - .untilAsserted(() -> assertRecordsCount(connectorName + ".inventory.customers", 6)); - assertRecordsContain(connectorName + ".inventory.customers", "jerry@test.com"); + awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 6)); + awaitAssert(() -> assertRecordsContain(connectorName + ".inventory.customers", "jerry@test.com")); } @Test @@ -145,9 +146,8 @@ public void shouldResumeStreamingAfterRedeployment() throws IOException, Interru public void shouldBeDownAfterCrash() throws SQLException { operatorController.disable(); kafkaConnectController.destroy(); - String sql = "INSERT INTO inventory.customers VALUES (default, 'Nibbles', 'Tester', 'nibbles@test.com')"; - dbController.executeStatement(DATABASE_POSTGRESQL_DBZ_DBNAME, DATABASE_POSTGRESQL_USERNAME, DATABASE_POSTGRESQL_PASSWORD, sql); - assertRecordsCount(connectorName + ".inventory.customers", 6); + insertCustomer("Nibbles", "Tester", "nibbles@test.com"); + awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 6)); } @Test @@ -155,10 +155,7 @@ public void shouldBeDownAfterCrash() throws SQLException { public void shouldResumeStreamingAfterCrash() throws InterruptedException { operatorController.enable(); kafkaConnectController.waitForConnectCluster(); - await() - .atMost(30, TimeUnit.SECONDS) - .pollInterval(5, TimeUnit.SECONDS) - .untilAsserted(() -> assertMinimalRecordsCount(connectorName + ".inventory.customers", 7)); - assertRecordsContain(connectorName + ".inventory.customers", "nibbles@test.com"); + awaitAssert(() -> assertMinimalRecordsCount(connectorName + ".inventory.customers", 7)); + awaitAssert(() -> assertRecordsContain(connectorName + ".inventory.customers", "nibbles@test.com")); } } diff --git a/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/resources/ConfigProperties.java b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/resources/ConfigProperties.java index 124385f5e..822983782 100644 --- a/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/resources/ConfigProperties.java +++ b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/resources/ConfigProperties.java @@ -23,6 +23,8 @@ public class ConfigProperties { public static final String OCP_PROJECT_DBZ = stringProperty("test.ocp.project.debezium"); public static final String OCP_PROJECT_MYSQL = System.getProperty("test.ocp.project.mysql", "debezium-mysql"); public static final String OCP_PROJECT_POSTGRESQL = System.getProperty("test.ocp.project.postgresql", "debezium-postgresql"); + public static final String OCP_PROJECT_SQLSERVER = System.getProperty("test.ocp.project.sqlserver", "debezium-sqlserver"); + public static final String OCP_PROJECT_MONGO = System.getProperty("test.ocp.project.mongo", "debezium-mongo"); public static final Optional OCP_PULL_SECRET_PATHS = stringOptionalProperty("test.ocp.pull.secret.paths"); public static final boolean STRIMZI_OPERATOR_CONNECTORS = booleanProperty("test.strimzi.operator.connectors"); @@ -42,6 +44,23 @@ public class ConfigProperties { public static final String DATABASE_POSTGRESQL_DBZ_DBNAME = System.getProperty("test.database.postgresql.dbname", "debezium"); public static final Optional DATABASE_POSTGRESQL_HOST = stringOptionalProperty("test.database.postgresql.host"); + public 
static final int DATABASE_SQLSERVER_PORT = Integer.parseInt(System.getProperty("test.database.sqlserver.port", "1433"));
+    public static final String DATABASE_SQLSERVER_USERNAME = System.getProperty("test.database.sqlserver.username", "sa");
+    public static final String DATABASE_SQLSERVER_SA_PASSWORD = System.getProperty("test.database.sqlserver.password", "Debezium1$");
+    public static final String DATABASE_SQLSERVER_DBZ_USERNAME = System.getProperty("test.database.sqlserver.dbz.username", DATABASE_SQLSERVER_USERNAME);
+    public static final String DATABASE_SQLSERVER_DBZ_PASSWORD = System.getProperty("test.database.sqlserver.dbz.password", DATABASE_SQLSERVER_SA_PASSWORD);
+    public static final String DATABASE_SQLSERVER_DBZ_DBNAME = System.getProperty("test.database.sqlserver.dbname", "testDB");
+    public static final Optional<String> DATABASE_SQLSERVER_HOST = stringOptionalProperty("test.database.sqlserver.host");
+
+    public static final int DATABASE_MONGO_PORT = Integer.parseInt(System.getProperty("test.database.mongo.port", "27017"));
+    public static final String DATABASE_MONGO_USERNAME = System.getProperty("test.database.mongo.username", "admin");
+    public static final String DATABASE_MONGO_SA_PASSWORD = System.getProperty("test.database.mongo.password", "admin");
+    public static final String DATABASE_MONGO_DBZ_USERNAME = System.getProperty("test.database.mongo.dbz.username", "debezium");
+    public static final String DATABASE_MONGO_DBZ_PASSWORD = System.getProperty("test.database.mongo.dbz.password", "dbz");
+    public static final String DATABASE_MONGO_DBZ_DBNAME = System.getProperty("test.database.mongo.dbname", "inventory");
+    public static final String DATABASE_MONGO_DBZ_LOGIN_DBNAME = System.getProperty("test.database.mongo.dbz.login.dbname", "admin");
+    public static final Optional<String> DATABASE_MONGO_HOST = stringOptionalProperty("test.database.mongo.host");
+
     private static boolean booleanProperty(String key) {
         String value = System.getProperty(key);
         if (value == null || value.isEmpty() || value.equalsIgnoreCase("false") || value.equalsIgnoreCase("0")) {
diff --git a/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/resources/ConnectorFactories.java b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/resources/ConnectorFactories.java
index 6e30cc7bf..9f18fd77a 100644
--- a/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/resources/ConnectorFactories.java
+++ b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/resources/ConnectorFactories.java
@@ -5,6 +5,7 @@
  */
 package io.debezium.testing.openshift.resources;
 
+import static io.debezium.testing.openshift.resources.ConfigProperties.DATABASE_MONGO_HOST;
 import static io.debezium.testing.openshift.resources.ConfigProperties.DATABASE_MYSQL_HOST;
 import static io.debezium.testing.openshift.resources.ConfigProperties.DATABASE_POSTGRESQL_HOST;
 
@@ -48,4 +49,33 @@ public ConnectorConfigBuilder postgresql() {
                 .put("slot.name", "debezium")
                 .put("plugin.name", "pgoutput");
     }
+
+    public ConnectorConfigBuilder sqlserver() {
+        ConnectorConfigBuilder cb = new ConnectorConfigBuilder();
+        String dbHost = ConfigProperties.DATABASE_SQLSERVER_HOST.orElse("sqlserver." 
+ ConfigProperties.OCP_PROJECT_SQLSERVER + ".svc.cluster.local"); + return cb + .put("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector") + .put("task.max", 1) + .put("database.hostname", dbHost) + .put("database.port", ConfigProperties.DATABASE_SQLSERVER_PORT) + .put("database.user", ConfigProperties.DATABASE_SQLSERVER_DBZ_USERNAME) + .put("database.password", ConfigProperties.DATABASE_SQLSERVER_DBZ_PASSWORD) + .put("database.dbname", ConfigProperties.DATABASE_SQLSERVER_DBZ_DBNAME) + .put("database.server.name", "sqlserverdb") // this should be overwritten with unique name + .put("database.history.kafka.bootstrap.servers", "debezium-kafka-cluster-kafka-bootstrap." + ConfigProperties.OCP_PROJECT_DBZ + ".svc.cluster.local:9092") + .put("database.history.kafka.topic", "schema-changes.inventory"); + } + + public ConnectorConfigBuilder mongo() { + ConnectorConfigBuilder cb = new ConnectorConfigBuilder(); + String dbHost = DATABASE_MONGO_HOST.orElse("mongo." + ConfigProperties.OCP_PROJECT_MONGO + ".svc.cluster.local"); + return cb + .put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector") + .put("task.max", 1) + .put("mongodb.hosts", "rs0/" + dbHost + ":" + ConfigProperties.DATABASE_MONGO_PORT) + .put("mongodb.user", ConfigProperties.DATABASE_MONGO_DBZ_USERNAME) + .put("mongodb.password", ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD) + .put("mongodb.name", "mongodb") // this should be overwritten with unique name + .put("database.whitelist", ConfigProperties.DATABASE_MONGO_DBZ_DBNAME); // might want to change + } } diff --git a/jenkins-jobs/pipelines/openshift_pipeline.groovy b/jenkins-jobs/pipelines/openshift_pipeline.groovy index 693c318c9..be58c619c 100644 --- a/jenkins-jobs/pipelines/openshift_pipeline.groovy +++ b/jenkins-jobs/pipelines/openshift_pipeline.groovy @@ -55,6 +55,8 @@ pipeline { env.OCP_PROJECT_DEBEZIUM = "debezium-${BUILD_NUMBER}" env.OCP_PROJECT_MYSQL = "debezium-${BUILD_NUMBER}-mysql" env.OCP_PROJECT_POSTGRESQL = "debezium-${BUILD_NUMBER}-postgresql" + env.OCP_PROJECT_SQLSERVER = "debezium-${BUILD_NUMBER}-sqlserver" + env.OCP_PROJECT_MONGO = "debezium-${BUILD_NUMBER}-mongo" } withCredentials([ usernamePassword(credentialsId: "${OCP_CREDENTIALS}", usernameVariable: 'OCP_USERNAME', passwordVariable: 'OCP_PASSWORD'), @@ -67,16 +69,20 @@ pipeline { oc new-project ${OCP_PROJECT_DEBEZIUM} oc new-project ${OCP_PROJECT_MYSQL} oc new-project ${OCP_PROJECT_POSTGRESQL} + oc new-project ${OCP_PROJECT_SQLSERVER} ''' sh ''' set -x - - sed -i "s/namespace: .*/namespace: ${OCP_PROJECT_DEBEZIUM}/" strimzi/install/cluster-operator/*RoleBinding*.yaml oc apply -f ${STRZ_RESOURCES} -n ${OCP_PROJECT_DEBEZIUM} ''' sh ''' set -x + oc adm policy add-scc-to-user anyuid system:serviceaccount:${OCP_PROJECT_SQLSERVER}:default + oc adm policy add-scc-to-user anyuid system:serviceaccount:${OCP_PROJECT_MONGO}:default + ''' + sh ''' + set -x docker login -u=${QUAY_USERNAME} -p=${QUAY_PASSWORD} quay.io ''' }
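Note on the refactored database tooling (illustrative, not part of the patch): the inline executeStatement() helper removed from DatabaseController is replaced by a SqlDatabaseClient handed out by SqlDatabaseController, with statements expressed as composable Commands. The sketch below shows the intended usage, assuming the stripped generic signatures are Commands<T, E extends Throwable> and DatabaseClient<Connection, SQLException>; the JDBC URL, credentials, table and SQL values are placeholders.

import java.sql.Connection;
import java.sql.SQLException;

import io.debezium.testing.openshift.tools.databases.Commands;
import io.debezium.testing.openshift.tools.databases.SqlDatabaseClient;
import io.debezium.testing.openshift.tools.databases.SqlDatabaseController;

public class SqlDatabaseClientSketch {

    // In the tests the client comes from the controller produced by a deployer.
    static SqlDatabaseClient clientFor(SqlDatabaseController controller) {
        return controller.getDatabaseClient("debezium", "debezium"); // placeholder credentials
    }

    public static void main(String[] args) throws SQLException {
        // The client can also be built directly from a JDBC URL of the form produced by
        // SqlDatabaseController.constructDatabaseUrl(); host and port are example values.
        SqlDatabaseClient client = new SqlDatabaseClient(
                "jdbc:mysql://mysql.debezium-mysql.svc.cluster.local:3306/", "debezium", "debezium");

        // Single statement against a named database: switches catalog, then executes.
        client.execute("inventory",
                "INSERT INTO customers VALUES (default, 'Tom', 'Tester', 'tom@test.com')");

        // Commands compose via andThen() and run on a single connection; SQLException propagates.
        Commands<Connection, SQLException> createTable = con -> con.createStatement()
                .execute("CREATE TABLE IF NOT EXISTS notes (id INT PRIMARY KEY, body VARCHAR(255))");
        Commands<Connection, SQLException> insertRow = con -> con.createStatement()
                .execute("INSERT INTO notes VALUES (1, 'streaming test')");
        client.execute("inventory", createTable.andThen(insertRow));
    }
}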
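The commit message announces SQL Server and MongoDB OCP tests, and the deployer/controller generics are the extension point this refactoring prepares. A hypothetical SqlServerDeployer (not included in this patch) could plug in as sketched below, assuming DatabaseDeployer is parameterized over the concrete deployer and controller types as the refactored MySqlDeployer and PostgreSqlDeployer suggest; the package name and the arguments passed to super() are assumptions.

package io.debezium.testing.openshift.tools.databases.sqlserver;

import java.util.List;

import io.debezium.testing.openshift.tools.databases.DatabaseDeployer;
import io.debezium.testing.openshift.tools.databases.SqlDatabaseController;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.openshift.client.OpenShiftClient;

/**
 * Hypothetical deployer illustrating how the refactored generics are meant to be extended:
 * the deployer fixes its controller type, which in turn fixes the client type used by tests.
 */
public class SqlServerDeployer extends DatabaseDeployer<SqlServerDeployer, SqlDatabaseController> {

    public SqlServerDeployer(OpenShiftClient ocp) {
        super("sqlserver", ocp); // assumed to mirror MySqlDeployer's constructor
    }

    @Override
    public SqlServerDeployer getThis() {
        return this;
    }

    @Override
    public SqlDatabaseController getController(Deployment deployment, List<Service> services, String dbType, OpenShiftClient ocp) {
        return new SqlDatabaseController(deployment, services, dbType, ocp);
    }
}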