diff --git a/debezium-testing/debezium-testing-system/pom.xml b/debezium-testing/debezium-testing-system/pom.xml index 27dbb064e..1832cc27d 100644 --- a/debezium-testing/debezium-testing-system/pom.xml +++ b/debezium-testing/debezium-testing-system/pom.xml @@ -22,6 +22,7 @@ 2.4.1.Final 1.1.0 0.1.0 + 2.6.0.Beta1 false true @@ -142,7 +143,7 @@ sonatype-snapshots - https://s01.oss.sonatype.org/content/repositories/snapshots/ + https://oss.sonatype.org/content/repositories/snapshots/ always @@ -202,6 +203,13 @@ zip ${version.debezium.connector} + + io.debezium + debezium-connector-jdbc + plugin + zip + ${version.debezium.connector} + io.debezium debezium-scripting @@ -258,6 +266,22 @@ kafka-clients + + org.apache.kafka + connect-api + + + + org.apache.kafka + connect-runtime + + + ch.qos.reload4j + reload4j + + + + io.strimzi api @@ -286,6 +310,16 @@ io.apicurio apicurio-registry-operator-api-model ${version.apicurio.model} + + + org.apache.logging.log4j + log4j-api + + + org.slf4j + slf4j-simple + + @@ -410,6 +444,14 @@ db2 + + io.debezium + debezium-connector-jdbc + test-jar + test + ${version.debezium.jdbc.tests} + + org.testcontainers oracle-xe diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/artifacts/OcpArtifactServerDeployer.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/artifacts/OcpArtifactServerDeployer.java index 5da9feb7f..673e45674 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/artifacts/OcpArtifactServerDeployer.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/artifacts/OcpArtifactServerDeployer.java @@ -75,6 +75,7 @@ public OcpArtifactServerDeployer( @Override public OcpArtifactServerController deploy() throws Exception { LOGGER.info("Deploying debezium artifact server"); + LOGGER.debug("Artifact server spec: \n" + deployment.getSpec().toString()); deployment = ocp.apps().deployments().inNamespace(project).createOrReplace(deployment); service.getMetadata().setLabels(deployment.getMetadata().getLabels()); diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/KafkaController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/KafkaController.java index 250a233a5..c8741def4 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/KafkaController.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/KafkaController.java @@ -9,6 +9,9 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; import java.util.Properties; @@ -57,4 +60,17 @@ default Properties getDefaultConsumerProperties() { return consumerProps; } + + /** + * @return default kafka producer configuration + */ + default Properties getDefaultProducerProperties() { + Properties producerProps = new Properties(); + producerProps.put(BOOTSTRAP_SERVERS_CONFIG, getPublicBootstrapAddress()); + producerProps.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + producerProps.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + producerProps.put(ACKS_CONFIG, "all"); + + return producerProps; + } } diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaController.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaController.java index 1cdf09592..fdfa1dcfb 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaController.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/OcpKafkaController.java @@ -13,6 +13,9 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; import java.io.File; import java.io.IOException; @@ -168,6 +171,25 @@ public Properties getDefaultConsumerProperties() { return kafkaConsumerProps; } + @Override + public Properties getDefaultProducerProperties() { + Properties kafkaProducerProps = new Properties(); + try { + kafkaProducerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, getKafkaCaCertificate().getAbsolutePath()); + } + catch (IOException e) { + throw new RuntimeException(e); + } + kafkaProducerProps.put(BOOTSTRAP_SERVERS_CONFIG, getPublicBootstrapAddress()); + kafkaProducerProps.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + kafkaProducerProps.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + kafkaProducerProps.put(ACKS_CONFIG, "all"); + kafkaProducerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + kafkaProducerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM"); + return kafkaProducerProps; + } + + private File getKafkaCaCertificate() throws IOException { // get kafka cluster ca secret Secret secret = ocp.secrets().inNamespace(project).withName(KAFKA_CERT_SECRET).get(); diff --git a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaConnectBuilder.java b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaConnectBuilder.java index bea5b4469..9564e8bd5 100644 --- a/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaConnectBuilder.java +++ b/debezium-testing/debezium-testing-system/src/main/java/io/debezium/testing/system/tools/kafka/builders/FabricKafkaConnectBuilder.java @@ -90,7 +90,9 @@ public FabricKafkaConnectBuilder withBuild(OcpArtifactServerController artifactS artifactServer.createDebeziumPlugin("postgres"), artifactServer.createDebeziumPlugin("mongodb"), artifactServer.createDebeziumPlugin("sqlserver"), - artifactServer.createDebeziumPlugin("db2", List.of("jdbc/jcc")))); + artifactServer.createDebeziumPlugin("db2", List.of("jdbc/jcc")), + // jdbc sink connector, not to be confused with the libraries stored in jdbc directory used in db2 and oracle connectors + artifactServer.createDebeziumPlugin("jdbc"))); if (ConfigProperties.DATABASE_ORACLE) { plugins.add( diff --git a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/assertions/JdbcAssertions.java b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/assertions/JdbcAssertions.java new file mode 100644 index 000000000..1eb374db2 --- /dev/null +++ b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/assertions/JdbcAssertions.java @@ -0,0 +1,57 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.system.assertions; + +import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_MYSQL_PASSWORD; +import static io.debezium.testing.system.tools.ConfigProperties.DATABASE_MYSQL_USERNAME; +import static org.assertj.core.api.Assertions.assertThat; + +import java.sql.SQLException; + +import io.debezium.testing.system.tools.databases.SqlDatabaseClient; +import io.debezium.testing.system.tools.databases.SqlDatabaseController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JdbcAssertions { + SqlDatabaseController databaseController; + Logger LOGGER = LoggerFactory.getLogger(JdbcAssertions.class); + + public JdbcAssertions(SqlDatabaseController databaseController) { + this.databaseController = databaseController; + } + + public void assertRowsCount(int expectedCount, String table) throws SQLException { + SqlDatabaseClient client = databaseController.getDatabaseClient(DATABASE_MYSQL_USERNAME, DATABASE_MYSQL_PASSWORD); + String sql = "SELECT count(*) FROM " + table; + int databaseCount = client.executeQuery("inventory", sql, rs -> { + try { + rs.next(); + return rs.getInt(1); + } + catch (SQLException e) { + throw new RuntimeException(e); + } + }); + assertThat(databaseCount).withFailMessage("Expecting table '%s' to have <%d> rows but it had <%d>.", table, expectedCount, databaseCount) + .isEqualTo(expectedCount); + } + + public void assertRowsContain(String table, String column, String content) throws SQLException { + SqlDatabaseClient client = databaseController.getDatabaseClient(DATABASE_MYSQL_USERNAME, DATABASE_MYSQL_PASSWORD); + String sql = String.format("SELECT * FROM %s WHERE %s = \"%s\"", table, column, content); + boolean containsContent = client.executeQuery("inventory", sql, rs -> { + try { + return rs.next(); + } + catch (SQLException e) { + throw new RuntimeException(e); + } + }); + assertThat(containsContent).withFailMessage("Table '%s' does not contain row with column '%s' containing <%s>.", table, column, content).isTrue(); + } + +} diff --git a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/connectors/JdbcSinkConnector.java b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/connectors/JdbcSinkConnector.java new file mode 100644 index 000000000..95a199215 --- /dev/null +++ b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/connectors/JdbcSinkConnector.java @@ -0,0 +1,31 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.system.fixtures.connectors; + +import org.junit.jupiter.api.extension.ExtensionContext; + +import io.debezium.testing.system.resources.ConnectorFactories; +import io.debezium.testing.system.tools.databases.mysql.MySqlController; +import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder; +import io.debezium.testing.system.tools.kafka.KafkaConnectController; +import io.debezium.testing.system.tools.kafka.KafkaController; + +import fixture5.annotations.FixtureContext; + +@FixtureContext(requires = { KafkaController.class, KafkaConnectController.class, MySqlController.class }, provides = { ConnectorConfigBuilder.class }) +public class JdbcSinkConnector extends ConnectorFixture { + + private static final String CONNECTOR_NAME = "inventory-connector-jdbc-sink"; + + public JdbcSinkConnector(ExtensionContext.Store store) { + super(CONNECTOR_NAME, MySqlController.class, store); + } + + @Override + public ConnectorConfigBuilder connectorConfig(String connectorName) { + return new ConnectorFactories(kafkaController).jdbcSink(dbController, connectorName); + } +} diff --git a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/databases/ocp/OcpMySql.java b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/databases/ocp/OcpMySql.java index 6d23e503c..0cac5400d 100644 --- a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/databases/ocp/OcpMySql.java +++ b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/fixtures/databases/ocp/OcpMySql.java @@ -5,6 +5,7 @@ */ package io.debezium.testing.system.fixtures.databases.ocp; +import io.debezium.testing.system.assertions.JdbcAssertions; import org.junit.jupiter.api.extension.ExtensionContext; import io.debezium.testing.system.tools.ConfigProperties; @@ -14,7 +15,7 @@ import fixture5.annotations.FixtureContext; -@FixtureContext(requires = { OpenShiftClient.class }, provides = { MySqlController.class }) +@FixtureContext(requires = { OpenShiftClient.class }, provides = { MySqlController.class, JdbcAssertions.class }) public class OcpMySql extends OcpDatabaseFixture { public static final String DB_DEPLOYMENT_PATH = "/database-resources/mysql/master/master-deployment.yaml"; @@ -25,6 +26,12 @@ public OcpMySql(ExtensionContext.Store store) { super(MySqlController.class, store); } + @Override + public void setup() throws Exception { + super.setup(); + store(JdbcAssertions.class, new JdbcAssertions(dbController)); + } + @Override protected MySqlController databaseController() throws Exception { Class.forName("com.mysql.cj.jdbc.Driver"); diff --git a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/resources/ConnectorFactories.java b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/resources/ConnectorFactories.java index f6ae07187..4c4480ef2 100644 --- a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/resources/ConnectorFactories.java +++ b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/resources/ConnectorFactories.java @@ -171,4 +171,22 @@ public ConnectorConfigBuilder oracle(SqlDatabaseController controller, String co .put("log.mining.strategy", "online_catalog") .addOperationRouterForTable("u", "CUSTOMERS"); } + + public ConnectorConfigBuilder jdbcSink(SqlDatabaseController controller, String connectorName) { + ConnectorConfigBuilder cb = new ConnectorConfigBuilder(connectorName); + String dbHost = controller.getDatabaseHostname(); + int dbPort = controller.getDatabasePort(); + String connectionUrl = String.format("jdbc:mysql://%s:%s/inventory", dbHost, dbPort); + return cb + .put("connector.class", "io.debezium.connector.jdbc.JdbcSinkConnector") + .put("task.max", 1) + .put("connection.url", connectionUrl) + .put("connection.username", ConfigProperties.DATABASE_MYSQL_DBZ_USERNAME) + .put("connection.password", ConfigProperties.DATABASE_MYSQL_DBZ_PASSWORD) + .put("insert.mode", "upsert") + .put("primary.key.mode", "kafka") + .put("schema.evolution", "basic") + .put("database.time_zone", "UTC") + .put("topics", "jdbc_sink_test"); + } } diff --git a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/jdbc/sink/JdbcSinkTests.java b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/jdbc/sink/JdbcSinkTests.java new file mode 100644 index 000000000..33bff886a --- /dev/null +++ b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/jdbc/sink/JdbcSinkTests.java @@ -0,0 +1,149 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.system.tests.jdbc.sink; + +import static io.debezium.testing.system.assertions.KafkaAssertions.awaitAssert; +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; + +import io.debezium.connector.jdbc.util.DebeziumSinkRecordFactory; +import io.debezium.connector.jdbc.util.SinkRecordBuilder; +import io.debezium.testing.system.assertions.JdbcAssertions; +import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder; +import io.debezium.testing.system.tools.kafka.KafkaConnectController; +import io.debezium.testing.system.tools.kafka.KafkaController; + +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class JdbcSinkTests { + protected final KafkaController kafkaController; + protected final KafkaConnectController connectController; + protected final JdbcAssertions assertions; + protected ConnectorConfigBuilder connectorConfig; + protected Producer kafkaProducer; + Logger LOGGER = LoggerFactory.getLogger(JdbcAssertions.class); + + public JdbcSinkTests(KafkaController kafkaController, + KafkaConnectController connectController, + JdbcAssertions assertions, + ConnectorConfigBuilder connectorConfig) { + this.kafkaController = kafkaController; + this.connectController = connectController; + this.assertions = assertions; + this.connectorConfig = connectorConfig; + this.kafkaProducer = new KafkaProducer<>(kafkaController.getDefaultProducerProperties()); + } + + private void produceRecordToTopic(String topic, String fieldName, String fieldValue) { + String kafkaRecord = createRecord(fieldName, fieldValue); + LOGGER.info("Producing record to topic {}", topic); + LOGGER.debug(kafkaRecord); + ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaRecord); + kafkaProducer.send(producerRecord); + } + + private String createRecord(String fieldName, String fieldValue) { + DebeziumSinkRecordFactory factory = new DebeziumSinkRecordFactory(); + + SinkRecord record = SinkRecordBuilder.update() //TODO: Change to create when fixed in JDBC connector testsuite + .flat(false) + .name("jdbc-connector-test") + .recordSchema(SchemaBuilder.struct().field(fieldName, Schema.STRING_SCHEMA).build()) + .sourceSchema(factory.basicSourceSchema()) + .after(fieldName, fieldValue) + .before(fieldName, fieldValue) + .source("ts_ms", (int) Instant.now().getEpochSecond()).build(); + byte[] recordInBytes; + try (JsonConverter converter = new JsonConverter()) { + Map config = new HashMap<>(); + config.put("converter.type", "value"); + converter.configure(config); + recordInBytes = converter.fromConnectData(null, record.valueSchema(), record.value()); + } + return new String(recordInBytes, StandardCharsets.UTF_8); + } + + @Test + @Order(10) + public void shouldHaveRegisteredConnector() { + + Request r = new Request.Builder().url(connectController.getApiURL().resolve("/connectors")).build(); + + awaitAssert(() -> { + try (Response res = new OkHttpClient().newCall(r).execute()) { + assertThat(res.body().string()).contains(connectorConfig.getConnectorName()); + } + }); + } + + @Test + @Order(20) + public void shouldStreamChanges() { + String topic = connectorConfig.getAsString("topics"); + produceRecordToTopic(topic, "name", "Jerry"); + + awaitAssert(() -> assertions.assertRowsCount(1, topic)); + awaitAssert(() -> assertions.assertRowsContain(topic, "name", "Jerry")); + } + + @Test + @Order(30) + public void shouldBeDown() throws Exception { + String topic = connectorConfig.getAsString("topics"); + connectController.undeployConnector(connectorConfig.getConnectorName()); + produceRecordToTopic(topic, "name", "Nibbles"); + + awaitAssert(() -> assertions.assertRowsCount(1, topic)); + } + + @Test + @Order(40) + public void shouldResumeStreamingAfterRedeployment() throws Exception { + connectController.deployConnector(connectorConfig); + + String topic = connectorConfig.getAsString("topics"); + awaitAssert(() -> assertions.assertRowsCount(2, topic)); + awaitAssert(() -> assertions.assertRowsContain(topic, "name", "Nibbles")); + } + + @Test + @Order(50) + public void shouldBeDownAfterCrash() { + connectController.destroy(); + String topic = connectorConfig.getAsString("topics"); + produceRecordToTopic(topic, "name", "Larry"); + + awaitAssert(() -> assertions.assertRowsCount(2, topic)); + } + + @Test + @Order(60) + public void shouldResumeStreamingAfterCrash() throws InterruptedException { + connectController.restore(); + + String topic = connectorConfig.getAsString("topics"); + awaitAssert(() -> assertions.assertRowsCount(3, topic)); + awaitAssert(() -> assertions.assertRowsContain(topic, "name", "Larry")); + } +} diff --git a/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/jdbc/sink/OcpJdbcSinkConnectorIT.java b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/jdbc/sink/OcpJdbcSinkConnectorIT.java new file mode 100644 index 000000000..4fb62ffc2 --- /dev/null +++ b/debezium-testing/debezium-testing-system/src/test/java/io/debezium/testing/system/tests/jdbc/sink/OcpJdbcSinkConnectorIT.java @@ -0,0 +1,45 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.system.tests.jdbc.sink; + +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.debezium.testing.system.assertions.JdbcAssertions; +import io.debezium.testing.system.fixtures.OcpClient; +import io.debezium.testing.system.fixtures.connectors.JdbcSinkConnector; +import io.debezium.testing.system.fixtures.databases.ocp.OcpMySql; +import io.debezium.testing.system.fixtures.kafka.OcpKafka; +import io.debezium.testing.system.fixtures.operator.OcpStrimziOperator; +import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder; +import io.debezium.testing.system.tools.kafka.KafkaConnectController; +import io.debezium.testing.system.tools.kafka.KafkaController; + +import fixture5.FixtureExtension; +import fixture5.annotations.Fixture; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@Tag("acceptance") +@Tag("sink") +@Tag("openshift") +@Fixture(OcpClient.class) +@Fixture(OcpStrimziOperator.class) +@Fixture(OcpKafka.class) +@Fixture(OcpMySql.class) +@Fixture(JdbcSinkConnector.class) +@ExtendWith(FixtureExtension.class) +public class OcpJdbcSinkConnectorIT extends JdbcSinkTests { + + public OcpJdbcSinkConnectorIT( + KafkaController kafkaController, + KafkaConnectController connectController, + JdbcAssertions assertions, + ConnectorConfigBuilder connectorConfig) { + super(kafkaController, connectController, assertions, connectorConfig); + } +}