DBZ-7416 Fix duplicate SMTs sometimes returned by Kafka Connect. Moved deduplication from Map to LinkedHashSet.

+ minor fixes added for cleanup and centralization of common code

closes https://issues.redhat.com/browse/DBZ-7416
This commit is contained in:
rkerner 2024-02-22 14:21:28 +01:00 committed by Chris Cranford
parent 68b6591142
commit 1e69e40ec2
8 changed files with 21 additions and 19 deletions

View File

@ -35,8 +35,7 @@
@Path(DebeziumMongoDbConnectorResource.BASE_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumMongoDbConnectorResource
implements SchemaResource, ConnectionValidationResource<MongoDbConnector>, FilterValidationResource<MongoDbConnector>, MetricsResource {
public class DebeziumMongoDbConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource, MetricsResource {
public static final String BASE_PATH = "/debezium/mongodb";
public static final String VERSION_ENDPOINT = "/version";

View File

@ -35,8 +35,7 @@
@Path(DebeziumMySqlConnectorResource.BASE_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumMySqlConnectorResource
implements SchemaResource, ConnectionValidationResource<MySqlConnector>, FilterValidationResource<MySqlConnector>, MetricsResource {
public class DebeziumMySqlConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource, MetricsResource {
public static final String BASE_PATH = "/debezium/mysql";
public static final String VERSION_ENDPOINT = "/version";

View File

@ -35,8 +35,7 @@
@Path(DebeziumOracleConnectorResource.BASE_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumOracleConnectorResource
implements SchemaResource, ConnectionValidationResource<OracleConnector>, FilterValidationResource<OracleConnector>, MetricsResource {
public class DebeziumOracleConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource, MetricsResource {
public static final String BASE_PATH = "/debezium/oracle";
public static final String VERSION_ENDPOINT = "/version";

View File

@ -35,8 +35,7 @@
@Path(DebeziumPostgresConnectorResource.BASE_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumPostgresConnectorResource
implements SchemaResource, ConnectionValidationResource<PostgresConnector>, FilterValidationResource<PostgresConnector>, MetricsResource {
public class DebeziumPostgresConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource, MetricsResource {
public static final String BASE_PATH = "/debezium/postgres";
public static final String VERSION_ENDPOINT = "/version";

View File

@ -35,8 +35,7 @@
@Path(DebeziumSqlServerConnectorResource.BASE_PATH)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumSqlServerConnectorResource
implements SchemaResource, ConnectionValidationResource<SqlServerConnector>, FilterValidationResource<SqlServerConnector>, MetricsResource {
public class DebeziumSqlServerConnectorResource implements SchemaResource, ConnectionValidationResource, FilterValidationResource, MetricsResource {
public static final String BASE_PATH = "/debezium/sqlserver";
public static final String VERSION_ENDPOINT = "/version";

View File

@ -10,12 +10,9 @@
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import io.debezium.connector.common.BaseSourceConnector;
import io.debezium.rest.model.ValidationResults;
public interface ConnectionValidationResource<T extends BaseSourceConnector> {
T getConnector();
public interface ConnectionValidationResource extends ConnectorAware {
String VALIDATE_CONNECTION_ENDPOINT = "/validate/connection";

View File

@ -0,0 +1,12 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.rest;
import io.debezium.connector.common.BaseSourceConnector;
public interface ConnectorAware {
BaseSourceConnector getConnector();
}

View File

@ -12,16 +12,15 @@
import javax.ws.rs.Path;
import io.debezium.config.Configuration;
import io.debezium.connector.common.BaseSourceConnector;
import io.debezium.rest.model.DataCollection;
import io.debezium.rest.model.FilterValidationResults;
public interface FilterValidationResource<T extends BaseSourceConnector> {
T getConnector();
public interface FilterValidationResource extends ConnectorAware {
String VALIDATE_FILTERS_ENDPOINT = "/validate/filters";
List<DataCollection> getMatchingCollections(Configuration configuration);
@PUT
@Path(VALIDATE_FILTERS_ENDPOINT)
default FilterValidationResults validateFiltersProperties(Map<String, ?> properties) {
@ -34,5 +33,4 @@ default FilterValidationResults validateFiltersProperties(Map<String, ?> propert
return validationResults;
}
List<DataCollection> getMatchingCollections(Configuration configuration);
}