DBZ-651 Rename of module and classes; Kinesis test

This commit is contained in:
Jiri Pechanec 2020-05-12 10:36:55 +02:00 committed by Gunnar Morling
parent 2b4c1889f8
commit 342c8b9c90
25 changed files with 169 additions and 39 deletions

View File

@ -12,7 +12,7 @@ env:
- MAVEN_CLI: '"clean install -B -pl debezium-connector-mongodb -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
- MAVEN_CLI: '"clean install -B -pl debezium-connector-mongodb -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.mongo.server=4.0 -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
- MAVEN_CLI: '"clean install -B -pl debezium-connector-mongodb -am -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dversion.mongo.server=3.2 -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
- MAVEN_CLI: '"clean install -B -pl debezium-quarkus-outbox,debezium-standalone-quarkus -U -am -amd -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
- MAVEN_CLI: '"clean install -B -pl debezium-quarkus-outbox,debezium-server -U -am -amd -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
- MAVEN_CLI: '"clean install -B -pl debezium-testing -am -amd -Passembly -Dformat.formatter.goal=validate -Dformat.imports.goal=check -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"'
sudo: required

View File

@ -7,7 +7,7 @@
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-standalone-quarkus</artifactId>
<artifactId>debezium-server</artifactId>
<name>Debezium Standalone Quarkus Server</name>
<packaging>jar</packaging>

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.standalone.quarkus;
package io.debezium.server;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
@ -16,11 +16,11 @@
import org.slf4j.LoggerFactory;
import io.debezium.engine.DebeziumEngine;
import io.debezium.standalone.quarkus.events.ConnectorCompletedEvent;
import io.debezium.standalone.quarkus.events.ConnectorStartedEvent;
import io.debezium.standalone.quarkus.events.ConnectorStoppedEvent;
import io.debezium.standalone.quarkus.events.TaskStartedEvent;
import io.debezium.standalone.quarkus.events.TaskStoppedEvent;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.server.events.ConnectorStoppedEvent;
import io.debezium.server.events.TaskStartedEvent;
import io.debezium.server.events.TaskStoppedEvent;
/**
* The server lifecycle listener that published CDI events based on the lifecycle changes and also provides

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.standalone.quarkus;
package io.debezium.server;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.standalone.quarkus;
package io.debezium.server;
import java.util.Properties;
import java.util.Set;

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.standalone.quarkus.events;
package io.debezium.server.events;
import java.util.Optional;

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.standalone.quarkus.events;
package io.debezium.server.events;
/**
* Fired when the connector is started. The initialization is completed but the execution task

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.standalone.quarkus.events;
package io.debezium.server.events;
/**
* Fired when the connector is stopped but the final execution completion state is not yet determined.

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.standalone.quarkus.events;
package io.debezium.server.events;
/**
* Fired right after the connector execution code is started.

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.standalone.quarkus.events;
package io.debezium.server.events;
/**
* Fired right after the connector execution code is stopped.

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.standalone.quarkus.kinesis;
package io.debezium.server.kinesis;
import java.util.List;
@ -24,7 +24,7 @@
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
import io.debezium.standalone.quarkus.CustomConsumerBuilder;
import io.debezium.server.CustomConsumerBuilder;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.SdkBytes;
@ -40,9 +40,9 @@
*/
@Named("kinesis")
@Dependent
public class ChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
public class KinesisChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
private static final Logger LOGGER = LoggerFactory.getLogger(ChangeConsumer.class);
private static final Logger LOGGER = LoggerFactory.getLogger(KinesisChangeConsumer.class);
private static final String PROP_PREFIX = "kinesis.";
private static final String PROP_REGION_NAME = PROP_PREFIX + "region";

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.standalone.quarkus.kinesis;
package io.debezium.server.kinesis;
/**
* Transforms the name of the record destination to the Kinesis stream name.

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.standalone.quarkus.pubsub;
package io.debezium.server.pubsub;
import java.util.List;
@ -22,7 +22,7 @@
*/
@Named("pub-sub")
@Dependent
public class ChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<?, ?>> {
public class PubSubChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<?, ?>> {
@Override
public void handleBatch(List<ChangeEvent<?, ?>> records, RecordCommitter<ChangeEvent<?, ?>> committer)

View File

@ -0,0 +1,104 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.util.Testing;
import io.quarkus.test.junit.QuarkusTest;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
/**
* Integration test that verifies basic reading from PostgreSQL database and writing to Kinesis stream.
*
* @author Jiri Pechanec
*/
@QuarkusTest
public class KinesisIT {
private static final int MESSAGE_COUNT = 4;
// The stream of this name must exist and be empty
private static final String STREAM_NAME = "testc.inventory.customers";
protected static TestDatabase db = null;
protected static KinesisClient kinesis = null;
{
Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
}
@AfterAll
static void stop() {
if (db != null) {
db.stop();
}
}
@Inject
Server server;
void setupDependencies(@Observes ConnectorStartedEvent event) {
if (!TestConfigSource.isE2eTest()) {
return;
}
kinesis = KinesisClient.builder()
.region(Region.of(TestConfigSource.KINESIS_REGION))
.credentialsProvider(ProfileCredentialsProvider.create("default"))
.build();
db = new TestDatabase();
db.start();
}
void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
if (!event.isSuccess()) {
throw (Exception) event.getError().get();
}
}
@Test
public void testKinesis() throws Exception {
if (!TestConfigSource.isE2eTest()) {
return;
}
Testing.Print.enable();
final GetShardIteratorResponse iteratorResponse = kinesis.getShardIterator(GetShardIteratorRequest.builder()
.streamName(STREAM_NAME)
.shardIteratorType(ShardIteratorType.TRIM_HORIZON)
.shardId("0")
.build());
final List<Record> records = new ArrayList<>();
Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
final GetRecordsResponse recordsResponse = kinesis.getRecords(GetRecordsRequest.builder()
.shardIterator(iteratorResponse.shardIterator())
.limit(4)
.build());
records.addAll(recordsResponse.records());
return records.size() >= MESSAGE_COUNT;
});
}
}

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.standalone.quarkus;
package io.debezium.server;
import java.time.Duration;
@ -15,8 +15,8 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import io.debezium.standalone.quarkus.events.ConnectorCompletedEvent;
import io.debezium.standalone.quarkus.events.ConnectorStartedEvent;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.util.Testing;
import io.quarkus.test.junit.QuarkusTest;

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.standalone.quarkus;
package io.debezium.server;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -18,7 +18,7 @@
import org.junit.jupiter.api.Test;
import io.debezium.DebeziumException;
import io.debezium.standalone.quarkus.events.ConnectorStartedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import io.quarkus.test.junit.QuarkusTest;

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.standalone.quarkus;
package io.debezium.server;
import java.nio.file.Path;
import java.util.HashMap;
@ -16,14 +16,30 @@
public class TestConfigSource implements ConfigSource {
public static final String KINESIS_REGION = "eu-central-1";
public static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath("file-connector-offsets.txt").toAbsolutePath();
public static final Path TEST_FILE_PATH = Testing.Files.createTestingPath("file-connector-input.txt").toAbsolutePath();
final Map<String, String> integrationTest = new HashMap<>();
final Map<String, String> e2eTest = new HashMap<>();
final Map<String, String> unitTest = new HashMap<>();
final Map<String, String> config;
public TestConfigSource() {
e2eTest.put("debezium.consumer", "kinesis");
e2eTest.put("kinesis.region", KINESIS_REGION);
e2eTest.put("debezium.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
e2eTest.put("debezium." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
e2eTest.put("debezium.offset.flush.interval.ms", "0");
e2eTest.put("debezium.database.hostname", TestDatabase.POSTGRES_HOST);
e2eTest.put("debezium.database.port", Integer.toString(TestDatabase.POSTGRES_PORT));
e2eTest.put("debezium.database.user", TestDatabase.POSTGRES_USER);
e2eTest.put("debezium.database.password", TestDatabase.POSTGRES_PASSWORD);
e2eTest.put("debezium.database.dbname", TestDatabase.POSTGRES_DBNAME);
e2eTest.put("debezium.database.server.name", "testc");
e2eTest.put("debezium.schema.whitelist", "inventory");
e2eTest.put("debezium.table.whitelist", "inventory.customers");
integrationTest.put("debezium.consumer", "test");
integrationTest.put("debezium.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
integrationTest.put("debezium." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
@ -48,13 +64,17 @@ public TestConfigSource() {
unitTest.put("debezium.transforms.hoist.type", "org.apache.kafka.connect.transforms.HoistField$Value");
unitTest.put("debezium.transforms.hoist.field", "line");
config = isItTest() ? integrationTest : unitTest;
config = isItTest() ? integrationTest : (isE2eTest() ? e2eTest : unitTest);
}
public static boolean isItTest() {
return "IT".equals(System.getProperty("test.type"));
}
public static boolean isE2eTest() {
return "E2E".equals(System.getProperty("test.type"));
}
@Override
public Map<String, String> getProperties() {
return config;

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.standalone.quarkus;
package io.debezium.server;
import java.util.ArrayList;
import java.util.Collections;

View File

@ -3,7 +3,7 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.standalone.quarkus;
package io.debezium.server;
import java.time.Duration;

View File

@ -0,0 +1 @@
io.debezium.server.TestConfigSource

View File

@ -0,0 +1,12 @@
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %m (%c)%n
# Root logger option
log4j.rootLogger=INFO, stdout
# Set up the default logging to be INFO level, then override specific units
log4j.logger.io.debezium=DEBUG, stdout
log4j.additivity.io.debezium=false

View File

@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://xmlns.jcp.org/xml/ns/javaee
http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd" bean-discovery-mode="all">
</beans>

View File

@ -1 +0,0 @@
io.debezium.standalone.quarkus.TestConfigSource

View File

@ -172,7 +172,7 @@
<module>debezium-connector-sqlserver</module>
<module>debezium-microbenchmark</module>
<module>debezium-quarkus-outbox</module>
<module>debezium-standalone-quarkus</module>
<module>debezium-server</module>
<module>debezium-testing</module>
</modules>