DBZ-7223 Add the MongoDB sink connector

* add basic tests for MongoDB sink connector (MySQL to MongoDB 1 node ReplicaSet and multi-node sharded cluster)

closes https://issues.redhat.com/browse/DBZ-7223
This commit is contained in:
rkerner 2024-07-03 14:43:56 +02:00 committed by Jiri Pechanec
parent 6ba5a1e631
commit 56e63b98a8
24 changed files with 562 additions and 196 deletions

View File

@ -5,7 +5,7 @@
*/
package io.debezium.kcrestextension;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
@ -24,7 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
/**
* Tests topic creation (which is enabled in Kafka version greater than 2.6.0) and transforms endpoints.
@ -78,19 +78,19 @@ public class DebeziumResourceIT {
@BeforeEach
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumConnectRestExtension.class.getName());
TestInfrastructureHelper.setupDebeziumContainer(Module.version(), DebeziumConnectRestExtension.class.getName());
}
@AfterEach
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
TestInfrastructureHelper.stopContainers();
}
@Test
public void testTopicCreationEndpoint() {
RestExtensionTestInfrastructure.startContainers(DATABASE.NONE);
TestInfrastructureHelper.startContainers(DATABASE.NONE);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumResource.BASE_PATH + DebeziumResource.TOPIC_CREATION_ENDPOINT)
.then().log().all()
@ -100,10 +100,10 @@ public void testTopicCreationEndpoint() {
@Test
public void testTopicCreationEndpointWhenExplicitlyDisabled() {
RestExtensionTestInfrastructure.getDebeziumContainer().withEnv("CONNECT_TOPIC_CREATION_ENABLE", "false");
RestExtensionTestInfrastructure.startContainers(DATABASE.NONE);
TestInfrastructureHelper.getDebeziumContainer().withEnv("CONNECT_TOPIC_CREATION_ENABLE", "false");
TestInfrastructureHelper.startContainers(DATABASE.NONE);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumResource.BASE_PATH + DebeziumResource.TOPIC_CREATION_ENDPOINT)
.then().log().all()
@ -114,9 +114,9 @@ public void testTopicCreationEndpointWhenExplicitlyDisabled() {
@Test
@Disabled("See DBZ-7416 https://issues.redhat.com/browse/DBZ-7416")
public void testTransformsEndpoint() {
RestExtensionTestInfrastructure.startContainers(DATABASE.NONE);
TestInfrastructureHelper.startContainers(DATABASE.NONE);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().get(DebeziumResource.BASE_PATH + DebeziumResource.TRANSFORMS_ENDPOINT)
.then().log().all()
.statusCode(200)
@ -127,9 +127,9 @@ public void testTransformsEndpoint() {
@Test
public void testPredicatesEndpoint() {
RestExtensionTestInfrastructure.startContainers(DATABASE.NONE);
TestInfrastructureHelper.startContainers(DATABASE.NONE);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().get(DebeziumResource.BASE_PATH + DebeziumResource.PREDICATES_ENDPOINT)
.then().log().all()
.statusCode(200)
@ -139,9 +139,9 @@ public void testPredicatesEndpoint() {
@Test
public void testConnectorPluginsEndpoint() {
RestExtensionTestInfrastructure.startContainers(DATABASE.NONE);
TestInfrastructureHelper.startContainers(DATABASE.NONE);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().get(DebeziumResource.BASE_PATH + DebeziumResource.CONNECTOR_PLUGINS_ENDPOINT)
.then().log().all()
.statusCode(200)

View File

@ -5,7 +5,7 @@
*/
package io.debezium.kcrestextension;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.is;
@ -15,7 +15,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
/**
* Tests topic creation endpoint which is disabled in Kafka version less than 2.6.0.
@ -27,19 +27,19 @@ public class DebeziumResourceNoTopicCreationIT {
@BeforeEach
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumConnectRestExtension.class.getName(), DEBEZIUM_CONTAINER_IMAGE_VERSION);
RestExtensionTestInfrastructure.startContainers(DATABASE.NONE);
TestInfrastructureHelper.setupDebeziumContainer(Module.version(), DebeziumConnectRestExtension.class.getName(), DEBEZIUM_CONTAINER_IMAGE_VERSION);
TestInfrastructureHelper.startContainers(DATABASE.NONE);
}
@AfterEach
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
TestInfrastructureHelper.stopContainers();
}
@Test
public void testTopicCreationEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumResource.BASE_PATH + DebeziumResource.TOPIC_CREATION_ENDPOINT)
.then().log().all()

View File

@ -27,7 +27,7 @@
import io.debezium.storage.kafka.history.KafkaSchemaHistory;
import io.debezium.testing.testcontainers.Connector;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
import io.restassured.http.ContentType;
/**
@ -44,13 +44,13 @@ public static void checkCondition() {
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumMariaDbConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(RestExtensionTestInfrastructure.DATABASE.MARIADB);
TestInfrastructureHelper.setupDebeziumContainer(Module.version(), DebeziumMariaDbConnectRestExtension.class.getName());
TestInfrastructureHelper.startContainers(TestInfrastructureHelper.DATABASE.MARIADB);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
TestInfrastructureHelper.stopContainers();
}
@Test
@ -58,7 +58,7 @@ public void testValidConnection() {
ConnectorConfiguration config = getMariaDbConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMariaDbConnectorResource.BASE_PATH + DebeziumMariaDbConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -73,7 +73,7 @@ public void testInvalidHostnameConnection() {
Locale.setDefault(new Locale("en", "US")); // to enforce errormessages in English
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMariaDbConnectorResource.BASE_PATH + DebeziumMariaDbConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -88,7 +88,7 @@ public void testInvalidHostnameConnection() {
@Test
public void testInvalidConnection() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body("{\"connector.class\": \"" + MariaDbConnector.class.getName() + "\"}")
.put(DebeziumMariaDbConnectorResource.BASE_PATH + DebeziumMariaDbConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -108,7 +108,7 @@ public void testFiltersWithEmptyFilters() {
ConnectorConfiguration config = getMariaDbConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMariaDbConnectorResource.BASE_PATH + DebeziumMariaDbConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -132,7 +132,7 @@ public void testFiltersWithValidTableIncludeList() {
.with("table.include.list", "inventory\\.product.*");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMariaDbConnectorResource.BASE_PATH + DebeziumMariaDbConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -152,7 +152,7 @@ public void testFiltersWithValidDatabaseIncludeList() {
.with("database.include.list", "inventory");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMariaDbConnectorResource.BASE_PATH + DebeziumMariaDbConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -176,7 +176,7 @@ public void testFiltersWithInvalidDatabaseIncludeListPattern() {
.with("database.include.list", "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMariaDbConnectorResource.BASE_PATH + DebeziumMariaDbConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -196,7 +196,7 @@ public void testFiltersWithInvalidDatabaseExcludeListPattern() {
.with("database.exclude.list", "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMariaDbConnectorResource.BASE_PATH + DebeziumMariaDbConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -215,16 +215,16 @@ public void testMetricsEndpoint() throws InterruptedException {
ConnectorConfiguration config = getMariaDbConnectorConfiguration(1);
var connectorName = "my-mariadb-connector";
RestExtensionTestInfrastructure.getDebeziumContainer().registerConnector(
TestInfrastructureHelper.getDebeziumContainer().registerConnector(
connectorName,
config);
RestExtensionTestInfrastructure.getDebeziumContainer().ensureConnectorState(connectorName, Connector.State.RUNNING);
RestExtensionTestInfrastructure.waitForConnectorTaskStatus(connectorName, 0, Connector.State.RUNNING);
RestExtensionTestInfrastructure.getDebeziumContainer().waitForStreamingRunning("mariadb", config.asProperties().getProperty("topic.prefix"));
TestInfrastructureHelper.getDebeziumContainer().ensureConnectorState(connectorName, Connector.State.RUNNING);
TestInfrastructureHelper.waitForConnectorTaskStatus(connectorName, 0, Connector.State.RUNNING);
TestInfrastructureHelper.getDebeziumContainer().waitForStreamingRunning("mariadb", config.asProperties().getProperty("topic.prefix"));
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.get(DebeziumMariaDbConnectorResource.BASE_PATH + DebeziumMariaDbConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName)
.then().log().all()
@ -237,12 +237,12 @@ public void testMetricsEndpoint() throws InterruptedException {
}
public static ConnectorConfiguration getMariaDbConnectorConfiguration(int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(RestExtensionTestInfrastructure.getMariaDbContainer())
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(TestInfrastructureHelper.getMariaDbContainer())
.with(MariaDbConnectorConfig.USER.name(), "debezium")
.with(MariaDbConnectorConfig.PASSWORD.name(), "dbz")
.with(MariaDbConnectorConfig.SNAPSHOT_MODE.name(), "never") // temporarily disable snapshot mode globally until we can check if connectors inside testcontainers are in SNAPSHOT or STREAMING mode (wait for snapshot finished!)
.with(MariaDbConnectorConfig.TOPIC_PREFIX.name(), "dbserver" + id)
.with(KafkaSchemaHistory.BOOTSTRAP_SERVERS.name(), RestExtensionTestInfrastructure.KAFKA_HOSTNAME + ":9092")
.with(KafkaSchemaHistory.BOOTSTRAP_SERVERS.name(), TestInfrastructureHelper.KAFKA_HOSTNAME + ":9092")
.with(KafkaSchemaHistory.TOPIC.name(), "dbhistory.inventory")
.with(MariaDbConnectorConfig.SERVER_ID.name(), Long.valueOf(5555 + id - 1))
// basic container does not support SSL out of the box

View File

@ -17,7 +17,7 @@
import io.debezium.connector.mariadb.MariaDbConnector;
import io.debezium.connector.mariadb.Module;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
/**
* @author Chris Cranford
@ -33,19 +33,19 @@ public static void checkCondition() {
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumMariaDbConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(RestExtensionTestInfrastructure.DATABASE.NONE);
TestInfrastructureHelper.setupDebeziumContainer(Module.version(), DebeziumMariaDbConnectRestExtension.class.getName());
TestInfrastructureHelper.startContainers(TestInfrastructureHelper.DATABASE.NONE);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
TestInfrastructureHelper.stopContainers();
}
@Test
public void testVersionEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumMariaDbConnectorResource.BASE_PATH + DebeziumMariaDbConnectorResource.VERSION_ENDPOINT)
.then().log().all()
@ -56,7 +56,7 @@ public void testVersionEndpoint() {
@Test
public void testSchemaEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumMariaDbConnectorResource.BASE_PATH + DebeziumMariaDbConnectorResource.SCHEMA_ENDPOINT)
.then().log().all()

View File

@ -108,6 +108,17 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<!--

View File

@ -0,0 +1,122 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import static io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper.DATABASE;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.client.MongoDatabase;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.sink.MongoDbSinkConnector;
import io.debezium.connector.mongodb.sink.MongoDbSinkConnectorConfig;
import io.debezium.testing.testcontainers.Connector;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.MongoDbDeployment;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
public interface SinkConnectorIT {
MongoDbDeployment getMongoDbDeployment();
Logger LOGGER = LoggerFactory.getLogger(SinkConnectorIT.class);
String DATABASE_NAME = "inventory";
String SOURCE_CONNECTOR_NAME = "inventory";
String SINK_CONNECTOR_NAME = "mongodb-sink";
default Configuration.Builder mySqlSourceConfig() {
return Configuration.create()
.with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
.with("database.hostname", "mysql")
.with("database.port", "3306")
.with("database.user", "debezium")
.with("database.password", "dbz")
.with("database.server.id", "148045")
.with("topic.prefix", "dbserver1")
.with("database.include.list", "inventory")
.with("schema.history.internal.kafka.bootstrap.servers", TestInfrastructureHelper.KAFKA_HOSTNAME + ":9092")
.with("schema.history.internal.kafka.topic", "schema-changes.inventory");
}
default void sendSourceData() {
TestInfrastructureHelper.defaultDebeziumContainer(Module.version());
TestInfrastructureHelper.startContainers(DATABASE.MYSQL);
final Configuration config = mySqlSourceConfig().build();
TestInfrastructureHelper.getDebeziumContainer().registerConnector(SOURCE_CONNECTOR_NAME, ConnectorConfiguration.from(config.asMap()));
TestInfrastructureHelper.getDebeziumContainer().ensureConnectorTaskState(SOURCE_CONNECTOR_NAME, 0, Connector.State.RUNNING);
try {
TestInfrastructureHelper.getDebeziumContainer().waitForStreamingRunning("mysql", config.getString(CommonConnectorConfig.TOPIC_PREFIX));
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
TestInfrastructureHelper.getDebeziumContainer().deleteConnector(SOURCE_CONNECTOR_NAME);
}
private Configuration.Builder mongodbSinkConfig() {
return Configuration.create()
.with("connector.class", MongoDbSinkConnector.class)
.with(MongoDbSinkConnectorConfig.CONNECTION_STRING, getMongoDbDeployment().getConnectionString())
.with(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "dbserver1\\.inventory\\..*")
.with(MongoDbSinkConnectorConfig.SINK_DATABASE, DATABASE_NAME);
}
default void startSink(Function<Configuration.Builder, Configuration.Builder> custConfig) {
TestInfrastructureHelper.setupDebeziumContainer(Module.version(), null);
TestInfrastructureHelper.startContainers(DATABASE.DEBEZIUM_ONLY);
final Configuration config = custConfig.apply(mongodbSinkConfig()).build();
TestInfrastructureHelper.getDebeziumContainer().registerConnector(SINK_CONNECTOR_NAME, ConnectorConfiguration.from(config.asMap()));
TestInfrastructureHelper.getDebeziumContainer().ensureConnectorTaskState(SINK_CONNECTOR_NAME, 0, Connector.State.RUNNING);
}
static void stopContainers(MongoDbDeployment mongo) {
TestInfrastructureHelper.stopContainers();
if (null != mongo) {
mongo.stop();
}
}
default List<String> listCollections(String dbName) {
try (var client = TestHelper.connect(getMongoDbDeployment())) {
MongoDatabase db1 = client.getDatabase(dbName);
var collections = db1.listCollectionNames();
collections.forEach((var collectionName) -> {
LOGGER.info("Found collection '{}' from database '{}'", collectionName, dbName);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("with documents:");
db1.getCollection(collectionName).find().forEach((var doc) -> LOGGER.debug("Found document: {}", doc));
}
});
return collections.into(new ArrayList<>());
}
}
default void checkSinkConnectorWritesRecords() {
startSink(Function.identity());
Awaitility.await()
.atMost(TestHelper.waitTimeForRecords() * 5L, TimeUnit.SECONDS)
.until(() -> listCollections(DATABASE_NAME).size() >= 6);
List<String> collections = listCollections(DATABASE_NAME);
LOGGER.debug("List collections: {}", Arrays.toString(collections.toArray()));
Assertions.assertThat(listCollections(DATABASE_NAME).size()).isEqualTo(6);
MatcherAssert.assertThat(collections, CoreMatchers.hasItems("dbserver1_inventory_addresses", "dbserver1_inventory_orders", "dbserver1_inventory_customers",
"dbserver1_inventory_products_on_hand", "dbserver1_inventory_geom", "dbserver1_inventory_products"));
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import static io.debezium.connector.mongodb.TestHelper.cleanDatabase;
import static org.hamcrest.CoreMatchers.is;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.connector.mongodb.junit.MongoDbDatabaseProvider;
import io.debezium.testing.testcontainers.MongoDbDeployment;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
import io.debezium.testing.testcontainers.util.DockerUtils;
public class SinkConnectorReplicaSetIT extends AbstractMongoConnectorIT implements SinkConnectorIT {
protected static MongoDbDeployment mongo;
@Override
public MongoDbDeployment getMongoDbDeployment() {
return mongo;
}
@BeforeClass
public static void beforeAll() {
Assume.assumeThat("Skipping DebeziumMongoDbConnectorResourceIT tests when assembly profile is not active!",
System.getProperty("isAssemblyProfileActive", "false"),
is("true"));
DockerUtils.enableFakeDnsIfRequired();
mongo = MongoDbDatabaseProvider.externalOrDockerReplicaSet(TestInfrastructureHelper.getNetwork());
mongo.start();
}
@Before
public void beforeEach() {
sendSourceData();
}
@After
public void afterEach() {
cleanDatabase(mongo, DATABASE_NAME);
}
@AfterClass
public static void afterAll() {
SinkConnectorIT.stopContainers(mongo);
DockerUtils.disableFakeDns();
}
@Test
public void testSinkConnectorWritesRecordsToReplicaSet() {
checkSinkConnectorWritesRecords();
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import static io.debezium.connector.mongodb.TestHelper.cleanDatabase;
import static org.hamcrest.CoreMatchers.is;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.connector.mongodb.junit.MongoDbDatabaseProvider;
import io.debezium.connector.mongodb.junit.MongoDbDatabaseVersionResolver;
import io.debezium.connector.mongodb.junit.MongoDbPlatform;
import io.debezium.testing.testcontainers.MongoDbDeployment;
import io.debezium.testing.testcontainers.MongoDbShardedCluster;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
import io.debezium.testing.testcontainers.util.DockerUtils;
public class SinkConnectorShardedClusterIT extends AbstractShardedMongoConnectorIT implements SinkConnectorIT {
protected static MongoDbDeployment mongo;
@Override
public MongoDbDeployment getMongoDbDeployment() {
return mongo;
}
@BeforeClass
public static void beforeAll() {
Assume.assumeThat("Skipping DebeziumMongoDbConnectorResourceIT tests when assembly profile is not active!",
System.getProperty("isAssemblyProfileActive", "false"),
is("true"));
Assume.assumeTrue(MongoDbDatabaseVersionResolver.getPlatform().equals(MongoDbPlatform.MONGODB_DOCKER));
DockerUtils.enableFakeDnsIfRequired();
mongo = MongoDbDatabaseProvider.mongoDbShardedCluster(TestInfrastructureHelper.getNetwork());
mongo.start();
}
@Before
public void beforeEach() {
sendSourceData();
var database = shardedDatabase();
var shardedCluster = (MongoDbShardedCluster) mongo;
shardedCluster.enableSharding(database);
shardedCollections().forEach((collection, key) -> {
shardedCluster.shardCollection(database, collection, key);
});
}
@After
public void afterEach() {
cleanDatabase(mongo, DATABASE_NAME);
}
@AfterClass
public static void afterAll() {
SinkConnectorIT.stopContainers(mongo);
DockerUtils.disableFakeDns();
}
@Test
public void testSinkConnectorWritesRecordsToShardedCluster() {
checkSinkConnectorWritesRecords();
}
}

View File

@ -46,7 +46,7 @@
*
*/
public class TestHelper {
protected final static Logger logger = LoggerFactory.getLogger(TestHelper.class);
protected final static Logger LOGGER = LoggerFactory.getLogger(TestHelper.class);
public static final List<Integer> MONGO_VERSION = getMongoVersion();
private static final String TEST_PROPERTY_PREFIX = "debezium.test.";
@ -92,7 +92,7 @@ public static MongoDbConnection.ErrorHandler connectionErrorHandler(int numError
if (attempts.incrementAndGet() > numErrorsBeforeFailing) {
fail("Unable to connect to primary after " + numErrorsBeforeFailing + " errors trying to " + desc + ": " + error);
}
logger.error("Error while attempting to {}: {}", desc, error.getMessage(), error);
LOGGER.error("Error while attempting to {}: {}", desc, error.getMessage(), error);
};
}
@ -108,7 +108,7 @@ public static void cleanDatabase(MongoDbDeployment mongo, String dbName) {
try (var client = connect(mongo)) {
MongoDatabase db1 = client.getDatabase(dbName);
db1.listCollectionNames().forEach((String x) -> {
logger.info("Removing collection '{}' from database '{}'", x, dbName);
LOGGER.info("Removing collection '{}' from database '{}'", x, dbName);
db1.getCollection(x).drop();
});
}
@ -123,7 +123,7 @@ public static void cleanDatabases(MongoDbDeployment mongo) {
});
}
catch (Exception e) {
logger.error("Error while cleaning database", e);
LOGGER.error("Error while cleaning database", e);
}
}

View File

@ -5,6 +5,8 @@
*/
package io.debezium.connector.mongodb.junit;
import org.testcontainers.containers.Network;
import io.debezium.testing.testcontainers.MongoDbDeployment;
import io.debezium.testing.testcontainers.MongoDbReplicaSet;
import io.debezium.testing.testcontainers.MongoDbShardedCluster;
@ -20,6 +22,8 @@ public final class MongoDbDatabaseProvider {
// Should be aligned with definition in pom.xml
public static final String MONGO_DOCKER_DESKTOP_PORT_DEFAULT = "27017:27117";
private static Network NETWORK = null;
private static MongoDbReplicaSet.Builder dockerReplicaSetBuilder() {
// will be used only in environment with docker desktop
var portResolver = ParsingPortResolver.parseProperty(MONGO_DOCKER_DESKTOP_PORT_PROPERTY, MONGO_DOCKER_DESKTOP_PORT_DEFAULT);
@ -36,7 +40,11 @@ private static MongoDbReplicaSet.Builder dockerReplicaSetBuilder() {
* @return MongoDb Replica set
*/
public static MongoDbReplicaSet dockerReplicaSet() {
return dockerReplicaSetBuilder().build();
var replicaSet = dockerReplicaSetBuilder();
if (NETWORK != null) {
replicaSet.network(NETWORK);
}
return replicaSet.build();
}
/**
@ -45,7 +53,11 @@ public static MongoDbReplicaSet dockerReplicaSet() {
* @return MongoDb Replica set
*/
public static MongoDbReplicaSet dockerAuthReplicaSet() {
return dockerReplicaSetBuilder().authEnabled(true).build();
var replicaSet = dockerReplicaSetBuilder().authEnabled(true);
if (NETWORK != null) {
replicaSet.network(NETWORK);
}
return replicaSet.build();
}
/**
@ -53,7 +65,7 @@ public static MongoDbReplicaSet dockerAuthReplicaSet() {
*
* @return MongoDb Replica set
*/
public static MongoDbShardedCluster mongoDbShardedCluster() {
private static MongoDbShardedCluster.Builder mongoDbShardedClusterBuilder() {
// will be used only in environment with docker desktop
var portResolver = ParsingPortResolver.parseProperty(MONGO_DOCKER_DESKTOP_PORT_PROPERTY, MONGO_DOCKER_DESKTOP_PORT_DEFAULT);
var shardSize = Integer.parseInt(System.getProperty(MONGO_SHARD_SIZE, "2"));
@ -63,7 +75,20 @@ public static MongoDbShardedCluster mongoDbShardedCluster() {
.shardCount(shardSize)
.replicaCount(replicaSize)
.routerCount(1)
.portResolver(portResolver)
.portResolver(portResolver);
}
public static MongoDbShardedCluster mongoDbShardedCluster() {
var cluster = mongoDbShardedClusterBuilder();
if (NETWORK != null) {
cluster.network(NETWORK);
}
return cluster.build();
}
public static MongoDbShardedCluster mongoDbShardedCluster(Network network) {
return mongoDbShardedClusterBuilder()
.network(network)
.build();
}
@ -78,6 +103,17 @@ public static MongoDbDeployment externalOrDockerReplicaSet() {
return platform.provider.get();
}
/**
* Creates MongoDB replica-set abstraction either for external database or a local MongoDB replica-set container with the provided container Network.
*
* @return MongoDb replica-set deployment
*/
public static MongoDbDeployment externalOrDockerReplicaSet(Network network) {
NETWORK = network;
var platform = MongoDbDatabaseVersionResolver.getPlatform();
return platform.provider.get();
}
private MongoDbDatabaseProvider() {
}
}

View File

@ -5,7 +5,7 @@
*/
package io.debezium.connector.mongodb.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
@ -27,7 +27,7 @@
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.testing.testcontainers.Connector;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
import io.debezium.testing.testcontainers.util.DockerUtils;
import io.restassured.http.ContentType;
@ -45,15 +45,15 @@ public static void checkCondition() {
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumMongoDbConnectRestExtension.class.getName());
TestInfrastructureHelper.setupDebeziumContainer(Module.version(), DebeziumMongoDbConnectRestExtension.class.getName());
DockerUtils.enableFakeDnsIfRequired();
RestExtensionTestInfrastructure.startContainers(DATABASE.MONGODB);
RestExtensionTestInfrastructure.getMongoDbContainer().execMongoScript(INIT_SCRIPT_RESOURCE, INIT_SCRIPT_PATH);
TestInfrastructureHelper.startContainers(DATABASE.MONGODB);
TestInfrastructureHelper.getMongoDbContainer().execMongoScript(INIT_SCRIPT_RESOURCE, INIT_SCRIPT_PATH);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
TestInfrastructureHelper.stopContainers();
DockerUtils.disableFakeDns();
}
@ -62,7 +62,7 @@ public void testValidConnection() {
ConnectorConfiguration config = getMongoDbConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -78,7 +78,7 @@ public void testInvalidIpConnection() {
Locale.setDefault(new Locale("en", "US")); // to enforce errormessages in English
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -93,7 +93,7 @@ public void testInvalidIpConnection() {
@Test
public void testInvalidConnection() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body("{\"connector.class\": \"" + MongoDbConnector.class.getName() + "\"}")
.put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -111,7 +111,7 @@ public void testFiltersWithEmptyFilters() {
ConnectorConfiguration config = getMongoDbConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -132,7 +132,7 @@ public void testFiltersWithValidCollectionIncludeList() {
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST.name(), "inventory\\.product.*");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -150,7 +150,7 @@ public void testFiltersWithValidDatabaseIncludeList() {
.with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST.name(), "inventory");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -171,7 +171,7 @@ public void testFiltersWithInvalidDatabaseIncludeListPattern() {
.with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST.name(), "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -191,7 +191,7 @@ public void testFiltersWithInvalidDatabaseExcludeListPattern() {
.with(MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST.name(), "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -210,17 +210,17 @@ public void testMetricsEndpoint() throws InterruptedException {
ConnectorConfiguration config = getMongoDbConnectorConfiguration(1);
var connectorName = "my-mongodb-connector";
RestExtensionTestInfrastructure.getDebeziumContainer().registerConnector(
TestInfrastructureHelper.getDebeziumContainer().registerConnector(
connectorName,
config);
RestExtensionTestInfrastructure.getDebeziumContainer().ensureConnectorState(connectorName, Connector.State.RUNNING);
RestExtensionTestInfrastructure.waitForConnectorTaskStatus(connectorName, 0, Connector.State.RUNNING);
RestExtensionTestInfrastructure.getDebeziumContainer().waitForStreamingRunning("mongodb", config.asProperties().getProperty("topic.prefix"), "streaming",
TestInfrastructureHelper.getDebeziumContainer().ensureConnectorState(connectorName, Connector.State.RUNNING);
TestInfrastructureHelper.waitForConnectorTaskStatus(connectorName, 0, Connector.State.RUNNING);
TestInfrastructureHelper.getDebeziumContainer().waitForStreamingRunning("mongodb", config.asProperties().getProperty("topic.prefix"), "streaming",
String.valueOf(0));
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.get(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName)
.then().log().all()
@ -234,7 +234,7 @@ public void testMetricsEndpoint() throws InterruptedException {
}
public static ConnectorConfiguration getMongoDbConnectorConfiguration(int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forMongoDbReplicaSet(RestExtensionTestInfrastructure.getMongoDbContainer())
final ConnectorConfiguration config = ConnectorConfiguration.forMongoDbReplicaSet(TestInfrastructureHelper.getMongoDbContainer())
.with(MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS.name(), 10000)
.with(MongoDbConnectorConfig.SNAPSHOT_MODE.name(), "never") // temporarily disable snapshot mode globally until we can check if connectors inside testcontainers are in SNAPSHOT or STREAMING mode (wait for snapshot finished!)
.with(MongoDbConnectorConfig.USER.name(), "debezium")

View File

@ -5,7 +5,7 @@
*/
package io.debezium.connector.mongodb.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.hasKey;
@ -18,7 +18,7 @@
import io.debezium.connector.mongodb.Module;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
public class DebeziumMongoDbConnectorResourceNoDatabaseIT {
@ -31,19 +31,19 @@ public static void checkCondition() {
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumMongoDbConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.NONE);
TestInfrastructureHelper.setupDebeziumContainer(Module.version(), DebeziumMongoDbConnectRestExtension.class.getName());
TestInfrastructureHelper.startContainers(DATABASE.NONE);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
TestInfrastructureHelper.stopContainers();
}
@Test
public void testVersionEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VERSION_ENDPOINT)
.then().log().all()
@ -54,7 +54,7 @@ public void testVersionEndpoint() {
@Test
public void testSchemaEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.SCHEMA_ENDPOINT)
.then().log().all()

View File

@ -5,7 +5,7 @@
*/
package io.debezium.connector.mysql.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
@ -28,7 +28,7 @@
import io.debezium.storage.kafka.history.KafkaSchemaHistory;
import io.debezium.testing.testcontainers.Connector;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
import io.restassured.http.ContentType;
public class DebeziumMySqlConnectorResourceIT {
@ -41,13 +41,13 @@ public static void checkCondition() {
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumMySqlConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.MYSQL);
TestInfrastructureHelper.setupDebeziumContainer(Module.version(), DebeziumMySqlConnectRestExtension.class.getName());
TestInfrastructureHelper.startContainers(DATABASE.MYSQL);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
TestInfrastructureHelper.stopContainers();
}
@Test
@ -55,7 +55,7 @@ public void testValidConnection() {
ConnectorConfiguration config = getMySqlConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -70,7 +70,7 @@ public void testInvalidHostnameConnection() {
Locale.setDefault(new Locale("en", "US")); // to enforce errormessages in English
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -85,7 +85,7 @@ public void testInvalidHostnameConnection() {
@Test
public void testInvalidConnection() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body("{\"connector.class\": \"" + MySqlConnector.class.getName() + "\"}")
.put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -105,7 +105,7 @@ public void testFiltersWithEmptyFilters() {
ConnectorConfiguration config = getMySqlConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -129,7 +129,7 @@ public void testFiltersWithValidTableIncludeList() {
.with("table.include.list", "inventory\\.product.*");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -149,7 +149,7 @@ public void testFiltersWithValidDatabaseIncludeList() {
.with("database.include.list", "inventory");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -173,7 +173,7 @@ public void testFiltersWithInvalidDatabaseIncludeListPattern() {
.with("database.include.list", "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -193,7 +193,7 @@ public void testFiltersWithInvalidDatabaseExcludeListPattern() {
.with("database.exclude.list", "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -212,16 +212,16 @@ public void testMetricsEndpoint() throws InterruptedException {
ConnectorConfiguration config = getMySqlConnectorConfiguration(1);
var connectorName = "my-mysql-connector";
RestExtensionTestInfrastructure.getDebeziumContainer().registerConnector(
TestInfrastructureHelper.getDebeziumContainer().registerConnector(
connectorName,
config);
RestExtensionTestInfrastructure.getDebeziumContainer().ensureConnectorState(connectorName, Connector.State.RUNNING);
RestExtensionTestInfrastructure.waitForConnectorTaskStatus(connectorName, 0, Connector.State.RUNNING);
RestExtensionTestInfrastructure.getDebeziumContainer().waitForStreamingRunning("mysql", config.asProperties().getProperty("topic.prefix"));
TestInfrastructureHelper.getDebeziumContainer().ensureConnectorState(connectorName, Connector.State.RUNNING);
TestInfrastructureHelper.waitForConnectorTaskStatus(connectorName, 0, Connector.State.RUNNING);
TestInfrastructureHelper.getDebeziumContainer().waitForStreamingRunning("mysql", config.asProperties().getProperty("topic.prefix"));
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.get(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName)
.then().log().all()
@ -234,12 +234,12 @@ public void testMetricsEndpoint() throws InterruptedException {
}
public static ConnectorConfiguration getMySqlConnectorConfiguration(int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(RestExtensionTestInfrastructure.getMySqlContainer())
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(TestInfrastructureHelper.getMySqlContainer())
.with(MySqlConnectorConfig.USER.name(), "debezium")
.with(MySqlConnectorConfig.PASSWORD.name(), "dbz")
.with(MySqlConnectorConfig.SNAPSHOT_MODE.name(), "never") // temporarily disable snapshot mode globally until we can check if connectors inside testcontainers are in SNAPSHOT or STREAMING mode (wait for snapshot finished!)
.with(MySqlConnectorConfig.TOPIC_PREFIX.name(), "dbserver" + id)
.with(KafkaSchemaHistory.BOOTSTRAP_SERVERS.name(), RestExtensionTestInfrastructure.KAFKA_HOSTNAME + ":9092")
.with(KafkaSchemaHistory.BOOTSTRAP_SERVERS.name(), TestInfrastructureHelper.KAFKA_HOSTNAME + ":9092")
.with(KafkaSchemaHistory.TOPIC.name(), "dbhistory.inventory")
.with(MySqlConnectorConfig.SERVER_ID.name(), Long.valueOf(5555 + id - 1));

View File

@ -5,7 +5,7 @@
*/
package io.debezium.connector.mysql.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.hasKey;
@ -18,7 +18,7 @@
import io.debezium.connector.mysql.Module;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
public class DebeziumMySqlConnectorResourceNoDatabaseIT {
@ -30,19 +30,19 @@ public static void checkCondition() {
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumMySqlConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.NONE);
TestInfrastructureHelper.setupDebeziumContainer(Module.version(), DebeziumMySqlConnectRestExtension.class.getName());
TestInfrastructureHelper.startContainers(DATABASE.NONE);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
TestInfrastructureHelper.stopContainers();
}
@Test
public void testVersionEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VERSION_ENDPOINT)
.then().log().all()
@ -53,7 +53,7 @@ public void testVersionEndpoint() {
@Test
public void testSchemaEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.SCHEMA_ENDPOINT)
.then().log().all()

View File

@ -5,7 +5,7 @@
*/
package io.debezium.connector.oracle.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
@ -26,7 +26,7 @@
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.testing.testcontainers.Connector;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
import io.restassured.http.ContentType;
public class DebeziumOracleConnectorResourceIT {
@ -38,17 +38,17 @@ public class DebeziumOracleConnectorResourceIT {
@BeforeClass
public static void checkConditionAndStart() {
Assume.assumeTrue("Skipping DebeziumOracleConnectorResourceIT tests when assembly profile is not active!", IS_ASSEMBLY_PROFILE_ACTIVE);
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumOracleConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.ORACLE);
TestInfrastructureHelper.setupDebeziumContainer(Module.version(), DebeziumOracleConnectRestExtension.class.getName());
TestInfrastructureHelper.startContainers(DATABASE.ORACLE);
TestHelper.loadTestData(TestHelper.getOracleConnectorConfiguration(1), "rest/data.sql");
ORACLE_USERNAME = RestExtensionTestInfrastructure.getOracleContainer().getUsername();
ORACLE_USERNAME = TestInfrastructureHelper.getOracleContainer().getUsername();
running = true;
}
@AfterClass
public static void stop() {
if (IS_ASSEMBLY_PROFILE_ACTIVE && running) {
RestExtensionTestInfrastructure.stopContainers();
TestInfrastructureHelper.stopContainers();
}
}
@ -57,7 +57,7 @@ public void testValidConnection() {
ConnectorConfiguration config = TestHelper.getOracleConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -72,7 +72,7 @@ public void testInvalidHostnameConnection() {
Locale.setDefault(new Locale("en", "US")); // to enforce errormessages in English
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -87,7 +87,7 @@ public void testInvalidHostnameConnection() {
@Test
public void testInvalidConnection() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body("{\"connector.class\": \"" + OracleConnector.class.getName() + "\"}")
.put(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -108,7 +108,7 @@ public void testFiltersWithEmptyFilters() {
ConnectorConfiguration config = TestHelper.getOracleConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -141,7 +141,7 @@ public void testFiltersWithValidTableIncludeList() {
.with("table.include.list", ORACLE_USERNAME.toUpperCase() + "\\.DEBEZIUM_TABLE.*");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -165,7 +165,7 @@ public void testFiltersWithInvalidTableIncludeList() {
.with("table.include.list", "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -184,16 +184,16 @@ public void testMetricsEndpoint() throws InterruptedException {
ConnectorConfiguration config = TestHelper.getOracleConnectorConfiguration(1);
var connectorName = "my-oracle-connector";
RestExtensionTestInfrastructure.getDebeziumContainer().registerConnector(
TestInfrastructureHelper.getDebeziumContainer().registerConnector(
connectorName,
config.with(OracleConnectorConfig.USER.name(), TestHelper.CONNECTOR_USER));
RestExtensionTestInfrastructure.getDebeziumContainer().ensureConnectorState(connectorName, Connector.State.RUNNING);
RestExtensionTestInfrastructure.waitForConnectorTaskStatus(connectorName, 0, Connector.State.RUNNING);
RestExtensionTestInfrastructure.getDebeziumContainer().waitForStreamingRunning("oracle", config.asProperties().getProperty("topic.prefix"));
TestInfrastructureHelper.getDebeziumContainer().ensureConnectorState(connectorName, Connector.State.RUNNING);
TestInfrastructureHelper.waitForConnectorTaskStatus(connectorName, 0, Connector.State.RUNNING);
TestInfrastructureHelper.getDebeziumContainer().waitForStreamingRunning("oracle", config.asProperties().getProperty("topic.prefix"));
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.get(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName)
.then().log().all()

View File

@ -5,7 +5,7 @@
*/
package io.debezium.connector.oracle.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.hasKey;
@ -18,7 +18,7 @@
import io.debezium.connector.oracle.Module;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
public class DebeziumOracleConnectorResourceNoDatabaseIT {
@ -30,19 +30,19 @@ public static void checkCondition() {
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumOracleConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.NONE);
TestInfrastructureHelper.setupDebeziumContainer(Module.version(), DebeziumOracleConnectRestExtension.class.getName());
TestInfrastructureHelper.startContainers(DATABASE.NONE);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
TestInfrastructureHelper.stopContainers();
}
@Test
public void testVersionEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.VERSION_ENDPOINT)
.then().log().all()
@ -53,7 +53,7 @@ public void testVersionEndpoint() {
@Test
public void testSchemaEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.SCHEMA_ENDPOINT)
.then().log().all()

View File

@ -41,7 +41,7 @@
import io.debezium.storage.kafka.history.KafkaSchemaHistory;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.OracleContainer;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
import io.debezium.util.Strings;
import io.debezium.util.Testing;
@ -731,7 +731,7 @@ private static Configuration getTestConnectionConfiguration(ConnectorConfigurati
: connectionConfiguration.getString(PDB_NAME);
return connectionConfiguration.edit()
.with(JdbcConfiguration.HOSTNAME.name(), "localhost")
.with(JdbcConfiguration.PORT, RestExtensionTestInfrastructure.getOracleContainer().getMappedPort(OracleContainer.ORACLE_PORT))
.with(JdbcConfiguration.PORT, TestInfrastructureHelper.getOracleContainer().getMappedPort(OracleContainer.ORACLE_PORT))
.with(JdbcConfiguration.DATABASE, dbName)
.build();
}
@ -818,12 +818,12 @@ else if ("noncdb".equals(imageTagSuffix)) {
}
public static ConnectorConfiguration getOracleConnectorConfiguration(int id, String... options) {
OracleContainer oracleContainer = RestExtensionTestInfrastructure.getOracleContainer();
OracleContainer oracleContainer = TestInfrastructureHelper.getOracleContainer();
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(oracleContainer)
.with(OracleConnectorConfig.PDB_NAME.name(), oracleContainer.ORACLE_PDB_NAME)
.with(OracleConnectorConfig.DATABASE_NAME.name(), oracleContainer.ORACLE_DBNAME)
.with(OracleConnectorConfig.TOPIC_PREFIX.name(), "dbserver" + id)
.with(KafkaSchemaHistory.BOOTSTRAP_SERVERS.name(), RestExtensionTestInfrastructure.KAFKA_HOSTNAME + ":9092")
.with(KafkaSchemaHistory.BOOTSTRAP_SERVERS.name(), TestInfrastructureHelper.KAFKA_HOSTNAME + ":9092")
.with(KafkaSchemaHistory.TOPIC.name(), "dbhistory.oracle");
if (options != null && options.length > 0) {

View File

@ -5,7 +5,7 @@
*/
package io.debezium.connector.postgresql.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
@ -25,7 +25,7 @@
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.testing.testcontainers.Connector;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
import io.restassured.http.ContentType;
public class DebeziumPostgresConnectorResourceIT {
@ -39,13 +39,13 @@ public static void checkCondition() {
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumPostgresConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.POSTGRES);
TestInfrastructureHelper.setupDebeziumContainer(Module.version(), DebeziumPostgresConnectRestExtension.class.getName());
TestInfrastructureHelper.startContainers(DATABASE.POSTGRES);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
TestInfrastructureHelper.stopContainers();
}
@Test
@ -53,7 +53,7 @@ public void testValidConnection() {
ConnectorConfiguration config = getPostgresConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -68,7 +68,7 @@ public void testInvalidHostnameConnection() {
Locale.setDefault(new Locale("en", "US")); // to enforce errormessages in English
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -83,7 +83,7 @@ public void testInvalidHostnameConnection() {
@Test
public void testInvalidConnection() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body("{\"connector.class\": \"" + PostgresConnector.class.getName() + "\"}")
.put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -104,7 +104,7 @@ public void testFiltersWithEmptyFilters() {
ConnectorConfiguration config = getPostgresConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -127,7 +127,7 @@ public void testFiltersWithValidTableIncludeList() {
.with("table.include.list", "inventory\\.product.*");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -147,7 +147,7 @@ public void testFiltersWithValidSchemaIncludeList() {
.with("schema.include.list", "inventory");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -170,7 +170,7 @@ public void testFiltersWithInvalidSchemaIncludeListPattern() {
.with("schema.include.list", "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -190,7 +190,7 @@ public void testFiltersWithInvalidSchemaExcludeListPattern() {
.with("schema.exclude.list", "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -209,16 +209,16 @@ public void testMetricsEndpoint() throws InterruptedException {
ConnectorConfiguration config = getPostgresConnectorConfiguration(1);
var connectorName = "my-postgres-connector";
RestExtensionTestInfrastructure.getDebeziumContainer().registerConnector(
TestInfrastructureHelper.getDebeziumContainer().registerConnector(
connectorName,
config);
RestExtensionTestInfrastructure.getDebeziumContainer().ensureConnectorState(connectorName, Connector.State.RUNNING);
RestExtensionTestInfrastructure.waitForConnectorTaskStatus(connectorName, 0, Connector.State.RUNNING);
RestExtensionTestInfrastructure.getDebeziumContainer().waitForStreamingRunning("postgres", config.asProperties().getProperty("topic.prefix"));
TestInfrastructureHelper.getDebeziumContainer().ensureConnectorState(connectorName, Connector.State.RUNNING);
TestInfrastructureHelper.waitForConnectorTaskStatus(connectorName, 0, Connector.State.RUNNING);
TestInfrastructureHelper.getDebeziumContainer().waitForStreamingRunning("postgres", config.asProperties().getProperty("topic.prefix"));
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.get(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName)
.then().log().all()
@ -232,7 +232,7 @@ public void testMetricsEndpoint() throws InterruptedException {
}
private static ConnectorConfiguration getPostgresConnectorConfiguration(int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(RestExtensionTestInfrastructure.getPostgresContainer())
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(TestInfrastructureHelper.getPostgresContainer())
.with(PostgresConnectorConfig.SNAPSHOT_MODE.name(), "never") // temporarily disable snapshot mode globally until we can check if connectors inside testcontainers are in SNAPSHOT or STREAMING mode (wait for snapshot finished!)
.with(PostgresConnectorConfig.TOPIC_PREFIX.name(), "dbserver" + id)
.with(PostgresConnectorConfig.SLOT_NAME.name(), "debezium_" + id);

View File

@ -5,7 +5,7 @@
*/
package io.debezium.connector.postgresql.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.hasKey;
@ -18,7 +18,7 @@
import io.debezium.connector.postgresql.Module;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
public class DebeziumPostgresConnectorResourceNoDatabaseIT {
@ -31,19 +31,19 @@ public static void checkCondition() {
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumPostgresConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.NONE);
TestInfrastructureHelper.setupDebeziumContainer(Module.version(), DebeziumPostgresConnectRestExtension.class.getName());
TestInfrastructureHelper.startContainers(DATABASE.NONE);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
TestInfrastructureHelper.stopContainers();
}
@Test
public void testVersionEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VERSION_ENDPOINT)
.then().log().all()
@ -54,7 +54,7 @@ public void testVersionEndpoint() {
@Test
public void testSchemaEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.SCHEMA_ENDPOINT)
.then().log().all()

View File

@ -5,7 +5,7 @@
*/
package io.debezium.connector.sqlserver.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItems;
@ -28,7 +28,7 @@
import io.debezium.storage.kafka.history.KafkaSchemaHistory;
import io.debezium.testing.testcontainers.Connector;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
import io.restassured.http.ContentType;
public class DebeziumSqlServerConnectorResourceIT {
@ -41,13 +41,13 @@ public static void checkCondition() {
@Before
public void start() throws URISyntaxException {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumSqlServerConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.SQLSERVER);
TestInfrastructureHelper.setupDebeziumContainer(Module.version(), DebeziumSqlServerConnectRestExtension.class.getName());
TestInfrastructureHelper.startContainers(DATABASE.SQLSERVER);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
TestInfrastructureHelper.stopContainers();
}
@Test
@ -55,7 +55,7 @@ public void testValidConnection() {
ConnectorConfiguration config = getSqlServerConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -70,7 +70,7 @@ public void testInvalidHostnameConnection() {
Locale.setDefault(new Locale("en", "US")); // to enforce errormessages in English
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -86,7 +86,7 @@ public void testInvalidHostnameConnection() {
@Test
public void testInvalidConnection() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body("{\"connector.class\": \"" + SqlServerConnector.class.getName() + "\"}")
.put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_CONNECTION_ENDPOINT)
.then().log().all()
@ -107,7 +107,7 @@ public void testFiltersWithEmptyFilters() {
ConnectorConfiguration config = getSqlServerConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -130,7 +130,7 @@ public void testFiltersWithValidTableIncludeList() {
.with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST.name(), "inventory\\.product.*");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -151,7 +151,7 @@ public void testFiltersWithInvalidTableIncludeList() {
.with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST.name(), "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -171,7 +171,7 @@ public void testFiltersWithInvalidSchemaExcludeList() {
.with(SqlServerConnectorConfig.TABLE_EXCLUDE_LIST.name(), "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
@ -190,17 +190,17 @@ public void testMetricsEndpoint() {
ConnectorConfiguration config = getSqlServerConnectorConfiguration(1);
var connectorName = "my-sqlserver-connector";
RestExtensionTestInfrastructure.getDebeziumContainer().registerConnector(
TestInfrastructureHelper.getDebeziumContainer().registerConnector(
connectorName,
config);
RestExtensionTestInfrastructure.getDebeziumContainer().ensureConnectorState(connectorName, Connector.State.RUNNING);
RestExtensionTestInfrastructure.waitForConnectorTaskStatus(connectorName, 0, Connector.State.RUNNING);
RestExtensionTestInfrastructure.getDebeziumContainer().waitForStreamingRunning("sql_server", config.asProperties().getProperty("topic.prefix"), "streaming",
TestInfrastructureHelper.getDebeziumContainer().ensureConnectorState(connectorName, Connector.State.RUNNING);
TestInfrastructureHelper.waitForConnectorTaskStatus(connectorName, 0, Connector.State.RUNNING);
TestInfrastructureHelper.getDebeziumContainer().waitForStreamingRunning("sql_server", config.asProperties().getProperty("topic.prefix"), "streaming",
String.valueOf(0));
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.get(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.CONNECTOR_METRICS_ENDPOINT, connectorName)
.then().log().all()
@ -217,10 +217,10 @@ public void testMetricsEndpoint() {
}
public static ConnectorConfiguration getSqlServerConnectorConfiguration(int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(RestExtensionTestInfrastructure.getSqlServerContainer())
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(TestInfrastructureHelper.getSqlServerContainer())
.with(ConnectorConfiguration.USER, "sa")
.with(ConnectorConfiguration.PASSWORD, "Password!")
.with(KafkaSchemaHistory.BOOTSTRAP_SERVERS.name(), RestExtensionTestInfrastructure.KAFKA_HOSTNAME + ":9092")
.with(KafkaSchemaHistory.BOOTSTRAP_SERVERS.name(), TestInfrastructureHelper.KAFKA_HOSTNAME + ":9092")
.with(KafkaSchemaHistory.TOPIC.name(), "dbhistory.inventory")
.with(SqlServerConnectorConfig.DATABASE_NAMES.name(), "testDB,testDB2")
.with(SqlServerConnectorConfig.SNAPSHOT_MODE.name(), "initial")

View File

@ -5,7 +5,7 @@
*/
package io.debezium.connector.sqlserver.rest;
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.hasKey;
@ -18,7 +18,7 @@
import io.debezium.connector.sqlserver.Module;
import io.debezium.connector.sqlserver.SqlServerConnector;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.testing.testcontainers.testhelper.TestInfrastructureHelper;
public class DebeziumSqlServerConnectorResourceNoDatabaseIT {
@ -31,19 +31,19 @@ public static void checkCondition() {
@Before
public void start() {
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumSqlServerConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.NONE);
TestInfrastructureHelper.setupDebeziumContainer(Module.version(), DebeziumSqlServerConnectRestExtension.class.getName());
TestInfrastructureHelper.startContainers(DATABASE.NONE);
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
TestInfrastructureHelper.stopContainers();
}
@Test
public void testVersionEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VERSION_ENDPOINT)
.then().log().all()
@ -54,7 +54,7 @@ public void testVersionEndpoint() {
@Test
public void testSchemaEndpoint() {
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.port(TestInfrastructureHelper.getDebeziumContainer().getFirstMappedPort())
.when()
.get(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.SCHEMA_ENDPOINT)
.then().log().all()

View File

@ -49,6 +49,12 @@ static ConnectorConfiguration from(JsonNode configNode) {
return configuration;
}
public static ConnectorConfiguration from(Map<String, ?> map) {
final ConnectorConfiguration configuration = new ConnectorConfiguration();
map.forEach((key, val) -> configuration.configNode.put(key, val.toString()));
return configuration;
}
public static ConnectorConfiguration forJdbcContainer(JdbcDatabaseContainer<?> jdbcDatabaseContainer) {
ConnectorConfiguration configuration = new ConnectorConfiguration();

View File

@ -142,7 +142,7 @@ public static int waitTimeForRecords() {
}
public String getTarget() {
return "http://" + getContainerIpAddress() + ":" + getMappedPort(KAFKA_CONNECT_PORT);
return "http://" + getHost() + ":" + getMappedPort(KAFKA_CONNECT_PORT);
}
/**
@ -232,7 +232,7 @@ private void executePOSTRequestSuccessfully(final String payload, final String f
}
}
catch (IOException e) {
throw new RuntimeException("Error connecting to Debezium container", e);
throw new RuntimeException("Error connecting to Debezium container on URL: " + fullUrl, e);
}
}

View File

@ -10,6 +10,7 @@
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.awaitility.Awaitility;
@ -34,9 +35,9 @@
import io.debezium.testing.testcontainers.OracleContainer;
import io.debezium.testing.testcontainers.util.MoreStartables;
public class RestExtensionTestInfrastructure {
public class TestInfrastructureHelper {
public static final String KAFKA_HOSTNAME = "kafka-dbz-ui";
public static final String KAFKA_HOSTNAME = "kafka-dbz";
public static final int CI_CONTAINER_STARTUP_TIME = 90;
private static final String DEBEZIUM_CONTAINER_IMAGE_VERSION_LATEST = "latest";
@ -47,13 +48,16 @@ public enum DATABASE {
MONGODB,
ORACLE,
MARIADB,
NONE
NONE,
DEBEZIUM_ONLY
}
private static final Logger LOGGER = LoggerFactory.getLogger(RestExtensionTestInfrastructure.class);
private static final Logger LOGGER = LoggerFactory.getLogger(TestInfrastructureHelper.class);
private static final Network NETWORK = Network.newNetwork();
private static final Pattern VERSION_PATTERN = Pattern.compile("^[1-9]\\d*\\.\\d+");
private static final GenericContainer<?> KAFKA_CONTAINER = new GenericContainer<>(
DockerImageName.parse("quay.io/debezium/kafka:" + DEBEZIUM_CONTAINER_IMAGE_VERSION_LATEST).asCompatibleSubstituteFor("kafka"))
.withNetworkAliases(KAFKA_HOSTNAME)
@ -113,6 +117,10 @@ public enum DATABASE {
.withNetworkAliases("oracledb")
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
public static Network getNetwork() {
return NETWORK;
}
private static Supplier<Stream<Startable>> getContainers(DATABASE database) {
final Startable dbStartable;
switch (database) {
@ -135,6 +143,7 @@ private static Supplier<Stream<Startable>> getContainers(DATABASE database) {
dbStartable = MARIADB_CONTAINER;
break;
case NONE:
case DEBEZIUM_ONLY:
default:
dbStartable = null;
break;
@ -144,6 +153,9 @@ private static Supplier<Stream<Startable>> getContainers(DATABASE database) {
return () -> Stream.of(KAFKA_CONTAINER, dbStartable, DEBEZIUM_CONTAINER);
}
else {
if (DATABASE.DEBEZIUM_ONLY.equals(database)) {
return () -> Stream.of(DEBEZIUM_CONTAINER);
}
return () -> Stream.of(KAFKA_CONTAINER, DEBEZIUM_CONTAINER);
}
}
@ -173,9 +185,18 @@ public static void setupDebeziumContainer(String connectorVersion, String restEx
setupDebeziumContainer(connectorVersion, restExtensionClassses, DEBEZIUM_CONTAINER_IMAGE_VERSION_LATEST);
}
private static void waitForDebeziumContainerIsStopped() {
Awaitility.await()
.atMost(DebeziumContainer.waitTimeForRecords() * 5L, TimeUnit.SECONDS)
.until(() -> !TestInfrastructureHelper.getDebeziumContainer().isRunning());
}
public static void setupDebeziumContainer(String connectorVersion, String restExtensionClasses, String debeziumContainerImageVersion) {
if (null != DEBEZIUM_CONTAINER && DEBEZIUM_CONTAINER.isRunning()) {
DEBEZIUM_CONTAINER.stop();
waitForDebeziumContainerIsStopped();
}
final String registry = debeziumContainerImageVersion.startsWith("1.2") ? "" : "quay.io/";
final String debeziumVersion = debeziumContainerImageVersion.startsWith("1.2") ? "1.2.5.Final" : connectorVersion;
@ -186,12 +207,46 @@ public static void setupDebeziumContainer(String connectorVersion, String restEx
.withBuildArg("BASE_IMAGE", baseImageName)
.withBuildArg("DEBEZIUM_VERSION", debeziumVersion))
.withEnv("ENABLE_DEBEZIUM_SCRIPTING", "true")
.withEnv("CONNECT_REST_EXTENSION_CLASSES", restExtensionClasses)
.withNetwork(NETWORK)
.withKafka(KAFKA_CONTAINER.getNetwork(), KAFKA_HOSTNAME + ":9092")
.withLogConsumer(new Slf4jLogConsumer(LOGGER))
.enableJMX()
.dependsOn(KAFKA_CONTAINER);
if (null != restExtensionClasses && !restExtensionClasses.isEmpty()) {
DEBEZIUM_CONTAINER.withEnv("CONNECT_REST_EXTENSION_CLASSES", restExtensionClasses);
}
}
public static void defaultDebeziumContainer(String debeziumContainerImageVersion) {
if (null != DEBEZIUM_CONTAINER && DEBEZIUM_CONTAINER.isRunning()) {
DEBEZIUM_CONTAINER.stop();
waitForDebeziumContainerIsStopped();
}
if (null == debeziumContainerImageVersion) {
debeziumContainerImageVersion = DEBEZIUM_CONTAINER_IMAGE_VERSION_LATEST;
}
else {
var matcher = VERSION_PATTERN.matcher(debeziumContainerImageVersion);
if (matcher.find()) {
debeziumContainerImageVersion = matcher.toMatchResult().group();
}
else {
throw new RuntimeException("Cannot parse version: " + debeziumContainerImageVersion);
}
}
final String registry = debeziumContainerImageVersion.startsWith("1.2") ? "" : "quay.io/";
String imageName = registry + "debezium/connect:" + debeziumContainerImageVersion;
DEBEZIUM_CONTAINER = new DebeziumContainer(DockerImageName.parse(imageName))
.withEnv("ENABLE_DEBEZIUM_SCRIPTING", "true")
.withNetwork(NETWORK)
.withKafka(KAFKA_CONTAINER.getNetwork(), KAFKA_HOSTNAME + ":9092")
.withLogConsumer(new Slf4jLogConsumer(LOGGER))
.enableJMX()
.dependsOn(KAFKA_CONTAINER);
}
public static void defaultDebeziumContainer() {
defaultDebeziumContainer(null);
}
public static GenericContainer<?> getKafkaContainer() {
@ -231,6 +286,6 @@ public static void waitForConnectorTaskStatus(String connectorName, int taskNumb
// this needs to be set to at least a minimum of ~65-70 seconds because PostgreSQL now
// retries on certain failure conditions with a 10s between them.
.atMost(120, TimeUnit.SECONDS)
.until(() -> RestExtensionTestInfrastructure.getDebeziumContainer().getConnectorTaskState(connectorName, taskNumber) == state);
.until(() -> TestInfrastructureHelper.getDebeziumContainer().getConnectorTaskState(connectorName, taskNumber) == state);
}
}