From ffe2bcf9115cbfcc832f83952a13289a138cecc3 Mon Sep 17 00:00:00 2001 From: jcechace Date: Mon, 9 May 2022 13:16:01 +0200 Subject: [PATCH] DBZ-5095 Artifact server management and Fabric8 builder cleanup --- .../testing/system/tools/Deployer.java | 2 +- .../testing/system/tools/OpenShiftUtils.java | 17 ++- .../OcpArtifactServerController.java | 95 ++++++++++++++- .../artifacts/OcpArtifactServerDeployer.java | 2 +- .../FabricBuilderWrapper.java} | 10 +- .../tools/kafka/OcpKafkaConnectDeployer.java | 109 +++-------------- .../tools/kafka/OcpKafkaController.java | 10 ++ .../system/tools/kafka/OcpKafkaDeployer.java | 51 +------- ...kaBuilder.java => FabricKafkaBuilder.java} | 26 ++-- ...er.java => FabricKafkaConnectBuilder.java} | 111 +++++++----------- .../system/fixtures/kafka/OcpKafka.java | 75 ++++++------ 11 files changed, 240 insertions(+), 268 deletions(-) rename debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/{kafka/builders/kafka/StrimziBuilderWrapper.java => fabric8/FabricBuilderWrapper.java} (68%) rename debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/{kafka/StrimziKafkaBuilder.java => FabricKafkaBuilder.java} (86%) rename debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/{kafka/StrimziKafkaConnectBuilder.java => FabricKafkaConnectBuilder.java} (59%) diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/Deployer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/Deployer.java index 995097a1a..f3f211c61 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/Deployer.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/Deployer.java @@ -11,7 +11,7 @@ public interface Deployer { * Deploys resource * @return Controller for deployed resource */ - T deploy() throws InterruptedException; + T deploy() throws Exception; interface Builder, D extends Deployer> { D build(); diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/OpenShiftUtils.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/OpenShiftUtils.java index 78a22e6d7..240d7804a 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/OpenShiftUtils.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/OpenShiftUtils.java @@ -215,18 +215,25 @@ public void ensureHasPullSecret(Deployment deployment, String secret) { * @param labels labels used to identify pods * @return {@link PodList} of matching pods */ - public PodList podsWithLabels(String project, Map labels) { + public List podsWithLabels(String project, Map labels) { Supplier podListSupplier = () -> client.pods().inNamespace(project).withLabels(labels).list(); await().atMost(scaled(5), TimeUnit.MINUTES).until(() -> podListSupplier.get().getItems().size() > 0); - PodList pods = podListSupplier.get(); + List pods = podListSupplier.get().getItems(); - if (pods.getItems().isEmpty()) { + if (pods.isEmpty()) { LOGGER.warn("Empty PodList"); } return pods; } + public List podsForDeployment(Deployment deployment) { + String project = deployment.getMetadata().getNamespace(); + String name = deployment.getMetadata().getName(); + + return podsWithLabels(project, Map.of("deployment", name)); + } + /** * Waits until all pods with given labels are ready * @param project project where to look for pods @@ -236,9 +243,9 @@ public void waitForPods(String project, Map labels) { String lbls = labels.keySet().stream().map(k -> k + "=" + labels.get(k)).collect(Collectors.joining(", ")); LOGGER.info("Waiting for pods to deploy [" + lbls + "]"); - PodList pods = podsWithLabels(project, labels); + List pods = podsWithLabels(project, labels); - for (Pod p : pods.getItems()) { + for (Pod p : pods) { client.resource(p).waitUntilReady(scaled(5), TimeUnit.MINUTES); } } diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/artifacts/OcpArtifactServerController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/artifacts/OcpArtifactServerController.java index d7722fe83..6c88dd149 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/artifacts/OcpArtifactServerController.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/artifacts/OcpArtifactServerController.java @@ -5,9 +5,25 @@ */ package io.debezium.testing.system.tools.artifacts; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Stream; + +import io.debezium.testing.system.tools.OpenShiftUtils; +import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.Service; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.openshift.client.OpenShiftClient; +import io.strimzi.api.kafka.model.connect.build.*; import okhttp3.HttpUrl; import okhttp3.OkHttpClient; @@ -15,22 +31,95 @@ public class OcpArtifactServerController { private final Deployment deployment; + private final String project; private final Service service; private final OpenShiftClient ocp; private final OkHttpClient http; - public OcpArtifactServerController(Deployment deployment, Service service, OpenShiftClient ocp, OkHttpClient http) { + private final Map artifacts; + private final OpenShiftUtils ocpUtils; + + public OcpArtifactServerController(Deployment deployment, Service service, OpenShiftClient ocp, OkHttpClient http) throws IOException { this.deployment = deployment; + this.project = deployment.getMetadata().getNamespace(); this.service = service; this.ocp = ocp; + this.ocpUtils = new OpenShiftUtils(ocp); this.http = http; + this.artifacts = listArtifacts(); } public HttpUrl getBaseUrl() { - return new HttpUrl.Builder() - .scheme("http") + return new HttpUrl.Builder().scheme("http") .host(service.getMetadata().getName() + "." + service.getMetadata().getNamespace() + ".svc.cluster.local") .port(8080) .build(); } + + private HttpUrl createArtifactUrl(String link) { + return getBaseUrl().resolve(link); + } + + public Optional geArtifactUrl(String name) { + return Optional.ofNullable(artifacts.get(name)); + } + + public Optional getArtifactUrlAsString(String name) { + return geArtifactUrl(name).map(HttpUrl::toString); + } + + private Artifact createArtifact(String url) { + Objects.requireNonNull(url); + String type = url.substring(url.lastIndexOf('.') + 1); + + switch (type) { + case "zip": + return new ZipArtifactBuilder().withUrl(url).build(); + case "jar": + return new JarArtifactBuilder().withUrl(url).build(); + default: + throw new IllegalStateException("Unsupported artifact type: " + type); + } + } + + public Plugin createPlugin(String name, List artifacts) { + List pluginArtifacts = artifacts.stream() + .map(this::getArtifactUrlAsString) + .map(a -> a.orElseThrow(() -> new IllegalStateException("Missing artifact for plugin'" + name + "'"))) + .map(this::createArtifact) + .collect(toList()); + + return new PluginBuilder().withName(name).withArtifacts(pluginArtifacts).build(); + } + + public Plugin createDebeziumPlugin(String database) { + return createDebeziumPlugin(database, List.of()); + } + + public Plugin createDebeziumPlugin(String database, List extraArtifacts) { + List commonArtifacts = List.of("debezium-connector-" + database, "debezium-scripting", "connect-converter"); + List artifacts = Stream.concat(commonArtifacts.stream(), extraArtifacts.stream()).collect(toList()); + + return createPlugin("debezium-connector-" + database, artifacts); + } + + public List readArtifactListing() throws IOException { + Pod pod = ocpUtils.podsForDeployment(deployment).get(0); + + try (InputStream is = ocp.pods() + .inNamespace(project) + .withName(pod.getMetadata().getName()) + .inContainer("debezium-artifact-server") + .file("/opt/plugins/artifacts.txt") + .read()) { + + return new BufferedReader(new InputStreamReader(is)).lines().collect(toList()); + } + } + + public Map listArtifacts() throws IOException { + List listing = readArtifactListing(); + + return listing.stream().map(l -> l.split("::", 2)).collect(toMap(e -> e[0], e -> createArtifactUrl(e[1]))); + } } diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/artifacts/OcpArtifactServerDeployer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/artifacts/OcpArtifactServerDeployer.java index d410c9ee4..fc5351ec9 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/artifacts/OcpArtifactServerDeployer.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/artifacts/OcpArtifactServerDeployer.java @@ -73,7 +73,7 @@ public OcpArtifactServerDeployer( } @Override - public OcpArtifactServerController deploy() throws InterruptedException { + public OcpArtifactServerController deploy() throws Exception { LOGGER.info("Deploying debezium artifact server"); deployment = ocp.apps().deployments().inNamespace(project).createOrReplace(deployment); diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/kafka/StrimziBuilderWrapper.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/fabric8/FabricBuilderWrapper.java similarity index 68% rename from debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/kafka/StrimziBuilderWrapper.java rename to debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/fabric8/FabricBuilderWrapper.java index 905addae1..7ee8181cd 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/kafka/StrimziBuilderWrapper.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/fabric8/FabricBuilderWrapper.java @@ -3,7 +3,7 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.testing.system.tools.kafka.builders.kafka; +package io.debezium.testing.system.tools.fabric8; import java.util.function.Consumer; @@ -11,14 +11,14 @@ * Wraps Strimzi builder in order to provide convenience methods for Debezium configuration * * @param type of the child wrapper instance - * @param type of strimzi builder - * @param type of resource build by strimzi builder + * @param type of Fabric8 builder + * @param type of resource build by Faric8 builder */ -public abstract class StrimziBuilderWrapper, B, R> { +public abstract class FabricBuilderWrapper, B, R> { protected B builder; - protected StrimziBuilderWrapper(B builder) { + protected FabricBuilderWrapper(B builder) { this.builder = builder; } diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaConnectDeployer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaConnectDeployer.java index c075114c9..42fc80da9 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaConnectDeployer.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaConnectDeployer.java @@ -5,17 +5,13 @@ */ package io.debezium.testing.system.tools.kafka; -import static io.debezium.testing.system.tools.ConfigProperties.STRIMZI_KC_IMAGE; - import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.debezium.testing.system.tools.AbstractOcpDeployer; -import io.debezium.testing.system.tools.Deployer; -import io.debezium.testing.system.tools.YAML; -import io.debezium.testing.system.tools.kafka.builders.kafka.StrimziKafkaConnectBuilder; +import io.debezium.testing.system.tools.kafka.builders.FabricKafkaConnectBuilder; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; import io.fabric8.kubernetes.client.dsl.Resource; @@ -34,98 +30,21 @@ */ public class OcpKafkaConnectDeployer extends AbstractOcpDeployer { - /** - * Builder for {@link OcpKafkaConnectDeployer} - */ - public static class Builder implements Deployer.Builder { - - private String project; - private OpenShiftClient ocpClient; - private OkHttpClient httpClient; - private ConfigMap configMap; - private StrimziOperatorController operatorController; - private final StrimziKafkaConnectBuilder strimziBuilder; - - public Builder(StrimziKafkaConnectBuilder strimziBuilder) { - this.strimziBuilder = strimziBuilder; - } - - public OcpKafkaConnectDeployer.Builder withProject(String project) { - this.project = project; - return this; - } - - public OcpKafkaConnectDeployer.Builder withOcpClient(OpenShiftClient ocpClient) { - this.ocpClient = ocpClient; - return this; - } - - public OcpKafkaConnectDeployer.Builder withHttpClient(OkHttpClient httpClient) { - this.httpClient = httpClient; - return this; - } - - public OcpKafkaConnectDeployer.Builder withLoggingAndMetricsFromCfgMap(String cfgYamlPath) { - this.configMap = YAML.fromResource(cfgYamlPath, ConfigMap.class); - strimziBuilder - .withLoggingFromConfigMap(configMap) - .withMetricsFromConfigMap(configMap); - return this; - } - - public OcpKafkaConnectDeployer.Builder withConnectorResources(boolean connectorResources) { - if (connectorResources) { - strimziBuilder.withConnectorResources(); - } - return this; - } - - public OcpKafkaConnectDeployer.Builder withOperatorController(StrimziOperatorController operatorController) { - this.operatorController = operatorController; - operatorController.getPullSecretName().ifPresent(strimziBuilder::withPullSecret); - - return this; - } - - @Override - public OcpKafkaConnectDeployer build() { - return new OcpKafkaConnectDeployer( - project, - strimziBuilder, - configMap, - operatorController, - ocpClient, - httpClient); - } - - public Builder withKcBuild(boolean kcBuild) { - if (kcBuild) { - strimziBuilder - .withBuild() - .withStandardPlugins(); - } - else { - strimziBuilder.withImage(STRIMZI_KC_IMAGE); - } - return this; - } - } - private static final Logger LOGGER = LoggerFactory.getLogger(OcpKafkaConnectDeployer.class); - private final StrimziKafkaConnectBuilder strimziBuilder; + private final FabricKafkaConnectBuilder fabricBuilder; private final ConfigMap configMap; private final StrimziOperatorController operatorController; - private OcpKafkaConnectDeployer( - String project, - StrimziKafkaConnectBuilder strimziBuilder, - ConfigMap configMap, - StrimziOperatorController operatorController, - OpenShiftClient ocp, - OkHttpClient http) { + public OcpKafkaConnectDeployer( + String project, + FabricKafkaConnectBuilder fabricBuilder, + ConfigMap configMap, + StrimziOperatorController operatorController, + OpenShiftClient ocp, + OkHttpClient http) { super(project, ocp, http); - this.strimziBuilder = strimziBuilder; + this.fabricBuilder = fabricBuilder; this.configMap = configMap; this.operatorController = operatorController; } @@ -142,11 +61,11 @@ public OcpKafkaConnectController deploy() throws InterruptedException { deployConfigMap(); } - if (strimziBuilder.hasBuild()) { + if (fabricBuilder.hasBuild()) { deployImageStream(); } - KafkaConnect kafkaConnect = strimziBuilder.build(); + KafkaConnect kafkaConnect = fabricBuilder.build(); kafkaConnect = kafkaConnectOperation().createOrReplace(kafkaConnect); OcpKafkaConnectController controller = new OcpKafkaConnectController( @@ -165,12 +84,12 @@ private void deployConfigMap() { } private void deployImageStream() { - Optional imageStream = strimziBuilder.imageStream(); + Optional imageStream = fabricBuilder.imageStream(); if (!imageStream.isPresent()) { throw new IllegalStateException("Image stream missing"); } - String[] image = strimziBuilder.imageStream().get().split(":", 2); + String[] image = fabricBuilder.imageStream().get().split(":", 2); ImageStream is = new ImageStreamBuilder() .withNewMetadata().withName(image[0]).endMetadata() diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaController.java index a16723895..94fd6a656 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaController.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaController.java @@ -64,6 +64,16 @@ public String getBootstrapAddress() { return name + "-kafka-bootstrap." + project + ".svc.cluster.local:9092"; } + /** + * Returns bootstrap to be used by KC. + * The address is local. + * + * @return bootstrap + */ + public String getLocalBootstrapAddress() { + return name + "-kafka-bootstrap:9093"; + } + /** * Deploy kafka topic from given CR * diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaDeployer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaDeployer.java index c55f781d4..29b39d5fd 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaDeployer.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaDeployer.java @@ -9,8 +9,7 @@ import org.slf4j.LoggerFactory; import io.debezium.testing.system.tools.AbstractOcpDeployer; -import io.debezium.testing.system.tools.Deployer; -import io.debezium.testing.system.tools.kafka.builders.kafka.StrimziKafkaBuilder; +import io.debezium.testing.system.tools.kafka.builders.FabricKafkaBuilder; import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation; import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.openshift.client.OpenShiftClient; @@ -26,55 +25,13 @@ */ public final class OcpKafkaDeployer extends AbstractOcpDeployer { - /** - * Builder for {@link OcpKafkaDeployer} - */ - public static class Builder implements Deployer.Builder { - - private String project; - private OpenShiftClient ocpClient; - private OkHttpClient httpClient; - private StrimziOperatorController operatorController; - private final StrimziKafkaBuilder strimziBuilder; - - public Builder(StrimziKafkaBuilder strimziBuilder) { - this.strimziBuilder = strimziBuilder; - } - - public Builder withProject(String project) { - this.project = project; - return this; - } - - public Builder withOcpClient(OpenShiftClient ocpClient) { - this.ocpClient = ocpClient; - return this; - } - - public Builder withHttpClient(OkHttpClient httpClient) { - this.httpClient = httpClient; - return this; - } - - public Builder withOperatorController(StrimziOperatorController operatorController) { - this.operatorController = operatorController; - operatorController.getPullSecretName().ifPresent(strimziBuilder::withPullSecret); - return this; - } - - @Override - public OcpKafkaDeployer build() { - return new OcpKafkaDeployer(project, strimziBuilder, operatorController, ocpClient, httpClient); - } - } - private static final Logger LOGGER = LoggerFactory.getLogger(OcpKafkaDeployer.class); - private final StrimziKafkaBuilder strimziBuilder; + private final FabricKafkaBuilder strimziBuilder; private final StrimziOperatorController operatorController; - private OcpKafkaDeployer(String project, StrimziKafkaBuilder strimziBuilder, StrimziOperatorController operatorController, - OpenShiftClient ocp, OkHttpClient http) { + public OcpKafkaDeployer(String project, FabricKafkaBuilder strimziBuilder, StrimziOperatorController operatorController, + OpenShiftClient ocp, OkHttpClient http) { super(project, ocp, http); this.strimziBuilder = strimziBuilder; this.operatorController = operatorController; diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/kafka/StrimziKafkaBuilder.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaBuilder.java similarity index 86% rename from debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/kafka/StrimziKafkaBuilder.java rename to debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaBuilder.java index 059b5605e..c8a61cd4d 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/kafka/StrimziKafkaBuilder.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaBuilder.java @@ -3,15 +3,14 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.testing.system.tools.kafka.builders.kafka; +package io.debezium.testing.system.tools.kafka.builders; import static io.debezium.testing.system.tools.ConfigProperties.STRIMZI_VERSION_KAFKA; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; +import io.debezium.testing.system.tools.fabric8.FabricBuilderWrapper; +import io.fabric8.kubernetes.api.model.Secret; import io.strimzi.api.kafka.model.EntityOperatorSpec; import io.strimzi.api.kafka.model.EntityOperatorSpecBuilder; import io.strimzi.api.kafka.model.EntityTopicOperatorSpec; @@ -32,10 +31,10 @@ /** * This class simplifies building of kafka by providing default configuration for whole kafka or parts of its definition */ -public final class StrimziKafkaBuilder extends StrimziBuilderWrapper { +public final class FabricKafkaBuilder extends FabricBuilderWrapper { public static String DEFAULT_KAFKA_NAME = "debezium-kafka-cluster"; - private StrimziKafkaBuilder(KafkaBuilder kafkaBuilder) { + private FabricKafkaBuilder(KafkaBuilder kafkaBuilder) { super(kafkaBuilder); } @@ -44,7 +43,7 @@ public Kafka build() { return builder.build(); } - public static StrimziKafkaBuilder base() { + public static FabricKafkaBuilder base() { KafkaClusterSpec kafka = defaultKafkaSpec(); ZookeeperClusterSpec zookeeper = defaultKafkaZookeeperSpec(); EntityOperatorSpec entityOperator = defaultKafkaEntityOperatorSpec(); @@ -59,10 +58,17 @@ public static StrimziKafkaBuilder base() { .withEntityOperator(entityOperator) .endSpec(); - return new StrimziKafkaBuilder(builder); + return new FabricKafkaBuilder(builder); } - public StrimziKafkaBuilder withPullSecret(String pullSecretName) { + public FabricKafkaBuilder withPullSecret(Optional maybePullSecret) { + maybePullSecret + .map(s -> s.getMetadata().getName()) + .ifPresent(this::withPullSecret); + return self(); + } + + public FabricKafkaBuilder withPullSecret(String pullSecretName) { PodTemplate podTemplate = new PodTemplateBuilder().addNewImagePullSecret(pullSecretName).build(); builder diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/kafka/StrimziKafkaConnectBuilder.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaConnectBuilder.java similarity index 59% rename from debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/kafka/StrimziKafkaConnectBuilder.java rename to debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaConnectBuilder.java index 2eab51686..bff4155f4 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/kafka/StrimziKafkaConnectBuilder.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaConnectBuilder.java @@ -3,36 +3,24 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.testing.system.tools.kafka.builders.kafka; +package io.debezium.testing.system.tools.kafka.builders; -import static io.debezium.testing.system.tools.ConfigProperties.ARTIFACT_SERVER_APC_URL; -import static io.debezium.testing.system.tools.ConfigProperties.ARTIFACT_SERVER_DB2_DRIVER_VERSION; -import static io.debezium.testing.system.tools.ConfigProperties.ARTIFACT_SERVER_DBZ_VERSION; -import static io.debezium.testing.system.tools.ConfigProperties.ARTIFACT_SERVER_ORACLE_DRIVER_VERSION; -import static io.debezium.testing.system.tools.ConfigProperties.ARTIFACT_SERVER_URL; -import static java.util.function.Function.identity; -import static java.util.stream.Collectors.toMap; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import java.util.*; +import io.debezium.testing.system.tools.ConfigProperties; +import io.debezium.testing.system.tools.artifacts.OcpArtifactServerController; +import io.debezium.testing.system.tools.fabric8.FabricBuilderWrapper; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ConfigMapKeySelector; import io.fabric8.kubernetes.api.model.ConfigMapKeySelectorBuilder; +import io.fabric8.kubernetes.api.model.Secret; import io.strimzi.api.kafka.model.CertSecretSourceBuilder; import io.strimzi.api.kafka.model.ClientTls; import io.strimzi.api.kafka.model.ClientTlsBuilder; import io.strimzi.api.kafka.model.ContainerEnvVarBuilder; import io.strimzi.api.kafka.model.KafkaConnect; import io.strimzi.api.kafka.model.KafkaConnectBuilder; -import io.strimzi.api.kafka.model.connect.build.JarArtifactBuilder; import io.strimzi.api.kafka.model.connect.build.Plugin; -import io.strimzi.api.kafka.model.connect.build.PluginBuilder; -import io.strimzi.api.kafka.model.connect.build.ZipArtifactBuilder; import io.strimzi.api.kafka.model.template.ContainerTemplateBuilder; import io.strimzi.api.kafka.model.template.KafkaConnectTemplate; import io.strimzi.api.kafka.model.template.KafkaConnectTemplateBuilder; @@ -40,11 +28,12 @@ /** * This class simplifies building of kafkaConnect by providing pre-made configurations for whole kafkaConnect or parts of its definition */ -public class StrimziKafkaConnectBuilder extends StrimziBuilderWrapper { +public class FabricKafkaConnectBuilder extends + FabricBuilderWrapper { public static String DEFAULT_KC_NAME = "debezium-kafka-connect-cluster"; - public static String DEFAULT_BOOSTRAP_SERVER = StrimziKafkaBuilder.DEFAULT_KAFKA_NAME + "-kafka-bootstrap:9093"; + public static String DEFAULT_BOOSTRAP_SERVER = FabricKafkaBuilder.DEFAULT_KAFKA_NAME + "-kafka-bootstrap:9093"; - protected StrimziKafkaConnectBuilder(KafkaConnectBuilder builder) { + protected FabricKafkaConnectBuilder(KafkaConnectBuilder builder) { super(builder); } @@ -65,7 +54,7 @@ public Optional imageStream() { return Optional.of(image); } - public static StrimziKafkaConnectBuilder base() { + public static FabricKafkaConnectBuilder base(String bootstrap) { Map config = defaultConfig(); KafkaConnectTemplate template = defaultTemplate(); ClientTls tls = defaultTLS(); @@ -75,35 +64,56 @@ public static StrimziKafkaConnectBuilder base() { .withName(DEFAULT_KC_NAME) .endMetadata() .withNewSpec() - .withBootstrapServers(DEFAULT_BOOSTRAP_SERVER) + .withBootstrapServers(bootstrap) .withTemplate(template) .withConfig(config) .withReplicas(1) .withTls(tls) .endSpec(); - return new StrimziKafkaConnectBuilder(builder); + return new FabricKafkaConnectBuilder(builder); } - public StrimziKafkaConnectBuilder withImage(String image) { + public FabricKafkaConnectBuilder withImage(String image) { builder.editSpec().withImage(image).endSpec(); return self(); } - public StrimziKafkaConnectBuilder withBuild() { + public FabricKafkaConnectBuilder withBuild(OcpArtifactServerController artifactServer) { + List plugins = new ArrayList<>(List.of( + artifactServer.createDebeziumPlugin("mysql"), + artifactServer.createDebeziumPlugin("postgres"), + artifactServer.createDebeziumPlugin("mongodb"), + artifactServer.createDebeziumPlugin("sqlserver"), + artifactServer.createDebeziumPlugin("db2", List.of("jdbc/jcc")))); + + if (ConfigProperties.DATABASE_ORACLE) { + plugins.add( + artifactServer.createDebeziumPlugin("oracle", List.of("jdbc/ojdbc8"))); + } + + return withBuild(plugins); + } + + public FabricKafkaConnectBuilder withBuild(List plugins) { builder .editSpec() .withNewBuild() .withNewImageStreamOutput() .withImage("testing-openshift-connect:latest") .endImageStreamOutput() + .withPlugins(plugins) .endBuild() .endSpec(); return self(); } - public StrimziKafkaConnectBuilder withConnectorResources() { + public FabricKafkaConnectBuilder withConnectorResources(Boolean enabled) { + return enabled ? withConnectorResources() : self(); + } + + public FabricKafkaConnectBuilder withConnectorResources() { builder .editMetadata() .addToAnnotations("strimzi.io/use-connector-resources", "true") @@ -111,31 +121,14 @@ public StrimziKafkaConnectBuilder withConnectorResources() { return self(); } - public StrimziKafkaConnectBuilder withStandardPlugins() { - Map pluginBuilders = Stream.of("mysql", "postgres", "mongodb", "sqlserver", "db2", "oracle") - .collect(toMap(identity(), StrimziKafkaConnectBuilder::prepareStandardPluginBuilder)); - - pluginBuilders.get("db2") - .addToArtifacts(new JarArtifactBuilder() - .withUrl(String.format("%s/jdbc/jcc-%s.jar", ARTIFACT_SERVER_URL, ARTIFACT_SERVER_DB2_DRIVER_VERSION)) - .build()) - .build(); - pluginBuilders.get("oracle") - .addToArtifacts(new JarArtifactBuilder() - .withUrl(String.format("%s/jdbc/ojdbc8-%s.jar", ARTIFACT_SERVER_URL, ARTIFACT_SERVER_ORACLE_DRIVER_VERSION)) - .build()) - .build(); - - List plugins = pluginBuilders.values().stream() - .map(PluginBuilder::build) - .collect(Collectors.toList()); - - builder.editSpec().editBuild().withPlugins(plugins).endBuild().endSpec(); - + public FabricKafkaConnectBuilder withPullSecret(Optional maybePullSecret) { + maybePullSecret + .map(s -> s.getMetadata().getName()) + .ifPresent(this::withPullSecret); return self(); } - public StrimziKafkaConnectBuilder withPullSecret(String pullSecretName) { + public FabricKafkaConnectBuilder withPullSecret(String pullSecretName) { if (builder.editSpec().hasImage()) { builder .editSpec() @@ -161,7 +154,7 @@ public StrimziKafkaConnectBuilder withPullSecret(String pullSecretName) { return self(); } - public StrimziKafkaConnectBuilder withLoggingFromConfigMap(ConfigMap configMap) { + public FabricKafkaConnectBuilder withLoggingFromConfigMap(ConfigMap configMap) { ConfigMapKeySelector configMapKeySelector = new ConfigMapKeySelectorBuilder() .withKey("log4j.properties") .withName(configMap.getMetadata().getName()) @@ -180,7 +173,7 @@ public StrimziKafkaConnectBuilder withLoggingFromConfigMap(ConfigMap configMap) } - public StrimziKafkaConnectBuilder withMetricsFromConfigMap(ConfigMap configMap) { + public FabricKafkaConnectBuilder withMetricsFromConfigMap(ConfigMap configMap) { ConfigMapKeySelector configMapKeySelector = new ConfigMapKeySelectorBuilder() .withKey("metrics") .withName(configMap.getMetadata().getName()) @@ -198,22 +191,6 @@ public StrimziKafkaConnectBuilder withMetricsFromConfigMap(ConfigMap configMap) return self(); } - private static PluginBuilder prepareStandardPluginBuilder(String dbName) { - return new PluginBuilder() - .withName("debezium-connector-" + dbName) - .withArtifacts( - new ZipArtifactBuilder() - .withUrl(String.format("%s/debezium-connector-%s-%s-plugin.zip", ARTIFACT_SERVER_URL, dbName, - ARTIFACT_SERVER_DBZ_VERSION)) - .build(), - new ZipArtifactBuilder() - .withUrl(ARTIFACT_SERVER_APC_URL) - .build(), - new ZipArtifactBuilder() - .withUrl(String.format("%s/debezium-scripting-%s.zip", ARTIFACT_SERVER_URL, ARTIFACT_SERVER_DBZ_VERSION)) - .build()); - } - private static KafkaConnectTemplate defaultTemplate() { return new KafkaConnectTemplateBuilder().withConnectContainer(new ContainerTemplateBuilder() .withEnv(new ContainerEnvVarBuilder() diff --git a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/kafka/OcpKafka.java b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/kafka/OcpKafka.java index 42954bf64..28e7233d9 100644 --- a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/kafka/OcpKafka.java +++ b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/kafka/OcpKafka.java @@ -5,12 +5,16 @@ */ package io.debezium.testing.system.fixtures.kafka; +import static io.debezium.testing.system.tools.ConfigProperties.*; + import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.extension.ExtensionContext; import io.debezium.testing.system.assertions.KafkaAssertions; import io.debezium.testing.system.assertions.PlainKafkaAssertions; import io.debezium.testing.system.tools.ConfigProperties; +import io.debezium.testing.system.tools.YAML; +import io.debezium.testing.system.tools.artifacts.OcpArtifactServerController; import io.debezium.testing.system.tools.artifacts.OcpArtifactServerDeployer; import io.debezium.testing.system.tools.kafka.KafkaConnectController; import io.debezium.testing.system.tools.kafka.KafkaController; @@ -19,8 +23,9 @@ import io.debezium.testing.system.tools.kafka.OcpKafkaController; import io.debezium.testing.system.tools.kafka.OcpKafkaDeployer; import io.debezium.testing.system.tools.kafka.StrimziOperatorController; -import io.debezium.testing.system.tools.kafka.builders.kafka.StrimziKafkaBuilder; -import io.debezium.testing.system.tools.kafka.builders.kafka.StrimziKafkaConnectBuilder; +import io.debezium.testing.system.tools.kafka.builders.FabricKafkaBuilder; +import io.debezium.testing.system.tools.kafka.builders.FabricKafkaConnectBuilder; +import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.openshift.client.OpenShiftClient; import fixture5.TestFixture; @@ -32,6 +37,7 @@ public class OcpKafka extends TestFixture { private final OpenShiftClient ocp; private final String project; + // Kafka resources String KAFKA_CONNECT_LOGGING = "/kafka-resources/020-kafka-connect-cfg.yaml"; // Artifact Server resources @@ -45,16 +51,11 @@ public OcpKafka(@NotNull ExtensionContext.Store store) { } @Override - public void setup() { - try { - StrimziOperatorController operatorController = updateKafkaOperator(); + public void setup() throws Exception { + StrimziOperatorController operatorController = updateKafkaOperator(); - deployKafkaCluster(operatorController); - deployConnectCluster(operatorController); - } - catch (Exception e) { - throw new IllegalStateException("Error while setting up Kafka", e); - } + OcpKafkaController kafkaController = deployKafkaCluster(operatorController); + deployConnectCluster(operatorController, kafkaController); } @Override @@ -62,35 +63,41 @@ public void teardown() { // no-op: kafka is reused across tests } - private void deployKafkaCluster(StrimziOperatorController operatorController) throws Exception { - OcpKafkaDeployer kafkaDeployer = new OcpKafkaDeployer.Builder(StrimziKafkaBuilder.base()) - .withOcpClient(ocp) - .withHttpClient(new OkHttpClient()) - .withProject(project) - .withOperatorController(operatorController) - .build(); + private OcpKafkaController deployKafkaCluster(StrimziOperatorController operatorController) throws Exception { + FabricKafkaBuilder builder = FabricKafkaBuilder + .base() + .withPullSecret(operatorController.getPullSecret()); + + OcpKafkaDeployer kafkaDeployer = new OcpKafkaDeployer( + project, builder, operatorController, ocp, new OkHttpClient()); OcpKafkaController controller = kafkaDeployer.deploy(); store(KafkaController.class, controller); store(KafkaAssertions.class, new PlainKafkaAssertions(controller.getDefaultConsumerProperties())); + + return controller; } - private void deployConnectCluster(StrimziOperatorController operatorController) throws InterruptedException { - if (ConfigProperties.STRIMZI_KC_BUILD) { - deployArtifactServer(); + private void deployConnectCluster(StrimziOperatorController operatorController, OcpKafkaController kafkaController) throws Exception { + ConfigMap configMap = YAML.fromResource(KAFKA_CONNECT_LOGGING, ConfigMap.class); + + FabricKafkaConnectBuilder builder = FabricKafkaConnectBuilder + .base(kafkaController.getLocalBootstrapAddress()) + .withLoggingFromConfigMap(configMap) + .withMetricsFromConfigMap(configMap) + .withConnectorResources(STRIMZI_OPERATOR_CONNECTORS) + .withPullSecret(operatorController.getPullSecret()); + + if (STRIMZI_KC_BUILD) { + OcpArtifactServerController artifactServerController = deployArtifactServer(); + builder.withBuild(artifactServerController); + } + else { + builder.withImage(STRIMZI_KC_IMAGE); } - StrimziKafkaConnectBuilder strimziBuilder = StrimziKafkaConnectBuilder.base(); - - OcpKafkaConnectDeployer connectDeployer = new OcpKafkaConnectDeployer.Builder(strimziBuilder) - .withOcpClient(ocp) - .withHttpClient(new OkHttpClient()) - .withProject(project) - .withLoggingAndMetricsFromCfgMap(KAFKA_CONNECT_LOGGING) - .withConnectorResources(ConfigProperties.STRIMZI_OPERATOR_CONNECTORS) - .withKcBuild(ConfigProperties.STRIMZI_KC_BUILD) - .withOperatorController(operatorController) - .build(); + OcpKafkaConnectDeployer connectDeployer = new OcpKafkaConnectDeployer( + project, builder, configMap, operatorController, ocp, new OkHttpClient()); OcpKafkaConnectController controller = connectDeployer.deploy(); controller.allowServiceAccess(); @@ -100,7 +107,7 @@ private void deployConnectCluster(StrimziOperatorController operatorController) store(KafkaConnectController.class, controller); } - private void deployArtifactServer() throws InterruptedException { + private OcpArtifactServerController deployArtifactServer() throws Exception { OcpArtifactServerDeployer deployer = new OcpArtifactServerDeployer.Builder() .withOcpClient(ocp) .withHttpClient(new OkHttpClient()) @@ -109,7 +116,7 @@ private void deployArtifactServer() throws InterruptedException { .withService(ARTIFACT_SERVER_SERVICE) .build(); - deployer.deploy(); + return deployer.deploy(); } private StrimziOperatorController updateKafkaOperator() {