DBZ-2092 Modularization; test documentation

This commit is contained in:
Jiri Pechanec 2020-05-26 06:46:53 +02:00 committed by Gunnar Morling
parent db1ddedb5d
commit 60767a02a0
49 changed files with 1014 additions and 171 deletions

21
debezium-server/README.md Normal file
View File

@ -0,0 +1,21 @@
# Debezium Server
Debezium Server is a standalone Java application built on Qurkus framework.
The application itself contains the `core` module and a set of modules responsible for communication with different target systems.
The per-module integration tests depend on the availability of the external services.
It is thus recommended to execute integration tests per-module and set-up necessary pre-requisities beforehand.
## Amazon Kinesis
* Execute `aws configure` as described in AWS CLI [getting started](https://github.com/aws/aws-cli#getting-started) guide and setup the account.
* Create Kinesis stream `aws kinesis create-stream --stream-name testc.inventory.customers --shard-count 1`
* Build the module and execute the tests `mvn clean install -DskipITs=false -am -pl debezium-server-kinesis`
* Remove the stream `aws kinesis delete-stream --stream-name testc.inventory.customers`
## Google Cloud Pub/Sub
* Login into your Google Cloud account using `gcloud auth application-default login` as described in the [documentation](https://cloud.google.com/sdk/gcloud/reference/auth/application-default).
* Create a new topic `gcloud pubsub topics create testc.inventory.customers`
* Build the module and execute the tests `mvn clean install -DskipITs=false -am -pl debezium-server-pubsub`
* Remove the topic `gcloud pubsub topics delete testc.inventory.customers`

View File

@ -0,0 +1,123 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-server</artifactId>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-server-core</artifactId>
<name>Debezium Standalone Quarkus Server Core</name>
<packaging>jar</packaging>
<dependencies>
<!-- Quarkus extensions -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Testing -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easytesting</groupId>
<artifactId>fest-assert</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<id>integration-test</id>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
<execution>
<id>verify</id>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<skipTests>${skipITs}</skipTests>
<enableAssertions>true</enableAssertions>
<systemProperties>
<test.type>IT</test.type>
</systemProperties>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,85 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.eclipse.microprofile.config.spi.ConfigSource;
import io.debezium.util.Testing;
public class TestConfigSource implements ConfigSource {
public static final String OFFSETS_FILE = "file-connector-offsets.txt";
public static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath(OFFSETS_FILE).toAbsolutePath();
public static final Path TEST_FILE_PATH = Testing.Files.createTestingPath("file-connector-input.txt").toAbsolutePath();
final Map<String, String> integrationTest = new HashMap<>();
final Map<String, String> kinesisTest = new HashMap<>();
final Map<String, String> pubsubTest = new HashMap<>();
final Map<String, String> unitTest = new HashMap<>();
protected Map<String, String> config;
public TestConfigSource() {
integrationTest.put("debezium.sink.type", "test");
integrationTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
integrationTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
integrationTest.put("debezium.source.offset.flush.interval.ms", "0");
integrationTest.put("debezium.source.database.hostname", TestDatabase.POSTGRES_HOST);
integrationTest.put("debezium.source.database.port", Integer.toString(TestDatabase.POSTGRES_PORT));
integrationTest.put("debezium.source.database.user", TestDatabase.POSTGRES_USER);
integrationTest.put("debezium.source.database.password", TestDatabase.POSTGRES_PASSWORD);
integrationTest.put("debezium.source.database.dbname", TestDatabase.POSTGRES_DBNAME);
integrationTest.put("debezium.source.database.server.name", "testc");
integrationTest.put("debezium.source.schema.whitelist", "inventory");
integrationTest.put("debezium.source.table.whitelist", "inventory.customers");
unitTest.put("debezium.sink.type", "test");
unitTest.put("debezium.source.connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
unitTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
unitTest.put("debezium.source.offset.flush.interval.ms", "0");
unitTest.put("debezium.source.file", TEST_FILE_PATH.toAbsolutePath().toString());
unitTest.put("debezium.source.topic", "topicX");
unitTest.put("debezium.format.schemas.enable", "true");
unitTest.put("debezium.format.value.schemas.enable", "false");
unitTest.put("debezium.transforms", "hoist");
unitTest.put("debezium.transforms.hoist.type", "org.apache.kafka.connect.transforms.HoistField$Value");
unitTest.put("debezium.transforms.hoist.field", "line");
if (isItTest()) {
config = integrationTest;
}
else {
config = unitTest;
}
}
public static boolean isItTest() {
return "IT".equals(System.getProperty("test.type"));
}
@Override
public Map<String, String> getProperties() {
return config;
}
@Override
public String getValue(String propertyName) {
return config.get(propertyName);
}
@Override
public String getName() {
return "test";
}
public static int waitForSeconds() {
return 60;
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;
import java.time.Duration;
import org.junit.jupiter.api.Assertions;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
/**
* @author Jiri Pechanec
*/
public class TestDatabase {
public static final String POSTGRES_USER = "postgres";
public static final String POSTGRES_PASSWORD = "postgres";
public static final String POSTGRES_DBNAME = "postgres";
public static final String POSTGRES_IMAGE = "debezium/example-postgres";
public static final String POSTGRES_HOST = "localhost";
public static final Integer POSTGRES_PORT = 5432;
private GenericContainer container;
public void start() {
try {
container = new FixedHostPortGenericContainer(POSTGRES_IMAGE)
.withFixedExposedPort(POSTGRES_PORT, POSTGRES_PORT)
.waitingFor(Wait.forLogMessage(".*database system is ready to accept connections.*", 2))
.withEnv("POSTGRES_USER", POSTGRES_USER)
.withEnv("POSTGRES_PASSWORD", POSTGRES_PASSWORD)
.withEnv("POSTGRES_DB", POSTGRES_DBNAME)
.withEnv("POSTGRES_INITDB_ARGS", "-E UTF8")
.withEnv("LANG", "en_US.utf8")
.withStartupTimeout(Duration.ofSeconds(30));
container.start();
}
catch (Exception e) {
Assertions.fail(e);
}
}
public String getIp() {
return POSTGRES_HOST;
}
public int getPort() {
return POSTGRES_PORT;
}
public void stop() {
try {
if (container != null) {
container.stop();
}
}
catch (Exception e) {
// ignored
}
}
}

View File

@ -0,0 +1,96 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-server</artifactId>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-server-distro</artifactId>
<name>Debezium Server Distribution</name>
<packaging>jar</packaging>
<properties>
<assembly.descriptor>server-distribution</assembly.descriptor>
</properties>
<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>assembly</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-kinesis</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-pubsub</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${version.assembly.plugin}</version>
<executions>
<execution>
<id>default</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>${project.parent.artifactId}-${project.version}</finalName>
<attach>true</attach> <!-- we want attach & deploy these to Maven -->
<descriptors>
<descriptor>src/main/resources/assemblies/${assembly.descriptor}.xml</descriptor>
</descriptors>
<tarLongFileMode>posix</tarLongFileMode>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -10,7 +10,7 @@
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>${project.artifactId}/lib</outputDirectory>
<outputDirectory>${project.parent.artifactId}/lib</outputDirectory>
<unpack>false</unpack>
<scope>runtime</scope>
<useProjectArtifact>false</useProjectArtifact>
@ -30,9 +30,9 @@
</dependencySets>
<fileSets>
<fileSet>
<!-- Get the files from the top-level directory, which should be above the connectors -->
<directory>${project.basedir}/..</directory>
<outputDirectory>${project.artifactId}</outputDirectory>
<!-- Get the files from the top-level directory -->
<directory>${project.basedir}/../..</directory>
<outputDirectory>${project.parent.artifactId}</outputDirectory>
<includes>
<include>README*</include>
<include>CHANGELOG*</include>
@ -43,15 +43,16 @@
<useDefaultExcludes>true</useDefaultExcludes>
</fileSet>
<fileSet>
<!-- >directory>../${project.parent.artifactId}-core/target</directory-->
<directory>${project.build.directory}</directory>
<outputDirectory>${project.artifactId}</outputDirectory>
<outputDirectory>${project.parent.artifactId}</outputDirectory>
<includes>
<include>${project.artifactId}-${project.version}-runner.jar</include>
<include>*-runner.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.build.sourceDirectory}/../distro</directory>
<outputDirectory>${project.artifactId}</outputDirectory>
<directory>src/main/resources/distro</directory>
<outputDirectory>${project.parent.artifactId}</outputDirectory>
<includes>
<include>**/*</include>
</includes>

View File

@ -0,0 +1,72 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;
import java.time.Duration;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.util.Testing;
import io.quarkus.test.junit.QuarkusTest;
/**
* Integration test that verifies basic reading from PostgreSQL database.
*
* @author Jiri Pechanec
*/
@QuarkusTest
public class DebeziumServerIT {
private static final int MESSAGE_COUNT = 4;
protected static TestDatabase db = null;
{
Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
}
@AfterAll
static void stop() {
if (db != null) {
db.stop();
}
}
@Inject
DebeziumServer server;
void setupDependencies(@Observes ConnectorStartedEvent event) {
if (!TestConfigSource.isItTest()) {
return;
}
db = new TestDatabase();
db.start();
}
void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
if (!event.isSuccess()) {
throw (Exception) event.getError().get();
}
}
@Test
public void testPostgres() throws Exception {
Testing.Print.enable();
final TestConsumer testConsumer = (TestConsumer) server.getConsumer();
Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> (testConsumer.getValues().size() >= MESSAGE_COUNT));
Assertions.assertThat(testConsumer.getValues().size()).isEqualTo(MESSAGE_COUNT);
Assertions.assertThat(((String) testConsumer.getValues().get(MESSAGE_COUNT - 1)))
.contains("\"after\":{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"annek@noanswer.org\"}");
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.jupiter.api.Test;
import io.debezium.DebeziumException;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import io.quarkus.test.junit.QuarkusTest;
/**
* Smoke test that verifies the basic functionality of Quarkus-based server.
*
* @author Jiri Pechanec
*/
@QuarkusTest
public class DebeziumServerTest {
private static final int MESSAGE_COUNT = 5;
{
Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
}
void setupDependencies(@Observes ConnectorStartedEvent event) {
Testing.Files.delete(TestConfigSource.TEST_FILE_PATH);
Testing.Files.createTestingFile(TestConfigSource.TEST_FILE_PATH);
appendLinesToSource(MESSAGE_COUNT);
Testing.Print.enable();
}
@Inject
DebeziumServer server;
@Test
public void testJson() throws Exception {
final TestConsumer testConsumer = (TestConsumer) server.getConsumer();
Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> (testConsumer.getValues().size() >= MESSAGE_COUNT));
Assertions.assertThat(testConsumer.getValues().size()).isEqualTo(MESSAGE_COUNT);
Assertions.assertThat(testConsumer.getValues().get(MESSAGE_COUNT - 1)).isEqualTo("{\"line\":\"" + MESSAGE_COUNT + "\"}");
}
static void appendLinesToSource(int numberOfLines) {
CharSequence[] lines = new CharSequence[numberOfLines];
for (int i = 0; i != numberOfLines; ++i) {
lines[i] = generateLine(i + 1);
}
try {
java.nio.file.Files.write(TestConfigSource.TEST_FILE_PATH, Collect.arrayListOf(lines), StandardCharsets.UTF_8, StandardOpenOption.APPEND);
}
catch (IOException e) {
throw new DebeziumException(e);
}
}
static String generateLine(int lineNumber) {
return Integer.toString(lineNumber);
}
}

View File

@ -24,7 +24,7 @@ public class TestConfigSource implements ConfigSource {
final Map<String, String> kinesisTest = new HashMap<>();
final Map<String, String> pubsubTest = new HashMap<>();
final Map<String, String> unitTest = new HashMap<>();
final Map<String, String> config;
protected Map<String, String> config;
public TestConfigSource() {
pubsubTest.put("debezium.sink.type", "pubsub");

View File

@ -0,0 +1,57 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.inject.Named;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
import io.debezium.util.Testing;
@Dependent
@Named("test")
public class TestConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
final List<Object> values = Collections.synchronizedList(new ArrayList<>());
@PostConstruct
void init() {
Testing.print("Test consumer constructed");
}
@PreDestroy
void close() {
Testing.print("Test consumer destroyed");
}
@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
records.forEach(record -> {
Testing.print(record);
values.add(record.value());
try {
committer.markProcessed(record);
}
catch (InterruptedException e) {
throw new DebeziumException(e);
}
});
}
public List<Object> getValues() {
return values;
}
}

View File

@ -16,12 +16,12 @@
*/
public class TestDatabase {
static final String POSTGRES_USER = "postgres";
static final String POSTGRES_PASSWORD = "postgres";
static final String POSTGRES_DBNAME = "postgres";
static final String POSTGRES_IMAGE = "debezium/example-postgres";
static final String POSTGRES_HOST = "localhost";
static final Integer POSTGRES_PORT = 5432;
public static final String POSTGRES_USER = "postgres";
public static final String POSTGRES_PASSWORD = "postgres";
public static final String POSTGRES_DBNAME = "postgres";
public static final String POSTGRES_IMAGE = "debezium/example-postgres";
public static final String POSTGRES_HOST = "localhost";
public static final Integer POSTGRES_PORT = 5432;
private GenericContainer container;

View File

@ -0,0 +1,12 @@
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %m (%c)%n
# Root logger option
log4j.rootLogger=INFO, stdout
# Set up the default logging to be INFO level, then override specific units
log4j.logger.io.debezium=DEBUG, stdout
log4j.additivity.io.debezium=false

View File

@ -0,0 +1,115 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-server</artifactId>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-server-kinesis</artifactId>
<name>Debezium Standalone Quarkus Server Amazon Kinesis Sender</name>
<packaging>jar</packaging>
<properties>
<skipITs>true</skipITs>
</properties>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
</dependency>
<!-- Target systems -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kinesis</artifactId>
</dependency>
<!-- Testing -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easytesting</groupId>
<artifactId>fest-assert</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<id>integration-test</id>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
<execution>
<id>verify</id>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<skipTests>${skipITs}</skipTests>
<enableAssertions>true</enableAssertions>
<systemProperties>
<test.type>IT</test.type>
</systemProperties>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;
package io.debezium.server.kinesis;
import java.time.Duration;
import java.util.ArrayList;
@ -16,6 +16,8 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import io.debezium.server.DebeziumServer;
import io.debezium.server.TestDatabase;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.util.Testing;
@ -47,7 +49,8 @@ public class KinesisIT {
protected static KinesisClient kinesis = null;
{
Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
Testing.Files.delete(KinesisTestConfigSource.OFFSET_STORE_PATH);
Testing.Files.createTestingFile(KinesisTestConfigSource.OFFSET_STORE_PATH);
}
@AfterAll
@ -61,12 +64,8 @@ static void stop() {
DebeziumServer server;
void setupDependencies(@Observes ConnectorStartedEvent event) {
if (!TestConfigSource.isKinesisTest()) {
return;
}
kinesis = KinesisClient.builder()
.region(Region.of(TestConfigSource.KINESIS_REGION))
.region(Region.of(KinesisTestConfigSource.KINESIS_REGION))
.credentialsProvider(ProfileCredentialsProvider.create("default"))
.build();
@ -82,9 +81,6 @@ void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exceptio
@Test
public void testKinesis() throws Exception {
if (!TestConfigSource.isKinesisTest()) {
return;
}
Testing.Print.enable();
final GetShardIteratorResponse iteratorResponse = kinesis.getShardIterator(GetShardIteratorRequest.builder()
.streamName(STREAM_NAME)
@ -92,10 +88,10 @@ public void testKinesis() throws Exception {
.shardId("0")
.build());
final List<Record> records = new ArrayList<>();
Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
Awaitility.await().atMost(Duration.ofSeconds(KinesisTestConfigSource.waitForSeconds())).until(() -> {
final GetRecordsResponse recordsResponse = kinesis.getRecords(GetRecordsRequest.builder()
.shardIterator(iteratorResponse.shardIterator())
.limit(4)
.limit(MESSAGE_COUNT)
.build());
records.addAll(recordsResponse.records());
return records.size() >= MESSAGE_COUNT;

View File

@ -0,0 +1,39 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.kinesis;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import io.debezium.server.TestConfigSource;
import io.debezium.server.TestDatabase;
public class KinesisTestConfigSource extends TestConfigSource {
public static final String KINESIS_REGION = "eu-central-1";
final Map<String, String> kinesisTest = new HashMap<>();
public KinesisTestConfigSource() {
kinesisTest.put("debezium.sink.type", "kinesis");
kinesisTest.put("debezium.sink.kinesis.region", KINESIS_REGION);
kinesisTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
kinesisTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
kinesisTest.put("debezium.source.offset.flush.interval.ms", "0");
kinesisTest.put("debezium.source.database.hostname", TestDatabase.POSTGRES_HOST);
kinesisTest.put("debezium.source.database.port", Integer.toString(TestDatabase.POSTGRES_PORT));
kinesisTest.put("debezium.source.database.user", TestDatabase.POSTGRES_USER);
kinesisTest.put("debezium.source.database.password", TestDatabase.POSTGRES_PASSWORD);
kinesisTest.put("debezium.source.database.dbname", TestDatabase.POSTGRES_DBNAME);
kinesisTest.put("debezium.source.database.server.name", "testc");
kinesisTest.put("debezium.source.schema.whitelist", "inventory");
kinesisTest.put("debezium.source.table.whitelist", "inventory.customers");
config = kinesisTest;
}
}

View File

@ -0,0 +1 @@
io.debezium.server.kinesis.KinesisTestConfigSource

View File

@ -0,0 +1,12 @@
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %m (%c)%n
# Root logger option
log4j.rootLogger=INFO, stdout
# Set up the default logging to be INFO level, then override specific units
log4j.logger.io.debezium=DEBUG, stdout
log4j.additivity.io.debezium=false

View File

@ -0,0 +1,129 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-server</artifactId>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-server-pubsub</artifactId>
<name>Debezium Standalone Quarkus Server Google Cloud Pub/Sub Sender</name>
<packaging>jar</packaging>
<properties>
<skipITs>true</skipITs>
</properties>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
</dependency>
<!-- Target systems -->
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-core</artifactId>
</dependency>
<!-- Testing -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easytesting</groupId>
<artifactId>fest-assert</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<id>integration-test</id>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
<execution>
<id>verify</id>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<skipTests>${skipITs}</skipTests>
<enableAssertions>true</enableAssertions>
<systemProperties>
<test.type>IT</test.type>
</systemProperties>
</configuration>
</plugin>
</plugins>
<resources>
<!-- Apply the properties set in the POM to the resource files -->
<resource>
<filtering>true</filtering>
<directory>src/main/resources</directory>
<includes>
<include>**/build.properties</include>
</includes>
</resource>
</resources>
</build>
</project>

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;
package io.debezium.server.pubsub;
import java.io.IOException;
import java.time.Duration;
@ -29,6 +29,9 @@
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.TopicName;
import io.debezium.server.DebeziumServer;
import io.debezium.server.TestConfigSource;
import io.debezium.server.TestDatabase;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.util.Testing;
@ -53,6 +56,7 @@ public class PubSubIT {
{
Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
Testing.Files.createTestingFile(PubSubTestConfigSource.OFFSET_STORE_PATH);
}
@AfterAll
@ -86,10 +90,6 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
}
void setupDependencies(@Observes ConnectorStartedEvent event) throws IOException {
if (!TestConfigSource.isPubSubTest()) {
return;
}
Testing.Print.enable();
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
@ -114,9 +114,6 @@ void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exceptio
@Test
public void testPubSub() throws Exception {
if (!TestConfigSource.isPubSubTest()) {
return;
}
Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> messages.size() >= MESSAGE_COUNT);
Assertions.assertThat(messages.size() >= MESSAGE_COUNT);
}

View File

@ -0,0 +1,37 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.pubsub;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import io.debezium.server.TestConfigSource;
import io.debezium.server.TestDatabase;
public class PubSubTestConfigSource extends TestConfigSource {
final Map<String, String> pubsubTest = new HashMap<>();
public PubSubTestConfigSource() {
pubsubTest.put("debezium.sink.type", "pubsub");
pubsubTest.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
pubsubTest.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG,
OFFSET_STORE_PATH.toAbsolutePath().toString());
pubsubTest.put("debezium.source.offset.flush.interval.ms", "0");
pubsubTest.put("debezium.source.database.hostname", TestDatabase.POSTGRES_HOST);
pubsubTest.put("debezium.source.database.port", Integer.toString(TestDatabase.POSTGRES_PORT));
pubsubTest.put("debezium.source.database.user", TestDatabase.POSTGRES_USER);
pubsubTest.put("debezium.source.database.password", TestDatabase.POSTGRES_PASSWORD);
pubsubTest.put("debezium.source.database.dbname", TestDatabase.POSTGRES_DBNAME);
pubsubTest.put("debezium.source.database.server.name", "testc");
pubsubTest.put("debezium.source.schema.whitelist", "inventory");
pubsubTest.put("debezium.source.table.whitelist", "inventory.customers");
config = pubsubTest;
}
}

View File

@ -0,0 +1 @@
io.debezium.server.pubsub.PubSubTestConfigSource

View File

@ -0,0 +1,12 @@
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %m (%c)%n
# Root logger option
log4j.rootLogger=INFO, stdout
# Set up the default logging to be INFO level, then override specific units
log4j.logger.io.debezium=DEBUG, stdout
log4j.additivity.io.debezium=false

View File

@ -9,16 +9,20 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-server</artifactId>
<name>Debezium Standalone Quarkus Server</name>
<packaging>jar</packaging>
<packaging>pom</packaging>
<properties>
<version.kinesis>2.13.13</version.kinesis>
<version.pubsub>5.1.0</version.pubsub>
<!-- Assembly configuration -->
<assembly.descriptor>server-distribution</assembly.descriptor>
</properties>
<modules>
<module>debezium-server-core</module>
<module>debezium-server-kinesis</module>
<module>debezium-server-pubsub</module>
<module>debezium-server-distro</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
@ -29,6 +33,29 @@
<scope>import</scope>
</dependency>
<!-- Debezium dependencies -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-kinesis</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-pubsub</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-server-core</artifactId>
<type>test-jar</type>
<version>${project.version}</version>
</dependency>
<!-- Target systems -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
@ -75,23 +102,6 @@
</exclusions>
</dependency>
<!-- Target systems -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kinesis</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-core</artifactId>
<optional>true</optional>
</dependency>
<!-- Testing -->
<dependency>
<groupId>io.quarkus</groupId>
@ -130,118 +140,4 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.version}</version>
<executions>
<execution>
<goals>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<id>integration-test</id>
<goals>
<goal>integration-test</goal>
</goals>
</execution>
<execution>
<id>verify</id>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<skipTests>${skipITs}</skipTests>
<enableAssertions>true</enableAssertions>
<systemProperties>
<test.type>IT</test.type>
</systemProperties>
</configuration>
</plugin>
</plugins>
<resources>
<!-- Apply the properties set in the POM to the resource files -->
<resource>
<filtering>true</filtering>
<directory>src/main/resources</directory>
<includes>
<include>**/build.properties</include>
</includes>
</resource>
</resources>
</build>
<!--
Define several useful profiles
-->
<profiles>
<profile>
<id>assembly</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-sqlserver</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${version.assembly.plugin}</version>
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-assembly-descriptors</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>default</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-${project.version}</finalName>
<attach>true</attach> <!-- we want attach & deploy these to Maven -->
<descriptorRefs>
<descriptorRef>${assembly.descriptor}</descriptorRef>
</descriptorRefs>
<tarLongFileMode>posix</tarLongFileMode>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -342,7 +342,7 @@ Pub/Sub exposes a set of REST APIs and provides a (not-only) Java SDK that is us
|[[pubsub-ordering]]<<pubsub-ordering, `debezium.sink.pubsub.ordering.enabled`>>
|`true`
|Pub/Sub can optionally use a message key to guarantee the delivery of the messages in the same order as were sent for messages with the same order key.
|Pub/Sub can optionally use a message key to guarantee the delivery of the messages in the https://googleapis.dev/java/google-api-grpc/latest/com/google/pubsub/v1/PubsubMessage.Builder.html#setOrderingKey-java.lang.String-[same order] as were sent for messages with the same order key.
This feature can be disabled.
|[[pubsub-null-key]]<<pubsub-null-key, `debezium.sink.pubsub.null.key`>>