diff --git a/.github/workflows/debezium-workflow.yml b/.github/workflows/debezium-workflow.yml index c83b3b281..07c82eba9 100644 --- a/.github/workflows/debezium-workflow.yml +++ b/.github/workflows/debezium-workflow.yml @@ -32,7 +32,6 @@ jobs: rest-extension-changed: ${{ steps.changed-files-rest-extension.outputs.any_changed }} schema-generator-changed: ${{ steps.changed-files-schema-generator.outputs.any_changed }} debezium-testing-changed: ${{ steps.changed-files-debezium-testing.outputs.any_changed }} - debezium-ui-changed: ${{ steps.changed-files-debezium-ui.outputs.any_changed }} mysql-ddl-parser-changed: ${{ steps.changed-files-mysql-ddl-parser.outputs.any_changed }} oracle-ddl-parser-changed: ${{ steps.changed-files-oracle-ddl-parser.outputs.any_changed }} documentation-only-changed: ${{ steps.changed-files-documentation.outputs.only_changed}} @@ -1036,30 +1035,3 @@ jobs: -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -DskipNonCore - - build_ui: - needs: [check_style, file_changes] - if: ${{ needs.file_changes.outputs.debezium-ui-changed == 'true' }} - name: "UI" - runs-on: ubuntu-latest - steps: - - name: Checkout Action (UI) - uses: actions/checkout@v4 - with: - repository: debezium/debezium-ui - path: debezium-ui - - - name: Set up Java 17 - uses: actions/setup-java@v3 - with: - distribution: 'temurin' - java-version: 17 - - - name: Cache Maven Repository - uses: actions/cache@v3 - with: - path: ~/.m2/repository - key: maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }} - restore-keys: | - maven-debezium-test-build-${{ hashFiles('core/**/pom.xml') }} - diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectRestExtension.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectRestExtension.java index db8edc6da..b4b88fc17 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectRestExtension.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectRestExtension.java @@ -14,8 +14,6 @@ /** * A Kafka Connect REST extension that enables some advanced features over * Kafka Connect's REST interface: - * + report available transformations and their configuration - * + return if topic auto-creation is available and enabled * * To install this extension put the jar file into a separate Kafka Connect * plugin dir and configure your Kafka Connect properties file with: diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResource.java index 159c9cbac..6b98adbbe 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResource.java @@ -5,6 +5,9 @@ */ package io.debezium.connector.mongodb.rest; +import java.util.List; +import java.util.stream.Collectors; + import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -13,19 +16,26 @@ import org.apache.kafka.connect.connector.Connector; +import io.debezium.DebeziumException; +import io.debezium.config.Configuration; import io.debezium.connector.mongodb.Module; import io.debezium.connector.mongodb.MongoDbConnector; +import io.debezium.connector.mongodb.MongoDbTaskContext; +import io.debezium.connector.mongodb.connection.MongoDbConnection; +import io.debezium.connector.mongodb.connection.ReplicaSet; import io.debezium.rest.ConnectionValidationResource; +import io.debezium.rest.FilterValidationResource; import io.debezium.rest.SchemaResource; +import io.debezium.rest.model.DataCollection; /** - * A JAX-RS Resource class defining endpoints of the Debezium MySQL Connect REST Extension + * A JAX-RS Resource class defining endpoints of the Debezium MongoDB Connect REST Extension * */ @Path(DebeziumMongoDbConnectorResource.BASE_PATH) @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -public class DebeziumMongoDbConnectorResource implements SchemaResource, ConnectionValidationResource { +public class DebeziumMongoDbConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource { public static final String BASE_PATH = "/debezium/mongodb"; public static final String VERSION_ENDPOINT = "/version"; @@ -41,6 +51,26 @@ public Connector getConnector() { return new MongoDbConnector(); } + protected MongoDbConnection getConnection(Configuration configuration) { + MongoDbTaskContext context = new MongoDbTaskContext(configuration); + ReplicaSet replicaSet = new ReplicaSet(context.getConnectionContext().connectionString()); + return context.getConnectionContext().connect(replicaSet, context.filters(), (s, throwable) -> { + throw new DebeziumException(s, throwable); + }); + } + + @Override + public List getMatchingCollections(Configuration configuration) { + try (MongoDbConnection primary = getConnection(configuration)) { + return primary.collections().stream() + .map(collectionId -> new DataCollection(collectionId.replicaSetName(), collectionId.dbName(), collectionId.name())) + .collect(Collectors.toList()); + } + catch (InterruptedException e) { + throw new DebeziumException(e); + } + } + @Override public String getSchemaFilePath() { return "/META-INF/resources/mongodb.json"; diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResourceIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResourceIT.java index 8480297cf..b4f1840f9 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResourceIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResourceIT.java @@ -105,6 +105,105 @@ public void testInvalidConnection() { Map.of("property", MongoDbConnectorConfig.TOPIC_PREFIX.name(), "message", "The 'topic.prefix' value is invalid: A value is required"))); } + @Test + public void testFiltersWithEmptyFilters() { + ConnectorConfiguration config = getMongoDbConnectorConfiguration(1); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("VALID")) + .body("validationResults.size()", is(0)) + .body("matchingCollections.size()", is(3)) + .body("matchingCollections", + hasItems( + Map.of("realm", "rs0", "namespace", "inventory", "name", "customers", "identifier", "rs0.inventory.customers"), + Map.of("realm", "rs0", "namespace", "inventory", "name", "orders", "identifier", "rs0.inventory.orders"), + Map.of("realm", "rs0", "namespace", "inventory", "name", "products", "identifier", "rs0.inventory.products"))); + } + + @Test + public void testFiltersWithValidCollectionIncludeList() { + ConnectorConfiguration config = getMongoDbConnectorConfiguration(1) + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST.name(), "inventory\\.product.*"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("VALID")) + .body("validationResults.size()", is(0)) + .body("matchingCollections.size()", is(1)) + .body("matchingCollections", + hasItems(Map.of("realm", "rs0", "namespace", "inventory", "name", "products", "identifier", "rs0.inventory.products"))); + } + + @Test + public void testFiltersWithValidDatabaseIncludeList() { + ConnectorConfiguration config = getMongoDbConnectorConfiguration(1) + .with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST.name(), "inventory"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("VALID")) + .body("validationResults.size()", is(0)) + .body("matchingCollections.size()", is(3)) + .body("matchingCollections", + hasItems( + Map.of("realm", "rs0", "namespace", "inventory", "name", "customers", "identifier", "rs0.inventory.customers"), + Map.of("realm", "rs0", "namespace", "inventory", "name", "orders", "identifier", "rs0.inventory.orders"), + Map.of("realm", "rs0", "namespace", "inventory", "name", "products", "identifier", "rs0.inventory.products"))); + } + + @Test + public void testFiltersWithInvalidDatabaseIncludeListPattern() { + ConnectorConfiguration config = getMongoDbConnectorConfiguration(1) + .with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST.name(), "+"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("INVALID")) + .body("matchingCollections.size()", is(0)) + .body("validationResults.size()", is(1)) + .rootPath("validationResults[0]") + .body("property", equalTo(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST.name())) + .body("message", equalTo( + "The 'database.include.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^")); + } + + @Test + public void testFiltersWithInvalidDatabaseExcludeListPattern() { + ConnectorConfiguration config = getMongoDbConnectorConfiguration(1) + .with(MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST.name(), "+"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumMongoDbConnectorResource.BASE_PATH + DebeziumMongoDbConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("INVALID")) + .body("matchingCollections.size()", is(0)) + .body("validationResults.size()", is(1)) + .rootPath("validationResults[0]") + .body("property", equalTo(MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST.name())) + .body("message", equalTo( + "The 'database.exclude.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^")); + } + public static ConnectorConfiguration getMongoDbConnectorConfiguration(int id, String... options) { final ConnectorConfiguration config = ConnectorConfiguration.forMongoDbReplicaSet(RestExtensionTestInfrastructure.getMongoDbContainer()) .with(MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS.name(), 10000) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectRestExtension.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectRestExtension.java index 03c99e771..ac9be381a 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectRestExtension.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectRestExtension.java @@ -13,9 +13,7 @@ /** * A Kafka Connect REST extension that enables some advanced features over - * Kafka Connect's REST interface: - * + report available transformations and their configuration - * + return if topic auto-creation is available and enabled + * Kafka Connect's REST interface. * * To install this extension put the jar file into a separate Kafka Connect * plugin dir and configure your Kafka Connect properties file with: diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResource.java index d3ecf0da8..951dd3ac2 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResource.java @@ -5,6 +5,12 @@ */ package io.debezium.connector.mysql.rest; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -13,10 +19,17 @@ import org.apache.kafka.connect.connector.Connector; +import io.debezium.DebeziumException; +import io.debezium.config.Configuration; import io.debezium.connector.mysql.Module; +import io.debezium.connector.mysql.MySqlConnection; import io.debezium.connector.mysql.MySqlConnector; +import io.debezium.connector.mysql.MySqlConnectorConfig; +import io.debezium.relational.TableId; import io.debezium.rest.ConnectionValidationResource; +import io.debezium.rest.FilterValidationResource; import io.debezium.rest.SchemaResource; +import io.debezium.rest.model.DataCollection; /** * A JAX-RS Resource class defining endpoints of the Debezium MySQL Connect REST Extension @@ -25,12 +38,10 @@ @Path(DebeziumMySqlConnectorResource.BASE_PATH) @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -public class DebeziumMySqlConnectorResource implements SchemaResource, ConnectionValidationResource { +public class DebeziumMySqlConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource { public static final String BASE_PATH = "/debezium/mysql"; public static final String VERSION_ENDPOINT = "/version"; - public static final String VALIDATE_PROPERTIES_ENDPOINT = "/validate/properties"; - public static final String VALIDATE_CONNECTOR_ENDPOINT = "/validate/connector"; @Override public String getSchemaFilePath() { @@ -42,6 +53,40 @@ public Connector getConnector() { return new MySqlConnector(); } + @Override + public List getMatchingCollections(Configuration configuration) { + final MySqlConnectorConfig config = new MySqlConnectorConfig(configuration); + final MySqlConnection.MySqlConnectionConfiguration connectionConfig = new MySqlConnection.MySqlConnectionConfiguration(configuration); + try (MySqlConnection connection = new MySqlConnection(connectionConfig)) { + Set tables; + + final List databaseNames = new ArrayList<>(); + connection.query("SHOW DATABASES", rs -> { + while (rs.next()) { + databaseNames.add(rs.getString(1)); + } + }); + + List allMatchingTables = new ArrayList<>(); + + for (String databaseName : databaseNames) { + if (!config.getTableFilters().databaseFilter().test(databaseName)) { + continue; + } + tables = connection.readTableNames(databaseName, null, null, new String[]{ "TABLE" }); + var matchingTables = tables.stream() + .filter(tableId -> config.getTableFilters().dataCollectionFilter().isIncluded(tableId)) + .map(tableId -> new DataCollection(tableId.catalog(), tableId.table())) + .collect(Collectors.toList()); + allMatchingTables.addAll(matchingTables); + } + return allMatchingTables; + } + catch (SQLException e) { + throw new DebeziumException(e); + } + } + @GET @Path(VERSION_ENDPOINT) public String getConnectorVersion() { diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResourceIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResourceIT.java index 0807404de..e5e213adb 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResourceIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResourceIT.java @@ -98,6 +98,113 @@ public void testInvalidConnection() { Map.of("property", MySqlConnectorConfig.HOSTNAME.name(), "message", "The 'database.hostname' value is invalid: A value is required"))); } + @Test + public void testFiltersWithEmptyFilters() { + ConnectorConfiguration config = getMySqlConnectorConfiguration(1); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("VALID")) + .body("validationResults.size()", is(0)) + .body("matchingCollections.size()", is(6)) + .body("matchingCollections", + hasItems( + Map.of("namespace", "inventory", "name", "geom", "identifier", "inventory.geom"), + Map.of("namespace", "inventory", "name", "products_on_hand", "identifier", "inventory.products_on_hand"), + Map.of("namespace", "inventory", "name", "customers", "identifier", "inventory.customers"), + Map.of("namespace", "inventory", "name", "addresses", "identifier", "inventory.addresses"), + Map.of("namespace", "inventory", "name", "orders", "identifier", "inventory.orders"), + Map.of("namespace", "inventory", "name", "products", "identifier", "inventory.products"))); + } + + @Test + public void testFiltersWithValidTableIncludeList() { + ConnectorConfiguration config = getMySqlConnectorConfiguration(1) + .with("table.include.list", "inventory\\.product.*"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("VALID")) + .body("validationResults.size()", is(0)) + .body("matchingCollections.size()", is(2)) + .body("matchingCollections", + hasItems( + Map.of("namespace", "inventory", "name", "products_on_hand", "identifier", "inventory.products_on_hand"), + Map.of("namespace", "inventory", "name", "products", "identifier", "inventory.products"))); + } + + @Test + public void testFiltersWithValidDatabaseIncludeList() { + ConnectorConfiguration config = getMySqlConnectorConfiguration(1) + .with("database.include.list", "inventory"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("VALID")) + .body("validationResults.size()", is(0)) + .body("matchingCollections.size()", is(6)) + .body("matchingCollections", + hasItems( + Map.of("namespace", "inventory", "name", "geom", "identifier", "inventory.geom"), + Map.of("namespace", "inventory", "name", "products_on_hand", "identifier", "inventory.products_on_hand"), + Map.of("namespace", "inventory", "name", "customers", "identifier", "inventory.customers"), + Map.of("namespace", "inventory", "name", "addresses", "identifier", "inventory.addresses"), + Map.of("namespace", "inventory", "name", "orders", "identifier", "inventory.orders"), + Map.of("namespace", "inventory", "name", "products", "identifier", "inventory.products"))); + } + + @Test + public void testFiltersWithInvalidDatabaseIncludeListPattern() { + ConnectorConfiguration config = getMySqlConnectorConfiguration(1) + .with("database.include.list", "+"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("INVALID")) + .body("matchingCollections.size()", is(0)) + .body("validationResults.size()", is(1)) + .rootPath("validationResults[0]") + .body("property", equalTo("database.include.list")) + .body("message", equalTo( + "The 'database.include.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^")); + } + + @Test + public void testFiltersWithInvalidDatabaseExcludeListPattern() { + ConnectorConfiguration config = getMySqlConnectorConfiguration(1) + .with("database.exclude.list", "+"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumMySqlConnectorResource.BASE_PATH + DebeziumMySqlConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("INVALID")) + .body("matchingCollections.size()", is(0)) + .body("validationResults.size()", is(1)) + .rootPath("validationResults[0]") + .body("property", equalTo("database.exclude.list")) + .body("message", equalTo( + "The 'database.exclude.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^")); + } + public static ConnectorConfiguration getMySqlConnectorConfiguration(int id, String... options) { final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(RestExtensionTestInfrastructure.getMySqlContainer()) .with(MySqlConnectorConfig.SNAPSHOT_MODE.name(), "never") // temporarily disable snapshot mode globally until we can check if connectors inside testcontainers are in SNAPSHOT or STREAMING mode (wait for snapshot finished!) diff --git a/debezium-connector-oracle/pom.xml b/debezium-connector-oracle/pom.xml index bdf6f320d..ed8383958 100644 --- a/debezium-connector-oracle/pom.xml +++ b/debezium-connector-oracle/pom.xml @@ -556,6 +556,7 @@ ${database.username} ${database.password} ${database.dbname} + ${database.pdb.name} diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectRestExtension.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectRestExtension.java index 9f1ea26ef..6f80a9d79 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectRestExtension.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectRestExtension.java @@ -13,9 +13,7 @@ /** * A Kafka Connect REST extension that enables some advanced features over - * Kafka Connect's REST interface: - * + report available transformations and their configuration - * + return if topic auto-creation is available and enabled + * Kafka Connect's REST interface. * * To install this extension put the jar file into a separate Kafka Connect * plugin dir and configure your Kafka Connect properties file with: diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResource.java index 1b42fcd08..a3cc39519 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResource.java @@ -5,6 +5,11 @@ */ package io.debezium.connector.oracle.rest; +import java.sql.SQLException; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -13,10 +18,18 @@ import org.apache.kafka.connect.connector.Connector; +import io.debezium.DebeziumException; +import io.debezium.config.Configuration; import io.debezium.connector.oracle.Module; +import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnector; +import io.debezium.connector.oracle.OracleConnectorConfig; +import io.debezium.relational.TableId; import io.debezium.rest.ConnectionValidationResource; +import io.debezium.rest.FilterValidationResource; import io.debezium.rest.SchemaResource; +import io.debezium.rest.model.DataCollection; +import io.debezium.util.Strings; /** * A JAX-RS Resource class defining endpoints of the Debezium Oracle Connect REST Extension @@ -25,12 +38,10 @@ @Path(DebeziumOracleConnectorResource.BASE_PATH) @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -public class DebeziumOracleConnectorResource implements SchemaResource, ConnectionValidationResource { +public class DebeziumOracleConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource { public static final String BASE_PATH = "/debezium/oracle"; public static final String VERSION_ENDPOINT = "/version"; - public static final String VALIDATE_PROPERTIES_ENDPOINT = "/validate/properties"; - public static final String VALIDATE_CONNECTOR_ENDPOINT = "/validate/connector"; @Override public String getSchemaFilePath() { @@ -42,6 +53,30 @@ public Connector getConnector() { return new OracleConnector(); } + @Override + public List getMatchingCollections(Configuration configuration) { + final OracleConnectorConfig oracleConfig = new OracleConnectorConfig(configuration); + final String databaseName = oracleConfig.getCatalogName(); + + try (OracleConnection connection = new OracleConnection(oracleConfig.getJdbcConfig(), false)) { + if (!Strings.isNullOrBlank(oracleConfig.getPdbName())) { + connection.setSessionToPdb(oracleConfig.getPdbName()); + } + Set tables; + // @TODO: we need to expose a better method from the connector, particularly getAllTableIds + // the following's performance is acceptable when using PDBs but not as ideal with non-PDB + tables = connection.readTableNames(databaseName, null, null, new String[]{ "TABLE" }); + + return tables.stream() + .filter(tableId -> oracleConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) + .map(tableId -> new DataCollection(tableId.schema(), tableId.table())) + .collect(Collectors.toList()); + } + catch (SQLException e) { + throw new DebeziumException(e); + } + } + @GET @Path(VERSION_ENDPOINT) public String getConnectorVersion() { diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResourceIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResourceIT.java index e2f91999c..cb937837e 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResourceIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResourceIT.java @@ -15,42 +15,45 @@ import java.util.Locale; import java.util.Map; -import org.junit.After; +import org.junit.AfterClass; import org.junit.Assume; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import io.debezium.connector.oracle.Module; import io.debezium.connector.oracle.OracleConnector; import io.debezium.connector.oracle.OracleConnectorConfig; -import io.debezium.storage.kafka.history.KafkaSchemaHistory; +import io.debezium.connector.oracle.util.TestHelper; import io.debezium.testing.testcontainers.ConnectorConfiguration; import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure; import io.restassured.http.ContentType; public class DebeziumOracleConnectorResourceIT { - @BeforeClass - public static void checkCondition() { - Assume.assumeThat("Skipping DebeziumOracleConnectorResourceIT tests when assembly profile is not active!", System.getProperty("isAssemblyProfileActive", "false"), - is("true")); - } + private static final boolean IS_ASSEMBLY_PROFILE_ACTIVE = System.getProperty("isAssemblyProfileActive", "false").equalsIgnoreCase("true"); + private static String ORACLE_USERNAME; + private static boolean running = false; - @Before - public void start() { + @BeforeClass + public static void checkConditionAndStart() { + Assume.assumeTrue("Skipping DebeziumOracleConnectorResourceIT tests when assembly profile is not active!", IS_ASSEMBLY_PROFILE_ACTIVE); RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumOracleConnectRestExtension.class.getName()); RestExtensionTestInfrastructure.startContainers(DATABASE.ORACLE); + TestHelper.loadTestData(TestHelper.getOracleConnectorConfiguration(1), "rest/data.sql"); + ORACLE_USERNAME = RestExtensionTestInfrastructure.getOracleContainer().getUsername(); + running = true; } - @After - public void stop() { - RestExtensionTestInfrastructure.stopContainers(); + @AfterClass + public static void stop() { + if (IS_ASSEMBLY_PROFILE_ACTIVE && running) { + RestExtensionTestInfrastructure.stopContainers(); + } } @Test public void testValidConnection() { - ConnectorConfiguration config = getOracleConnectorConfiguration(1); + ConnectorConfiguration config = TestHelper.getOracleConnectorConfiguration(1); given() .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) @@ -64,7 +67,7 @@ public void testValidConnection() { @Test public void testInvalidHostnameConnection() { - ConnectorConfiguration config = getOracleConnectorConfiguration(1).with(OracleConnectorConfig.HOSTNAME.name(), "zzzzzzzzzz"); + ConnectorConfiguration config = TestHelper.getOracleConnectorConfiguration(1).with(OracleConnectorConfig.HOSTNAME.name(), "zzzzzzzzzz"); Locale.setDefault(new Locale("en", "US")); // to enforce errormessages in English given() @@ -99,20 +102,80 @@ public void testInvalidConnection() { Map.of("property", OracleConnectorConfig.HOSTNAME.name(), "message", "The 'database.hostname' value is invalid: A value is required"))); } - public static ConnectorConfiguration getOracleConnectorConfiguration(int id, String... options) { - final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(RestExtensionTestInfrastructure.getOracleContainer()) - .with(OracleConnectorConfig.PDB_NAME.name(), "ORCLPDB1") - .with(OracleConnectorConfig.TOPIC_PREFIX.name(), "dbserver" + id) - .with(KafkaSchemaHistory.BOOTSTRAP_SERVERS.name(), RestExtensionTestInfrastructure.KAFKA_HOSTNAME + ":9092") - .with(KafkaSchemaHistory.TOPIC.name(), "dbhistory.inventory"); + @Test + public void testFiltersWithEmptyFilters() { + ConnectorConfiguration config = TestHelper.getOracleConnectorConfiguration(1); - if (options != null && options.length > 0) { - for (int i = 0; i < options.length; i += 2) { - config.with(options[i], options[i + 1]); - } - } + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("VALID")) + .body("validationResults.size()", is(0)) + .body("matchingCollections.size()", is(10)) + .body("matchingCollections", + hasItems( + Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "ORDERS", "identifier", ORACLE_USERNAME.toUpperCase() + ".ORDERS"), + Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "PEOPLE", "identifier", ORACLE_USERNAME.toUpperCase() + ".PEOPLE"), + Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "TEST", "identifier", ORACLE_USERNAME.toUpperCase() + ".TEST"), + Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "PRODUCTS", "identifier", ORACLE_USERNAME.toUpperCase() + ".PRODUCTS"), + Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "CUSTOMERS", "identifier", ORACLE_USERNAME.toUpperCase() + ".CUSTOMERS"), + Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "DEBEZIUM_TABLE3", "identifier", + ORACLE_USERNAME.toUpperCase() + ".DEBEZIUM_TABLE3"), + Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "DEBEZIUM_TABLE2", "identifier", + ORACLE_USERNAME.toUpperCase() + ".DEBEZIUM_TABLE2"), + Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "DEBEZIUM_TABLE1", "identifier", + ORACLE_USERNAME.toUpperCase() + ".DEBEZIUM_TABLE1"), + Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "PRODUCTS_ON_HAND", "identifier", + ORACLE_USERNAME.toUpperCase() + ".PRODUCTS_ON_HAND"), + Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "DEBEZIUM_TEST", "identifier", + ORACLE_USERNAME.toUpperCase() + ".DEBEZIUM_TEST"))); + } - return config; + @Test + public void testFiltersWithValidTableIncludeList() { + ConnectorConfiguration config = TestHelper.getOracleConnectorConfiguration(1) + .with("table.include.list", ORACLE_USERNAME.toUpperCase() + "\\.DEBEZIUM_TABLE.*"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("VALID")) + .body("validationResults.size()", is(0)) + .body("matchingCollections.size()", is(3)) + .body("matchingCollections", + hasItems( + Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "DEBEZIUM_TABLE3", "identifier", + ORACLE_USERNAME.toUpperCase() + ".DEBEZIUM_TABLE3"), + Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "DEBEZIUM_TABLE1", "identifier", + ORACLE_USERNAME.toUpperCase() + ".DEBEZIUM_TABLE1"), + Map.of("namespace", ORACLE_USERNAME.toUpperCase(), "name", "DEBEZIUM_TABLE2", "identifier", + ORACLE_USERNAME.toUpperCase() + ".DEBEZIUM_TABLE2"))); + } + + @Test + public void testFiltersWithInvalidTableIncludeList() { + ConnectorConfiguration config = TestHelper.getOracleConnectorConfiguration(1) + .with("table.include.list", "+"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumOracleConnectorResource.BASE_PATH + DebeziumOracleConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("INVALID")) + .body("matchingCollections.size()", is(0)) + .body("validationResults.size()", is(1)) + .rootPath("validationResults[0]") + .body("property", equalTo("table.include.list")) + .body("message", equalTo( + "The 'table.include.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^")); } } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java index 5d9cc00e8..9ca4c335c 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/util/TestHelper.java @@ -5,7 +5,11 @@ */ package io.debezium.connector.oracle.util; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; import java.math.BigInteger; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.sql.SQLException; import java.util.ArrayList; @@ -16,7 +20,10 @@ import org.awaitility.Awaitility; import org.infinispan.client.hotrod.impl.ConfigurationProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.debezium.DebeziumException; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; import io.debezium.config.Field; @@ -26,15 +33,20 @@ import io.debezium.connector.oracle.OracleConnectorConfig.LogMiningBufferType; import io.debezium.connector.oracle.Scn; import io.debezium.connector.oracle.logminer.processor.infinispan.CacheProvider; +import io.debezium.connector.oracle.rest.DebeziumOracleConnectorResourceIT; import io.debezium.jdbc.JdbcConfiguration; import io.debezium.storage.file.history.FileSchemaHistory; +import io.debezium.storage.kafka.history.KafkaSchemaHistory; +import io.debezium.testing.testcontainers.ConnectorConfiguration; +import io.debezium.testing.testcontainers.OracleContainer; +import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure; import io.debezium.util.Strings; import io.debezium.util.Testing; public class TestHelper { private static final String PDB_NAME = "pdb.name"; - private static final String DATABASE_PREFIX = "database."; + private static final String DATABASE_PREFIX = CommonConnectorConfig.DATABASE_CONFIG_PREFIX; private static final String DATABASE_ADMIN_PREFIX = "database.admin."; public static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-connect.txt").toAbsolutePath(); @@ -75,7 +87,9 @@ public class TestHelper { */ public static final String TYPE_SCALE_PARAMETER_KEY = "__debezium.source.column.scale"; - private static Map cacheMappings = new HashMap<>(); + private static final Logger LOGGER = LoggerFactory.getLogger(TestHelper.class); + + private static final Map cacheMappings = new HashMap<>(); static { cacheMappings.put(CacheProvider.TRANSACTIONS_CACHE_NAME, OracleConnectorConfig.LOG_MINING_BUFFER_INFINISPAN_CACHE_TRANSACTIONS); @@ -660,4 +674,118 @@ public static Scn getCurrentScn() throws SQLException { return admin.getCurrentScn(); } } + + // Below are test helper methods for integration tests using the Testcointainers based OracleContainer instance: + + private static Configuration getTestConnectionConfiguration(ConnectorConfiguration config) { + var connectionConfiguration = Configuration.from(config.asProperties()).subset(CommonConnectorConfig.DATABASE_CONFIG_PREFIX, true); + var dbName = Strings.isNullOrEmpty(connectionConfiguration.getString(PDB_NAME)) + ? connectionConfiguration.getString(JdbcConfiguration.DATABASE) + : connectionConfiguration.getString(PDB_NAME); + return connectionConfiguration.edit() + .with(JdbcConfiguration.HOSTNAME.name(), "localhost") + .with(JdbcConfiguration.PORT, RestExtensionTestInfrastructure.getOracleContainer().getMappedPort(OracleContainer.ORACLE_PORT)) + .with(JdbcConfiguration.DATABASE, dbName) + .build(); + } + + // expects user passed in the config to be any local user account on the Oracle DB instance + private static OracleConnection createConnection(ConnectorConfiguration config, boolean autoCommit) { + Configuration connectionConfiguration = getTestConnectionConfiguration(config); + OracleConnection connection = new OracleConnection(JdbcConfiguration.adapt(connectionConfiguration)); + try { + connection.setAutoCommit(autoCommit); + return connection; + } + catch (SQLException e) { + throw new RuntimeException("Failed to create connection", e); + } + } + + // Will only work for SQL files that use ";" as ending of an SQL statement, other ";" can't be used in the SQL code + private static String[] getResourceSqlFileContent(String file) { + try (var is = DebeziumOracleConnectorResourceIT.class.getClassLoader().getResourceAsStream(file)) { + if (null == is) { + throw new IllegalArgumentException("File not found. (" + file + ")"); + } + try ( + var streamReader = new InputStreamReader(is, StandardCharsets.UTF_8); + var reader = new BufferedReader(streamReader)) { + List sqlStatements = new ArrayList<>(); + var sb = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + if (line.endsWith(";")) { + sb.append(line, 0, line.length() - 1); + sqlStatements.add(sb.toString()); + sb = new StringBuilder(); + } + else { + sb.append(line).append(" "); + } + } + return sqlStatements.toArray(new String[0]); + } + } + catch (IOException e) { + throw new DebeziumException(e); + } + } + + public static void loadTestData(ConnectorConfiguration config, String sqlFile) { + try (var conn = TestHelper.createConnection(config, false)) { + conn.execute(getResourceSqlFileContent(sqlFile)); + } + catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private static void patchConnectorConfigurationForContainer(ConnectorConfiguration connectorConfiguration, OracleContainer oracleContainer) { + var oracleImageName = oracleContainer.getDockerImageName(); + if (!oracleImageName.startsWith(OracleContainer.DEFAULT_IMAGE_NAME.getUnversionedPart())) { + return; + } + String imageTag = "latest"; + String imageTagSuffix = ""; + if (oracleImageName.contains(":")) { + imageTag = oracleImageName.split(":")[1]; + } + if (imageTag.contains("-")) { + imageTagSuffix = imageTag.substring(imageTag.lastIndexOf("-") + 1); + } + String pdbName = connectorConfiguration.asProperties().getProperty(OracleConnectorConfig.PDB_NAME.name()); + if (!imageTag.contains("-") || "xs".equals(imageTagSuffix)) { + if (!Strings.isNullOrEmpty(pdbName)) { + connectorConfiguration.with(OracleConnectorConfig.DATABASE_NAME.name(), pdbName); + } + } + else if ("noncdb".equals(imageTagSuffix)) { + if (!Strings.isNullOrEmpty(pdbName)) { + connectorConfiguration.remove(OracleConnectorConfig.PDB_NAME.name()); + } + } + else { + throw new RuntimeException("Invalid or unknown image tag '" + imageTagSuffix + "' for Oracle container image: " + oracleImageName); + } + } + + public static ConnectorConfiguration getOracleConnectorConfiguration(int id, String... options) { + OracleContainer oracleContainer = RestExtensionTestInfrastructure.getOracleContainer(); + final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(oracleContainer) + .with(OracleConnectorConfig.PDB_NAME.name(), oracleContainer.ORACLE_PDB_NAME) + .with(OracleConnectorConfig.DATABASE_NAME.name(), oracleContainer.ORACLE_DBNAME) + .with(OracleConnectorConfig.TOPIC_PREFIX.name(), "dbserver" + id) + .with(KafkaSchemaHistory.BOOTSTRAP_SERVERS.name(), RestExtensionTestInfrastructure.KAFKA_HOSTNAME + ":9092") + .with(KafkaSchemaHistory.TOPIC.name(), "dbhistory.oracle"); + + if (options != null && options.length > 0) { + for (int i = 0; i < options.length; i += 2) { + config.with(options[i], options[i + 1]); + } + } + + patchConnectorConfigurationForContainer(config, oracleContainer); + return config; + } } diff --git a/debezium-connector-oracle/src/test/resources/rest/data.sql b/debezium-connector-oracle/src/test/resources/rest/data.sql new file mode 100644 index 000000000..09a592829 --- /dev/null +++ b/debezium-connector-oracle/src/test/resources/rest/data.sql @@ -0,0 +1,121 @@ +CREATE TABLE PEOPLE(name VARCHAR2(10)); +ALTER TABLE PEOPLE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +CREATE TABLE debezium_table1 (id NUMERIC(9,0) NOT NULL, name VARCHAR2(1000), PRIMARY KEY (id)); +ALTER TABLE debezium_table1 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +CREATE TABLE debezium_table2 (id NUMERIC(9,0) NOT NULL, name VARCHAR2(1000), PRIMARY KEY (id)); +ALTER TABLE debezium_table2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +CREATE TABLE debezium_table3 (id NUMERIC(9,0) NOT NULL, name VARCHAR2(1000), birth_date date, PRIMARY KEY (id)); +ALTER TABLE debezium_table3 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +CREATE TABLE TEST +(COLUMN1 NUMBER(19) NOT NULL, +COLUMN2 VARCHAR2(255), +PRIMARY KEY (COLUMN1)); +ALTER TABLE TEST ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +CREATE TABLE DEBEZIUM_TEST +(id NUMBER(19) NOT NULL, +col1 NUMERIC(4,2), +col2 VARCHAR2(255) DEFAULT 'debezium' NOT NULL, +col3 NVARCHAR2(255) NOT NULL, +col4 CHAR(4), +col5 NCHAR(4), +col6 FLOAT(126), +col7 DATE, +col8 TIMESTAMP, +col9 blob, +col10 clob, +col12 NUMBER(1,0), +col13 DATE NOT NULL, +PRIMARY KEY (id)); +ALTER TABLE DEBEZIUM_TEST ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +INSERT INTO PEOPLE (name) VALUES ('Larry'); +INSERT INTO PEOPLE (name) VALUES ('Bruno'); +INSERT INTO PEOPLE (name) VALUES ('Gerald'); + +CREATE TABLE products ( + id NUMBER(4) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 101) NOT NULL PRIMARY KEY, + name VARCHAR2(255) NOT NULL, + description VARCHAR2(512), + weight FLOAT +); +ALTER TABLE products ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +INSERT INTO products + VALUES (NULL,'scooter','Small 2-wheel scooter',3.14); +INSERT INTO products + VALUES (NULL,'car battery','12V car battery',8.1); +INSERT INTO products + VALUES (NULL,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8); +INSERT INTO products + VALUES (NULL,'hammer','12oz carpenter''s hammer',0.75); +INSERT INTO products + VALUES (NULL,'hammer','14oz carpenter''s hammer',0.875); +INSERT INTO products + VALUES (NULL,'hammer','16oz carpenter''s hammer',1.0); +INSERT INTO products + VALUES (NULL,'rocks','box of assorted rocks',5.3); +INSERT INTO products + VALUES (NULL,'jacket','water resistent black wind breaker',0.1); +INSERT INTO products + VALUES (NULL,'spare tire','24 inch spare tire',22.2); + +CREATE TABLE products_on_hand ( + product_id NUMBER(4) NOT NULL PRIMARY KEY, + quantity NUMBER(4) NOT NULL, + FOREIGN KEY (product_id) REFERENCES products(id) +); +ALTER TABLE products_on_hand ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +INSERT INTO products_on_hand VALUES (101, 3); +INSERT INTO products_on_hand VALUES (102, 8); +INSERT INTO products_on_hand VALUES (103, 18); +INSERT INTO products_on_hand VALUES (104, 4); +INSERT INTO products_on_hand VALUES (105, 5); +INSERT INTO products_on_hand VALUES (106, 0); +INSERT INTO products_on_hand VALUES (107, 44); +INSERT INTO products_on_hand VALUES (108, 2); +INSERT INTO products_on_hand VALUES (109, 5); + +CREATE TABLE customers ( + id NUMBER(4) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY, + first_name VARCHAR2(255) NOT NULL, + last_name VARCHAR2(255) NOT NULL, + email VARCHAR2(255) NOT NULL UNIQUE +); +ALTER TABLE customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +INSERT INTO customers + VALUES (NULL,'Sally','Thomas','sally.thomas@acme.com'); +INSERT INTO customers + VALUES (NULL,'George','Bailey','gbailey@foobar.com'); +INSERT INTO customers + VALUES (NULL,'Edward','Walker','ed@walker.com'); +INSERT INTO customers + VALUES (NULL,'Anne','Kretchmar','annek@noanswer.org'); + +CREATE TABLE debezium.orders ( + id NUMBER(6) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 10001) NOT NULL PRIMARY KEY, + order_date DATE NOT NULL, + purchaser NUMBER(4) NOT NULL, + quantity NUMBER(4) NOT NULL, + product_id NUMBER(4) NOT NULL, + FOREIGN KEY (purchaser) REFERENCES customers(id), + FOREIGN KEY (product_id) REFERENCES products(id) +); +ALTER TABLE orders ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; + +INSERT INTO orders + VALUES (NULL, '16-JAN-2016', 1001, 1, 102); +INSERT INTO orders + VALUES (NULL, '17-JAN-2016', 1002, 2, 105); +INSERT INTO orders + VALUES (NULL, '19-FEB-2016', 1002, 2, 106); +INSERT INTO orders + VALUES (NULL, '21-FEB-2016', 1003, 1, 107); + +COMMIT; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index b1e61845e..b09456ead 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -1174,7 +1174,7 @@ private static class SystemTablesPredicate implements TableFilter { protected static final List SYSTEM_SCHEMAS = Arrays.asList("pg_catalog", "information_schema"); // these are tables that may be placed in the user's schema but are system tables. This typically includes modules // that install system tables such as the GEO module - protected static final List SYSTEM_TABLES = Arrays.asList("spatial_ref_sys"); + protected static final List SYSTEM_TABLES = List.of("spatial_ref_sys"); protected static final String TEMP_TABLE_SCHEMA_PREFIX = "pg_temp"; @Override diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectRestExtension.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectRestExtension.java index f2cdbef5f..7afd2e3e4 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectRestExtension.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectRestExtension.java @@ -13,9 +13,7 @@ /** * A Kafka Connect REST extension that enables some advanced features over - * Kafka Connect's REST interface: - * + report available transformations and their configuration - * + return if topic auto-creation is available and enabled + * Kafka Connect's REST interface. * * To install this extension put the jar file into a separate Kafka Connect * plugin dir and configure your Kafka Connect properties file with: diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResource.java index 2ca0c6827..c6027a8c6 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResource.java @@ -5,6 +5,11 @@ */ package io.debezium.connector.postgresql.rest; +import java.sql.SQLException; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -13,10 +18,17 @@ import org.apache.kafka.connect.connector.Connector; +import io.debezium.DebeziumException; +import io.debezium.config.Configuration; import io.debezium.connector.postgresql.Module; import io.debezium.connector.postgresql.PostgresConnector; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.relational.TableId; import io.debezium.rest.ConnectionValidationResource; +import io.debezium.rest.FilterValidationResource; import io.debezium.rest.SchemaResource; +import io.debezium.rest.model.DataCollection; /** * A JAX-RS Resource class defining endpoints of the Debezium Postgres Connect REST Extension @@ -25,7 +37,7 @@ @Path(DebeziumPostgresConnectorResource.BASE_PATH) @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -public class DebeziumPostgresConnectorResource implements SchemaResource, ConnectionValidationResource { +public class DebeziumPostgresConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource { public static final String BASE_PATH = "/debezium/postgres"; public static final String VERSION_ENDPOINT = "/version"; @@ -41,6 +53,25 @@ public Connector getConnector() { return new PostgresConnector(); } + @Override + public List getMatchingCollections(Configuration configuration) { + PostgresConnectorConfig config = new PostgresConnectorConfig(configuration); + try (PostgresConnection connection = new PostgresConnection(config.getJdbcConfig(), PostgresConnection.CONNECTION_GENERAL)) { + Set tables; + try { + tables = connection.readTableNames(config.databaseName(), null, null, new String[]{ "TABLE" }); + + return tables.stream() + .filter(tableId -> config.getTableFilters().dataCollectionFilter().isIncluded(tableId)) + .map(tableId -> new DataCollection(tableId.schema(), tableId.table())) + .collect(Collectors.toList()); + } + catch (SQLException e) { + throw new DebeziumException(e); + } + } + } + @Override public String getSchemaFilePath() { return "/META-INF/resources/postgres.json"; diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResourceIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResourceIT.java index adb2a026f..bc3be4201 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResourceIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResourceIT.java @@ -98,6 +98,111 @@ public void testInvalidConnection() { Map.of("property", PostgresConnectorConfig.HOSTNAME.name(), "message", "The 'database.hostname' value is invalid: A value is required"))); } + @Test + public void testFiltersWithEmptyFilters() { + ConnectorConfiguration config = getPostgresConnectorConfiguration(1); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("VALID")) + .body("validationResults.size()", is(0)) + .body("matchingCollections.size()", is(5)) + .body("matchingCollections", + hasItems( + Map.of("namespace", "inventory", "name", "geom", "identifier", "inventory.geom"), + Map.of("namespace", "inventory", "name", "products_on_hand", "identifier", "inventory.products_on_hand"), + Map.of("namespace", "inventory", "name", "customers", "identifier", "inventory.customers"), + Map.of("namespace", "inventory", "name", "orders", "identifier", "inventory.orders"), + Map.of("namespace", "inventory", "name", "products", "identifier", "inventory.products"))); + } + + @Test + public void testFiltersWithValidTableIncludeList() { + ConnectorConfiguration config = getPostgresConnectorConfiguration(1) + .with("table.include.list", "inventory\\.product.*"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("VALID")) + .body("validationResults.size()", is(0)) + .body("matchingCollections.size()", is(2)) + .body("matchingCollections", + hasItems( + Map.of("namespace", "inventory", "name", "products_on_hand", "identifier", "inventory.products_on_hand"), + Map.of("namespace", "inventory", "name", "products", "identifier", "inventory.products"))); + } + + @Test + public void testFiltersWithValidSchemaIncludeList() { + ConnectorConfiguration config = getPostgresConnectorConfiguration(1) + .with("schema.include.list", "inventory"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("VALID")) + .body("validationResults.size()", is(0)) + .body("matchingCollections.size()", is(5)) + .body("matchingCollections", + hasItems( + Map.of("namespace", "inventory", "name", "geom", "identifier", "inventory.geom"), + Map.of("namespace", "inventory", "name", "products_on_hand", "identifier", "inventory.products_on_hand"), + Map.of("namespace", "inventory", "name", "customers", "identifier", "inventory.customers"), + Map.of("namespace", "inventory", "name", "orders", "identifier", "inventory.orders"), + Map.of("namespace", "inventory", "name", "products", "identifier", "inventory.products"))); + } + + @Test + public void testFiltersWithInvalidSchemaIncludeListPattern() { + ConnectorConfiguration config = getPostgresConnectorConfiguration(1) + .with("schema.include.list", "+"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("INVALID")) + .body("matchingCollections.size()", is(0)) + .body("validationResults.size()", is(1)) + .rootPath("validationResults[0]") + .body("property", equalTo("schema.include.list")) + .body("message", equalTo( + "The 'schema.include.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^")); + } + + @Test + public void testFiltersWithInvalidSchemaExcludeListPattern() { + ConnectorConfiguration config = getPostgresConnectorConfiguration(1) + .with("schema.exclude.list", "+"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumPostgresConnectorResource.BASE_PATH + DebeziumPostgresConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("INVALID")) + .body("matchingCollections.size()", is(0)) + .body("validationResults.size()", is(1)) + .rootPath("validationResults[0]") + .body("property", equalTo("schema.exclude.list")) + .body("message", equalTo( + "The 'schema.exclude.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^")); + } + private static ConnectorConfiguration getPostgresConnectorConfiguration(int id, String... options) { final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(RestExtensionTestInfrastructure.getPostgresContainer()) .with(PostgresConnectorConfig.SNAPSHOT_MODE.name(), "never") // temporarily disable snapshot mode globally until we can check if connectors inside testcontainers are in SNAPSHOT or STREAMING mode (wait for snapshot finished!) diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectRestExtension.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectRestExtension.java index 424624810..6d0229b07 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectRestExtension.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectRestExtension.java @@ -13,9 +13,7 @@ /** * A Kafka Connect REST extension that enables some advanced features over - * Kafka Connect's REST interface: - * + report available transformations and their configuration - * + return if topic auto-creation is available and enabled + * Kafka Connect's REST interface. * * To install this extension put the jar file into a separate Kafka Connect * plugin dir and configure your Kafka Connect properties file with: diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResource.java index c497733d3..f4cb839d9 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResource.java @@ -5,6 +5,13 @@ */ package io.debezium.connector.sqlserver.rest; +import java.sql.SQLException; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -13,10 +20,17 @@ import org.apache.kafka.connect.connector.Connector; +import io.debezium.DebeziumException; +import io.debezium.config.Configuration; import io.debezium.connector.sqlserver.Module; +import io.debezium.connector.sqlserver.SqlServerConnection; import io.debezium.connector.sqlserver.SqlServerConnector; +import io.debezium.connector.sqlserver.SqlServerConnectorConfig; +import io.debezium.relational.TableId; import io.debezium.rest.ConnectionValidationResource; +import io.debezium.rest.FilterValidationResource; import io.debezium.rest.SchemaResource; +import io.debezium.rest.model.DataCollection; /** * A JAX-RS Resource class defining endpoints of the Debezium SQL Server Connect REST Extension @@ -25,7 +39,7 @@ @Path(DebeziumSqlServerConnectorResource.BASE_PATH) @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -public class DebeziumSqlServerConnectorResource implements SchemaResource, ConnectionValidationResource { +public class DebeziumSqlServerConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource { public static final String BASE_PATH = "/debezium/sqlserver"; public static final String VERSION_ENDPOINT = "/version"; @@ -41,6 +55,32 @@ public Connector getConnector() { return new SqlServerConnector(); } + @Override + public List getMatchingCollections(Configuration configuration) { + final SqlServerConnectorConfig sqlServerConfig = new SqlServerConnectorConfig(configuration); + final List databaseNames = sqlServerConfig.getDatabaseNames(); + + try (SqlServerConnection connection = new SqlServerConnection(sqlServerConfig, null, Collections.emptySet(), sqlServerConfig.useSingleDatabase())) { + Set tables = new HashSet<>(); + databaseNames.forEach(databaseName -> { + try { + tables.addAll(connection.readTableNames(databaseName, null, null, new String[]{ "TABLE" })); + } + catch (SQLException e) { + throw new DebeziumException(e); + } + }); + + return tables.stream() + .filter(tableId -> sqlServerConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) + .map(tableId -> new DataCollection(tableId.catalog(), tableId.schema(), tableId.table())) + .collect(Collectors.toList()); + } + catch (SQLException e) { + throw new RuntimeException("Could not retrieve real database name", e); + } + } + @Override public String getSchemaFilePath() { return "/META-INF/resources/sqlserver.json"; diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResourceIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResourceIT.java index 869f159a2..f6e04e6ee 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResourceIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResourceIT.java @@ -102,6 +102,89 @@ public void testInvalidConnection() { "The 'database.hostname' value is invalid: A value is required"))); } + @Test + public void testFiltersWithEmptyFilters() { + ConnectorConfiguration config = getSqlServerConnectorConfiguration(1); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("VALID")) + .body("validationResults.size()", is(0)) + .body("matchingCollections.size()", is(5)) + .body("matchingCollections", + hasItems( + Map.of("realm", "testDB", "namespace", "inventory", "name", "products_on_hand", "identifier", "testDB.inventory.products_on_hand"), + Map.of("realm", "testDB", "namespace", "inventory", "name", "customers", "identifier", "testDB.inventory.customers"), + Map.of("realm", "testDB", "namespace", "inventory", "name", "orders", "identifier", "testDB.inventory.orders"), + Map.of("realm", "testDB", "namespace", "inventory", "name", "products", "identifier", "testDB.inventory.products"), + Map.of("realm", "testDB2", "namespace", "inventory", "name", "products", "identifier", "testDB2.inventory.products"))); + } + + @Test + public void testFiltersWithValidTableIncludeList() { + ConnectorConfiguration config = getSqlServerConnectorConfiguration(1) + .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST.name(), "inventory\\.product.*"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("VALID")) + .body("validationResults.size()", is(0)) + .body("matchingCollections.size()", is(3)) + .body("matchingCollections", + hasItems( + Map.of("realm", "testDB", "namespace", "inventory", "name", "products_on_hand", "identifier", "testDB.inventory.products_on_hand"), + Map.of("realm", "testDB", "namespace", "inventory", "name", "products", "identifier", "testDB.inventory.products"), + Map.of("realm", "testDB2", "namespace", "inventory", "name", "products", "identifier", "testDB2.inventory.products"))); + } + + @Test + public void testFiltersWithInvalidTableIncludeList() { + ConnectorConfiguration config = getSqlServerConnectorConfiguration(1) + .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST.name(), "+"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("INVALID")) + .body("matchingCollections.size()", is(0)) + .body("validationResults.size()", is(1)) + .rootPath("validationResults[0]") + .body("property", equalTo("table.include.list")) + .body("message", equalTo( + "The 'table.include.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^")); + } + + @Test + public void testFiltersWithInvalidSchemaExcludeList() { + ConnectorConfiguration config = getSqlServerConnectorConfiguration(1) + .with(SqlServerConnectorConfig.TABLE_EXCLUDE_LIST.name(), "+"); + + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().contentType(ContentType.JSON).accept(ContentType.JSON).body(config.toJson()) + .put(DebeziumSqlServerConnectorResource.BASE_PATH + DebeziumSqlServerConnectorResource.VALIDATE_FILTERS_ENDPOINT) + .then().log().all() + .statusCode(200) + .assertThat().body("status", equalTo("INVALID")) + .body("matchingCollections.size()", is(0)) + .body("validationResults.size()", is(1)) + .rootPath("validationResults[0]") + .body("property", equalTo("table.exclude.list")) + .body("message", equalTo( + "The 'table.exclude.list' value is invalid: A comma-separated list of valid regular expressions is expected, but Dangling meta character '+' near index 0\n+\n^")); + } + public static ConnectorConfiguration getSqlServerConnectorConfiguration(int id, String... options) { final ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(RestExtensionTestInfrastructure.getSqlServerContainer()) .with(ConnectorConfiguration.USER, "sa") @@ -111,6 +194,7 @@ public static ConnectorConfiguration getSqlServerConnectorConfiguration(int id, .with(SqlServerConnectorConfig.DATABASE_NAMES.name(), "testDB,testDB2") .with(SqlServerConnectorConfig.SNAPSHOT_MODE.name(), "initial") .with(SqlServerConnectorConfig.TOPIC_PREFIX.name(), "dbserver" + id) + .with("driver.encrypt", false) .with("database.encrypt", false); if (options != null && options.length > 0) { diff --git a/debezium-core/src/main/java/io/debezium/rest/ConnectionValidationResource.java b/debezium-core/src/main/java/io/debezium/rest/ConnectionValidationResource.java index f6ea2f9e2..5897b89ea 100644 --- a/debezium-core/src/main/java/io/debezium/rest/ConnectionValidationResource.java +++ b/debezium-core/src/main/java/io/debezium/rest/ConnectionValidationResource.java @@ -12,7 +12,7 @@ import org.apache.kafka.connect.connector.Connector; -import io.debezium.config.ValidationResults; +import io.debezium.rest.model.ValidationResults; public interface ConnectionValidationResource { @@ -24,10 +24,9 @@ public interface ConnectionValidationResource { @Path(VALIDATE_CONNECTION_ENDPOINT) default ValidationResults validateConnectionProperties(Map properties) { // switch classloader to the connector specific classloader in order to load dependencies required to validate the connector config - ValidationResults validationResults; ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(getConnector().getClass().getClassLoader()); - validationResults = new ValidationResults(getConnector(), properties); + ValidationResults validationResults = ConnectorConfigValidator.validateConfig(getConnector(), properties); Thread.currentThread().setContextClassLoader(originalClassLoader); return validationResults; } diff --git a/debezium-core/src/main/java/io/debezium/rest/ConnectorConfigValidator.java b/debezium-core/src/main/java/io/debezium/rest/ConnectorConfigValidator.java new file mode 100644 index 000000000..f8887e9f3 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/rest/ConnectorConfigValidator.java @@ -0,0 +1,32 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.rest; + +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.kafka.connect.connector.Connector; + +import io.debezium.rest.model.FilterValidationResults; +import io.debezium.rest.model.ValidationResults; + +public class ConnectorConfigValidator { + + public static ValidationResults validateConfig(Connector connector, Map properties) { + return new ValidationResults(connector.validate(convertPropertiesToStrings(properties))); + } + + public static FilterValidationResults validateFilterConfig(Connector connector, Map properties, + FilterValidationResults.MatchingCollectionsParameter matchingCollections) { + return new FilterValidationResults(connector.validate(convertPropertiesToStrings(properties)), matchingCollections); + } + + static Map convertPropertiesToStrings(Map properties) { + return properties.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue()))); + } + +} diff --git a/debezium-core/src/main/java/io/debezium/rest/FilterValidationResource.java b/debezium-core/src/main/java/io/debezium/rest/FilterValidationResource.java new file mode 100644 index 000000000..b41d27f5c --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/rest/FilterValidationResource.java @@ -0,0 +1,40 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.rest; + +import java.util.List; +import java.util.Map; + +import javax.ws.rs.PUT; +import javax.ws.rs.Path; + +import org.apache.kafka.connect.connector.Connector; + +import io.debezium.config.Configuration; +import io.debezium.rest.model.DataCollection; +import io.debezium.rest.model.FilterValidationResults; + +public interface FilterValidationResource { + + Connector getConnector(); + + List getMatchingCollections(Configuration configuration); + + String VALIDATE_FILTERS_ENDPOINT = "/validate/filters"; + + @PUT + @Path(VALIDATE_FILTERS_ENDPOINT) + default FilterValidationResults validateFiltersProperties(Map properties) { + // switch classloader to the connector specific classloader in order to load dependencies required to validate the connector config + ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(getConnector().getClass().getClassLoader()); + FilterValidationResults validationResults = ConnectorConfigValidator.validateFilterConfig( + getConnector(), properties, () -> getMatchingCollections(Configuration.from(properties))); + Thread.currentThread().setContextClassLoader(originalClassLoader); + return validationResults; + } + +} diff --git a/debezium-core/src/main/java/io/debezium/rest/model/DataCollection.java b/debezium-core/src/main/java/io/debezium/rest/model/DataCollection.java new file mode 100644 index 000000000..7206ab929 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/rest/model/DataCollection.java @@ -0,0 +1,79 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.rest.model; + +import com.fasterxml.jackson.annotation.JsonGetter; +import com.fasterxml.jackson.annotation.JsonInclude; + +/** + * A model class representing the structure of the response for the REST Extension + * call that returns the matching tables/collections on the `validate filters` + * endpoint. + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class DataCollection { + + /** + * catalog, schema or replica set name + */ + private final String namespace; + + /** + * table or collection + */ + private final String name; + + /** + * optional database or schema name + */ + private final String realm; + + public DataCollection(String namespace, String name) { + this.namespace = namespace; + this.name = name; + this.realm = null; + } + + public DataCollection(String realm, String namespace, String name) { + this.realm = realm; + this.namespace = namespace; + this.name = name; + } + + @JsonGetter("identifier") + public String identifier() { + if (null == this.realm || "null".equals(this.realm)) { + return this.namespace + "." + this.name; + } + else { + return this.realm + "." + this.namespace + "." + this.name; + } + } + + @JsonGetter("realm") + public String realm() { + return this.realm; + } + + @JsonGetter("namespace") + public String namespace() { + return this.namespace; + } + + @JsonGetter("name") + public String getName() { + return this.name; + } + + @Override + public String toString() { + return "DataCollection{" + + "realm='" + realm + '\'' + + ", namespace='" + namespace + '\'' + + ", name='" + name + '\'' + + '}'; + } +} \ No newline at end of file diff --git a/debezium-core/src/main/java/io/debezium/rest/model/FilterValidationResults.java b/debezium-core/src/main/java/io/debezium/rest/model/FilterValidationResults.java new file mode 100644 index 000000000..4dbab3fab --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/rest/model/FilterValidationResults.java @@ -0,0 +1,36 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.rest.model; + +import java.util.List; + +import org.apache.kafka.common.config.Config; + +public class FilterValidationResults extends ValidationResults { + + public final List matchingCollections; + + public FilterValidationResults(Config validatedConfig, MatchingCollectionsParameter matchingCollections) { + super(validatedConfig); + if (Status.INVALID == this.status) { + this.matchingCollections = List.of(); + } + else { + this.matchingCollections = matchingCollections.get(); + } + } + + /** + * Functional interface to provide the list of matching tables or collections with an anonymous function / lambda with lazy evaluation. + */ + public interface MatchingCollectionsParameter { + /** + * @return the list of matching tables or collections that match the connector config based filters / filter conditions. + */ + List get(); + } + +} diff --git a/debezium-core/src/main/java/io/debezium/config/ValidationResults.java b/debezium-core/src/main/java/io/debezium/rest/model/ValidationResults.java similarity index 74% rename from debezium-core/src/main/java/io/debezium/config/ValidationResults.java rename to debezium-core/src/main/java/io/debezium/rest/model/ValidationResults.java index a8d5beba6..23c0a4b83 100644 --- a/debezium-core/src/main/java/io/debezium/config/ValidationResults.java +++ b/debezium-core/src/main/java/io/debezium/rest/model/ValidationResults.java @@ -3,22 +3,20 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.config; +package io.debezium.rest.model; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import org.apache.kafka.common.config.Config; -import org.apache.kafka.connect.connector.Connector; public class ValidationResults { - public final List validationResults; public Status status; + public final List validationResults; - public ValidationResults(Connector connector, Map properties) { - this.validationResults = convertConfigToValidationResults(connector.validate(convertPropertiesToStrings(properties))); + public ValidationResults(Config validatedConfig) { + this.validationResults = convertConfigToValidationResults(validatedConfig); if (validationResults.isEmpty()) { this.status = Status.VALID; } @@ -27,11 +25,6 @@ public ValidationResults(Connector connector, Map properties) { } } - private static Map convertPropertiesToStrings(Map properties) { - return properties.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue()))); - } - private List convertConfigToValidationResults(Config result) { return result.configValues() .stream() diff --git a/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/ConnectorConfiguration.java b/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/ConnectorConfiguration.java index d4752d655..e667269c7 100644 --- a/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/ConnectorConfiguration.java +++ b/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/ConnectorConfiguration.java @@ -5,7 +5,11 @@ */ package io.debezium.testing.testcontainers; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; import org.testcontainers.containers.JdbcDatabaseContainer; @@ -26,11 +30,12 @@ public class ConnectorConfiguration { public static final String PASSWORD = "database.password"; public static final String DBNAME = "database.dbname"; + private static final ObjectMapper MAPPER = new ObjectMapper(); + private final ObjectNode configNode; protected ConnectorConfiguration() { - final ObjectMapper mapper = new ObjectMapper(); - this.configNode = mapper.createObjectNode(); + this.configNode = MAPPER.createObjectNode(); this.configNode.put("tasks.max", 1); } @@ -115,10 +120,23 @@ public ConnectorConfiguration with(String key, Double value) { return this; } + public ConnectorConfiguration remove(String key) { + this.configNode.remove(key); + return this; + } + public String toJson() { return getConfiguration().toString(); } + public Properties asProperties() { + var connectorProperties = new Properties(); + Map configMap = MAPPER.convertValue(configNode, HashMap.class); + configMap.values().removeIf(Objects::isNull); + connectorProperties.putAll(configMap); + return connectorProperties; + } + ObjectNode getConfiguration() { return configNode; } diff --git a/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/MongoDbShardedCluster.java b/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/MongoDbShardedCluster.java index a4edb0b9a..22a85810e 100644 --- a/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/MongoDbShardedCluster.java +++ b/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/MongoDbShardedCluster.java @@ -145,9 +145,11 @@ public void start() { */ @Override public void stop() { - // Idempotent - LOGGER.info("Stopping {} shard cluster...", shards.size()); - MoreStartables.deepStopSync(stream()); + if (started) { + LOGGER.info("Stopping {} shard cluster...", shards.size()); + MoreStartables.deepStopSync(stream()); + started = false; + } } public int size() { diff --git a/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/OracleContainer.java b/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/OracleContainer.java index 4b05ad0fe..a5676560a 100644 --- a/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/OracleContainer.java +++ b/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/OracleContainer.java @@ -16,13 +16,13 @@ public class OracleContainer extends org.testcontainers.containers.OracleContainer { private static final String FALLBACK_ORACLE_SERVER_VERSION = "21.3.0"; - static final String DEFAULT_TAG = parameterWithDefault(System.getProperty("version.oracle.server"), FALLBACK_ORACLE_SERVER_VERSION); - static final String ORACLE_USERNAME = parameterWithDefault(System.getProperty("database.username"), "c##dbzuser"); - static final String ORACLE_PASSWORD = parameterWithDefault(System.getProperty("database.password"), "dbz"); - static final String ORACLE_DBNAME = parameterWithDefault(System.getProperty("database.dbname"), "ORCLCDB"); - - static final int ORACLE_PORT = 1521; - private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("quay.io/rh_integration/dbz-oracle"); + public static final String DEFAULT_TAG = parameterWithDefault(System.getProperty("version.oracle.server"), FALLBACK_ORACLE_SERVER_VERSION); + public static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("quay.io/rh_integration/dbz-oracle"); + public final String ORACLE_DBNAME = parameterWithDefault(System.getProperty("database.dbname"), "ORCLCDB"); + public final String ORACLE_PDB_NAME = parameterWithDefault(System.getProperty("database.pdb.name"), "ORCLPDB1"); + private static final String ORACLE_USERNAME = parameterWithDefault(System.getProperty("database.username"), "debezium"); + private static final String ORACLE_PASSWORD = parameterWithDefault(System.getProperty("database.password"), "dbz"); + public static final int ORACLE_PORT = 1521; private static final int ORACLE_DEFAULT_STARTUP_TIMEOUT_SECONDS = 240; private static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 120; private static final int APEX_HTTP_PORT = 8080; diff --git a/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/testhelper/RestExtensionTestInfrastructure.java b/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/testhelper/RestExtensionTestInfrastructure.java index caff7012b..aa8061a36 100644 --- a/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/testhelper/RestExtensionTestInfrastructure.java +++ b/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/testhelper/RestExtensionTestInfrastructure.java @@ -7,6 +7,7 @@ import java.nio.file.Paths; import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Stream; @@ -21,6 +22,7 @@ import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; import org.testcontainers.images.builder.ImageFromDockerfile; import org.testcontainers.lifecycle.Startable; import org.testcontainers.utility.DockerImageName; @@ -86,9 +88,14 @@ public enum DATABASE { .withEnv("MSSQL_PID", "Standard") .withEnv("MSSQL_AGENT_ENABLED", "true") .withPassword("Password!") - .withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(5))) .withInitScript("initialize-sqlserver-database.sql") - .acceptLicense(); + .acceptLicense() + .waitingFor(new LogMessageWaitStrategy() + .withRegEx(".*SQL Server is now ready for client connections\\..*\\s") + .withTimes(1) + .withStartupTimeout(Duration.of(CI_CONTAINER_STARTUP_TIME * 3, ChronoUnit.SECONDS))) + .withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10))) + .withConnectTimeoutSeconds(300); private static final OracleContainer ORACLE_CONTAINER = (OracleContainer) new OracleContainer() .withNetwork(NETWORK) @@ -128,7 +135,8 @@ private static Supplier> getContainers(DATABASE database) { } public static void stopContainers() { - Stream containers = Stream.of(DEBEZIUM_CONTAINER, SQL_SERVER_CONTAINER, MONGODB_REPLICA, MYSQL_CONTAINER, POSTGRES_CONTAINER, KAFKA_CONTAINER); + Stream containers = Stream.of(DEBEZIUM_CONTAINER, ORACLE_CONTAINER, SQL_SERVER_CONTAINER, MONGODB_REPLICA, MYSQL_CONTAINER, POSTGRES_CONTAINER, + KAFKA_CONTAINER); MoreStartables.deepStopSync(containers); DEBEZIUM_CONTAINER = null; }