DBZ-7416 Fix duplicate SMTs sometimes returned by Kafka Connect. Added deduplication with a Map with a key based on the SMT's fully-qualified class name and version.

closes https://issues.redhat.com/browse/DBZ-7416
This commit is contained in:
rkerner 2024-02-08 15:40:42 +01:00 committed by René Kerner
parent 2c9b77fc1c
commit 5c817c4fa3
2 changed files with 16 additions and 5 deletions

View File

@ -11,6 +11,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -92,20 +93,20 @@ else if (m.lookingAt()) {
throw new IllegalArgumentException("Invalid version string: \"" + version + "\"");
}
private static <T> void addConnectorPlugins(List<ConnectorDescriptor> connectorPlugins, Collection<PluginDesc<T>> plugins) {
private static <T> void addConnectorPlugins(Map<String, ConnectorDescriptor> connectorPlugins, Collection<PluginDesc<T>> plugins) {
plugins.stream()
.filter(p -> SUPPORTED_CONNECTORS.contains(p.pluginClass().getName()))
.forEach(p -> connectorPlugins.add(new ConnectorDescriptor(p.pluginClass().getName(), p.version())));
.forEach(p -> connectorPlugins.put(p.pluginClass().getName() + "#" + p.version(), new ConnectorDescriptor(p.pluginClass().getName(), p.version())));
}
private synchronized void initConnectorPlugins() {
if (null == this.availableConnectorPlugins || this.availableConnectorPlugins.isEmpty()) {
// TODO: improve once plugins are allowed to be added/removed during runtime by Kafka Connect, @see org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource
final List<ConnectorDescriptor> connectorPlugins = new ArrayList<>();
final Map<String, ConnectorDescriptor> connectorPlugins = new HashMap<>();
Herder herder = getHerder();
addConnectorPlugins(connectorPlugins, herder.plugins().sinkConnectors());
addConnectorPlugins(connectorPlugins, herder.plugins().sourceConnectors());
this.availableConnectorPlugins = Collections.unmodifiableList(connectorPlugins);
this.availableConnectorPlugins = Collections.unmodifiableList(new ArrayList<>(connectorPlugins.values()));
}
}

View File

@ -7,10 +7,15 @@
import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE;
import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsInRelativeOrder;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -62,6 +67,10 @@ public class DebeziumResourceIT {
"org.apache.kafka.connect.transforms.TimestampRouter",
"org.apache.kafka.connect.transforms.ValueToKey");
private static final List<String> KAFKA_CONNECT_SMTs = SUPPORTED_TRANSFORMS.stream()
.filter(smt -> smt.startsWith("org.apache.kafka.connect.transforms."))
.collect(Collectors.toList());
private static final List<String> SUPPORTED_PREDICATES = List.of(
"org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
"org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
@ -111,7 +120,8 @@ public void testTransformsEndpoint() {
.when().get(DebeziumResource.BASE_PATH + DebeziumResource.TRANSFORMS_ENDPOINT)
.then().log().all()
.statusCode(200)
.body("transform.size()", is(SUPPORTED_TRANSFORMS.size()))
.body("transform.size()", allOf(greaterThanOrEqualTo(KAFKA_CONNECT_SMTs.size()), lessThanOrEqualTo(SUPPORTED_TRANSFORMS.size() + 1)))
.body("transform", containsInRelativeOrder(KAFKA_CONNECT_SMTs.toArray()))
.body("transform", containsInAnyOrder(SUPPORTED_TRANSFORMS.toArray()));
}