DBZ-5772 Add JetStream sink connector

This commit is contained in:
Skezzowski 2022-11-02 13:50:57 +01:00 committed by Jiri Pechanec
parent 5c2c0f54be
commit 717bc83c39
8 changed files with 503 additions and 1 deletions

View File

@ -17,7 +17,7 @@
<version.pulsar>2.10.1</version.pulsar>
<version.eventhubs>5.12.1</version.eventhubs>
<version.pravega>0.9.1</version.pravega>
<version.nats>2.8.0</version.nats>
<version.nats>2.16.3</version.nats>
<version.stan>2.2.3</version.stan>
<version.commons.logging>1.2</version.commons.logging>

View File

@ -0,0 +1,169 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-server</artifactId>
<version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-server-nats-jetstream</artifactId>
<name>Debezium Server NATS JetStream Sink Adapter</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
</dependency>
<!-- Target systems -->
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
</dependency>
<!-- Testing -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-testing-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easytesting</groupId>
<artifactId>fest-assert</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jboss.jandex</groupId>
<artifactId>jandex-maven-plugin</artifactId>
<executions>
<execution>
<id>make-index</id>
<goals>
<goal>jandex</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<id>integration-test</id>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
<execution>
<id>verify</id>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<skipTests>${skipITs}</skipTests>
<enableAssertions>true</enableAssertions>
<systemProperties>
<test.type>IT</test.type>
</systemProperties>
</configuration>
</plugin>
</plugins>
<resources>
<!-- Apply the properties set in the POM to the resource files -->
<resource>
<filtering>true</filtering>
<directory>src/main/resources</directory>
<includes>
<include>**/build.properties</include>
</includes>
</resource>
</resources>
</build>
<profiles>
<profile>
<id>quick</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>quick</name>
</property>
</activation>
<properties>
<skipITs>true</skipITs>
<docker.skip>true</docker.skip>
</properties>
</profile>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Do not perform any Docker-related functionality
To use, specify "-DskipITs" on the Maven command line.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<profile>
<id>skip-integration-tests</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>skipITs</name>
</property>
</activation>
<properties>
<docker.skip>true</docker.skip>
</properties>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,133 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.nats.jetstream;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.inject.Named;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
import io.debezium.server.BaseChangeConsumer;
import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
/**
* Implementation of the consumer that delivers the messages into a NATS Jetstream stream.
*
* @author Balázs Sipos
*/
@Named("nats-jetstream")
@Dependent
public class NatsJetStreamChangeConsumer extends BaseChangeConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
private static final Logger LOGGER = LoggerFactory.getLogger(NatsJetStreamChangeConsumer.class);
private static final String PROP_PREFIX = "debezium.sink.nats-jetstream.";
private static final String PROP_URL = PROP_PREFIX + "url";
private static final String PROP_CREATE_STREAM = PROP_PREFIX + "create-stream";
private static final String PROP_SUBJECTS = PROP_PREFIX + "subjects";
private static final String PROP_STORAGE = PROP_PREFIX + "storage";
private Connection nc;
private JetStream js;
@ConfigProperty(name = PROP_CREATE_STREAM, defaultValue = "false")
boolean createStream;
@PostConstruct
void connect() {
// Read config
final Config config = ConfigProvider.getConfig();
String url = config.getValue(PROP_URL, String.class);
try {
// Setup NATS connection
io.nats.client.Options natsOptions = new io.nats.client.Options.Builder()
.servers(url.split(","))
.noReconnect()
.build();
nc = Nats.connect(natsOptions);
// Creating a basic stream, mostly for testing. If a user wants to configure their stream, it can be done
// via the nats cli.
if (createStream) {
String subjects = config.getOptionalValue(PROP_SUBJECTS, String.class).orElse("*.*.*");
String storage = config.getOptionalValue(PROP_STORAGE, String.class).orElse("memory");
StorageType storageType = storage.equals("file") ? StorageType.File : StorageType.Memory;
StreamConfiguration streamConfig = StreamConfiguration.builder()
.name("DebeziumStream")
.description("The debezium stream, contains messages which are coming from debezium")
.subjects(subjects.split(","))
.storageType(storageType)
.build();
LOGGER.info("Creating stream with config: {}", streamConfig);
JetStreamManagement jsm = nc.jetStreamManagement();
jsm.addStream(streamConfig);
}
js = nc.jetStream();
}
catch (Exception e) {
throw new DebeziumException(e);
}
}
@PreDestroy
void close() {
try {
if (nc != null) {
nc.close();
LOGGER.info("NATS connection closed.");
}
}
catch (Exception e) {
throw new DebeziumException(e);
}
}
@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records,
RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
for (ChangeEvent<Object, Object> rec : records) {
if (rec.value() != null) {
String subject = streamNameMapper.map(rec.destination());
byte[] recordBytes = getBytes(rec.value());
LOGGER.trace("Received event @ {} = '{}'", subject, getString(rec.value()));
try {
js.publish(subject, recordBytes);
}
catch (Exception e) {
throw new DebeziumException(e);
}
}
committer.markProcessed(rec);
}
committer.markBatchFinished();
}
}

View File

@ -0,0 +1,95 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.nats.jetstream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.enterprise.event.Observes;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
/**
* Integration test that verifies basic reading from PostgreSQL database and writing to NATS Jetstream subject.
*
* @author Thiago Avancini
*/
@QuarkusTest
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
@QuarkusTestResource(NatsJetStreamTestResourceLifecycleManager.class)
class NatsJetStreamIT {
private static final int MESSAGE_COUNT = 4;
private static final String SUBJECT_NAME = "testc.inventory.customers";
protected static Connection nc;
protected static JetStream js;
protected static Dispatcher d;
{
Testing.Files.delete(NatsJetStreamTestConfigSource.OFFSET_STORE_PATH);
Testing.Files.createTestingFile(NatsJetStreamTestConfigSource.OFFSET_STORE_PATH);
}
private static final List<Message> messages = Collections.synchronizedList(new ArrayList<>());
void setupDependencies(@Observes ConnectorStartedEvent event) {
Testing.Print.enable();
// Setup NATS Jetstream connection
try {
nc = Nats.connect(NatsJetStreamTestResourceLifecycleManager.getNatsContainerUrl());
js = nc.jetStream();
}
catch (Exception e) {
Testing.print("Could not connect to NATS Jetstream");
}
// Setup message handler
try {
d = nc.createDispatcher();
js.subscribe(SUBJECT_NAME, d, messages::add, true);
}
catch (Exception e) {
Testing.print("Could not register message handler: " + e.getMessage());
}
}
void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
if (!event.isSuccess()) {
throw (Exception) event.getError().get();
}
}
@AfterAll
static void stop() throws Exception {
if (d != null) {
d.unsubscribe(SUBJECT_NAME);
}
}
@Test
void testNatsStreaming() throws Exception {
Awaitility.await().atMost(Duration.ofSeconds(NatsJetStreamTestConfigSource.waitForSeconds())).until(() -> messages.size() >= MESSAGE_COUNT);
Assertions.assertThat(messages.size() >= MESSAGE_COUNT).isTrue();
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.nats.jetstream;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import io.debezium.server.TestConfigSource;
public class NatsJetStreamTestConfigSource extends TestConfigSource {
public NatsJetStreamTestConfigSource() {
Map<String, String> natsJetStreamTest = new HashMap<>();
natsJetStreamTest.put("debezium.sink.type", "nats-jetstream");
natsJetStreamTest.put("debezium.sink.nats-jetstream.url",
NatsJetStreamTestResourceLifecycleManager.getNatsContainerUrl());
natsJetStreamTest.put("debezium.sink.nats-jetstream.create-stream", "true");
natsJetStreamTest.put("debezium.sink.nats-jetstream.subjects", "asd,asd");
natsJetStreamTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
natsJetStreamTest.put("debezium.source.topic.prefix", "testc");
natsJetStreamTest.put("debezium.source.schema.include.list", "inventory");
natsJetStreamTest.put("debezium.source.table.include.list", "inventory.customers");
natsJetStreamTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG,
OFFSET_STORE_PATH.toAbsolutePath().toString());
natsJetStreamTest.put("debezium.source.offset.flush.interval.ms", "0");
config = natsJetStreamTest;
}
@Override
public int getOrdinal() {
// Configuration property precedence is based on ordinal values and since we override the
// properties in TestConfigSource, we should give this a higher priority.
return super.getOrdinal() + 1;
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.nats.jetstream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
/**
* Manages the lifecycle of a NATS Streaming test resource.
*
* @author Thiago Avancini
*/
public class NatsJetStreamTestResourceLifecycleManager implements QuarkusTestResourceLifecycleManager {
public static final int NATS_PORT = 4222;
public static final String NATS_IMAGE = "nats:latest";
private static final AtomicBoolean running = new AtomicBoolean(false);
private static final GenericContainer<?> container = new GenericContainer<>(NATS_IMAGE)
.withExposedPorts(NATS_PORT)
.withCommand("-js")
.waitingFor(new LogMessageWaitStrategy().withRegEx(".*Server is ready.*"));
private static synchronized void start(boolean ignored) {
if (!running.get()) {
container.start();
running.set(true);
}
}
@Override
public Map<String, String> start() {
start(true);
Map<String, String> params = new ConcurrentHashMap<>();
return params;
}
@Override
public void stop() {
try {
container.stop();
}
catch (Exception e) {
// ignored
}
running.set(false);
}
public static String getNatsContainerUrl() {
start(true);
return String.format("nats://%s:%d", container.getHost(), container.getFirstMappedPort());
}
}

View File

@ -0,0 +1 @@
io.debezium.server.nats.jetstream.NatsJetStreamTestConfigSource

View File

@ -24,6 +24,7 @@
<module>debezium-server-kafka</module>
<module>debezium-server-pravega</module>
<module>debezium-server-nats-streaming</module>
<module>debezium-server-nats-jetstream</module>
</modules>
<dependencyManagement>