DBZ-7416 Fix duplicate SMTs sometimes returned by Kafka Connect. Moved deduplication from Map to LinkedHashSet.

closes https://issues.redhat.com/browse/DBZ-7416
This commit is contained in:
Jiri Pechanec 2024-02-22 09:06:29 +01:00 committed by Chris Cranford
parent 08e46815e4
commit 68b6591142
7 changed files with 32 additions and 17 deletions

View File

@ -11,8 +11,8 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -68,7 +68,7 @@ public class DebeziumResource {
private final Boolean isTopicCreationEnabled; private final Boolean isTopicCreationEnabled;
private List<TransformDefinition> transforms = null; private List<TransformDefinition> transforms = null;
private List<PredicateDefinition> predicates = null; private List<PredicateDefinition> predicates = null;
private List<ConnectorDescriptor> availableConnectorPlugins = null; private Set<ConnectorDescriptor> availableConnectorPlugins = null;
private static final Pattern VERSION_PATTERN = Pattern private static final Pattern VERSION_PATTERN = Pattern
.compile("([1-9][0-9]*(?:(?:\\.0)*\\.[1-9][0-9]*)*)(?:-([a-zA-Z0-9]+))?(?:(\\+)(0|[1-9][0-9]*)?)?(?:-([-a-zA-Z0-9.]+))?"); .compile("([1-9][0-9]*(?:(?:\\.0)*\\.[1-9][0-9]*)*)(?:-([a-zA-Z0-9]+))?(?:(\\+)(0|[1-9][0-9]*)?)?(?:-([-a-zA-Z0-9.]+))?");
@ -93,20 +93,20 @@ else if (m.lookingAt()) {
throw new IllegalArgumentException("Invalid version string: \"" + version + "\""); throw new IllegalArgumentException("Invalid version string: \"" + version + "\"");
} }
private static <T> void addConnectorPlugins(Map<String, ConnectorDescriptor> connectorPlugins, Collection<PluginDesc<T>> plugins) { private static <T> void addConnectorPlugins(Set<ConnectorDescriptor> connectorPlugins, Collection<PluginDesc<T>> plugins) {
plugins.stream() plugins.stream()
.filter(p -> SUPPORTED_CONNECTORS.contains(p.pluginClass().getName())) .filter(p -> SUPPORTED_CONNECTORS.contains(p.pluginClass().getName()))
.forEach(p -> connectorPlugins.put(p.pluginClass().getName() + "#" + p.version(), new ConnectorDescriptor(p.pluginClass().getName(), p.version()))); .forEach(p -> connectorPlugins.add(new ConnectorDescriptor(p.pluginClass().getName(), p.version())));
} }
private synchronized void initConnectorPlugins() { private synchronized void initConnectorPlugins() {
if (null == this.availableConnectorPlugins || this.availableConnectorPlugins.isEmpty()) { if (null == this.availableConnectorPlugins || this.availableConnectorPlugins.isEmpty()) {
// TODO: improve once plugins are allowed to be added/removed during runtime by Kafka Connect, @see org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource // TODO: improve once plugins are allowed to be added/removed during runtime by Kafka Connect, @see org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource
final Map<String, ConnectorDescriptor> connectorPlugins = new HashMap<>(); final Set<ConnectorDescriptor> connectorPlugins = new LinkedHashSet<>();
Herder herder = getHerder(); Herder herder = getHerder();
addConnectorPlugins(connectorPlugins, herder.plugins().sinkConnectors()); addConnectorPlugins(connectorPlugins, herder.plugins().sinkConnectors());
addConnectorPlugins(connectorPlugins, herder.plugins().sourceConnectors()); addConnectorPlugins(connectorPlugins, herder.plugins().sourceConnectors());
this.availableConnectorPlugins = Collections.unmodifiableList(new ArrayList<>(connectorPlugins.values())); this.availableConnectorPlugins = Collections.unmodifiableSet(connectorPlugins);
} }
} }
@ -165,7 +165,7 @@ private synchronized Herder getHerder() {
@GET @GET
@Path(CONNECTOR_PLUGINS_ENDPOINT) @Path(CONNECTOR_PLUGINS_ENDPOINT)
@Produces(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON)
public List<ConnectorDescriptor> availableDebeziumConnectors() { public Set<ConnectorDescriptor> availableDebeziumConnectors() {
initConnectorPlugins(); initConnectorPlugins();
return this.availableConnectorPlugins; return this.availableConnectorPlugins;
} }

View File

@ -16,7 +16,7 @@ public class MongoDbConnectorMetadata implements ConnectorMetadata {
@Override @Override
public ConnectorDescriptor getConnectorDescriptor() { public ConnectorDescriptor getConnectorDescriptor() {
return new ConnectorDescriptor("mongodb", "Debezium MongoDB Connector", MongoDbConnector.class.getName(), Module.version()); return new ConnectorDescriptor(MongoDbConnector.class.getName(), Module.version());
} }
@Override @Override

View File

@ -16,7 +16,7 @@ public class MySqlConnectorMetadata implements ConnectorMetadata {
@Override @Override
public ConnectorDescriptor getConnectorDescriptor() { public ConnectorDescriptor getConnectorDescriptor() {
return new ConnectorDescriptor("mysql", "Debezium MySQL Connector", MySqlConnector.class.getName(), Module.version()); return new ConnectorDescriptor(MySqlConnector.class.getName(), Module.version());
} }
@Override @Override

View File

@ -16,7 +16,7 @@ public class OracleConnectorMetadata implements ConnectorMetadata {
@Override @Override
public ConnectorDescriptor getConnectorDescriptor() { public ConnectorDescriptor getConnectorDescriptor() {
return new ConnectorDescriptor("oracle", "Debezium Oracle Connector", OracleConnector.class.getName(), Module.version()); return new ConnectorDescriptor(OracleConnector.class.getName(), Module.version());
} }
@Override @Override

View File

@ -16,7 +16,7 @@ public class PostgresConnectorMetadata implements ConnectorMetadata {
@Override @Override
public ConnectorDescriptor getConnectorDescriptor() { public ConnectorDescriptor getConnectorDescriptor() {
return new ConnectorDescriptor("postgres", "Debezium PostgreSQL Connector", PostgresConnector.class.getName(), Module.version()); return new ConnectorDescriptor(PostgresConnector.class.getName(), Module.version());
} }
@Override @Override

View File

@ -16,7 +16,7 @@ public class SqlServerConnectorMetadata implements ConnectorMetadata {
@Override @Override
public ConnectorDescriptor getConnectorDescriptor() { public ConnectorDescriptor getConnectorDescriptor() {
return new ConnectorDescriptor("sqlserver", "Debezium SQLServer Connector", SqlServerConnector.class.getName(), Module.version()); return new ConnectorDescriptor(SqlServerConnector.class.getName(), Module.version());
} }
@Override @Override

View File

@ -5,6 +5,8 @@
*/ */
package io.debezium.metadata; package io.debezium.metadata;
import java.util.Objects;
public class ConnectorDescriptor { public class ConnectorDescriptor {
private final String id; private final String id;
@ -12,7 +14,7 @@ public class ConnectorDescriptor {
private final String className; private final String className;
private final String version; private final String version;
public ConnectorDescriptor(String id, String displayName, String className, String version) { private ConnectorDescriptor(String id, String displayName, String className, String version) {
this.id = id; this.id = id;
this.displayName = displayName; this.displayName = displayName;
this.className = className; this.className = className;
@ -20,10 +22,7 @@ public ConnectorDescriptor(String id, String displayName, String className, Stri
} }
public ConnectorDescriptor(String className, String version) { public ConnectorDescriptor(String className, String version) {
this.id = getIdForConnectorClass(className); this(getIdForConnectorClass(className), getDisplayNameForConnectorClass(className), className, version);
this.displayName = getDisplayNameForConnectorClass(className);
this.className = className;
this.version = version;
} }
public String getId() { public String getId() {
@ -75,4 +74,20 @@ public static String getDisplayNameForConnectorClass(String className) {
throw new RuntimeException("Unsupported connector type with className: \"" + className + "\""); throw new RuntimeException("Unsupported connector type with className: \"" + className + "\"");
} }
} }
@Override
public boolean equals(Object that) {
if (this == that) {
return true;
}
if (that == null || getClass() != that.getClass()) {
return false;
}
return this.getClassName().equals(((ConnectorDescriptor) that).getClassName())
&& this.getVersion().equals(((ConnectorDescriptor) that).getVersion());
}
public int hashCode() {
return Objects.hash(this.className, this.version);
}
} }