DBZ-2112 Added Pulsar support

This commit is contained in:
Jiri Pechanec 2020-05-29 06:46:56 +02:00 committed by Gunnar Morling
parent 69681da40b
commit 5aaf4d420d
19 changed files with 560 additions and 423 deletions

View File

@ -13,7 +13,7 @@ env:
- MAVEN_CLI: '"clean install -B -pl debezium-connector-mongodb -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.mongo.server=4.0 -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
- MAVEN_CLI: '"clean install -B -pl debezium-connector-mongodb -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.mongo.server=3.2 -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
- MAVEN_CLI: '"clean install -B -pl debezium-quarkus-outbox -U -am -amd -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
- MAVEN_CLI: '"clean install -B -pl debezium-server -U -am -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
- MAVEN_CLI: '"clean install -B -pl debezium-server -am -amd -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
- MAVEN_CLI: '"clean install -B -pl debezium-testing -am -amd -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
sudo: required

View File

@ -5,13 +5,19 @@
*/
package io.debezium.server;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
/**
* Basic services provided to all change consumers.
*
@ -34,4 +40,39 @@ void init() {
}
LOGGER.info("Using '{}' stream name mapper", streamNameMapper);
}
}
protected Map<String, Object> getConfigSubset(Config config, String prefix) {
final Map<String, Object> ret = new HashMap<>();
for (String propName : config.getPropertyNames()) {
if (propName.startsWith(prefix)) {
final String newPropName = propName.substring(prefix.length());
ret.put(newPropName, config.getValue(propName, String.class));
}
}
return ret;
}
protected byte[] getBytes(Object object) {
if (object instanceof byte[]) {
return (byte[]) object;
}
else if (object instanceof String) {
return ((String) object).getBytes();
}
throw new DebeziumException(unsupportedTypeMessage(object));
}
protected String getString(Object object) {
if (object instanceof String) {
return (String) object;
}
throw new DebeziumException(unsupportedTypeMessage(object));
}
protected String unsupportedTypeMessage(Object object) {
final String type = (object == null) ? "null" : object.getClass().getName();
return "Unexpected data type '" + type + "'";
}
}

View File

@ -71,6 +71,10 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-server-pubsub</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-pulsar</artifactId>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -1,72 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;
import java.time.Duration;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.util.Testing;
import io.quarkus.test.junit.QuarkusTest;
/**
* Integration test that verifies basic reading from PostgreSQL database.
*
* @author Jiri Pechanec
*/
@QuarkusTest
public class DebeziumServerIT {
private static final int MESSAGE_COUNT = 4;
protected static TestDatabase db = null;
{
Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
}
@AfterAll
static void stop() {
if (db != null) {
db.stop();
}
}
@Inject
DebeziumServer server;
void setupDependencies(@Observes ConnectorStartedEvent event) {
if (!TestConfigSource.isItTest()) {
return;
}
db = new TestDatabase();
db.start();
}
void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
if (!event.isSuccess()) {
throw (Exception) event.getError().get();
}
}
@Test
public void testPostgres() throws Exception {
Testing.Print.enable();
final TestConsumer testConsumer = (TestConsumer) server.getConsumer();
Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> (testConsumer.getValues().size() >= MESSAGE_COUNT));
Assertions.assertThat(testConsumer.getValues().size()).isEqualTo(MESSAGE_COUNT);
Assertions.assertThat(((String) testConsumer.getValues().get(MESSAGE_COUNT - 1)))
.contains("\"after\":{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"annek@noanswer.org\"}");
}
}

View File

@ -1,74 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.jupiter.api.Test;
import io.debezium.DebeziumException;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import io.quarkus.test.junit.QuarkusTest;
/**
* Smoke test that verifies the basic functionality of Quarkus-based server.
*
* @author Jiri Pechanec
*/
@QuarkusTest
public class DebeziumServerTest {
private static final int MESSAGE_COUNT = 5;
{
Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
}
void setupDependencies(@Observes ConnectorStartedEvent event) {
Testing.Files.delete(TestConfigSource.TEST_FILE_PATH);
Testing.Files.createTestingFile(TestConfigSource.TEST_FILE_PATH);
appendLinesToSource(MESSAGE_COUNT);
Testing.Print.enable();
}
@Inject
DebeziumServer server;
@Test
public void testJson() throws Exception {
final TestConsumer testConsumer = (TestConsumer) server.getConsumer();
Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> (testConsumer.getValues().size() >= MESSAGE_COUNT));
Assertions.assertThat(testConsumer.getValues().size()).isEqualTo(MESSAGE_COUNT);
Assertions.assertThat(testConsumer.getValues().get(MESSAGE_COUNT - 1)).isEqualTo("{\"line\":\"" + MESSAGE_COUNT + "\"}");
}
static void appendLinesToSource(int numberOfLines) {
CharSequence[] lines = new CharSequence[numberOfLines];
for (int i = 0; i != numberOfLines; ++i) {
lines[i] = generateLine(i + 1);
}
try {
java.nio.file.Files.write(TestConfigSource.TEST_FILE_PATH, Collect.arrayListOf(lines), StandardCharsets.UTF_8, StandardOpenOption.APPEND);
}
catch (IOException e) {
throw new DebeziumException(e);
}
}
static String generateLine(int lineNumber) {
return Integer.toString(lineNumber);
}
}

View File

@ -1,126 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.eclipse.microprofile.config.spi.ConfigSource;
import io.debezium.util.Testing;
public class TestConfigSource implements ConfigSource {
public static final String KINESIS_REGION = "eu-central-1";
public static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath("file-connector-offsets.txt").toAbsolutePath();
public static final Path TEST_FILE_PATH = Testing.Files.createTestingPath("file-connector-input.txt").toAbsolutePath();
final Map<String, String> integrationTest = new HashMap<>();
final Map<String, String> kinesisTest = new HashMap<>();
final Map<String, String> pubsubTest = new HashMap<>();
final Map<String, String> unitTest = new HashMap<>();
protected Map<String, String> config;
public TestConfigSource() {
pubsubTest.put("debezium.sink.type", "pubsub");
pubsubTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
pubsubTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
pubsubTest.put("debezium.source.offset.flush.interval.ms", "0");
pubsubTest.put("debezium.source.database.hostname", TestDatabase.POSTGRES_HOST);
pubsubTest.put("debezium.source.database.port", Integer.toString(TestDatabase.POSTGRES_PORT));
pubsubTest.put("debezium.source.database.user", TestDatabase.POSTGRES_USER);
pubsubTest.put("debezium.source.database.password", TestDatabase.POSTGRES_PASSWORD);
pubsubTest.put("debezium.source.database.dbname", TestDatabase.POSTGRES_DBNAME);
pubsubTest.put("debezium.source.database.server.name", "testc");
pubsubTest.put("debezium.source.schema.whitelist", "inventory");
pubsubTest.put("debezium.source.table.whitelist", "inventory.customers");
kinesisTest.put("debezium.sink.type", "kinesis");
kinesisTest.put("debezium.sink.kinesis.region", KINESIS_REGION);
kinesisTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
kinesisTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
kinesisTest.put("debezium.source.offset.flush.interval.ms", "0");
kinesisTest.put("debezium.source.database.hostname", TestDatabase.POSTGRES_HOST);
kinesisTest.put("debezium.source.database.port", Integer.toString(TestDatabase.POSTGRES_PORT));
kinesisTest.put("debezium.source.database.user", TestDatabase.POSTGRES_USER);
kinesisTest.put("debezium.source.database.password", TestDatabase.POSTGRES_PASSWORD);
kinesisTest.put("debezium.source.database.dbname", TestDatabase.POSTGRES_DBNAME);
kinesisTest.put("debezium.source.database.server.name", "testc");
kinesisTest.put("debezium.source.schema.whitelist", "inventory");
kinesisTest.put("debezium.source.table.whitelist", "inventory.customers");
integrationTest.put("debezium.sink.type", "test");
integrationTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
integrationTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
integrationTest.put("debezium.source.offset.flush.interval.ms", "0");
integrationTest.put("debezium.source.database.hostname", TestDatabase.POSTGRES_HOST);
integrationTest.put("debezium.source.database.port", Integer.toString(TestDatabase.POSTGRES_PORT));
integrationTest.put("debezium.source.database.user", TestDatabase.POSTGRES_USER);
integrationTest.put("debezium.source.database.password", TestDatabase.POSTGRES_PASSWORD);
integrationTest.put("debezium.source.database.dbname", TestDatabase.POSTGRES_DBNAME);
integrationTest.put("debezium.source.database.server.name", "testc");
integrationTest.put("debezium.source.schema.whitelist", "inventory");
integrationTest.put("debezium.source.table.whitelist", "inventory.customers");
unitTest.put("debezium.sink.type", "test");
unitTest.put("debezium.source.connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
unitTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
unitTest.put("debezium.source.offset.flush.interval.ms", "0");
unitTest.put("debezium.source.file", TEST_FILE_PATH.toAbsolutePath().toString());
unitTest.put("debezium.source.topic", "topicX");
unitTest.put("debezium.format.schemas.enable", "true");
unitTest.put("debezium.format.value.schemas.enable", "false");
unitTest.put("debezium.transforms", "hoist");
unitTest.put("debezium.transforms.hoist.type", "org.apache.kafka.connect.transforms.HoistField$Value");
unitTest.put("debezium.transforms.hoist.field", "line");
if (isItTest()) {
config = integrationTest;
}
else if (isKinesisTest()) {
config = kinesisTest;
}
else if (isPubSubTest()) {
config = pubsubTest;
}
else {
config = unitTest;
}
}
public static boolean isItTest() {
return "IT".equals(System.getProperty("test.type"));
}
public static boolean isKinesisTest() {
return "KINESIS".equals(System.getProperty("test.type"));
}
public static boolean isPubSubTest() {
return "PUBSUB".equals(System.getProperty("test.type"));
}
@Override
public Map<String, String> getProperties() {
return config;
}
@Override
public String getValue(String propertyName) {
return config.get(propertyName);
}
@Override
public String getName() {
return "test";
}
public static int waitForSeconds() {
return 60;
}
}

View File

@ -1,57 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.inject.Named;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
import io.debezium.util.Testing;
@Dependent
@Named("test")
public class TestConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
final List<Object> values = Collections.synchronizedList(new ArrayList<>());
@PostConstruct
void init() {
Testing.print("Test consumer constructed");
}
@PreDestroy
void close() {
Testing.print("Test consumer destroyed");
}
@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
records.forEach(record -> {
Testing.print(record);
values.add(record.value());
try {
committer.markProcessed(record);
}
catch (InterruptedException e) {
throw new DebeziumException(e);
}
});
}
public List<Object> getValues() {
return values;
}
}

View File

@ -1,65 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;
import java.time.Duration;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
/**
* @author Jiri Pechanec
*/
public class TestDatabase {
public static final String POSTGRES_USER = "postgres";
public static final String POSTGRES_PASSWORD = "postgres";
public static final String POSTGRES_DBNAME = "postgres";
public static final String POSTGRES_IMAGE = "debezium/example-postgres";
public static final String POSTGRES_HOST = "localhost";
public static final Integer POSTGRES_PORT = 5432;
private GenericContainer container;
public void start() {
try {
container = new FixedHostPortGenericContainer(POSTGRES_IMAGE)
.withFixedExposedPort(POSTGRES_PORT, POSTGRES_PORT)
.waitingFor(Wait.forLogMessage(".*database system is ready to accept connections.*", 2))
.withEnv("POSTGRES_USER", POSTGRES_USER)
.withEnv("POSTGRES_PASSWORD", POSTGRES_PASSWORD)
.withEnv("POSTGRES_DB", POSTGRES_DBNAME)
.withEnv("POSTGRES_INITDB_ARGS", "-E UTF8")
.withEnv("LANG", "en_US.utf8")
.withStartupTimeout(Duration.ofSeconds(30));
container.start();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
public String getIp() {
return POSTGRES_HOST;
}
public int getPort() {
return POSTGRES_PORT;
}
public void stop() {
try {
if (container != null) {
container.stop();
}
}
catch (Exception e) {
// ignored
}
}
}

View File

@ -20,7 +20,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
@ -89,28 +88,6 @@ void close() {
}
}
private byte[] getByte(Object object) {
if (object instanceof byte[]) {
return (byte[]) object;
}
else if (object instanceof String) {
return ((String) object).getBytes();
}
throw new DebeziumException(unsupportedTypeMessage(object));
}
private String getString(Object object) {
if (object instanceof String) {
return (String) object;
}
throw new DebeziumException(unsupportedTypeMessage(object));
}
public String unsupportedTypeMessage(Object object) {
final String type = (object == null) ? "null" : object.getClass().getName();
return "Unexpected data type '" + type + "'";
}
@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
@ -119,7 +96,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitt
final PutRecordRequest putRecord = PutRecordRequest.builder()
.partitionKey((record.key() != null) ? getString(record.key()) : nullKey)
.streamName(streamNameMapper.map(record.destination()))
.data(SdkBytes.fromByteArray(getByte(record.value())))
.data(SdkBytes.fromByteArray(getBytes(record.value())))
.build();
client.putRecord(putRecord);
committer.markProcessed(record);

View File

@ -0,0 +1,180 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-server</artifactId>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-server-pulsar</artifactId>
<name>Debezium Server Pulsar Sink Adapter</name>
<packaging>jar</packaging>
<properties>
<pulsar.port.native>6650</pulsar.port.native>
<pulsar.port.http>8080</pulsar.port.http>
</properties>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
</dependency>
<!-- Target systems -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
</dependency>
<!-- Testing -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easytesting</groupId>
<artifactId>fest-assert</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<id>integration-test</id>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
<execution>
<id>verify</id>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<skipTests>${skipITs}</skipTests>
<enableAssertions>true</enableAssertions>
<systemProperties>
<test.type>IT</test.type>
<pulsar.port.native>${pulsar.port.native}</pulsar.port.native>
</systemProperties>
</configuration>
</plugin>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<configuration>
<watchInterval>500</watchInterval>
<logDate>default</logDate>
<verbose>true</verbose>
<images>
<image>
<!-- A Docker image using the all in one Pulsar server -->
<name>apachepulsar/pulsar:${version.pulsar}</name>
<run>
<namingStrategy>none</namingStrategy>
<cmd>bin/pulsar standalone</cmd>
<ports>
<port>${pulsar.port.native}:6650</port>
<port>${pulsar.port.http}:8080</port>
</ports>
<log>
<prefix>pulsar</prefix>
<enabled>true</enabled>
<color>yellow</color>
</log>
<wait>
<time>30000</time> <!-- 30 seconds max -->
<log>(?s)org.apache.pulsar.broker.PulsarService - messaging service is ready</log>
</wait>
</run>
</image>
</images>
</configuration>
<!--
Connect this plugin to the maven lifecycle around the integration-test phase:
start the container in pre-integration-test and stop it in post-integration-test.
-->
<executions>
<execution>
<id>start</id>
<phase>pre-integration-test</phase>
<goals>
<goal>build</goal>
<goal>start</goal>
</goals>
</execution>
<execution>
<id>stop</id>
<phase>post-integration-test</phase>
<goals>
<goal>stop</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<resources>
<!-- Apply the properties set in the POM to the resource files -->
<resource>
<filtering>true</filtering>
<directory>src/main/resources</directory>
<includes>
<include>**/build.properties</include>
</includes>
</resource>
</resources>
</build>
</project>

View File

@ -0,0 +1,147 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.pulsar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.inject.Named;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
import io.debezium.server.BaseChangeConsumer;
/**
* Implementation of the consumer that delivers the messages into a Pulsar destination.
*
* @author Jiri Pechanec
*
*/
@Named("pulsar")
@Dependent
public class PulsarChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
private static final Logger LOGGER = LoggerFactory.getLogger(PulsarChangeConsumer.class);
private static final String PROP_PREFIX = "debezium.sink.pulsar.";
private static final String PROP_CLIENT_PREFIX = PROP_PREFIX + "client.";
private static final String PROP_PRODUCER_PREFIX = PROP_PREFIX + "producer.";
public static interface ProducerBuilder {
Producer<Object> get(String topicName, Object value);
}
private final Map<String, Producer<?>> producers = new HashMap<>();
private PulsarClient pulsarClient;
private Map<String, Object> producerConfig;
@ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default")
String nullKey;
@PostConstruct
void connect() {
final Config config = ConfigProvider.getConfig();
try {
pulsarClient = PulsarClient.builder()
.loadConf(getConfigSubset(config, PROP_CLIENT_PREFIX))
.build();
}
catch (PulsarClientException e) {
throw new DebeziumException(e);
}
producerConfig = getConfigSubset(config, PROP_PRODUCER_PREFIX);
}
@PreDestroy
void close() {
producers.values().forEach(producer -> {
try {
producer.close();
}
catch (Exception e) {
LOGGER.warn("Exception while closing producer: {}", e);
}
});
try {
pulsarClient.close();
}
catch (Exception e) {
LOGGER.warn("Exception while closing client: {}", e);
}
}
private Producer<?> createProducer(String topicName, Object value) {
try {
if (value instanceof String) {
return pulsarClient.newProducer(Schema.STRING)
.loadConf(producerConfig)
.topic(topicName)
.create();
}
else {
return pulsarClient.newProducer()
.loadConf(producerConfig)
.topic(topicName)
.create();
}
}
catch (PulsarClientException e) {
throw new DebeziumException(e);
}
}
@SuppressWarnings("unchecked")
@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
for (ChangeEvent<Object, Object> record : records) {
LOGGER.trace("Received event '{}'", record);
final String topicName = streamNameMapper.map(record.destination());
final Producer<?> producer = producers.computeIfAbsent(topicName, (topic) -> createProducer(topic, record.value()));
final String key = getString(record.key());
@SuppressWarnings("rawtypes")
final TypedMessageBuilder message;
if (record.value() instanceof String) {
message = producer.newMessage(Schema.STRING);
}
else {
message = producer.newMessage();
}
message
.key(key == null ? nullKey : key)
.value(record.value());
try {
final MessageId messageId = message.send();
LOGGER.trace("Sent message with id: {}", messageId);
}
catch (PulsarClientException e) {
throw new DebeziumException(e);
}
committer.markProcessed(record);
}
committer.markBatchFinished();
}
}

View File

@ -0,0 +1,91 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.pulsar;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import io.debezium.server.DebeziumServer;
import io.debezium.server.TestConfigSource;
import io.debezium.server.TestDatabase;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.util.Testing;
import io.quarkus.test.junit.QuarkusTest;
/**
* Integration test that verifies basic reading from PostgreSQL database and writing to a Google Cloud PubSub stream.
*
* @author Jiri Pechanec
*/
@QuarkusTest
public class PulsarIT {
private static final int MESSAGE_COUNT = 4;
private static final String TOPIC_NAME = "testc.inventory.customers";
protected static TestDatabase db = null;
protected static PulsarClient pulsarClient;
{
Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
Testing.Files.createTestingFile(PulsarTestConfigSource.OFFSET_STORE_PATH);
}
@AfterAll
static void stop() throws IOException {
if (db != null) {
db.stop();
}
}
@Inject
DebeziumServer server;
void setupDependencies(@Observes ConnectorStartedEvent event) throws IOException {
Testing.Print.enable();
pulsarClient = PulsarClient.builder()
.serviceUrl(PulsarTestConfigSource.getServiceUrl())
.build();
db = new TestDatabase();
db.start();
}
void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
if (!event.isSuccess()) {
throw (Exception) event.getError().get();
}
}
@Test
public void testPulsar() throws Exception {
final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(TOPIC_NAME)
.subscriptionName("test-" + UUID.randomUUID())
.subscribe();
final List<Message<String>> records = new ArrayList<>();
Awaitility.await().atMost(Duration.ofSeconds(PulsarTestConfigSource.waitForSeconds())).until(() -> {
records.add(consumer.receive());
return records.size() >= MESSAGE_COUNT;
});
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.pulsar;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import io.debezium.server.TestConfigSource;
import io.debezium.server.TestDatabase;
public class PulsarTestConfigSource extends TestConfigSource {
final Map<String, String> pulsarTest = new HashMap<>();
public PulsarTestConfigSource() {
pulsarTest.put("debezium.sink.type", "pulsar");
pulsarTest.put("debezium.sink.pulsar.client.serviceUrl", getServiceUrl());
pulsarTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
pulsarTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG,
OFFSET_STORE_PATH.toAbsolutePath().toString());
pulsarTest.put("debezium.source.offset.flush.interval.ms", "0");
pulsarTest.put("debezium.source.database.hostname", TestDatabase.POSTGRES_HOST);
pulsarTest.put("debezium.source.database.port", Integer.toString(TestDatabase.POSTGRES_PORT));
pulsarTest.put("debezium.source.database.user", TestDatabase.POSTGRES_USER);
pulsarTest.put("debezium.source.database.password", TestDatabase.POSTGRES_PASSWORD);
pulsarTest.put("debezium.source.database.dbname", TestDatabase.POSTGRES_DBNAME);
pulsarTest.put("debezium.source.database.server.name", "testc");
pulsarTest.put("debezium.source.schema.whitelist", "inventory");
pulsarTest.put("debezium.source.table.whitelist", "inventory.customers");
config = pulsarTest;
}
public static String getServiceUrl() {
return "pulsar://localhost:" + System.getProperty("pulsar.port.native", "6650");
}
}

View File

@ -0,0 +1 @@
io.debezium.server.pulsar.PulsarTestConfigSource

View File

@ -14,12 +14,14 @@
<properties>
<version.kinesis>2.13.13</version.kinesis>
<version.pubsub>5.1.0</version.pubsub>
<version.pulsar>2.5.2</version.pulsar>
</properties>
<modules>
<module>debezium-server-core</module>
<module>debezium-server-kinesis</module>
<module>debezium-server-pubsub</module>
<module>debezium-server-pulsar</module>
<module>debezium-server-distro</module>
</modules>
@ -49,6 +51,11 @@
<artifactId>debezium-server-pubsub</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-pulsar</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
@ -69,6 +76,11 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${version.pulsar}</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -4,6 +4,7 @@
:linkattrs:
:icons: font
:toc:
:toclevels: 3
:toc-placement: macro
toc::[]
@ -298,7 +299,7 @@ So this string will be used as message key for messages from tables without prim
|===
==== Injection points
===== Injection points
The Kinesis sink behaviour can be modified by a custom logic providing alternative implementations for specific functionalities.
When the alternative implementations are not available then the default ones are used.
@ -353,7 +354,7 @@ This is not supported by Pub/Sub so a surrogate key must be used.
|===
==== Injection points
===== Injection points
The Pub/Sub sink behaviour can be modified by a custom logic providing alternative implementations for specific functionalities.
When the alternative implementations are not available then the default ones are used.
@ -375,6 +376,42 @@ When the alternative implementations are not available then the default ones are
|===
==== Apache Pulsar
Apache Pulsar is high-perfromance, low-latency server for server-to-server messagging.
Pulsar exposes a REST APIs and a native endopint provides a (not-only) Java client that is used to implement the sink.
[cols="35%a,10%a,55%a",options="header"]
|=======================
|Property
|Default
|Description
|[[pulsar-type]]<<pulsar-type, `debezium.sink.type`>>
|
|Must be set to `pulsar`.
|[[pulsar-client]]<<pulsar-client, `debezium.sink.pulsar.client.*`>>
|
|The Pulsar module supports pass-through configuration.
The client http://pulsar.apache.org/docs/en/2.2.0/client-libraries-java/#client-configuration[configuration properties] are passed to the client with the prefix removed.
At least `serviceUrl` must be provided.
|[[pulsar-producer]]<<pulsar-producer, `debezium.sink.pulsar.producer.*`>>
|
|The Pulsar module supports pass-through configuration.
The message producer http://pulsar.apache.org/docs/en/2.2.0/client-libraries-java/#client-configuration[configuration properties] are passed to the producer with the prefix removed.
The `topic` is set by Debezium.
|[[pulsar-null-key]]<<pulsar-null-key, `debezium.sink.pulsar.null.key`>>
|`default`
|Tables without primary key sends messages with `null` key.
This is not supported by Pulsar so a surrogate key must be used.
|=======================
== Extensions
{prodname} Server uses the https://quarkus.io/[Quarkus framework] and relies on dependency injection to enable developer to extend its behaviour.