diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 5c948538b..6cfa8b1b4 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -34,7 +34,6 @@ import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.source.SourceRecord; import org.awaitility.Awaitility; -import org.awaitility.Duration; import org.awaitility.core.ConditionTimeoutException; import org.fest.assertions.Assertions; import org.junit.After; @@ -846,7 +845,7 @@ public void shouldRegularlyFlushLsn() throws InterruptedException, SQLException // Wait max 2 seconds for LSN change try { - Awaitility.await().atMost(Duration.TWO_SECONDS).ignoreExceptions().until(() -> flushLsn.add(getConfirmedFlushLsn(connection))); + Awaitility.await().atMost(2, TimeUnit.SECONDS).ignoreExceptions().until(() -> flushLsn.add(getConfirmedFlushLsn(connection))); } catch (ConditionTimeoutException e) { // We do not require all flushes to succeed in time @@ -894,7 +893,7 @@ public void shouldFlushLsnOnEmptyMessage() throws InterruptedException, SQLExcep try { // Wait max 5 seconds for LSN change caused by DDL_STATEMENT - Awaitility.await().atMost(Duration.FIVE_SECONDS).ignoreExceptions().until(() -> flushLsn.add(getConfirmedFlushLsn(connection))); + Awaitility.await().atMost(5, TimeUnit.SECONDS).ignoreExceptions().until(() -> flushLsn.add(getConfirmedFlushLsn(connection))); } catch (ConditionTimeoutException e) { // We do not require all flushes to succeed in time diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java 
b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index fd0637707..6ff583d76 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -39,7 +39,6 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.awaitility.Awaitility; -import org.awaitility.Duration; import org.fest.assertions.Assertions; import org.junit.Before; import org.junit.Rule; @@ -1168,7 +1167,7 @@ public void shouldWarnOnMissingHeartbeatForFilteredEvents() throws Exception { IntStream.range(0, filteredCount) .mapToObj(x -> "INSERT INTO s1.a (pk) VALUES (default);") .collect(Collectors.joining())); - Awaitility.await().alias("WAL growing log message").pollInterval(Duration.ONE_SECOND).atMost(Duration.TEN_SECONDS).until(() -> logInterceptor.containsWarnMessage( + Awaitility.await().alias("WAL growing log message").pollInterval(1, TimeUnit.SECONDS).atMost(10, TimeUnit.SECONDS).until(() -> logInterceptor.containsWarnMessage( "Received 10001 events which were all filtered out, so no offset could be committed. This prevents the replication slot from acknowledging the processed WAL offsets, causing a growing backlog of non-removeable WAL segments on the database server. 
Consider to either adjust your filter configuration or enable heartbeat events (via the heartbeat.interval.ms option) to avoid this situation.")); } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java index 13153a1f5..99cb83392 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java @@ -17,6 +17,7 @@ import java.sql.SQLException; import java.time.Duration; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.awaitility.Awaitility; @@ -297,7 +298,7 @@ protected static boolean publicationExists(String publicationName) { protected static void waitForDefaultReplicationSlotBeActive() { try (PostgresConnection connection = create()) { - Awaitility.await().atMost(org.awaitility.Duration.FIVE_SECONDS).until(() -> connection.prepareQueryAndMap( + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> connection.prepareQueryAndMap( "select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ? 
and active = true", statement -> { statement.setString(1, ReplicationConnection.Builder.DEFAULT_SLOT_NAME); statement.setString(2, "postgres"); diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java index 8535a5981..50473a9e6 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerConnectorIT.java @@ -24,7 +24,6 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.awaitility.Awaitility; -import org.awaitility.Duration; import org.fest.assertions.Assertions; import org.junit.After; import org.junit.Before; @@ -209,7 +208,7 @@ public void readOnlyApplicationIntent() throws Exception { // Verify that multiple subsequent transactions are used in streaming phase with read-only intent try (final SqlServerConnection admin = TestHelper.adminConnection()) { final Set txIds = new HashSet<>(); - Awaitility.await().atMost(Duration.FIVE_SECONDS).pollInterval(Duration.ONE_HUNDRED_MILLISECONDS).until(() -> { + Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until(() -> { admin.query( "SELECT (SELECT transaction_id FROM sys.dm_tran_session_transactions AS t WHERE s.session_id=t.session_id) FROM sys.dm_exec_sessions AS s WHERE program_name='" + appId + "'", diff --git a/debezium-testing/debezium-testing-openshift/README.md b/debezium-testing/debezium-testing-openshift/README.md new file mode 100644 index 000000000..3902d2024 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/README.md @@ -0,0 +1,41 @@ +# OpenShift deployment verification suite +This project verifies the basic functionality of Debezium connectors with Kafka cluster deployed to OpenShift via Strimzi 
project. + +## Prerequisites +OpenShift cluster with cluster-wide administrator access is required in order to run these tests. +Depending on the chosen registry, configured docker credentials are required in order to push the built image. + +## Running the tests +```bash +mvn install -Docp.url= -Docp.username= -Docp.password= -Dimage.fullname= +``` + +The following properties can be set to further configure the test execution + +| Name | Default Value | description | +| -----| ------------- | ----------- | +| ocp.url | | OpenShift API endpoint | +| ocp.username | | OpenShift admin username | +| ocp.password | | OpenShift admin password | +| ocp.project.debezium | debezium | OpenShift debezium project | +| ocp.project.mysql | debezium-mysql | OpenShift mysql project | +| image.fullname | | Full name of Kafka Connect image | + +## Building a KafkaConnect image with Debezium connectors + +To build a connect image, run the ```assembly``` profile from the parent directory together with the ```image``` profile + +```bash +mvn clean install -DskipTests -DskipITs -Passembly -Pimage +``` + +The following properties can be set to further configure the image build + +| Name | Default Value | description | +| -----| ------------- | ----------- | +| image.push.skip | true | Skips push to remote registry | +| image.push.registry | quay.io | remote registry base | +| image.name | debezium/kafka:${project.version}-${image.version.strimzi}-kafka-${version.kafka} | Name of built image | +| image.fullname | ${image.push.registry}/${image.name} | Full name of the built image | +| image.base.name | strimzi/kafka:${image.version.strimzi}-kafka-${version.kafka} | Base for built image | +| image.version.strimzi | latest | Version of Strimzi Kafka image | diff --git a/debezium-testing/debezium-testing-openshift/pom.xml b/debezium-testing/debezium-testing-openshift/pom.xml new file mode 100644 index 000000000..23fe23af1 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/pom.xml @@ -0,0 +1,311 @@ + + 
+ + io.debezium + debezium-testing + 1.1.0-SNAPSHOT + ../pom.xml + + + 4.0.0 + debezium-testing-openshift + Debezium OpenShift integration test-suite + + + 4.6.4 + 0.16.2 + 5.5.1 + 0.5.1 + 3.11.1 + + quay.io/debezium/kafka:${project.version}-latest-kafka-${version.kafka} + + + debezium + debezium-mysql + debezium-postgresql + + + true + + + 3306 + mysqluser + mysqlpw + debezium + dbz + debezium + + + 5432 + debezium + debezium + debezium + + + + + + io.fabric8 + kubernetes-client-bom + ${version.fabric8.client} + import + pom + + + + + + + io.fabric8 + openshift-client + ${version.fabric8.client} + + + + org.apache.kafka + kafka-clients + ${version.kafka} + + + + io.strimzi + api + ${version.strimzi} + + + org.apache.logging.log4j + log4j-api + + + org.apache.logging.log4j + log4j-core + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + + + com.squareup.okhttp3 + okhttp + + + + com.fasterxml.jackson.core + jackson-core + + + + com.fasterxml.jackson.core + jackson-databind + + + + org.junit.jupiter + junit-jupiter-api + ${version.junit5} + test + + + + org.junit.jupiter + junit-jupiter-engine + ${version.junit5} + test + + + + org.junit-pioneer + junit-pioneer + ${version.junit5.pioneer} + + + + org.assertj + assertj-core + ${version.assertj} + test + + + + org.slf4j + slf4j-api + + + + org.slf4j + slf4j-log4j12 + + + + log4j + log4j + + + + org.awaitility + awaitility + ${version.awaitility} + + + + + + image + + + true + quay.io + ${image.push.registry} + latest + debezium/kafka:${project.version}-${image.version.strimzi}-kafka-${version.kafka} + ${image.push.registry}/${image.name} + strimzi/kafka:${image.version.strimzi}-kafka-${version.kafka} + + + + + io.debezium + debezium-connector-mysql + plugin + zip + ${project.version} + + + io.debezium + debezium-connector-postgres + plugin + zip + ${project.version} + + + io.debezium + debezium-connector-sqlserver + plugin + zip + ${project.version} + + + io.debezium + debezium-connector-mongodb + plugin + zip + 
${project.version} + + + mysql + mysql-connector-java + + + + + + + + io.fabric8 + docker-maven-plugin + + IfNotPresent + ${image.push.registry} + ${image.push.skip} + + + ${image.name} + + ${image.base.name} + + /opt/kafka/plugins + + + + . + true + + io.debezium:debezium-connector-*:zip:plugin + + + + + + + + + + + + + build + pre-integration-test + + build + push + + + + + + + + + + openshiftITs + + + + org.apache.maven.plugins + maven-failsafe-plugin + + ${skipITs} + true + false + + ${ocp.url} + ${ocp.username} + ${ocp.password} + ${ocp.rhio.secret.path} + ${ocp.project.debezium} + ${ocp.project.mysql} + ${ocp.project.postgresql} + ${strimzi.operator.connectors} + + ${database.mysql.host} + ${database.mysql.dbz.username} + ${database.mysql.dbz.password} + ${database.mysql.port} + + ${database.postgresql.host} + ${database.postgresql.port} + ${database.postgresql.username} + ${database.postgresql.password} + ${database.postgresql.dbname} + + + + + tests-openshift + + integration-test + + + + + + + + + src/test/resources + true + + + + + + \ No newline at end of file diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/HttpUtils.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/HttpUtils.java new file mode 100644 index 000000000..63b20b5a8 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/HttpUtils.java @@ -0,0 +1,52 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.tools; + +import static org.awaitility.Awaitility.await; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; + +/** + * Utility methods for HTTP requests + * @author Jakub Cechacek + */ +public class HttpUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(HttpUtils.class); + + private final OkHttpClient http; + + public HttpUtils(OkHttpClient http) { + this.http = http; + } + + /** + * Waits until URL starts responding with success response code + * @param url tested url + */ + public void awaitApi(HttpUrl url) { + LOGGER.info("Waiting for API at " + url); + await() + .atMost(1, TimeUnit.MINUTES) + .ignoreException(IOException.class) + .until(() -> pingApi(url)); + } + + private boolean pingApi(HttpUrl address) throws IOException { + Request r = new Request.Builder().url(address).build(); + try (Response res = http.newCall(r).execute()) { + return res.isSuccessful(); + } + } +} diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/OpenShiftUtils.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/OpenShiftUtils.java new file mode 100644 index 000000000..8c793c16d --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/OpenShiftUtils.java @@ -0,0 +1,150 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.tools; + +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.ServiceAccount; +import io.fabric8.kubernetes.api.model.networking.NetworkPolicy; +import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPort; +import io.fabric8.openshift.api.model.Route; +import io.fabric8.openshift.client.OpenShiftClient; + +/** + * Utility methods for working with OpenShift + * @author Jakub Cechacek + */ +public class OpenShiftUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(OpenShiftUtils.class); + + private OpenShiftClient client; + + public OpenShiftUtils(OpenShiftClient client) { + this.client = client; + } + + /** + * Creates route in given namespace + * @param project project where this route will be created + * @param name name of the route + * @param service target service + * @param port target port + * @param labels labels set to set on this route + * @return {@link Route} object for created route + */ + public Route createRoute(String project, String name, String service, String port, Map labels) { + Route route = client.routes().inNamespace(project).createOrReplaceWithNew() + .withNewMetadata() + .withName(name) + .withLabels(labels) + .endMetadata() + .withNewSpec() + .withNewTo() + .withKind("Service") + .withName(service) + .endTo() + .withNewPort() + .withNewTargetPort(port) + .endPort() + .endSpec() + .done(); + return route; + } + + /** + * Creates new NetworkPolicy in given namespace allowing public access + * @param project 
project where this network policy will be created + * @param name name of the policy + * @param podSelectorLabels labels used as pod selectors + * @param ports ports for which access will be allowed + * @return {@link NetworkPolicy} object for created policy + */ + public NetworkPolicy createNetworkPolicy(String project, String name, Map podSelectorLabels, List ports) { + NetworkPolicy policy = client.network().networkPolicies().inNamespace(project) + .createOrReplaceWithNew() + .withNewMetadata() + .withName(name) + .endMetadata() + .withNewSpec() + .withNewPodSelector() + .withMatchLabels(podSelectorLabels) + .endPodSelector() + .addNewIngress() + .addToPorts(ports.toArray(new NetworkPolicyPort[ports.size()])) + .endIngress() + .withPolicyTypes("Ingress") + .endSpec() + .done(); + + return policy; + } + + /** + * Links pull secret to service account + * @param project project where this operation happens + * @param sa service account name + * @param secret secret name + * @return {@link} Service account object to which this secret was linked + */ + public ServiceAccount linkPullSecret(String project, String sa, String secret) { + return client.serviceAccounts() + .inNamespace(project) + .withName(sa) + .edit() + .addNewImagePullSecret(secret) + .done(); + } + + /** + * Finds pods with given labels + * @param project project where to look for pods + * @param labels labels used to identify pods + * @return {@link PodList} of matching pods + */ + public PodList podsWithLabels(String project, Map labels) { + Supplier podListSupplier = () -> client.pods().inNamespace(project).withLabels(labels).list(); + await().atMost(5, TimeUnit.MINUTES).until(() -> podListSupplier.get().getItems().size() > 0); + PodList pods = podListSupplier.get(); + + if (pods.getItems().isEmpty()) { + LOGGER.warn("Empty PodList"); + } + + return pods; + } + + /** + * Waits until all pods with given labels are ready + * @param project project where to look for pods + * @param labels labels used 
to identify pods + */ + public void waitForPods(String project, Map labels) { + String lbls = labels.keySet().stream().map(k -> k + "=" + labels.get(k)).collect(Collectors.joining(", ")); + LOGGER.info("Waiting for pods to deploy [" + lbls + "]"); + + PodList pods = podsWithLabels(project, labels); + + for (Pod p : pods.getItems()) { + try { + client.resource(p).waitUntilReady(5, TimeUnit.MINUTES); + } + catch (InterruptedException e) { + throw new IllegalStateException("Error when waiting for pod " + p.getMetadata().getName() + " to get ready", e); + } + } + } +} diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/YAML.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/YAML.java new file mode 100644 index 000000000..2e0716461 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/YAML.java @@ -0,0 +1,44 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.tools; + +import java.io.File; +import java.io.IOException; +import java.net.URL; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.exc.InvalidFormatException; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; + +/** + * @author Jakub Cechacek + */ +public final class YAML { + /** + * Deserialize object from YAML file + * + * @param path file path + * @param c type of object + * @return deserialized object + */ + public static T from(String path, Class c) { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + try { + return mapper.readValue(new File(path), c); + } + catch (InvalidFormatException e) { + throw new IllegalArgumentException(e); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static T fromResource(String path, Class c) { + URL resource = YAML.class.getResource(path); + return from(resource.getFile(), c); + } +} diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseController.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseController.java new file mode 100644 index 000000000..77fd4f72c --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseController.java @@ -0,0 +1,51 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.tools.databases; + +import java.util.List; + +import io.debezium.testing.openshift.tools.OpenShiftUtils; +import io.fabric8.kubernetes.api.model.LoadBalancerIngress; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.openshift.client.OpenShiftClient; + +/** + * + * @author Jakub Cechacek + */ +public class DatabaseController { + + private final Deployment deployment; + private final List services; + private final OpenShiftClient ocp; + private final String project; + private final String dbType; + private final OpenShiftUtils ocpUtils; + + public DatabaseController(Deployment deployment, List services, String dbType, OpenShiftClient ocp) { + this.deployment = deployment; + this.services = services; + this.ocp = ocp; + this.project = deployment.getMetadata().getNamespace(); + this.dbType = dbType; + this.ocpUtils = new OpenShiftUtils(ocp); + } + + public String getDatabaseUrl() { + Service svc = ocp + .services() + .inNamespace(project) + .withName(deployment.getMetadata().getName() + "-lb") + .get(); + + LoadBalancerIngress ingress = svc.getStatus().getLoadBalancer().getIngress().get(0); + String hostname = ingress.getHostname(); + Integer port = svc.getSpec().getPorts().stream().filter(p -> p.getName().equals("db")).findAny().get().getPort(); + + return "jdbc:" + dbType + "://" + hostname + ":" + port; + } +} diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseDeployer.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseDeployer.java new file mode 100644 index 000000000..b0d8e0d0f --- /dev/null +++ 
b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/DatabaseDeployer.java @@ -0,0 +1,88 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.tools.databases; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.testing.openshift.tools.OpenShiftUtils; +import io.debezium.testing.openshift.tools.YAML; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.openshift.client.OpenShiftClient; + +/** + * @author Jakub Cechacek + */ +public abstract class DatabaseDeployer { + private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseDeployer.class); + + private final OpenShiftClient ocp; + private final OpenShiftUtils ocpUtils; + private final String dbType; + private String project; + private Deployment deployment; + private List services; + + public DatabaseDeployer(String dbType, OpenShiftClient ocp) { + this.dbType = dbType; + this.ocp = ocp; + this.ocpUtils = new OpenShiftUtils(ocp); + } + + public T withProject(String project) { + this.project = project; + return getThis(); + } + + public T withDeployment(String yamlPath) { + return withDeployment(YAML.fromResource(yamlPath, Deployment.class)); + } + + public T withDeployment(Deployment deployment) { + this.deployment = deployment; + return getThis(); + } + + public T withServices(String... 
yamlPath) { + List services = Arrays.stream(yamlPath) + .map(p -> YAML.fromResource(p, Service.class)).collect(Collectors.toList()); + return withServices(services); + } + + public T withServices(Collection services) { + this.services = new ArrayList<>(services); + return getThis(); + } + + public DatabaseController deploy() { + if (deployment == null) { + throw new IllegalStateException("Deployment configuration not available"); + } + LOGGER.info("Deploying database"); + Deployment dep = ocp.apps().deployments().inNamespace(project).createOrReplace(deployment); + + List svcs = services.stream() + .map(s -> ocp.services().inNamespace(project).createOrReplace(s)) + .collect(Collectors.toList()); + + ocpUtils.waitForPods(project, dep.getMetadata().getLabels()); + LOGGER.info("Database deployed successfully"); + + this.deployment = dep; + this.services = svcs; + + return new DatabaseController(dep, services, dbType, ocp); + } + + public abstract T getThis(); +} diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/MySqlDeployer.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/MySqlDeployer.java new file mode 100644 index 000000000..90fafd8ad --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/MySqlDeployer.java @@ -0,0 +1,26 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.tools.databases; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.openshift.client.OpenShiftClient; + +/** + * @author Jakub Cechacek + */ +public class MySqlDeployer extends DatabaseDeployer { + private static final Logger LOGGER = LoggerFactory.getLogger(MySqlDeployer.class); + + public MySqlDeployer(OpenShiftClient ocp) { + super("mysql", ocp); + } + + public MySqlDeployer getThis() { + return this; + } +} diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/PostgreSqlDeployer.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/PostgreSqlDeployer.java new file mode 100644 index 000000000..728400855 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/databases/PostgreSqlDeployer.java @@ -0,0 +1,27 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.tools.databases; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.openshift.client.OpenShiftClient; + +/** + * + * @author Jakub Cechacek + */ +public class PostgreSqlDeployer extends DatabaseDeployer { + private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSqlDeployer.class); + + public PostgreSqlDeployer(OpenShiftClient ocp) { + super("postgresql", ocp); + } + + public PostgreSqlDeployer getThis() { + return this; + } +} diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/ConnectorConfigBuilder.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/ConnectorConfigBuilder.java new file mode 100644 index 000000000..08087fd48 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/ConnectorConfigBuilder.java @@ -0,0 +1,67 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.tools.kafka; + +import java.util.HashMap; +import java.util.Map; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.strimzi.api.kafka.model.KafkaConnector; +import io.strimzi.api.kafka.model.KafkaConnectorBuilder; + +/** + * + * @author Jakub Cechacek + */ +public class ConnectorConfigBuilder { + private final Map config; + private final ObjectMapper mapper; + + public ConnectorConfigBuilder() { + this.mapper = new ObjectMapper(); + this.config = new HashMap<>(); + } + + public ConnectorConfigBuilder put(String key, Object value) { + config.put(key, value); + return this; + } + + public Map get() { + return config; + } + + /** + * Get configuration as JSON string + * @return JSON string of connector config + */ + public String getJsonString() { + try { + return mapper.writeValueAsString(config); + } + catch (JsonProcessingException e) { + throw new IllegalStateException("Unable to convert connector config to JSON String", e); + } + } + + /** + * Get configuration as OpenShift CR of type {@link KafkaConnector} + * @return Connector CR + */ + public KafkaConnector getCustomResource() { + Map crConfig = new HashMap<>(config); + + KafkaConnectorBuilder connectorBuilder = new KafkaConnectorBuilder(); + return connectorBuilder.withNewSpec() + .withClassName((String) crConfig.remove("connector.class")) + .withTasksMax((Integer) crConfig.remove("task.max")) // NOTE(review): Kafka Connect's standard key is "tasks.max" — confirm callers really put "task.max" + .withConfig(crConfig) + .endSpec() + .build(); + } +} diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/KafkaConnectController.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/KafkaConnectController.java new file mode 100644 index 000000000..13eb7fc57 --- /dev/null 
+++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/KafkaConnectController.java @@ -0,0 +1,302 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.tools.kafka; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.awaitility.Awaitility.await; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.testing.openshift.tools.HttpUtils; +import io.debezium.testing.openshift.tools.OpenShiftUtils; +import io.fabric8.kubernetes.api.model.IntOrString; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.networking.NetworkPolicy; +import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPort; +import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPortBuilder; +import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; +import io.fabric8.kubernetes.client.dsl.Resource; +import io.fabric8.openshift.api.model.Route; +import io.fabric8.openshift.client.OpenShiftClient; +import io.strimzi.api.kafka.Crds; +import io.strimzi.api.kafka.KafkaConnectorList; +import io.strimzi.api.kafka.model.DoneableKafkaConnector; +import io.strimzi.api.kafka.model.KafkaConnect; +import io.strimzi.api.kafka.model.KafkaConnector; +import io.strimzi.api.kafka.model.status.KafkaConnectorStatus; + +import okhttp3.HttpUrl; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; + +/** + * This class provides control over Kafka Connect instance deployed in OpenShift + * @author Jakub Cechacek + */ +public class 
KafkaConnectController { + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConnectController.class); + + private final OpenShiftClient ocp; + private final KafkaConnect kafkaConnect; + private final OkHttpClient http; + private final String project; + private final OpenShiftUtils ocpUtils; + private final HttpUtils httpUtils; + private final boolean useConnectorResources; + + private Route apiRoute; + private Route metricsRoute; + + public KafkaConnectController(KafkaConnect kafkaConnect, OpenShiftClient ocp, OkHttpClient http, boolean useConnectorResources) { + this.kafkaConnect = kafkaConnect; + this.ocp = ocp; + this.http = http; + this.useConnectorResources = useConnectorResources; + this.project = kafkaConnect.getMetadata().getNamespace(); + this.ocpUtils = new OpenShiftUtils(ocp); + this.httpUtils = new HttpUtils(http); + } + + /** + * Creates network policy allowing access to ports exposed by Kafka Connect + * @return + */ + public NetworkPolicy allowServiceAccess() { + LOGGER.info("Creating NetworkPolicy allowing public access to " + kafkaConnect.getMetadata().getName() + "'s services"); + + Map labels = new HashMap<>(); + labels.put("strimzi.io/cluster", kafkaConnect.getMetadata().getName()); + labels.put("strimzi.io/kind", "KafkaConnect"); + labels.put("strimzi.io/name", kafkaConnect.getMetadata().getName() + "-connect"); + + List ports = Stream.of(8083, 8404) + .map(IntOrString::new) + .map(p -> new NetworkPolicyPortBuilder().withProtocol("TCP").withPort(p).build()) + .collect(Collectors.toList()); + + NetworkPolicy policy = ocpUtils.createNetworkPolicy(project, kafkaConnect.getMetadata().getName() + "-allowed", labels, ports); + return policy; + } + + /** + * Exposes a route for kafka connect API associated with given KafkaConnect resource + * @return {@link Route} object + */ + public Route exposeApi() { + LOGGER.info("Exposing KafkaConnect API"); + String name = kafkaConnect.getMetadata().getName() + "-connect-api"; + Service service 
= ocp.services().inNamespace(project).withName(name).get(); + + apiRoute = ocpUtils.createRoute(project, name, name, "rest-api", service.getMetadata().getLabels()); + httpUtils.awaitApi(getApiURL()); + return apiRoute; + } + + /** + * Exposes a route for prometheus metrics for kafka connect associated with given KafkaConnect resource + * @return {@link Route} object + */ + public Route exposeMetrics() { + LOGGER.info("Exposing KafkaConnect metrics"); + String name = kafkaConnect.getMetadata().getName() + "-connect-metrics"; + String nameSvc = kafkaConnect.getMetadata().getName() + "-connect-api"; + Service service = ocp.services().inNamespace(project).withName(nameSvc).get(); + + metricsRoute = ocpUtils + .createRoute(project, name, nameSvc, "prometheus", service.getMetadata().getLabels()); + httpUtils.awaitApi(getMetricsURL()); + + return metricsRoute; + } + + /** + * Deploys Kafka connector with given name and configuration via REST + * @param name connector name + * @param config connector config + * @throws IOException or request error + */ + public void deployConnector(String name, ConnectorConfigBuilder config) throws IOException, InterruptedException { + if (useConnectorResources) { + deployConnectorCr(name, config); + } + else { + deployConnectorJson(name, config); + } + } + + private void deployConnectorJson(String name, ConnectorConfigBuilder config) throws IOException { + if (apiRoute == null) { + throw new IllegalStateException("KafkaConnect API was not exposed"); + } + + HttpUrl url = getApiURL().resolve("/connectors/" + name + "/config"); + Request r = new Request.Builder() + .url(url) + .put(RequestBody.create(config.getJsonString(), MediaType.parse("application/json"))) + .build(); + + try (Response res = http.newCall(r).execute()) { + if (!res.isSuccessful()) { + LOGGER.error(res.request().url().toString()); + throw new RuntimeException("Connector registration request returned status code '" + res.code() + "'"); + } + LOGGER.info("Registered kafka 
connector '" + name + "'"); + } + } + + private void deployConnectorCr(String name, ConnectorConfigBuilder config) throws InterruptedException { + LOGGER.info("Deploying connector CR"); + KafkaConnector connector = config.getCustomResource(); + connector.getMetadata().setName(name); + connector.getMetadata().getLabels().put("strimzi.io/cluster", kafkaConnect.getMetadata().getName()); + + kafkaConnectorOperation().createOrReplace(connector); + waitForKafkaConnector(connector.getMetadata().getName()); + } + + /** + * Waits until connector is properly deployed. + * Note: works only for CR deployment + * @param name name of the connector + * @throws InterruptedException on wait error + * @throws IllegalArgumentException when deployment doesn't use custom resources + */ + public KafkaConnector waitForKafkaConnector(String name) throws InterruptedException { + if (!useConnectorResources) { + throw new IllegalStateException("Unable to wait for connector, deployment doesn't use custom resources."); + } + return kafkaConnectorOperation().withName(name).waitUntilCondition(this::waitForReadyStatus, 5, MINUTES); + } + + private boolean waitForReadyStatus(KafkaConnector connector) { + KafkaConnectorStatus status = connector.getStatus(); + if (status == null) { + return false; + } + return status.getConditions().stream().anyMatch(c -> c.getType().equalsIgnoreCase("Ready") && c.getStatus().equalsIgnoreCase("True")); + } + + private NonNamespaceOperation> kafkaConnectorOperation() { + return Crds.kafkaConnectorOperation(ocp).inNamespace(project); + } + + /** + * Deletes Kafka connector with given name + * @param name connector name + * @throws IOException on request error + */ + public void undeployConnector(String name) throws IOException { + if (useConnectorResources) { + undeployConnectorrCr(name); + } + else { + undeployConnectorJson(name); + } + } + + private void undeployConnectorJson(String name) throws IOException { + if (apiRoute == null) { + throw new 
IllegalStateException("KafkaConnect API was not exposed"); + } + + HttpUrl url = getApiURL().resolve("/connectors/" + name); + Request r = new Request.Builder().url(url).delete().build(); + + try (Response res = http.newCall(r).execute()) { + if (!res.isSuccessful()) { + LOGGER.error(res.request().url().toString()); + throw new RuntimeException("Connector deletion request returned status code '" + res.code() + "'"); + } + LOGGER.info("Deleted kafka connector '" + name + "'"); + } + } + + private void undeployConnectorrCr(String name) { + kafkaConnectorOperation().withName(name).delete(); + } + + public List getConnectMetrics() throws IOException { + OkHttpClient httpClient = new OkHttpClient(); + Request r = new Request.Builder().url(getMetricsURL()).get().build(); + + try (Response res = httpClient.newCall(r).execute()) { + String metrics = res.body().string(); + return Stream.of(metrics.split("\\r?\\n")).collect(Collectors.toList()); + } + } + + /** + * Waits until Snapshot phase of given connector completes + * @param connectorName name of the connect + * @param metricName name of the metric used to determine the state + * @throws IOException on metric request error + */ + public void waitForSnapshot(String connectorName, String metricName) throws IOException { + List metrics = getConnectMetrics(); + await() + .atMost(5, TimeUnit.MINUTES) + .pollInterval(10, TimeUnit.SECONDS) + .until(() -> metrics.stream().anyMatch(s -> s.contains(metricName) && s.contains(connectorName))); + } + + /** + * Waits until snapshot phase of given MySQL connector completes + * @param connectorName connector name + * @throws IOException on metric request error + */ + public void waitForMySqlSnapshot(String connectorName) throws IOException { + waitForSnapshot(connectorName, "debezium_mysql_connector_metrics_snapshotcompleted"); + } + + /** + * Waits until snapshot phase of given PostgreSQL connector completes + * @param connectorName connector name + * @throws IOException on metric 
request error + */ + public void waitForPostgreSqlSnapshot(String connectorName) throws IOException { + waitForSnapshot(connectorName, "debezium_postgres_connector_metrics_snapshotcompleted"); + } + + /** + * @return URL of Connect API endpoint + */ + public HttpUrl getApiURL() { + return new HttpUrl.Builder() + .scheme("http") + .host(apiRoute.getSpec().getHost()) + .build(); + } + + /** + * @return URL of metrics endpoint + */ + public HttpUrl getMetricsURL() { + return new HttpUrl.Builder() + .scheme("http") + .host(metricsRoute.getSpec().getHost()) + .build(); + } + + /** + * Undeploy this Kafka Connect cluster by deleted related KafkaConnect CR + * @return true if the CR was found and deleted + */ + public boolean undeployCluster() { + return Crds.kafkaConnectOperation(ocp).delete(kafkaConnect); + } +} diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/KafkaController.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/KafkaController.java new file mode 100644 index 000000000..925ed5493 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/KafkaController.java @@ -0,0 +1,62 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.tools.kafka; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.testing.openshift.tools.OpenShiftUtils; +import io.fabric8.openshift.client.OpenShiftClient; +import io.strimzi.api.kafka.Crds; +import io.strimzi.api.kafka.model.Kafka; +import io.strimzi.api.kafka.model.status.ListenerAddress; +import io.strimzi.api.kafka.model.status.ListenerStatus; + +import okhttp3.OkHttpClient; + +/** + * This class provides control over Kafka instance deployed in OpenShift + * @author Jakub Cechacek + */ +public class KafkaController { + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaController.class); + + private final Kafka kafka; + private final OpenShiftClient ocp; + private final OkHttpClient http; + private final String project; + private final OpenShiftUtils ocpUtils; + + public KafkaController(Kafka kafka, OpenShiftClient ocp, OkHttpClient http) { + this.kafka = kafka; + this.ocp = ocp; + this.http = http; + this.project = kafka.getMetadata().getNamespace(); + this.ocpUtils = new OpenShiftUtils(ocp); + } + + /** + * @return host and port for public bootstrap service + */ + public String getKafkaBootstrapAddress() { + List listeners = kafka.getStatus().getListeners(); + ListenerStatus listener = listeners.stream() + .filter(l -> l.getType().equalsIgnoreCase("external")) + .findAny().orElseThrow(() -> new IllegalStateException("No external listener found for Kafka cluster " + kafka.getMetadata().getName())); + ListenerAddress address = listener.getAddresses().get(0); + return address.getHost() + ":" + address.getPort(); + } + + /** + * Undeploy this Kafka cluster by deleted related KafkaConnect CR + * @return true if the CR was found and deleted + */ + public boolean undeployCluster() { + return Crds.kafkaOperation(ocp).delete(kafka); + 
} +} diff --git a/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/KafkaDeployer.java b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/KafkaDeployer.java new file mode 100644 index 000000000..699b9db98 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/main/java/io/debezium/testing/openshift/tools/kafka/KafkaDeployer.java @@ -0,0 +1,127 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.tools.kafka; + +import static java.util.concurrent.TimeUnit.MINUTES; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.testing.openshift.tools.OpenShiftUtils; +import io.debezium.testing.openshift.tools.YAML; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.Secret; +import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; +import io.fabric8.kubernetes.client.dsl.Resource; +import io.fabric8.openshift.client.OpenShiftClient; +import io.strimzi.api.kafka.Crds; +import io.strimzi.api.kafka.KafkaConnectList; +import io.strimzi.api.kafka.KafkaList; +import io.strimzi.api.kafka.model.DoneableKafka; +import io.strimzi.api.kafka.model.DoneableKafkaConnect; +import io.strimzi.api.kafka.model.Kafka; +import io.strimzi.api.kafka.model.KafkaConnect; +import io.strimzi.api.kafka.model.KafkaConnectBuilder; +import io.strimzi.api.kafka.model.status.HasStatus; +import io.strimzi.api.kafka.model.status.Status; + +import okhttp3.OkHttpClient; + +/** + * Deployment management for Kafka & Kafka Connect clusters via Strimzi + * @author Jakub Cechacek + */ +public class KafkaDeployer { + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDeployer.class); + + private final OpenShiftClient ocp; + private final OkHttpClient http; + private final 
OpenShiftUtils ocpUtils; + private final String project; + + public KafkaDeployer(String project, OpenShiftClient ocp, OkHttpClient http) { + this.project = project; + this.ocp = ocp; + this.http = http; + this.ocpUtils = new OpenShiftUtils(ocp); + } + + public KafkaDeployer(String project, OpenShiftClient ocp) { + this(project, ocp, new OkHttpClient()); + } + + /** + * Deploys Kafka Cluster + * @param yamlPath path to CR descriptor (must be available on class path) + * @return {@link KafkaController} instance for deployed cluster + * @throws InterruptedException + */ + public KafkaController deployKafkaCluster(String yamlPath) throws InterruptedException { + LOGGER.info("Deploying Kafka from " + yamlPath); + Kafka kafka = kafkaOperation().createOrReplace(YAML.fromResource(yamlPath, Kafka.class)); + + kafka = waitForKafkaCluster(kafka.getMetadata().getName()); + return new KafkaController(kafka, ocp, http); + } + + /** + * Deploys Kafka Connect Cluster + * @param yamlPath path to CR descriptor (must be available on class path) + * @param useConnectorResources true if connector deployment should be managed by operator + * @return {@link KafkaController} instance for deployed cluster + */ + public KafkaConnectController deployKafkaConnectCluster(String yamlPath, String loggingYamlPath, boolean useConnectorResources) throws InterruptedException { + LOGGER.info("Deploying KafkaConnect from" + yamlPath); + + ocp.configMaps().inNamespace(project).createOrReplace(YAML.fromResource(loggingYamlPath, ConfigMap.class)); + + KafkaConnect kafkaConnect = YAML.fromResource(yamlPath, KafkaConnect.class); + if (useConnectorResources) { + kafkaConnect = new KafkaConnectBuilder(kafkaConnect) + .editMetadata() + .addToAnnotations("strimzi.io/use-connector-resources", "true") + .endMetadata() + .build(); + } + kafkaConnectOperation().createOrReplace(kafkaConnect); + + kafkaConnect = waitForConnectCluster(kafkaConnect.getMetadata().getName()); + return new 
KafkaConnectController(kafkaConnect, ocp, http, useConnectorResources); + } + + public Kafka waitForKafkaCluster(String name) throws InterruptedException { + return kafkaOperation().withName(name).waitUntilCondition(this::waitForReadyStatus, 5, MINUTES); + } + + public KafkaConnect waitForConnectCluster(String name) throws InterruptedException { + return kafkaConnectOperation().withName(name).waitUntilCondition(this::waitForReadyStatus, 5, MINUTES); + } + + private boolean waitForReadyStatus(HasStatus kc) { + return kc.getStatus() != null && + kc.getStatus().getConditions().stream().anyMatch(c -> c.getType().equalsIgnoreCase("Ready") && c.getStatus().equalsIgnoreCase("True")); + } + + /** + * Deploys pull secret and links it to "default" service account in the project + * @param yamlPath path to Secret descriptor + * @return deployed pull secret + */ + public Secret deployPullSecret(String yamlPath) { + Secret pullSecret = ocp.secrets().createOrReplace(YAML.from(yamlPath, Secret.class)); + ocpUtils.linkPullSecret(project, "default", pullSecret.getMetadata().getName()); + return pullSecret; + } + + private NonNamespaceOperation> kafkaOperation() { + return Crds.kafkaOperation(ocp).inNamespace(project); + } + + private NonNamespaceOperation> kafkaConnectOperation() { + return Crds.kafkaConnectOperation(ocp).inNamespace(project); + } + +} diff --git a/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/ConnectorTestBase.java b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/ConnectorTestBase.java new file mode 100644 index 000000000..bfa3b3f07 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/ConnectorTestBase.java @@ -0,0 +1,98 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.jupiter.api.BeforeAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.testing.openshift.resources.ConfigProperties; +import io.debezium.testing.openshift.tools.kafka.KafkaConnectController; +import io.debezium.testing.openshift.tools.kafka.KafkaController; +import io.debezium.testing.openshift.tools.kafka.KafkaDeployer; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.ConfigBuilder; +import io.fabric8.openshift.client.DefaultOpenShiftClient; +import io.fabric8.openshift.client.OpenShiftClient; + +/** + * @author Jakub Cechacek + */ +public abstract class ConnectorTestBase { + private static final Logger LOGGER = LoggerFactory.getLogger(ConnectorTestBase.class); + + public static final String KAFKA = "/kafka-resources/010-kafka.yaml"; + public static final String KAFKA_CONNECT_S2I_LOGGING = "/kafka-resources/020-kafka-connect-logging.yaml"; + public static final String KAFKA_CONNECT_S2I = "/kafka-resources/021-kafka-connect.yaml"; + + protected static Properties KAFKA_CONSUMER_PROPS = new Properties(); + protected static OpenShiftClient ocp; + protected static TestUtils testUtils; + protected static KafkaDeployer kafkaDeployer; + 
protected static KafkaController kafkaController; + protected static KafkaConnectController kafkaConnectController; + + @BeforeAll + public static void setup() throws InterruptedException { + Config cfg = new ConfigBuilder() + .withMasterUrl(ConfigProperties.OCP_URL) + .withUsername(ConfigProperties.OCP_USERNAME) + .withPassword(ConfigProperties.OCP_PASSWORD) + .withTrustCerts(true) + .build(); + ocp = new DefaultOpenShiftClient(cfg); + testUtils = new TestUtils(); + + kafkaDeployer = new KafkaDeployer(ConfigProperties.OCP_PROJECT_DBZ, ocp); + ConfigProperties.OCP_SECRET_RHIO_PATH.ifPresent(kafkaDeployer::deployPullSecret); + + kafkaController = kafkaDeployer.deployKafkaCluster(KAFKA); + kafkaConnectController = kafkaDeployer.deployKafkaConnectCluster(KAFKA_CONNECT_S2I, KAFKA_CONNECT_S2I_LOGGING, ConfigProperties.STRIMZI_OPERATOR_CONNECTORS); + kafkaConnectController.allowServiceAccess(); + kafkaConnectController.exposeApi(); + kafkaConnectController.exposeMetrics(); + + KAFKA_CONSUMER_PROPS.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaController.getKafkaBootstrapAddress()); + KAFKA_CONSUMER_PROPS.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + KAFKA_CONSUMER_PROPS.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + KAFKA_CONSUMER_PROPS.put(ConsumerConfig.GROUP_ID_CONFIG, "DEBEZIUM_IT_01"); + KAFKA_CONSUMER_PROPS.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + KAFKA_CONSUMER_PROPS.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + } + + protected void assertTopicsExist(String... 
names) { + try (Consumer consumer = new KafkaConsumer<>(KAFKA_CONSUMER_PROPS)) { + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + Set topics = consumer.listTopics().keySet(); + assertThat(topics).contains(names); + }); + } + } + + protected void assertRecordsCount(String topic, int count) { + try (Consumer consumer = new KafkaConsumer<>(KAFKA_CONSUMER_PROPS)) { + consumer.subscribe(Collections.singleton(topic)); + ConsumerRecords records = consumer.poll(Duration.of(10, ChronoUnit.SECONDS)); + consumer.seekToBeginning(consumer.assignment()); + assertThat(records.count()).isEqualTo(count); + } + } +} diff --git a/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/TestUtils.java b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/TestUtils.java new file mode 100644 index 000000000..ddc732d7c --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/TestUtils.java @@ -0,0 +1,21 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift; + +/** + * + * @author Jakub Cechacek + */ +public class TestUtils { + + /** + * Generates unique identifier + * @return unique id + */ + public String getUniqueId() { + return String.valueOf(System.currentTimeMillis()); + } +} diff --git a/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/mysql/MySqlConnectorIT.java b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/mysql/MySqlConnectorIT.java new file mode 100644 index 000000000..4738e8dff --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/mysql/MySqlConnectorIT.java @@ -0,0 +1,104 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.mysql; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +import io.debezium.testing.openshift.ConnectorTestBase; +import io.debezium.testing.openshift.resources.ConfigProperties; +import io.debezium.testing.openshift.resources.ConnectorFactories; +import io.debezium.testing.openshift.tools.databases.MySqlDeployer; +import io.debezium.testing.openshift.tools.kafka.ConnectorConfigBuilder; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; + +/** + * @author Jakub Cechacek + */ +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@Tag("acceptance") +@Tag("mysql") +public class MySqlConnectorIT extends ConnectorTestBase { + + public static final String DB_DEPLOYMENT_PATH = "/database-resources/mysql/deployment.yaml"; + public static final String DB_SERVICE_PATH_LB = "/database-resources/mysql/service-lb.yaml"; + public static final String DB_SERVICE_PATH = "/database-resources/mysql/service.yaml"; + + public static final String CONNECTOR_NAME = "inventory-connector-mysql"; + + private static MySqlDeployer dbDeployer; + private static OkHttpClient httpClient = new OkHttpClient(); + private static ConnectorFactories connectorFactories = new ConnectorFactories(); + private static String connectorName; + + @BeforeAll + public static void setupDatabase() throws IOException, InterruptedException { + if (!ConfigProperties.DATABASE_MYSQL_HOST.isPresent()) { + dbDeployer = 
new MySqlDeployer(ocp) + .withProject(ConfigProperties.OCP_PROJECT_MYSQL) + .withDeployment(DB_DEPLOYMENT_PATH) + .withServices(DB_SERVICE_PATH_LB, DB_SERVICE_PATH); + dbDeployer.deploy(); + } + + connectorName = CONNECTOR_NAME + "-" + testUtils.getUniqueId(); + ConnectorConfigBuilder connectorConfig = connectorFactories.mysql().put("database.server.name", connectorName); + kafkaConnectController.deployConnector(connectorName, connectorConfig); + } + + @AfterAll + public static void tearDownDatabase() throws IOException { + kafkaConnectController.undeployConnector(connectorName); + } + + @Test + @Order(1) + public void shouldHaveRegisteredConnector() { + Request r = new Request.Builder() + .url(kafkaConnectController.getApiURL().resolve("/connectors")) + .build(); + + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + try (Response res = httpClient.newCall(r).execute()) { + assertThat(res.body().string()).contains(connectorName); + } + }); + } + + @Test + @Order(2) + public void shouldCreateKafkaTopics() { + assertTopicsExist( + connectorName + ".inventory.addresses", + connectorName + ".inventory.customers", + connectorName + ".inventory.geom", + connectorName + ".inventory.orders", + connectorName + ".inventory.products", + connectorName + ".inventory.products_on_hand"); + } + + @Test + @Order(3) + public void shouldContainRecordsInCustomersTopic() throws IOException { + kafkaConnectController.waitForMySqlSnapshot(connectorName); + assertRecordsCount(connectorName + ".inventory.customers", 4); + } + +} diff --git a/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/postgresql/PostgreSqlConnectorIT.java b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/postgresql/PostgreSqlConnectorIT.java new file mode 100644 index 000000000..68a4c9f06 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/postgresql/PostgreSqlConnectorIT.java 
@@ -0,0 +1,106 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.postgresql; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +import io.debezium.testing.openshift.ConnectorTestBase; +import io.debezium.testing.openshift.resources.ConfigProperties; +import io.debezium.testing.openshift.resources.ConnectorFactories; +import io.debezium.testing.openshift.tools.databases.PostgreSqlDeployer; +import io.debezium.testing.openshift.tools.kafka.ConnectorConfigBuilder; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; + +/** + * @author Jakub Cechacek + */ +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@Tag("acceptance") +@Tag("postgresql") +public class PostgreSqlConnectorIT extends ConnectorTestBase { + + public static final String DB_DEPLOYMENT_PATH = "/database-resources/postgresql/deployment.yaml"; + public static final String DB_SERVICE_PATH_LB = "/database-resources/postgresql/service-lb.yaml"; + public static final String DB_SERVICE_PATH = "/database-resources/postgresql/service.yaml"; + + public static final String CONNECTOR_NAME = "inventory-connector-postgresql"; + + private static PostgreSqlDeployer dbDeployer; + private static OkHttpClient httpClient = new OkHttpClient(); + private static ConnectorFactories connectorFactories = new ConnectorFactories(); + private static String connectorName; + + @BeforeAll + public static void setupDatabase() throws IOException, 
InterruptedException { + if (!ConfigProperties.DATABASE_MYSQL_HOST.isPresent()) { + dbDeployer = new PostgreSqlDeployer(ocp) + .withProject(ConfigProperties.OCP_PROJECT_POSTGRESQL) + .withDeployment(DB_DEPLOYMENT_PATH) + .withServices(DB_SERVICE_PATH_LB, DB_SERVICE_PATH); + dbDeployer.deploy(); + } + + String id = testUtils.getUniqueId(); + connectorName = CONNECTOR_NAME + "-" + id; + ConnectorConfigBuilder connectorConfig = connectorFactories.postgresql() + .put("database.server.name", connectorName) + .put("slot.name", "debezium_" + id) + .put("slot.drop.on.stop", true); + kafkaConnectController.deployConnector(connectorName, connectorConfig); + } + + @AfterAll + public static void tearDownDatabase() throws IOException { + kafkaConnectController.undeployConnector(connectorName); + } + + @Test + @Order(1) + public void shouldHaveRegisteredConnector() { + Request r = new Request.Builder() + .url(kafkaConnectController.getApiURL().resolve("/connectors")) + .build(); + + await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> { + try (Response res = httpClient.newCall(r).execute()) { + assertThat(res.body().string()).contains(connectorName); + } + }); + } + + @Test + @Order(2) + public void shouldCreateKafkaTopics() { + assertTopicsExist( + connectorName + ".inventory.customers", + connectorName + ".inventory.orders", + connectorName + ".inventory.products", + connectorName + ".inventory.products_on_hand"); + } + + @Test + @Order(3) + public void shouldContainRecordsInCustomersTopic() throws IOException { + kafkaConnectController.waitForPostgreSqlSnapshot(connectorName); + assertRecordsCount(connectorName + ".inventory.customers", 4); + } + +} diff --git a/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/resources/ConfigProperties.java b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/resources/ConfigProperties.java new file mode 100644 index 000000000..7b58c0404 --- /dev/null +++ 
b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/resources/ConfigProperties.java @@ -0,0 +1,62 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.resources; + +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * @author Jakub Cechacek + */ +public class ConfigProperties { + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigProperties.class); + + public static final String OCP_URL = stringProperty("test.ocp.url"); + public static final String OCP_USERNAME = stringProperty("test.ocp.username"); + public static final String OCP_PASSWORD = stringProperty("test.ocp.password"); + public static final String OCP_PROJECT_DBZ = stringProperty("test.ocp.project.debezium"); + public static final String OCP_PROJECT_MYSQL = System.getProperty("test.ocp.project.mysql", "debezium-mysql"); + public static final String OCP_PROJECT_POSTGRESQL = System.getProperty("test.ocp.project.postgresql", "debezium-postgresql"); + public static final Optional OCP_SECRET_RHIO_PATH = stringOptionalProperty("test.ocp.secret.rhio.path"); + + public static final boolean STRIMZI_OPERATOR_CONNECTORS = booleanProperty("test.strimzi.operator.connectors"); + + public static final int DATABASE_MYSQL_PORT = Integer.parseInt(System.getProperty("test.database.mysql.port", "3306")); + public static final String DATABASE_MYSQL_DBZ_USERNAME = System.getProperty("test.database.mysql.dbz.username", "debezium"); + public static final String DATABASE_MYSQL_DBZ_PASSWORD = System.getProperty("test.database.mysql.dbz.password", "dbz"); + public static final Optional DATABASE_MYSQL_HOST = stringOptionalProperty("test.database.mysql.host"); + + public static final int DATABASE_POSTGRESQL_PORT = Integer.parseInt(System.getProperty("test.database.postgresql.port", 
"5432")); + public static final String DATABASE_POSTGRESQL_DBZ_USERNAME = System.getProperty("test.database.postgresql.username", "debezium"); + public static final String DATABASE_POSTGRESQL_DBZ_PASSWORD = System.getProperty("test.database.postgresql.password", "debezium"); + public static final String DATABASE_POSTGRESQL_DBZ_DBNAME = System.getProperty("test.database.postgresql.dbname", "debezium"); + public static final Optional DATABASE_POSTGRESQL_HOST = stringOptionalProperty("test.database.postgresql.host"); + + private static boolean booleanProperty(String key) { + String value = System.getProperty(key); + if (value == null || value.isEmpty() || value.equalsIgnoreCase("false") || value.equalsIgnoreCase("0")) { + return false; + } + return true; + } + + private static Optional stringOptionalProperty(String key) { + String value = System.getProperty(key); + return Optional.ofNullable((value == null || value.isEmpty()) ? null : value); + } + + private static String stringProperty(String key) { + String value = System.getProperty(key); + if (value == null) { + LOGGER.error("Undefined property " + key); + throw new IllegalStateException("Undefined property \"" + key + "\""); + } + return value; + } +} diff --git a/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/resources/ConnectorFactories.java b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/resources/ConnectorFactories.java new file mode 100644 index 000000000..96162cd0c --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/test/java/io/debezium/testing/openshift/resources/ConnectorFactories.java @@ -0,0 +1,51 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.openshift.resources; + +import static io.debezium.testing.openshift.resources.ConfigProperties.DATABASE_MYSQL_HOST; +import static io.debezium.testing.openshift.resources.ConfigProperties.DATABASE_POSTGRESQL_HOST; + +import io.debezium.testing.openshift.tools.kafka.ConnectorConfigBuilder; + +/** + * + * @author Jakub Cechacek + */ +public class ConnectorFactories { + + public ConnectorConfigBuilder mysql() { + ConnectorConfigBuilder cb = new ConnectorConfigBuilder(); + String dbHost = DATABASE_MYSQL_HOST.orElse("mysql." + ConfigProperties.OCP_PROJECT_MYSQL + ".svc.cluster.local"); + return cb + .put("connector.class", "io.debezium.connector.mysql.MySqlConnector") + .put("task.max", 1) + .put("database.hostname", dbHost) + .put("database.port", ConfigProperties.DATABASE_MYSQL_PORT) + .put("database.user", ConfigProperties.DATABASE_MYSQL_DBZ_USERNAME) + .put("database.password", ConfigProperties.DATABASE_MYSQL_DBZ_PASSWORD) + .put("database.server.name", "mysqldb") // this should be overwritten with unique name + .put("database.whitelist", "inventory") // might want to change + .put("database.history.kafka.bootstrap.servers", "debezium-kafka-cluster-kafka-bootstrap." + ConfigProperties.OCP_PROJECT_DBZ + ".svc.cluster.local:9092") + .put("database.history.kafka.topic", "schema-changes.inventory"); + } + + public ConnectorConfigBuilder postgresql() { + ConnectorConfigBuilder cb = new ConnectorConfigBuilder(); + String dbHost = DATABASE_POSTGRESQL_HOST.orElse("postgresql." 
+ ConfigProperties.OCP_PROJECT_POSTGRESQL + ".svc.cluster.local"); + return cb + .put("connector.class", "io.debezium.connector.postgresql.PostgresConnector") + .put("task.max", 1) + .put("database.hostname", dbHost) + .put("database.port", ConfigProperties.DATABASE_POSTGRESQL_PORT) + .put("database.user", ConfigProperties.DATABASE_POSTGRESQL_DBZ_USERNAME) + .put("database.password", ConfigProperties.DATABASE_POSTGRESQL_DBZ_PASSWORD) + .put("database.dbname", ConfigProperties.DATABASE_POSTGRESQL_DBZ_DBNAME) + .put("database.server.name", "postgresqldb") // this should be overwritten with unique name + .put("schema.whitelist", "inventory") // might want to change + .put("slot.name", "debezium") // this should be overwritten with unique name + .put("plugin.name", "pgoutput"); + } +} diff --git a/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/mysql/deployment.yaml b/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/mysql/deployment.yaml new file mode 100644 index 000000000..21d95c6f9 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/mysql/deployment.yaml @@ -0,0 +1,62 @@ +kind: Deployment +apiVersion: apps/v1 +metadata: + name: mysql + labels: + app: mysql +spec: + replicas: 1 + selector: + matchLabels: + app: mysql + deployment: mysql + template: + metadata: + labels: + app: mysql + deployment: mysql + spec: + volumes: + - name: mysql-volume-1 + emptyDir: {} + containers: + - resources: {} + name: mysql + env: + - name: MYSQL_PASSWORD + value: ${database.mysql.password} + - name: MYSQL_ROOT_PASSWORD + value: ${database.mysql.root.password} + - name: MYSQL_USER + value: ${database.mysql.username} + ports: + - containerPort: 3306 + protocol: TCP + - containerPort: 33060 + protocol: TCP + imagePullPolicy: IfNotPresent + volumeMounts: + - name: mysql-volume-1 + mountPath: /var/lib/mysql + livenessProbe: + initialDelaySeconds: 30 + tcpSocket: + port: 3306 
+ timeoutSeconds: 1 + readinessProbe: + exec: + command: + - "/bin/sh" + - "-i" + - "-c" + - "MYSQL_PWD=\"$MYSQL_PASSWORD\" mysql -h 127.0.0.1 -u $MYSQL_USER -D inventory -e 'SELECT 1'" + initialDelaySeconds: 5 + timeoutSeconds: 1 + terminationMessagePolicy: File + terminationMessagePath: /dev/termination-log + image: 'debezium/example-mysql:latest' + restartPolicy: Always + terminationGracePeriodSeconds: 30 + dnsPolicy: ClusterFirst + strategy: + type: Recreate diff --git a/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/mysql/service-lb.yaml b/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/mysql/service-lb.yaml new file mode 100644 index 000000000..ba942a035 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/mysql/service-lb.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Service +metadata: + name: mysql-lb +spec: + selector: + app: mysql + deployment: mysql + type: LoadBalancer + ports: + - name: db + port: 3306 + targetPort: 3306 diff --git a/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/mysql/service.yaml b/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/mysql/service.yaml new file mode 100644 index 000000000..9c4f11ccc --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/mysql/service.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + name: mysql +spec: + selector: + app: mysql + deployment: mysql + ports: + - name: db + port: 3306 + targetPort: 3306 diff --git a/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/postgresql/deployment.yaml b/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/postgresql/deployment.yaml new file mode 100644 index 000000000..5943fe0d2 --- /dev/null +++ 
b/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/postgresql/deployment.yaml @@ -0,0 +1,54 @@ +kind: Deployment +apiVersion: apps/v1 +metadata: + name: postgresql + labels: + app: postgresql +spec: + replicas: 1 + selector: + matchLabels: + app: postgresql + deployment: postgresql + template: + metadata: + labels: + app: postgresql + deployment: postgresql + spec: + containers: + - resources: {} + name: postgresql + env: + - name: POSTGRESQL_PASSWORD + value: ${database.postgresql.password} + - name: POSTGRESQL_DATABASE + value: ${database.postgresql.dbname} + - name: POSTGRESQL_USER + value: ${database.postgresql.username} + ports: + - containerPort: 5432 + protocol: TCP + imagePullPolicy: Always + livenessProbe: + initialDelaySeconds: 30 + tcpSocket: + port: 5432 + timeoutSeconds: 1 + readinessProbe: + exec: + command: + - "/bin/sh" + - "-i" + - "-c" + - "PGPASSWORD=${POSTGRESQL_PASSWORD} /usr/bin/psql -w -U ${POSTGRESQL_USER} -d ${POSTGRESQL_DATABASE} -c 'SELECT 1'" + initialDelaySeconds: 5 + timeoutSeconds: 1 + terminationMessagePolicy: File + terminationMessagePath: /dev/termination-log + image: 'quay.io/debezium/example-postgres-ocp:latest' + restartPolicy: Always + terminationGracePeriodSeconds: 30 + dnsPolicy: ClusterFirst + strategy: + type: Recreate diff --git a/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/postgresql/service-lb.yaml b/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/postgresql/service-lb.yaml new file mode 100644 index 000000000..f59bb2ca0 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/postgresql/service-lb.yaml @@ -0,0 +1,13 @@ +apiVersion: v1 +kind: Service +metadata: + name: postgresql-lb +spec: + selector: + app: postgresql + deployment: postgresql + type: LoadBalancer + ports: + - name: db + port: 5432 + targetPort: 5432 diff --git 
a/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/postgresql/service.yaml b/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/postgresql/service.yaml new file mode 100644 index 000000000..220ef51ba --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/test/resources/database-resources/postgresql/service.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + name: postgresql +spec: + selector: + app: postgresql + deployment: postgresql + ports: + - name: db + port: 5432 + targetPort: 5432 diff --git a/debezium-testing/debezium-testing-openshift/src/test/resources/kafka-resources/010-kafka.yaml b/debezium-testing/debezium-testing-openshift/src/test/resources/kafka-resources/010-kafka.yaml new file mode 100644 index 000000000..9958b1ec5 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/test/resources/kafka-resources/010-kafka.yaml @@ -0,0 +1,28 @@ +apiVersion: kafka.strimzi.io/v1beta1 +kind: Kafka +metadata: + generation: 4 + name: debezium-kafka-cluster +spec: + entityOperator: + topicOperator: {} + userOperator: {} + kafka: + config: + offsets.topic.replication.factor: 1 + transaction.state.log.replication.factor: 1 + transaction.state.log.min.isr: 1 + listeners: + external: + type: loadbalancer + tls: false + plain: {} + tls: {} + replicas: 1 + storage: + type: ephemeral + version: ${version.kafka} + zookeeper: + replicas: 1 + storage: + type: ephemeral diff --git a/debezium-testing/debezium-testing-openshift/src/test/resources/kafka-resources/020-kafka-connect-logging.yaml b/debezium-testing/debezium-testing-openshift/src/test/resources/kafka-resources/020-kafka-connect-logging.yaml new file mode 100644 index 000000000..c1eac3161 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/test/resources/kafka-resources/020-kafka-connect-logging.yaml @@ -0,0 +1,24 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: connect-logging +data: + 
log4j.properties: |+ + kafka.logs.dir=logs + + log4j.rootLogger=INFO, stdout, appender + + # Disable excessive reflection warnings - KAFKA-5229 + log4j.logger.org.reflections=ERROR + + log4j.appender.stdout=org.apache.log4j.ConsoleAppender + log4j.appender.stdout.threshold=DEBUG + log4j.appender.stdout.layout=org.apache.log4j.PatternLayout + log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n + + + log4j.appender.appender=org.apache.log4j.DailyRollingFileAppender + log4j.appender.appender.DatePattern='.'yyyy-MM-dd-HH + log4j.appender.appender.File=${kafka.logs.dir}/connect-service.log + log4j.appender.appender.layout=org.apache.log4j.PatternLayout + log4j.appender.appender.layout.ConversionPattern=%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n diff --git a/debezium-testing/debezium-testing-openshift/src/test/resources/kafka-resources/021-kafka-connect.yaml b/debezium-testing/debezium-testing-openshift/src/test/resources/kafka-resources/021-kafka-connect.yaml new file mode 100644 index 000000000..1c2011b6c --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/test/resources/kafka-resources/021-kafka-connect.yaml @@ -0,0 +1,27 @@ +apiVersion: kafka.strimzi.io/v1beta1 +kind: KafkaConnect +metadata: + name: debezium-kafka-connect-cluster +spec: + version: ${version.kafka} + image: ${image.fullname} + bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093 + logging: + name: connect-logging + type: external + tls: + trustedCertificates: + - certificate: ca.crt + secretName: debezium-kafka-cluster-cluster-ca-cert + replicas: 1 + metrics: + lowercaseOutputName: true + config: + config.storage.replication.factor: 1 + offset.storage.replication.factor: 1 + status.storage.replication.factor: 1 + template: + connectContainer: + env: + - name: "JMX_PORT" + value: "5000" \ No newline at end of file diff --git 
a/debezium-testing/debezium-testing-openshift/src/test/resources/log4j.properties b/debezium-testing/debezium-testing-openshift/src/test/resources/log4j.properties new file mode 100644 index 000000000..d61e16634 --- /dev/null +++ b/debezium-testing/debezium-testing-openshift/src/test/resources/log4j.properties @@ -0,0 +1,9 @@ +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + + +# Root logger option +log4j.rootLogger=INFO, stdout \ No newline at end of file diff --git a/debezium-testing/pom.xml b/debezium-testing/pom.xml index 1703df813..7e52cd904 100644 --- a/debezium-testing/pom.xml +++ b/debezium-testing/pom.xml @@ -12,5 +12,6 @@ pom debezium-testing-testcontainers + debezium-testing-openshift diff --git a/pom.xml b/pom.xml index 54e34d8f6..b91876453 100644 --- a/pom.xml +++ b/pom.xml @@ -91,7 +91,7 @@ 1.4 1.21 3.0.0 - 3.1.6 + 4.0.1 1.12.5 2.4.0 4.2.2 @@ -117,6 +117,8 @@ 3.0.0 2.5.3 1.2.0 + 2.22.2 + ${version.surefire.plugin} Debezium community