DBZ-6762 Add "validate filters" endpoint/s for connector-specific Connect REST Extensions

closes to https://issues.redhat.com/browse/DBZ-6762
This commit is contained in:
rkerner 2023-11-07 20:13:17 +01:00 committed by Jiri Pechanec
parent ae6e7d03cc
commit aeaba3ae70
30 changed files with 1169 additions and 111 deletions

View File

@ -32,7 +32,6 @@ jobs:
rest-extension-changed: ${{ steps.changed-files-rest-extension.outputs.any_changed }}
schema-generator-changed: ${{ steps.changed-files-schema-generator.outputs.any_changed }}
debezium-testing-changed: ${{ steps.changed-files-debezium-testing.outputs.any_changed }}
debezium-ui-changed: ${{ steps.changed-files-debezium-ui.outputs.any_changed }}
mysql-ddl-parser-changed: ${{ steps.changed-files-mysql-ddl-parser.outputs.any_changed }}
oracle-ddl-parser-changed: ${{ steps.changed-files-oracle-ddl-parser.outputs.any_changed }}
documentation-only-changed: ${{ steps.changed-files-documentation.outputs.only_changed}}
@ -1036,30 +1035,3 @@ jobs:
-Dmaven.wagon.http.pool=false
-Dmaven.wagon.httpconnectionManager.ttlSeconds=120
-DskipNonCore
build_ui:
needs: [check_style, file_changes]
if: ${{ needs.file_changes.outputs.debezium-ui-changed == 'true' }}
name: "UI"
runs-on: ubuntu-latest
steps:
- name: Checkout Action (UI)
uses: actions/checkout@v4
with:
repository: debezium/debezium-ui
path: debezium-ui
- name: Set up Java 17
uses: actions/setup-java@v3
with:
distribution: 'temurin'
java-version: 17
- name: Cache Maven Repository
uses: actions/cache@v3
with:
path: ~/.m2/repository
key: maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }}
restore-keys: |
maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }}

View File

@ -14,8 +14,6 @@
/**
* A Kafka Connect REST extension that enables some advanced features over
* Kafka Connect's REST interface:
* + report available transformations and their configuration
* + return if topic auto-creation is available and enabled
*
* To install this extension put the jar file into a separate Kafka Connect
* plugin dir and configure your Kafka Connect properties file with:

View File

@ -5,6 +5,9 @@
*/
package io.debezium.connector.mongodb.rest;
import java.util.List;
import java.util.stream.Collectors;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@ -13,19 +16,26 @@
import org.apache.kafka.connect.connector.Connector;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.Module;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.connection.MongoDbConnection;
import io.debezium.connector.mongodb.connection.ReplicaSet;
import io.debezium.rest.ConnectionValidationResource;
import io.debezium.rest.FilterValidationResource;
import io.debezium.rest.SchemaResource;
import io.debezium.rest.model.DataCollection;
/**
* A JAX-RS Resource class defining endpoints of the Debezium MySQL Connect REST Extension
* A JAX-RS Resource class defining endpoints of the Debezium MongoDB Connect REST Extension
*
*/
@Path(DebeziumMongoDbConnectorResource.BASE_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumMongoDbConnectorResource implements SchemaResource, ConnectionValidationResource {
public class DebeziumMongoDbConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource {
public static final String BASE_PATH = "/debezium/mongodb";
public static final String VERSION_ENDPOINT = "/version";
@ -41,6 +51,26 @@ public Connector getConnector() {
return new MongoDbConnector();
}
protected MongoDbConnection getConnection(Configuration configuration) {
MongoDbTaskContext context = new MongoDbTaskContext(configuration);
ReplicaSet replicaSet = new ReplicaSet(context.getConnectionContext().connectionString());
return context.getConnectionContext().connect(replicaSet, context.filters(), (s, throwable) -> {
throw new DebeziumException(s, throwable);
});
}
@Override
public List<DataCollection> getMatchingCollections(Configuration configuration) {
try (MongoDbConnection primary = getConnection(configuration)) {
return primary.collections().stream()
.map(collectionId -> new DataCollection(collectionId.replicaSetName(), collectionId.dbName(), collectionId.name()))
.collect(Collectors.toList());
}
catch (InterruptedException e) {
throw new DebeziumException(e);
}
}
@Override
public String getSchemaFilePath() {
return "/META-INF/resources/mongodb.json";

View File

@ -105,6 +105,105 @@ public void testInvalidConnection() {
Map.of("property", MongoDbConnectorConfig.TOPIC_PREFIX.name(), "message", "The 'topic.prefix' value is invalid: A value is required")));
}
@Test
public void testFiltersWithEmptyFilters() {
ConnectorConfiguration config = getMongoDbConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0))
.body("matchingCollections.size()", is(3))
.body("matchingCollections",
hasItems(
Map.of("realm", "rs0", "namespace", "inventory", "name", "customers", "identifier", "rs0.inventory.customers"),
Map.of("realm", "rs0", "namespace", "inventory", "name", "orders", "identifier", "rs0.inventory.orders"),
Map.of("realm", "rs0", "namespace", "inventory", "name", "products", "identifier", "rs0.inventory.products")));
}
@Test
public void testFiltersWithValidCollectionIncludeList() {
ConnectorConfiguration config = getMongoDbConnectorConfiguration(1)
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST.name(), "inventory\\.product.*");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0))
.body("matchingCollections.size()", is(1))
.body("matchingCollections",
hasItems(Map.of("realm", "rs0", "namespace", "inventory", "name", "products", "identifier", "rs0.inventory.products")));
}
@Test
public void testFiltersWithValidDatabaseIncludeList() {
ConnectorConfiguration config = getMongoDbConnectorConfiguration(1)
.with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST.name(), "inventory");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0))
.body("matchingCollections.size()", is(3))
.body("matchingCollections",
hasItems(
Map.of("realm", "rs0", "namespace", "inventory", "name", "customers", "identifier", "rs0.inventory.customers"),
Map.of("realm", "rs0", "namespace", "inventory", "name", "orders", "identifier", "rs0.inventory.orders"),
Map.of("realm", "rs0", "namespace", "inventory", "name", "products", "identifier", "rs0.inventory.products")));
}
@Test
public void testFiltersWithInvalidDatabaseIncludeListPattern() {
ConnectorConfiguration config = getMongoDbConnectorConfiguration(1)
.with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST.name(), "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("matchingCollections.size()", is(0))
.body("validationResults.size()", is(1))
.rootPath("validationResults[0]")
.body("property", equalTo(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST.name()))
.body("message", equalTo(
"The 'database.include.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^"));
}
@Test
public void testFiltersWithInvalidDatabaseExcludeListPattern() {
ConnectorConfiguration config = getMongoDbConnectorConfiguration(1)
.with(MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST.name(), "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("matchingCollections.size()", is(0))
.body("validationResults.size()", is(1))
.rootPath("validationResults[0]")
.body("property", equalTo(MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST.name()))
.body("message", equalTo(
"The 'database.exclude.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^"));
}
public static ConnectorConfiguration getMongoDbConnectorConfiguration(int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forMongoDbReplicaSet(RestExtensionTestInfrastructure.getMongoDbContainer())
.with(MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS.name(), 10000)

View File

@ -13,9 +13,7 @@
/**
* A Kafka Connect REST extension that enables some advanced features over
* Kafka Connect's REST interface:
* + report available transformations and their configuration
* + return if topic auto-creation is available and enabled
* Kafka Connect's REST interface.
*
* To install this extension put the jar file into a separate Kafka Connect
* plugin dir and configure your Kafka Connect properties file with:

View File

@ -5,6 +5,12 @@
*/
package io.debezium.connector.mysql.rest;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@ -13,10 +19,17 @@
import org.apache.kafka.connect.connector.Connector;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.Module;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.rest.ConnectionValidationResource;
import io.debezium.rest.FilterValidationResource;
import io.debezium.rest.SchemaResource;
import io.debezium.rest.model.DataCollection;
/**
* A JAX-RS Resource class defining endpoints of the Debezium MySQL Connect REST Extension
@ -25,12 +38,10 @@
@Path(DebeziumMySqlConnectorResource.BASE_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumMySqlConnectorResource implements SchemaResource, ConnectionValidationResource {
public class DebeziumMySqlConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource {
public static final String BASE_PATH = "/debezium/mysql";
public static final String VERSION_ENDPOINT = "/version";
public static final String VALIDATE_PROPERTIES_ENDPOINT = "/validate/properties";
public static final String VALIDATE_CONNECTOR_ENDPOINT = "/validate/connector";
@Override
public String getSchemaFilePath() {
@ -42,6 +53,40 @@ public Connector getConnector() {
return new MySqlConnector();
}
@Override
public List<DataCollection> getMatchingCollections(Configuration configuration) {
final MySqlConnectorConfig config = new MySqlConnectorConfig(configuration);
final MySqlConnection.MySqlConnectionConfiguration connectionConfig = new MySqlConnection.MySqlConnectionConfiguration(configuration);
try (MySqlConnection connection = new MySqlConnection(connectionConfig)) {
Set<TableId> tables;
final List<String> databaseNames = new ArrayList<>();
connection.query("SHOW DATABASES", rs -> {
while (rs.next()) {
databaseNames.add(rs.getString(1));
}
});
List<DataCollection> allMatchingTables = new ArrayList<>();
for (String databaseName : databaseNames) {
if (!config.getTableFilters().databaseFilter().test(databaseName)) {
continue;
}
tables = connection.readTableNames(databaseName, null, null, new String[]{ "TABLE" });
var matchingTables = tables.stream()
.filter(tableId -> config.getTableFilters().dataCollectionFilter().isIncluded(tableId))
.map(tableId -> new DataCollection(tableId.catalog(), tableId.table()))
.collect(Collectors.toList());
allMatchingTables.addAll(matchingTables);
}
return allMatchingTables;
}
catch (SQLException e) {
throw new DebeziumException(e);
}
}
@GET
@Path(VERSION_ENDPOINT)
public String getConnectorVersion() {

View File

@ -98,6 +98,113 @@ public void testInvalidConnection() {
Map.of("property", MySqlConnectorConfig.HOSTNAME.name(), "message", "The 'database.hostname' value is invalid: A value is required")));
}
@Test
public void testFiltersWithEmptyFilters() {
ConnectorConfiguration config = getMySqlConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0))
.body("matchingCollections.size()", is(6))
.body("matchingCollections",
hasItems(
Map.of("namespace", "inventory", "name", "geom", "identifier", "inventory.geom"),
Map.of("namespace", "inventory", "name", "products_on_hand", "identifier", "inventory.products_on_hand"),
Map.of("namespace", "inventory", "name", "customers", "identifier", "inventory.customers"),
Map.of("namespace", "inventory", "name", "addresses", "identifier", "inventory.addresses"),
Map.of("namespace", "inventory", "name", "orders", "identifier", "inventory.orders"),
Map.of("namespace", "inventory", "name", "products", "identifier", "inventory.products")));
}
@Test
public void testFiltersWithValidTableIncludeList() {
ConnectorConfiguration config = getMySqlConnectorConfiguration(1)
.with("table.include.list", "inventory\\.product.*");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0))
.body("matchingCollections.size()", is(2))
.body("matchingCollections",
hasItems(
Map.of("namespace", "inventory", "name", "products_on_hand", "identifier", "inventory.products_on_hand"),
Map.of("namespace", "inventory", "name", "products", "identifier", "inventory.products")));
}
@Test
public void testFiltersWithValidDatabaseIncludeList() {
ConnectorConfiguration config = getMySqlConnectorConfiguration(1)
.with("database.include.list", "inventory");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0))
.body("matchingCollections.size()", is(6))
.body("matchingCollections",
hasItems(
Map.of("namespace", "inventory", "name", "geom", "identifier", "inventory.geom"),
Map.of("namespace", "inventory", "name", "products_on_hand", "identifier", "inventory.products_on_hand"),
Map.of("namespace", "inventory", "name", "customers", "identifier", "inventory.customers"),
Map.of("namespace", "inventory", "name", "addresses", "identifier", "inventory.addresses"),
Map.of("namespace", "inventory", "name", "orders", "identifier", "inventory.orders"),
Map.of("namespace", "inventory", "name", "products", "identifier", "inventory.products")));
}
@Test
public void testFiltersWithInvalidDatabaseIncludeListPattern() {
ConnectorConfiguration config = getMySqlConnectorConfiguration(1)
.with("database.include.list", "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("matchingCollections.size()", is(0))
.body("validationResults.size()", is(1))
.rootPath("validationResults[0]")
.body("property", equalTo("database.include.list"))
.body("message", equalTo(
"The 'database.include.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^"));
}
@Test
public void testFiltersWithInvalidDatabaseExcludeListPattern() {
ConnectorConfiguration config = getMySqlConnectorConfiguration(1)
.with("database.exclude.list", "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("matchingCollections.size()", is(0))
.body("validationResults.size()", is(1))
.rootPath("validationResults[0]")
.body("property", equalTo("database.exclude.list"))
.body("message", equalTo(
"The 'database.exclude.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^"));
}
public static ConnectorConfiguration getMySqlConnectorConfiguration(int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(RestExtensionTestInfrastructure.getMySqlContainer())
.with(MySqlConnectorConfig.SNAPSHOT_MODE.name(), "never") // temporarily disable snapshot mode globally until we can check if connectors inside testcontainers are in SNAPSHOT or STREAMING mode (wait for snapshot finished!)

View File

@ -556,6 +556,7 @@
<database.username>${database.username}</database.username>
<database.password>${database.password}</database.password>
<database.dbname>${database.dbname}</database.dbname>
<database.pdb.name>${database.pdb.name}</database.pdb.name>
</systemPropertyVariables>
</configuration>
</plugin>

View File

@ -13,9 +13,7 @@
/**
* A Kafka Connect REST extension that enables some advanced features over
* Kafka Connect's REST interface:
* + report available transformations and their configuration
* + return if topic auto-creation is available and enabled
* Kafka Connect's REST interface.
*
* To install this extension put the jar file into a separate Kafka Connect
* plugin dir and configure your Kafka Connect properties file with:

View File

@ -5,6 +5,11 @@
*/
package io.debezium.connector.oracle.rest;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@ -13,10 +18,18 @@
import org.apache.kafka.connect.connector.Connector;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.Module;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.rest.ConnectionValidationResource;
import io.debezium.rest.FilterValidationResource;
import io.debezium.rest.SchemaResource;
import io.debezium.rest.model.DataCollection;
import io.debezium.util.Strings;
/**
* A JAX-RS Resource class defining endpoints of the Debezium Oracle Connect REST Extension
@ -25,12 +38,10 @@
@Path(DebeziumOracleConnectorResource.BASE_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumOracleConnectorResource implements SchemaResource, ConnectionValidationResource {
public class DebeziumOracleConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource {
public static final String BASE_PATH = "/debezium/oracle";
public static final String VERSION_ENDPOINT = "/version";
public static final String VALIDATE_PROPERTIES_ENDPOINT = "/validate/properties";
public static final String VALIDATE_CONNECTOR_ENDPOINT = "/validate/connector";
@Override
public String getSchemaFilePath() {
@ -42,6 +53,30 @@ public Connector getConnector() {
return new OracleConnector();
}
@Override
public List<DataCollection> getMatchingCollections(Configuration configuration) {
final OracleConnectorConfig oracleConfig = new OracleConnectorConfig(configuration);
final String databaseName = oracleConfig.getCatalogName();
try (OracleConnection connection = new OracleConnection(oracleConfig.getJdbcConfig(), false)) {
if (!Strings.isNullOrBlank(oracleConfig.getPdbName())) {
connection.setSessionToPdb(oracleConfig.getPdbName());
}
Set<TableId> tables;
// @TODO: we need to expose a better method from the connector, particularly getAllTableIds
// the following's performance is acceptable when using PDBs but not as ideal with non-PDB
tables = connection.readTableNames(databaseName, null, null, new String[]{ "TABLE" });
return tables.stream()
.filter(tableId -> oracleConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId))
.map(tableId -> new DataCollection(tableId.schema(), tableId.table()))
.collect(Collectors.toList());
}
catch (SQLException e) {
throw new DebeziumException(e);
}
}
@GET
@Path(VERSION_ENDPOINT)
public String getConnectorVersion() {

View File

@ -15,42 +15,45 @@
import java.util.Locale;
import java.util.Map;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.connector.oracle.Module;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.storage.kafka.history.KafkaSchemaHistory;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.restassured.http.ContentType;
public class DebeziumOracleConnectorResourceIT {
@BeforeClass
public static void checkCondition() {
Assume.assumeThat("Skipping DebeziumOracleConnectorResourceIT tests when assembly profile is not active!", System.getProperty("isAssemblyProfileActive", "false"),
is("true"));
}
private static final boolean IS_ASSEMBLY_PROFILE_ACTIVE = System.getProperty("isAssemblyProfileActive", "false").equalsIgnoreCase("true");
private static String ORACLE_USERNAME;
private static boolean running = false;
@Before
public void start() {
@BeforeClass
public static void checkConditionAndStart() {
Assume.assumeTrue("Skipping DebeziumOracleConnectorResourceIT tests when assembly profile is not active!", IS_ASSEMBLY_PROFILE_ACTIVE);
RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumOracleConnectRestExtension.class.getName());
RestExtensionTestInfrastructure.startContainers(DATABASE.ORACLE);
TestHelper.loadTestData(TestHelper.getOracleConnectorConfiguration(1), "rest/data.sql");
ORACLE_USERNAME = RestExtensionTestInfrastructure.getOracleContainer().getUsername();
running = true;
}
@After
public void stop() {
RestExtensionTestInfrastructure.stopContainers();
@AfterClass
public static void stop() {
if (IS_ASSEMBLY_PROFILE_ACTIVE && running) {
RestExtensionTestInfrastructure.stopContainers();
}
}
@Test
public void testValidConnection() {
ConnectorConfiguration config = getOracleConnectorConfiguration(1);
ConnectorConfiguration config = TestHelper.getOracleConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
@ -64,7 +67,7 @@ public void testValidConnection() {
@Test
public void testInvalidHostnameConnection() {
ConnectorConfiguration config = getOracleConnectorConfiguration(1).with(OracleConnectorConfig.HOSTNAME.name(), "zzzzzzzzzz");
ConnectorConfiguration config = TestHelper.getOracleConnectorConfiguration(1).with(OracleConnectorConfig.HOSTNAME.name(), "zzzzzzzzzz");
Locale.setDefault(new Locale("en", "US")); // to enforce errormessages in English
given()
@ -99,20 +102,80 @@ public void testInvalidConnection() {
Map.of("property", OracleConnectorConfig.HOSTNAME.name(), "message", "The 'database.hostname' value is invalid: A value is required")));
}
public static ConnectorConfiguration getOracleConnectorConfiguration(int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(RestExtensionTestInfrastructure.getOracleContainer())
.with(OracleConnectorConfig.PDB_NAME.name(), "ORCLPDB1")
.with(OracleConnectorConfig.TOPIC_PREFIX.name(), "dbserver" + id)
.with(KafkaSchemaHistory.BOOTSTRAP_SERVERS.name(), RestExtensionTestInfrastructure.KAFKA_HOSTNAME + ":9092")
.with(KafkaSchemaHistory.TOPIC.name(), "dbhistory.inventory");
@Test
public void testFiltersWithEmptyFilters() {
ConnectorConfiguration config = TestHelper.getOracleConnectorConfiguration(1);
if (options != null && options.length > 0) {
for (int i = 0; i < options.length; i += 2) {
config.with(options[i], options[i + 1]);
}
}
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0))
.body("matchingCollections.size()", is(10))
.body("matchingCollections",
hasItems(
Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "ORDERS", "identifier", ORACLE_USERNAME.toUpperCase() + ".ORDERS"),
Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "PEOPLE", "identifier", ORACLE_USERNAME.toUpperCase() + ".PEOPLE"),
Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "TEST", "identifier", ORACLE_USERNAME.toUpperCase() + ".TEST"),
Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "PRODUCTS", "identifier", ORACLE_USERNAME.toUpperCase() + ".PRODUCTS"),
Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "CUSTOMERS", "identifier", ORACLE_USERNAME.toUpperCase() + ".CUSTOMERS"),
Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "DEBEZIUM_TABLE3", "identifier",
ORACLE_USERNAME.toUpperCase() + ".DEBEZIUM_TABLE3"),
Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "DEBEZIUM_TABLE2", "identifier",
ORACLE_USERNAME.toUpperCase() + ".DEBEZIUM_TABLE2"),
Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "DEBEZIUM_TABLE1", "identifier",
ORACLE_USERNAME.toUpperCase() + ".DEBEZIUM_TABLE1"),
Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "PRODUCTS_ON_HAND", "identifier",
ORACLE_USERNAME.toUpperCase() + ".PRODUCTS_ON_HAND"),
Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "DEBEZIUM_TEST", "identifier",
ORACLE_USERNAME.toUpperCase() + ".DEBEZIUM_TEST")));
}
return config;
@Test
public void testFiltersWithValidTableIncludeList() {
ConnectorConfiguration config = TestHelper.getOracleConnectorConfiguration(1)
.with("table.include.list", ORACLE_USERNAME.toUpperCase() + "\\.DEBEZIUM_TABLE.*");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0))
.body("matchingCollections.size()", is(3))
.body("matchingCollections",
hasItems(
Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "DEBEZIUM_TABLE3", "identifier",
ORACLE_USERNAME.toUpperCase() + ".DEBEZIUM_TABLE3"),
Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "DEBEZIUM_TABLE1", "identifier",
ORACLE_USERNAME.toUpperCase() + ".DEBEZIUM_TABLE1"),
Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "DEBEZIUM_TABLE2", "identifier",
ORACLE_USERNAME.toUpperCase() + ".DEBEZIUM_TABLE2")));
}
@Test
public void testFiltersWithInvalidTableIncludeList() {
ConnectorConfiguration config = TestHelper.getOracleConnectorConfiguration(1)
.with("table.include.list", "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("matchingCollections.size()", is(0))
.body("validationResults.size()", is(1))
.rootPath("validationResults[0]")
.body("property", equalTo("table.include.list"))
.body("message", equalTo(
"The 'table.include.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^"));
}
}

View File

@ -5,7 +5,11 @@
*/
package io.debezium.connector.oracle.util;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
@ -16,7 +20,10 @@
import org.awaitility.Awaitility;
import org.infinispan.client.hotrod.impl.ConfigurationProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
@ -26,15 +33,20 @@
import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningBufferType;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider;
import io.debezium.connector.oracle.rest.DebeziumOracleConnectorResourceIT;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.storage.file.history.FileSchemaHistory;
import io.debezium.storage.kafka.history.KafkaSchemaHistory;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.OracleContainer;
import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure;
import io.debezium.util.Strings;
import io.debezium.util.Testing;
public class TestHelper {
private static final String PDB_NAME = "pdb.name";
private static final String DATABASE_PREFIX = "database.";
private static final String DATABASE_PREFIX = CommonConnectorConfig.DATABASE_CONFIG_PREFIX;
private static final String DATABASE_ADMIN_PREFIX = "database.admin.";
public static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-connect.txt").toAbsolutePath();
@ -75,7 +87,9 @@ public class TestHelper {
*/
public static final String TYPE_SCALE_PARAMETER_KEY = "__debezium.source.column.scale";
private static Map<String, Field> cacheMappings = new HashMap<>();
private static final Logger LOGGER = LoggerFactory.getLogger(TestHelper.class);
private static final Map<String, Field> cacheMappings = new HashMap<>();
static {
cacheMappings.put(CacheProvider.TRANSACTIONS_CACHE_NAME, OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS);
@ -660,4 +674,118 @@ public static Scn getCurrentScn() throws SQLException {
return admin.getCurrentScn();
}
}
// Below are test helper methods for integration tests using the Testcointainers based OracleContainer instance:
private static Configuration getTestConnectionConfiguration(ConnectorConfiguration config) {
var connectionConfiguration = Configuration.from(config.asProperties()).subset(CommonConnectorConfig.DATABASE_CONFIG_PREFIX, true);
var dbName = Strings.isNullOrEmpty(connectionConfiguration.getString(PDB_NAME))
? connectionConfiguration.getString(JdbcConfiguration.DATABASE)
: connectionConfiguration.getString(PDB_NAME);
return connectionConfiguration.edit()
.with(JdbcConfiguration.HOSTNAME.name(), "localhost")
.with(JdbcConfiguration.PORT, RestExtensionTestInfrastructure.getOracleContainer().getMappedPort(OracleContainer.ORACLE_PORT))
.with(JdbcConfiguration.DATABASE, dbName)
.build();
}
// expects user passed in the config to be any local user account on the Oracle DB instance
private static OracleConnection createConnection(ConnectorConfiguration config, boolean autoCommit) {
Configuration connectionConfiguration = getTestConnectionConfiguration(config);
OracleConnection connection = new OracleConnection(JdbcConfiguration.adapt(connectionConfiguration));
try {
connection.setAutoCommit(autoCommit);
return connection;
}
catch (SQLException e) {
throw new RuntimeException("Failed to create connection", e);
}
}
// Will only work for SQL files that use ";" as ending of an SQL statement, other ";" can't be used in the SQL code
private static String[] getResourceSqlFileContent(String file) {
try (var is = DebeziumOracleConnectorResourceIT.class.getClassLoader().getResourceAsStream(file)) {
if (null == is) {
throw new IllegalArgumentException("File not found. (" + file + ")");
}
try (
var streamReader = new InputStreamReader(is, StandardCharsets.UTF_8);
var reader = new BufferedReader(streamReader)) {
List<String> sqlStatements = new ArrayList<>();
var sb = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
if (line.endsWith(";")) {
sb.append(line, 0, line.length() - 1);
sqlStatements.add(sb.toString());
sb = new StringBuilder();
}
else {
sb.append(line).append(" ");
}
}
return sqlStatements.toArray(new String[0]);
}
}
catch (IOException e) {
throw new DebeziumException(e);
}
}
public static void loadTestData(ConnectorConfiguration config, String sqlFile) {
try (var conn = TestHelper.createConnection(config, false)) {
conn.execute(getResourceSqlFileContent(sqlFile));
}
catch (SQLException e) {
throw new RuntimeException(e);
}
}
private static void patchConnectorConfigurationForContainer(ConnectorConfiguration connectorConfiguration, OracleContainer oracleContainer) {
var oracleImageName = oracleContainer.getDockerImageName();
if (!oracleImageName.startsWith(OracleContainer.DEFAULT_IMAGE_NAME.getUnversionedPart())) {
return;
}
String imageTag = "latest";
String imageTagSuffix = "";
if (oracleImageName.contains(":")) {
imageTag = oracleImageName.split(":")[1];
}
if (imageTag.contains("-")) {
imageTagSuffix = imageTag.substring(imageTag.lastIndexOf("-") + 1);
}
String pdbName = connectorConfiguration.asProperties().getProperty(OracleConnectorConfig.PDB_NAME.name());
if (!imageTag.contains("-") || "xs".equals(imageTagSuffix)) {
if (!Strings.isNullOrEmpty(pdbName)) {
connectorConfiguration.with(OracleConnectorConfig.DATABASE_NAME.name(), pdbName);
}
}
else if ("noncdb".equals(imageTagSuffix)) {
if (!Strings.isNullOrEmpty(pdbName)) {
connectorConfiguration.remove(OracleConnectorConfig.PDB_NAME.name());
}
}
else {
throw new RuntimeException("Invalid or unknown image tag '" + imageTagSuffix + "' for Oracle container image: " + oracleImageName);
}
}
public static ConnectorConfiguration getOracleConnectorConfiguration(int id, String... options) {
OracleContainer oracleContainer = RestExtensionTestInfrastructure.getOracleContainer();
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(oracleContainer)
.with(OracleConnectorConfig.PDB_NAME.name(), oracleContainer.ORACLE_PDB_NAME)
.with(OracleConnectorConfig.DATABASE_NAME.name(), oracleContainer.ORACLE_DBNAME)
.with(OracleConnectorConfig.TOPIC_PREFIX.name(), "dbserver" + id)
.with(KafkaSchemaHistory.BOOTSTRAP_SERVERS.name(), RestExtensionTestInfrastructure.KAFKA_HOSTNAME + ":9092")
.with(KafkaSchemaHistory.TOPIC.name(), "dbhistory.oracle");
if (options != null && options.length > 0) {
for (int i = 0; i < options.length; i += 2) {
config.with(options[i], options[i + 1]);
}
}
patchConnectorConfigurationForContainer(config, oracleContainer);
return config;
}
}

View File

@ -0,0 +1,121 @@
CREATE TABLE PEOPLE(name VARCHAR2(10));
ALTER TABLE PEOPLE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
CREATE TABLE debezium_table1 (id NUMERIC(9,0) NOT NULL, name VARCHAR2(1000), PRIMARY KEY (id));
ALTER TABLE debezium_table1 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
CREATE TABLE debezium_table2 (id NUMERIC(9,0) NOT NULL, name VARCHAR2(1000), PRIMARY KEY (id));
ALTER TABLE debezium_table2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
CREATE TABLE debezium_table3 (id NUMERIC(9,0) NOT NULL, name VARCHAR2(1000), birth_date date, PRIMARY KEY (id));
ALTER TABLE debezium_table3 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
CREATE TABLE TEST
(COLUMN1 NUMBER(19) NOT NULL,
COLUMN2 VARCHAR2(255),
PRIMARY KEY (COLUMN1));
ALTER TABLE TEST ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
CREATE TABLE DEBEZIUM_TEST
(id NUMBER(19) NOT NULL,
col1 NUMERIC(4,2),
col2 VARCHAR2(255) DEFAULT 'debezium' NOT NULL,
col3 NVARCHAR2(255) NOT NULL,
col4 CHAR(4),
col5 NCHAR(4),
col6 FLOAT(126),
col7 DATE,
col8 TIMESTAMP,
col9 blob,
col10 clob,
col12 NUMBER(1,0),
col13 DATE NOT NULL,
PRIMARY KEY (id));
ALTER TABLE DEBEZIUM_TEST ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
INSERT INTO PEOPLE (name) VALUES ('Larry');
INSERT INTO PEOPLE (name) VALUES ('Bruno');
INSERT INTO PEOPLE (name) VALUES ('Gerald');
CREATE TABLE products (
id NUMBER(4) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 101) NOT NULL PRIMARY KEY,
name VARCHAR2(255) NOT NULL,
description VARCHAR2(512),
weight FLOAT
);
ALTER TABLE products ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
INSERT INTO products
VALUES (NULL,'scooter','Small 2-wheel scooter',3.14);
INSERT INTO products
VALUES (NULL,'car battery','12V car battery',8.1);
INSERT INTO products
VALUES (NULL,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8);
INSERT INTO products
VALUES (NULL,'hammer','12oz carpenter''s hammer',0.75);
INSERT INTO products
VALUES (NULL,'hammer','14oz carpenter''s hammer',0.875);
INSERT INTO products
VALUES (NULL,'hammer','16oz carpenter''s hammer',1.0);
INSERT INTO products
VALUES (NULL,'rocks','box of assorted rocks',5.3);
INSERT INTO products
VALUES (NULL,'jacket','water resistent black wind breaker',0.1);
INSERT INTO products
VALUES (NULL,'spare tire','24 inch spare tire',22.2);
CREATE TABLE products_on_hand (
product_id NUMBER(4) NOT NULL PRIMARY KEY,
quantity NUMBER(4) NOT NULL,
FOREIGN KEY (product_id) REFERENCES products(id)
);
ALTER TABLE products_on_hand ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
INSERT INTO products_on_hand VALUES (101, 3);
INSERT INTO products_on_hand VALUES (102, 8);
INSERT INTO products_on_hand VALUES (103, 18);
INSERT INTO products_on_hand VALUES (104, 4);
INSERT INTO products_on_hand VALUES (105, 5);
INSERT INTO products_on_hand VALUES (106, 0);
INSERT INTO products_on_hand VALUES (107, 44);
INSERT INTO products_on_hand VALUES (108, 2);
INSERT INTO products_on_hand VALUES (109, 5);
CREATE TABLE customers (
id NUMBER(4) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,
first_name VARCHAR2(255) NOT NULL,
last_name VARCHAR2(255) NOT NULL,
email VARCHAR2(255) NOT NULL UNIQUE
);
ALTER TABLE customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
INSERT INTO customers
VALUES (NULL,'Sally','Thomas','sally.thomas@acme.com');
INSERT INTO customers
VALUES (NULL,'George','Bailey','gbailey@foobar.com');
INSERT INTO customers
VALUES (NULL,'Edward','Walker','ed@walker.com');
INSERT INTO customers
VALUES (NULL,'Anne','Kretchmar','annek@noanswer.org');
CREATE TABLE debezium.orders (
id NUMBER(6) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 10001) NOT NULL PRIMARY KEY,
order_date DATE NOT NULL,
purchaser NUMBER(4) NOT NULL,
quantity NUMBER(4) NOT NULL,
product_id NUMBER(4) NOT NULL,
FOREIGN KEY (purchaser) REFERENCES customers(id),
FOREIGN KEY (product_id) REFERENCES products(id)
);
ALTER TABLE orders ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
INSERT INTO orders
VALUES (NULL, '16-JAN-2016', 1001, 1, 102);
INSERT INTO orders
VALUES (NULL, '17-JAN-2016', 1002, 2, 105);
INSERT INTO orders
VALUES (NULL, '19-FEB-2016', 1002, 2, 106);
INSERT INTO orders
VALUES (NULL, '21-FEB-2016', 1003, 1, 107);
COMMIT;

View File

@ -1174,7 +1174,7 @@ private static class SystemTablesPredicate implements TableFilter {
protected static final List<String> SYSTEM_SCHEMAS = Arrays.asList("pg_catalog", "information_schema");
// these are tables that may be placed in the user's schema but are system tables. This typically includes modules
// that install system tables such as the GEO module
protected static final List<String> SYSTEM_TABLES = Arrays.asList("spatial_ref_sys");
protected static final List<String> SYSTEM_TABLES = List.of("spatial_ref_sys");
protected static final String TEMP_TABLE_SCHEMA_PREFIX = "pg_temp";
@Override

View File

@ -13,9 +13,7 @@
/**
* A Kafka Connect REST extension that enables some advanced features over
* Kafka Connect's REST interface:
* + report available transformations and their configuration
* + return if topic auto-creation is available and enabled
* Kafka Connect's REST interface.
*
* To install this extension put the jar file into a separate Kafka Connect
* plugin dir and configure your Kafka Connect properties file with:

View File

@ -5,6 +5,11 @@
*/
package io.debezium.connector.postgresql.rest;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@ -13,10 +18,17 @@
import org.apache.kafka.connect.connector.Connector;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.Module;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.relational.TableId;
import io.debezium.rest.ConnectionValidationResource;
import io.debezium.rest.FilterValidationResource;
import io.debezium.rest.SchemaResource;
import io.debezium.rest.model.DataCollection;
/**
* A JAX-RS Resource class defining endpoints of the Debezium Postgres Connect REST Extension
@ -25,7 +37,7 @@
@Path(DebeziumPostgresConnectorResource.BASE_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumPostgresConnectorResource implements SchemaResource, ConnectionValidationResource {
public class DebeziumPostgresConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource {
public static final String BASE_PATH = "/debezium/postgres";
public static final String VERSION_ENDPOINT = "/version";
@ -41,6 +53,25 @@ public Connector getConnector() {
return new PostgresConnector();
}
@Override
public List<DataCollection> getMatchingCollections(Configuration configuration) {
PostgresConnectorConfig config = new PostgresConnectorConfig(configuration);
try (PostgresConnection connection = new PostgresConnection(config.getJdbcConfig(), PostgresConnection.CONNECTION_GENERAL)) {
Set<TableId> tables;
try {
tables = connection.readTableNames(config.databaseName(), null, null, new String[]{ "TABLE" });
return tables.stream()
.filter(tableId -> config.getTableFilters().dataCollectionFilter().isIncluded(tableId))
.map(tableId -> new DataCollection(tableId.schema(), tableId.table()))
.collect(Collectors.toList());
}
catch (SQLException e) {
throw new DebeziumException(e);
}
}
}
@Override
public String getSchemaFilePath() {
return "/META-INF/resources/postgres.json";

View File

@ -98,6 +98,111 @@ public void testInvalidConnection() {
Map.of("property", PostgresConnectorConfig.HOSTNAME.name(), "message", "The 'database.hostname' value is invalid: A value is required")));
}
@Test
public void testFiltersWithEmptyFilters() {
ConnectorConfiguration config = getPostgresConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0))
.body("matchingCollections.size()", is(5))
.body("matchingCollections",
hasItems(
Map.of("namespace", "inventory", "name", "geom", "identifier", "inventory.geom"),
Map.of("namespace", "inventory", "name", "products_on_hand", "identifier", "inventory.products_on_hand"),
Map.of("namespace", "inventory", "name", "customers", "identifier", "inventory.customers"),
Map.of("namespace", "inventory", "name", "orders", "identifier", "inventory.orders"),
Map.of("namespace", "inventory", "name", "products", "identifier", "inventory.products")));
}
@Test
public void testFiltersWithValidTableIncludeList() {
ConnectorConfiguration config = getPostgresConnectorConfiguration(1)
.with("table.include.list", "inventory\\.product.*");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0))
.body("matchingCollections.size()", is(2))
.body("matchingCollections",
hasItems(
Map.of("namespace", "inventory", "name", "products_on_hand", "identifier", "inventory.products_on_hand"),
Map.of("namespace", "inventory", "name", "products", "identifier", "inventory.products")));
}
@Test
public void testFiltersWithValidSchemaIncludeList() {
ConnectorConfiguration config = getPostgresConnectorConfiguration(1)
.with("schema.include.list", "inventory");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0))
.body("matchingCollections.size()", is(5))
.body("matchingCollections",
hasItems(
Map.of("namespace", "inventory", "name", "geom", "identifier", "inventory.geom"),
Map.of("namespace", "inventory", "name", "products_on_hand", "identifier", "inventory.products_on_hand"),
Map.of("namespace", "inventory", "name", "customers", "identifier", "inventory.customers"),
Map.of("namespace", "inventory", "name", "orders", "identifier", "inventory.orders"),
Map.of("namespace", "inventory", "name", "products", "identifier", "inventory.products")));
}
@Test
public void testFiltersWithInvalidSchemaIncludeListPattern() {
ConnectorConfiguration config = getPostgresConnectorConfiguration(1)
.with("schema.include.list", "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("matchingCollections.size()", is(0))
.body("validationResults.size()", is(1))
.rootPath("validationResults[0]")
.body("property", equalTo("schema.include.list"))
.body("message", equalTo(
"The 'schema.include.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^"));
}
@Test
public void testFiltersWithInvalidSchemaExcludeListPattern() {
ConnectorConfiguration config = getPostgresConnectorConfiguration(1)
.with("schema.exclude.list", "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("matchingCollections.size()", is(0))
.body("validationResults.size()", is(1))
.rootPath("validationResults[0]")
.body("property", equalTo("schema.exclude.list"))
.body("message", equalTo(
"The 'schema.exclude.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^"));
}
private static ConnectorConfiguration getPostgresConnectorConfiguration(int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(RestExtensionTestInfrastructure.getPostgresContainer())
.with(PostgresConnectorConfig.SNAPSHOT_MODE.name(), "never") // temporarily disable snapshot mode globally until we can check if connectors inside testcontainers are in SNAPSHOT or STREAMING mode (wait for snapshot finished!)

View File

@ -13,9 +13,7 @@
/**
* A Kafka Connect REST extension that enables some advanced features over
* Kafka Connect's REST interface:
* + report available transformations and their configuration
* + return if topic auto-creation is available and enabled
* Kafka Connect's REST interface.
*
* To install this extension put the jar file into a separate Kafka Connect
* plugin dir and configure your Kafka Connect properties file with:

View File

@ -5,6 +5,13 @@
*/
package io.debezium.connector.sqlserver.rest;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@ -13,10 +20,17 @@
import org.apache.kafka.connect.connector.Connector;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.Module;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnector;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.rest.ConnectionValidationResource;
import io.debezium.rest.FilterValidationResource;
import io.debezium.rest.SchemaResource;
import io.debezium.rest.model.DataCollection;
/**
* A JAX-RS Resource class defining endpoints of the Debezium SQL Server Connect REST Extension
@ -25,7 +39,7 @@
@Path(DebeziumSqlServerConnectorResource.BASE_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumSqlServerConnectorResource implements SchemaResource, ConnectionValidationResource {
public class DebeziumSqlServerConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource {
public static final String BASE_PATH = "/debezium/sqlserver";
public static final String VERSION_ENDPOINT = "/version";
@ -41,6 +55,32 @@ public Connector getConnector() {
return new SqlServerConnector();
}
@Override
public List<DataCollection> getMatchingCollections(Configuration configuration) {
final SqlServerConnectorConfig sqlServerConfig = new SqlServerConnectorConfig(configuration);
final List<String> databaseNames = sqlServerConfig.getDatabaseNames();
try (SqlServerConnection connection = new SqlServerConnection(sqlServerConfig, null, Collections.emptySet(), sqlServerConfig.useSingleDatabase())) {
Set<TableId> tables = new HashSet<>();
databaseNames.forEach(databaseName -> {
try {
tables.addAll(connection.readTableNames(databaseName, null, null, new String[]{ "TABLE" }));
}
catch (SQLException e) {
throw new DebeziumException(e);
}
});
return tables.stream()
.filter(tableId -> sqlServerConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId))
.map(tableId -> new DataCollection(tableId.catalog(), tableId.schema(), tableId.table()))
.collect(Collectors.toList());
}
catch (SQLException e) {
throw new RuntimeException("Could not retrieve real database name", e);
}
}
@Override
public String getSchemaFilePath() {
return "/META-INF/resources/sqlserver.json";

View File

@ -102,6 +102,89 @@ public void testInvalidConnection() {
"The 'database.hostname' value is invalid: A value is required")));
}
@Test
public void testFiltersWithEmptyFilters() {
ConnectorConfiguration config = getSqlServerConnectorConfiguration(1);
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0))
.body("matchingCollections.size()", is(5))
.body("matchingCollections",
hasItems(
Map.of("realm", "testDB", "namespace", "inventory", "name", "products_on_hand", "identifier", "testDB.inventory.products_on_hand"),
Map.of("realm", "testDB", "namespace", "inventory", "name", "customers", "identifier", "testDB.inventory.customers"),
Map.of("realm", "testDB", "namespace", "inventory", "name", "orders", "identifier", "testDB.inventory.orders"),
Map.of("realm", "testDB", "namespace", "inventory", "name", "products", "identifier", "testDB.inventory.products"),
Map.of("realm", "testDB2", "namespace", "inventory", "name", "products", "identifier", "testDB2.inventory.products")));
}
@Test
public void testFiltersWithValidTableIncludeList() {
ConnectorConfiguration config = getSqlServerConnectorConfiguration(1)
.with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST.name(), "inventory\\.product.*");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("VALID"))
.body("validationResults.size()", is(0))
.body("matchingCollections.size()", is(3))
.body("matchingCollections",
hasItems(
Map.of("realm", "testDB", "namespace", "inventory", "name", "products_on_hand", "identifier", "testDB.inventory.products_on_hand"),
Map.of("realm", "testDB", "namespace", "inventory", "name", "products", "identifier", "testDB.inventory.products"),
Map.of("realm", "testDB2", "namespace", "inventory", "name", "products", "identifier", "testDB2.inventory.products")));
}
@Test
public void testFiltersWithInvalidTableIncludeList() {
ConnectorConfiguration config = getSqlServerConnectorConfiguration(1)
.with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST.name(), "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("matchingCollections.size()", is(0))
.body("validationResults.size()", is(1))
.rootPath("validationResults[0]")
.body("property", equalTo("table.include.list"))
.body("message", equalTo(
"The 'table.include.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^"));
}
@Test
public void testFiltersWithInvalidSchemaExcludeList() {
ConnectorConfiguration config = getSqlServerConnectorConfiguration(1)
.with(SqlServerConnectorConfig.TABLE_EXCLUDE_LIST.name(), "+");
given()
.port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort())
.when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson())
.put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_FILTERS_ENDPOINT)
.then().log().all()
.statusCode(200)
.assertThat().body("status", equalTo("INVALID"))
.body("matchingCollections.size()", is(0))
.body("validationResults.size()", is(1))
.rootPath("validationResults[0]")
.body("property", equalTo("table.exclude.list"))
.body("message", equalTo(
"The 'table.exclude.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^"));
}
public static ConnectorConfiguration getSqlServerConnectorConfiguration(int id, String... options) {
final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(RestExtensionTestInfrastructure.getSqlServerContainer())
.with(ConnectorConfiguration.USER, "sa")
@ -111,6 +194,7 @@ public static ConnectorConfiguration getSqlServerConnectorConfiguration(int id,
.with(SqlServerConnectorConfig.DATABASE_NAMES.name(), "testDB,testDB2")
.with(SqlServerConnectorConfig.SNAPSHOT_MODE.name(), "initial")
.with(SqlServerConnectorConfig.TOPIC_PREFIX.name(), "dbserver" + id)
.with("driver.encrypt", false)
.with("database.encrypt", false);
if (options != null && options.length > 0) {

View File

@ -12,7 +12,7 @@
import org.apache.kafka.connect.connector.Connector;
import io.debezium.config.ValidationResults;
import io.debezium.rest.model.ValidationResults;
public interface ConnectionValidationResource {
@ -24,10 +24,9 @@ public interface ConnectionValidationResource {
@Path(VALIDATE_CONNECTION_ENDPOINT)
default ValidationResults validateConnectionProperties(Map<String, ?> properties) {
// switch classloader to the connector specific classloader in order to load dependencies required to validate the connector config
ValidationResults validationResults;
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(getConnector().getClass().getClassLoader());
validationResults = new ValidationResults(getConnector(), properties);
ValidationResults validationResults = ConnectorConfigValidator.validateConfig(getConnector(), properties);
Thread.currentThread().setContextClassLoader(originalClassLoader);
return validationResults;
}

View File

@ -0,0 +1,32 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.rest;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.connector.Connector;
import io.debezium.rest.model.FilterValidationResults;
import io.debezium.rest.model.ValidationResults;
public class ConnectorConfigValidator {
public static ValidationResults validateConfig(Connector connector, Map<String, ?> properties) {
return new ValidationResults(connector.validate(convertPropertiesToStrings(properties)));
}
public static FilterValidationResults validateFilterConfig(Connector connector, Map<String, ?> properties,
FilterValidationResults.MatchingCollectionsParameter matchingCollections) {
return new FilterValidationResults(connector.validate(convertPropertiesToStrings(properties)), matchingCollections);
}
static Map<String, String> convertPropertiesToStrings(Map<String, ?> properties) {
return properties.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())));
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.rest;
import java.util.List;
import java.util.Map;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import org.apache.kafka.connect.connector.Connector;
import io.debezium.config.Configuration;
import io.debezium.rest.model.DataCollection;
import io.debezium.rest.model.FilterValidationResults;
public interface FilterValidationResource {
Connector getConnector();
List<DataCollection> getMatchingCollections(Configuration configuration);
String VALIDATE_FILTERS_ENDPOINT = "/validate/filters";
@PUT
@Path(VALIDATE_FILTERS_ENDPOINT)
default FilterValidationResults validateFiltersProperties(Map<String, ?> properties) {
// switch classloader to the connector specific classloader in order to load dependencies required to validate the connector config
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(getConnector().getClass().getClassLoader());
FilterValidationResults validationResults = ConnectorConfigValidator.validateFilterConfig(
getConnector(), properties, () -> getMatchingCollections(Configuration.from(properties)));
Thread.currentThread().setContextClassLoader(originalClassLoader);
return validationResults;
}
}

View File

@ -0,0 +1,79 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.rest.model;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonInclude;
/**
* A model class representing the structure of the response for the REST Extension
* call that returns the matching tables/collections on the `validate filters`
* endpoint.
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class DataCollection {
/**
* catalog, schema or replica set name
*/
private final String namespace;
/**
* table or collection
*/
private final String name;
/**
* optional database or schema name
*/
private final String realm;
public DataCollection(String namespace, String name) {
this.namespace = namespace;
this.name = name;
this.realm = null;
}
public DataCollection(String realm, String namespace, String name) {
this.realm = realm;
this.namespace = namespace;
this.name = name;
}
@JsonGetter("identifier")
public String identifier() {
if (null == this.realm || "null".equals(this.realm)) {
return this.namespace + "." + this.name;
}
else {
return this.realm + "." + this.namespace + "." + this.name;
}
}
@JsonGetter("realm")
public String realm() {
return this.realm;
}
@JsonGetter("namespace")
public String namespace() {
return this.namespace;
}
@JsonGetter("name")
public String getName() {
return this.name;
}
@Override
public String toString() {
return "DataCollection{" +
"realm='" + realm + '\'' +
", namespace='" + namespace + '\'' +
", name='" + name + '\'' +
'}';
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.rest.model;
import java.util.List;
import org.apache.kafka.common.config.Config;
public class FilterValidationResults extends ValidationResults {
public final List<DataCollection> matchingCollections;
public FilterValidationResults(Config validatedConfig, MatchingCollectionsParameter matchingCollections) {
super(validatedConfig);
if (Status.INVALID == this.status) {
this.matchingCollections = List.of();
}
else {
this.matchingCollections = matchingCollections.get();
}
}
/**
* Functional interface to provide the list of matching tables or collections with an anonymous function / lambda with lazy evaluation.
*/
public interface MatchingCollectionsParameter {
/**
* @return the list of matching tables or collections that match the connector config based filters / filter conditions.
*/
List<DataCollection> get();
}
}

View File

@ -3,22 +3,20 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.config;
package io.debezium.rest.model;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.connector.Connector;
public class ValidationResults {
public final List<ValidationResult> validationResults;
public Status status;
public final List<ValidationResult> validationResults;
public ValidationResults(Connector connector, Map<String, ?> properties) {
this.validationResults = convertConfigToValidationResults(connector.validate(convertPropertiesToStrings(properties)));
public ValidationResults(Config validatedConfig) {
this.validationResults = convertConfigToValidationResults(validatedConfig);
if (validationResults.isEmpty()) {
this.status = Status.VALID;
}
@ -27,11 +25,6 @@ public ValidationResults(Connector connector, Map<String, ?> properties) {
}
}
private static Map<String, String> convertPropertiesToStrings(Map<String, ?> properties) {
return properties.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())));
}
private List<ValidationResult> convertConfigToValidationResults(Config result) {
return result.configValues()
.stream()

View File

@ -5,7 +5,11 @@
*/
package io.debezium.testing.testcontainers;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.testcontainers.containers.JdbcDatabaseContainer;
@ -26,11 +30,12 @@ public class ConnectorConfiguration {
public static final String PASSWORD = "database.password";
public static final String DBNAME = "database.dbname";
private static final ObjectMapper MAPPER = new ObjectMapper();
private final ObjectNode configNode;
protected ConnectorConfiguration() {
final ObjectMapper mapper = new ObjectMapper();
this.configNode = mapper.createObjectNode();
this.configNode = MAPPER.createObjectNode();
this.configNode.put("tasks.max", 1);
}
@ -115,10 +120,23 @@ public ConnectorConfiguration with(String key, Double value) {
return this;
}
public ConnectorConfiguration remove(String key) {
this.configNode.remove(key);
return this;
}
public String toJson() {
return getConfiguration().toString();
}
public Properties asProperties() {
var connectorProperties = new Properties();
Map<?, ?> configMap = MAPPER.convertValue(configNode, HashMap.class);
configMap.values().removeIf(Objects::isNull);
connectorProperties.putAll(configMap);
return connectorProperties;
}
ObjectNode getConfiguration() {
return configNode;
}

View File

@ -145,9 +145,11 @@ public void start() {
*/
@Override
public void stop() {
// Idempotent
LOGGER.info("Stopping {} shard cluster...", shards.size());
MoreStartables.deepStopSync(stream());
if (started) {
LOGGER.info("Stopping {} shard cluster...", shards.size());
MoreStartables.deepStopSync(stream());
started = false;
}
}
public int size() {

View File

@ -16,13 +16,13 @@
public class OracleContainer extends org.testcontainers.containers.OracleContainer {
private static final String FALLBACK_ORACLE_SERVER_VERSION = "21.3.0";
static final String DEFAULT_TAG = parameterWithDefault(System.getProperty("version.oracle.server"), FALLBACK_ORACLE_SERVER_VERSION);
static final String ORACLE_USERNAME = parameterWithDefault(System.getProperty("database.username"), "c##dbzuser");
static final String ORACLE_PASSWORD = parameterWithDefault(System.getProperty("database.password"), "dbz");
static final String ORACLE_DBNAME = parameterWithDefault(System.getProperty("database.dbname"), "ORCLCDB");
static final int ORACLE_PORT = 1521;
private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("quay.io/rh_integration/dbz-oracle");
public static final String DEFAULT_TAG = parameterWithDefault(System.getProperty("version.oracle.server"), FALLBACK_ORACLE_SERVER_VERSION);
public static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("quay.io/rh_integration/dbz-oracle");
public final String ORACLE_DBNAME = parameterWithDefault(System.getProperty("database.dbname"), "ORCLCDB");
public final String ORACLE_PDB_NAME = parameterWithDefault(System.getProperty("database.pdb.name"), "ORCLPDB1");
private static final String ORACLE_USERNAME = parameterWithDefault(System.getProperty("database.username"), "debezium");
private static final String ORACLE_PASSWORD = parameterWithDefault(System.getProperty("database.password"), "dbz");
public static final int ORACLE_PORT = 1521;
private static final int ORACLE_DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
private static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 120;
private static final int APEX_HTTP_PORT = 8080;

View File

@ -7,6 +7,7 @@
import java.nio.file.Paths;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;
@ -21,6 +22,7 @@
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.images.builder.ImageFromDockerfile;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.utility.DockerImageName;
@ -86,9 +88,14 @@ public enum DATABASE {
.withEnv("MSSQL_PID", "Standard")
.withEnv("MSSQL_AGENT_ENABLED", "true")
.withPassword("Password!")
.withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(5)))
.withInitScript("initialize-sqlserver-database.sql")
.acceptLicense();
.acceptLicense()
.waitingFor(new LogMessageWaitStrategy()
.withRegEx(".*SQL Server is now ready for client connections\\..*\\s")
.withTimes(1)
.withStartupTimeout(Duration.of(CI_CONTAINER_STARTUP_TIME * 3, ChronoUnit.SECONDS)))
.withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10)))
.withConnectTimeoutSeconds(300);
private static final OracleContainer ORACLE_CONTAINER = (OracleContainer) new OracleContainer()
.withNetwork(NETWORK)
@ -128,7 +135,8 @@ private static Supplier<Stream<Startable>> getContainers(DATABASE database) {
}
public static void stopContainers() {
Stream<Startable> containers = Stream.of(DEBEZIUM_CONTAINER, SQL_SERVER_CONTAINER, MONGODB_REPLICA, MYSQL_CONTAINER, POSTGRES_CONTAINER, KAFKA_CONTAINER);
Stream<Startable> containers = Stream.of(DEBEZIUM_CONTAINER, ORACLE_CONTAINER, SQL_SERVER_CONTAINER, MONGODB_REPLICA, MYSQL_CONTAINER, POSTGRES_CONTAINER,
KAFKA_CONTAINER);
MoreStartables.deepStopSync(containers);
DEBEZIUM_CONTAINER = null;
}