diff --git a/debezium-server/debezium-server-kafka/pom.xml b/debezium-server/debezium-server-kafka/pom.xml
index 6452fdc3b..f5270e7c8 100644
--- a/debezium-server/debezium-server-kafka/pom.xml
+++ b/debezium-server/debezium-server-kafka/pom.xml
@@ -1,91 +1,117 @@
-<?xml version="1.0"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-  <parent>
-    <groupId>io.debezium</groupId>
-    <artifactId>debezium-server</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>debezium-server-kafka</artifactId>
-  <name>Debezium Server Kafka Sink Adapter</name>
-  <packaging>jar</packaging>
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <parent>
+        <groupId>io.debezium</groupId>
+        <artifactId>debezium-server</artifactId>
+        <version>1.5.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>debezium-server-kafka</artifactId>
+    <name>Debezium Server Kafka Sink Adapter</name>
+    <packaging>jar</packaging>
 
-  <properties>
-    <skipITs>true</skipITs>
-  </properties>
+    <properties>
+        <skipITs>false</skipITs>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-server-core</artifactId>
+        </dependency>
 
-  <dependencies>
-    <dependency>
-      <groupId>io.debezium</groupId>
-      <artifactId>debezium-server-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>io.debezium</groupId>
-      <artifactId>debezium-testing-testcontainers</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>io.debezium</groupId>
-      <artifactId>debezium-server-core</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>io.quarkus</groupId>
-        <artifactId>quarkus-maven-plugin</artifactId>
-        <version>${quarkus.version}</version>
-        <executions>
-          <execution>
-            <goals>
-              <goal>build</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.jboss.jandex</groupId>
-        <artifactId>jandex-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>make-index</id>
-            <goals>
-              <goal>jandex</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-failsafe-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>integration-test</id>
-            <goals>
-              <goal>integration-test</goal>
-            </goals>
-          </execution>
-          <execution>
-            <id>verify</id>
-            <goals>
-              <goal>verify</goal>
-            </goals>
-          </execution>
-        </executions>
-        <configuration>
-          <skipTests>${skipITs}</skipTests>
-          <enableAssertions>true</enableAssertions>
-          <includes>
-            <include>**/*IT.java</include>
-          </includes>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-</project>
\ No newline at end of file
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-testing-testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-server-core</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>io.quarkus</groupId>
+                <artifactId>quarkus-maven-plugin</artifactId>
+                <version>${quarkus.version}</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>build</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jboss.jandex</groupId>
+                <artifactId>jandex-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>make-index</id>
+                        <goals>
+                            <goal>jandex</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>integration-test</id>
+                        <goals>
+                            <goal>integration-test</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>verify</id>
+                        <goals>
+                            <goal>verify</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <skipTests>${skipITs}</skipTests>
+                    <enableAssertions>true</enableAssertions>
+                    <includes>
+                        <include>**/*IT.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+        </plugins>
+        <resources>
+            <resource>
+                <filtering>true</filtering>
+                <directory>src/main/resources</directory>
+                <includes>
+                    <include>**/build.properties</include>
+                </includes>
+            </resource>
+        </resources>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>skip-integration-tests</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+                <property>
+                    <name>skipITs</name>
+                </property>
+            </activation>
+            <properties>
+                <skipITs>true</skipITs>
+            </properties>
+        </profile>
+    </profiles>
+</project>
diff --git a/debezium-server/debezium-server-kafka/src/main/java/io/debezium/server/kafka/KafkaChangeConsumer.java b/debezium-server/debezium-server-kafka/src/main/java/io/debezium/server/kafka/KafkaChangeConsumer.java
index 249b90557..aac2e2d33 100644
--- a/debezium-server/debezium-server-kafka/src/main/java/io/debezium/server/kafka/KafkaChangeConsumer.java
+++ b/debezium-server/debezium-server-kafka/src/main/java/io/debezium/server/kafka/KafkaChangeConsumer.java
@@ -6,10 +6,8 @@ package io.debezium.server.kafka;
 
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.CountDownLatch;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -20,7 +18,6 @@
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.eclipse.microprofile.config.Config;
 import org.eclipse.microprofile.config.ConfigProvider;
 import org.slf4j.Logger;
@@ -72,19 +69,27 @@ void stop() {
                 producer.close(Duration.ofSeconds(5));
             }
             catch (Throwable t) {
-                LOGGER.warn("Could not close producer", t);
+                LOGGER.warn("Could not close producer {}", t);
             }
         }
     }
 
     @Override
     public void handleBatch(final List<ChangeEvent<Object, Object>> records, final RecordCommitter<ChangeEvent<Object, Object>> committer)
             throws InterruptedException {
-        final List<Future<RecordMetadata>> futures = new ArrayList<>();
+        final CountDownLatch latch = new CountDownLatch(records.size());
         for (ChangeEvent<Object, Object> record : records) {
             try {
-                // TODO: change log level to trace
-                LOGGER.info("Received event '{}'", record);
-                futures.add(producer.send(new ProducerRecord<>(record.destination(), record.key(), record.value())));
+                LOGGER.trace("Received event '{}'", record);
+                producer.send(new ProducerRecord<>(record.destination(), record.key(), record.value()), (metadata, exception) -> {
+                    if (exception != null) {
+                        LOGGER.error("Failed to send record to {}:", record.destination(), exception);
+                        throw new DebeziumException(exception);
+                    }
+                    else {
+                        LOGGER.trace("Sent message with offset: {}", metadata.offset());
+                        latch.countDown();
+                    }
+                });
                 committer.markProcessed(record);
             }
             catch (Exception e) {
@@ -92,17 +97,7 @@ public void handleBatch(final List<ChangeEvent<Object, Object>> records, final RecordCommitter<ChangeEvent<Object, Object>> committer)
             }
         }
 
-        final List<Long> offsets = new ArrayList<>();
-        for (Future<RecordMetadata> future : futures) {
-            try {
-                RecordMetadata meta = future.get();
-                offsets.add(meta.offset());
-            }
-            catch (InterruptedException | ExecutionException e) {
-                throw new DebeziumException(e);
-            }
-        }
-        LOGGER.trace("Sent messages with offsets: {}", offsets);
+        latch.await();
         committer.markBatchFinished();
     }
 }
diff --git a/debezium-server/debezium-server-kafka/src/main/resources/META-INF/beans.xml b/debezium-server/debezium-server-kafka/src/main/resources/META-INF/beans.xml
index 8b1378917..e69de29bb 100644
--- a/debezium-server/debezium-server-kafka/src/main/resources/META-INF/beans.xml
+++ b/debezium-server/debezium-server-kafka/src/main/resources/META-INF/beans.xml
@@ -1 +0,0 @@
-
diff --git a/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaITs.java b/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaIT.java
similarity index 96%
rename from debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaITs.java
rename to debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaIT.java
index a5d837be9..3364e6397 100644
--- a/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaITs.java
+++ b/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaIT.java
@@ -10,6 +10,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.enterprise.event.Observes;
@@ -39,7 +40,7 @@
 @QuarkusTest
 @QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
 @QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
-public class KafkaITs {
+public class KafkaIT {
 
     private static final String TOPIC_NAME = "testc.inventory.customers";
     private static final int MESSAGE_COUNT = 4;
@@ -55,7 +56,9 @@ void setupDependencies(@Observes final ConnectorStartedEvent event) {
         Testing.Print.enable();
 
         final Map<String, Object> configs = new ConcurrentHashMap<>();
+        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaTestResourceLifecycleManager.getBootstrapServers());
+        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-" + UUID.randomUUID());
         consumer = new KafkaConsumer<>(configs, new StringDeserializer(), new StringDeserializer());
     }
 
diff --git a/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaTestConfigSource.java b/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaTestConfigSource.java
index 2aa022f25..382407430 100644
--- a/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaTestConfigSource.java
+++ b/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaTestConfigSource.java
@@ -18,6 +18,10 @@ public KafkaTestConfigSource() {
         final Map<String, String> kafkaConfig = new HashMap<>();
 
         kafkaConfig.put("debezium.sink.type", "kafka");
+        kafkaConfig.put("debezium.sink.kafka.producer.bootstrap.servers", KafkaTestResourceLifecycleManager.getBootstrapServers());
+        kafkaConfig.put("debezium.sink.kafka.producer.key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        kafkaConfig.put("debezium.sink.kafka.producer.value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
         kafkaConfig.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
         kafkaConfig.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
diff --git a/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaTestResourceLifecycleManager.java b/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaTestResourceLifecycleManager.java
index d077cce33..4fc59a4e2 100644
--- a/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaTestResourceLifecycleManager.java
+++ b/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaTestResourceLifecycleManager.java
@@ -19,24 +19,23 @@
  */
 public class KafkaTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {
 
+    @SuppressWarnings("deprecation")
     private static KafkaContainer kafkaContainer = new KafkaContainer();
 
     @Override
     public Map<String, String> start() {
         kafkaContainer.start();
-
-        Map<String, String> props = new HashMap<>();
-        return props;
+        return new HashMap<>();
     }
 
     @Override
     public void stop() {
-        if (kafkaContainer != null) {
-            kafkaContainer.stop();
-        }
+        kafkaContainer.stop();
     }
 
     public static String getBootstrapServers() {
+        // if container is already started, start() will return early
+        kafkaContainer.start();
         return kafkaContainer.getBootstrapServers();
     }
 }
diff --git a/debezium-server/debezium-server-kafka/src/test/resources/META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource b/debezium-server/debezium-server-kafka/src/test/resources/META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource
new file mode 100644
index 000000000..8b6c1b181
--- /dev/null
+++ b/debezium-server/debezium-server-kafka/src/test/resources/META-INF/services/org.eclipse.microprofile.config.spi.ConfigSource
@@ -0,0 +1 @@
+io.debezium.server.kafka.KafkaTestConfigSource
\ No newline at end of file
diff --git a/documentation/modules/ROOT/pages/operations/debezium-server.adoc b/documentation/modules/ROOT/pages/operations/debezium-server.adoc
index df1e0cce1..70b2e3e1b 100644
--- a/documentation/modules/ROOT/pages/operations/debezium-server.adoc
+++ b/documentation/modules/ROOT/pages/operations/debezium-server.adoc
@@ -556,6 +556,27 @@ By default the same name is used.
 
 |===
 
+==== Apache Kafka
+
+https://kafka.apache.org/[Apache Kafka] is a popular open-source platform for distributed event streaming. Debezium Server supports publishing captured change events to a configured Kafka message broker.
+
+[cols="35%a,10%a,55%a",options="header"]
+|===
+|Property
+|Default
+|Description
+
+|[[kafka-type]]<<kafka-type,`debezium.sink.type`>>
+|
+|Must be set to `kafka`.
+
+|[[kafka-producer]]<<kafka-producer,`debezium.sink.kafka.producer.*`>>
+|
+|The Kafka sink adapter supports pass-through configuration.
+This means that all Kafka producer https://kafka.apache.org/documentation/#producerconfigs[configuration properties] are passed to the producer with the prefix removed.
+At least `bootstrap.servers`, `key.serializer` and `value.serializer` properties must be provided. The `topic` is set by Debezium.
+
+|===
+
 == Extensions
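For reference, a minimal Debezium Server `application.properties` snippet wiring up the Kafka sink might look as follows. This is a sketch based on the pass-through keys used in `KafkaTestConfigSource` above; the broker address is a placeholder, and any further `debezium.sink.kafka.producer.*` entries would be handed to the Kafka producer with the prefix stripped.

[source,properties]
----
debezium.sink.type=kafka
# Placeholder address - point this at your Kafka cluster
debezium.sink.kafka.producer.bootstrap.servers=localhost:9092
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
----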