DBZ-2132 tools refactoring in preparation for Mongo and SQL Server ocp tests

jcechace 2020-06-01 19:07:24 +02:00 committed by Jiri Pechanec
parent 6ed1ba88e3
commit 93a3a6c5bb
17 changed files with 388 additions and 93 deletions

View File

@@ -13,8 +13,10 @@
<name>Debezium OpenShift integration test-suite</name>
<properties>
<version.fabric8.client>4.6.4</version.fabric8.client>
<version.strimzi>0.16.2</version.strimzi>
<version.fabric8.client>4.10.1</version.fabric8.client>
<version.commons.codec>1.14</version.commons.codec>
<version.commons.compress>1.20</version.commons.compress>
<version.strimzi>0.18.0</version.strimzi>
<version.junit5>5.5.1</version.junit5>
<version.junit5.pioneer>0.5.1</version.junit5.pioneer>
<version.assertj>3.11.1</version.assertj>
@@ -27,6 +29,7 @@
<ocp.project.debezium>debezium</ocp.project.debezium>
<ocp.project.mysql>debezium-mysql</ocp.project.mysql>
<ocp.project.postgresql>debezium-postgresql</ocp.project.postgresql>
<ocp.project.mongo>debezium-mongo</ocp.project.mongo>
<!--Strimzi configuration-->
<strimzi.operator.connectors>true</strimzi.operator.connectors>
@@ -39,13 +42,31 @@
<database.mysql.dbz.password>dbz</database.mysql.dbz.password>
<database.mysql.root.password>debezium</database.mysql.root.password>
<!--MySQL configuration-->
<!--PostgreSQL configuration-->
<database.postgresql.port>5432</database.postgresql.port>
<database.postgresql.username>debezium</database.postgresql.username>
<database.postgresql.password>debezium</database.postgresql.password>
<database.postgresql.dbz.username>debezium</database.postgresql.dbz.username>
<database.postgresql.dbz.password>debezium</database.postgresql.dbz.password>
<database.postgresql.dbname>debezium</database.postgresql.dbname>
<!--SqlServer configuration-->
<database.sqlserver.port>1433</database.sqlserver.port>
<database.sqlserver.username>sa</database.sqlserver.username>
<database.sqlserver.password>Debezium1$</database.sqlserver.password>
<database.sqlserver.dbz.username>sa</database.sqlserver.dbz.username>
<database.sqlserver.dbz.password>Debezium1$</database.sqlserver.dbz.password>
<database.sqlserver.dbname>testDB</database.sqlserver.dbname>
<!--MongoDB configuration-->
<database.mongo.port>27017</database.mongo.port>
<database.mongo.username>admin</database.mongo.username>
<database.mongo.password>admin</database.mongo.password>
<database.mongo.dbz.username>debezium</database.mongo.dbz.username>
<database.mongo.dbz.password>dbz</database.mongo.dbz.password>
<database.mongo.dbz.dbname>inventory</database.mongo.dbz.dbname>
<database.mongo.dbz.login.dbname>admin</database.mongo.dbz.login.dbname>
</properties>
<dependencyManagement>
@@ -67,6 +88,19 @@
<version>${version.fabric8.client}</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>${version.commons.codec}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>${version.commons.compress}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
@@ -164,6 +198,16 @@
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver</artifactId>
</dependency>
</dependencies>
<profiles>
@@ -277,6 +321,7 @@
<test.ocp.project.debezium>${ocp.project.debezium}</test.ocp.project.debezium>
<test.ocp.project.mysql>${ocp.project.mysql}</test.ocp.project.mysql>
<test.ocp.project.postgresql>${ocp.project.postgresql}</test.ocp.project.postgresql>
<test.ocp.project.mongo>${ocp.project.mongo}</test.ocp.project.mongo>
<test.strimzi.operator.connectors>${strimzi.operator.connectors}</test.strimzi.operator.connectors>
<test.database.mysql.host>${database.mysql.host}</test.database.mysql.host>
@@ -293,6 +338,15 @@
<test.database.postgresql.dbz.username>${database.postgresql.dbz.username}</test.database.postgresql.dbz.username>
<test.database.postgresql.dbz.password>${database.postgresql.dbz.password}</test.database.postgresql.dbz.password>
<test.database.postgresql.dbname>${database.postgresql.dbname}</test.database.postgresql.dbname>
<test.database.mongo.host>${database.mongo.host}</test.database.mongo.host>
<test.database.mongo.port>${database.mongo.port}</test.database.mongo.port>
<test.database.mongo.username>${database.mongo.username}</test.database.mongo.username>
<test.database.mongo.password>${database.mongo.password}</test.database.mongo.password>
<test.database.mongo.dbz.username>${database.mongo.dbz.username}</test.database.mongo.dbz.username>
<test.database.mongo.dbz.password>${database.mongo.dbz.password}</test.database.mongo.dbz.password>
<test.database.mongo.dbz.dbname>${database.mongo.dbz.dbname}</test.database.mongo.dbz.dbname>
<test.database.mongo.dbz.login.dbname>${database.mongo.dbz.login.dbname}</test.database.mongo.dbz.login.dbname>
</systemPropertyVariables>
</configuration>
<executions>

View File

@@ -0,0 +1,40 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.databases;
import java.util.Objects;
import java.util.function.Consumer;
/**
* An adaptation of {@link Consumer} which allows exceptions to pass through
* @author Jakub Cechacek
*/
@FunctionalInterface
public interface Commands<T, E extends Throwable> {
/**
* Performs this operation on the given argument.
*
* @param t the input argument
* @throws E possibly thrown exception
*/
void execute(T t) throws E;
/**
* See {@link Consumer#andThen(Consumer)}
* @param after the operation to perform after this operation
* @return a composed {@code Commands} that performs in sequence this
* operation followed by the {@code after} operation
* @throws NullPointerException if {@code after} is null
*/
default Commands<T, E> andThen(Commands<? super T, E> after) {
Objects.requireNonNull(after);
return (T t) -> {
execute(t);
after.execute(t);
};
}
}
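A minimal usage sketch (table and values hypothetical): two Commands composed with andThen() run in sequence against the same argument, and any SQLException propagates to the caller instead of being forced into a try/catch inside the lambda:

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import io.debezium.testing.openshift.tools.databases.Commands;

public class CommandsExample {

    static void run(Connection con) throws SQLException {
        Commands<Connection, SQLException> useInventory = c -> c.setCatalog("inventory");
        Commands<Connection, SQLException> insert = c -> {
            try (Statement stmt = c.createStatement()) {
                stmt.execute("INSERT INTO customers VALUES (default, 'Tom', 'Tester', 'tom@test.com')");
            }
        };

        // setCatalog runs first, then the insert; a thrown SQLException aborts the chain
        useInventory.andThen(insert).execute(con);
    }
}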

View File

@@ -0,0 +1,15 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.databases;
/**
*
* @author Jakub Cechacek
*/
public interface DatabaseClient<C, E extends Throwable> {
void execute(Commands<C, E> commands) throws E;
}
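As a sketch of the non-SQL side this interface enables, a hypothetical MongoDB client (not part of this commit; the class name and URL handling are assumptions) built on the mongodb-driver dependency added to the pom above:

package io.debezium.testing.openshift.tools.databases;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;

public class MongoDatabaseClient implements DatabaseClient<MongoClient, RuntimeException> {

    private final String url;

    public MongoDatabaseClient(String url) {
        // e.g. "mongodb://debezium:dbz@mongo:27017/?authSource=admin"
        this.url = url;
    }

    @Override
    public void execute(Commands<MongoClient, RuntimeException> commands) {
        MongoClient client = new MongoClient(new MongoClientURI(url));
        try {
            commands.execute(client);
        }
        finally {
            client.close();
        }
    }
}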

View File

@@ -5,10 +5,6 @@
*/
package io.debezium.testing.openshift.tools.databases;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -16,6 +12,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.testing.openshift.tools.OpenShiftUtils;
import io.debezium.testing.openshift.tools.WaitConditions;
import io.fabric8.kubernetes.api.model.LoadBalancerIngress;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
@@ -25,16 +22,16 @@
*
* @author Jakub Cechacek
*/
public class DatabaseController {
public abstract class DatabaseController<C extends DatabaseClient<?, ?>> {
private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseController.class);
private final OpenShiftClient ocp;
private final String project;
private final String dbType;
private final OpenShiftUtils ocpUtils;
private Deployment deployment;
private String name;
private List<Service> services;
protected final OpenShiftClient ocp;
protected final String project;
protected final String dbType;
protected final OpenShiftUtils ocpUtils;
protected Deployment deployment;
protected String name;
protected List<Service> services;
public DatabaseController(Deployment deployment, List<Service> services, String dbType, OpenShiftClient ocp) {
this.deployment = deployment;
@@ -55,16 +52,7 @@ public String getDatabaseUrl() {
LoadBalancerIngress ingress = svc.getStatus().getLoadBalancer().getIngress().get(0);
String hostname = ingress.getHostname();
Integer port = svc.getSpec().getPorts().stream().filter(p -> p.getName().equals("db")).findAny().get().getPort();
return "jdbc:" + dbType + "://" + hostname + ":" + port + "/";
}
public void executeStatement(String database, String username, String password, String sql) throws SQLException {
try (Connection con = DriverManager.getConnection(getDatabaseUrl(), username, password)) {
con.setCatalog(database);
Statement stmt = con.createStatement();
stmt.execute(sql);
}
return constructDatabaseUrl(hostname, port);
}
public void reload() throws InterruptedException {
@@ -73,12 +61,14 @@ public void reload() throws InterruptedException {
deployment = ocp.apps().deployments()
.inNamespace(project)
.withName(name)
.waitUntilCondition(this::deploymentAvailableCondition, 30, TimeUnit.SECONDS);
.waitUntilCondition(WaitConditions::deploymentAvailableCondition, 1, TimeUnit.MINUTES);
LOGGER.info("Deployment '" + name + "' is available");
initialize();
}
private boolean deploymentAvailableCondition(Deployment d) {
return d.getStatus() != null &&
d.getStatus().getConditions().stream().anyMatch(c -> c.getType().equalsIgnoreCase("Available") && c.getStatus().equalsIgnoreCase("true"));
}
public abstract void initialize() throws InterruptedException;
public abstract C getDatabaseClient(String username, String password);
protected abstract String constructDatabaseUrl(String hostname, int port);
}
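To illustrate how the new abstract hooks fit together, a hypothetical MongoDB controller (an assumption, reusing the MongoDatabaseClient sketched earlier; this commit only prepares the ground for it):

package io.debezium.testing.openshift.tools.databases;

import java.util.List;

import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.openshift.client.OpenShiftClient;

public class MongoDatabaseController extends DatabaseController<MongoDatabaseClient> {

    public MongoDatabaseController(Deployment deployment, List<Service> services, String dbType, OpenShiftClient ocp) {
        super(deployment, services, dbType, ocp);
    }

    @Override
    protected String constructDatabaseUrl(String hostname, int port) {
        // Mongo clients expect a mongodb:// URL rather than a JDBC one
        return "mongodb://" + hostname + ":" + port + "/";
    }

    @Override
    public void initialize() {
        // no-op; a real implementation might wait for the replica set to settle
    }

    @Override
    public MongoDatabaseClient getDatabaseClient(String username, String password) {
        // credentials would normally be folded into the URL
        return new MongoDatabaseClient(getDatabaseUrl());
    }
}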

View File

@@ -23,7 +23,7 @@
/**
* @author Jakub Cechacek
*/
public abstract class DatabaseDeployer<T> {
public abstract class DatabaseDeployer<T extends DatabaseDeployer<T, C>, C extends DatabaseController<?>> {
private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseDeployer.class);
private final OpenShiftClient ocp;
@@ -64,7 +64,7 @@ public T withServices(Collection<Service> services) {
return getThis();
}
public DatabaseController deploy() {
public C deploy() {
if (deployment == null) {
throw new IllegalStateException("Deployment configuration not available");
}
@@ -81,8 +81,10 @@ public DatabaseController deploy() {
this.deployment = dep;
this.services = svcs;
return new DatabaseController(dep, services, dbType, ocp);
return getController(dep, services, dbType, ocp);
}
public abstract T getThis();
public abstract C getController(Deployment deployment, List<Service> services, String dbType, OpenShiftClient ocp);
}
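The two type parameters implement the self-typed builder pattern: getThis() lets inherited fluent methods such as withServices(...) return the concrete deployer type T, while getController(...) lets deploy() return the typed controller C. A hypothetical SQL Server deployer (the super-constructor arguments are an assumption, mirroring MySqlDeployer) would follow the same shape as the two deployers below:

package io.debezium.testing.openshift.tools.databases.sqlserver;

import java.util.List;

import io.debezium.testing.openshift.tools.databases.DatabaseDeployer;
import io.debezium.testing.openshift.tools.databases.SqlDatabaseController;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.openshift.client.OpenShiftClient;

public class SqlServerDeployer extends DatabaseDeployer<SqlServerDeployer, SqlDatabaseController> {

    public SqlServerDeployer(OpenShiftClient ocp) {
        super("sqlserver", ocp); // assumed super(dbType, ocp) signature
    }

    @Override
    public SqlServerDeployer getThis() {
        return this; // resolves the self-referential type parameter T
    }

    @Override
    public SqlDatabaseController getController(Deployment deployment, List<Service> services, String dbType, OpenShiftClient ocp) {
        return new SqlDatabaseController(deployment, services, dbType, ocp);
    }
}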

View File

@@ -0,0 +1,52 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.databases;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author Jakub Cechacek
*/
public class SqlDatabaseClient implements DatabaseClient<Connection, SQLException> {
private static final Logger LOGGER = LoggerFactory.getLogger(SqlDatabaseClient.class);
private String url;
private String username;
private String password;
public SqlDatabaseClient(String url, String username, String password) {
this.url = url;
this.username = username;
this.password = password;
}
public void execute(Commands<Connection, SQLException> commands) throws SQLException {
try (Connection con = DriverManager.getConnection(url, username, password)) {
commands.execute(con);
}
}
public void execute(String database, Commands<Connection, SQLException> commands) throws SQLException {
Commands<Connection, SQLException> withDatabase = con -> con.setCatalog(database);
execute(withDatabase.andThen(commands));
}
public void execute(String database, String command) throws SQLException {
LOGGER.info("Running SQL Command [%s]: %s", database, command);
execute(database, con -> {
try (Statement stmt = con.createStatement()) {
    stmt.execute(command);
}
});
}
}
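A usage sketch (JDBC URL and credentials hypothetical): each execute(...) call opens a fresh connection, hands it to the lambda, and closes it afterwards:

import java.sql.SQLException;
import java.sql.Statement;

import io.debezium.testing.openshift.tools.databases.SqlDatabaseClient;

public class SqlClientExample {

    public static void main(String[] args) throws SQLException {
        SqlDatabaseClient client = new SqlDatabaseClient("jdbc:mysql://mysql:3306/", "debezium", "dbz");

        // convenience form: switch to the catalog, then run a single statement
        client.execute("inventory", "DELETE FROM customers WHERE email = 'tom@test.com'");

        // lambda form: several statements over one connection
        client.execute("inventory", con -> {
            try (Statement stmt = con.createStatement()) {
                stmt.execute("INSERT INTO customers VALUES (default, 'Tom', 'Tester', 'tom@test.com')");
            }
        });
    }
}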

View File

@@ -0,0 +1,38 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.databases;
import java.util.List;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.openshift.client.OpenShiftClient;
/**
*
* @author Jakub Cechacek
*/
public class SqlDatabaseController extends DatabaseController<SqlDatabaseClient> {
public SqlDatabaseController(Deployment deployment, List<Service> services, String dbType, OpenShiftClient ocp) {
super(deployment, services, dbType, ocp);
}
@Override
protected String constructDatabaseUrl(String hostname, int port) {
return "jdbc:" + dbType + "://" + hostname + ":" + port + "/";
}
@Override
public void initialize() {
// no-op
}
@Override
public SqlDatabaseClient getDatabaseClient(String username, String password) {
return new SqlDatabaseClient(getDatabaseUrl(), username, password);
}
}

View File

@@ -3,17 +3,23 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.databases;
package io.debezium.testing.openshift.tools.databases.mysql;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.testing.openshift.tools.databases.DatabaseDeployer;
import io.debezium.testing.openshift.tools.databases.SqlDatabaseController;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.openshift.client.OpenShiftClient;
/**
* @author Jakub Cechacek
*/
public class MySqlDeployer extends DatabaseDeployer<MySqlDeployer> {
public class MySqlDeployer extends DatabaseDeployer<MySqlDeployer, SqlDatabaseController> {
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlDeployer.class);
public MySqlDeployer(OpenShiftClient ocp) {
@@ -23,4 +29,8 @@ public MySqlDeployer(OpenShiftClient ocp) {
public MySqlDeployer getThis() {
return this;
}
public SqlDatabaseController getController(Deployment deployment, List<Service> services, String dbType, OpenShiftClient ocp) {
return new SqlDatabaseController(deployment, services, dbType, ocp);
}
}

View File

@@ -3,18 +3,24 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.openshift.tools.databases;
package io.debezium.testing.openshift.tools.databases.postgresql;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.testing.openshift.tools.databases.DatabaseDeployer;
import io.debezium.testing.openshift.tools.databases.SqlDatabaseController;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.openshift.client.OpenShiftClient;
/**
*
* @author Jakub Cechacek
*/
public class PostgreSqlDeployer extends DatabaseDeployer<PostgreSqlDeployer> {
public class PostgreSqlDeployer extends DatabaseDeployer<PostgreSqlDeployer, SqlDatabaseController> {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSqlDeployer.class);
public PostgreSqlDeployer(OpenShiftClient ocp) {
@@ -24,4 +30,8 @@ public PostgreSqlDeployer(OpenShiftClient ocp) {
public PostgreSqlDeployer getThis() {
return this;
}
public SqlDatabaseController getController(Deployment deployment, List<Service> services, String dbType, OpenShiftClient ocp) {
return new SqlDatabaseController(deployment, services, dbType, ocp);
}
}

View File

@@ -57,7 +57,11 @@ public KafkaConnector getCustomResource() {
Map<String, Object> crConfig = new HashMap<>(config);
KafkaConnectorBuilder connectorBuilder = new KafkaConnectorBuilder();
return connectorBuilder.withNewSpec()
return connectorBuilder
.withNewMetadata()
.withLabels(new HashMap<>())
.endMetadata()
.withNewSpec()
.withClassName((String) crConfig.remove("connector.class"))
.withTasksMax((Integer) crConfig.remove("task.max"))
.withConfig(crConfig)
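Seeding the metadata with an empty, mutable labels map lets callers attach labels to the generated resource afterwards — notably strimzi.io/cluster, without which the Strimzi operator ignores a KafkaConnector resource. A sketch (the builder variable and cluster name are assumptions; the cluster name matches the Kafka bootstrap address used elsewhere in this commit):

import io.strimzi.api.kafka.model.KafkaConnector;

// "config" is a ConnectorConfigBuilder as used in the connector tests below
KafkaConnector connector = config.getCustomResource();
connector.getMetadata().getLabels().put("strimzi.io/cluster", "debezium-kafka-cluster");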

View File

@@ -159,7 +159,7 @@ public Route exposeMetrics() {
Service service = ocp.services().inNamespace(project).withName(nameSvc).get();
metricsRoute = ocpUtils
.createRoute(project, name, nameSvc, "prometheus", service.getMetadata().getLabels());
.createRoute(project, name, nameSvc, "tcp-prometheus", service.getMetadata().getLabels());
httpUtils.awaitApi(getMetricsURL());
return metricsRoute;
@@ -172,6 +172,7 @@ public Route exposeMetrics() {
* @throws IOException on request error
*/
public void deployConnector(String name, ConnectorConfigBuilder config) throws IOException, InterruptedException {
LOGGER.info("Deploying connector " + name);
if (useConnectorResources) {
deployConnectorCr(name, config);
}
@@ -311,6 +312,24 @@ public void waitForPostgreSqlSnapshot(String connectorName) throws IOException {
waitForSnapshot(connectorName, "debezium_postgres_connector_metrics_snapshotcompleted");
}
/**
* Waits until the snapshot phase of the given SQL Server connector completes
* @param connectorName connector name
* @throws IOException on metric request error
*/
public void waitForSqlServerSnapshot(String connectorName) throws IOException {
waitForSnapshot(connectorName, "debezium_sql_server_connector_metrics_snapshotcompleted");
}
/**
* Waits until the snapshot phase of the given MongoDB connector completes
* @param connectorName connector name
* @throws IOException on metric request error
*/
public void waitForMongoSnapshot(String connectorName) throws IOException {
waitForSnapshot(connectorName, "debezium_mongodb_connector_metrics_snapshotcompleted");
}
/**
* @return URL of Connect API endpoint
*/
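These methods are intended to be used exactly like the existing MySQL variant in the tests below; a hypothetical SQL Server test step (connector and topic names assumed) would read:

// inside a connector test, after deploying the connector
kafkaConnectController.waitForSqlServerSnapshot("inventory-connector-sqlserver");
awaitAssert(() -> assertRecordsCount("inventory-connector-sqlserver.dbo.customers", 4));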

View File

@@ -24,6 +24,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.awaitility.core.ThrowingRunnable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
@@ -116,13 +117,24 @@ public static void teardown() {
protected void assertTopicsExist(String... names) {
try (Consumer<String, String> consumer = new KafkaConsumer<>(KAFKA_CONSUMER_PROPS)) {
await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> {
await().atMost(2, TimeUnit.MINUTES).untilAsserted(() -> {
Set<String> topics = consumer.listTopics().keySet();
assertThat(topics).contains(names);
});
}
}
protected void awaitAssert(long timeout, TimeUnit unit, ThrowingRunnable assertion) {
await()
.pollDelay(2, TimeUnit.SECONDS)
.atMost(timeout, unit)
.untilAsserted(assertion);
}
protected void awaitAssert(ThrowingRunnable assertion) {
awaitAssert(1, TimeUnit.MINUTES, assertion);
}
protected void assertRecordsCount(String topic, int count) {
try (Consumer<String, String> consumer = new KafkaConsumer<>(KAFKA_CONSUMER_PROPS)) {
consumer.subscribe(Collections.singleton(topic));

View File

@@ -25,8 +25,9 @@
import io.debezium.testing.openshift.ConnectorTestBase;
import io.debezium.testing.openshift.resources.ConfigProperties;
import io.debezium.testing.openshift.resources.ConnectorFactories;
import io.debezium.testing.openshift.tools.databases.DatabaseController;
import io.debezium.testing.openshift.tools.databases.MySqlDeployer;
import io.debezium.testing.openshift.tools.databases.SqlDatabaseClient;
import io.debezium.testing.openshift.tools.databases.SqlDatabaseController;
import io.debezium.testing.openshift.tools.databases.mysql.MySqlDeployer;
import io.debezium.testing.openshift.tools.kafka.ConnectorConfigBuilder;
import okhttp3.OkHttpClient;
@@ -48,7 +49,7 @@ public class MySqlConnectorIT extends ConnectorTestBase {
public static final String CONNECTOR_NAME = "inventory-connector-mysql";
private static MySqlDeployer dbDeployer;
private static DatabaseController dbController;
private static SqlDatabaseController dbController;
private static OkHttpClient httpClient = new OkHttpClient();
private static ConnectorFactories connectorFactories = new ConnectorFactories();
private static ConnectorConfigBuilder connectorConfig;
@@ -76,6 +77,12 @@ public static void tearDownDatabase() throws IOException, InterruptedException {
dbController.reload();
}
private void insertCustomer(String firstName, String lastName, String email) throws SQLException {
SqlDatabaseClient client = dbController.getDatabaseClient(DATABASE_MYSQL_USERNAME, DATABASE_MYSQL_PASSWORD);
String sql = "INSERT INTO customers VALUES (default, '" + firstName + "', '" + lastName + "', '" + email + "')";
client.execute("inventory", sql);
}
@Test
@Order(1)
public void shouldHaveRegisteredConnector() {
@@ -106,37 +113,31 @@ public void shouldCreateKafkaTopics() {
@Order(3)
public void shouldSnapshotChanges() throws IOException {
kafkaConnectController.waitForMySqlSnapshot(connectorName);
assertRecordsCount(connectorName + ".inventory.customers", 4);
awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 4));
}
@Test
@Order(4)
public void shouldStreamChanges() throws SQLException {
String sql = "INSERT INTO customers VALUES (default, 'Tom', 'Tester', 'tom@test.com')";
dbController.executeStatement("inventory", DATABASE_MYSQL_USERNAME, DATABASE_MYSQL_PASSWORD, sql);
assertRecordsCount(connectorName + ".inventory.customers", 5);
assertRecordsContain(connectorName + ".inventory.customers", "tom@test.com");
insertCustomer("Tom", "Tester", "tom@test.com");
awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 5));
awaitAssert(() -> assertRecordsContain(connectorName + ".inventory.customers", "tom@test.com"));
}
@Test
@Order(5)
public void shouldBeDown() throws SQLException, IOException {
kafkaConnectController.undeployConnector(connectorName);
String sql = "INSERT INTO customers VALUES (default, 'Jerry', 'Tester', 'jerry@test.com')";
dbController.executeStatement("inventory", DATABASE_MYSQL_USERNAME, DATABASE_MYSQL_PASSWORD, sql);
assertRecordsCount(connectorName + ".inventory.customers", 5);
insertCustomer("Jerry", "Tester", "jerry@test.com");
awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 5));
}
@Test
@Order(6)
public void shouldResumeStreamingAfterRedeployment() throws IOException, InterruptedException {
kafkaConnectController.deployConnector(connectorName, connectorConfig);
await()
.atMost(30, TimeUnit.SECONDS)
.pollInterval(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertRecordsCount(connectorName + ".inventory.customers", 6));
assertRecordsContain(connectorName + ".inventory.customers", "jerry@test.com");
awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 6));
awaitAssert(() -> assertRecordsContain(connectorName + ".inventory.customers", "jerry@test.com"));
}
@Test
@@ -144,9 +145,8 @@ public void shouldResumeStreamingAfterRedeployment() throws IOException, InterruptedException {
public void shouldBeDownAfterCrash() throws SQLException {
operatorController.disable();
kafkaConnectController.destroy();
String sql = "INSERT INTO customers VALUES (default, 'Nibbles', 'Tester', 'nibbles@test.com')";
dbController.executeStatement("inventory", DATABASE_MYSQL_USERNAME, DATABASE_MYSQL_PASSWORD, sql);
assertRecordsCount(connectorName + ".inventory.customers", 6);
insertCustomer("Nibbles", "Tester", "nibbles@test.com");
awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 6));
}
@Test
@@ -154,11 +154,8 @@ public void shouldBeDownAfterCrash() throws SQLException {
public void shouldResumeStreamingAfterCrash() throws InterruptedException {
operatorController.enable();
kafkaConnectController.waitForConnectCluster();
await()
.atMost(30, TimeUnit.SECONDS)
.pollInterval(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertMinimalRecordsCount(connectorName + ".inventory.customers", 7));
assertRecordsContain(connectorName + ".inventory.customers", "nibbles@test.com");
awaitAssert(() -> assertMinimalRecordsCount(connectorName + ".inventory.customers", 7));
awaitAssert(() -> assertRecordsContain(connectorName + ".inventory.customers", "nibbles@test.com"));
}
}

View File

@@ -26,8 +26,9 @@
import io.debezium.testing.openshift.ConnectorTestBase;
import io.debezium.testing.openshift.resources.ConfigProperties;
import io.debezium.testing.openshift.resources.ConnectorFactories;
import io.debezium.testing.openshift.tools.databases.DatabaseController;
import io.debezium.testing.openshift.tools.databases.PostgreSqlDeployer;
import io.debezium.testing.openshift.tools.databases.SqlDatabaseClient;
import io.debezium.testing.openshift.tools.databases.SqlDatabaseController;
import io.debezium.testing.openshift.tools.databases.postgresql.PostgreSqlDeployer;
import io.debezium.testing.openshift.tools.kafka.ConnectorConfigBuilder;
import okhttp3.OkHttpClient;
@@ -49,7 +50,7 @@ public class PostgreSqlConnectorIT extends ConnectorTestBase {
public static final String CONNECTOR_NAME = "inventory-connector-postgresql";
private static PostgreSqlDeployer dbDeployer;
private static DatabaseController dbController;
private static SqlDatabaseController dbController;
private static OkHttpClient httpClient = new OkHttpClient();
private static ConnectorFactories connectorFactories = new ConnectorFactories();
private static ConnectorConfigBuilder connectorConfig;
@@ -79,6 +80,12 @@ public static void tearDownDatabase() throws IOException, InterruptedException {
dbController.reload();
}
private void insertCustomer(String firstName, String lastName, String email) throws SQLException {
SqlDatabaseClient client = dbController.getDatabaseClient(DATABASE_POSTGRESQL_USERNAME, DATABASE_POSTGRESQL_PASSWORD);
String sql = "INSERT INTO inventory.customers VALUES (default, '" + firstName + "', '" + lastName + "', '" + email + "')";
client.execute(DATABASE_POSTGRESQL_DBZ_DBNAME, sql);
}
@Test
@Order(1)
public void shouldHaveRegisteredConnector() {
@@ -107,37 +114,31 @@ public void shouldCreateKafkaTopics() {
@Order(3)
public void shouldContainRecordsInCustomersTopic() throws IOException {
kafkaConnectController.waitForPostgreSqlSnapshot(connectorName);
assertRecordsCount(connectorName + ".inventory.customers", 4);
awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 4));
}
@Test
@Order(4)
public void shouldStreamChanges() throws SQLException {
String sql = "INSERT INTO inventory.customers VALUES (default, 'Tom', 'Tester', 'tom@test.com')";
dbController.executeStatement(DATABASE_POSTGRESQL_DBZ_DBNAME, DATABASE_POSTGRESQL_USERNAME, DATABASE_POSTGRESQL_PASSWORD, sql);
assertRecordsCount(connectorName + ".inventory.customers", 5);
assertRecordsContain(connectorName + ".inventory.customers", "tom@test.com");
insertCustomer("Tom", "Tester", "tom@test.com");
awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 5));
awaitAssert(() -> assertRecordsContain(connectorName + ".inventory.customers", "tom@test.com"));
}
@Test
@Order(5)
public void shouldBeDown() throws SQLException, IOException {
kafkaConnectController.undeployConnector(connectorName);
String sql = "INSERT INTO inventory.customers VALUES (default, 'Jerry', 'Tester', 'jerry@test.com')";
dbController.executeStatement(DATABASE_POSTGRESQL_DBZ_DBNAME, DATABASE_POSTGRESQL_USERNAME, DATABASE_POSTGRESQL_PASSWORD, sql);
assertRecordsCount(connectorName + ".inventory.customers", 5);
insertCustomer("Jerry", "Tester", "jerry@test.com");
awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 5));
}
@Test
@Order(6)
public void shouldResumeStreamingAfterRedeployment() throws IOException, InterruptedException {
kafkaConnectController.deployConnector(connectorName, connectorConfig);
await()
.atMost(30, TimeUnit.SECONDS)
.pollInterval(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertRecordsCount(connectorName + ".inventory.customers", 6));
assertRecordsContain(connectorName + ".inventory.customers", "jerry@test.com");
awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 6));
awaitAssert(() -> assertRecordsContain(connectorName + ".inventory.customers", "jerry@test.com"));
}
@Test
@@ -145,9 +146,8 @@ public void shouldResumeStreamingAfterRedeployment() throws IOException, InterruptedException {
public void shouldBeDownAfterCrash() throws SQLException {
operatorController.disable();
kafkaConnectController.destroy();
String sql = "INSERT INTO inventory.customers VALUES (default, 'Nibbles', 'Tester', 'nibbles@test.com')";
dbController.executeStatement(DATABASE_POSTGRESQL_DBZ_DBNAME, DATABASE_POSTGRESQL_USERNAME, DATABASE_POSTGRESQL_PASSWORD, sql);
assertRecordsCount(connectorName + ".inventory.customers", 6);
insertCustomer("Nibbles", "Tester", "nibbles@test.com");
awaitAssert(() -> assertRecordsCount(connectorName + ".inventory.customers", 6));
}
@Test
@@ -155,10 +155,7 @@ public void shouldBeDownAfterCrash() throws SQLException {
public void shouldResumeStreamingAfterCrash() throws InterruptedException {
operatorController.enable();
kafkaConnectController.waitForConnectCluster();
await()
.atMost(30, TimeUnit.SECONDS)
.pollInterval(5, TimeUnit.SECONDS)
.untilAsserted(() -> assertMinimalRecordsCount(connectorName + ".inventory.customers", 7));
assertRecordsContain(connectorName + ".inventory.customers", "nibbles@test.com");
awaitAssert(() -> assertMinimalRecordsCount(connectorName + ".inventory.customers", 7));
awaitAssert(() -> assertRecordsContain(connectorName + ".inventory.customers", "nibbles@test.com"));
}
}

View File

@@ -23,6 +23,8 @@ public class ConfigProperties {
public static final String OCP_PROJECT_DBZ = stringProperty("test.ocp.project.debezium");
public static final String OCP_PROJECT_MYSQL = System.getProperty("test.ocp.project.mysql", "debezium-mysql");
public static final String OCP_PROJECT_POSTGRESQL = System.getProperty("test.ocp.project.postgresql", "debezium-postgresql");
public static final String OCP_PROJECT_SQLSERVER = System.getProperty("test.ocp.project.sqlserver", "debezium-sqlserver");
public static final String OCP_PROJECT_MONGO = System.getProperty("test.ocp.project.mongo", "debezium-mongo");
public static final Optional<String> OCP_PULL_SECRET_PATHS = stringOptionalProperty("test.ocp.pull.secret.paths");
public static final boolean STRIMZI_OPERATOR_CONNECTORS = booleanProperty("test.strimzi.operator.connectors");
@@ -42,6 +44,23 @@ public class ConfigProperties {
public static final String DATABASE_POSTGRESQL_DBZ_DBNAME = System.getProperty("test.database.postgresql.dbname", "debezium");
public static final Optional<String> DATABASE_POSTGRESQL_HOST = stringOptionalProperty("test.database.postgresql.host");
public static final int DATABASE_SQLSERVER_PORT = Integer.parseInt(System.getProperty("test.database.sqlserver.port", "1433"));
public static final String DATABASE_SQLSERVER_USERNAME = System.getProperty("test.database.sqlserver.username", "sa");
public static final String DATABASE_SQLSERVER_SA_PASSWORD = System.getProperty("test.database.sqlserver.password", "Debezium1$");
public static final String DATABASE_SQLSERVER_DBZ_USERNAME = System.getProperty("test.database.sqlserver.dbz.username", DATABASE_SQLSERVER_USERNAME);
public static final String DATABASE_SQLSERVER_DBZ_PASSWORD = System.getProperty("test.database.sqlserver.dbz.password", DATABASE_SQLSERVER_SA_PASSWORD);
public static final String DATABASE_SQLSERVER_DBZ_DBNAME = System.getProperty("test.database.sqlserver.dbname", "testDB");
public static final Optional<String> DATABASE_SQLSERVER_HOST = stringOptionalProperty("test.database.sqlserver.host");
public static final int DATABASE_MONGO_PORT = Integer.parseInt(System.getProperty("test.database.mongo.port", "27017"));
public static final String DATABASE_MONGO_USERNAME = System.getProperty("test.database.mongo.username", "admin");
public static final String DATABASE_MONGO_PASSWORD = System.getProperty("test.database.mongo.password", "admin");
public static final String DATABASE_MONGO_DBZ_USERNAME = System.getProperty("test.database.mongo.dbz.username", "debezium");
public static final String DATABASE_MONGO_DBZ_PASSWORD = System.getProperty("test.database.mongo.dbz.password", "dbz");
public static final String DATABASE_MONGO_DBZ_DBNAME = System.getProperty("test.database.mongo.dbz.dbname", "inventory");
public static final String DATABASE_MONGO_DBZ_LOGIN_DBNAME = System.getProperty("test.database.mongo.dbz.login.dbname", "admin");
public static final Optional<String> DATABASE_MONGO_HOST = stringOptionalProperty("test.database.mongo.host");
private static boolean booleanProperty(String key) {
String value = System.getProperty(key);
if (value == null || value.isEmpty() || value.equalsIgnoreCase("false") || value.equalsIgnoreCase("0")) {

View File

@@ -5,6 +5,7 @@
*/
package io.debezium.testing.openshift.resources;
import static io.debezium.testing.openshift.resources.ConfigProperties.DATABASE_MONGO_HOST;
import static io.debezium.testing.openshift.resources.ConfigProperties.DATABASE_MYSQL_HOST;
import static io.debezium.testing.openshift.resources.ConfigProperties.DATABASE_POSTGRESQL_HOST;
import static io.debezium.testing.openshift.resources.ConfigProperties.DATABASE_SQLSERVER_HOST;
@@ -48,4 +49,33 @@ public ConnectorConfigBuilder postgresql() {
.put("slot.name", "debezium")
.put("plugin.name", "pgoutput");
}
public ConnectorConfigBuilder sqlserver() {
ConnectorConfigBuilder cb = new ConnectorConfigBuilder();
String dbHost = DATABASE_POSTGRESQL_HOST.orElse("sqlserver." + ConfigProperties.OCP_PROJECT_SQLSERVER + ".svc.cluster.local");
return cb
.put("connector.class", "io.debezium.connector.sqlserver.SqlServerConnector")
.put("task.max", 1)
.put("database.hostname", dbHost)
.put("database.port", ConfigProperties.DATABASE_SQLSERVER_PORT)
.put("database.user", ConfigProperties.DATABASE_SQLSERVER_DBZ_USERNAME)
.put("database.password", ConfigProperties.DATABASE_SQLSERVER_DBZ_PASSWORD)
.put("database.dbname", ConfigProperties.DATABASE_SQLSERVER_DBZ_DBNAME)
.put("database.server.name", "sqlserverdb") // this should be overwritten with unique name
.put("database.history.kafka.bootstrap.servers", "debezium-kafka-cluster-kafka-bootstrap." + ConfigProperties.OCP_PROJECT_DBZ + ".svc.cluster.local:9092")
.put("database.history.kafka.topic", "schema-changes.inventory");
}
public ConnectorConfigBuilder mongo() {
ConnectorConfigBuilder cb = new ConnectorConfigBuilder();
String dbHost = DATABASE_MONGO_HOST.orElse("mongo." + ConfigProperties.OCP_PROJECT_MONGO + ".svc.cluster.local");
return cb
.put("connector.class", "io.debezium.connector.mongodb.MongoDbConnector")
.put("task.max", 1)
.put("mongodb.hosts", "rs0/" + dbHost + ":" + ConfigProperties.DATABASE_MONGO_PORT)
.put("mongodb.user", ConfigProperties.DATABASE_MONGO_DBZ_USERNAME)
.put("mongodb.password", ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD)
.put("mongodb.name", "mongodb") // this should be overwritten with unique name
.put("database.whitelist", ConfigProperties.DATABASE_MONGO_DBZ_DBNAME); // might want to change
}
}
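A usage sketch for the new factories (the connector name and the server-name suffix are placeholders), mirroring how the MySQL tests wire a configuration into the connect controller:

// "testId" is a hypothetical per-run suffix used to replace the placeholder server name
ConnectorConfigBuilder config = new ConnectorFactories().sqlserver()
        .put("database.server.name", "sqlserver-" + testId);
kafkaConnectController.deployConnector("inventory-connector-sqlserver", config);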

View File

@@ -55,6 +55,8 @@ pipeline {
env.OCP_PROJECT_DEBEZIUM = "debezium-${BUILD_NUMBER}"
env.OCP_PROJECT_MYSQL = "debezium-${BUILD_NUMBER}-mysql"
env.OCP_PROJECT_POSTGRESQL = "debezium-${BUILD_NUMBER}-postgresql"
env.OCP_PROJECT_SQLSERVER = "debezium-${BUILD_NUMBER}-sqlserver"
env.OCP_PROJECT_MONGO = "debezium-${BUILD_NUMBER}-mongo"
}
withCredentials([
usernamePassword(credentialsId: "${OCP_CREDENTIALS}", usernameVariable: 'OCP_USERNAME', passwordVariable: 'OCP_PASSWORD'),
@@ -67,16 +69,20 @@ pipeline {
oc new-project ${OCP_PROJECT_DEBEZIUM}
oc new-project ${OCP_PROJECT_MYSQL}
oc new-project ${OCP_PROJECT_POSTGRESQL}
oc new-project ${OCP_PROJECT_SQLSERVER}
oc new-project ${OCP_PROJECT_MONGO}
'''
sh '''
set -x
sed -i "s/namespace: .*/namespace: ${OCP_PROJECT_DEBEZIUM}/" strimzi/install/cluster-operator/*RoleBinding*.yaml
oc apply -f ${STRZ_RESOURCES} -n ${OCP_PROJECT_DEBEZIUM}
'''
sh '''
set -x
oc adm policy add-scc-to-user anyuid system:serviceaccount:${OCP_PROJECT_SQLSERVER}:default
oc adm policy add-scc-to-user anyuid system:serviceaccount:${OCP_PROJECT_MONGO}:default
'''
sh '''
set -x
docker login -u=${QUAY_USERNAME} -p=${QUAY_PASSWORD} quay.io
'''
}