DBZ-5095 Artifact server management and Fabric8 builder cleanup

This commit is contained in:
jcechace 2022-05-09 13:16:01 +02:00 committed by Jakub Cechacek
parent fd6be945b5
commit ffe2bcf911
11 changed files with 240 additions and 268 deletions

View File

@ -11,7 +11,7 @@ public interface Deployer<T> {
* Deploys resource
* @return Controller for deployed resource
*/
T deploy() throws InterruptedException;
T deploy() throws Exception;
interface Builder<B extends Builder<B, D>, D extends Deployer<?>> {
D build();

View File

@ -215,18 +215,25 @@ public void ensureHasPullSecret(Deployment deployment, String secret) {
* @param labels labels used to identify pods
* @return {@link PodList} of matching pods
*/
public PodList podsWithLabels(String project, Map<String, String> labels) {
public List<Pod> podsWithLabels(String project, Map<String, String> labels) {
Supplier<PodList> podListSupplier = () -> client.pods().inNamespace(project).withLabels(labels).list();
await().atMost(scaled(5), TimeUnit.MINUTES).until(() -> podListSupplier.get().getItems().size() > 0);
PodList pods = podListSupplier.get();
List<Pod> pods = podListSupplier.get().getItems();
if (pods.getItems().isEmpty()) {
if (pods.isEmpty()) {
LOGGER.warn("Empty PodList");
}
return pods;
}
/**
 * Retrieves pods belonging to given deployment, matched via the "deployment" label.
 *
 * @param deployment deployment whose pods should be listed
 * @return {@link List} of matching pods
 */
public List<Pod> podsForDeployment(Deployment deployment) {
    Map<String, String> selector = Map.of("deployment", deployment.getMetadata().getName());
    return podsWithLabels(deployment.getMetadata().getNamespace(), selector);
}
/**
* Waits until all pods with given labels are ready
* @param project project where to look for pods
@ -236,9 +243,9 @@ public void waitForPods(String project, Map<String, String> labels) {
String lbls = labels.keySet().stream().map(k -> k + "=" + labels.get(k)).collect(Collectors.joining(", "));
LOGGER.info("Waiting for pods to deploy [" + lbls + "]");
PodList pods = podsWithLabels(project, labels);
List<Pod> pods = podsWithLabels(project, labels);
for (Pod p : pods.getItems()) {
for (Pod p : pods) {
client.resource(p).waitUntilReady(scaled(5), TimeUnit.MINUTES);
}
}

View File

@ -5,9 +5,25 @@
*/
package io.debezium.testing.system.tools.artifacts;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import io.debezium.testing.system.tools.OpenShiftUtils;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.openshift.client.OpenShiftClient;
import io.strimzi.api.kafka.model.connect.build.*;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
@ -15,22 +31,95 @@
public class OcpArtifactServerController {
private final Deployment deployment;
private final String project;
private final Service service;
private final OpenShiftClient ocp;
private final OkHttpClient http;
public OcpArtifactServerController(Deployment deployment, Service service, OpenShiftClient ocp, OkHttpClient http) {
private final Map<String, HttpUrl> artifacts;
private final OpenShiftUtils ocpUtils;
public OcpArtifactServerController(Deployment deployment, Service service, OpenShiftClient ocp, OkHttpClient http) throws IOException {
this.deployment = deployment;
this.project = deployment.getMetadata().getNamespace();
this.service = service;
this.ocp = ocp;
this.ocpUtils = new OpenShiftUtils(ocp);
this.http = http;
this.artifacts = listArtifacts();
}
public HttpUrl getBaseUrl() {
return new HttpUrl.Builder()
.scheme("http")
return new HttpUrl.Builder().scheme("http")
.host(service.getMetadata().getName() + "." + service.getMetadata().getNamespace() + ".svc.cluster.local")
.port(8080)
.build();
}
/**
 * Resolves a relative artifact link (as found in the listing file) against the server's base URL.
 *
 * @param link relative link of the artifact
 * @return absolute {@link HttpUrl} of the artifact
 */
private HttpUrl createArtifactUrl(String link) {
    return getBaseUrl().resolve(link);
}
/**
 * Looks up the URL of an artifact served by this server.
 *
 * @param name artifact name as present in the server's artifact listing
 * @return URL of the artifact, or empty optional when no such artifact is served
 */
public Optional<HttpUrl> getArtifactUrl(String name) {
    return Optional.ofNullable(artifacts.get(name));
}

/**
 * @deprecated method name contains a typo; use {@link #getArtifactUrl(String)} instead
 */
@Deprecated
public Optional<HttpUrl> geArtifactUrl(String name) {
    return getArtifactUrl(name);
}
/**
 * Looks up the URL of a served artifact in its string form.
 *
 * @param name artifact name as present in the server's artifact listing
 * @return string form of the artifact URL, or empty optional when not served
 */
public Optional<String> getArtifactUrlAsString(String name) {
    Optional<HttpUrl> url = geArtifactUrl(name);
    return url.map(u -> u.toString());
}
/**
 * Creates a Strimzi connect-build artifact descriptor for given URL.
 * The artifact kind is derived from the URL's file extension.
 *
 * @param url absolute URL of the artifact (must end with ".zip" or ".jar")
 * @return {@link Artifact} descriptor of the appropriate kind
 * @throws IllegalStateException when the extension is neither zip nor jar
 */
private Artifact createArtifact(String url) {
    Objects.requireNonNull(url);
    String extension = url.substring(url.lastIndexOf('.') + 1);
    if ("zip".equals(extension)) {
        return new ZipArtifactBuilder().withUrl(url).build();
    }
    if ("jar".equals(extension)) {
        return new JarArtifactBuilder().withUrl(url).build();
    }
    throw new IllegalStateException("Unsupported artifact type: " + extension);
}
/**
 * Creates a Strimzi connect-build plugin composed of named artifacts served by this server.
 *
 * @param name plugin name
 * @param artifacts names of artifacts (as present in the server listing) forming the plugin
 * @return {@link Plugin} descriptor
 * @throws IllegalStateException when any of the named artifacts is not served
 */
public Plugin createPlugin(String name, List<String> artifacts) {
    List<Artifact> pluginArtifacts = artifacts.stream()
            .map(this::getArtifactUrlAsString)
            // error message fixed: a space was missing before the quoted plugin name
            .map(a -> a.orElseThrow(() -> new IllegalStateException("Missing artifact for plugin '" + name + "'")))
            .map(this::createArtifact)
            .collect(toList());
    return new PluginBuilder().withName(name).withArtifacts(pluginArtifacts).build();
}
/**
 * Creates a Debezium connector plugin for given database with no extra artifacts.
 *
 * @param database database name (e.g. "mysql")
 * @return {@link Plugin} descriptor for the connector
 */
public Plugin createDebeziumPlugin(String database) {
    return createDebeziumPlugin(database, List.of());
}
/**
 * Creates a Debezium connector plugin for given database.
 * The plugin always bundles the connector archive, debezium-scripting and
 * connect-converter, plus any extra artifacts (e.g. JDBC drivers).
 *
 * @param database database name (e.g. "db2")
 * @param extraArtifacts names of additional artifacts to bundle
 * @return {@link Plugin} descriptor for the connector
 */
public Plugin createDebeziumPlugin(String database, List<String> extraArtifacts) {
    String connectorName = "debezium-connector-" + database;
    Stream<String> common = Stream.of(connectorName, "debezium-scripting", "connect-converter");
    List<String> allArtifacts = Stream.concat(common, extraArtifacts.stream()).collect(toList());
    return createPlugin(connectorName, allArtifacts);
}
/**
 * Reads the artifact listing file from within the artifact server container.
 * Each line of the listing has the form {@code <name>::<relative link>}.
 *
 * @return lines of the listing file
 * @throws IOException when the file cannot be read
 */
public List<String> readArtifactListing() throws IOException {
    // podsWithLabels (used by podsForDeployment) awaits until at least one pod exists,
    // so get(0) is safe here
    Pod pod = ocpUtils.podsForDeployment(deployment).get(0);
    try (InputStream is = ocp.pods()
            .inNamespace(project)
            .withName(pod.getMetadata().getName())
            .inContainer("debezium-artifact-server")
            .file("/opt/plugins/artifacts.txt")
            .read()) {
        // explicit charset: the single-arg InputStreamReader would use the platform default
        return new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)).lines().collect(toList());
    }
}
/**
 * Builds a map of all artifacts available on this server.
 *
 * @return map of artifact name to its absolute URL
 * @throws IOException when the artifact listing cannot be read
 */
public Map<String, HttpUrl> listArtifacts() throws IOException {
    List<String> listing = readArtifactListing();
    return listing.stream()
            .map(line -> line.split("::", 2))
            // guard against blank or malformed lines which would otherwise
            // throw ArrayIndexOutOfBoundsException on entry[1]
            .filter(entry -> entry.length == 2)
            .collect(toMap(entry -> entry[0], entry -> createArtifactUrl(entry[1])));
}
}

View File

@ -73,7 +73,7 @@ public OcpArtifactServerDeployer(
}
@Override
public OcpArtifactServerController deploy() throws InterruptedException {
public OcpArtifactServerController deploy() throws Exception {
LOGGER.info("Deploying debezium artifact server");
deployment = ocp.apps().deployments().inNamespace(project).createOrReplace(deployment);

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.tools.kafka.builders.kafka;
package io.debezium.testing.system.tools.fabric8;
import java.util.function.Consumer;
@ -11,14 +11,14 @@
* Wraps Strimzi builder in order to provide convenience methods for Debezium configuration
*
* @param <I> type of the child wrapper instance
* @param <B> type of strimzi builder
* @param <R> type of resource build by strimzi builder
* @param <B> type of Fabric8 builder
* @param <R> type of resource built by Fabric8 builder
*/
public abstract class StrimziBuilderWrapper<I extends StrimziBuilderWrapper<I, B, R>, B, R> {
public abstract class FabricBuilderWrapper<I extends FabricBuilderWrapper<I, B, R>, B, R> {
protected B builder;
protected StrimziBuilderWrapper(B builder) {
protected FabricBuilderWrapper(B builder) {
this.builder = builder;
}

View File

@ -5,17 +5,13 @@
*/
package io.debezium.testing.system.tools.kafka;
import static io.debezium.testing.system.tools.ConfigProperties.STRIMZI_KC_IMAGE;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.testing.system.tools.AbstractOcpDeployer;
import io.debezium.testing.system.tools.Deployer;
import io.debezium.testing.system.tools.YAML;
import io.debezium.testing.system.tools.kafka.builders.kafka.StrimziKafkaConnectBuilder;
import io.debezium.testing.system.tools.kafka.builders.FabricKafkaConnectBuilder;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
@ -34,98 +30,21 @@
*/
public class OcpKafkaConnectDeployer extends AbstractOcpDeployer<OcpKafkaConnectController> {
/**
* Builder for {@link OcpKafkaConnectDeployer}
*/
public static class Builder implements Deployer.Builder<Builder, OcpKafkaConnectDeployer> {
private String project;
private OpenShiftClient ocpClient;
private OkHttpClient httpClient;
private ConfigMap configMap;
private StrimziOperatorController operatorController;
private final StrimziKafkaConnectBuilder strimziBuilder;
public Builder(StrimziKafkaConnectBuilder strimziBuilder) {
this.strimziBuilder = strimziBuilder;
}
public OcpKafkaConnectDeployer.Builder withProject(String project) {
this.project = project;
return this;
}
public OcpKafkaConnectDeployer.Builder withOcpClient(OpenShiftClient ocpClient) {
this.ocpClient = ocpClient;
return this;
}
public OcpKafkaConnectDeployer.Builder withHttpClient(OkHttpClient httpClient) {
this.httpClient = httpClient;
return this;
}
public OcpKafkaConnectDeployer.Builder withLoggingAndMetricsFromCfgMap(String cfgYamlPath) {
this.configMap = YAML.fromResource(cfgYamlPath, ConfigMap.class);
strimziBuilder
.withLoggingFromConfigMap(configMap)
.withMetricsFromConfigMap(configMap);
return this;
}
public OcpKafkaConnectDeployer.Builder withConnectorResources(boolean connectorResources) {
if (connectorResources) {
strimziBuilder.withConnectorResources();
}
return this;
}
public OcpKafkaConnectDeployer.Builder withOperatorController(StrimziOperatorController operatorController) {
this.operatorController = operatorController;
operatorController.getPullSecretName().ifPresent(strimziBuilder::withPullSecret);
return this;
}
@Override
public OcpKafkaConnectDeployer build() {
return new OcpKafkaConnectDeployer(
project,
strimziBuilder,
configMap,
operatorController,
ocpClient,
httpClient);
}
public Builder withKcBuild(boolean kcBuild) {
if (kcBuild) {
strimziBuilder
.withBuild()
.withStandardPlugins();
}
else {
strimziBuilder.withImage(STRIMZI_KC_IMAGE);
}
return this;
}
}
private static final Logger LOGGER = LoggerFactory.getLogger(OcpKafkaConnectDeployer.class);
private final StrimziKafkaConnectBuilder strimziBuilder;
private final FabricKafkaConnectBuilder fabricBuilder;
private final ConfigMap configMap;
private final StrimziOperatorController operatorController;
private OcpKafkaConnectDeployer(
String project,
StrimziKafkaConnectBuilder strimziBuilder,
ConfigMap configMap,
StrimziOperatorController operatorController,
OpenShiftClient ocp,
OkHttpClient http) {
public OcpKafkaConnectDeployer(
String project,
FabricKafkaConnectBuilder fabricBuilder,
ConfigMap configMap,
StrimziOperatorController operatorController,
OpenShiftClient ocp,
OkHttpClient http) {
super(project, ocp, http);
this.strimziBuilder = strimziBuilder;
this.fabricBuilder = fabricBuilder;
this.configMap = configMap;
this.operatorController = operatorController;
}
@ -142,11 +61,11 @@ public OcpKafkaConnectController deploy() throws InterruptedException {
deployConfigMap();
}
if (strimziBuilder.hasBuild()) {
if (fabricBuilder.hasBuild()) {
deployImageStream();
}
KafkaConnect kafkaConnect = strimziBuilder.build();
KafkaConnect kafkaConnect = fabricBuilder.build();
kafkaConnect = kafkaConnectOperation().createOrReplace(kafkaConnect);
OcpKafkaConnectController controller = new OcpKafkaConnectController(
@ -165,12 +84,12 @@ private void deployConfigMap() {
}
private void deployImageStream() {
Optional<String> imageStream = strimziBuilder.imageStream();
Optional<String> imageStream = fabricBuilder.imageStream();
if (!imageStream.isPresent()) {
throw new IllegalStateException("Image stream missing");
}
String[] image = strimziBuilder.imageStream().get().split(":", 2);
String[] image = fabricBuilder.imageStream().get().split(":", 2);
ImageStream is = new ImageStreamBuilder()
.withNewMetadata().withName(image[0]).endMetadata()

View File

@ -64,6 +64,16 @@ public String getBootstrapAddress() {
return name + "-kafka-bootstrap." + project + ".svc.cluster.local:9092";
}
/**
 * Returns the bootstrap server address resolvable only from within the cluster
 * (used e.g. by Kafka Connect).
 *
 * @return cluster-local bootstrap address
 */
public String getLocalBootstrapAddress() {
    return String.format("%s-kafka-bootstrap:9093", name);
}
/**
* Deploy kafka topic from given CR
*

View File

@ -9,8 +9,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.testing.system.tools.AbstractOcpDeployer;
import io.debezium.testing.system.tools.Deployer;
import io.debezium.testing.system.tools.kafka.builders.kafka.StrimziKafkaBuilder;
import io.debezium.testing.system.tools.kafka.builders.FabricKafkaBuilder;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.openshift.client.OpenShiftClient;
@ -26,55 +25,13 @@
*/
public final class OcpKafkaDeployer extends AbstractOcpDeployer<OcpKafkaController> {
/**
* Builder for {@link OcpKafkaDeployer}
*/
public static class Builder implements Deployer.Builder<Builder, OcpKafkaDeployer> {
private String project;
private OpenShiftClient ocpClient;
private OkHttpClient httpClient;
private StrimziOperatorController operatorController;
private final StrimziKafkaBuilder strimziBuilder;
public Builder(StrimziKafkaBuilder strimziBuilder) {
this.strimziBuilder = strimziBuilder;
}
public Builder withProject(String project) {
this.project = project;
return this;
}
public Builder withOcpClient(OpenShiftClient ocpClient) {
this.ocpClient = ocpClient;
return this;
}
public Builder withHttpClient(OkHttpClient httpClient) {
this.httpClient = httpClient;
return this;
}
public Builder withOperatorController(StrimziOperatorController operatorController) {
this.operatorController = operatorController;
operatorController.getPullSecretName().ifPresent(strimziBuilder::withPullSecret);
return this;
}
@Override
public OcpKafkaDeployer build() {
return new OcpKafkaDeployer(project, strimziBuilder, operatorController, ocpClient, httpClient);
}
}
private static final Logger LOGGER = LoggerFactory.getLogger(OcpKafkaDeployer.class);
private final StrimziKafkaBuilder strimziBuilder;
private final FabricKafkaBuilder strimziBuilder;
private final StrimziOperatorController operatorController;
private OcpKafkaDeployer(String project, StrimziKafkaBuilder strimziBuilder, StrimziOperatorController operatorController,
OpenShiftClient ocp, OkHttpClient http) {
public OcpKafkaDeployer(String project, FabricKafkaBuilder strimziBuilder, StrimziOperatorController operatorController,
OpenShiftClient ocp, OkHttpClient http) {
super(project, ocp, http);
this.strimziBuilder = strimziBuilder;
this.operatorController = operatorController;

View File

@ -3,15 +3,14 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.tools.kafka.builders.kafka;
package io.debezium.testing.system.tools.kafka.builders;
import static io.debezium.testing.system.tools.ConfigProperties.STRIMZI_VERSION_KAFKA;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import io.debezium.testing.system.tools.fabric8.FabricBuilderWrapper;
import io.fabric8.kubernetes.api.model.Secret;
import io.strimzi.api.kafka.model.EntityOperatorSpec;
import io.strimzi.api.kafka.model.EntityOperatorSpecBuilder;
import io.strimzi.api.kafka.model.EntityTopicOperatorSpec;
@ -32,10 +31,10 @@
/**
* This class simplifies building of kafka by providing default configuration for whole kafka or parts of its definition
*/
public final class StrimziKafkaBuilder extends StrimziBuilderWrapper<StrimziKafkaBuilder, KafkaBuilder, Kafka> {
public final class FabricKafkaBuilder extends FabricBuilderWrapper<FabricKafkaBuilder, KafkaBuilder, Kafka> {
public static String DEFAULT_KAFKA_NAME = "debezium-kafka-cluster";
private StrimziKafkaBuilder(KafkaBuilder kafkaBuilder) {
private FabricKafkaBuilder(KafkaBuilder kafkaBuilder) {
super(kafkaBuilder);
}
@ -44,7 +43,7 @@ public Kafka build() {
return builder.build();
}
public static StrimziKafkaBuilder base() {
public static FabricKafkaBuilder base() {
KafkaClusterSpec kafka = defaultKafkaSpec();
ZookeeperClusterSpec zookeeper = defaultKafkaZookeeperSpec();
EntityOperatorSpec entityOperator = defaultKafkaEntityOperatorSpec();
@ -59,10 +58,17 @@ public static StrimziKafkaBuilder base() {
.withEntityOperator(entityOperator)
.endSpec();
return new StrimziKafkaBuilder(builder);
return new FabricKafkaBuilder(builder);
}
public StrimziKafkaBuilder withPullSecret(String pullSecretName) {
/**
 * Configures an image pull secret for kafka pods when one is present.
 *
 * @param maybePullSecret possibly absent pull secret
 * @return this builder
 */
public FabricKafkaBuilder withPullSecret(Optional<Secret> maybePullSecret) {
    maybePullSecret.ifPresent(secret -> withPullSecret(secret.getMetadata().getName()));
    return self();
}
public FabricKafkaBuilder withPullSecret(String pullSecretName) {
PodTemplate podTemplate = new PodTemplateBuilder().addNewImagePullSecret(pullSecretName).build();
builder

View File

@ -3,36 +3,24 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.tools.kafka.builders.kafka;
package io.debezium.testing.system.tools.kafka.builders;
import static io.debezium.testing.system.tools.ConfigProperties.ARTIFACT_SERVER_APC_URL;
import static io.debezium.testing.system.tools.ConfigProperties.ARTIFACT_SERVER_DB2_DRIVER_VERSION;
import static io.debezium.testing.system.tools.ConfigProperties.ARTIFACT_SERVER_DBZ_VERSION;
import static io.debezium.testing.system.tools.ConfigProperties.ARTIFACT_SERVER_ORACLE_DRIVER_VERSION;
import static io.debezium.testing.system.tools.ConfigProperties.ARTIFACT_SERVER_URL;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.*;
import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.artifacts.OcpArtifactServerController;
import io.debezium.testing.system.tools.fabric8.FabricBuilderWrapper;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapKeySelector;
import io.fabric8.kubernetes.api.model.ConfigMapKeySelectorBuilder;
import io.fabric8.kubernetes.api.model.Secret;
import io.strimzi.api.kafka.model.CertSecretSourceBuilder;
import io.strimzi.api.kafka.model.ClientTls;
import io.strimzi.api.kafka.model.ClientTlsBuilder;
import io.strimzi.api.kafka.model.ContainerEnvVarBuilder;
import io.strimzi.api.kafka.model.KafkaConnect;
import io.strimzi.api.kafka.model.KafkaConnectBuilder;
import io.strimzi.api.kafka.model.connect.build.JarArtifactBuilder;
import io.strimzi.api.kafka.model.connect.build.Plugin;
import io.strimzi.api.kafka.model.connect.build.PluginBuilder;
import io.strimzi.api.kafka.model.connect.build.ZipArtifactBuilder;
import io.strimzi.api.kafka.model.template.ContainerTemplateBuilder;
import io.strimzi.api.kafka.model.template.KafkaConnectTemplate;
import io.strimzi.api.kafka.model.template.KafkaConnectTemplateBuilder;
@ -40,11 +28,12 @@
/**
* This class simplifies building of kafkaConnect by providing pre-made configurations for whole kafkaConnect or parts of its definition
*/
public class StrimziKafkaConnectBuilder extends StrimziBuilderWrapper<StrimziKafkaConnectBuilder, KafkaConnectBuilder, KafkaConnect> {
public class FabricKafkaConnectBuilder extends
FabricBuilderWrapper<FabricKafkaConnectBuilder, KafkaConnectBuilder, KafkaConnect> {
public static String DEFAULT_KC_NAME = "debezium-kafka-connect-cluster";
public static String DEFAULT_BOOSTRAP_SERVER = StrimziKafkaBuilder.DEFAULT_KAFKA_NAME + "-kafka-bootstrap:9093";
public static String DEFAULT_BOOSTRAP_SERVER = FabricKafkaBuilder.DEFAULT_KAFKA_NAME + "-kafka-bootstrap:9093";
protected StrimziKafkaConnectBuilder(KafkaConnectBuilder builder) {
protected FabricKafkaConnectBuilder(KafkaConnectBuilder builder) {
super(builder);
}
@ -65,7 +54,7 @@ public Optional<String> imageStream() {
return Optional.of(image);
}
public static StrimziKafkaConnectBuilder base() {
public static FabricKafkaConnectBuilder base(String bootstrap) {
Map<String, Object> config = defaultConfig();
KafkaConnectTemplate template = defaultTemplate();
ClientTls tls = defaultTLS();
@ -75,35 +64,56 @@ public static StrimziKafkaConnectBuilder base() {
.withName(DEFAULT_KC_NAME)
.endMetadata()
.withNewSpec()
.withBootstrapServers(DEFAULT_BOOSTRAP_SERVER)
.withBootstrapServers(bootstrap)
.withTemplate(template)
.withConfig(config)
.withReplicas(1)
.withTls(tls)
.endSpec();
return new StrimziKafkaConnectBuilder(builder);
return new FabricKafkaConnectBuilder(builder);
}
public StrimziKafkaConnectBuilder withImage(String image) {
public FabricKafkaConnectBuilder withImage(String image) {
builder.editSpec().withImage(image).endSpec();
return self();
}
public StrimziKafkaConnectBuilder withBuild() {
/**
 * Configures connect build with the standard set of Debezium connector plugins
 * served by the artifact server. The oracle connector is only included when
 * oracle testing is enabled via {@link ConfigProperties#DATABASE_ORACLE}.
 *
 * @param artifactServer controller of the artifact server providing plugin artifacts
 * @return this builder
 */
public FabricKafkaConnectBuilder withBuild(OcpArtifactServerController artifactServer) {
    List<Plugin> plugins = new ArrayList<>();
    plugins.add(artifactServer.createDebeziumPlugin("mysql"));
    plugins.add(artifactServer.createDebeziumPlugin("postgres"));
    plugins.add(artifactServer.createDebeziumPlugin("mongodb"));
    plugins.add(artifactServer.createDebeziumPlugin("sqlserver"));
    plugins.add(artifactServer.createDebeziumPlugin("db2", List.of("jdbc/jcc")));
    if (ConfigProperties.DATABASE_ORACLE) {
        plugins.add(artifactServer.createDebeziumPlugin("oracle", List.of("jdbc/ojdbc8")));
    }
    return withBuild(plugins);
}
/**
 * Configures connect build with given plugins; the build output is pushed to
 * the "testing-openshift-connect:latest" image stream.
 *
 * @param plugins plugins included in the build
 * @return this builder
 */
public FabricKafkaConnectBuilder withBuild(List<Plugin> plugins) {
    builder
            .editSpec()
            .withNewBuild()
            .withNewImageStreamOutput()
            .withImage("testing-openshift-connect:latest")
            .endImageStreamOutput()
            .withPlugins(plugins)
            .endBuild()
            .endSpec();
    return self();
}
public StrimziKafkaConnectBuilder withConnectorResources() {
/**
 * Conditionally enables connector resources support.
 *
 * @param enabled whether connector resources should be enabled; {@code null} is treated as false
 * @return this builder
 */
public FabricKafkaConnectBuilder withConnectorResources(Boolean enabled) {
    // Boolean.TRUE.equals guards against an NPE when a null wrapper is passed
    return Boolean.TRUE.equals(enabled) ? withConnectorResources() : self();
}
public FabricKafkaConnectBuilder withConnectorResources() {
builder
.editMetadata()
.addToAnnotations("strimzi.io/use-connector-resources", "true")
@ -111,31 +121,14 @@ public StrimziKafkaConnectBuilder withConnectorResources() {
return self();
}
public StrimziKafkaConnectBuilder withStandardPlugins() {
Map<String, PluginBuilder> pluginBuilders = Stream.of("mysql", "postgres", "mongodb", "sqlserver", "db2", "oracle")
.collect(toMap(identity(), StrimziKafkaConnectBuilder::prepareStandardPluginBuilder));
pluginBuilders.get("db2")
.addToArtifacts(new JarArtifactBuilder()
.withUrl(String.format("%s/jdbc/jcc-%s.jar", ARTIFACT_SERVER_URL, ARTIFACT_SERVER_DB2_DRIVER_VERSION))
.build())
.build();
pluginBuilders.get("oracle")
.addToArtifacts(new JarArtifactBuilder()
.withUrl(String.format("%s/jdbc/ojdbc8-%s.jar", ARTIFACT_SERVER_URL, ARTIFACT_SERVER_ORACLE_DRIVER_VERSION))
.build())
.build();
List<Plugin> plugins = pluginBuilders.values().stream()
.map(PluginBuilder::build)
.collect(Collectors.toList());
builder.editSpec().editBuild().withPlugins(plugins).endBuild().endSpec();
/**
 * Configures an image pull secret for connect pods when one is present.
 *
 * @param maybePullSecret possibly absent pull secret
 * @return this builder
 */
public FabricKafkaConnectBuilder withPullSecret(Optional<Secret> maybePullSecret) {
    maybePullSecret.ifPresent(secret -> withPullSecret(secret.getMetadata().getName()));
    return self();
}
public StrimziKafkaConnectBuilder withPullSecret(String pullSecretName) {
public FabricKafkaConnectBuilder withPullSecret(String pullSecretName) {
if (builder.editSpec().hasImage()) {
builder
.editSpec()
@ -161,7 +154,7 @@ public StrimziKafkaConnectBuilder withPullSecret(String pullSecretName) {
return self();
}
public StrimziKafkaConnectBuilder withLoggingFromConfigMap(ConfigMap configMap) {
public FabricKafkaConnectBuilder withLoggingFromConfigMap(ConfigMap configMap) {
ConfigMapKeySelector configMapKeySelector = new ConfigMapKeySelectorBuilder()
.withKey("log4j.properties")
.withName(configMap.getMetadata().getName())
@ -180,7 +173,7 @@ public StrimziKafkaConnectBuilder withLoggingFromConfigMap(ConfigMap configMap)
}
public StrimziKafkaConnectBuilder withMetricsFromConfigMap(ConfigMap configMap) {
public FabricKafkaConnectBuilder withMetricsFromConfigMap(ConfigMap configMap) {
ConfigMapKeySelector configMapKeySelector = new ConfigMapKeySelectorBuilder()
.withKey("metrics")
.withName(configMap.getMetadata().getName())
@ -198,22 +191,6 @@ public StrimziKafkaConnectBuilder withMetricsFromConfigMap(ConfigMap configMap)
return self();
}
private static PluginBuilder prepareStandardPluginBuilder(String dbName) {
return new PluginBuilder()
.withName("debezium-connector-" + dbName)
.withArtifacts(
new ZipArtifactBuilder()
.withUrl(String.format("%s/debezium-connector-%s-%s-plugin.zip", ARTIFACT_SERVER_URL, dbName,
ARTIFACT_SERVER_DBZ_VERSION))
.build(),
new ZipArtifactBuilder()
.withUrl(ARTIFACT_SERVER_APC_URL)
.build(),
new ZipArtifactBuilder()
.withUrl(String.format("%s/debezium-scripting-%s.zip", ARTIFACT_SERVER_URL, ARTIFACT_SERVER_DBZ_VERSION))
.build());
}
private static KafkaConnectTemplate defaultTemplate() {
return new KafkaConnectTemplateBuilder().withConnectContainer(new ContainerTemplateBuilder()
.withEnv(new ContainerEnvVarBuilder()

View File

@ -5,12 +5,16 @@
*/
package io.debezium.testing.system.fixtures.kafka;
import static io.debezium.testing.system.tools.ConfigProperties.*;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.extension.ExtensionContext;
import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.assertions.PlainKafkaAssertions;
import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.YAML;
import io.debezium.testing.system.tools.artifacts.OcpArtifactServerController;
import io.debezium.testing.system.tools.artifacts.OcpArtifactServerDeployer;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController;
@ -19,8 +23,9 @@
import io.debezium.testing.system.tools.kafka.OcpKafkaController;
import io.debezium.testing.system.tools.kafka.OcpKafkaDeployer;
import io.debezium.testing.system.tools.kafka.StrimziOperatorController;
import io.debezium.testing.system.tools.kafka.builders.kafka.StrimziKafkaBuilder;
import io.debezium.testing.system.tools.kafka.builders.kafka.StrimziKafkaConnectBuilder;
import io.debezium.testing.system.tools.kafka.builders.FabricKafkaBuilder;
import io.debezium.testing.system.tools.kafka.builders.FabricKafkaConnectBuilder;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.openshift.client.OpenShiftClient;
import fixture5.TestFixture;
@ -32,6 +37,7 @@ public class OcpKafka extends TestFixture {
private final OpenShiftClient ocp;
private final String project;
// Kafka resources
String KAFKA_CONNECT_LOGGING = "/kafka-resources/020-kafka-connect-cfg.yaml";
// Artifact Server resources
@ -45,16 +51,11 @@ public OcpKafka(@NotNull ExtensionContext.Store store) {
}
@Override
public void setup() {
try {
StrimziOperatorController operatorController = updateKafkaOperator();
public void setup() throws Exception {
StrimziOperatorController operatorController = updateKafkaOperator();
deployKafkaCluster(operatorController);
deployConnectCluster(operatorController);
}
catch (Exception e) {
throw new IllegalStateException("Error while setting up Kafka", e);
}
OcpKafkaController kafkaController = deployKafkaCluster(operatorController);
deployConnectCluster(operatorController, kafkaController);
}
@Override
@ -62,35 +63,41 @@ public void teardown() {
// no-op: kafka is reused across tests
}
private void deployKafkaCluster(StrimziOperatorController operatorController) throws Exception {
OcpKafkaDeployer kafkaDeployer = new OcpKafkaDeployer.Builder(StrimziKafkaBuilder.base())
.withOcpClient(ocp)
.withHttpClient(new OkHttpClient())
.withProject(project)
.withOperatorController(operatorController)
.build();
private OcpKafkaController deployKafkaCluster(StrimziOperatorController operatorController) throws Exception {
FabricKafkaBuilder builder = FabricKafkaBuilder
.base()
.withPullSecret(operatorController.getPullSecret());
OcpKafkaDeployer kafkaDeployer = new OcpKafkaDeployer(
project, builder, operatorController, ocp, new OkHttpClient());
OcpKafkaController controller = kafkaDeployer.deploy();
store(KafkaController.class, controller);
store(KafkaAssertions.class, new PlainKafkaAssertions(controller.getDefaultConsumerProperties()));
return controller;
}
private void deployConnectCluster(StrimziOperatorController operatorController) throws InterruptedException {
if (ConfigProperties.STRIMZI_KC_BUILD) {
deployArtifactServer();
private void deployConnectCluster(StrimziOperatorController operatorController, OcpKafkaController kafkaController) throws Exception {
ConfigMap configMap = YAML.fromResource(KAFKA_CONNECT_LOGGING, ConfigMap.class);
FabricKafkaConnectBuilder builder = FabricKafkaConnectBuilder
.base(kafkaController.getLocalBootstrapAddress())
.withLoggingFromConfigMap(configMap)
.withMetricsFromConfigMap(configMap)
.withConnectorResources(STRIMZI_OPERATOR_CONNECTORS)
.withPullSecret(operatorController.getPullSecret());
if (STRIMZI_KC_BUILD) {
OcpArtifactServerController artifactServerController = deployArtifactServer();
builder.withBuild(artifactServerController);
}
else {
builder.withImage(STRIMZI_KC_IMAGE);
}
StrimziKafkaConnectBuilder strimziBuilder = StrimziKafkaConnectBuilder.base();
OcpKafkaConnectDeployer connectDeployer = new OcpKafkaConnectDeployer.Builder(strimziBuilder)
.withOcpClient(ocp)
.withHttpClient(new OkHttpClient())
.withProject(project)
.withLoggingAndMetricsFromCfgMap(KAFKA_CONNECT_LOGGING)
.withConnectorResources(ConfigProperties.STRIMZI_OPERATOR_CONNECTORS)
.withKcBuild(ConfigProperties.STRIMZI_KC_BUILD)
.withOperatorController(operatorController)
.build();
OcpKafkaConnectDeployer connectDeployer = new OcpKafkaConnectDeployer(
project, builder, configMap, operatorController, ocp, new OkHttpClient());
OcpKafkaConnectController controller = connectDeployer.deploy();
controller.allowServiceAccess();
@ -100,7 +107,7 @@ private void deployConnectCluster(StrimziOperatorController operatorController)
store(KafkaConnectController.class, controller);
}
private void deployArtifactServer() throws InterruptedException {
private OcpArtifactServerController deployArtifactServer() throws Exception {
OcpArtifactServerDeployer deployer = new OcpArtifactServerDeployer.Builder()
.withOcpClient(ocp)
.withHttpClient(new OkHttpClient())
@ -109,7 +116,7 @@ private void deployArtifactServer() throws InterruptedException {
.withService(ARTIFACT_SERVER_SERVICE)
.build();
deployer.deploy();
return deployer.deploy();
}
private StrimziOperatorController updateKafkaOperator() {