DBZ-3382 Incorporate feedback, run ITs and add documentation

Signed-off-by: Alfusainey Jallow <alf.jallow@gmail.com>
This commit is contained in:
Alfusainey Jallow 2021-05-03 17:29:41 +02:00 committed by Jiri Pechanec
parent ccfc3ccfe8
commit 40d023817d
8 changed files with 165 additions and 117 deletions

View File

@ -1,6 +1,5 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-server</artifactId>
@ -13,14 +12,14 @@
<packaging>jar</packaging>
<properties>
<skipITs>true</skipITs>
<skipITs>false</skipITs>
</properties>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
</dependency>
<!-- Testing -->
<dependency>
<groupId>io.debezium</groupId>
@ -86,6 +85,33 @@
</configuration>
</plugin>
</plugins>
<resources>
<!-- Apply the properties set in the POM to the resource files -->
<resource>
<filtering>true</filtering>
<directory>src/main/resources</directory>
<includes>
<include>**/build.properties</include>
</includes>
</resource>
</resources>
</build>
<profiles>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Do not perform any Docker-related functionality
To use, specify "-DskipITs" on the Maven command line.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<profile>
<id>skip-integration-tests</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>skipITs</name>
</property>
</activation>
<properties>
<docker.skip>true</docker.skip>
</properties>
</profile>
</profiles>
</project>

View File

@ -6,10 +6,8 @@
package io.debezium.server.kafka;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.CountDownLatch;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@ -20,7 +18,6 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
@ -72,19 +69,27 @@ void stop() {
producer.close(Duration.ofSeconds(5));
}
catch (Throwable t) {
LOGGER.warn("Could not close producer", t);
LOGGER.warn("Could not close producer {}", t);
}
}
}
@Override
public void handleBatch(final List<ChangeEvent<Object, Object>> records, final RecordCommitter<ChangeEvent<Object, Object>> committer) throws InterruptedException {
final List<Future<RecordMetadata>> futures = new ArrayList<>();
final CountDownLatch latch = new CountDownLatch(records.size());
for (ChangeEvent<Object, Object> record : records) {
try {
// TODO: change log level to trace
LOGGER.info("Received event '{}'", record);
futures.add(producer.send(new ProducerRecord<>(record.destination(), record.key(), record.value())));
LOGGER.trace("Received event '{}'", record);
producer.send(new ProducerRecord<>(record.destination(), record.key(), record.value()), (metadata, exception) -> {
if (exception != null) {
LOGGER.error("Failed to send record to {}:", record.destination(), exception);
throw new DebeziumException(exception);
}
else {
LOGGER.trace("Sent message with offset: {}", metadata.offset());
latch.countDown();
}
});
committer.markProcessed(record);
}
catch (Exception e) {
@ -92,17 +97,7 @@ public void handleBatch(final List<ChangeEvent<Object, Object>> records, final R
}
}
final List<Long> offsets = new ArrayList<>();
for (Future<RecordMetadata> future : futures) {
try {
RecordMetadata meta = future.get();
offsets.add(meta.offset());
}
catch (InterruptedException | ExecutionException e) {
throw new DebeziumException(e);
}
}
LOGGER.trace("Sent messages with offsets: {}", offsets);
latch.await();
committer.markBatchFinished();
}
}

View File

@ -10,6 +10,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.enterprise.event.Observes;
@ -39,7 +40,7 @@
@QuarkusTest
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
@QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
public class KafkaITs {
public class KafkaIT {
private static final String TOPIC_NAME = "testc.inventory.customers";
private static final int MESSAGE_COUNT = 4;
@ -55,7 +56,9 @@ void setupDependencies(@Observes final ConnectorStartedEvent event) {
Testing.Print.enable();
final Map<String, Object> configs = new ConcurrentHashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaTestResourceLifecycleManager.getBootstrapServers());
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test-" + UUID.randomUUID());
consumer = new KafkaConsumer<>(configs, new StringDeserializer(), new StringDeserializer());
}

View File

@ -18,6 +18,10 @@ public KafkaTestConfigSource() {
final Map<String, String> kafkaConfig = new HashMap<>();
kafkaConfig.put("debezium.sink.type", "kafka");
kafkaConfig.put("debezium.sink.kafka.producer.bootstrap.servers", KafkaTestResourceLifecycleManager.getBootstrapServers());
kafkaConfig.put("debezium.sink.kafka.producer.key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaConfig.put("debezium.sink.kafka.producer.value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaConfig.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
kafkaConfig.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());

View File

@ -19,24 +19,23 @@
*/
public class KafkaTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {
@SuppressWarnings("deprecation")
private static KafkaContainer kafkaContainer = new KafkaContainer();
@Override
public Map<String, String> start() {
kafkaContainer.start();
Map<String, String> props = new HashMap<>();
return props;
return new HashMap<>();
}
@Override
public void stop() {
if (kafkaContainer != null) {
kafkaContainer.stop();
}
}
public static String getBootstrapServers() {
// if container is already started, start() will return early
kafkaContainer.start();
return kafkaContainer.getBootstrapServers();
}
}

View File

@ -0,0 +1 @@
io.debezium.server.kafka.KafkaTestConfigSource

View File

@ -556,6 +556,27 @@ By default the same name is used.
|===
==== Apache Kafka
https://kafka.apache.org/[Apache Kafka] is a popular open-source platform for distributed event streaming. The Debezium Server supports publishing captured change events to a configured Kafka message broker.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[kafka-type]]<<kafka-type, `debezium.sink.type`>>
|
|Must be set to `kafka`.
|[[kafka-producer]]<<kafka-producer, `debezium.sink.kafka.producer.*`>>
|
|The Kafka sink adapter supports pass-through configuration.
This means that all Kafka producer https://kafka.apache.org/documentation/#producerconfigs[configuration properties] are passed to the producer with the prefix removed.
At least `bootstrap.servers`, `key.serializer` and `value.serializer` properties must be provided. The `topic` is set by Debezium.
|===
== Extensions