From 1e69e40ec268aff72c5fe44e777188dd8a70bd5c Mon Sep 17 00:00:00 2001 From: rkerner Date: Thu, 22 Feb 2024 14:21:28 +0100 Subject: [PATCH] DBZ-7416 Fix duplicate SMTs sometimes returned by Kafka Connect. Moved deduplication from Map to LinkedHashSet. + minor fixes added for cleanup and centralization of common code closes https://issues.redhat.com/browse/DBZ-7416 --- .../rest/DebeziumMongoDbConnectorResource.java | 3 +-- .../mysql/rest/DebeziumMySqlConnectorResource.java | 3 +-- .../oracle/rest/DebeziumOracleConnectorResource.java | 3 +-- .../rest/DebeziumPostgresConnectorResource.java | 3 +-- .../rest/DebeziumSqlServerConnectorResource.java | 3 +-- .../debezium/rest/ConnectionValidationResource.java | 5 +---- .../main/java/io/debezium/rest/ConnectorAware.java | 12 ++++++++++++ .../io/debezium/rest/FilterValidationResource.java | 8 +++----- 8 files changed, 21 insertions(+), 19 deletions(-) create mode 100644 debezium-core/src/main/java/io/debezium/rest/ConnectorAware.java diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResource.java index f212fd3e4..c0d883c35 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/rest/DebeziumMongoDbConnectorResource.java @@ -35,8 +35,7 @@ @Path(DebeziumMongoDbConnectorResource.BASE_PATH) @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -public class DebeziumMongoDbConnectorResource - implements SchemaResource, ConnectionValidationResource, FilterValidationResource, MetricsResource { +public class DebeziumMongoDbConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource, MetricsResource { public static final String BASE_PATH = "/debezium/mongodb"; public static final String VERSION_ENDPOINT = "/version"; diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResource.java index 8d7a79c3a..9040009e7 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/rest/DebeziumMySqlConnectorResource.java @@ -35,8 +35,7 @@ @Path(DebeziumMySqlConnectorResource.BASE_PATH) @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -public class DebeziumMySqlConnectorResource - implements SchemaResource, ConnectionValidationResource, FilterValidationResource, MetricsResource { +public class DebeziumMySqlConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource, MetricsResource { public static final String BASE_PATH = "/debezium/mysql"; public static final String VERSION_ENDPOINT = "/version"; diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResource.java index b116da3f2..a1cc4e5e4 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/rest/DebeziumOracleConnectorResource.java @@ -35,8 +35,7 @@ @Path(DebeziumOracleConnectorResource.BASE_PATH) @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -public class DebeziumOracleConnectorResource - implements SchemaResource, ConnectionValidationResource, FilterValidationResource, MetricsResource { +public class DebeziumOracleConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource, MetricsResource { public static final String BASE_PATH = "/debezium/oracle"; public static final String VERSION_ENDPOINT = "/version"; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResource.java index 16e0ee9d8..c171d6b34 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/rest/DebeziumPostgresConnectorResource.java @@ -35,8 +35,7 @@ @Path(DebeziumPostgresConnectorResource.BASE_PATH) @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -public class DebeziumPostgresConnectorResource - implements SchemaResource, ConnectionValidationResource, FilterValidationResource, MetricsResource { +public class DebeziumPostgresConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource, MetricsResource { public static final String BASE_PATH = "/debezium/postgres"; public static final String VERSION_ENDPOINT = "/version"; diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResource.java index b1c7f7e57..73f5a9c46 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/rest/DebeziumSqlServerConnectorResource.java @@ -35,8 +35,7 @@ @Path(DebeziumSqlServerConnectorResource.BASE_PATH) @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -public class DebeziumSqlServerConnectorResource - implements SchemaResource, ConnectionValidationResource, FilterValidationResource, MetricsResource { +public class DebeziumSqlServerConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource, MetricsResource { public static final String BASE_PATH = "/debezium/sqlserver"; public static final String VERSION_ENDPOINT = "/version"; diff --git a/debezium-core/src/main/java/io/debezium/rest/ConnectionValidationResource.java b/debezium-core/src/main/java/io/debezium/rest/ConnectionValidationResource.java index 76088dc3b..23a9a9528 100644 --- a/debezium-core/src/main/java/io/debezium/rest/ConnectionValidationResource.java +++ b/debezium-core/src/main/java/io/debezium/rest/ConnectionValidationResource.java @@ -10,12 +10,9 @@ import javax.ws.rs.PUT; import javax.ws.rs.Path; -import io.debezium.connector.common.BaseSourceConnector; import io.debezium.rest.model.ValidationResults; -public interface ConnectionValidationResource { - - T getConnector(); +public interface ConnectionValidationResource extends ConnectorAware { String VALIDATE_CONNECTION_ENDPOINT = "/validate/connection"; diff --git a/debezium-core/src/main/java/io/debezium/rest/ConnectorAware.java b/debezium-core/src/main/java/io/debezium/rest/ConnectorAware.java new file mode 100644 index 000000000..fdb0a3752 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/rest/ConnectorAware.java @@ -0,0 +1,12 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.rest; + +import io.debezium.connector.common.BaseSourceConnector; + +public interface ConnectorAware { + BaseSourceConnector getConnector(); +} diff --git a/debezium-core/src/main/java/io/debezium/rest/FilterValidationResource.java b/debezium-core/src/main/java/io/debezium/rest/FilterValidationResource.java index 00649a08d..3b6c6afb7 100644 --- a/debezium-core/src/main/java/io/debezium/rest/FilterValidationResource.java +++ b/debezium-core/src/main/java/io/debezium/rest/FilterValidationResource.java @@ -12,16 +12,15 @@ import javax.ws.rs.Path; import io.debezium.config.Configuration; -import io.debezium.connector.common.BaseSourceConnector; import io.debezium.rest.model.DataCollection; import io.debezium.rest.model.FilterValidationResults; -public interface FilterValidationResource { - - T getConnector(); +public interface FilterValidationResource extends ConnectorAware { String VALIDATE_FILTERS_ENDPOINT = "/validate/filters"; + List getMatchingCollections(Configuration configuration); + @PUT @Path(VALIDATE_FILTERS_ENDPOINT) default FilterValidationResults validateFiltersProperties(Map properties) { @@ -34,5 +33,4 @@ default FilterValidationResults validateFiltersProperties(Map propert return validationResults; } - List getMatchingCollections(Configuration configuration); }