DBZ-4830 Initial draft of dynamic Kafka resource definitions
Co-authored-by: Martin Medek <mmedek@redhat.com>
This commit is contained in:
parent
95ed1111e2
commit
ea24d85234
@ -39,7 +39,6 @@ public class OcpKafkaConnectDeployer extends AbstractOcpDeployer<OcpKafkaConnect
|
||||
public static class Builder implements Deployer.Builder<Builder, OcpKafkaConnectDeployer> {
|
||||
|
||||
private String project;
|
||||
private String yamlPath;
|
||||
private OpenShiftClient ocpClient;
|
||||
private OkHttpClient httpClient;
|
||||
private String cfgYamlPath;
|
||||
@ -47,6 +46,7 @@ public static class Builder implements Deployer.Builder<Builder, OcpKafkaConnect
|
||||
private boolean exposedMetrics;
|
||||
private boolean exposedApi;
|
||||
private StrimziOperatorController operatorController;
|
||||
private KafkaConnectBuilder kafkaConnectBuilder;
|
||||
|
||||
public OcpKafkaConnectDeployer.Builder withProject(String project) {
|
||||
this.project = project;
|
||||
@ -63,8 +63,8 @@ public OcpKafkaConnectDeployer.Builder withHttpClient(OkHttpClient httpClient) {
|
||||
return this;
|
||||
}
|
||||
|
||||
public OcpKafkaConnectDeployer.Builder withYamlPath(String yamlPath) {
|
||||
this.yamlPath = yamlPath;
|
||||
public OcpKafkaConnectDeployer.Builder withKafkaConnectBuilder(KafkaConnectBuilder ocpKafkaConnectBuilder) {
|
||||
this.kafkaConnectBuilder = ocpKafkaConnectBuilder;
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -97,7 +97,7 @@ public OcpKafkaConnectDeployer.Builder withOperatorController(StrimziOperatorCon
|
||||
public OcpKafkaConnectDeployer build() {
|
||||
return new OcpKafkaConnectDeployer(
|
||||
project,
|
||||
yamlPath,
|
||||
kafkaConnectBuilder,
|
||||
cfgYamlPath,
|
||||
connectorResources,
|
||||
operatorController,
|
||||
@ -105,12 +105,11 @@ public OcpKafkaConnectDeployer build() {
|
||||
exposedMetrics,
|
||||
ocpClient, httpClient);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(OcpKafkaConnectDeployer.class);
|
||||
|
||||
private final String yamlPath;
|
||||
private final KafkaConnectBuilder kafkaConnectBuilder;
|
||||
private final String cfgYamlPath;
|
||||
private final boolean connectorResources;
|
||||
private final StrimziOperatorController operatorController;
|
||||
@ -120,7 +119,7 @@ public OcpKafkaConnectDeployer build() {
|
||||
|
||||
private OcpKafkaConnectDeployer(
|
||||
String project,
|
||||
String yamlPath,
|
||||
KafkaConnectBuilder kafkaConnectBuilder,
|
||||
String cfgYamlPath,
|
||||
boolean connectorResources,
|
||||
StrimziOperatorController operatorController,
|
||||
@ -129,7 +128,7 @@ private OcpKafkaConnectDeployer(
|
||||
OpenShiftClient ocp,
|
||||
OkHttpClient http) {
|
||||
super(project, ocp, http);
|
||||
this.yamlPath = yamlPath;
|
||||
this.kafkaConnectBuilder = kafkaConnectBuilder;
|
||||
this.cfgYamlPath = cfgYamlPath;
|
||||
this.connectorResources = connectorResources;
|
||||
this.operatorController = operatorController;
|
||||
@ -144,29 +143,26 @@ private OcpKafkaConnectDeployer(
|
||||
*/
|
||||
@Override
|
||||
public OcpKafkaConnectController deploy() throws InterruptedException {
|
||||
LOGGER.info("Deploying KafkaConnect from " + yamlPath);
|
||||
|
||||
KafkaConnect kafkaConnect = YAML.fromResource(yamlPath, KafkaConnect.class);
|
||||
Build kcBuild = kafkaConnect.getSpec().getBuild();
|
||||
KafkaConnectBuilder kcBuilder = new KafkaConnectBuilder(kafkaConnect);
|
||||
LOGGER.info("Deploying KafkaConnect");
|
||||
Build kcBuild = kafkaConnectBuilder.buildSpec().getBuild();
|
||||
|
||||
if (cfgYamlPath != null) {
|
||||
deployConfigMap();
|
||||
}
|
||||
|
||||
if (connectorResources) {
|
||||
configureConnectorResources(kcBuilder);
|
||||
configureConnectorResources(kafkaConnectBuilder);
|
||||
}
|
||||
|
||||
if (pullSecretName != null) {
|
||||
configurePullSecret(kcBuilder, kcBuild);
|
||||
configurePullSecret(kafkaConnectBuilder, kcBuild);
|
||||
}
|
||||
|
||||
if (kcBuild != null && "imagestream".equals(kcBuild.getOutput().getType())) {
|
||||
deployImageStream(kcBuild);
|
||||
}
|
||||
|
||||
kafkaConnect = kcBuilder.build();
|
||||
KafkaConnect kafkaConnect = kafkaConnectBuilder.build();
|
||||
kafkaConnect = kafkaConnectOperation().createOrReplace(kafkaConnect);
|
||||
|
||||
OcpKafkaConnectController controller = new OcpKafkaConnectController(
|
||||
|
@ -10,7 +10,7 @@
|
||||
|
||||
import io.debezium.testing.system.tools.AbstractOcpDeployer;
|
||||
import io.debezium.testing.system.tools.Deployer;
|
||||
import io.debezium.testing.system.tools.YAML;
|
||||
import io.debezium.testing.system.tools.kafka.builders.kafka.OcpKafkaBuilderFactory;
|
||||
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
|
||||
import io.fabric8.kubernetes.client.dsl.Resource;
|
||||
import io.fabric8.openshift.client.OpenShiftClient;
|
||||
@ -35,7 +35,6 @@ public final class OcpKafkaDeployer extends AbstractOcpDeployer<OcpKafkaControll
|
||||
public static class Builder implements Deployer.Builder<Builder, OcpKafkaDeployer> {
|
||||
|
||||
private String project;
|
||||
private String yamlPath;
|
||||
private OpenShiftClient ocpClient;
|
||||
private OkHttpClient httpClient;
|
||||
private StrimziOperatorController operatorController;
|
||||
@ -55,11 +54,6 @@ public Builder withHttpClient(OkHttpClient httpClient) {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withYamlPath(String yamlPath) {
|
||||
this.yamlPath = yamlPath;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withOperatorController(StrimziOperatorController operatorController) {
|
||||
this.operatorController = operatorController;
|
||||
return this;
|
||||
@ -67,20 +61,18 @@ public Builder withOperatorController(StrimziOperatorController operatorControll
|
||||
|
||||
@Override
|
||||
public OcpKafkaDeployer build() {
|
||||
return new OcpKafkaDeployer(project, yamlPath, operatorController, ocpClient, httpClient);
|
||||
return new OcpKafkaDeployer(project, operatorController, ocpClient, httpClient);
|
||||
}
|
||||
}
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(OcpKafkaDeployer.class);
|
||||
|
||||
private final String yamlPath;
|
||||
private final String pullSecretName;
|
||||
private final StrimziOperatorController operatorController;
|
||||
|
||||
private OcpKafkaDeployer(String project, String yamlPath, StrimziOperatorController operatorController,
|
||||
private OcpKafkaDeployer(String project, StrimziOperatorController operatorController,
|
||||
OpenShiftClient ocp, OkHttpClient http) {
|
||||
super(project, ocp, http);
|
||||
this.yamlPath = yamlPath;
|
||||
this.operatorController = operatorController;
|
||||
this.pullSecretName = this.operatorController.getPullSecretName();
|
||||
}
|
||||
@ -92,15 +84,14 @@ private OcpKafkaDeployer(String project, String yamlPath, StrimziOperatorControl
|
||||
*/
|
||||
@Override
|
||||
public OcpKafkaController deploy() throws InterruptedException {
|
||||
LOGGER.info("Deploying Kafka from " + yamlPath);
|
||||
Kafka kafka = YAML.fromResource(yamlPath, Kafka.class);
|
||||
KafkaBuilder builder = new KafkaBuilder(kafka);
|
||||
LOGGER.info("Deploying Kafka with default config");
|
||||
KafkaBuilder builder = OcpKafkaBuilderFactory.createDefaultConfig();
|
||||
|
||||
if (pullSecretName != null) {
|
||||
configurePullSecret(builder);
|
||||
}
|
||||
|
||||
kafka = kafkaOperation().createOrReplace(builder.build());
|
||||
Kafka kafka = kafkaOperation().createOrReplace(builder.build());
|
||||
|
||||
OcpKafkaController controller = new OcpKafkaController(kafka, operatorController, ocp);
|
||||
controller.waitForCluster();
|
||||
|
@ -0,0 +1,11 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.testing.system.tools.kafka.builders;
|
||||
|
||||
public interface OcpKafkaConstants {
|
||||
String DEFAULT_API_VERSION = "kafka.strimzi.io/v1beta2";
|
||||
String DEFAULT_KIND = "Kafka";
|
||||
}
|
@ -0,0 +1,111 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.testing.system.tools.kafka.builders.kafka;
|
||||
|
||||
import static io.debezium.testing.system.tools.kafka.builders.OcpKafkaConstants.DEFAULT_API_VERSION;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
|
||||
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
|
||||
|
||||
import io.fabric8.kubernetes.api.model.ObjectMeta;
|
||||
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
|
||||
import io.strimzi.api.kafka.model.EntityOperatorSpec;
|
||||
import io.strimzi.api.kafka.model.EntityOperatorSpecBuilder;
|
||||
import io.strimzi.api.kafka.model.EntityTopicOperatorSpec;
|
||||
import io.strimzi.api.kafka.model.EntityUserOperatorSpec;
|
||||
import io.strimzi.api.kafka.model.KafkaBuilder;
|
||||
import io.strimzi.api.kafka.model.KafkaClusterSpec;
|
||||
import io.strimzi.api.kafka.model.KafkaClusterSpecBuilder;
|
||||
import io.strimzi.api.kafka.model.KafkaSpec;
|
||||
import io.strimzi.api.kafka.model.KafkaSpecBuilder;
|
||||
import io.strimzi.api.kafka.model.ZookeeperClusterSpec;
|
||||
import io.strimzi.api.kafka.model.ZookeeperClusterSpecBuilder;
|
||||
import io.strimzi.api.kafka.model.listener.arraylistener.GenericKafkaListener;
|
||||
import io.strimzi.api.kafka.model.listener.arraylistener.GenericKafkaListenerBuilder;
|
||||
import io.strimzi.api.kafka.model.listener.arraylistener.KafkaListenerType;
|
||||
import io.strimzi.api.kafka.model.storage.EphemeralStorage;
|
||||
|
||||
/**
|
||||
* This class simplifies building of kafka by providing default configuration for whole kafka or parts of its definition
|
||||
*/
|
||||
public class OcpKafkaBuilderFactory {
|
||||
public static String DEFAULT_KAFKA_METADATA_NAME = "debezium-kafka-cluster";
|
||||
public static String DEFAULT_KAFKA_VERSION = System.getProperty("version.strimzi.kafka");
|
||||
|
||||
public static KafkaBuilder createDefaultConfig() {
|
||||
return new KafkaBuilder()
|
||||
.withApiVersion(DEFAULT_API_VERSION)
|
||||
.withMetadata(getDefaultMeta())
|
||||
.withSpec(getDefaultKafkaSpec());
|
||||
}
|
||||
|
||||
private static ObjectMeta getDefaultMeta() {
|
||||
return new ObjectMetaBuilder()
|
||||
.withName(DEFAULT_KAFKA_METADATA_NAME)
|
||||
.withGeneration(4L)
|
||||
.build();
|
||||
}
|
||||
|
||||
private static KafkaSpec getDefaultKafkaSpec() {
|
||||
return new KafkaSpecBuilder()
|
||||
.withEntityOperator(getDefaultEntityOperatorSpec())
|
||||
.withKafka(getDefaultKafkaClusterSpec())
|
||||
.withZookeeper(getDefaultZookeeper())
|
||||
.build();
|
||||
}
|
||||
|
||||
private static EntityOperatorSpec getDefaultEntityOperatorSpec() {
|
||||
return new EntityOperatorSpecBuilder()
|
||||
.withTopicOperator(new EntityTopicOperatorSpec())
|
||||
.withUserOperator(new EntityUserOperatorSpec())
|
||||
.build();
|
||||
}
|
||||
|
||||
private static List<GenericKafkaListener> getDefaultListeners() {
|
||||
return ImmutableList.of(new GenericKafkaListenerBuilder()
|
||||
.withName("plain")
|
||||
.withPort(9092)
|
||||
.withType(KafkaListenerType.INTERNAL)
|
||||
.build(),
|
||||
new GenericKafkaListenerBuilder()
|
||||
.withName("tls")
|
||||
.withPort(9093)
|
||||
.withType(KafkaListenerType.INTERNAL)
|
||||
.withTls(true)
|
||||
.build(),
|
||||
new GenericKafkaListenerBuilder()
|
||||
.withName("external")
|
||||
.withPort(9094)
|
||||
.withType(KafkaListenerType.LOADBALANCER)
|
||||
.build());
|
||||
}
|
||||
|
||||
private static Map<String, Object> getDefaultConfig() {
|
||||
return ImmutableMap.of("offsets.topic.replication.factor", 1,
|
||||
"transaction.state.log.replication.factor", 1,
|
||||
"transaction.state.log.min.isr", 1);
|
||||
}
|
||||
|
||||
private static KafkaClusterSpec getDefaultKafkaClusterSpec() {
|
||||
return new KafkaClusterSpecBuilder()
|
||||
.withConfig(getDefaultConfig())
|
||||
.withReplicas(1)
|
||||
.withVersion(DEFAULT_KAFKA_VERSION)
|
||||
.withListeners(getDefaultListeners())
|
||||
.withStorage(new EphemeralStorage())
|
||||
.build();
|
||||
}
|
||||
|
||||
private static ZookeeperClusterSpec getDefaultZookeeper() {
|
||||
return new ZookeeperClusterSpecBuilder()
|
||||
.withStorage(new EphemeralStorage())
|
||||
.withReplicas(1)
|
||||
.build();
|
||||
}
|
||||
}
|
@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.testing.system.tools.kafka.builders.kafkaconnect;
|
||||
|
||||
import static io.debezium.testing.system.tools.kafka.builders.OcpKafkaConstants.DEFAULT_API_VERSION;
|
||||
|
||||
import io.fabric8.kubernetes.api.model.ObjectMeta;
|
||||
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
|
||||
import io.strimzi.api.kafka.model.KafkaConnectBuilder;
|
||||
import io.strimzi.api.kafka.model.KafkaConnectSpec;
|
||||
|
||||
/**
|
||||
* This class simplifies building of kafkaConnect by providing pre-made configurations for whole kafkaConnect or parts of its definition
|
||||
*/
|
||||
public class OcpKafkaConnectBuilderFactory {
|
||||
public static String DEFAULT_KAFKA_CONNECT_METADATA_NAME = "debezium-kafka-connect-cluster";
|
||||
public static String DEFAULT_KAFKA_CONNECT_VERSION = System.getProperty("version.strimzi.kafka");
|
||||
public static String DEFAULT_IMAGE = System.getProperty("image.kc");
|
||||
public static String DEFAULT_BOOSTRAP_SERVER = "debezium-kafka-cluster-kafka-bootstrap:9093";
|
||||
|
||||
public static KafkaConnectBuilder createNonKcBuildSetup() {
|
||||
return new KafkaConnectBuilder()
|
||||
.withApiVersion(DEFAULT_API_VERSION)
|
||||
.withMetadata(getDefaultMeta())
|
||||
.withSpec(getNonKcSpec());
|
||||
}
|
||||
|
||||
public static KafkaConnectBuilder createKcBuildSetup() {
|
||||
return new KafkaConnectBuilder()
|
||||
.withApiVersion(DEFAULT_API_VERSION)
|
||||
.withMetadata(getDefaultMeta())
|
||||
.withSpec(getKcSpec());
|
||||
}
|
||||
|
||||
private static ObjectMeta getDefaultMeta() {
|
||||
return new ObjectMetaBuilder()
|
||||
.withName(DEFAULT_KAFKA_CONNECT_METADATA_NAME)
|
||||
.build();
|
||||
}
|
||||
|
||||
private static KafkaConnectSpec getNonKcSpec() {
|
||||
return OcpKafkaConnectSpecBuilderFactory.createNonKcSetup()
|
||||
.build();
|
||||
}
|
||||
|
||||
private static KafkaConnectSpec getKcSpec() {
|
||||
return OcpKafkaConnectSpecBuilderFactory.createKcSetup()
|
||||
.build();
|
||||
}
|
||||
}
|
@ -0,0 +1,159 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.testing.system.tools.kafka.builders.kafkaconnect;
|
||||
|
||||
import static io.debezium.testing.system.tools.kafka.builders.kafkaconnect.OcpKafkaConnectBuilderFactory.DEFAULT_BOOSTRAP_SERVER;
|
||||
import static io.debezium.testing.system.tools.kafka.builders.kafkaconnect.OcpKafkaConnectBuilderFactory.DEFAULT_IMAGE;
|
||||
import static io.debezium.testing.system.tools.kafka.builders.kafkaconnect.OcpKafkaConnectBuilderFactory.DEFAULT_KAFKA_CONNECT_VERSION;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
|
||||
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
|
||||
|
||||
import io.fabric8.kubernetes.api.model.ConfigMapKeySelectorBuilder;
|
||||
import io.strimzi.api.kafka.model.CertSecretSourceBuilder;
|
||||
import io.strimzi.api.kafka.model.ClientTls;
|
||||
import io.strimzi.api.kafka.model.ClientTlsBuilder;
|
||||
import io.strimzi.api.kafka.model.ContainerEnvVarBuilder;
|
||||
import io.strimzi.api.kafka.model.ExternalConfigurationReferenceBuilder;
|
||||
import io.strimzi.api.kafka.model.ExternalLogging;
|
||||
import io.strimzi.api.kafka.model.ExternalLoggingBuilder;
|
||||
import io.strimzi.api.kafka.model.JmxPrometheusExporterMetrics;
|
||||
import io.strimzi.api.kafka.model.JmxPrometheusExporterMetricsBuilder;
|
||||
import io.strimzi.api.kafka.model.KafkaConnectSpecBuilder;
|
||||
import io.strimzi.api.kafka.model.connect.build.BuildBuilder;
|
||||
import io.strimzi.api.kafka.model.connect.build.ImageStreamOutputBuilder;
|
||||
import io.strimzi.api.kafka.model.connect.build.JarArtifactBuilder;
|
||||
import io.strimzi.api.kafka.model.connect.build.Plugin;
|
||||
import io.strimzi.api.kafka.model.connect.build.PluginBuilder;
|
||||
import io.strimzi.api.kafka.model.connect.build.ZipArtifactBuilder;
|
||||
import io.strimzi.api.kafka.model.template.ContainerTemplateBuilder;
|
||||
import io.strimzi.api.kafka.model.template.KafkaConnectTemplate;
|
||||
import io.strimzi.api.kafka.model.template.KafkaConnectTemplateBuilder;
|
||||
|
||||
public class OcpKafkaConnectSpecBuilderFactory {
|
||||
|
||||
private static final String AS_APICURIO_URL = System.getProperty("as.apicurio.url");
|
||||
private static final String AS_DEBEZIUM_VERSION = System.getProperty("as.debezium.version");
|
||||
private static final int REPLICA_COUNT = 1;
|
||||
private static final String AS_URL = System.getProperty("as.url");
|
||||
private static final String DEBEZIUM_CONNECTOR_PLUGIN_NAME_PREFIX = "debezium-connector-";
|
||||
|
||||
public static KafkaConnectSpecBuilder createNonKcSetup() {
|
||||
return new KafkaConnectSpecBuilder()
|
||||
.withVersion(DEFAULT_KAFKA_CONNECT_VERSION)
|
||||
.withImage(DEFAULT_IMAGE)
|
||||
.withBootstrapServers(DEFAULT_BOOSTRAP_SERVER)
|
||||
.withLogging(getDefaultLogging())
|
||||
.withTls(getDefaultTls())
|
||||
.withReplicas(REPLICA_COUNT)
|
||||
.withMetricsConfig(getDefaultMetrics())
|
||||
.withConfig(getDefaultConfig())
|
||||
.withTemplate(getDefaultTemplate());
|
||||
}
|
||||
|
||||
public static KafkaConnectSpecBuilder createKcSetup() {
|
||||
List<Plugin> plugins = new LinkedList<>();
|
||||
List<String> dbsWithDefaultPlugin = ImmutableList.of("mysql", "postgres", "mongodb", "sqlserver");
|
||||
|
||||
for (String db : dbsWithDefaultPlugin) {
|
||||
plugins.add(prepareStandardPluginBuilder(db).build());
|
||||
}
|
||||
plugins.add(prepareStandardPluginBuilder("db2")
|
||||
.addToArtifacts(new JarArtifactBuilder()
|
||||
.withUrl(String.format("%s/jdbc/jcc-%s.jar", AS_URL, System.getProperty("version.db2.driver")))
|
||||
.build())
|
||||
.build());
|
||||
plugins.add(prepareStandardPluginBuilder("oracle")
|
||||
.addToArtifacts(new JarArtifactBuilder()
|
||||
.withUrl(String.format("%s/jdbc/ojdbc8-%s.jar", AS_URL, System.getProperty("version.oracle.driver")))
|
||||
.build())
|
||||
.build());
|
||||
|
||||
return new KafkaConnectSpecBuilder()
|
||||
.withVersion(DEFAULT_KAFKA_CONNECT_VERSION)
|
||||
.withTemplate(getDefaultTemplate())
|
||||
.withConfig(getDefaultConfig())
|
||||
.withMetricsConfig(getDefaultMetrics())
|
||||
.withReplicas(REPLICA_COUNT)
|
||||
.withTls(getDefaultTls())
|
||||
.withLogging(getDefaultLogging())
|
||||
.withBootstrapServers(DEFAULT_BOOSTRAP_SERVER)
|
||||
.withBuild(new BuildBuilder()
|
||||
.withOutput(new ImageStreamOutputBuilder()
|
||||
.withImage("testing-openshift-connect:latest")
|
||||
.build())
|
||||
.withPlugins(plugins)
|
||||
.build());
|
||||
|
||||
}
|
||||
|
||||
private static PluginBuilder prepareStandardPluginBuilder(String dbName) {
|
||||
return new PluginBuilder()
|
||||
.withName(DEBEZIUM_CONNECTOR_PLUGIN_NAME_PREFIX + dbName)
|
||||
.withArtifacts(
|
||||
new ZipArtifactBuilder()
|
||||
.withUrl(String.format("%s/debezium-connector-%s-%s-plugin.zip", AS_URL, dbName, AS_DEBEZIUM_VERSION))
|
||||
.build(),
|
||||
new ZipArtifactBuilder()
|
||||
.withUrl(AS_APICURIO_URL)
|
||||
.build(),
|
||||
new ZipArtifactBuilder()
|
||||
.withUrl(String.format("%s/debezium-scripting-%s.zip", AS_URL, AS_DEBEZIUM_VERSION))
|
||||
.build());
|
||||
}
|
||||
|
||||
private static KafkaConnectTemplate getDefaultTemplate() {
|
||||
return new KafkaConnectTemplateBuilder().withConnectContainer(new ContainerTemplateBuilder()
|
||||
.withEnv(new ContainerEnvVarBuilder()
|
||||
.withName("JMX_PORT")
|
||||
.withValue("5000")
|
||||
.build())
|
||||
.build())
|
||||
.build();
|
||||
}
|
||||
|
||||
private static ExternalLogging getDefaultLogging() {
|
||||
return new ExternalLoggingBuilder().withNewValueFromLike(new ExternalConfigurationReferenceBuilder()
|
||||
.withConfigMapKeyRef(new ConfigMapKeySelectorBuilder()
|
||||
.withKey("log4j.properties")
|
||||
.withName("connect-cfg")
|
||||
.build())
|
||||
.build())
|
||||
.endValueFrom()
|
||||
.build();
|
||||
}
|
||||
|
||||
private static ClientTls getDefaultTls() {
|
||||
return new ClientTlsBuilder()
|
||||
.withTrustedCertificates(
|
||||
new CertSecretSourceBuilder()
|
||||
.withCertificate("ca.crt")
|
||||
.withSecretName("debezium-kafka-cluster-cluster-ca-cert")
|
||||
.build())
|
||||
.build();
|
||||
}
|
||||
|
||||
private static JmxPrometheusExporterMetrics getDefaultMetrics() {
|
||||
return new JmxPrometheusExporterMetricsBuilder().withNewValueFromLike(new ExternalConfigurationReferenceBuilder()
|
||||
.withConfigMapKeyRef(new ConfigMapKeySelectorBuilder()
|
||||
.withKey("metrics")
|
||||
.withName("connect-cfg")
|
||||
.build())
|
||||
.build())
|
||||
.endValueFrom()
|
||||
.build();
|
||||
}
|
||||
|
||||
private static ImmutableMap<String, Object> getDefaultConfig() {
|
||||
return ImmutableMap.of(
|
||||
"config.storage.replication.factor", 1,
|
||||
"offset.storage.replication.factor", 1,
|
||||
"status.storage.replication.factor", 1);
|
||||
}
|
||||
}
|
@ -21,7 +21,9 @@
|
||||
import io.debezium.testing.system.tools.kafka.OcpKafkaController;
|
||||
import io.debezium.testing.system.tools.kafka.OcpKafkaDeployer;
|
||||
import io.debezium.testing.system.tools.kafka.StrimziOperatorController;
|
||||
import io.debezium.testing.system.tools.kafka.builders.kafkaconnect.OcpKafkaConnectBuilderFactory;
|
||||
import io.fabric8.openshift.client.OpenShiftClient;
|
||||
import io.strimzi.api.kafka.model.KafkaConnectBuilder;
|
||||
|
||||
import fixture5.TestFixture;
|
||||
import fixture5.annotations.FixtureContext;
|
||||
@ -33,10 +35,7 @@ public class OcpKafka extends TestFixture {
|
||||
private final OpenShiftClient ocp;
|
||||
private final String project;
|
||||
// Kafka resources
|
||||
String KAFKA = "/kafka-resources/" + STRIMZI_CRD_VERSION + "/010-kafka.yaml";
|
||||
String KAFKA_CONNECT_LOGGING = "/kafka-resources/" + STRIMZI_CRD_VERSION + "/020-kafka-connect-cfg.yaml";
|
||||
String KAFKA_CONNECT = "/kafka-resources/" + STRIMZI_CRD_VERSION + "/021-kafka-connect.yaml";
|
||||
String KAFKA_CONNECT_BUILD = "/kafka-resources/" + STRIMZI_CRD_VERSION + "/121-kafka-connect-build.yaml";
|
||||
// Artifact Server resources
|
||||
String ARTIFACT_SERVER_DEPLOYMENT = "/artifact-server/010-deployment.yaml";
|
||||
String ARTIFACT_SERVER_SERVICE = "/artifact-server/020-service.yaml";
|
||||
@ -70,7 +69,6 @@ private void deployKafkaCluster(StrimziOperatorController operatorController) th
|
||||
.withOcpClient(ocp)
|
||||
.withHttpClient(new OkHttpClient())
|
||||
.withProject(project)
|
||||
.withYamlPath(KAFKA)
|
||||
.withOperatorController(operatorController)
|
||||
.build();
|
||||
|
||||
@ -80,18 +78,21 @@ private void deployKafkaCluster(StrimziOperatorController operatorController) th
|
||||
}
|
||||
|
||||
private void deployConnectCluster(StrimziOperatorController operatorController) throws InterruptedException {
|
||||
String yamlDescriptor = KAFKA_CONNECT;
|
||||
KafkaConnectBuilder kafkaConnectBuilder;
|
||||
|
||||
if (ConfigProperties.STRIMZI_KC_BUILD) {
|
||||
yamlDescriptor = KAFKA_CONNECT_BUILD;
|
||||
kafkaConnectBuilder = OcpKafkaConnectBuilderFactory.createKcBuildSetup();
|
||||
deployArtifactServer();
|
||||
}
|
||||
else {
|
||||
kafkaConnectBuilder = OcpKafkaConnectBuilderFactory.createNonKcBuildSetup();
|
||||
}
|
||||
|
||||
OcpKafkaConnectDeployer connectDeployer = new OcpKafkaConnectDeployer.Builder()
|
||||
.withOcpClient(ocp)
|
||||
.withHttpClient(new OkHttpClient())
|
||||
.withProject(project)
|
||||
.withYamlPath(yamlDescriptor)
|
||||
.withKafkaConnectBuilder(kafkaConnectBuilder)
|
||||
.withCfgYamlPath(KAFKA_CONNECT_LOGGING)
|
||||
.withConnectorResources(ConfigProperties.STRIMZI_OPERATOR_CONNECTORS)
|
||||
.withOperatorController(operatorController)
|
||||
|
@ -1,35 +0,0 @@
|
||||
apiVersion: kafka.strimzi.io/v1beta2
|
||||
kind: Kafka
|
||||
metadata:
|
||||
generation: 4
|
||||
name: debezium-kafka-cluster
|
||||
spec:
|
||||
entityOperator:
|
||||
topicOperator: {}
|
||||
userOperator: {}
|
||||
kafka:
|
||||
config:
|
||||
offsets.topic.replication.factor: 1
|
||||
transaction.state.log.replication.factor: 1
|
||||
transaction.state.log.min.isr: 1
|
||||
listeners:
|
||||
- name: plain
|
||||
port: 9092
|
||||
type: internal
|
||||
tls: false
|
||||
- name: tls
|
||||
port: 9093
|
||||
type: internal
|
||||
tls: true
|
||||
- name: external
|
||||
port: 9094
|
||||
type: loadbalancer
|
||||
tls: false
|
||||
replicas: 1
|
||||
storage:
|
||||
type: ephemeral
|
||||
version: ${version.strimzi.kafka}
|
||||
zookeeper:
|
||||
replicas: 1
|
||||
storage:
|
||||
type: ephemeral
|
@ -1,34 +0,0 @@
|
||||
apiVersion: kafka.strimzi.io/v1beta2
|
||||
kind: KafkaConnect
|
||||
metadata:
|
||||
name: debezium-kafka-connect-cluster
|
||||
spec:
|
||||
version: ${version.strimzi.kafka}
|
||||
image: ${image.kc}
|
||||
bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
|
||||
logging:
|
||||
type: external
|
||||
valueFrom:
|
||||
configMapKeyRef:
|
||||
name: connect-cfg
|
||||
key: log4j.properties
|
||||
tls:
|
||||
trustedCertificates:
|
||||
- certificate: ca.crt
|
||||
secretName: debezium-kafka-cluster-cluster-ca-cert
|
||||
replicas: 1
|
||||
metricsConfig:
|
||||
type: jmxPrometheusExporter
|
||||
valueFrom:
|
||||
configMapKeyRef:
|
||||
name: connect-cfg
|
||||
key: metrics
|
||||
config:
|
||||
config.storage.replication.factor: 1
|
||||
offset.storage.replication.factor: 1
|
||||
status.storage.replication.factor: 1
|
||||
template:
|
||||
connectContainer:
|
||||
env:
|
||||
- name: "JMX_PORT"
|
||||
value: "5000"
|
@ -1,91 +0,0 @@
|
||||
apiVersion: kafka.strimzi.io/v1beta2
|
||||
kind: KafkaConnect
|
||||
metadata:
|
||||
name: debezium-kafka-connect-cluster
|
||||
spec:
|
||||
version: ${version.strimzi.kafka}
|
||||
build:
|
||||
output:
|
||||
type: imagestream
|
||||
image: testing-openshift-connect:latest
|
||||
plugins:
|
||||
- name: debezium-connector-mysql
|
||||
artifacts:
|
||||
- type: zip
|
||||
url: ${as.url}/debezium-connector-mysql-${as.debezium.version}-plugin.zip
|
||||
- type: zip
|
||||
url: ${as.apicurio.url}
|
||||
- type: zip
|
||||
url: ${as.url}/debezium-scripting-${as.debezium.version}.zip
|
||||
- name: debezium-connector-postgres
|
||||
artifacts:
|
||||
- type: zip
|
||||
url: ${as.url}/debezium-connector-postgres-${as.debezium.version}-plugin.zip
|
||||
- type: zip
|
||||
url: ${as.apicurio.url}
|
||||
- type: zip
|
||||
url: ${as.url}/debezium-scripting-${as.debezium.version}.zip
|
||||
- name: debezium-connector-mongodb
|
||||
artifacts:
|
||||
- type: zip
|
||||
url: ${as.url}/debezium-connector-mongodb-${as.debezium.version}-plugin.zip
|
||||
- type: zip
|
||||
url: ${as.apicurio.url}
|
||||
- type: zip
|
||||
url: ${as.url}/debezium-scripting-${as.debezium.version}.zip
|
||||
- name: debezium-connector-db2
|
||||
artifacts:
|
||||
- type: zip
|
||||
url: ${as.url}/debezium-connector-db2-${as.debezium.version}-plugin.zip
|
||||
- type: zip
|
||||
url: ${as.apicurio.url}
|
||||
- type: zip
|
||||
url: ${as.url}/debezium-scripting-${as.debezium.version}.zip
|
||||
- type: jar
|
||||
url: ${as.url}/jdbc/jcc-${version.db2.driver}.jar
|
||||
- name: debezium-connector-sqlserver
|
||||
artifacts:
|
||||
- type: zip
|
||||
url: ${as.url}/debezium-connector-sqlserver-${as.debezium.version}-plugin.zip
|
||||
- type: zip
|
||||
url: ${as.apicurio.url}
|
||||
- type: zip
|
||||
url: ${as.url}/debezium-scripting-${as.debezium.version}.zip
|
||||
- name: debezium-connector-oracle
|
||||
artifacts:
|
||||
- type: zip
|
||||
url: ${as.url}/debezium-connector-oracle-${as.debezium.version}-plugin.zip
|
||||
- type: zip
|
||||
url: ${as.apicurio.url}
|
||||
- type: zip
|
||||
url: ${as.url}/debezium-scripting-${as.debezium.version}.zip
|
||||
- type: jar
|
||||
url: ${as.url}/jdbc/ojdbc8-${version.oracle.driver}.jar
|
||||
|
||||
bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
|
||||
logging:
|
||||
type: external
|
||||
valueFrom:
|
||||
configMapKeyRef:
|
||||
name: connect-cfg
|
||||
key: log4j.properties
|
||||
tls:
|
||||
trustedCertificates:
|
||||
- certificate: ca.crt
|
||||
secretName: debezium-kafka-cluster-cluster-ca-cert
|
||||
replicas: 1
|
||||
metricsConfig:
|
||||
type: jmxPrometheusExporter
|
||||
valueFrom:
|
||||
configMapKeyRef:
|
||||
name: connect-cfg
|
||||
key: metrics
|
||||
config:
|
||||
config.storage.replication.factor: 1
|
||||
offset.storage.replication.factor: 1
|
||||
status.storage.replication.factor: 1
|
||||
template:
|
||||
connectContainer:
|
||||
env:
|
||||
- name: "JMX_PORT"
|
||||
value: "5000"
|
Loading…
Reference in New Issue
Block a user