[id="integration-testing-with-testcontainers"] = Integration Testing with Testcontainers :toc: :toc-placement: macro :linkattrs: :icons: font :source-highlighter: highlight.js toc::[] [NOTE] ==== This feature is currently in incubating state, i.e. exact semantics, configuration options, APIs etc. may change in future revisions, based on the feedback we receive. Please let us know if you encounter any problems will using this extension. ==== == Overview When setting up change data capture pipelines with {prodname}, it's a good idea to also have some automated testing in place, in order to make sure that * the source database is set up so changes can be streamed off of it * your connectors are configured correctly The {prodname} extension for https://www.testcontainers.org/[Testcontainers] aims at simplying such tests, by running all the required infrastructure (Apache Kafka, Kafka Connect etc.) via Linux containers and making it easily accessible to Java-based tests. It applies sensible defaults as far as possible (e.g. database credentials for connectors can be obtained from the configured database container), allowing you to focus on the essential logic of your tests. == Getting Started In order to use {prodname}'s Testcontainers integration, add the following dependency to your project: [source,xml,subs="verbatim,attributes"] ---- io.debezium debezium-testing-testcontainers {debezium-version} test org.testcontainers kafka test org.testcontainers postgresql test ---- Depending on your testing strategy, you may also need the JDBC driver of your database and a client for Apache Kafka, so you can insert some test data and assert the corresponding change events in Kafka. == Test Set-Up When writing an integration test for a {prodname} connector configuration, you'll also need to set up Apache Kafka and a database which should be the source of change events. The existing Testcontainers support for https://www.testcontainers.org/modules/kafka/[Apache Kafka] and https://www.testcontainers.org/modules/databases/[databases] can be used for that. Together with {prodname}'s `DebeziumContainer` class, a typical set-up will look like this: [source,java,indent=0,subs="verbatim,attributes"] ---- public class DebeziumContainerTest { private static Network network = Network.newNetwork(); // <1> private static KafkaContainer kafkaContainer = new KafkaContainer() .withNetwork(network); // <2> public static PostgreSQLContainer postgresContainer = new PostgreSQLContainer<>("debezium/postgres:11") .withNetwork(network) .withNetworkAliases("postgres"); // <3> public static DebeziumContainer debeziumContainer = new DebeziumContainer("debezium/connect:{debezium-version}") .withNetwork(network) .withKafka(kafkaContainer) .dependsOn(kafkaContainer); // <4> @BeforeClass public static void startContainers() { // <5> Startables.deepStart(Stream.of( kafkaContainer, postgresContainer, debeziumContainer)) .join(); } } ---- <1> Define a Docker network to be used by all the services <2> Set up a container for Apache Kafka <3> Set up a container for Postgres 11 (using {prodname}'s Postgres container image) <4> Set up a container for Kafka Connect with {prodname} {debezium-version} <5> Start all three containers == Test Implementation Having declared all the required containers, you can now register an instance of the {prodname} Postgres connector, insert some test data into Postgres and use the Apache Kafka client for reading the expected change event records from the corresponding Kafka topic: [source,java,indent=0] ---- @Test public void canRegisterPostgreSqlConnector() throws Exception { try (Connection connection = getConnection(postgresContainer); Statement statement = connection.createStatement(); KafkaConsumer consumer = getConsumer( kafkaContainer)) { statement.execute("create schema todo"); // <1> statement.execute("create table todo.Todo (id int8 not null, " + "title varchar(255), primary key (id))"); statement.execute("alter table todo.Todo replica identity full"); statement.execute("insert into todo.Todo values (1, " + "'Learn CDC')"); statement.execute("insert into todo.Todo values (2, " + "'Learn Debezium')"); ConnectorConfiguration connector = ConnectorConfiguration .forJdbcContainer(postgresContainer) .with("topic.prefix", "dbserver1"); debeziumContainer.registerConnector("my-connector", connector); // <2> consumer.subscribe(Arrays.asList("dbserver1.todo.todo")); List> changeEvents = drain(consumer, 2); // <3> assertThat(JsonPath. read(changeEvents.get(0).key(), "$.id")).isEqualTo(1); assertThat(JsonPath. read(changeEvents.get(0).value(), "$.op")).isEqualTo("r"); assertThat(JsonPath. read(changeEvents.get(0).value(), "$.after.title")).isEqualTo("Learn CDC"); assertThat(JsonPath. read(changeEvents.get(1).key(), "$.id")).isEqualTo(2); assertThat(JsonPath. read(changeEvents.get(1).value(), "$.op")).isEqualTo("r"); assertThat(JsonPath. read(changeEvents.get(1).value(), "$.after.title")).isEqualTo("Learn Debezium"); consumer.unsubscribe(); } } // Helper methods below private Connection getConnection( PostgreSQLContainer postgresContainer) throws SQLException { return DriverManager.getConnection(postgresContainer.getJdbcUrl(), postgresContainer.getUsername(), postgresContainer.getPassword()); } private KafkaConsumer getConsumer( KafkaContainer kafkaContainer) { return new KafkaConsumer<>( Map.of( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers(), ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), new StringDeserializer(), new StringDeserializer()); } private List> drain( KafkaConsumer consumer, int expectedRecordCount) { List> allRecords = new ArrayList<>(); Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> { consumer.poll(Duration.ofMillis(50)) .iterator() .forEachRemaining(allRecords::add); return allRecords.size() == expectedRecordCount; }); return allRecords; } ---- <1> Create a table in the Postgres database and insert two records <2> Register an instance of the {prodname} Postgres connector; the connector type as well as properties such as database host, database name, user etc. are derived from the database container <3> Read two records from the change event topic in Kafka and assert their attributes