4d15c40e89
The container needs to be marked as a compatible substitute for the official postgres image. Otherwise, with recent versions of testcontainers, the postgres container will fail to initialise with: `Failed to verify that image 'debezium/postgres:11' is a compatible substitute for 'postgres'.`.
206 lines
7.7 KiB
Plaintext
206 lines
7.7 KiB
Plaintext
[id="integration-testing-with-testcontainers"]
|
|
= Integration Testing with Testcontainers
|
|
|
|
:toc:
|
|
:toc-placement: macro
|
|
:linkattrs:
|
|
:icons: font
|
|
:source-highlighter: highlight.js
|
|
|
|
toc::[]
|
|
|
|
[NOTE]
|
|
====
|
|
This feature is currently in incubating state, i.e. exact semantics, configuration options, APIs etc. may change in future revisions, based on the feedback we receive.
|
|
Please let us know if you encounter any problems will using this extension.
|
|
====
|
|
|
|
== Overview
|
|
|
|
When setting up change data capture pipelines with {prodname},
|
|
it's a good idea to also have some automated testing in place, in order to make sure that
|
|
|
|
* the source database is set up so changes can be streamed off of it
|
|
* your connectors are configured correctly
|
|
|
|
The {prodname} extension for https://www.testcontainers.org/[Testcontainers] aims at simplying such tests,
|
|
by running all the required infrastructure (Apache Kafka, Kafka Connect etc.)
|
|
via Linux containers and making it easily accessible to Java-based tests.
|
|
|
|
It applies sensible defaults as far as possible
|
|
(e.g. database credentials for connectors can be obtained from the configured database container),
|
|
allowing you to focus on the essential logic of your tests.
|
|
|
|
== Getting Started
|
|
|
|
In order to use {prodname}'s Testcontainers integration, add the following dependency to your project:
|
|
|
|
[source,xml,subs="verbatim,attributes"]
|
|
----
|
|
<dependency>
|
|
<groupId>io.debezium</groupId>
|
|
<artifactId>debezium-testing-testcontainers</artifactId>
|
|
<version>{debezium-version}</version>
|
|
<scope>test</scope>
|
|
</dependency>
|
|
<dependency>
|
|
<groupId>org.testcontainers</groupId>
|
|
<artifactId>kafka</artifactId>
|
|
<scope>test</scope>
|
|
</dependency>
|
|
|
|
<!-- Add the TC dependency matching your database -->
|
|
<dependency>
|
|
<groupId>org.testcontainers</groupId>
|
|
<artifactId>postgresql</artifactId>
|
|
<scope>test</scope>
|
|
</dependency>
|
|
----
|
|
|
|
Depending on your testing strategy, you may also need the JDBC driver of your database and a client for Apache Kafka, so you can insert some test data and assert the corresponding change events in Kafka.
|
|
|
|
== Test Set-Up
|
|
|
|
When writing an integration test for a {prodname} connector configuration,
|
|
you'll also need to set up Apache Kafka and a database which should be the source of change events.
|
|
The existing Testcontainers support for https://www.testcontainers.org/modules/kafka/[Apache Kafka] and https://www.testcontainers.org/modules/databases/[databases] can be used for that.
|
|
|
|
Together with {prodname}'s `DebeziumContainer` class, a typical set-up will look like this:
|
|
|
|
[source,java,indent=0,subs="verbatim,attributes"]
|
|
----
|
|
public class DebeziumContainerTest {
|
|
|
|
private static Network network = Network.newNetwork(); // <1>
|
|
|
|
private static KafkaContainer kafkaContainer = new KafkaContainer()
|
|
.withNetwork(network); // <2>
|
|
|
|
public static PostgreSQLContainer<?> postgresContainer =
|
|
new PostgreSQLContainer<>(
|
|
DockerImageName.parse("debezium/postgres:11")
|
|
.asCompatibleSubstituteFor("postgres"))
|
|
.withNetwork(network)
|
|
.withNetworkAliases("postgres"); // <3>
|
|
|
|
public static DebeziumContainer debeziumContainer =
|
|
new DebeziumContainer("debezium/connect:{debezium-version}")
|
|
.withNetwork(network)
|
|
.withKafka(kafkaContainer)
|
|
.dependsOn(kafkaContainer); // <4>
|
|
|
|
@BeforeClass
|
|
public static void startContainers() { // <5>
|
|
Startables.deepStart(Stream.of(
|
|
kafkaContainer, postgresContainer, debeziumContainer))
|
|
.join();
|
|
}
|
|
}
|
|
----
|
|
<1> Define a Docker network to be used by all the services
|
|
<2> Set up a container for Apache Kafka
|
|
<3> Set up a container for Postgres 11 (using {prodname}'s Postgres container image)
|
|
<4> Set up a container for Kafka Connect with {prodname} {debezium-version}
|
|
<5> Start all three containers
|
|
|
|
== Test Implementation
|
|
|
|
Having declared all the required containers, you can now register an instance of the {prodname} Postgres connector,
|
|
insert some test data into Postgres
|
|
and use the Apache Kafka client for reading the expected change event records from the corresponding Kafka topic:
|
|
|
|
[source,java,indent=0]
|
|
----
|
|
@Test
|
|
public void canRegisterPostgreSqlConnector() throws Exception {
|
|
try (Connection connection = getConnection(postgresContainer);
|
|
Statement statement = connection.createStatement();
|
|
KafkaConsumer<String, String> consumer = getConsumer(
|
|
kafkaContainer)) {
|
|
|
|
statement.execute("create schema todo"); // <1>
|
|
statement.execute("create table todo.Todo (id int8 not null, " +
|
|
"title varchar(255), primary key (id))");
|
|
statement.execute("alter table todo.Todo replica identity full");
|
|
statement.execute("insert into todo.Todo values (1, " +
|
|
"'Learn CDC')");
|
|
statement.execute("insert into todo.Todo values (2, " +
|
|
"'Learn Debezium')");
|
|
|
|
ConnectorConfiguration connector = ConnectorConfiguration
|
|
.forJdbcContainer(postgresContainer)
|
|
.with("topic.prefix", "dbserver1");
|
|
|
|
debeziumContainer.registerConnector("my-connector",
|
|
connector); // <2>
|
|
|
|
consumer.subscribe(Arrays.asList("dbserver1.todo.todo"));
|
|
|
|
List<ConsumerRecord<String, String>> changeEvents =
|
|
drain(consumer, 2); // <3>
|
|
|
|
assertThat(JsonPath.<Integer> read(changeEvents.get(0).key(),
|
|
"$.id")).isEqualTo(1);
|
|
assertThat(JsonPath.<String> read(changeEvents.get(0).value(),
|
|
"$.op")).isEqualTo("r");
|
|
assertThat(JsonPath.<String> read(changeEvents.get(0).value(),
|
|
"$.after.title")).isEqualTo("Learn CDC");
|
|
|
|
assertThat(JsonPath.<Integer> read(changeEvents.get(1).key(),
|
|
"$.id")).isEqualTo(2);
|
|
assertThat(JsonPath.<String> read(changeEvents.get(1).value(),
|
|
"$.op")).isEqualTo("r");
|
|
assertThat(JsonPath.<String> read(changeEvents.get(1).value(),
|
|
"$.after.title")).isEqualTo("Learn Debezium");
|
|
|
|
consumer.unsubscribe();
|
|
}
|
|
}
|
|
|
|
// Helper methods below
|
|
|
|
private Connection getConnection(
|
|
PostgreSQLContainer<?> postgresContainer)
|
|
throws SQLException {
|
|
|
|
return DriverManager.getConnection(postgresContainer.getJdbcUrl(),
|
|
postgresContainer.getUsername(),
|
|
postgresContainer.getPassword());
|
|
}
|
|
|
|
private KafkaConsumer<String, String> getConsumer(
|
|
KafkaContainer kafkaContainer) {
|
|
|
|
return new KafkaConsumer<>(
|
|
Map.of(
|
|
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
|
|
kafkaContainer.getBootstrapServers(),
|
|
ConsumerConfig.GROUP_ID_CONFIG,
|
|
"tc-" + UUID.randomUUID(),
|
|
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
|
|
"earliest"),
|
|
new StringDeserializer(),
|
|
new StringDeserializer());
|
|
}
|
|
|
|
private List<ConsumerRecord<String, String>> drain(
|
|
KafkaConsumer<String, String> consumer,
|
|
int expectedRecordCount) {
|
|
|
|
List<ConsumerRecord<String, String>> allRecords = new ArrayList<>();
|
|
|
|
Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
|
|
consumer.poll(Duration.ofMillis(50))
|
|
.iterator()
|
|
.forEachRemaining(allRecords::add);
|
|
|
|
return allRecords.size() == expectedRecordCount;
|
|
});
|
|
|
|
return allRecords;
|
|
}
|
|
----
|
|
<1> Create a table in the Postgres database and insert two records
|
|
<2> Register an instance of the {prodname} Postgres connector; the connector type as well as properties such as database host, database name, user etc. are derived from the database container
|
|
<3> Read two records from the change event topic in Kafka and assert their attributes
|