DBZ-7592 Add JDBC sink system tests

This commit is contained in:
Jiri Novotny 2024-03-07 14:11:21 +01:00 committed by Ondrej Babec
parent 8d11b39c47
commit ab8bae5529
11 changed files with 393 additions and 3 deletions

View File

@@ -22,6 +22,7 @@
<version.apicurio>2.4.1.Final</version.apicurio>
<version.apicurio.model>1.1.0</version.apicurio.model>
<version.fixture5>0.1.0</version.fixture5>
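<!-- version of the debezium-connector-jdbc test-jar providing SinkRecordBuilder and DebeziumSinkRecordFactory for the JDBC sink tests -->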
<version.debezium.jdbc.tests>2.6.0.Beta1</version.debezium.jdbc.tests>
<product.build>false</product.build>
<prepare.strimzi>true</prepare.strimzi>
@@ -142,7 +143,7 @@
<repositories>
<repository>
<id>sonatype-snapshots</id>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<updatePolicy>always</updatePolicy>
</snapshots>
@@ -202,6 +203,13 @@
<type>zip</type>
<version>${version.debezium.connector}</version>
</dependency>
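<!-- JDBC sink connector plugin archive, served to Kafka Connect by the artifact server -->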
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-jdbc</artifactId>
<classifier>plugin</classifier>
<type>zip</type>
<version>${version.debezium.connector}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-scripting</artifactId>
@@ -258,6 +266,22 @@
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
</dependency>
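<!-- connect-runtime brings in the JsonConverter used to serialize test records; reload4j is excluded to keep it off the test classpath -->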
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<exclusions>
<exclusion>
<groupId>ch.qos.reload4j</groupId>
<artifactId>reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.strimzi</groupId>
<artifactId>api</artifactId>
@@ -286,6 +310,16 @@
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-operator-api-model</artifactId>
<version>${version.apicurio.model}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -410,6 +444,14 @@
<artifactId>db2</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-jdbc</artifactId>
<type>test-jar</type>
<scope>test</scope>
<version>${version.debezium.jdbc.tests}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>oracle-xe</artifactId>

View File

@@ -75,6 +75,7 @@ public OcpArtifactServerDeployer(
@Override
public OcpArtifactServerController deploy() throws Exception {
LOGGER.info("Deploying debezium artifact server");
LOGGER.debug("Artifact server spec: \n" + deployment.getSpec().toString());
deployment = ocp.apps().deployments().inNamespace(project).createOrReplace(deployment);
service.getMetadata().setLabels(deployment.getMetadata().getLabels());

View File

@@ -9,6 +9,9 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import java.util.Properties;
@@ -57,4 +60,17 @@ default Properties getDefaultConsumerProperties() {
return consumerProps;
}
/**
* @return default kafka producer configuration
*/
default Properties getDefaultProducerProperties() {
Properties producerProps = new Properties();
producerProps.put(BOOTSTRAP_SERVERS_CONFIG, getPublicBootstrapAddress());
producerProps.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ACKS_CONFIG, "all");
return producerProps;
}
}

View File

@@ -13,6 +13,9 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import java.io.File;
import java.io.IOException;
@@ -168,6 +171,25 @@ public Properties getDefaultConsumerProperties() {
return kafkaConsumerProps;
}
@Override
public Properties getDefaultProducerProperties() {
Properties kafkaProducerProps = new Properties();
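// trust the Strimzi cluster CA certificate (PEM) extracted from the Kafka cert secret below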
try {
kafkaProducerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getKafkaCaCertificate().getAbsolutePath());
}
catch (IOException e) {
throw new RuntimeException(e);
}
kafkaProducerProps.put(BOOTSTRAP_SERVERS_CONFIG, getPublicBootstrapAddress());
kafkaProducerProps.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducerProps.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducerProps.put(ACKS_CONFIG, "all");
kafkaProducerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
kafkaProducerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
return kafkaProducerProps;
}
private File getKafkaCaCertificate() throws IOException {
// get kafka cluster ca secret
Secret secret = ocp.secrets().inNamespace(project).withName(KAFKA_CERT_SECRET).get();

View File

@@ -90,7 +90,9 @@ public FabricKafkaConnectBuilder withBuild(OcpArtifactServerController artifactS
artifactServer.createDebeziumPlugin("postgres"),
artifactServer.createDebeziumPlugin("mongodb"),
artifactServer.createDebeziumPlugin("sqlserver"),
artifactServer.createDebeziumPlugin("db2", List.of("jdbc/jcc"))));
artifactServer.createDebeziumPlugin("db2", List.of("jdbc/jcc")),
// JDBC sink connector; not to be confused with the JDBC driver libraries stored in the jdbc directory of the DB2 and Oracle connectors
artifactServer.createDebeziumPlugin("jdbc")));
if (ConfigProperties.DATABASE_ORACLE) {
plugins.add(

View File

@@ -0,0 +1,57 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.assertions;
import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_MYSQL_PASSWORD;
import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_MYSQL_USERNAME;
import static org.assertj.core.api.Assertions.assertThat;
import java.sql.SQLException;
import io.debezium.testing.system.tools.databases.SqlDatabaseClient;
import io.debezium.testing.system.tools.databases.SqlDatabaseController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JdbcAssertions {
private final SqlDatabaseController databaseController;
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcAssertions.class);
public JdbcAssertions(SqlDatabaseController databaseController) {
this.databaseController = databaseController;
}
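/**
 * Asserts that the given table in the inventory database holds exactly the expected number of rows.
 */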
public void assertRowsCount(int expectedCount, String table) throws SQLException {
SqlDatabaseClient client = databaseController.getDatabaseClient(DATABASE_MYSQL_USERNAME, DATABASE_MYSQL_PASSWORD);
String sql = "SELECT count(*) FROM " + table;
int databaseCount = client.executeQuery("inventory", sql, rs -> {
try {
rs.next();
return rs.getInt(1);
}
catch (SQLException e) {
throw new RuntimeException(e);
}
});
assertThat(databaseCount).withFailMessage("Expecting table '%s' to have <%d> rows but it had <%d>.", table, expectedCount, databaseCount)
.isEqualTo(expectedCount);
}
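/**
 * Asserts that the given table contains at least one row whose column matches the given content.
 */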
public void assertRowsContain(String table, String column, String content) throws SQLException {
SqlDatabaseClient client = databaseController.getDatabaseClient(DATABASE_MYSQL_USERNAME, DATABASE_MYSQL_PASSWORD);
String sql = String.format("SELECT * FROM %s WHERE %s = '%s'", table, column, content);
boolean containsContent = client.executeQuery("inventory", sql, rs -> {
try {
return rs.next();
}
catch (SQLException e) {
throw new RuntimeException(e);
}
});
assertThat(containsContent).withFailMessage("Table '%s' does not contain row with column '%s' containing <%s>.", table, column, content).isTrue();
}
}

View File

@@ -0,0 +1,31 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.fixtures.connectors;
import org.junit.jupiter.api.extension.ExtensionContext;
import io.debezium.testing.system.resources.ConnectorFactories;
import io.debezium.testing.system.tools.databases.mysql.MySqlController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController;
import fixture5.annotations.FixtureContext;
@FixtureContext(requires = { KafkaController.class, KafkaConnectController.class, MySqlController.class }, provides = { ConnectorConfigBuilder.class })
public class JdbcSinkConnector extends ConnectorFixture<MySqlController> {
private static final String CONNECTOR_NAME = "inventory-connector-jdbc-sink";
public JdbcSinkConnector(ExtensionContext.Store store) {
super(CONNECTOR_NAME, MySqlController.class, store);
}
@Override
public ConnectorConfigBuilder connectorConfig(String connectorName) {
return new ConnectorFactories(kafkaController).jdbcSink(dbController, connectorName);
}
}

View File

@@ -5,6 +5,7 @@
*/
package io.debezium.testing.system.fixtures.databases.ocp;
import io.debezium.testing.system.assertions.JdbcAssertions;
import org.junit.jupiter.api.extension.ExtensionContext;
import io.debezium.testing.system.tools.ConfigProperties;
@@ -14,7 +15,7 @@
import fixture5.annotations.FixtureContext;
@FixtureContext(requires = { OpenShiftClient.class }, provides = { MySqlController.class })
@FixtureContext(requires = { OpenShiftClient.class }, provides = { MySqlController.class, JdbcAssertions.class })
public class OcpMySql extends OcpDatabaseFixture<MySqlController> {
public static final String DB_DEPLOYMENT_PATH = "/database-resources/mysql/master/master-deployment.yaml";
@@ -25,6 +26,12 @@ public OcpMySql(ExtensionContext.Store store) {
super(MySqlController.class, store);
}
@Override
public void setup() throws Exception {
super.setup();
store(JdbcAssertions.class, new JdbcAssertions(dbController));
}
@Override
protected MySqlController databaseController() throws Exception {
Class.forName("com.mysql.cj.jdbc.Driver");

View File

@@ -171,4 +171,22 @@ public ConnectorConfigBuilder oracle(SqlDatabaseController controller, String co
.put("log.mining.strategy", "online_catalog")
.addOperationRouterForTable("u", "CUSTOMERS");
}
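/**
 * Configuration for the Debezium JDBC sink connector writing into the MySQL inventory database.
 * With primary.key.mode=kafka the Kafka coordinates (topic, partition, offset) form the primary key,
 * and schema.evolution=basic lets the connector create and alter the destination table as needed.
 */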
public ConnectorConfigBuilder jdbcSink(SqlDatabaseController controller, String connectorName) {
ConnectorConfigBuilder cb = new ConnectorConfigBuilder(connectorName);
String dbHost = controller.getDatabaseHostname();
int dbPort = controller.getDatabasePort();
String connectionUrl = String.format("jdbc:mysql://%s:%s/inventory", dbHost, dbPort);
return cb
.put("connector.class", "io.debezium.connector.jdbc.JdbcSinkConnector")
.put("task.max", 1)
.put("connection.url", connectionUrl)
.put("connection.username", ConfigProperties.DATABASE_MYSQL_DBZ_USERNAME)
.put("connection.password", ConfigProperties.DATABASE_MYSQL_DBZ_PASSWORD)
.put("insert.mode", "upsert")
.put("primary.key.mode", "kafka")
.put("schema.evolution", "basic")
.put("database.time_zone", "UTC")
.put("topics", "jdbc_sink_test");
}
}

View File

@@ -0,0 +1,149 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.tests.jdbc.sink;
import static io.debezium.testing.system.assertions.KafkaAssertions.awaitAssert;
import static org.assertj.core.api.Assertions.assertThat;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import io.debezium.connector.jdbc.util.DebeziumSinkRecordFactory;
import io.debezium.connector.jdbc.util.SinkRecordBuilder;
import io.debezium.testing.system.assertions.JdbcAssertions;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class JdbcSinkTests {
protected final KafkaController kafkaController;
protected final KafkaConnectController connectController;
protected final JdbcAssertions assertions;
protected ConnectorConfigBuilder connectorConfig;
protected Producer<String, String> kafkaProducer;
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSinkTests.class);
public JdbcSinkTests(KafkaController kafkaController,
KafkaConnectController connectController,
JdbcAssertions assertions,
ConnectorConfigBuilder connectorConfig) {
this.kafkaController = kafkaController;
this.connectController = connectController;
this.assertions = assertions;
this.connectorConfig = connectorConfig;
this.kafkaProducer = new KafkaProducer<>(kafkaController.getDefaultProducerProperties());
}
private void produceRecordToTopic(String topic, String fieldName, String fieldValue) {
String kafkaRecord = createRecord(fieldName, fieldValue);
LOGGER.info("Producing record to topic {}", topic);
LOGGER.debug(kafkaRecord);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, kafkaRecord);
kafkaProducer.send(producerRecord);
}
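/**
 * Builds a Debezium-style change event for a single string field using the JDBC connector's
 * test utilities and serializes it to JSON with the Connect JsonConverter.
 */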
private String createRecord(String fieldName, String fieldValue) {
DebeziumSinkRecordFactory factory = new DebeziumSinkRecordFactory();
SinkRecord record = SinkRecordBuilder.update() // TODO: change to create() once fixed in the JDBC connector test suite
.flat(false)
.name("jdbc-connector-test")
.recordSchema(SchemaBuilder.struct().field(fieldName, Schema.STRING_SCHEMA).build())
.sourceSchema(factory.basicSourceSchema())
.after(fieldName, fieldValue)
.before(fieldName, fieldValue)
.source("ts_ms", (int) Instant.now().getEpochSecond()).build();
byte[] recordInBytes;
try (JsonConverter converter = new JsonConverter()) {
Map<String, String> config = new HashMap<>();
config.put("converter.type", "value");
converter.configure(config);
recordInBytes = converter.fromConnectData(null, record.valueSchema(), record.value());
}
return new String(recordInBytes, StandardCharsets.UTF_8);
}
@Test
@Order(10)
public void shouldHaveRegisteredConnector() {
Request r = new Request.Builder().url(connectController.getApiURL().resolve("/connectors")).build();
awaitAssert(() -> {
try (Response res = new OkHttpClient().newCall(r).execute()) {
assertThat(res.body().string()).contains(connectorConfig.getConnectorName());
}
});
}
@Test
@Order(20)
public void shouldStreamChanges() {
String topic = connectorConfig.getAsString("topics");
produceRecordToTopic(topic, "name", "Jerry");
awaitAssert(() -> assertions.assertRowsCount(1, topic));
awaitAssert(() -> assertions.assertRowsContain(topic, "name", "Jerry"));
}
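// a record produced while the connector is undeployed must not reach the database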
@Test
@Order(30)
public void shouldBeDown() throws Exception {
String topic = connectorConfig.getAsString("topics");
connectController.undeployConnector(connectorConfig.getConnectorName());
produceRecordToTopic(topic, "name", "Nibbles");
awaitAssert(() -> assertions.assertRowsCount(1, topic));
}
@Test
@Order(40)
public void shouldResumeStreamingAfterRedeployment() throws Exception {
connectController.deployConnector(connectorConfig);
String topic = connectorConfig.getAsString("topics");
awaitAssert(() -> assertions.assertRowsCount(2, topic));
awaitAssert(() -> assertions.assertRowsContain(topic, "name", "Nibbles"));
}
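// destroying Kafka Connect simulates a crash; the produced record stays in Kafka until Connect is restored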
@Test
@Order(50)
public void shouldBeDownAfterCrash() {
connectController.destroy();
String topic = connectorConfig.getAsString("topics");
produceRecordToTopic(topic, "name", "Larry");
awaitAssert(() -> assertions.assertRowsCount(2, topic));
}
@Test
@Order(60)
public void shouldResumeStreamingAfterCrash() throws InterruptedException {
connectController.restore();
String topic = connectorConfig.getAsString("topics");
awaitAssert(() -> assertions.assertRowsCount(3, topic));
awaitAssert(() -> assertions.assertRowsContain(topic, "name", "Larry"));
}
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.system.tests.jdbc.sink;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;
import io.debezium.testing.system.assertions.JdbcAssertions;
import io.debezium.testing.system.fixtures.OcpClient;
import io.debezium.testing.system.fixtures.connectors.JdbcSinkConnector;
import io.debezium.testing.system.fixtures.databases.ocp.OcpMySql;
import io.debezium.testing.system.fixtures.kafka.OcpKafka;
import io.debezium.testing.system.fixtures.operator.OcpStrimziOperator;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController;
import fixture5.FixtureExtension;
import fixture5.annotations.Fixture;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@Tag("acceptance")
@Tag("sink")
@Tag("openshift")
@Fixture(OcpClient.class)
@Fixture(OcpStrimziOperator.class)
@Fixture(OcpKafka.class)
@Fixture(OcpMySql.class)
@Fixture(JdbcSinkConnector.class)
@ExtendWith(FixtureExtension.class)
public class OcpJdbcSinkConnectorIT extends JdbcSinkTests {
public OcpJdbcSinkConnectorIT(
KafkaController kafkaController,
KafkaConnectController connectController,
JdbcAssertions assertions,
ConnectorConfigBuilder connectorConfig) {
super(kafkaController, connectController, assertions, connectorConfig);
}
}