DBZ-1722 Adding docs
This commit is contained in:
parent
711d55e836
commit
aa5a250a6b
@ -4,6 +4,7 @@ Addison Higham
|
||||
Adrian Kreuziger
|
||||
Akshath Patkar
|
||||
Alex Mansell
|
||||
Alex Soto
|
||||
Alexander Kovryga
|
||||
Alexander Schwartz
|
||||
Amit Sela
|
||||
|
@ -27,6 +27,7 @@
|
||||
** xref:integrations/serdes.adoc[Change Event SerDes]
|
||||
** xref:integrations/outbox.adoc[Outbox Quarkus Extension]
|
||||
** xref:integrations/cloudevents.adoc[CloudEvents]
|
||||
** xref:integrations/testcontainers.adoc[Integration Testing with Testcontainers]
|
||||
* Operations
|
||||
** xref:operations/logging.adoc[Logging]
|
||||
** xref:operations/monitoring.adoc[Monitoring]
|
||||
|
@ -0,0 +1,201 @@
|
||||
= Integration Testing with Testcontainers
|
||||
include::../_attributes.adoc[]
|
||||
:toc:
|
||||
:toc-placement: macro
|
||||
:linkattrs:
|
||||
:icons: font
|
||||
:source-highlighter: highlight.js
|
||||
|
||||
toc::[]
|
||||
|
||||
[NOTE]
|
||||
====
|
||||
This feature is currently in incubating state, i.e. exact semantics, configuration options, APIs etc. may change in future revisions, based on the feedback we receive.
|
||||
Please let us know if you encounter any problems will using this extension.
|
||||
====
|
||||
|
||||
== Overview
|
||||
|
||||
When setting up change data capture pipelines with Debezium,
|
||||
it's a good idea to also have some automated testing in place, in order to make sure that
|
||||
|
||||
* the source database is set up so changes can be streamed off of it
|
||||
* your connectors are configured correctly
|
||||
|
||||
The Debezium extension for https://www.testcontainers.org/[Testcontainers] aims at simplying such tests,
|
||||
by running all the required infrastructure (Apache Kafka, Kafka Connect etc.)
|
||||
via Linux containers and making it easily accessible to Java-based tests.
|
||||
|
||||
It applies sensible defaults as far as possible
|
||||
(e.g. database credentials for connectors can be obtained from the configured database container),
|
||||
allowing you to focus on the essential logic of your tests.
|
||||
|
||||
== Getting Started
|
||||
|
||||
In order to use Debezium's Testcontainers integration, add the following dependency to your project:
|
||||
|
||||
[source,xml,subs="verbatim,attributes"]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>io.debezium</groupId>
|
||||
<artifactId>debezium-testing-testcontainers</artifactId>
|
||||
<version>{debezium-version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>kafka</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- Add the TC dependency matching your database -->
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>postgresql</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
----
|
||||
|
||||
Depending on your testing strategy, you may also need the JDBC driver of your database and a client for Apache Kafka, so you can insert some test data and assert the corresponding change events in Kafka.
|
||||
|
||||
== Test Set-Up
|
||||
|
||||
When writing an integration test for a Debezium connector configuration,
|
||||
you'll also need to set up Apache Kafka and a database which should be the source of change events.
|
||||
The existing Testcontainers support for https://www.testcontainers.org/modules/kafka/[Apache Kafka] and https://www.testcontainers.org/modules/databases/[databases] can be used for that.
|
||||
|
||||
Together with Debezium's `DebeziumContainer` class, a typical set-up will look like this:
|
||||
|
||||
[source,java,indent=0]
|
||||
----
|
||||
public class DebeziumContainerTest {
|
||||
|
||||
private static Network network = Network.newNetwork(); // <1>
|
||||
|
||||
private static KafkaContainer kafkaContainer = new KafkaContainer()
|
||||
.withNetwork(network); // <2>
|
||||
|
||||
public static PostgreSQLContainer<?> postgresContainer =
|
||||
new PostgreSQLContainer<>("debezium/postgres:11")
|
||||
.withNetwork(network)
|
||||
.withNetworkAliases("postgres"); // <3>
|
||||
|
||||
public static DebeziumContainer debeziumContainer =
|
||||
new DebeziumContainer("{debezium-version}")
|
||||
.withNetwork(network)
|
||||
.withKafka(kafkaContainer)
|
||||
.dependsOn(kafkaContainer); // <4>
|
||||
|
||||
@BeforeClass
|
||||
public static void startContainers() { // <5>
|
||||
Startables.deepStart(Stream.of(
|
||||
kafkaContainer, postgresContainer, debeziumContainer))
|
||||
.join();
|
||||
}
|
||||
}
|
||||
----
|
||||
<1> Define a Docker network to be used by all the services
|
||||
<2> Set up a container for Apache Kafka
|
||||
<3> Set up a container for Postgres 11 (using Debezium's Postgres container image)
|
||||
<4> Set up a container for Kafka Connect with Debezium {debezium-version}
|
||||
<5> Start all three containers
|
||||
|
||||
== Test Implementation
|
||||
|
||||
Having declared all the required containers, you can now register an instance of the Debezium Postgres connector,
|
||||
insert some test data into Postgres
|
||||
and use the Apache Kafka client for reading the expected change event records from the corresponding Kafka topic:
|
||||
|
||||
[source,java,indent=0]
|
||||
----
|
||||
@Test
|
||||
public void canRegisterPostgreSqlConnector() throws Exception {
|
||||
try (Connection connection = getConnection(postgresContainer);
|
||||
Statement statement = connection.createStatement();
|
||||
KafkaConsumer<String, String> consumer = getConsumer(
|
||||
kafkaContainer)) {
|
||||
|
||||
statement.execute("create schema todo"); // <1>
|
||||
statement.execute("create table todo.Todo (id int8 not null, " +
|
||||
"title varchar(255), primary key (id))");
|
||||
statement.execute("alter table todo.Todo replica identity full");
|
||||
statement.execute("insert into todo.Todo values (1, " +
|
||||
"'Learn CDC')");
|
||||
statement.execute("insert into todo.Todo values (2, " +
|
||||
"'Learn Debezium')");
|
||||
|
||||
ConnectorConfiguration connector = ConnectorConfiguration.forJdbcContainer(postgresContainer)
|
||||
.with("database.server.name", "dbserver1");
|
||||
|
||||
debeziumContainer.registerConnector("my-connector",
|
||||
connector); // <2>
|
||||
|
||||
consumer.subscribe(Arrays.asList("dbserver1.todo.todo"));
|
||||
|
||||
List<ConsumerRecord<String, String>> changeEvents =
|
||||
drain(consumer, 2); // <3>
|
||||
|
||||
assertThat(JsonPath.<Integer> read(changeEvents.get(0).key(),
|
||||
"$.id")).isEqualTo(1);
|
||||
assertThat(JsonPath.<String> read(changeEvents.get(0).value(),
|
||||
"$.op")).isEqualTo("r");
|
||||
assertThat(JsonPath.<String> read(changeEvents.get(0).value(),
|
||||
"$.after.title")).isEqualTo("Learn CDC");
|
||||
|
||||
assertThat(JsonPath.<Integer> read(changeEvents.get(1).key(),
|
||||
"$.id")).isEqualTo(2);
|
||||
assertThat(JsonPath.<String> read(changeEvents.get(1).value(),
|
||||
"$.op")).isEqualTo("r");
|
||||
assertThat(JsonPath.<String> read(changeEvents.get(1).value(),
|
||||
"$.after.title")).isEqualTo("Learn Debezium");
|
||||
|
||||
consumer.unsubscribe();
|
||||
}
|
||||
}
|
||||
|
||||
// Helper methods below
|
||||
|
||||
private Connection getConnection(
|
||||
PostgreSQLContainer<?> postgresContainer)
|
||||
throws SQLException {
|
||||
|
||||
return DriverManager.getConnection(postgresContainer.getJdbcUrl(),
|
||||
postgresContainer.getUsername(),
|
||||
postgresContainer.getPassword());
|
||||
}
|
||||
|
||||
private KafkaConsumer<String, String> getConsumer(
|
||||
KafkaContainer kafkaContainer) {
|
||||
|
||||
return new KafkaConsumer<>(
|
||||
ImmutableMap.of(
|
||||
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
|
||||
kafkaContainer.getBootstrapServers(),
|
||||
ConsumerConfig.GROUP_ID_CONFIG,
|
||||
"tc-" + UUID.randomUUID(),
|
||||
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
|
||||
"earliest"),
|
||||
new StringDeserializer(),
|
||||
new StringDeserializer());
|
||||
}
|
||||
|
||||
private List<ConsumerRecord<String, String>> drain(
|
||||
KafkaConsumer<String, String> consumer,
|
||||
int expectedRecordCount) {
|
||||
|
||||
List<ConsumerRecord<String, String>> allRecords = new ArrayList<>();
|
||||
|
||||
Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
|
||||
consumer.poll(Duration.ofMillis(50))
|
||||
.iterator()
|
||||
.forEachRemaining(allRecords::add);
|
||||
|
||||
return allRecords.size() == expectedRecordCount;
|
||||
});
|
||||
|
||||
return allRecords;
|
||||
}
|
||||
----
|
||||
<1> Create a table in the Postgres database and insert two records
|
||||
<2> Register an instance of the Debezium Postgres connector; the connector type as well as properties such as database host, database name, user etc. are derived from the database container
|
||||
<3> Read two records from the change event topic in Kafka and assert their attributes
|
@ -14,7 +14,6 @@
|
||||
<name>Debezium Checkstyle Rules</name>
|
||||
<description>Contains the definitions for the Debezium commons code style and conventions</description>
|
||||
|
||||
|
||||
<properties>
|
||||
<!-- Instruct the build to use only UTF-8 encoding for source code -->
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
@ -23,7 +22,9 @@
|
||||
<version.jar.plugin>3.0.2</version.jar.plugin>
|
||||
<version.source.plugin>3.1.0</version.source.plugin>
|
||||
<version.checkstyle.plugin>3.0.0</version.checkstyle.plugin>
|
||||
<version.compiler.plugin>3.8.1</version.compiler.plugin>
|
||||
</properties>
|
||||
|
||||
<!--
|
||||
This module is referenced by Debezium's parent POM, so it needs to be built before the parent
|
||||
or any other module. Therefore, this module cannot reference the parent, and needs to define enough
|
||||
|
@ -19,6 +19,7 @@
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
|
||||
<version.jar.plugin>3.0.2</version.jar.plugin>
|
||||
<version.compiler.plugin>3.8.1</version.compiler.plugin>
|
||||
</properties>
|
||||
|
||||
<build>
|
||||
|
Loading…
Reference in New Issue
Block a user