DBZ-924 Adding integration tests for OpenShift and Strimzi

This commit is contained in:
jcechace 2020-01-18 20:43:48 +01:00 committed by Jiri Pechanec
parent 2298091798
commit 3209de3e5f
35 changed files with 2054 additions and 9 deletions

View File

@ -34,7 +34,6 @@
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.awaitility.core.ConditionTimeoutException;
import org.fest.assertions.Assertions;
import org.junit.After;
@ -846,7 +845,7 @@ public void shouldRegularlyFlushLsn() throws InterruptedException, SQLException
// Wait max 2 seconds for LSN change
try {
Awaitility.await().atMost(Duration.TWO_SECONDS).ignoreExceptions().until(() -> flushLsn.add(getConfirmedFlushLsn(connection)));
Awaitility.await().atMost(2, TimeUnit.MINUTES.SECONDS).ignoreExceptions().until(() -> flushLsn.add(getConfirmedFlushLsn(connection)));
}
catch (ConditionTimeoutException e) {
// We do not require all flushes to succeed in time
@ -894,7 +893,7 @@ public void shouldFlushLsnOnEmptyMessage() throws InterruptedException, SQLExcep
try {
// Wait max 5 seconds for LSN change caused by DDL_STATEMENT
Awaitility.await().atMost(Duration.FIVE_SECONDS).ignoreExceptions().until(() -> flushLsn.add(getConfirmedFlushLsn(connection)));
Awaitility.await().atMost(5, TimeUnit.SECONDS).ignoreExceptions().until(() -> flushLsn.add(getConfirmedFlushLsn(connection)));
}
catch (ConditionTimeoutException e) {
// We do not require all flushes to succeed in time

View File

@ -39,7 +39,6 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.fest.assertions.Assertions;
import org.junit.Before;
import org.junit.Rule;
@ -1168,7 +1167,7 @@ public void shouldWarnOnMissingHeartbeatForFilteredEvents() throws Exception {
IntStream.range(0, filteredCount)
.mapToObj(x -> "INSERT INTO s1.a (pk) VALUES (default);")
.collect(Collectors.joining()));
Awaitility.await().alias("WAL growing log message").pollInterval(Duration.ONE_SECOND).atMost(Duration.TEN_SECONDS).until(() -> logInterceptor.containsWarnMessage(
Awaitility.await().alias("WAL growing log message").pollInterval(1, TimeUnit.SECONDS).atMost(10, TimeUnit.SECONDS).until(() -> logInterceptor.containsWarnMessage(
"Received 10001 events which were all filtered out, so no offset could be committed. This prevents the replication slot from acknowledging the processed WAL offsets, causing a growing backlog of non-removeable WAL segments on the database server. Consider to either adjust your filter configuration or enable heartbeat events (via the heartbeat.interval.ms option) to avoid this situation."));
}

View File

@ -17,6 +17,7 @@
import java.sql.SQLException;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.awaitility.Awaitility;
@ -297,7 +298,7 @@ protected static boolean publicationExists(String publicationName) {
protected static void waitForDefaultReplicationSlotBeActive() {
try (PostgresConnection connection = create()) {
Awaitility.await().atMost(org.awaitility.Duration.FIVE_SECONDS).until(() -> connection.prepareQueryAndMap(
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> connection.prepareQueryAndMap(
"select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ? and active = true", statement -> {
statement.setString(1, ReplicationConnection.Builder.DEFAULT_SLOT_NAME);
statement.setString(2, "postgres");

View File

@ -24,7 +24,6 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
@ -209,7 +208,7 @@ public void readOnlyApplicationIntent() throws Exception {
// Verify that multiple subsequent transactions are used in streaming phase with read-only intent
try (final SqlServerConnection admin = TestHelper.adminConnection()) {
final Set<Long> txIds = new HashSet<>();
Awaitility.await().atMost(Duration.FIVE_SECONDS).pollInterval(Duration.ONE_HUNDRED_MILLISECONDS).until(() -> {
Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until(() -> {
admin.query(
"SELECT (SELECT transaction_id FROM sys.dm_tran_session_transactions AS t WHERE s.session_id=t.session_id) FROM sys.dm_exec_sessions AS s WHERE program_name='"
+ appId + "'",

View File

@ -0,0 +1,41 @@
# OpenShift deployment verification suite
This project verifies the basic functionality of Debezium connectors with Kafka cluster deployed to OpenShift via Strimzi project.
## Prerequisites
OpenShift cluster with cluster-wide administrator access is required in order to run these tests.
Depending on chosen registry a configured docker credentials are required in order to push built
## Running the tests
```bash
mvn install -Docp.url=<ocp_api_url> -Docp.username=<ocp_password> -Docp.password=<ocp_password> -Dimage.fullname=<connect_image_name>
```
The following properties can be set to further configure the test execution
| Name | Default Value | description |
| -----| ------------- | ----------- |
| ocp.url | | OpenShift API endpoint |
| ocp.username | | OpenShift admin username |
| ocp.password | | OpenShift admin password |
| ocp.project.debezium | debezium | OpenShift debezium project |
| ocp.project.mysql | debezium-mysql | OpenShift mysql project |
| image.fullname | | Full name of Kafka Connect image |
## Building a KafkaConnect image with Debezium connectors
To build connect image running the ```assembly``` profile from parent directory together with ```image``` profile
```bash
mvn clean install -DskipTests -DskipITs -Passembly -Pimage
```
The following properties can be set to further configure image build
| Name | Default Value | description |
| -----| ------------- | ----------- |
| image.push.skip | true | Skips push to remote registry |
| image.push.registry | quay.io | remote registry base |
| image.name | debezium/kafka:${project.version}-${image.version.strimzi}-kafka-${version.kafka} | Name of built image |
| image.fullname | ${image.push.registry}/${image.name} | Full name of the built image |
| image.base.name | strimzi/kafka:${image.version.strimzi}-kafka-${version.kafka} | Base for built image |
| image.version.strimzi | latest | Version of Strimzi Kafka image |

View File

@ -0,0 +1,311 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-testing</artifactId>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-testing-openshift</artifactId>
<name>Debezium OpenShift integration test-suite</name>
<properties>
<version.fabric8.client>4.6.4</version.fabric8.client>
<version.strimzi>0.16.2</version.strimzi>
<version.junit5>5.5.1</version.junit5>
<version.junit5.pioneer>0.5.1</version.junit5.pioneer>
<version.assertj>3.11.1</version.assertj>
<image.fullname>quay.io/debezium/kafka:${project.version}-latest-kafka-${version.kafka}</image.fullname>
<!--OCP configuration-->
<ocp.project.debezium>debezium</ocp.project.debezium>
<ocp.project.mysql>debezium-mysql</ocp.project.mysql>
<ocp.project.postgresql>debezium-postgresql</ocp.project.postgresql>
<!--Strimzi configuration-->
<strimzi.operator.connectors>true</strimzi.operator.connectors>
<!--MySQL configuration-->
<database.mysql.port>3306</database.mysql.port>
<database.mysql.username>mysqluser</database.mysql.username>
<database.mysql.password>mysqlpw</database.mysql.password>
<database.mysql.dbz.username>debezium</database.mysql.dbz.username>
<database.mysql.dbz.password>dbz</database.mysql.dbz.password>
<database.mysql.root.password>debezium</database.mysql.root.password>
<!--MySQL configuration-->
<database.postgresql.port>5432</database.postgresql.port>
<database.postgresql.username>debezium</database.postgresql.username>
<database.postgresql.password>debezium</database.postgresql.password>
<database.postgresql.dbname>debezium</database.postgresql.dbname>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client-bom</artifactId>
<version>${version.fabric8.client}</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>openshift-client</artifactId>
<version>${version.fabric8.client}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${version.kafka}</version>
</dependency>
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>api</artifactId>
<version>${version.strimzi}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${version.junit5}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${version.junit5}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit-pioneer</groupId>
<artifactId>junit-pioneer</artifactId>
<version>${version.junit5.pioneer}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${version.assertj}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${version.awaitility}</version>
</dependency>
</dependencies>
<profiles>
<profile>
<id>image</id>
<properties>
<image.push.skip>true</image.push.skip>
<image.push.registry>quay.io</image.push.registry>
<image.registry>${image.push.registry}</image.registry>
<image.version.strimzi>latest</image.version.strimzi>
<image.name>debezium/kafka:${project.version}-${image.version.strimzi}-kafka-${version.kafka}</image.name>
<image.fullname>${image.push.registry}/${image.name}</image.fullname>
<image.base.name>strimzi/kafka:${image.version.strimzi}-kafka-${version.kafka}</image.base.name>
</properties>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<classifier>plugin</classifier>
<type>zip</type>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<classifier>plugin</classifier>
<type>zip</type>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
<classifier>plugin</classifier>
<type>zip</type>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<classifier>plugin</classifier>
<type>zip</type>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!-- Possibly replaceable by Fabric8 plugin-->
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<configuration>
<imagePullPolicy>IfNotPresent</imagePullPolicy>
<pushRegistry>${image.push.registry}</pushRegistry>
<skipPush>${image.push.skip}</skipPush>
<images>
<image>
<name>${image.name}</name>
<build>
<from>${image.base.name}</from>
<assembly>
<targetDir>/opt/kafka/plugins</targetDir>
<inline>
<dependencySets>
<dependencySet>
<outputDirectory>.</outputDirectory>
<unpack>true</unpack>
<includes>
<include>io.debezium:debezium-connector-*:zip:plugin</include>
</includes>
</dependencySet>
</dependencySets>
</inline>
</assembly>
</build>
</image>
</images>
</configuration>
<!--
Connect this plugin to the maven lifecycle around the integration-test phase:
-->
<executions>
<execution>
<id>build</id>
<phase>pre-integration-test</phase>
<goals>
<goal>build</goal>
<goal>push</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>openshiftITs</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<skipTests>${skipITs}</skipTests>
<enableAssertions>true</enableAssertions>
<trimStackTrace>false</trimStackTrace>
<systemPropertyVariables>
<test.ocp.url>${ocp.url}</test.ocp.url>
<test.ocp.username>${ocp.username}</test.ocp.username>
<test.ocp.password>${ocp.password}</test.ocp.password>
<test.ocp.secret.rhio.path>${ocp.rhio.secret.path}</test.ocp.secret.rhio.path>
<test.ocp.project.debezium>${ocp.project.debezium}</test.ocp.project.debezium>
<test.ocp.project.mysql>${ocp.project.mysql}</test.ocp.project.mysql>
<test.ocp.project.postgresql>${ocp.project.postgresql}</test.ocp.project.postgresql>
<test.strimzi.operator.connectors>${strimzi.operator.connectors}</test.strimzi.operator.connectors>
<test.database.mysql.host>${database.mysql.host}</test.database.mysql.host>
<test.database.mysql.dbz.username>${database.mysql.dbz.username}</test.database.mysql.dbz.username>
<test.database.mysql.dbz.password>${database.mysql.dbz.password}</test.database.mysql.dbz.password>
<test.database.mysql.port>${database.mysql.port}</test.database.mysql.port>
<test.database.postgresql.host>${database.postgresql.host}</test.database.postgresql.host>
<test.database.postgresql.port>${database.postgresql.port}</test.database.postgresql.port>
<test.database.postgresql.username>${database.postgresql.username}</test.database.postgresql.username>
<test.database.postgresql.password>${database.postgresql.password}</test.database.postgresql.password>
<test.database.postgresql.dbname>${database.postgresql.dbname}</test.database.postgresql.dbname>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
<id>tests-openshift</id>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<testResources>
<testResource>
<directory>src/test/resources</directory>
<filtering>true</filtering>
</testResource>
</testResources>
</build>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,52 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools;
import static org.awaitility.Awaitility.await;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
/**
* Utility methods for HTTP requests
* @author Jakub Cechacek
*/
public class HttpUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpUtils.class);
private final OkHttpClient http;
public HttpUtils(OkHttpClient http) {
this.http = http;
}
/**
* Waits until URL starts responding with success response code
* @param url tested url
*/
public void awaitApi(HttpUrl url) {
LOGGER.info("Waiting for API at " + url);
await()
.atMost(1, TimeUnit.MINUTES)
.ignoreException(IOException.class)
.until(() -> pingApi(url));
}
private boolean pingApi(HttpUrl address) throws IOException {
Request r = new Request.Builder().url(address).build();
try (Response res = http.newCall(r).execute()) {
return res.isSuccessful();
}
}
}

View File

@ -0,0 +1,150 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools;
import static org.awaitility.Awaitility.await;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.ServiceAccount;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicy;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPort;
import io.fabric8.openshift.api.model.Route;
import io.fabric8.openshift.client.OpenShiftClient;
/**
* Utility methods for working with OpenShift
* @author Jakub Cechacek
*/
public class OpenShiftUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(OpenShiftUtils.class);
private OpenShiftClient client;
public OpenShiftUtils(OpenShiftClient client) {
this.client = client;
}
/**
* Creates route in given namespace
* @param project project where this route will be created
* @param name name of the route
* @param service target service
* @param port target port
* @param labels labels set to set on this route
* @return {@link Route} object for created route
*/
public Route createRoute(String project, String name, String service, String port, Map<String, String> labels) {
Route route = client.routes().inNamespace(project).createOrReplaceWithNew()
.withNewMetadata()
.withName(name)
.withLabels(labels)
.endMetadata()
.withNewSpec()
.withNewTo()
.withKind("Service")
.withName(service)
.endTo()
.withNewPort()
.withNewTargetPort(port)
.endPort()
.endSpec()
.done();
return route;
}
/**
* Creates new NetworkPolicy in given namespace allowing public access
* @param project project where this network policy will be created
* @param name name of the policy
* @param podSelectorLabels labels used as pod selectors
* @param ports ports for which access will be allowed
* @return {@link NetworkPolicy} object for created policy
*/
public NetworkPolicy createNetworkPolicy(String project, String name, Map<String, String> podSelectorLabels, List<NetworkPolicyPort> ports) {
NetworkPolicy policy = client.network().networkPolicies().inNamespace(project)
.createOrReplaceWithNew()
.withNewMetadata()
.withName(name)
.endMetadata()
.withNewSpec()
.withNewPodSelector()
.withMatchLabels(podSelectorLabels)
.endPodSelector()
.addNewIngress()
.addToPorts(ports.toArray(new NetworkPolicyPort[ports.size()]))
.endIngress()
.withPolicyTypes("Ingress")
.endSpec()
.done();
return policy;
}
/**
* Links pull secret to service account
* @param project project where this operation happens
* @param sa service account name
* @param secret secret name
* @return {@link} Service account object to which this secret was linked
*/
public ServiceAccount linkPullSecret(String project, String sa, String secret) {
return client.serviceAccounts()
.inNamespace(project)
.withName(sa)
.edit()
.addNewImagePullSecret(secret)
.done();
}
/**
* Finds pods with given labels
* @param project project where to look for pods
* @param labels labels used to identify pods
* @return {@link PodList} of matching pods
*/
public PodList podsWithLabels(String project, Map<String, String> labels) {
Supplier<PodList> podListSupplier = () -> client.pods().inNamespace(project).withLabels(labels).list();
await().atMost(5, TimeUnit.MINUTES).until(() -> podListSupplier.get().getItems().size() > 0);
PodList pods = podListSupplier.get();
if (pods.getItems().isEmpty()) {
LOGGER.warn("Empty PodList");
}
return pods;
}
/**
* Waits until all pods with given labels are ready
* @param project project where to look for pods
* @param labels labels used to identify pods
*/
public void waitForPods(String project, Map<String, String> labels) {
String lbls = labels.keySet().stream().map(k -> k + "=" + labels.get(k)).collect(Collectors.joining(", "));
LOGGER.info("Waiting for pods to deploy [" + lbls + "]");
PodList pods = podsWithLabels(project, labels);
for (Pod p : pods.getItems()) {
try {
client.resource(p).waitUntilReady(5, TimeUnit.MINUTES);
}
catch (InterruptedException e) {
throw new IllegalStateException("Error when waiting for pod " + p.getMetadata().getName() + " to get ready", e);
}
}
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.InvalidFormatException;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
/**
* @author Jakub Cechace
*/
public final class YAML {
/**
* Deserialize object fromResource YAML file
*
* @param path file path
* @param c type of object
* @return deserialized object
*/
public static <T> T from(String path, Class<T> c) {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
try {
return mapper.readValue(new File(path), c);
}
catch (InvalidFormatException e) {
throw new IllegalArgumentException(e);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
public static <T> T fromResource(String path, Class<T> c) {
URL resource = YAML.class.getResource(path);
return from(resource.getFile(), c);
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.databases;
import java.util.List;
import io.debezium.testing.openshift.tools.OpenShiftUtils;
import io.fabric8.kubernetes.api.model.LoadBalancerIngress;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.openshift.client.OpenShiftClient;
/**
*
* @author Jakub Cechacek
*/
public class DatabaseController {
private final Deployment deployment;
private final List<Service> services;
private final OpenShiftClient ocp;
private final String project;
private final String dbType;
private final OpenShiftUtils ocpUtils;
public DatabaseController(Deployment deployment, List<Service> services, String dbType, OpenShiftClient ocp) {
this.deployment = deployment;
this.services = services;
this.ocp = ocp;
this.project = deployment.getMetadata().getNamespace();
this.dbType = dbType;
this.ocpUtils = new OpenShiftUtils(ocp);
}
public String getDatabaseUrl() {
Service svc = ocp
.services()
.inNamespace(project)
.withName(deployment.getMetadata().getName() + "-lb")
.get();
LoadBalancerIngress ingress = svc.getStatus().getLoadBalancer().getIngress().get(0);
String hostname = ingress.getHostname();
Integer port = svc.getSpec().getPorts().stream().filter(p -> p.getName().equals("db")).findAny().get().getPort();
return "jdbc:" + dbType + "://" + hostname + ":" + port;
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.databases;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.testing.openshift.tools.OpenShiftUtils;
import io.debezium.testing.openshift.tools.YAML;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.openshift.client.OpenShiftClient;
/**
* @author Jakub Cechacek
*/
public abstract class DatabaseDeployer<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseDeployer.class);
private final OpenShiftClient ocp;
private final OpenShiftUtils ocpUtils;
private final String dbType;
private String project;
private Deployment deployment;
private List<Service> services;
public DatabaseDeployer(String dbType, OpenShiftClient ocp) {
this.dbType = dbType;
this.ocp = ocp;
this.ocpUtils = new OpenShiftUtils(ocp);
}
public T withProject(String project) {
this.project = project;
return getThis();
}
public T withDeployment(String yamlPath) {
return withDeployment(YAML.fromResource(yamlPath, Deployment.class));
}
public T withDeployment(Deployment deployment) {
this.deployment = deployment;
return getThis();
}
public T withServices(String... yamlPath) {
List<Service> services = Arrays.stream(yamlPath)
.map(p -> YAML.fromResource(p, Service.class)).collect(Collectors.toList());
return withServices(services);
}
public T withServices(Collection<Service> services) {
this.services = new ArrayList<>(services);
return getThis();
}
public DatabaseController deploy() {
if (deployment == null) {
throw new IllegalStateException("Deployment configuration not available");
}
LOGGER.info("Deploying database");
Deployment dep = ocp.apps().deployments().inNamespace(project).createOrReplace(deployment);
List<Service> svcs = services.stream()
.map(s -> ocp.services().inNamespace(project).createOrReplace(s))
.collect(Collectors.toList());
ocpUtils.waitForPods(project, dep.getMetadata().getLabels());
LOGGER.info("Database deployed successfully");
this.deployment = dep;
this.services = svcs;
return new DatabaseController(dep, services, dbType, ocp);
}
public abstract T getThis();
}

View File

@ -0,0 +1,26 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.databases;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.fabric8.openshift.client.OpenShiftClient;
/**
* @author Jakub Cechacek
*/
public class MySqlDeployer extends DatabaseDeployer<MySqlDeployer> {
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlDeployer.class);
public MySqlDeployer(OpenShiftClient ocp) {
super("mysql", ocp);
}
public MySqlDeployer getThis() {
return this;
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.databases;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.fabric8.openshift.client.OpenShiftClient;
/**
*
* @author Jakub Cechacek
*/
public class PostgreSqlDeployer extends DatabaseDeployer<PostgreSqlDeployer> {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSqlDeployer.class);
public PostgreSqlDeployer(OpenShiftClient ocp) {
super("postgresql", ocp);
}
public PostgreSqlDeployer getThis() {
return this;
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.kafka;
import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.strimzi.api.kafka.model.KafkaConnector;
import io.strimzi.api.kafka.model.KafkaConnectorBuilder;
/**
*
* @author Jakub Cechacek
*/
public class ConnectorConfigBuilder {
private final Map<String, Object> config;
private final ObjectMapper mapper;
public ConnectorConfigBuilder() {
this.mapper = new ObjectMapper();
this.config = new HashMap<>();
}
public ConnectorConfigBuilder put(String key, Object value) {
config.put(key, value);
return this;
}
public Map<String, Object> get() {
return config;
}
/**
* Get configuration as JSON string
* @return JSON string of connector config
*/
public String getJsonString() {
try {
return mapper.writeValueAsString(config);
}
catch (JsonProcessingException e) {
throw new IllegalStateException("Unable to convert connector config to JSON String");
}
}
/**
* Get configuration as OpenShift CR of type {@link KafkaConnector}
* @return Connector CR
*/
public KafkaConnector getCustomResource() {
Map<String, Object> crConfig = new HashMap<>(config);
KafkaConnectorBuilder connectorBuilder = new KafkaConnectorBuilder();
return connectorBuilder.withNewSpec()
.withClassName((String) crConfig.remove("connector.class"))
.withTasksMax((Integer) crConfig.remove("task.max"))
.withConfig(crConfig)
.endSpec()
.build();
}
}

View File

@ -0,0 +1,302 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.kafka;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.awaitility.Awaitility.await;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.testing.openshift.tools.HttpUtils;
import io.debezium.testing.openshift.tools.OpenShiftUtils;
import io.fabric8.kubernetes.api.model.IntOrString;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicy;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPort;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPortBuilder;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.openshift.api.model.Route;
import io.fabric8.openshift.client.OpenShiftClient;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.KafkaConnectorList;
import io.strimzi.api.kafka.model.DoneableKafkaConnector;
import io.strimzi.api.kafka.model.KafkaConnect;
import io.strimzi.api.kafka.model.KafkaConnector;
import io.strimzi.api.kafka.model.status.KafkaConnectorStatus;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
/**
* This class provides control over Kafka Connect instance deployed in OpenShift
* @author Jakub Cechacek
*/
public class KafkaConnectController {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConnectController.class);
private final OpenShiftClient ocp;
private final KafkaConnect kafkaConnect;
private final OkHttpClient http;
private final String project;
private final OpenShiftUtils ocpUtils;
private final HttpUtils httpUtils;
private final boolean useConnectorResources;
private Route apiRoute;
private Route metricsRoute;
public KafkaConnectController(KafkaConnect kafkaConnect, OpenShiftClient ocp, OkHttpClient http, boolean useConnectorResources) {
this.kafkaConnect = kafkaConnect;
this.ocp = ocp;
this.http = http;
this.useConnectorResources = useConnectorResources;
this.project = kafkaConnect.getMetadata().getNamespace();
this.ocpUtils = new OpenShiftUtils(ocp);
this.httpUtils = new HttpUtils(http);
}
/**
* Creates network policy allowing access to ports exposed by Kafka Connect
* @return
*/
public NetworkPolicy allowServiceAccess() {
LOGGER.info("Creating NetworkPolicy allowing public access to " + kafkaConnect.getMetadata().getName() + "'s services");
Map<String, String> labels = new HashMap<>();
labels.put("strimzi.io/cluster", kafkaConnect.getMetadata().getName());
labels.put("strimzi.io/kind", "KafkaConnect");
labels.put("strimzi.io/name", kafkaConnect.getMetadata().getName() + "-connect");
List<NetworkPolicyPort> ports = Stream.of(8083, 8404)
.map(IntOrString::new)
.map(p -> new NetworkPolicyPortBuilder().withProtocol("TCP").withPort(p).build())
.collect(Collectors.toList());
NetworkPolicy policy = ocpUtils.createNetworkPolicy(project, kafkaConnect.getMetadata().getName() + "-allowed", labels, ports);
return policy;
}
/**
* Exposes a route for kafka connect API associated with given KafkaConnect resource
* @return {@link Route} object
*/
public Route exposeApi() {
LOGGER.info("Exposing KafkaConnect API");
String name = kafkaConnect.getMetadata().getName() + "-connect-api";
Service service = ocp.services().inNamespace(project).withName(name).get();
apiRoute = ocpUtils.createRoute(project, name, name, "rest-api", service.getMetadata().getLabels());
httpUtils.awaitApi(getApiURL());
return apiRoute;
}
/**
* Exposes a route for prometheus metrics for kafka connect associated with given KafkaConnect resource
* @return {@link Route} object
*/
public Route exposeMetrics() {
LOGGER.info("Exposing KafkaConnect metrics");
String name = kafkaConnect.getMetadata().getName() + "-connect-metrics";
String nameSvc = kafkaConnect.getMetadata().getName() + "-connect-api";
Service service = ocp.services().inNamespace(project).withName(nameSvc).get();
metricsRoute = ocpUtils
.createRoute(project, name, nameSvc, "prometheus", service.getMetadata().getLabels());
httpUtils.awaitApi(getMetricsURL());
return metricsRoute;
}
/**
* Deploys Kafka connector with given name and configuration via REST
* @param name connector name
* @param config connector config
* @throws IOException or request error
*/
public void deployConnector(String name, ConnectorConfigBuilder config) throws IOException, InterruptedException {
if (useConnectorResources) {
deployConnectorCr(name, config);
}
else {
deployConnectorJson(name, config);
}
}
private void deployConnectorJson(String name, ConnectorConfigBuilder config) throws IOException {
if (apiRoute == null) {
throw new IllegalStateException("KafkaConnect API was not exposed");
}
HttpUrl url = getApiURL().resolve("/connectors/" + name + "/config");
Request r = new Request.Builder()
.url(url)
.put(RequestBody.create(config.getJsonString(), MediaType.parse("application/json")))
.build();
try (Response res = http.newCall(r).execute()) {
if (!res.isSuccessful()) {
LOGGER.error(res.request().url().toString());
throw new RuntimeException("Connector registration request returned status code '" + res.code() + "'");
}
LOGGER.info("Registered kafka connector '" + name + "'");
}
}
private void deployConnectorCr(String name, ConnectorConfigBuilder config) throws InterruptedException {
LOGGER.info("Deploying connector CR");
KafkaConnector connector = config.getCustomResource();
connector.getMetadata().setName(name);
connector.getMetadata().getLabels().put("strimzi.io/cluster", kafkaConnect.getMetadata().getName());
kafkaConnectorOperation().createOrReplace(connector);
waitForKafkaConnector(connector.getMetadata().getName());
}
/**
* Waits until connector is properly deployed.
* Note: works only for CR deployment
* @param name name of the connector
* @throws InterruptedException on wait error
* @throws IllegalArgumentException when deployment doesn't use custom resources
*/
public KafkaConnector waitForKafkaConnector(String name) throws InterruptedException {
if (!useConnectorResources) {
throw new IllegalStateException("Unable to wait for connector, deployment doesn't use custom resources.");
}
return kafkaConnectorOperation().withName(name).waitUntilCondition(this::waitForReadyStatus, 5, MINUTES);
}
private boolean waitForReadyStatus(KafkaConnector connector) {
KafkaConnectorStatus status = connector.getStatus();
if (status == null) {
return false;
}
return status.getConditions().stream().anyMatch(c -> c.getType().equalsIgnoreCase("Ready") && c.getStatus().equalsIgnoreCase("True"));
}
private NonNamespaceOperation<KafkaConnector, KafkaConnectorList, DoneableKafkaConnector, Resource<KafkaConnector, DoneableKafkaConnector>> kafkaConnectorOperation() {
return Crds.kafkaConnectorOperation(ocp).inNamespace(project);
}
/**
* Deletes Kafka connector with given name
* @param name connector name
* @throws IOException on request error
*/
public void undeployConnector(String name) throws IOException {
if (useConnectorResources) {
undeployConnectorrCr(name);
}
else {
undeployConnectorJson(name);
}
}
private void undeployConnectorJson(String name) throws IOException {
if (apiRoute == null) {
throw new IllegalStateException("KafkaConnect API was not exposed");
}
HttpUrl url = getApiURL().resolve("/connectors/" + name);
Request r = new Request.Builder().url(url).delete().build();
try (Response res = http.newCall(r).execute()) {
if (!res.isSuccessful()) {
LOGGER.error(res.request().url().toString());
throw new RuntimeException("Connector deletion request returned status code '" + res.code() + "'");
}
LOGGER.info("Deleted kafka connector '" + name + "'");
}
}
private void undeployConnectorrCr(String name) {
kafkaConnectorOperation().withName(name).delete();
}
public List<String> getConnectMetrics() throws IOException {
OkHttpClient httpClient = new OkHttpClient();
Request r = new Request.Builder().url(getMetricsURL()).get().build();
try (Response res = httpClient.newCall(r).execute()) {
String metrics = res.body().string();
return Stream.of(metrics.split("\\r?\\n")).collect(Collectors.toList());
}
}
/**
* Waits until Snapshot phase of given connector completes
* @param connectorName name of the connect
* @param metricName name of the metric used to determine the state
* @throws IOException on metric request error
*/
public void waitForSnapshot(String connectorName, String metricName) throws IOException {
List<String> metrics = getConnectMetrics();
await()
.atMost(5, TimeUnit.MINUTES)
.pollInterval(10, TimeUnit.SECONDS)
.until(() -> metrics.stream().anyMatch(s -> s.contains(metricName) && s.contains(connectorName)));
}
/**
* Waits until snapshot phase of given MySQL connector completes
* @param connectorName connector name
* @throws IOException on metric request error
*/
public void waitForMySqlSnapshot(String connectorName) throws IOException {
waitForSnapshot(connectorName, "debezium_mysql_connector_metrics_snapshotcompleted");
}
/**
* Waits until snapshot phase of given PostgreSQL connector completes
* @param connectorName connector name
* @throws IOException on metric request error
*/
public void waitForPostgreSqlSnapshot(String connectorName) throws IOException {
waitForSnapshot(connectorName, "debezium_postgres_connector_metrics_snapshotcompleted");
}
/**
* @return URL of Connect API endpoint
*/
public HttpUrl getApiURL() {
return new HttpUrl.Builder()
.scheme("http")
.host(apiRoute.getSpec().getHost())
.build();
}
/**
* @return URL of metrics endpoint
*/
public HttpUrl getMetricsURL() {
return new HttpUrl.Builder()
.scheme("http")
.host(metricsRoute.getSpec().getHost())
.build();
}
/**
* Undeploy this Kafka Connect cluster by deleted related KafkaConnect CR
* @return true if the CR was found and deleted
*/
public boolean undeployCluster() {
return Crds.kafkaConnectOperation(ocp).delete(kafkaConnect);
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.kafka;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.testing.openshift.tools.OpenShiftUtils;
import io.fabric8.openshift.client.OpenShiftClient;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.model.Kafka;
import io.strimzi.api.kafka.model.status.ListenerAddress;
import io.strimzi.api.kafka.model.status.ListenerStatus;
import okhttp3.OkHttpClient;
/**
* This class provides control over Kafka instance deployed in OpenShift
* @author Jakub Cechacek
*/
public class KafkaController {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaController.class);
private final Kafka kafka;
private final OpenShiftClient ocp;
private final OkHttpClient http;
private final String project;
private final OpenShiftUtils ocpUtils;
public KafkaController(Kafka kafka, OpenShiftClient ocp, OkHttpClient http) {
this.kafka = kafka;
this.ocp = ocp;
this.http = http;
this.project = kafka.getMetadata().getNamespace();
this.ocpUtils = new OpenShiftUtils(ocp);
}
/**
* @return host and port for public bootstrap service
*/
public String getKafkaBootstrapAddress() {
List<ListenerStatus> listeners = kafka.getStatus().getListeners();
ListenerStatus listener = listeners.stream()
.filter(l -> l.getType().equalsIgnoreCase("external"))
.findAny().orElseThrow(() -> new IllegalStateException("No external listener found for Kafka cluster " + kafka.getMetadata().getName()));
ListenerAddress address = listener.getAddresses().get(0);
return address.getHost() + ":" + address.getPort();
}
/**
* Undeploy this Kafka cluster by deleted related KafkaConnect CR
* @return true if the CR was found and deleted
*/
public boolean undeployCluster() {
return Crds.kafkaOperation(ocp).delete(kafka);
}
}

View File

@ -0,0 +1,127 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.kafka;
import static java.util.concurrent.TimeUnit.MINUTES;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.testing.openshift.tools.OpenShiftUtils;
import io.debezium.testing.openshift.tools.YAML;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.openshift.client.OpenShiftClient;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.KafkaConnectList;
import io.strimzi.api.kafka.KafkaList;
import io.strimzi.api.kafka.model.DoneableKafka;
import io.strimzi.api.kafka.model.DoneableKafkaConnect;
import io.strimzi.api.kafka.model.Kafka;
import io.strimzi.api.kafka.model.KafkaConnect;
import io.strimzi.api.kafka.model.KafkaConnectBuilder;
import io.strimzi.api.kafka.model.status.HasStatus;
import io.strimzi.api.kafka.model.status.Status;
import okhttp3.OkHttpClient;
/**
* Deployment management for Kafka & Kafka Connect clusters via Strimzi
* @author Jakub Cechacek
*/
public class KafkaDeployer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDeployer.class);
private final OpenShiftClient ocp;
private final OkHttpClient http;
private final OpenShiftUtils ocpUtils;
private final String project;
public KafkaDeployer(String project, OpenShiftClient ocp, OkHttpClient http) {
this.project = project;
this.ocp = ocp;
this.http = http;
this.ocpUtils = new OpenShiftUtils(ocp);
}
public KafkaDeployer(String project, OpenShiftClient ocp) {
this(project, ocp, new OkHttpClient());
}
/**
* Deploys Kafka Cluster
* @param yamlPath path to CR descriptor (must be available on class path)
* @return {@link KafkaController} instance for deployed cluster
* @throws InterruptedException
*/
public KafkaController deployKafkaCluster(String yamlPath) throws InterruptedException {
LOGGER.info("Deploying Kafka from " + yamlPath);
Kafka kafka = kafkaOperation().createOrReplace(YAML.fromResource(yamlPath, Kafka.class));
kafka = waitForKafkaCluster(kafka.getMetadata().getName());
return new KafkaController(kafka, ocp, http);
}
/**
* Deploys Kafka Connect Cluster
* @param yamlPath path to CR descriptor (must be available on class path)
* @param useConnectorResources true if connector deployment should be managed by operator
* @return {@link KafkaController} instance for deployed cluster
*/
public KafkaConnectController deployKafkaConnectCluster(String yamlPath, String loggingYamlPath, boolean useConnectorResources) throws InterruptedException {
LOGGER.info("Deploying KafkaConnect from" + yamlPath);
ocp.configMaps().inNamespace(project).createOrReplace(YAML.fromResource(loggingYamlPath, ConfigMap.class));
KafkaConnect kafkaConnect = YAML.fromResource(yamlPath, KafkaConnect.class);
if (useConnectorResources) {
kafkaConnect = new KafkaConnectBuilder(kafkaConnect)
.editMetadata()
.addToAnnotations("strimzi.io/use-connector-resources", "true")
.endMetadata()
.build();
}
kafkaConnectOperation().createOrReplace(kafkaConnect);
kafkaConnect = waitForConnectCluster(kafkaConnect.getMetadata().getName());
return new KafkaConnectController(kafkaConnect, ocp, http, useConnectorResources);
}
public Kafka waitForKafkaCluster(String name) throws InterruptedException {
return kafkaOperation().withName(name).waitUntilCondition(this::waitForReadyStatus, 5, MINUTES);
}
public KafkaConnect waitForConnectCluster(String name) throws InterruptedException {
return kafkaConnectOperation().withName(name).waitUntilCondition(this::waitForReadyStatus, 5, MINUTES);
}
private <T extends Status> boolean waitForReadyStatus(HasStatus<T> kc) {
return kc.getStatus() != null &&
kc.getStatus().getConditions().stream().anyMatch(c -> c.getType().equalsIgnoreCase("Ready") && c.getStatus().equalsIgnoreCase("True"));
}
/**
* Deploys pull secret and links it to "default" service account in the project
* @param yamlPath path to Secret descriptor
* @return deployed pull secret
*/
public Secret deployPullSecret(String yamlPath) {
Secret pullSecret = ocp.secrets().createOrReplace(YAML.from(yamlPath, Secret.class));
ocpUtils.linkPullSecret(project, "default", pullSecret.getMetadata().getName());
return pullSecret;
}
private NonNamespaceOperation<Kafka, KafkaList, DoneableKafka, Resource<Kafka, DoneableKafka>> kafkaOperation() {
return Crds.kafkaOperation(ocp).inNamespace(project);
}
private NonNamespaceOperation<KafkaConnect, KafkaConnectList, DoneableKafkaConnect, Resource<KafkaConnect, DoneableKafkaConnect>> kafkaConnectOperation() {
return Crds.kafkaConnectOperation(ocp).inNamespace(project);
}
}

View File

@ -0,0 +1,98 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.testing.openshift.resources.ConfigProperties;
import io.debezium.testing.openshift.tools.kafka.KafkaConnectController;
import io.debezium.testing.openshift.tools.kafka.KafkaController;
import io.debezium.testing.openshift.tools.kafka.KafkaDeployer;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.openshift.client.DefaultOpenShiftClient;
import io.fabric8.openshift.client.OpenShiftClient;
/**
* @author Jakub Cechacek
*/
public abstract class ConnectorTestBase {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectorTestBase.class);
public static final String KAFKA = "/kafka-resources/010-kafka.yaml";
public static final String KAFKA_CONNECT_S2I_LOGGING = "/kafka-resources/020-kafka-connect-logging.yaml";
public static final String KAFKA_CONNECT_S2I = "/kafka-resources/021-kafka-connect.yaml";
protected static Properties KAFKA_CONSUMER_PROPS = new Properties();
protected static OpenShiftClient ocp;
protected static TestUtils testUtils;
protected static KafkaDeployer kafkaDeployer;
protected static KafkaController kafkaController;
protected static KafkaConnectController kafkaConnectController;
@BeforeAll
public static void setup() throws InterruptedException {
Config cfg = new ConfigBuilder()
.withMasterUrl(ConfigProperties.OCP_URL)
.withUsername(ConfigProperties.OCP_USERNAME)
.withPassword(ConfigProperties.OCP_PASSWORD)
.withTrustCerts(true)
.build();
ocp = new DefaultOpenShiftClient(cfg);
testUtils = new TestUtils();
kafkaDeployer = new KafkaDeployer(ConfigProperties.OCP_PROJECT_DBZ, ocp);
ConfigProperties.OCP_SECRET_RHIO_PATH.ifPresent(kafkaDeployer::deployPullSecret);
kafkaController = kafkaDeployer.deployKafkaCluster(KAFKA);
kafkaConnectController = kafkaDeployer.deployKafkaConnectCluster(KAFKA_CONNECT_S2I, KAFKA_CONNECT_S2I_LOGGING, ConfigProperties.STRIMZI_OPERATOR_CONNECTORS);
kafkaConnectController.allowServiceAccess();
kafkaConnectController.exposeApi();
kafkaConnectController.exposeMetrics();
KAFKA_CONSUMER_PROPS.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaController.getKafkaBootstrapAddress());
KAFKA_CONSUMER_PROPS.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KAFKA_CONSUMER_PROPS.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KAFKA_CONSUMER_PROPS.put(ConsumerConfig.GROUP_ID_CONFIG, "DEBEZIUM_IT_01");
KAFKA_CONSUMER_PROPS.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KAFKA_CONSUMER_PROPS.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
}
protected void assertTopicsExist(String... names) {
try (Consumer<String, String> consumer = new KafkaConsumer<>(KAFKA_CONSUMER_PROPS)) {
await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
Set<String> topics = consumer.listTopics().keySet();
assertThat(topics).contains(names);
});
}
}
protected void assertRecordsCount(String topic, int count) {
try (Consumer<String, String> consumer = new KafkaConsumer<>(KAFKA_CONSUMER_PROPS)) {
consumer.subscribe(Collections.singleton(topic));
ConsumerRecords<String, String> records = consumer.poll(Duration.of(10, ChronoUnit.SECONDS));
consumer.seekToBeginning(consumer.assignment());
assertThat(records.count()).isEqualTo(count);
}
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift;
/**
*
* @author Jakub Cechacek
*/
public class TestUtils {
/**
* Generates unique identifier
* @return unique id
*/
public String getUniqueId() {
return String.valueOf(System.currentTimeMillis());
}
}

View File

@ -0,0 +1,104 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.mysql;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import io.debezium.testing.openshift.ConnectorTestBase;
import io.debezium.testing.openshift.resources.ConfigProperties;
import io.debezium.testing.openshift.resources.ConnectorFactories;
import io.debezium.testing.openshift.tools.databases.MySqlDeployer;
import io.debezium.testing.openshift.tools.kafka.ConnectorConfigBuilder;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
/**
* @author Jakub Cechacek
*/
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@Tag("acceptance")
@Tag("mysql")
public class MySqlConnectorIT extends ConnectorTestBase {
public static final String DB_DEPLOYMENT_PATH = "/database-resources/mysql/deployment.yaml";
public static final String DB_SERVICE_PATH_LB = "/database-resources/mysql/service-lb.yaml";
public static final String DB_SERVICE_PATH = "/database-resources/mysql/service.yaml";
public static final String CONNECTOR_NAME = "inventory-connector-mysql";
private static MySqlDeployer dbDeployer;
private static OkHttpClient httpClient = new OkHttpClient();
private static ConnectorFactories connectorFactories = new ConnectorFactories();
private static String connectorName;
@BeforeAll
public static void setupDatabase() throws IOException, InterruptedException {
if (!ConfigProperties.DATABASE_MYSQL_HOST.isPresent()) {
dbDeployer = new MySqlDeployer(ocp)
.withProject(ConfigProperties.OCP_PROJECT_MYSQL)
.withDeployment(DB_DEPLOYMENT_PATH)
.withServices(DB_SERVICE_PATH_LB, DB_SERVICE_PATH);
dbDeployer.deploy();
}
connectorName = CONNECTOR_NAME + "-" + testUtils.getUniqueId();
ConnectorConfigBuilder connectorConfig = connectorFactories.mysql().put("database.server.name", connectorName);
kafkaConnectController.deployConnector(connectorName, connectorConfig);
}
@AfterAll
public static void tearDownDatabase() throws IOException {
kafkaConnectController.undeployConnector(connectorName);
}
@Test
@Order(1)
public void shouldHaveRegisteredConnector() {
Request r = new Request.Builder()
.url(kafkaConnectController.getApiURL().resolve("/connectors"))
.build();
await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
try (Response res = httpClient.newCall(r).execute()) {
assertThat(res.body().string()).contains(connectorName);
}
});
}
@Test
@Order(2)
public void shouldCreateKafkaTopics() {
assertTopicsExist(
connectorName + ".inventory.addresses",
connectorName + ".inventory.customers",
connectorName + ".inventory.geom",
connectorName + ".inventory.orders",
connectorName + ".inventory.products",
connectorName + ".inventory.products_on_hand");
}
@Test
@Order(3)
public void shouldContainRecordsInCustomersTopic() throws IOException {
kafkaConnectController.waitForMySqlSnapshot(connectorName);
assertRecordsCount(connectorName + ".inventory.customers", 4);
}
}

View File

@ -0,0 +1,106 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.postgresql;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import io.debezium.testing.openshift.ConnectorTestBase;
import io.debezium.testing.openshift.resources.ConfigProperties;
import io.debezium.testing.openshift.resources.ConnectorFactories;
import io.debezium.testing.openshift.tools.databases.PostgreSqlDeployer;
import io.debezium.testing.openshift.tools.kafka.ConnectorConfigBuilder;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
/**
* @author Jakub Cechacek
*/
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@Tag("acceptance")
@Tag("postgresql")
public class PostgreSqlConnectorIT extends ConnectorTestBase {
public static final String DB_DEPLOYMENT_PATH = "/database-resources/postgresql/deployment.yaml";
public static final String DB_SERVICE_PATH_LB = "/database-resources/postgresql/service-lb.yaml";
public static final String DB_SERVICE_PATH = "/database-resources/postgresql/service.yaml";
public static final String CONNECTOR_NAME = "inventory-connector-postgresql";
private static PostgreSqlDeployer dbDeployer;
private static OkHttpClient httpClient = new OkHttpClient();
private static ConnectorFactories connectorFactories = new ConnectorFactories();
private static String connectorName;
@BeforeAll
public static void setupDatabase() throws IOException, InterruptedException {
if (!ConfigProperties.DATABASE_MYSQL_HOST.isPresent()) {
dbDeployer = new PostgreSqlDeployer(ocp)
.withProject(ConfigProperties.OCP_PROJECT_POSTGRESQL)
.withDeployment(DB_DEPLOYMENT_PATH)
.withServices(DB_SERVICE_PATH_LB, DB_SERVICE_PATH);
dbDeployer.deploy();
}
String id = testUtils.getUniqueId();
connectorName = CONNECTOR_NAME + "-" + id;
ConnectorConfigBuilder connectorConfig = connectorFactories.postgresql()
.put("database.server.name", connectorName)
.put("slot.name", "debezium_" + id)
.put("slot.drop.on.stop", true);
kafkaConnectController.deployConnector(connectorName, connectorConfig);
}
@AfterAll
public static void tearDownDatabase() throws IOException {
kafkaConnectController.undeployConnector(connectorName);
}
@Test
@Order(1)
public void shouldHaveRegisteredConnector() {
Request r = new Request.Builder()
.url(kafkaConnectController.getApiURL().resolve("/connectors"))
.build();
await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
try (Response res = httpClient.newCall(r).execute()) {
assertThat(res.body().string()).contains(connectorName);
}
});
}
@Test
@Order(2)
public void shouldCreateKafkaTopics() {
assertTopicsExist(
connectorName + ".inventory.customers",
connectorName + ".inventory.orders",
connectorName + ".inventory.products",
connectorName + ".inventory.products_on_hand");
}
@Test
@Order(3)
public void shouldContainRecordsInCustomersTopic() throws IOException {
kafkaConnectController.waitForPostgreSqlSnapshot(connectorName);
assertRecordsCount(connectorName + ".inventory.customers", 4);
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.resources;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author Jakub Cechacek
*/
public class ConfigProperties {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigProperties.class);
public static final String OCP_URL = stringProperty("test.ocp.url");
public static final String OCP_USERNAME = stringProperty("test.ocp.username");
public static final String OCP_PASSWORD = stringProperty("test.ocp.password");
public static final String OCP_PROJECT_DBZ = stringProperty("test.ocp.project.debezium");
public static final String OCP_PROJECT_MYSQL = System.getProperty("test.ocp.project.mysql", "debezium-mysql");
public static final String OCP_PROJECT_POSTGRESQL = System.getProperty("test.ocp.project.postgresql", "debezium-postgresql");
public static final Optional<String> OCP_SECRET_RHIO_PATH = stringOptionalProperty("test.ocp.secret.rhio.path");
public static final boolean STRIMZI_OPERATOR_CONNECTORS = booleanProperty("test.strimzi.operator.connectors");
public static final int DATABASE_MYSQL_PORT = Integer.parseInt(System.getProperty("test.database.mysql.port", "3306"));
public static final String DATABASE_MYSQL_DBZ_USERNAME = System.getProperty("test.database.mysql.dbz.username", "debezium");
public static final String DATABASE_MYSQL_DBZ_PASSWORD = System.getProperty("test.database.mysql.dbz.password", "dbz");
public static final Optional<String> DATABASE_MYSQL_HOST = stringOptionalProperty("test.database.mysql.host");
public static final int DATABASE_POSTGRESQL_PORT = Integer.parseInt(System.getProperty("test.database.postgresql.port", "5432"));
public static final String DATABASE_POSTGRESQL_DBZ_USERNAME = System.getProperty("test.database.postgresql.username", "debezium");
public static final String DATABASE_POSTGRESQL_DBZ_PASSWORD = System.getProperty("test.database.postgresql.password", "debezium");
public static final String DATABASE_POSTGRESQL_DBZ_DBNAME = System.getProperty("test.database.postgresql.dbname", "debezium");
public static final Optional<String> DATABASE_POSTGRESQL_HOST = stringOptionalProperty("test.database.postgresql.host");
private static boolean booleanProperty(String key) {
String value = System.getProperty(key);
if (value == null || value.isEmpty() || value.equalsIgnoreCase("false") || value.equalsIgnoreCase("0")) {
return false;
}
return true;
}
private static Optional<String> stringOptionalProperty(String key) {
String value = System.getProperty(key);
return Optional.ofNullable((value == null || value.isEmpty()) ? null : value);
}
private static String stringProperty(String key) {
String value = System.getProperty(key);
if (value == null) {
LOGGER.error("Undefined property " + key);
throw new IllegalStateException("Undefined property \"" + key + "\"");
}
return value;
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.resources;
import static io.debezium.testing.openshift.resources.ConfigProperties.DATABASE_MYSQL_HOST;
import static io.debezium.testing.openshift.resources.ConfigProperties.DATABASE_POSTGRESQL_HOST;
import io.debezium.testing.openshift.tools.kafka.ConnectorConfigBuilder;
/**
*
* @author Jakub Cechacek
*/
public class ConnectorFactories {
public ConnectorConfigBuilder mysql() {
ConnectorConfigBuilder cb = new ConnectorConfigBuilder();
String dbHost = DATABASE_MYSQL_HOST.orElse("mysql." + ConfigProperties.OCP_PROJECT_MYSQL + ".svc.cluster.local");
return cb
.put("connector.class", "io.debezium.connector.mysql.MySqlConnector")
.put("task.max", 1)
.put("database.hostname", dbHost)
.put("database.port", ConfigProperties.DATABASE_MYSQL_PORT)
.put("database.user", ConfigProperties.DATABASE_MYSQL_DBZ_USERNAME)
.put("database.password", ConfigProperties.DATABASE_MYSQL_DBZ_PASSWORD)
.put("database.server.name", "mysqldb") // this should be overwritten with unique name
.put("database.whitelist", "inventory") // might want to change
.put("database.history.kafka.bootstrap.servers", "debezium-kafka-cluster-kafka-bootstrap." + ConfigProperties.OCP_PROJECT_DBZ + ".svc.cluster.local:9092")
.put("database.history.kafka.topic", "schema-changes.inventory");
}
public ConnectorConfigBuilder postgresql() {
ConnectorConfigBuilder cb = new ConnectorConfigBuilder();
String dbHost = DATABASE_POSTGRESQL_HOST.orElse("postgresql." + ConfigProperties.OCP_PROJECT_POSTGRESQL + ".svc.cluster.local");
return cb
.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
.put("task.max", 1)
.put("database.hostname", dbHost)
.put("database.port", ConfigProperties.DATABASE_POSTGRESQL_PORT)
.put("database.user", ConfigProperties.DATABASE_POSTGRESQL_DBZ_USERNAME)
.put("database.password", ConfigProperties.DATABASE_POSTGRESQL_DBZ_PASSWORD)
.put("database.dbname", ConfigProperties.DATABASE_POSTGRESQL_DBZ_DBNAME)
.put("database.server.name", "postgresqldb") // this should be overwritten with unique name
.put("schema.whitelist", "inventory") // might want to change
.put("slot.name", "debezium") // this should be overwritten with unique name
.put("plugin.name", "pgoutput");
}
}

View File

@ -0,0 +1,62 @@
kind: Deployment
apiVersion: apps/v1
metadata:
name: mysql
labels:
app: mysql
spec:
replicas: 1
selector:
matchLabels:
app: mysql
deployment: mysql
template:
metadata:
labels:
app: mysql
deployment: mysql
spec:
volumes:
- name: mysql-volume-1
emptyDir: {}
containers:
- resources: {}
name: mysql
env:
- name: MYSQL_PASSWORD
value: ${database.mysql.password}
- name: MYSQL_ROOT_PASSWORD
value: ${database.mysql.root.password}
- name: MYSQL_USER
value: ${database.mysql.username}
ports:
- containerPort: 3306
protocol: TCP
- containerPort: 33060
protocol: TCP
imagePullPolicy: IfNotPresent
volumeMounts:
- name: mysql-volume-1
mountPath: /var/lib/mysql
livenessProbe:
initialDelaySeconds: 30
tcpSocket:
port: 3306
timeoutSeconds: 1
readinessProbe:
exec:
command:
- "/bin/sh"
- "-i"
- "-c"
- "MYSQL_PWD=\"$MYSQL_PASSWORD\" mysql -h 127.0.0.1 -u $MYSQL_USER -D inventory -e 'SELECT 1'"
initialDelaySeconds: 5
timeoutSeconds": 1
terminationMessagePolicy: File
terminationMessagePath: /dev/termination-log
image: 'debezium/example-mysql:latest'
restartPolicy: Always
terminationGracePeriodSeconds: 30
dnsPolicy: ClusterFirst
strategy:
type: Recreate

View File

@ -0,0 +1,13 @@
apiVersion: v1
kind: Service
metadata:
name: mysql-lb
spec:
selector:
app: mysql
deployment: mysql
type: LoadBalancer
ports:
- name: db
port: 3306
targetPort: 3306

View File

@ -0,0 +1,12 @@
apiVersion: v1
kind: Service
metadata:
name: mysql
spec:
selector:
app: mysql
deployment: mysql
ports:
- name: db
port: 3306
targetPort: 3306

View File

@ -0,0 +1,54 @@
kind: Deployment
apiVersion: apps/v1
metadata:
name: postgresql
labels:
app: postgresql
spec:
replicas: 1
selector:
matchLabels:
app: postgresql
deployment: postgresql
template:
metadata:
labels:
app: postgresql
deployment: postgresql
spec:
containers:
- resources: {}
name: postgresql
env:
- name: POSTGRESQL_PASSWORD
value: ${database.postgresql.password}
- name: POSTGRESQL_DATABASE
value: ${database.postgresql.dbname}
- name: POSTGRESQL_USER
value: ${database.postgresql.username}
ports:
- containerPort: 5432
protocol: TCP
imagePullPolicy: Always
livenessProbe:
initialDelaySeconds: 30
tcpSocket:
port: 5432
timeoutSeconds: 1
readinessProbe:
exec:
command:
- "/bin/sh"
- "-i"
- "-c"
- "PGPASSWORD=${POSTGRESQL_PASSWORD} /usr/bin/psql -w -U ${POSTGRESQL_USER} -d ${POSTGRESQL_DATABASE} -c 'SELECT 1'"
initialDelaySeconds: 5
timeoutSeconds": 1
terminationMessagePolicy: File
terminationMessagePath: /dev/termination-log
image: 'quay.io/debezium/example-postgres-ocp:latest'
restartPolicy: Always
terminationGracePeriodSeconds: 30
dnsPolicy: ClusterFirst
strategy:
type: Recreate

View File

@ -0,0 +1,13 @@
apiVersion: v1
kind: Service
metadata:
name: postgresql-lb
spec:
selector:
app: postgresql
deployment: postgresql
type: LoadBalancer
ports:
- name: db
port: 5432
targetPort: 5432

View File

@ -0,0 +1,12 @@
apiVersion: v1
kind: Service
metadata:
name: postgresql
spec:
selector:
app: postgresql
deployment: postgresql
ports:
- name: db
port: 5432
targetPort: 5432

View File

@ -0,0 +1,28 @@
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
generation: 4
name: debezium-kafka-cluster
spec:
entityOperator:
topicOperator: {}
userOperator: {}
kafka:
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
listeners:
external:
type: loadbalancer
tls: false
plain: {}
tls: {}
replicas: 1
storage:
type: ephemeral
version: ${version.kafka}
zookeeper:
replicas: 1
storage:
type: ephemeral

View File

@ -0,0 +1,24 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: connect-logging
data:
log4j.properties: |+
kafka.logs.dir=logs
log4j.rootLogger=INFO, stdout, appender
# Disable excessive reflection warnings - KAFKA-5229
log4j.logger.org.reflections=ERROR
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.threshold=DEBUG
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n
log4j.appender.appender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.appender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.appender.File=${kafka.logs.dir}/connect-service.log
log4j.appender.appender.layout=org.apache.log4j.PatternLayout
log4j.appender.appender.layout.ConversionPattern=%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n

View File

@ -0,0 +1,27 @@
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
name: debezium-kafka-connect-cluster
spec:
version: ${version.kafka}
image: ${image.fullname}
bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
logging:
name: connect-logging
type: external
tls:
trustedCertificates:
- certificate: ca.crt
secretName: debezium-kafka-cluster-cluster-ca-cert
replicas: 1
metrics:
lowercaseOutputName: true
config:
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1
template:
connectContainer:
env:
- name: "JMX_PORT"
value: "5000"

View File

@ -0,0 +1,9 @@
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
# Root logger option
log4j.rootLogger=INFO, stdout

View File

@ -12,5 +12,6 @@
<packaging>pom</packaging>
<modules>
<module>debezium-testing-testcontainers</module>
<module>debezium-testing-openshift</module>
</modules>
</project>

View File

@ -91,7 +91,7 @@
<version.fest>1.4</version.fest>
<version.jmh>1.21</version.jmh>
<version.mockito>3.0.0</version.mockito>
<version.awaitility>3.1.6</version.awaitility>
<version.awaitility>4.0.1</version.awaitility>
<version.testcontainers>1.12.5</version.testcontainers>
<version.jsonpath>2.4.0</version.jsonpath>
<version.okhttp>4.2.2</version.okhttp>
@ -117,6 +117,8 @@
<version.checkstyle.plugin>3.0.0</version.checkstyle.plugin>
<version.release.plugin>2.5.3</version.release.plugin>
<version.impsort>1.2.0</version.impsort>
<version.surefire.plugin>2.22.2</version.surefire.plugin>
<version.failsafe.plugin>${version.surefire.plugin}</version.failsafe.plugin>
<!-- Dockerfiles -->
<docker.maintainer>Debezium community</docker.maintainer>