diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresShutdownIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresShutdownIT.java index 9f35ad93c..ad86335d6 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresShutdownIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresShutdownIT.java @@ -15,6 +15,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testcontainers.containers.Container; import io.debezium.config.CommonConnectorConfig; @@ -27,8 +29,10 @@ import io.debezium.heartbeat.DatabaseHeartbeatImpl; import io.debezium.heartbeat.Heartbeat; import io.debezium.jdbc.JdbcConfiguration; -import io.debezium.testing.testcontainers.PostgresInfrastructure; import io.debezium.util.Testing; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; /** * Integration test for {@link PostgresConnector} using an {@link EmbeddedEngine} and Testcontainers infrastructure for when Postgres is shutdown during streaming @@ -39,6 +43,11 @@ public class PostgresShutdownIT extends AbstractConnectorTest { * Specific tests that need to extend the initial DDL set should do it in a form of * TestHelper.execute(SETUP_TABLES_STMT + ADDITIONAL_STATEMENTS) */ + + private static final Logger LOGGER = LoggerFactory.getLogger(PostgresShutdownIT.class); + + private static final String POSTGRES_IMAGE = "debezium/example-postgres:1.3"; + private static final String INSERT_STMT = "INSERT INTO s1.a (aa) VALUES (1);" + "INSERT INTO s2.a (aa) VALUES (1);"; private static final String CREATE_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;" + @@ -51,15 +60,23 @@ public class PostgresShutdownIT extends AbstractConnectorTest { "INSERT INTO s1.heartbeat (ts) VALUES (NOW());"; private static final String SETUP_TABLES_STMT = CREATE_TABLES_STMT + INSERT_STMT; - private PostgresInfrastructure infrastructure; + private static final DockerImageName POSTGRES_DOCKER_IMAGE_NAME = DockerImageName.parse(POSTGRES_IMAGE) + .asCompatibleSubstituteFor("postgres"); + + public static PostgreSQLContainer postgresContainer = new PostgreSQLContainer<>(POSTGRES_DOCKER_IMAGE_NAME) + .withDatabaseName("postgres") + .withUsername("postgres") + .withPassword("postgres") + .withLogConsumer(new Slf4jLogConsumer(LOGGER)) + .withNetworkAliases("postgres"); + private String oldContainerPort; @Before public void setUp() { - infrastructure = PostgresInfrastructure.getDebeziumPostgresInfrastructure(); - infrastructure.startContainer(); + postgresContainer.start(); oldContainerPort = System.getProperty("database.port", "5432"); - System.setProperty("database.port", String.valueOf(infrastructure.getPostgresContainer().getMappedPort(5432))); + System.setProperty("database.port", String.valueOf(postgresContainer.getMappedPort(5432))); try { TestHelper.dropAllSchemas(); } @@ -74,7 +91,7 @@ public void tearDown() { stopConnector(); TestHelper.dropDefaultReplicationSlot(); TestHelper.dropPublication(); - infrastructure.getPostgresContainer().stop(); + postgresContainer.stop(); System.setProperty("database.port", oldContainerPort); } @@ -88,7 +105,7 @@ public void shouldStopOnPostgresFastShutdown() throws Exception { TestHelper.execute(INSERT_STMT); } Configuration.Builder configBuilder = TestHelper.defaultConfig() - .with(CommonConnectorConfig.DATABASE_CONFIG_PREFIX + JdbcConfiguration.PORT, infrastructure.getPostgresContainer().getMappedPort(5432)) + .with(CommonConnectorConfig.DATABASE_CONFIG_PREFIX + JdbcConfiguration.PORT, postgresContainer.getMappedPort(5432)) .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false) .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1") @@ -119,7 +136,7 @@ public void shouldStopOnPostgresFastShutdown() throws Exception { postgresConnection.singleResultMapper(rs -> rs.getString("ts"), "Could not fetch keepalive info"))); logger.info("Execute Postgres shutdown..."); - Container.ExecResult result = infrastructure.getPostgresContainer() + Container.ExecResult result = postgresContainer .execInContainer("su", "-", "postgres", "-c", "/usr/lib/postgresql/11/bin/pg_ctl -m fast -D /var/lib/postgresql/data stop"); logger.info(result.toString()); @@ -133,7 +150,7 @@ private void waitForPostgresShutdown() { Awaitility.await() .pollInterval(200, TimeUnit.MILLISECONDS) .atMost(60 * TestHelper.waitTimeForRecords(), TimeUnit.SECONDS) - .until(() -> !infrastructure.getPostgresContainer().isRunning()); + .until(() -> !postgresContainer.isRunning()); } } diff --git a/debezium-quarkus-outbox/integration-tests/pom.xml b/debezium-quarkus-outbox/integration-tests/pom.xml index 97a299cca..76fa057a8 100644 --- a/debezium-quarkus-outbox/integration-tests/pom.xml +++ b/debezium-quarkus-outbox/integration-tests/pom.xml @@ -35,6 +35,13 @@ ${project.version} test + + + io.debezium + debezium-testing-testcontainers + ${project.version} + test + @@ -94,6 +101,11 @@ debezium-quarkus-outbox-deployment test + + io.debezium + debezium-testing-testcontainers + test + diff --git a/debezium-quarkus-outbox/integration-tests/src/test/java/io/debezium/outbox/quarkus/it/DatabaseTestResource.java b/debezium-quarkus-outbox/integration-tests/src/test/java/io/debezium/outbox/quarkus/it/DatabaseTestResource.java index b756fd3ab..ca7f604ad 100644 --- a/debezium-quarkus-outbox/integration-tests/src/test/java/io/debezium/outbox/quarkus/it/DatabaseTestResource.java +++ b/debezium-quarkus-outbox/integration-tests/src/test/java/io/debezium/outbox/quarkus/it/DatabaseTestResource.java @@ -10,39 +10,36 @@ import java.util.Map; import org.testcontainers.containers.PostgreSQLContainer; +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerImageName; -import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; - /** * @author Chris Cranford */ public class DatabaseTestResource implements QuarkusTestResourceLifecycleManager { - private static final String POSTGRES_USER = "postgres"; - private static final String POSTGRES_PASSWORD = "postgres"; - private static final String POSTGRES_DBNAME = "postgres"; - private static final String POSTGRES_IMAGE = "debezium/postgres:9.6"; + private static final String POSTGRES_IMAGE = "debezium/postgres:11"; + private static final DockerImageName POSTGRES_DOCKER_IMAGE_NAME = DockerImageName.parse(POSTGRES_IMAGE) .asCompatibleSubstituteFor("postgres"); - private static PostgreSQLContainer container; + private static PostgreSQLContainer postgresContainer; @Override public Map start() { try { - container = new PostgreSQLContainer<>(POSTGRES_DOCKER_IMAGE_NAME) + postgresContainer = new PostgreSQLContainer<>(POSTGRES_DOCKER_IMAGE_NAME) .waitingFor(Wait.forLogMessage(".*database system is ready to accept connections.*", 2)) - .withUsername(POSTGRES_USER) - .withPassword(POSTGRES_PASSWORD) - .withDatabaseName(POSTGRES_DBNAME) + .withUsername("postgres") + .withPassword("postgres") + .withDatabaseName("postgres") .withEnv("POSTGRES_INITDB_ARGS", "-E UTF8") .withEnv("LANG", "en_US.utf8") .withStartupTimeout(Duration.ofSeconds(30)); - container.start(); - return Collections.singletonMap("quarkus.datasource.jdbc.url", container.getJdbcUrl()); + postgresContainer.start(); + return Collections.singletonMap("quarkus.datasource.jdbc.url", postgresContainer.getJdbcUrl()); } catch (Exception e) { throw new RuntimeException(e); @@ -52,8 +49,8 @@ public Map start() { @Override public void stop() { try { - if (container != null) { - container.stop(); + if (postgresContainer != null) { + postgresContainer.stop(); } } catch (Exception e) { diff --git a/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/PostgresInfrastructure.java b/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/PostgresInfrastructure.java deleted file mode 100644 index 4123b57c3..000000000 --- a/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/PostgresInfrastructure.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.testing.testcontainers; - -import java.util.HashMap; -import java.util.Map; -import java.util.stream.Stream; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.PostgreSQLContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.DockerImageName; - -/** - * Postgres Testcontainers infrastructure handling. - */ -public class PostgresInfrastructure { - - protected static final String POSTGRES_DEFAULT_IMAGE = "postgres:9.6.19"; - protected static final String POSTGRES_DEFAULT_DEBEZIUM_IMAGE = "debezium/example-postgres:1.3"; - - protected static final Map postgresInfrastructures = new HashMap<>(); - - private static final Logger LOGGER = LoggerFactory.getLogger(PostgresInfrastructure.class); - - protected final String postgresImageName; - protected final Network network = Network.newNetwork(); - private final PostgreSQLContainer postgresContainer; - - private PostgresInfrastructure(String postgresImageName) { - DockerImageName postgresDockerImageName = DockerImageName.parse(postgresImageName) - .asCompatibleSubstituteFor("postgres"); - - this.postgresImageName = postgresImageName; - postgresContainer = new PostgreSQLContainer<>(postgresDockerImageName) - .withNetwork(network) - .withDatabaseName("postgres") - .withUsername("postgres") - .withPassword("postgres") - .withLogConsumer(new Slf4jLogConsumer(LOGGER)) - .withNetworkAliases("postgres"); - } - - public static PostgresInfrastructure getDebeziumPostgresInfrastructure() { - return new PostgresInfrastructure(POSTGRES_DEFAULT_DEBEZIUM_IMAGE); - } - - public static PostgresInfrastructure getPostgresInfrastructure() { - return new PostgresInfrastructure(POSTGRES_DEFAULT_IMAGE); - } - - public static PostgresInfrastructure getInfrastructure(String postgresImage) { - if (postgresInfrastructures.containsKey(postgresImage)) { - return postgresInfrastructures.get(postgresImage); - } - final PostgresInfrastructure infrastructure = new PostgresInfrastructure(postgresImage); - postgresInfrastructures.put(postgresImage, infrastructure); - return infrastructure; - } - - public String getPostgresImageName() { - return postgresImageName; - } - - public PostgreSQLContainer getPostgresContainer() { - return postgresContainer; - } - - public void startContainer() { - Startables.deepStart(Stream.of(postgresContainer)).join(); - } - - public ConnectorConfiguration getPostgresConnectorConfiguration(int id, String... options) { - final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(postgresContainer) - .with("database.server.name", "dbserver" + id) - .with("slot.name", "debezium_" + id); - - if (options != null && options.length > 0) { - for (int i = 0; i < options.length; i += 2) { - config.with(options[i], options[i + 1]); - } - } - return config; - } - -} diff --git a/debezium-testing/debezium-testing-testcontainers/src/test/java/io/debezium/testing/testcontainers/ApicurioRegistryTest.java b/debezium-testing/debezium-testing-testcontainers/src/test/java/io/debezium/testing/testcontainers/ApicurioRegistryTest.java index 4da21f061..f2e43d719 100644 --- a/debezium-testing/debezium-testing-testcontainers/src/test/java/io/debezium/testing/testcontainers/ApicurioRegistryTest.java +++ b/debezium-testing/debezium-testing-testcontainers/src/test/java/io/debezium/testing/testcontainers/ApicurioRegistryTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.rnorth.ducttape.unreliables.Unreliables; @@ -39,9 +40,9 @@ import org.testcontainers.images.builder.ImageFromDockerfile; import org.testcontainers.lifecycle.Startables; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; -import org.testcontainers.utility.DockerImageName; import com.jayway.jsonpath.JsonPath; +import org.testcontainers.utility.DockerImageName; /** * An integration test verifying the Apicurio registry is interoperable with Debezium @@ -54,8 +55,9 @@ public class ApicurioRegistryTest { private static final String DEBEZIUM_VERSION = "1.3.0.Final"; private static final String APICURIO_VERSION = "1.3.2.Final"; + private static final String POSTGRES_IMAGE = "debezium/postgres:11"; - private static final DockerImageName POSTGRES_DOCKER_IMAGE_NAME = DockerImageName.parse("debezium/postgres:11") + private static final DockerImageName POSTGRES_DOCKER_IMAGE_NAME = DockerImageName.parse(POSTGRES_IMAGE) .asCompatibleSubstituteFor("postgres"); private static Network network = Network.newNetwork(); @@ -269,4 +271,25 @@ private ConnectorConfiguration getConfiguration(int id, String converter, String } return config; } + + @AfterClass + public static void stopContainers() { + try { + if (postgresContainer != null) { + postgresContainer.stop(); + } + if (apicurioContainer != null) { + apicurioContainer.stop(); + } + if (kafkaContainer != null) { + kafkaContainer.stop(); + } + if (debeziumContainer != null) { + debeziumContainer.stop(); + } + } + catch (Exception e) { + // ignored + } + } } diff --git a/debezium-testing/debezium-testing-testcontainers/src/test/java/io/debezium/testing/testcontainers/DebeziumContainerTest.java b/debezium-testing/debezium-testing-testcontainers/src/test/java/io/debezium/testing/testcontainers/DebeziumContainerTest.java index 2e569198e..f01ee50ab 100644 --- a/debezium-testing/debezium-testing-testcontainers/src/test/java/io/debezium/testing/testcontainers/DebeziumContainerTest.java +++ b/debezium-testing/debezium-testing-testcontainers/src/test/java/io/debezium/testing/testcontainers/DebeziumContainerTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.awaitility.Awaitility; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.rnorth.ducttape.unreliables.Unreliables; @@ -36,19 +37,22 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; -import org.testcontainers.utility.DockerImageName; import com.jayway.jsonpath.JsonPath; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; +import org.testcontainers.utility.DockerImageName; public class DebeziumContainerTest { + private static final String DEBEZIUM_VERSION = "1.3.0.Final"; + private static final String POSTGRES_IMAGE = "debezium/postgres:11"; + private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumContainerTest.class); - private static final DockerImageName POSTGRES_DOCKER_IMAGE_NAME = DockerImageName.parse("debezium/postgres:11") + private static final DockerImageName POSTGRES_DOCKER_IMAGE_NAME = DockerImageName.parse(POSTGRES_IMAGE) .asCompatibleSubstituteFor("postgres"); private static final Network network = Network.newNetwork(); @@ -60,7 +64,7 @@ public class DebeziumContainerTest { .withNetwork(network) .withNetworkAliases("postgres"); - public static DebeziumContainer debeziumContainer = new DebeziumContainer("debezium/connect:1.1.1.Final") + public static DebeziumContainer debeziumContainer = new DebeziumContainer("debezium/connect:" + DEBEZIUM_VERSION) .withNetwork(network) .withKafka(kafkaContainer) .withLogConsumer(new Slf4jLogConsumer(LOGGER)) @@ -173,4 +177,22 @@ private String executeHttpRequest(String url) throws IOException { return response.body().string(); } } + + @AfterClass + public static void stopContainers() { + try { + if (postgresContainer != null) { + postgresContainer.stop(); + } + if (kafkaContainer != null) { + kafkaContainer.stop(); + } + if (debeziumContainer != null) { + debeziumContainer.stop(); + } + } + catch (Exception e) { + // ignored + } + } }