DBZ-1722 Adding DebeziumContainer support

This commit is contained in:
Alex Soto 2020-01-15 23:30:40 +01:00 committed by Gunnar Morling
parent 5d7ae4a46c
commit 3304e24b80
11 changed files with 636 additions and 0 deletions

View File

@ -0,0 +1,62 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-testing</artifactId>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-testcontainers-testing</artifactId>
<name>Debezium Testcontainers Testing</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.easytesting</groupId>
<artifactId>fest-assert</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,63 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.testcontainers;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* Class that represents the config element of the configuration document.
*/
public class Configuration {
private final ObjectMapper mapper = new ObjectMapper();
private final ObjectNode configNode;
protected Configuration() {
this.configNode = this.mapper.createObjectNode();
}
public static Configuration create() {
return new Configuration();
}
static Configuration from(JsonNode configNode) {
final Configuration configuration = new Configuration();
configNode.fields().forEachRemaining(e -> configuration.configNode.set(e.getKey(), e.getValue()));
return configuration;
}
public Configuration with(String key, String value) {
this.configNode.put(key, value);
return this;
}
public Configuration with(String key, Integer value) {
this.configNode.put(key, value);
return this;
}
public Configuration with(String key, Long value) {
this.configNode.put(key, value);
return this;
}
public Configuration with(String key, Boolean value) {
this.configNode.put(key, value);
return this;
}
public Configuration with(String key, Double value) {
this.configNode.put(key, value);
return this;
}
ObjectNode getConfiguration() {
return configNode;
}
}

View File

@ -0,0 +1,83 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.testcontainers;
import java.io.IOException;
import java.io.InputStream;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* Class that represents the configuration document.
*/
public class Connector {
private static final String NAME = "name";
private static final String CONFIGURATION = "config";
private String name;
private Configuration configuration;
private final static ObjectMapper mapper = new ObjectMapper();
private Connector(final String name, final Configuration configuration) {
this.name = name;
this.configuration = configuration;
}
/**
* Loads configuration values from Debezium JSON configuration file.
* @param inputStream of JSON configuration file.
* @return Connector configuration.
*/
public static Connector fromJson(final InputStream inputStream) {
try {
final ObjectNode connectorConfiguration = mapper.readValue(inputStream, ObjectNode.class);
final String name = connectorConfiguration.get("name").asText();
final Configuration config = Configuration.from(connectorConfiguration.get("config"));
return new Connector(name, config);
}
catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
public static Connector from(String name, Configuration configuration) {
return new Connector(name, configuration);
}
public void appendOrOverrideConfiguration(Configuration newConfiguration) {
final ObjectNode configurationNode = this.configuration.getConfiguration();
final ObjectNode newConfigurationNode = newConfiguration.getConfiguration();
newConfigurationNode.fields().forEachRemaining(e -> configurationNode.set(e.getKey(), e.getValue()));
}
public String toJson() {
final JsonNode conf = mapper.valueToTree(this.configuration.getConfiguration());
final ObjectNode connector = mapper.createObjectNode();
connector.put(NAME, this.name);
connector.set(CONFIGURATION, conf);
try {
return mapper.writeValueAsString(connector);
}
catch (JsonProcessingException e) {
throw new IllegalStateException(e);
}
}
public String getName() {
return this.name;
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.testcontainers;
import java.util.HashMap;
import java.util.Map;
/**
* Class that resolves the connector class from JDBC driver.
*/
public class ConnectorResolver {
private static final Map<String, String> driverConnector = new HashMap<>();
static {
driverConnector.put("org.postgresql.Driver", "io.debezium.connector.postgresql.PostgresConnector");
driverConnector.put("com.mysql.cj.jdbc.Driver", "io.debezium.connector.mysql.MySqlConnector");
driverConnector.put("com.mysql.jdbc.Driver", "io.debezium.connector.mysql.MySqlConnector");
driverConnector.put("com.microsoft.sqlserver.jdbc.SQLServerDriver",
"io.debezium.connector.sqlserver.SqlServerConnector");
driverConnector.put("oracle.jdbc.OracleDriver", "io.debezium.connector.oracle.OracleConnector");
}
public static String getConnectorByJdbcDriver(String jdbcDriver) {
if (driverConnector.containsKey(jdbcDriver)) {
return driverConnector.get(jdbcDriver);
}
throw new IllegalArgumentException(String.format("%s JDBC driver is passed but only %s are supported.", jdbcDriver, driverConnector.keySet().toString()));
}
}

View File

@ -0,0 +1,111 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.testcontainers;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
/**
* Debezium Container main class.
*/
public class DebeziumContainer extends GenericContainer<DebeziumContainer> {
private final OkHttpClient client = new OkHttpClient();
public static final MediaType JSON = MediaType.get("application/json; charset=utf-8");
public DebeziumContainer(final String version) {
super("debezium/connect:" + version);
setWaitStrategy(
Wait.forHttp("/connectors")
.forPort(8083)
.forStatusCode(200));
withEnv("GROUP_ID", "1");
withEnv("CONFIG_STORAGE_TOPIC", "debezium_connect_config");
withEnv("OFFSET_STORAGE_TOPIC", "debezium_connect_offsets");
withEnv("STATUS_STORAGE_TOPIC", "debezium_connect_status");
withEnv("CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE", "false");
withEnv("CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE", "false");
withExposedPorts(8083);
}
public DebeziumContainer(final String version, final KafkaContainer kafkaContainer) {
this(version);
withKafka(kafkaContainer);
}
public DebeziumContainer withKafka(final KafkaContainer kafkaContainer) {
return withKafka(kafkaContainer.getNetwork(), kafkaContainer.getNetworkAliases().get(0) + ":9092");
}
public DebeziumContainer withKafka(final Network network, final String bootstrapServers) {
withNetwork(network);
withEnv("BOOTSTRAP_SERVERS", bootstrapServers);
return self();
}
public void registerConnector(final Connector connector) throws IOException {
this.registerConnector(connector.getName(), connector.toJson());
}
public void registerConnector(final String connectorName, final String payload) throws IOException {
final String fullUrl = "http://" + getTarget() + "/connectors/";
registerConnectorToDebezium(payload, fullUrl);
// To avoid a 409 error code meanwhile connector is being configured.
// This is just a guard, probably in most of use cases you won't need that as preparation time of the test might be enough to configure connector.
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.until(() -> isConnectorConfigured(connectorName));
}
private void registerConnectorToDebezium(final String payload, final String fullUrl) throws IOException {
final RequestBody body = RequestBody.create(payload, JSON);
final Request request = new Request.Builder().url(fullUrl).post(body).build();
try (Response response = client.newCall(request).execute()) {
if (!response.isSuccessful())
throw new IOException("Unexpected code " + response + "Message: " + response.body().string());
}
}
private boolean isConnectorConfigured(String connectorName) throws IOException {
final Request request = new Request.Builder()
.url("http://" + getConnectorTarget(connectorName))
.build();
try (Response response = client.newCall(request).execute()) {
return response.isSuccessful();
}
}
public String getConnectors() {
return getTarget() + "/connectors/";
}
public String getConnectorTarget(String connectorName) {
return getConnectors() + connectorName;
}
public String getTarget() {
return getContainerIpAddress() + ":" + getMappedPort(8083);
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.testcontainers;
import java.util.List;
import org.testcontainers.containers.JdbcDatabaseContainer;
/**
* Specific configuration class that is auto-configured with given JDBCDatabaseContainer.
*/
public class JdbcConfiguration extends Configuration {
private static final String CONNECTOR = "connector.class";
private static final String HOSTNAME = "database.hostname";
private static final String PORT = "database.port";
private static final String USER = "database.user";
private static final String PASSWORD = "database.password";
private static final String DBNAME = "database.dbname";
public static JdbcConfiguration fromJdbcContainer(JdbcDatabaseContainer<?> jdbcDatabaseContainer) {
final JdbcConfiguration jdbcConfiguration = new JdbcConfiguration();
jdbcConfiguration.with(HOSTNAME, jdbcDatabaseContainer.getContainerInfo().getConfig().getHostName());
final List<Integer> exposedPorts = jdbcDatabaseContainer.getExposedPorts();
jdbcConfiguration.with(PORT, exposedPorts.get(0));
jdbcConfiguration.with(USER, jdbcDatabaseContainer.getUsername());
jdbcConfiguration.with(PASSWORD, jdbcDatabaseContainer.getPassword());
final String driverClassName = jdbcDatabaseContainer.getDriverClassName();
jdbcConfiguration.with(CONNECTOR, ConnectorResolver.getConnectorByJdbcDriver(driverClassName));
// This property is valid for all databases except MySQL
if (!isMySQL(driverClassName)) {
jdbcConfiguration.with(DBNAME, jdbcDatabaseContainer.getDatabaseName());
}
return jdbcConfiguration;
}
private static boolean isMySQL(String driverClassName) {
return "com.mysql.cj.jdbc.Driver".equals(driverClassName) || "com.mysql.jdbc.Driver".equals(driverClassName);
}
}

View File

@ -0,0 +1,97 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.testcontainers;
import static org.fest.assertions.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import org.junit.Test;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.JsonNode;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.model.ContainerConfig;
public class ConnectorConfigurationTest {
private ObjectMapper mapper = new ObjectMapper();
@Test
public void shouldSerializeConnectorConfiguration() throws IOException {
final Configuration configuration = Configuration.create();
configuration.with("connector.class", "foo");
configuration.with("database.hostname", "bar");
final Connector connector = Connector.from("myconnector", configuration);
final String json = connector.toJson();
final ObjectNode connectionNode = mapper.readValue(json, ObjectNode.class);
assertThat(connectionNode.get("name").asText()).isEqualTo("myconnector");
final JsonNode configNode = connectionNode.get("config");
assertThat(configNode.get("connector.class").asText()).isEqualTo("foo");
assertThat(configNode.get("database.hostname").asText()).isEqualTo("bar");
}
@Test
public void shouldLoadConnectorConfigurationFromFile() throws IOException {
final InputStream configFile = ConnectorConfigurationTest.class.getClassLoader().getResourceAsStream("config.json");
final Connector connector = Connector.fromJson(configFile);
final String json = connector.toJson();
final ObjectNode connectionNode = mapper.readValue(json, ObjectNode.class);
assertThat(connectionNode.get("name").asText()).isEqualTo("inventory-connector");
final JsonNode configNode = connectionNode.get("config");
assertThat(configNode.get("connector.class").asText()).isEqualTo("io.debezium.connector.mysql.MySqlConnector");
assertThat(configNode.get("database.hostname").asText()).isEqualTo("192.168.99.100");
}
@Test
public void shouldOverrideConfigurationFromJdbcContainer() throws IOException {
final ContainerConfig containerConfig = mock(ContainerConfig.class);
when(containerConfig.getHostName()).thenReturn("localhost");
final InspectContainerResponse inspectContainerResponse = mock(InspectContainerResponse.class);
when(inspectContainerResponse.getConfig()).thenReturn(containerConfig);
final JdbcDatabaseContainer<?> jdbcDatabaseContainer = mock(JdbcDatabaseContainer.class);
when(jdbcDatabaseContainer.getDriverClassName()).thenReturn("org.postgresql.Driver");
when(jdbcDatabaseContainer.getDatabaseName()).thenReturn("db");
when(jdbcDatabaseContainer.getPassword()).thenReturn("");
when(jdbcDatabaseContainer.getUsername()).thenReturn("");
when(jdbcDatabaseContainer.getExposedPorts()).thenReturn(Arrays.asList(9090));
when(jdbcDatabaseContainer.getContainerInfo()).thenReturn(inspectContainerResponse);
final InputStream configFile = ConnectorConfigurationTest.class.getClassLoader().getResourceAsStream("config.json");
final Connector connector = Connector.fromJson(configFile);
connector.appendOrOverrideConfiguration(JdbcConfiguration.fromJdbcContainer(jdbcDatabaseContainer));
final String json = connector.toJson();
final ObjectNode connectionNode = mapper.readValue(json, ObjectNode.class);
assertThat(connectionNode.get("name").asText()).isEqualTo("inventory-connector");
final JsonNode configNode = connectionNode.get("config");
assertThat(configNode.get("connector.class").asText()).isEqualTo("io.debezium.connector.postgresql.PostgresConnector");
assertThat(configNode.get("database.hostname").asText()).isEqualTo("localhost");
assertThat(configNode.get("database.dbname").asText()).isEqualTo("db");
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.testcontainers;
import static org.fest.assertions.Assertions.assertThat;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
public class DebeziumContainerTest {
private static PostgreSQLContainer postgreContainer = new PostgreSQLContainer<>("debezium/postgres:11");
private static KafkaContainer kafkaContainer = new KafkaContainer("5.3.1");
private static DebeziumContainer debeziumContainer = new DebeziumContainer("1.0.0.Final");
@BeforeClass
public static void initializeInfrastructure() {
kafkaContainer.start();
postgreContainer.withNetwork(kafkaContainer.getNetwork()).start();
debeziumContainer.withKafka(kafkaContainer).start();
}
@AfterClass
public static void stopInfrastructure() {
debeziumContainer.stop();
postgreContainer.stop();
kafkaContainer.stop();
}
@Test
public void shouldRegisterPostgreSQLConnector() throws IOException {
final Configuration configuration = JdbcConfiguration.fromJdbcContainer(postgreContainer);
configuration.with("tasks.max", 1);
configuration.with("database.server.name", "dbserver1");
configuration.with("table.whitelist", "public.outbox");
final Connector connector = Connector.from("my-connector", configuration);
debeziumContainer.registerConnector(connector);
final String connectorInfo = readConnector(debeziumContainer.getConnectorTarget("my-connector"));
final ObjectMapper mapper = new ObjectMapper();
final ObjectNode connectorInfoNode = mapper.readValue(connectorInfo, ObjectNode.class);
final ArrayNode tasks = (ArrayNode) connectorInfoNode.get("tasks");
assertThat(tasks.size()).isEqualTo(1);
final JsonNode expectedTask = tasks.get(0);
assertThat(expectedTask.get("connector").asText()).isEqualTo("my-connector");
}
private String readConnector(String url) throws IOException {
final OkHttpClient client = new OkHttpClient();
final Request request = new Request.Builder().url("http://" + url).build();
try (Response response = client.newCall(request).execute()) {
return response.body().string();
}
}
}

View File

@ -0,0 +1,7 @@
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "192.168.99.100"
}
}

16
debezium-testing/pom.xml Normal file
View File

@ -0,0 +1,16 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-parent</artifactId>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-testing</artifactId>
<name>Debezium Testing</name>
<packaging>pom</packaging>
<modules>
<module>debezium-testcontainers-testing</module>
</modules>
</project>

32
pom.xml
View File

@ -83,6 +83,8 @@
<!-- Quarkus --> <!-- Quarkus -->
<quarkus.version>1.1.1.Final</quarkus.version> <quarkus.version>1.1.1.Final</quarkus.version>
<!-- HTTP client -->
<version.okhttp>4.2.2</version.okhttp>
<!-- Testing --> <!-- Testing -->
<version.junit>4.12</version.junit> <version.junit>4.12</version.junit>
@ -90,6 +92,7 @@
<version.jmh>1.21</version.jmh> <version.jmh>1.21</version.jmh>
<version.mockito>3.0.0</version.mockito> <version.mockito>3.0.0</version.mockito>
<version.awaitility>3.1.6</version.awaitility> <version.awaitility>3.1.6</version.awaitility>
<version.testcontainers>1.12.3</version.testcontainers>
<!-- Maven Plugins --> <!-- Maven Plugins -->
<version.compiler.plugin>3.8.1</version.compiler.plugin> <version.compiler.plugin>3.8.1</version.compiler.plugin>
@ -158,6 +161,7 @@
<module>debezium-connector-sqlserver</module> <module>debezium-connector-sqlserver</module>
<module>debezium-microbenchmark</module> <module>debezium-microbenchmark</module>
<module>debezium-quarkus-outbox</module> <module>debezium-quarkus-outbox</module>
<module>debezium-testing</module>
</modules> </modules>
<distributionManagement> <distributionManagement>
@ -343,6 +347,34 @@
<version>${version.log4j}</version> <version>${version.log4j}</version>
</dependency> </dependency>
<!-- Testing utilities -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${version.testcontainers}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${version.testcontainers}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
<version>${version.testcontainers}</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>${version.okhttp}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<!-- Test dependencies --> <!-- Test dependencies -->
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>