DBZ-2764 Centralize Postgres image for Testcontainers

This commit is contained in:
ani-sha 2020-12-14 12:19:31 +05:30 committed by Gunnar Morling
parent d9d00e8282
commit bd427786e9
6 changed files with 100 additions and 121 deletions

View File

@ -15,6 +15,8 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import io.debezium.config.CommonConnectorConfig;
@ -27,8 +29,10 @@
import io.debezium.heartbeat.DatabaseHeartbeatImpl;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.testing.testcontainers.PostgresInfrastructure;
import io.debezium.util.Testing;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
/**
* Integration test for {@link PostgresConnector} using an {@link EmbeddedEngine} and Testcontainers infrastructure for when Postgres is shutdown during streaming
@ -39,6 +43,11 @@ public class PostgresShutdownIT extends AbstractConnectorTest {
* Specific tests that need to extend the initial DDL set should do it in a form of
* TestHelper.execute(SETUP_TABLES_STMT + ADDITIONAL_STATEMENTS)
*/
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresShutdownIT.class);
private static final String POSTGRES_IMAGE = "debezium/example-postgres:1.3";
private static final String INSERT_STMT = "INSERT INTO s1.a (aa) VALUES (1);" +
"INSERT INTO s2.a (aa) VALUES (1);";
private static final String CREATE_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;" +
@ -51,15 +60,23 @@ public class PostgresShutdownIT extends AbstractConnectorTest {
"INSERT INTO s1.heartbeat (ts) VALUES (NOW());";
private static final String SETUP_TABLES_STMT = CREATE_TABLES_STMT + INSERT_STMT;
private PostgresInfrastructure infrastructure;
private static final DockerImageName POSTGRES_DOCKER_IMAGE_NAME = DockerImageName.parse(POSTGRES_IMAGE)
.asCompatibleSubstituteFor("postgres");
public static PostgreSQLContainer<?> postgresContainer = new PostgreSQLContainer<>(POSTGRES_DOCKER_IMAGE_NAME)
.withDatabaseName("postgres")
.withUsername("postgres")
.withPassword("postgres")
.withLogConsumer(new Slf4jLogConsumer(LOGGER))
.withNetworkAliases("postgres");
private String oldContainerPort;
@Before
public void setUp() {
infrastructure = PostgresInfrastructure.getDebeziumPostgresInfrastructure();
infrastructure.startContainer();
postgresContainer.start();
oldContainerPort = System.getProperty("database.port", "5432");
System.setProperty("database.port", String.valueOf(infrastructure.getPostgresContainer().getMappedPort(5432)));
System.setProperty("database.port", String.valueOf(postgresContainer.getMappedPort(5432)));
try {
TestHelper.dropAllSchemas();
}
@ -74,7 +91,7 @@ public void tearDown() {
stopConnector();
TestHelper.dropDefaultReplicationSlot();
TestHelper.dropPublication();
infrastructure.getPostgresContainer().stop();
postgresContainer.stop();
System.setProperty("database.port", oldContainerPort);
}
@ -88,7 +105,7 @@ public void shouldStopOnPostgresFastShutdown() throws Exception {
TestHelper.execute(INSERT_STMT);
}
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(CommonConnectorConfig.DATABASE_CONFIG_PREFIX + JdbcConfiguration.PORT, infrastructure.getPostgresContainer().getMappedPort(5432))
.with(CommonConnectorConfig.DATABASE_CONFIG_PREFIX + JdbcConfiguration.PORT, postgresContainer.getMappedPort(5432))
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false)
.with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1")
@ -119,7 +136,7 @@ public void shouldStopOnPostgresFastShutdown() throws Exception {
postgresConnection.singleResultMapper(rs -> rs.getString("ts"), "Could not fetch keepalive info")));
logger.info("Execute Postgres shutdown...");
Container.ExecResult result = infrastructure.getPostgresContainer()
Container.ExecResult result = postgresContainer
.execInContainer("su", "-", "postgres", "-c", "/usr/lib/postgresql/11/bin/pg_ctl -m fast -D /var/lib/postgresql/data stop");
logger.info(result.toString());
@ -133,7 +150,7 @@ private void waitForPostgresShutdown() {
Awaitility.await()
.pollInterval(200, TimeUnit.MILLISECONDS)
.atMost(60 * TestHelper.waitTimeForRecords(), TimeUnit.SECONDS)
.until(() -> !infrastructure.getPostgresContainer().isRunning());
.until(() -> !postgresContainer.isRunning());
}
}

View File

@ -35,6 +35,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-testing-testcontainers</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
@ -94,6 +101,11 @@
<artifactId>debezium-quarkus-outbox-deployment</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-testing-testcontainers</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -10,39 +10,36 @@
import java.util.Map;
import org.testcontainers.containers.PostgreSQLContainer;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
/**
* @author Chris Cranford
*/
public class DatabaseTestResource implements QuarkusTestResourceLifecycleManager {
private static final String POSTGRES_USER = "postgres";
private static final String POSTGRES_PASSWORD = "postgres";
private static final String POSTGRES_DBNAME = "postgres";
private static final String POSTGRES_IMAGE = "debezium/postgres:9.6";
private static final String POSTGRES_IMAGE = "debezium/postgres:11";
private static final DockerImageName POSTGRES_DOCKER_IMAGE_NAME = DockerImageName.parse(POSTGRES_IMAGE)
.asCompatibleSubstituteFor("postgres");
private static PostgreSQLContainer<?> container;
private static PostgreSQLContainer<?> postgresContainer;
@Override
public Map<String, String> start() {
try {
container = new PostgreSQLContainer<>(POSTGRES_DOCKER_IMAGE_NAME)
postgresContainer = new PostgreSQLContainer<>(POSTGRES_DOCKER_IMAGE_NAME)
.waitingFor(Wait.forLogMessage(".*database system is ready to accept connections.*", 2))
.withUsername(POSTGRES_USER)
.withPassword(POSTGRES_PASSWORD)
.withDatabaseName(POSTGRES_DBNAME)
.withUsername("postgres")
.withPassword("postgres")
.withDatabaseName("postgres")
.withEnv("POSTGRES_INITDB_ARGS", "-E UTF8")
.withEnv("LANG", "en_US.utf8")
.withStartupTimeout(Duration.ofSeconds(30));
container.start();
return Collections.singletonMap("quarkus.datasource.jdbc.url", container.getJdbcUrl());
postgresContainer.start();
return Collections.singletonMap("quarkus.datasource.jdbc.url", postgresContainer.getJdbcUrl());
}
catch (Exception e) {
throw new RuntimeException(e);
@ -52,8 +49,8 @@ public Map<String, String> start() {
@Override
public void stop() {
try {
if (container != null) {
container.stop();
if (postgresContainer != null) {
postgresContainer.stop();
}
}
catch (Exception e) {

View File

@ -1,92 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.testing.testcontainers;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
/**
* Postgres Testcontainers infrastructure handling.
*/
public class PostgresInfrastructure {
protected static final String POSTGRES_DEFAULT_IMAGE = "postgres:9.6.19";
protected static final String POSTGRES_DEFAULT_DEBEZIUM_IMAGE = "debezium/example-postgres:1.3";
protected static final Map<String, PostgresInfrastructure> postgresInfrastructures = new HashMap<>();
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresInfrastructure.class);
protected final String postgresImageName;
protected final Network network = Network.newNetwork();
private final PostgreSQLContainer<?> postgresContainer;
private PostgresInfrastructure(String postgresImageName) {
DockerImageName postgresDockerImageName = DockerImageName.parse(postgresImageName)
.asCompatibleSubstituteFor("postgres");
this.postgresImageName = postgresImageName;
postgresContainer = new PostgreSQLContainer<>(postgresDockerImageName)
.withNetwork(network)
.withDatabaseName("postgres")
.withUsername("postgres")
.withPassword("postgres")
.withLogConsumer(new Slf4jLogConsumer(LOGGER))
.withNetworkAliases("postgres");
}
public static PostgresInfrastructure getDebeziumPostgresInfrastructure() {
return new PostgresInfrastructure(POSTGRES_DEFAULT_DEBEZIUM_IMAGE);
}
public static PostgresInfrastructure getPostgresInfrastructure() {
return new PostgresInfrastructure(POSTGRES_DEFAULT_IMAGE);
}
public static PostgresInfrastructure getInfrastructure(String postgresImage) {
if (postgresInfrastructures.containsKey(postgresImage)) {
return postgresInfrastructures.get(postgresImage);
}
final PostgresInfrastructure infrastructure = new PostgresInfrastructure(postgresImage);
postgresInfrastructures.put(postgresImage, infrastructure);
return infrastructure;
}
public String getPostgresImageName() {
return postgresImageName;
}
public PostgreSQLContainer<?> getPostgresContainer() {
return postgresContainer;
}
public void startContainer() {
Startables.deepStart(Stream.of(postgresContainer)).join();
}
public ConnectorConfiguration getPostgresConnectorConfiguration(int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(postgresContainer)
.with("database.server.name", "dbserver" + id)
.with("slot.name", "debezium_" + id);
if (options != null && options.length > 0) {
for (int i = 0; i < options.length; i += 2) {
config.with(options[i], options[i + 1]);
}
}
return config;
}
}

View File

@ -25,6 +25,7 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
@ -39,9 +40,9 @@
import org.testcontainers.images.builder.ImageFromDockerfile;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import org.testcontainers.utility.DockerImageName;
import com.jayway.jsonpath.JsonPath;
import org.testcontainers.utility.DockerImageName;
/**
* An integration test verifying the Apicurio registry is interoperable with Debezium
@ -54,8 +55,9 @@ public class ApicurioRegistryTest {
private static final String DEBEZIUM_VERSION = "1.3.0.Final";
private static final String APICURIO_VERSION = "1.3.2.Final";
private static final String POSTGRES_IMAGE = "debezium/postgres:11";
private static final DockerImageName POSTGRES_DOCKER_IMAGE_NAME = DockerImageName.parse("debezium/postgres:11")
private static final DockerImageName POSTGRES_DOCKER_IMAGE_NAME = DockerImageName.parse(POSTGRES_IMAGE)
.asCompatibleSubstituteFor("postgres");
private static Network network = Network.newNetwork();
@ -269,4 +271,25 @@ private ConnectorConfiguration getConfiguration(int id, String converter, String
}
return config;
}
@AfterClass
public static void stopContainers() {
try {
if (postgresContainer != null) {
postgresContainer.stop();
}
if (apicurioContainer != null) {
apicurioContainer.stop();
}
if (kafkaContainer != null) {
kafkaContainer.stop();
}
if (debeziumContainer != null) {
debeziumContainer.stop();
}
}
catch (Exception e) {
// ignored
}
}
}

View File

@ -25,6 +25,7 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
@ -36,19 +37,22 @@
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import org.testcontainers.utility.DockerImageName;
import com.jayway.jsonpath.JsonPath;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.testcontainers.utility.DockerImageName;
public class DebeziumContainerTest {
private static final String DEBEZIUM_VERSION = "1.3.0.Final";
private static final String POSTGRES_IMAGE = "debezium/postgres:11";
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumContainerTest.class);
private static final DockerImageName POSTGRES_DOCKER_IMAGE_NAME = DockerImageName.parse("debezium/postgres:11")
private static final DockerImageName POSTGRES_DOCKER_IMAGE_NAME = DockerImageName.parse(POSTGRES_IMAGE)
.asCompatibleSubstituteFor("postgres");
private static final Network network = Network.newNetwork();
@ -60,7 +64,7 @@ public class DebeziumContainerTest {
.withNetwork(network)
.withNetworkAliases("postgres");
public static DebeziumContainer debeziumContainer = new DebeziumContainer("debezium/connect:1.1.1.Final")
public static DebeziumContainer debeziumContainer = new DebeziumContainer("debezium/connect:" + DEBEZIUM_VERSION)
.withNetwork(network)
.withKafka(kafkaContainer)
.withLogConsumer(new Slf4jLogConsumer(LOGGER))
@ -173,4 +177,22 @@ private String executeHttpRequest(String url) throws IOException {
return response.body().string();
}
}
@AfterClass
public static void stopContainers() {
try {
if (postgresContainer != null) {
postgresContainer.stop();
}
if (kafkaContainer != null) {
kafkaContainer.stop();
}
if (debeziumContainer != null) {
debeziumContainer.stop();
}
}
catch (Exception e) {
// ignored
}
}
}