diff --git a/COPYRIGHT.txt b/COPYRIGHT.txt index 773c73fc0..64d6f2324 100644 --- a/COPYRIGHT.txt +++ b/COPYRIGHT.txt @@ -4,6 +4,7 @@ Addison Higham Adrian Kreuziger Akshath Patkar Alex Mansell +Alex Soto Alexander Kovryga Alexander Schwartz Amit Sela diff --git a/documentation/modules/ROOT/nav.adoc b/documentation/modules/ROOT/nav.adoc index c0d6a131e..34fb30121 100644 --- a/documentation/modules/ROOT/nav.adoc +++ b/documentation/modules/ROOT/nav.adoc @@ -27,6 +27,7 @@ ** xref:integrations/serdes.adoc[Change Event SerDes] ** xref:integrations/outbox.adoc[Outbox Quarkus Extension] ** xref:integrations/cloudevents.adoc[CloudEvents] +** xref:integrations/testcontainers.adoc[Integration Testing with Testcontainers] * Operations ** xref:operations/logging.adoc[Logging] ** xref:operations/monitoring.adoc[Monitoring] diff --git a/documentation/modules/ROOT/pages/integrations/testcontainers.adoc b/documentation/modules/ROOT/pages/integrations/testcontainers.adoc new file mode 100644 index 000000000..86ec25e7a --- /dev/null +++ b/documentation/modules/ROOT/pages/integrations/testcontainers.adoc @@ -0,0 +1,201 @@ += Integration Testing with Testcontainers +include::../_attributes.adoc[] +:toc: +:toc-placement: macro +:linkattrs: +:icons: font +:source-highlighter: highlight.js + +toc::[] + +[NOTE] +==== +This feature is currently in incubating state, i.e. exact semantics, configuration options, APIs etc. may change in future revisions, based on the feedback we receive. +Please let us know if you encounter any problems will using this extension. +==== + +== Overview + +When setting up change data capture pipelines with Debezium, +it's a good idea to also have some automated testing in place, in order to make sure that + +* the source database is set up so changes can be streamed off of it +* your connectors are configured correctly + +The Debezium extension for https://www.testcontainers.org/[Testcontainers] aims at simplying such tests, +by running all the required infrastructure (Apache Kafka, Kafka Connect etc.) +via Linux containers and making it easily accessible to Java-based tests. + +It applies sensible defaults as far as possible +(e.g. database credentials for connectors can be obtained from the configured database container), +allowing you to focus on the essential logic of your tests. + +== Getting Started + +In order to use Debezium's Testcontainers integration, add the following dependency to your project: + +[source,xml,subs="verbatim,attributes"] +---- + + io.debezium + debezium-testing-testcontainers + {debezium-version} + test + + + org.testcontainers + kafka + test + + + + + org.testcontainers + postgresql + test + +---- + +Depending on your testing strategy, you may also need the JDBC driver of your database and a client for Apache Kafka, so you can insert some test data and assert the corresponding change events in Kafka. + +== Test Set-Up + +When writing an integration test for a Debezium connector configuration, +you'll also need to set up Apache Kafka and a database which should be the source of change events. +The existing Testcontainers support for https://www.testcontainers.org/modules/kafka/[Apache Kafka] and https://www.testcontainers.org/modules/databases/[databases] can be used for that. + +Together with Debezium's `DebeziumContainer` class, a typical set-up will look like this: + +[source,java,indent=0] +---- +public class DebeziumContainerTest { + + private static Network network = Network.newNetwork(); // <1> + + private static KafkaContainer kafkaContainer = new KafkaContainer() + .withNetwork(network); // <2> + + public static PostgreSQLContainer postgresContainer = + new PostgreSQLContainer<>("debezium/postgres:11") + .withNetwork(network) + .withNetworkAliases("postgres"); // <3> + + public static DebeziumContainer debeziumContainer = + new DebeziumContainer("{debezium-version}") + .withNetwork(network) + .withKafka(kafkaContainer) + .dependsOn(kafkaContainer); // <4> + + @BeforeClass + public static void startContainers() { // <5> + Startables.deepStart(Stream.of( + kafkaContainer, postgresContainer, debeziumContainer)) + .join(); + } +} +---- +<1> Define a Docker network to be used by all the services +<2> Set up a container for Apache Kafka +<3> Set up a container for Postgres 11 (using Debezium's Postgres container image) +<4> Set up a container for Kafka Connect with Debezium {debezium-version} +<5> Start all three containers + +== Test Implementation + +Having declared all the required containers, you can now register an instance of the Debezium Postgres connector, +insert some test data into Postgres +and use the Apache Kafka client for reading the expected change event records from the corresponding Kafka topic: + +[source,java,indent=0] +---- +@Test +public void canRegisterPostgreSqlConnector() throws Exception { + try (Connection connection = getConnection(postgresContainer); + Statement statement = connection.createStatement(); + KafkaConsumer consumer = getConsumer( + kafkaContainer)) { + + statement.execute("create schema todo"); // <1> + statement.execute("create table todo.Todo (id int8 not null, " + + "title varchar(255), primary key (id))"); + statement.execute("alter table todo.Todo replica identity full"); + statement.execute("insert into todo.Todo values (1, " + + "'Learn CDC')"); + statement.execute("insert into todo.Todo values (2, " + + "'Learn Debezium')"); + + ConnectorConfiguration connector = ConnectorConfiguration.forJdbcContainer(postgresContainer) + .with("database.server.name", "dbserver1"); + + debeziumContainer.registerConnector("my-connector", + connector); // <2> + + consumer.subscribe(Arrays.asList("dbserver1.todo.todo")); + + List> changeEvents = + drain(consumer, 2); // <3> + + assertThat(JsonPath. read(changeEvents.get(0).key(), + "$.id")).isEqualTo(1); + assertThat(JsonPath. read(changeEvents.get(0).value(), + "$.op")).isEqualTo("r"); + assertThat(JsonPath. read(changeEvents.get(0).value(), + "$.after.title")).isEqualTo("Learn CDC"); + + assertThat(JsonPath. read(changeEvents.get(1).key(), + "$.id")).isEqualTo(2); + assertThat(JsonPath. read(changeEvents.get(1).value(), + "$.op")).isEqualTo("r"); + assertThat(JsonPath. read(changeEvents.get(1).value(), + "$.after.title")).isEqualTo("Learn Debezium"); + + consumer.unsubscribe(); + } +} + +// Helper methods below + +private Connection getConnection( + PostgreSQLContainer postgresContainer) + throws SQLException { + + return DriverManager.getConnection(postgresContainer.getJdbcUrl(), + postgresContainer.getUsername(), + postgresContainer.getPassword()); +} + +private KafkaConsumer getConsumer( + KafkaContainer kafkaContainer) { + + return new KafkaConsumer<>( + ImmutableMap.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + kafkaContainer.getBootstrapServers(), + ConsumerConfig.GROUP_ID_CONFIG, + "tc-" + UUID.randomUUID(), + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + "earliest"), + new StringDeserializer(), + new StringDeserializer()); +} + +private List> drain( + KafkaConsumer consumer, + int expectedRecordCount) { + + List> allRecords = new ArrayList<>(); + + Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> { + consumer.poll(Duration.ofMillis(50)) + .iterator() + .forEachRemaining(allRecords::add); + + return allRecords.size() == expectedRecordCount; + }); + + return allRecords; +} +---- +<1> Create a table in the Postgres database and insert two records +<2> Register an instance of the Debezium Postgres connector; the connector type as well as properties such as database host, database name, user etc. are derived from the database container +<3> Read two records from the change event topic in Kafka and assert their attributes diff --git a/support/checkstyle/pom.xml b/support/checkstyle/pom.xml index 1f9e8e4ef..1a5efb005 100644 --- a/support/checkstyle/pom.xml +++ b/support/checkstyle/pom.xml @@ -1,19 +1,18 @@ - - org.jboss - jboss-parent - 35 - - - - 4.0.0 - io.debezium - debezium-checkstyle - 1.1.0-SNAPSHOT - Debezium Checkstyle Rules - Contains the definitions for the Debezium commons code style and conventions + + org.jboss + jboss-parent + 35 + + + 4.0.0 + io.debezium + debezium-checkstyle + 1.1.0-SNAPSHOT + Debezium Checkstyle Rules + Contains the definitions for the Debezium commons code style and conventions @@ -23,13 +22,15 @@ 3.0.2 3.1.0 3.0.0 + 3.8.1 - - + + + org.apache.maven.plugins diff --git a/support/ide-configs/pom.xml b/support/ide-configs/pom.xml index 3f23c9b65..7e330fe13 100644 --- a/support/ide-configs/pom.xml +++ b/support/ide-configs/pom.xml @@ -19,6 +19,7 @@ UTF-8 3.0.2 + 3.8.1