diff --git a/debezium-connect-rest-extension/pom.xml b/debezium-connect-rest-extension/pom.xml index 498d01427..9a85be190 100644 --- a/debezium-connect-rest-extension/pom.xml +++ b/debezium-connect-rest-extension/pom.xml @@ -21,6 +21,10 @@ + + io.debezium + debezium-core + org.apache.kafka connect-runtime @@ -119,7 +123,7 @@ assembly - false + true @@ -153,6 +157,29 @@ + + org.codehaus.mojo + exec-maven-plugin + + tar + ${project.build.directory} + + + + extract-assembly + pre-integration-test + + exec + + + + -xf + ${project.artifactId}-${project.version}.tar.gz + + + + + diff --git a/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/DebeziumResource.java b/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/DebeziumResource.java index 70882e77d..d69c774d8 100644 --- a/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/DebeziumResource.java +++ b/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/DebeziumResource.java @@ -7,11 +7,14 @@ import java.lang.Runtime.Version; import java.lang.reflect.Field; -import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -25,16 +28,13 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.health.ConnectClusterState; import org.apache.kafka.connect.runtime.Herder; -import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl; import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.transforms.Transformation; -import org.apache.kafka.connect.transforms.predicates.HasHeaderKey; -import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone; -import org.apache.kafka.connect.transforms.predicates.TopicNameMatches; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.kafka.connect.transforms.predicates.Predicate; -import io.debezium.kcrestextension.entities.TransformsInfo; +import io.debezium.kcrestextension.entities.PredicateDefinition; +import io.debezium.kcrestextension.entities.TransformDefinition; +import io.debezium.metadata.ConnectorDescriptor; /** * A JAX-RS Resource class defining endpoints that enable some advanced features @@ -43,26 +43,30 @@ * + return if topic auto-creation is available and enabled * */ -@Path("/debezium") +@Path(DebeziumResource.BASE_PATH) @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) public class DebeziumResource { - private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumResource.class); + public static final String BASE_PATH = "/debezium"; + public static final String CONNECTOR_PLUGINS_ENDPOINT = "/connector-plugins"; + public static final String TRANSFORMS_ENDPOINT = "/transforms"; + public static final String PREDICATES_ENDPOINT = "/predicates"; + public static final String TOPIC_CREATION_ENDPOINT = "/topic-creation-enabled"; - // TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full - // session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but - // we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases, - // but currently a worker simply leaving the group can take this long as well. - public static final Duration REQUEST_TIMEOUT_MS = Duration.ofSeconds(90); - // Mutable for integration testing; otherwise, some tests would take at least REQUEST_TIMEOUT_MS - // to run - private static Duration requestTimeoutMs = REQUEST_TIMEOUT_MS; + public static final Set SUPPORTED_CONNECTORS = new HashSet<>(Arrays.asList( + "io.debezium.connector.mongodb.MongoDbConnector", + "io.debezium.connector.mysql.MySqlConnector", + "io.debezium.connector.oracle.OracleConnector", + "io.debezium.connector.postgresql.PostgresConnector", + "io.debezium.connector.sqlserver.SqlServerConnector")); - private final List transforms; + private final ConnectClusterState connectClusterState; + private Herder herder = null; private final Boolean isTopicCreationEnabled; - private final Herder herder; - private final Map config; + private List transforms = null; + private List predicates = null; + private List availableConnectorPlugins = null; private static final Pattern VERSION_PATTERN = Pattern .compile("([1-9][0-9]*(?:(?:\\.0)*\\.[1-9][0-9]*)*)(?:-([a-zA-Z0-9]+))?(?:(\\+)(0|[1-9][0-9]*)?)?(?:-([-a-zA-Z0-9.]+))?"); @@ -71,29 +75,9 @@ public class DebeziumResource { @javax.ws.rs.core.Context private ServletContext context; - public DebeziumResource(ConnectClusterState clusterState, Map config) { - Field herderField; - try { - herderField = ConnectClusterStateImpl.class.getDeclaredField("herder"); - } - catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - herderField.setAccessible(true); - try { - this.herder = (Herder) herderField.get(clusterState); - } - catch (IllegalAccessException e) { - throw new RuntimeException(e); - } - this.transforms = new ArrayList<>(); - this.config = config; - this.isTopicCreationEnabled = isTopicCreationEnabled(); - } - - // For testing purposes only - public static void setRequestTimeout(long requestTimeoutMs) { - DebeziumResource.requestTimeoutMs = Duration.ofMillis(requestTimeoutMs); + public DebeziumResource(ConnectClusterState connectClusterState, Map config) { + this.connectClusterState = connectClusterState; + this.isTopicCreationEnabled = isTopicCreationEnabled(config); } public static Version parseVersion(String version) { @@ -107,40 +91,50 @@ else if (m.lookingAt()) { throw new IllegalArgumentException("Invalid version string: \"" + version + "\""); } - public static void resetRequestTimeout() { - DebeziumResource.requestTimeoutMs = REQUEST_TIMEOUT_MS; + private static void addConnectorPlugins(List connectorPlugins, Collection> plugins) { + plugins.stream() + .filter(p -> SUPPORTED_CONNECTORS.contains(p.pluginClass().getName())) + .forEach(p -> connectorPlugins.add(new ConnectorDescriptor(p.pluginClass().getName(), p.version()))); } - @GET - @Path("/transforms") - public List listTransforms() { - return this.getTransforms(); + private synchronized void initConnectorPlugins() { + if (null == this.availableConnectorPlugins || this.availableConnectorPlugins.isEmpty()) { + // TODO: improve once plugins are allowed to be added/removed during runtime by Kafka Connect, @see org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource + final List connectorPlugins = new ArrayList<>(); + Herder herder = getHerder(); + addConnectorPlugins(connectorPlugins, herder.plugins().sinkConnectors()); + addConnectorPlugins(connectorPlugins, herder.plugins().sourceConnectors()); + this.availableConnectorPlugins = Collections.unmodifiableList(connectorPlugins); + } } - private synchronized List getTransforms() { - if (this.transforms.isEmpty()) { - for (PluginDesc> plugin : herder.plugins().transformations()) { - if ("org.apache.kafka.connect.runtime.PredicatedTransformation".equals(plugin.className())) { - this.transforms.add(new TransformsInfo(HasHeaderKey.class.getName(), (new HasHeaderKey<>().config()))); - this.transforms.add(new TransformsInfo(RecordIsTombstone.class.getName(), (new RecordIsTombstone<>().config()))); - this.transforms.add(new TransformsInfo(TopicNameMatches.class.getName(), (new TopicNameMatches<>().config()))); + private synchronized void initTransformsAndPredicates() { + if (null == this.transforms || this.transforms.isEmpty()) { + final List transformPlugins = new ArrayList<>(); + final List predicatePlugins = new ArrayList<>(); + Herder herder = getHerder(); + for (PluginDesc> transformPlugin : herder.plugins().transformations()) { + if ("org.apache.kafka.connect.runtime.PredicatedTransformation".equals(transformPlugin.className())) { + for (PluginDesc> predicate : herder.plugins().predicates()) { + PredicateDefinition predicateDefinition = PredicateDefinition.fromPluginDesc(predicate); + if (null != predicateDefinition) { + predicatePlugins.add(predicateDefinition); + } + } } else { - this.transforms.add(new TransformsInfo(plugin)); + TransformDefinition transformDefinition = TransformDefinition.fromPluginDesc(transformPlugin); + if (null != transformDefinition) { + transformPlugins.add(transformDefinition); + } } } + this.predicates = Collections.unmodifiableList(predicatePlugins); + this.transforms = Collections.unmodifiableList(transformPlugins); } - - return Collections.unmodifiableList(this.transforms); } - @GET - @Path("/topic-creation") - public boolean getTopicCreationEnabled() { - return this.isTopicCreationEnabled; - } - - private synchronized Boolean isTopicCreationEnabled() { + private synchronized Boolean isTopicCreationEnabled(Map config) { Version kafkaConnectVersion = parseVersion(AppInfoParser.getVersion()); String topicCreationProperty = (String) config.get("topic.creation.enable"); if (null == topicCreationProperty) { // when config is not set, default to true @@ -150,8 +144,60 @@ private synchronized Boolean isTopicCreationEnabled() { && Boolean.parseBoolean(topicCreationProperty); } + private synchronized Herder getHerder() { + if (null == this.herder) { + Field herderField; + try { + herderField = this.connectClusterState.getClass().getDeclaredField("herder"); + } + catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + herderField.setAccessible(true); + try { + this.herder = (Herder) herderField.get(this.connectClusterState); + } + catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + return this.herder; + } + + @GET + @Path(CONNECTOR_PLUGINS_ENDPOINT) + @Produces(MediaType.APPLICATION_JSON) + public List availableDebeziumConnectors() { + initConnectorPlugins(); + return this.availableConnectorPlugins; + } + + @GET + @Path(TRANSFORMS_ENDPOINT) + @Produces(MediaType.APPLICATION_JSON) + public List listTransforms() { + initTransformsAndPredicates(); + return this.transforms; + } + + @GET + @Path(PREDICATES_ENDPOINT) + @Produces(MediaType.APPLICATION_JSON) + public List listPredicates() { + initTransformsAndPredicates(); + return this.predicates; + } + + @GET + @Path(TOPIC_CREATION_ENDPOINT) + @Produces(MediaType.APPLICATION_JSON) + public boolean getTopicCreationEnabled() { + return this.isTopicCreationEnabled; + } + @GET @Path("/version") + @Produces(MediaType.APPLICATION_JSON) public String getDebeziumVersion() { return Module.version(); } diff --git a/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/entities/TransformsInfo.java b/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/entities/PluginDefinition.java similarity index 52% rename from debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/entities/TransformsInfo.java rename to debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/entities/PluginDefinition.java index b5fe3af97..a3c44ccec 100644 --- a/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/entities/TransformsInfo.java +++ b/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/entities/PluginDefinition.java @@ -10,44 +10,22 @@ import java.util.Objects; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.connect.runtime.isolation.PluginDesc; -import org.apache.kafka.connect.transforms.Transformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; /** - * JSON model that describes a Single Message Transform (SMT) entry. + * Base class for JSON models that describes a Single Message Transform (SMT) entry or a Kafka Connect Predicate entry. */ -public class TransformsInfo { +abstract class PluginDefinition { - private static final Logger LOGGER = LoggerFactory.getLogger(TransformsInfo.class); + protected final String className; + protected final Map properties; - private final String className; - private final Map properties; - - @JsonCreator - public TransformsInfo(String className, ConfigDef config) { + PluginDefinition(String className, ConfigDef config) { this.className = className; this.properties = getConfigProperties(className, config); } - @JsonCreator - public TransformsInfo(String className, Class> transformationClass) { - this.className = className; - try { - LOGGER.info("Loading config for TRANSFORM: " + className + "..."); - this.properties = getConfigProperties(transformationClass.getName(), transformationClass.newInstance().config()); - } - catch (InstantiationException | IllegalAccessException e) { - LOGGER.error("Unable to load TRANSFORM: " + className - + "\n\t Reason: " + e.toString()); - throw new RuntimeException(e); - } - } - private static Map getConfigProperties(String className, ConfigDef configDef) { Map configProperties = new HashMap<>(); configDef.configKeys().forEach((fieldName, configKey) -> { @@ -60,15 +38,6 @@ private static Map getConfigProperties(String classN return configProperties; } - public TransformsInfo(PluginDesc> transform) { - this(transform.className(), transform.pluginClass()); - } - - @JsonProperty("transform") - public String className() { - return this.className; - } - @JsonProperty public Map properties() { return this.properties; @@ -79,7 +48,7 @@ public boolean equals(Object o) { return true; } else if (o != null && this.getClass() == o.getClass()) { - TransformsInfo that = (TransformsInfo) o; + PluginDefinition that = (PluginDefinition) o; return Objects.equals(this.className, that.className) && Objects.equals(this.properties, that.properties); } @@ -93,7 +62,7 @@ public int hashCode() { } public String toString() { - return "ConnectorPluginInfo{" + "className='" + this.className + '\'' + + return "PluginDefinition{" + "className='" + this.className + '\'' + ", documentation='" + this.properties + '\'' + '}'; } diff --git a/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/entities/PredicateDefinition.java b/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/entities/PredicateDefinition.java new file mode 100644 index 000000000..afdb96608 --- /dev/null +++ b/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/entities/PredicateDefinition.java @@ -0,0 +1,47 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.kcrestextension.entities; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.runtime.isolation.PluginDesc; +import org.apache.kafka.connect.transforms.predicates.Predicate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * JSON model that describes a Predicate entry used to conditionally enable/disable/apply Single Message Transforms / SMTs. + */ +public class PredicateDefinition extends PluginDefinition { + + private static final Logger LOGGER = LoggerFactory.getLogger(PredicateDefinition.class); + + @JsonCreator + public PredicateDefinition(String className, ConfigDef config) { + super(className, config); + } + + @JsonCreator + public static PredicateDefinition fromPluginDesc(PluginDesc> predicate) { + String className = predicate.pluginClass().getName(); + LOGGER.info("Loading config for PREDICATE: " + className + "..."); + try { + return new PredicateDefinition(className, predicate.pluginClass().getDeclaredConstructor().newInstance().config()); + } + catch (ReflectiveOperationException e) { + LOGGER.error("Unable to load PREDICATE: " + className + "\n\t Reason: " + e); + return null; + } + } + + @JsonProperty("predicate") + public String className() { + return this.className; + } + +} diff --git a/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/entities/TransformDefinition.java b/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/entities/TransformDefinition.java new file mode 100644 index 000000000..d9c14442f --- /dev/null +++ b/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/entities/TransformDefinition.java @@ -0,0 +1,53 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.kcrestextension.entities; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.runtime.isolation.PluginDesc; +import org.apache.kafka.connect.transforms.Transformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * JSON model that describes a Single Message Transform (SMT) entry. + */ +public class TransformDefinition extends PluginDefinition { + + private static final Logger LOGGER = LoggerFactory.getLogger(TransformDefinition.class); + + @JsonCreator + public TransformDefinition(String className, ConfigDef config) { + super(className, config); + } + + @JsonCreator + public static TransformDefinition fromPluginDesc(PluginDesc> transformPlugin) { + String className = transformPlugin.pluginClass().getName(); + if (className.endsWith("$Value")) { + return null; + } + if (className.endsWith("$Key")) { + className = className.substring(0, className.length() - 4); + } + LOGGER.info("Loading config for TRANSFORM: " + className + "..."); + try { + return new TransformDefinition(className, transformPlugin.pluginClass().getDeclaredConstructor().newInstance().config()); + } + catch (ReflectiveOperationException e) { + LOGGER.error("Unable to load TRANSFORM: " + className + "\n\t Reason: " + e); + return null; + } + } + + @JsonProperty("transform") + public String className() { + return this.className; + } + +} diff --git a/debezium-connect-rest-extension/src/test/java/io/debezium/kcrestextension/DebeziumResourceIT.java b/debezium-connect-rest-extension/src/test/java/io/debezium/kcrestextension/DebeziumResourceIT.java index 6ad6789d9..0c1c80e0a 100644 --- a/debezium-connect-rest-extension/src/test/java/io/debezium/kcrestextension/DebeziumResourceIT.java +++ b/debezium-connect-rest-extension/src/test/java/io/debezium/kcrestextension/DebeziumResourceIT.java @@ -5,6 +5,7 @@ */ package io.debezium.kcrestextension; +import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE; import static io.restassured.RestAssured.given; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.is; @@ -17,33 +18,70 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure; + /** * Tests topic creation (which is enabled in Kafka version greater than 2.6.0) and transforms endpoints. * Debezium Container with 1.7 image is used for the same. */ public class DebeziumResourceIT { - private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumResourceIT.class); - private static final String DEBEZIUM_CONTAINER_IMAGE_VERSION = "1.7"; + + private static final List SUPPORTED_TRANSFORMS = List.of( + "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState", + "io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter", + "io.debezium.connector.mysql.transforms.ReadToInsertEvent", + "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb", + "io.debezium.transforms.ByLogicalTableRouter", + "io.debezium.transforms.ContentBasedRouter", + "io.debezium.transforms.ExtractChangedRecordState", + "io.debezium.transforms.ExtractNewRecordState", + "io.debezium.transforms.Filter", + "io.debezium.transforms.HeaderToValue", + "io.debezium.transforms.SchemaChangeEventFilter", + "io.debezium.transforms.outbox.EventRouter", + "io.debezium.transforms.partitions.ComputePartition", + "io.debezium.transforms.partitions.PartitionRouting", + "io.debezium.transforms.tracing.ActivateTracingSpan", + "org.apache.kafka.connect.transforms.Cast", + "org.apache.kafka.connect.transforms.DropHeaders", + "org.apache.kafka.connect.transforms.ExtractField", + "org.apache.kafka.connect.transforms.Filter", + "org.apache.kafka.connect.transforms.Flatten", + "org.apache.kafka.connect.transforms.HeaderFrom", + "org.apache.kafka.connect.transforms.HoistField", + "org.apache.kafka.connect.transforms.InsertField", + "org.apache.kafka.connect.transforms.InsertHeader", + "org.apache.kafka.connect.transforms.MaskField", + "org.apache.kafka.connect.transforms.RegexRouter", + "org.apache.kafka.connect.transforms.ReplaceField", + "org.apache.kafka.connect.transforms.SetSchemaMetadata", + "org.apache.kafka.connect.transforms.TimestampConverter", + "org.apache.kafka.connect.transforms.TimestampRouter", + "org.apache.kafka.connect.transforms.ValueToKey"); + + private static final List SUPPORTED_PREDICATES = List.of( + "org.apache.kafka.connect.transforms.predicates.HasHeaderKey", + "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone", + "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"); @BeforeEach public void start() { - TestHelper.stopContainers(); - TestHelper.setupDebeziumContainer(DEBEZIUM_CONTAINER_IMAGE_VERSION); + RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumConnectRestExtension.class.getName()); } @AfterEach public void stop() { - TestHelper.stopContainers(); + RestExtensionTestInfrastructure.stopContainers(); } @Test public void testTopicCreationEndpoint() { - TestHelper.startContainers(); + RestExtensionTestInfrastructure.startContainers(DATABASE.NONE); given() - .port(TestHelper.getDebeziumContainer().getFirstMappedPort()) + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) .when() - .get(TestHelper.API_PREFIX + TestHelper.TOPIC_CREATION_ENDPOINT) + .get(DebeziumResource.BASE_PATH + DebeziumResource.TOPIC_CREATION_ENDPOINT) .then().log().all() .statusCode(200) .body(is("true")); @@ -51,12 +89,12 @@ public void testTopicCreationEndpoint() { @Test public void testTopicCreationEndpointWhenExplicitlyDisabled() { - TestHelper.withEnv("CONNECT_TOPIC_CREATION_ENABLE", "false"); - TestHelper.startContainers(); + RestExtensionTestInfrastructure.getDebeziumContainer().withEnv("CONNECT_TOPIC_CREATION_ENABLE", "false"); + RestExtensionTestInfrastructure.startContainers(DATABASE.NONE); given() - .port(TestHelper.getDebeziumContainer().getFirstMappedPort()) + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) .when() - .get(TestHelper.API_PREFIX + TestHelper.TOPIC_CREATION_ENDPOINT) + .get(DebeziumResource.BASE_PATH + DebeziumResource.TOPIC_CREATION_ENDPOINT) .then().log().all() .statusCode(200) .body(is("false")); @@ -64,30 +102,38 @@ public void testTopicCreationEndpointWhenExplicitlyDisabled() { @Test public void testTransformsEndpoint() { - TestHelper.startContainers(); + RestExtensionTestInfrastructure.startContainers(DATABASE.NONE); given() - .port(TestHelper.getDebeziumContainer().getFirstMappedPort()) - .when().get(TestHelper.API_PREFIX + TestHelper.TRANSFORMS_ENDPOINT) + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().get(DebeziumResource.BASE_PATH + DebeziumResource.TRANSFORMS_ENDPOINT) .then().log().all() .statusCode(200) - .body("transform.size()", is(33)) - .body("transform", - containsInAnyOrder(List.of("io.debezium.connector.mongodb.transforms.ExtractNewDocumentState", - "io.debezium.connector.mysql.transforms.ReadToInsertEvent", "io.debezium.transforms.ByLogicalTableRouter", - "io.debezium.transforms.ContentBasedRouter", "io.debezium.transforms.ExtractNewRecordState", "io.debezium.transforms.Filter", - "io.debezium.transforms.outbox.EventRouter", "io.debezium.transforms.tracing.ActivateTracingSpan", - "org.apache.kafka.connect.transforms.predicates.HasHeaderKey", "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone", - "org.apache.kafka.connect.transforms.predicates.TopicNameMatches", "org.apache.kafka.connect.transforms.Cast$Key", - "org.apache.kafka.connect.transforms.Cast$Value", "org.apache.kafka.connect.transforms.ExtractField$Key", - "org.apache.kafka.connect.transforms.ExtractField$Value", "org.apache.kafka.connect.transforms.Filter", - "org.apache.kafka.connect.transforms.Flatten$Key", "org.apache.kafka.connect.transforms.Flatten$Value", - "org.apache.kafka.connect.transforms.HoistField$Key", "org.apache.kafka.connect.transforms.HoistField$Value", - "org.apache.kafka.connect.transforms.InsertField$Key", - "org.apache.kafka.connect.transforms.InsertField$Value", "org.apache.kafka.connect.transforms.MaskField$Key", - "org.apache.kafka.connect.transforms.MaskField$Value", "org.apache.kafka.connect.transforms.RegexRouter", - "org.apache.kafka.connect.transforms.ReplaceField$Key", "org.apache.kafka.connect.transforms.ReplaceField$Value", - "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key", "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value", - "org.apache.kafka.connect.transforms.TimestampConverter$Key", "org.apache.kafka.connect.transforms.TimestampConverter$Value", - "org.apache.kafka.connect.transforms.TimestampRouter", "org.apache.kafka.connect.transforms.ValueToKey").toArray())); + .body("transform.size()", is(SUPPORTED_TRANSFORMS.size())) + .body("transform", containsInAnyOrder(SUPPORTED_TRANSFORMS.toArray())); } + + @Test + public void testPredicatesEndpoint() { + RestExtensionTestInfrastructure.startContainers(DATABASE.NONE); + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().get(DebeziumResource.BASE_PATH + DebeziumResource.PREDICATES_ENDPOINT) + .then().log().all() + .statusCode(200) + .body("predicate.size()", is(SUPPORTED_PREDICATES.size())) + .body("predicate", containsInAnyOrder(SUPPORTED_PREDICATES.toArray())); + } + + @Test + public void testConnectorPluginsEndpoint() { + RestExtensionTestInfrastructure.startContainers(DATABASE.NONE); + given() + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) + .when().get(DebeziumResource.BASE_PATH + DebeziumResource.CONNECTOR_PLUGINS_ENDPOINT) + .then().log().all() + .statusCode(200) + .body("size()", is(DebeziumResource.SUPPORTED_CONNECTORS.size())) + .body("className", containsInAnyOrder(DebeziumResource.SUPPORTED_CONNECTORS.toArray())); + } + } diff --git a/debezium-connect-rest-extension/src/test/java/io/debezium/kcrestextension/DebeziumResourceNoTopicCreationIT.java b/debezium-connect-rest-extension/src/test/java/io/debezium/kcrestextension/DebeziumResourceNoTopicCreationIT.java index c754cc914..acd47385e 100644 --- a/debezium-connect-rest-extension/src/test/java/io/debezium/kcrestextension/DebeziumResourceNoTopicCreationIT.java +++ b/debezium-connect-rest-extension/src/test/java/io/debezium/kcrestextension/DebeziumResourceNoTopicCreationIT.java @@ -5,6 +5,7 @@ */ package io.debezium.kcrestextension; +import static io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure.DATABASE; import static io.restassured.RestAssured.given; import static org.hamcrest.Matchers.is; @@ -14,6 +15,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.testing.testcontainers.testhelper.RestExtensionTestInfrastructure; + /** * Tests topic creation endpoint which is disabled in Kafka version less than 2.6.0. * Debezium Container with 1.2 image is used for the same. @@ -24,22 +27,21 @@ public class DebeziumResourceNoTopicCreationIT { @BeforeEach public void start() { - TestHelper.stopContainers(); - TestHelper.setupDebeziumContainer(DEBEZIUM_CONTAINER_IMAGE_VERSION); - TestHelper.startContainers(); + RestExtensionTestInfrastructure.setupDebeziumContainer(Module.version(), DebeziumConnectRestExtension.class.getName(), DEBEZIUM_CONTAINER_IMAGE_VERSION); + RestExtensionTestInfrastructure.startContainers(DATABASE.NONE); } @AfterEach public void stop() { - TestHelper.stopContainers(); + RestExtensionTestInfrastructure.stopContainers(); } @Test public void testTopicCreationEndpoint() { given() - .port(TestHelper.getDebeziumContainer().getFirstMappedPort()) + .port(RestExtensionTestInfrastructure.getDebeziumContainer().getFirstMappedPort()) .when() - .get(TestHelper.API_PREFIX + TestHelper.TOPIC_CREATION_ENDPOINT) + .get(DebeziumResource.BASE_PATH + DebeziumResource.TOPIC_CREATION_ENDPOINT) .then().log().all() .statusCode(200) .body(is("false")); diff --git a/debezium-connect-rest-extension/src/test/java/io/debezium/kcrestextension/TestHelper.java b/debezium-connect-rest-extension/src/test/java/io/debezium/kcrestextension/TestHelper.java deleted file mode 100644 index a60ce8d1d..000000000 --- a/debezium-connect-rest-extension/src/test/java/io/debezium/kcrestextension/TestHelper.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.kcrestextension; - -import java.time.Duration; -import java.util.stream.Stream; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.DockerImageName; -import org.testcontainers.utility.MountableFile; - -import io.debezium.testing.testcontainers.DebeziumContainer; - -public class TestHelper { - - private static final Logger LOGGER = LoggerFactory.getLogger(TestHelper.class); - - private static final String KAFKA_HOSTNAME = "kafka-dbz-ui"; - private static final String DEBEZIUM_VERSION = Module.version(); - - public static final String API_PREFIX = "/debezium"; - public static final String TRANSFORMS_ENDPOINT = "/transforms"; - public static final String TOPIC_CREATION_ENDPOINT = "/topic-creation"; - - private static final Network NETWORK = Network.newNetwork(); - - private static final GenericContainer KAFKA_CONTAINER = new GenericContainer<>( - DockerImageName.parse("quay.io/debezium/kafka:latest").asCompatibleSubstituteFor("kafka")) - .withNetworkAliases(KAFKA_HOSTNAME) - .withNetwork(NETWORK) - .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@" + KAFKA_HOSTNAME + ":9093") - .withEnv("CLUSTER_ID", "5Yr1SIgYQz-b-dgRabWx4g") - .withEnv("NODE_ID", "1"); - - private static DebeziumContainer DEBEZIUM_CONTAINER; - - public static DebeziumContainer getDebeziumContainer() { - return DEBEZIUM_CONTAINER; - } - - public static void setupDebeziumContainer(String debeziumContainerImageVersion) { - DEBEZIUM_CONTAINER = new DebeziumContainer(DockerImageName.parse("quay.io/debezium/connect:" + debeziumContainerImageVersion)) - .withEnv("ENABLE_DEBEZIUM_SCRIPTING", "true") - .withEnv("CONNECT_REST_EXTENSION_CLASSES", "io.debezium.kcrestextension.DebeziumConnectRestExtension") - .withNetwork(NETWORK) - .withCopyFileToContainer( - MountableFile.forHostPath( - "target/debezium-connect-rest-extension-" + DEBEZIUM_VERSION + ".jar"), - "/kafka/libs/debezium-kcd-rest-extension-" + DEBEZIUM_VERSION + ".jar") - .withKafka(KAFKA_CONTAINER.getNetwork(), KAFKA_HOSTNAME + ":9092") - .withLogConsumer(new Slf4jLogConsumer(LOGGER)) - .withStartupTimeout(Duration.ofSeconds(90)) - .dependsOn(KAFKA_CONTAINER); - } - - public static void withEnv(String key, String value) { - DEBEZIUM_CONTAINER = DEBEZIUM_CONTAINER.withEnv(key, value); - } - - public static void startContainers() { - Startables.deepStart(Stream.of(KAFKA_CONTAINER, DEBEZIUM_CONTAINER)).join(); - } - - public static void stopContainers() { - try { - if (DEBEZIUM_CONTAINER != null) { - DEBEZIUM_CONTAINER.stop(); - } - if (KAFKA_CONTAINER != null) { - KAFKA_CONTAINER.stop(); - } - } - catch (Exception ignored) { - } - } -} diff --git a/debezium-connect-rest-extension/src/test/resources/Dockerfile.rest.test b/debezium-connect-rest-extension/src/test/resources/Dockerfile.rest.test new file mode 100644 index 000000000..7c991524a --- /dev/null +++ b/debezium-connect-rest-extension/src/test/resources/Dockerfile.rest.test @@ -0,0 +1,36 @@ +ARG BASE_IMAGE +ARG DEBEZIUM_VERSION +FROM ${BASE_IMAGE} +ARG DEBEZIUM_VERSION + +RUN echo "Installing Debezium connectors version: ${DEBEZIUM_VERSION}" ; \ +MAVEN_REPOSITORY="https://repo1.maven.org/maven2/io/debezium" ; \ +if [[ "${DEBEZIUM_VERSION}" == *-SNAPSHOT ]] ; then \ + MAVEN_REPOSITORY="https://oss.sonatype.org/content/repositories/snapshots/io/debezium" ; \ +fi ; \ +for CONNECTOR in {mysql,mongodb,postgres,sqlserver,oracle,db2}; do \ + local CONNECTOR_VERSION="${DEBEZIUM_VERSION}" ; \ + if [[ "${DEBEZIUM_VERSION}" == *-SNAPSHOT ]] ; then \ + CONNECTOR_VERSION=$(curl --silent -fSL "${MAVEN_REPOSITORY}/debezium-connector-${CONNECTOR}/${DEBEZIUM_VERSION}/maven-metadata.xml" | awk -F'<[^>]+>' '/tar.gz<\/extension>/ {getline; print $2; exit}'); \ + fi ; \ + echo "Downloading and installing debezium-connector-${CONNECTOR}-${CONNECTOR_VERSION}-plugin.tar.gz ..." ; \ + curl --silent -fSL -o /tmp/plugin.tar.gz "${MAVEN_REPOSITORY}/debezium-connector-${CONNECTOR}/${DEBEZIUM_VERSION}/debezium-connector-${CONNECTOR}-${CONNECTOR_VERSION}-plugin.tar.gz" && \ + echo "Extracting debezium-connector-${CONNECTOR}-${CONNECTOR_VERSION}-plugin.tar.gz ..." && \ + tar -xvzf /tmp/plugin.tar.gz -C ${KAFKA_CONNECT_PLUGINS_DIR}/ && \ + echo "Successfully installed debezium-connector-${CONNECTOR}-${CONNECTOR_VERSION}!" ; \ + rm -f /tmp/plugin.tar.gz; \ +done ; \ +for PACKAGE in {scripting,}; do \ + local CONNECTOR_VERSION="${DEBEZIUM_VERSION}" ; \ + if [[ "${DEBEZIUM_VERSION}" == *-SNAPSHOT ]] ; then \ + CONNECTOR_VERSION=$(curl --silent -fSL "${MAVEN_REPOSITORY}/debezium-${PACKAGE}/${DEBEZIUM_VERSION}/maven-metadata.xml" | awk -F'<[^>]+>' '/tar.gz<\/extension>/ {getline; print $2; exit}'); \ + fi ; \ + echo "Downloading and installing debezium-${PACKAGE}-${CONNECTOR_VERSION}.tar.gz ..." ; \ + curl --silent -fSL -o /tmp/package.tar.gz "${MAVEN_REPOSITORY}/debezium-${PACKAGE}/${DEBEZIUM_VERSION}/debezium-${PACKAGE}-${CONNECTOR_VERSION}.tar.gz" && \ + echo "Extracting debezium-${PACKAGE}-${CONNECTOR_VERSION}.tar.gz ..." && \ + tar -xzf /tmp/package.tar.gz -C $EXTERNAL_LIBS_DIR && \ + echo "Successfully installed debezium-${PACKAGE}-${CONNECTOR_VERSION}!" ; \ + rm -f /tmp/package.tar.gz ; \ +done + +COPY --chown=kafka:kafka debezium-connect-rest-extension /kafka/connect/debezium-connect-rest-extension diff --git a/debezium-connect-rest-extension/src/test/resources/logback-test.xml b/debezium-connect-rest-extension/src/test/resources/logback-test.xml index 1f91c2ed2..9aa091ab7 100644 --- a/debezium-connect-rest-extension/src/test/resources/logback-test.xml +++ b/debezium-connect-rest-extension/src/test/resources/logback-test.xml @@ -21,4 +21,8 @@ level="warn" additivity="false"> + + + + diff --git a/debezium-core/src/main/java/io/debezium/config/ValidationResults.java b/debezium-core/src/main/java/io/debezium/config/ValidationResults.java new file mode 100644 index 000000000..a8d5beba6 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/config/ValidationResults.java @@ -0,0 +1,59 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.config; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.kafka.common.config.Config; +import org.apache.kafka.connect.connector.Connector; + +public class ValidationResults { + + public final List validationResults; + public Status status; + + public ValidationResults(Connector connector, Map properties) { + this.validationResults = convertConfigToValidationResults(connector.validate(convertPropertiesToStrings(properties))); + if (validationResults.isEmpty()) { + this.status = Status.VALID; + } + else { + this.status = Status.INVALID; + } + } + + private static Map convertPropertiesToStrings(Map properties) { + return properties.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue()))); + } + + private List convertConfigToValidationResults(Config result) { + return result.configValues() + .stream() + .filter(cv -> !cv.errorMessages().isEmpty()) + .filter(cv -> !cv.errorMessages().get(0).equals(cv.name() + " is referred in the dependents, but not defined.")) + .map(cv -> new ValidationResult(cv.name(), cv.errorMessages().get(0))) + .collect(Collectors.toList()); + } + + public static class ValidationResult { + public String property; + public String message; + + public ValidationResult(String property, String message) { + this.property = property; + this.message = message; + } + } + + public enum Status { + VALID, + INVALID; + } + +} diff --git a/debezium-core/src/main/java/io/debezium/metadata/ConnectorDescriptor.java b/debezium-core/src/main/java/io/debezium/metadata/ConnectorDescriptor.java index ee67830af..dd43d044f 100644 --- a/debezium-core/src/main/java/io/debezium/metadata/ConnectorDescriptor.java +++ b/debezium-core/src/main/java/io/debezium/metadata/ConnectorDescriptor.java @@ -6,14 +6,22 @@ package io.debezium.metadata; public class ConnectorDescriptor { + private final String id; - private final String name; + private final String displayName; private final String className; private final String version; - public ConnectorDescriptor(String id, String name, String className, String version) { + public ConnectorDescriptor(String id, String displayName, String className, String version) { this.id = id; - this.name = name; + this.displayName = displayName; + this.className = className; + this.version = version; + } + + public ConnectorDescriptor(String className, String version) { + this.id = getIdForConnectorClass(className); + this.displayName = getDisplayNameForConnectorClass(className); this.className = className; this.version = version; } @@ -22,8 +30,8 @@ public String getId() { return id; } - public String getName() { - return name; + public String getDisplayName() { + return displayName; } public String getClassName() { @@ -33,4 +41,38 @@ public String getClassName() { public String getVersion() { return version; } + + public static String getIdForConnectorClass(String className) { + switch (className) { + case "io.debezium.connector.mongodb.MongoDbConnector": + return "mongodb"; + case "io.debezium.connector.mysql.MySqlConnector": + return "mysql"; + case "io.debezium.connector.oracle.OracleConnector": + return "oracle"; + case "io.debezium.connector.postgresql.PostgresConnector": + return "postgres"; + case "io.debezium.connector.sqlserver.SqlServerConnector": + return "sqlserver"; + default: + throw new RuntimeException("Unsupported connector type with className: \"" + className + "\""); + } + } + + public static String getDisplayNameForConnectorClass(String className) { + switch (className) { + case "io.debezium.connector.mongodb.MongoDbConnector": + return "Debezium MongoDB Connector"; + case "io.debezium.connector.mysql.MySqlConnector": + return "Debezium MySQL Connector"; + case "io.debezium.connector.oracle.OracleConnector": + return "Debezium Oracle Connector"; + case "io.debezium.connector.postgresql.PostgresConnector": + return "Debezium PostgreSQL Connector"; + case "io.debezium.connector.sqlserver.SqlServerConnector": + return "Debezium SQLServer Connector"; + default: + throw new RuntimeException("Unsupported connector type with className: \"" + className + "\""); + } + } } diff --git a/debezium-core/src/main/java/io/debezium/rest/ConnectionValidationResource.java b/debezium-core/src/main/java/io/debezium/rest/ConnectionValidationResource.java new file mode 100644 index 000000000..f6ea2f9e2 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/rest/ConnectionValidationResource.java @@ -0,0 +1,34 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.rest; + +import java.util.Map; + +import javax.ws.rs.PUT; +import javax.ws.rs.Path; + +import org.apache.kafka.connect.connector.Connector; + +import io.debezium.config.ValidationResults; + +public interface ConnectionValidationResource { + + Connector getConnector(); + + String VALIDATE_CONNECTION_ENDPOINT = "/validate/connection"; + + @PUT + @Path(VALIDATE_CONNECTION_ENDPOINT) + default ValidationResults validateConnectionProperties(Map properties) { + // switch classloader to the connector specific classloader in order to load dependencies required to validate the connector config + ValidationResults validationResults; + ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(getConnector().getClass().getClassLoader()); + validationResults = new ValidationResults(getConnector(), properties); + Thread.currentThread().setContextClassLoader(originalClassLoader); + return validationResults; + } +} diff --git a/debezium-core/src/main/java/io/debezium/rest/SchemaResource.java b/debezium-core/src/main/java/io/debezium/rest/SchemaResource.java new file mode 100644 index 000000000..7371f89b6 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/rest/SchemaResource.java @@ -0,0 +1,38 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.rest; + +import java.io.File; +import java.io.IOException; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.debezium.DebeziumException; + +public interface SchemaResource { + + String getSchemaFilePath(); + + String SCHEMA_ENDPOINT = "/schema"; + + ObjectMapper MAPPER = new ObjectMapper(); + + @GET + @Path(SCHEMA_ENDPOINT) + default JsonNode getConnectorSchema() { + + try { + return MAPPER.readValue(getClass().getResourceAsStream(getSchemaFilePath()), JsonNode.class); + } + catch (IOException e) { + throw new DebeziumException("Unable to open \"" + getSchemaFilePath().substring(getSchemaFilePath().lastIndexOf(File.separator) + 1) + "\" schema file.", e); + } + } +} diff --git a/debezium-schema-generator/src/main/java/io/debezium/schemagenerator/JsonSchemaCreatorService.java b/debezium-schema-generator/src/main/java/io/debezium/schemagenerator/JsonSchemaCreatorService.java index 9b87be169..077bc01b1 100644 --- a/debezium-schema-generator/src/main/java/io/debezium/schemagenerator/JsonSchemaCreatorService.java +++ b/debezium-schema-generator/src/main/java/io/debezium/schemagenerator/JsonSchemaCreatorService.java @@ -111,7 +111,7 @@ private static JsonSchemaType toJsonSchemaType(ConfigDef.Type type) { public Schema buildConnectorSchema() { Schema schema = new SchemaImpl(connectorName); String connectorVersion = connectorMetadata.getConnectorDescriptor().getVersion(); - schema.setTitle(connectorMetadata.getConnectorDescriptor().getName()); + schema.setTitle(connectorMetadata.getConnectorDescriptor().getDisplayName()); schema.setType(Schema.SchemaType.OBJECT); schema.addExtension("connector-id", connectorBaseName); schema.addExtension("version", connectorVersion); diff --git a/debezium-schema-generator/src/main/java/io/debezium/schemagenerator/SchemaGenerator.java b/debezium-schema-generator/src/main/java/io/debezium/schemagenerator/SchemaGenerator.java index 82f88eed1..1305debc7 100644 --- a/debezium-schema-generator/src/main/java/io/debezium/schemagenerator/SchemaGenerator.java +++ b/debezium-schema-generator/src/main/java/io/debezium/schemagenerator/SchemaGenerator.java @@ -56,7 +56,7 @@ private void run(String formatName, Path outputDirectory, boolean groupDirectory for (ConnectorMetadata connectorMetadata : allMetadata) { LOGGER.log(Logger.Level.INFO, "Creating \"" + format.getDescriptor().getName() + "\" schema for connector: " - + connectorMetadata.getConnectorDescriptor().getName() + "..."); + + connectorMetadata.getConnectorDescriptor().getDisplayName() + "..."); JsonSchemaCreatorService jsonSchemaCreatorService = new JsonSchemaCreatorService(connectorMetadata, format.getFieldFilter()); org.eclipse.microprofile.openapi.models.media.Schema buildConnectorSchema = jsonSchemaCreatorService.buildConnectorSchema(); String spec = format.getSpec(buildConnectorSchema); diff --git a/debezium-testing/debezium-testing-testcontainers/pom.xml b/debezium-testing/debezium-testing-testcontainers/pom.xml index d8c6eeba7..0640f23b5 100644 --- a/debezium-testing/debezium-testing-testcontainers/pom.xml +++ b/debezium-testing/debezium-testing-testcontainers/pom.xml @@ -49,6 +49,14 @@ org.testcontainers mongodb + + org.testcontainers + mysql + + + org.testcontainers + mssqlserver + io.debezium debezium-core diff --git a/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/ConnectorConfiguration.java b/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/ConnectorConfiguration.java index c8152a201..d4752d655 100644 --- a/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/ConnectorConfiguration.java +++ b/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/ConnectorConfiguration.java @@ -18,6 +18,14 @@ */ public class ConnectorConfiguration { + public static final String CONNECTOR = "connector.class"; + public static final String HOSTNAME = "database.hostname"; + public static final String CONNECTION_STRING = "mongodb.connection.string"; + public static final String PORT = "database.port"; + public static final String USER = "database.user"; + public static final String PASSWORD = "database.password"; + public static final String DBNAME = "database.dbname"; + private final ObjectNode configNode; protected ConnectorConfiguration() { @@ -36,14 +44,6 @@ static ConnectorConfiguration from(JsonNode configNode) { return configuration; } - private static final String CONNECTOR = "connector.class"; - private static final String HOSTNAME = "database.hostname"; - private static final String CONNECTION_STRING = "mongodb.connection.string"; - private static final String PORT = "database.port"; - private static final String USER = "database.user"; - private static final String PASSWORD = "database.password"; - private static final String DBNAME = "database.dbname"; - public static ConnectorConfiguration forJdbcContainer(JdbcDatabaseContainer jdbcDatabaseContainer) { ConnectorConfiguration configuration = new ConnectorConfiguration(); diff --git a/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/testhelper/RestExtensionTestInfrastructure.java b/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/testhelper/RestExtensionTestInfrastructure.java new file mode 100644 index 000000000..23cae62c3 --- /dev/null +++ b/debezium-testing/debezium-testing-testcontainers/src/main/java/io/debezium/testing/testcontainers/testhelper/RestExtensionTestInfrastructure.java @@ -0,0 +1,203 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.testing.testcontainers.testhelper; + +import java.nio.file.Paths; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import org.awaitility.Awaitility; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.MSSQLServerContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy; +import org.testcontainers.images.builder.ImageFromDockerfile; +import org.testcontainers.lifecycle.Startable; +import org.testcontainers.utility.DockerImageName; + +import io.debezium.testing.testcontainers.Connector; +import io.debezium.testing.testcontainers.DebeziumContainer; +import io.debezium.testing.testcontainers.MongoDbReplicaSet; +import io.debezium.testing.testcontainers.util.MoreStartables; + +public class RestExtensionTestInfrastructure { + + private static final String DEBEZIUM_CONTAINER_IMAGE_VERSION_LATEST = "latest"; + public static final String KAFKA_HOSTNAME = "kafka-dbz-ui"; + public static final int CI_CONTAINER_STARTUP_TIME = 90; + + public enum DATABASE { + POSTGRES, + MYSQL, + SQLSERVER, + MONGODB, + ORACLE, + NONE + } + + private static final Logger LOGGER = LoggerFactory.getLogger(RestExtensionTestInfrastructure.class); + + private static final Network NETWORK = Network.newNetwork(); + + private static final GenericContainer KAFKA_CONTAINER = new GenericContainer<>( + DockerImageName.parse("quay.io/debezium/kafka:" + DEBEZIUM_CONTAINER_IMAGE_VERSION_LATEST).asCompatibleSubstituteFor("kafka")) + .withNetworkAliases(KAFKA_HOSTNAME) + .withNetwork(NETWORK) + .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@" + KAFKA_HOSTNAME + ":9093") + .withEnv("CLUSTER_ID", "5Yr1SIgYQz-b-dgRabWx4g") + .withEnv("NODE_ID", "1"); + + private static DebeziumContainer DEBEZIUM_CONTAINER = null; + private static final PostgreSQLContainer POSTGRES_CONTAINER = new PostgreSQLContainer<>( + DockerImageName.parse("quay.io/debezium/example-postgres:" + DEBEZIUM_CONTAINER_IMAGE_VERSION_LATEST).asCompatibleSubstituteFor("postgres")) + .withNetwork(NETWORK) + .withNetworkAliases("postgres"); + + private static final MySQLContainer MYSQL_CONTAINER = new MySQLContainer<>( + DockerImageName.parse("quay.io/debezium/example-mysql:" + DEBEZIUM_CONTAINER_IMAGE_VERSION_LATEST).asCompatibleSubstituteFor("mysql")) + .withNetwork(NETWORK) + .withUsername("mysqluser") + .withPassword("mysqlpw") + .withEnv("MYSQL_ROOT_PASSWORD", "debezium") + .withNetworkAliases("mysql"); + + private static final MongoDbReplicaSet MONGODB_REPLICA = MongoDbReplicaSet.replicaSet() + .name("rs0") + .memberCount(1) + .network(NETWORK) + .imageName(DockerImageName.parse("mongo:5.0")) + .build(); + + private static final MSSQLServerContainer SQL_SERVER_CONTAINER = new MSSQLServerContainer<>(DockerImageName.parse("mcr.microsoft.com/mssql/server:2019-latest")) + .withNetwork(NETWORK) + .withNetworkAliases("sqlserver") + .withEnv("SA_PASSWORD", "Password!") + .withEnv("MSSQL_PID", "Standard") + .withEnv("MSSQL_AGENT_ENABLED", "true") + .withPassword("Password!") + .withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(5))) + .withInitScript("initialize-sqlserver-database.sql") + .acceptLicense(); + + private static Supplier> getContainers(DATABASE database) { + final Startable dbStartable; + switch (database) { + case POSTGRES: + dbStartable = POSTGRES_CONTAINER; + break; + case MYSQL: + dbStartable = MYSQL_CONTAINER; + break; + case MONGODB: + dbStartable = MONGODB_REPLICA; + break; + case SQLSERVER: + dbStartable = SQL_SERVER_CONTAINER; + break; + case NONE: + default: + dbStartable = null; + break; + } + + if (null != dbStartable) { + return () -> Stream.of(KAFKA_CONTAINER, dbStartable, DEBEZIUM_CONTAINER); + } + else { + return () -> Stream.of(KAFKA_CONTAINER, DEBEZIUM_CONTAINER); + } + } + + public static void stopContainers() { + Stream containers = Stream.of(DEBEZIUM_CONTAINER, SQL_SERVER_CONTAINER, MONGODB_REPLICA, MYSQL_CONTAINER, POSTGRES_CONTAINER, KAFKA_CONTAINER); + MoreStartables.deepStopSync(containers); + DEBEZIUM_CONTAINER = null; + } + + public static void startContainers(DATABASE database) { + final Supplier> containers = getContainers(database); + + if ("true".equals(System.getenv("CI"))) { + containers.get().forEach(container -> { + if (container instanceof GenericContainer) { + ((GenericContainer) container).withStartupTimeout(Duration.ofSeconds(CI_CONTAINER_STARTUP_TIME)); + } + if (container instanceof MongoDbReplicaSet) { + // This could be added to MongoDbReplicaSet + ((MongoDbReplicaSet) container).getMembers().forEach(member -> { + member.withStartupTimeout(Duration.ofSeconds(CI_CONTAINER_STARTUP_TIME)); + }); + } + }); + } + MoreStartables.deepStartSync(containers.get()); + } + + public static void setupDebeziumContainer(String connectorVersion, String restExtensionClassses) { + setupDebeziumContainer(connectorVersion, restExtensionClassses, DEBEZIUM_CONTAINER_IMAGE_VERSION_LATEST); + } + + public static void setupDebeziumContainer(String connectorVersion, String restExtensionClasses, String debeziumContainerImageVersion) { + if (null != DEBEZIUM_CONTAINER && DEBEZIUM_CONTAINER.isRunning()) { + DEBEZIUM_CONTAINER.stop(); + } + final String registry = debeziumContainerImageVersion.startsWith("1.2") ? "" : "quay.io/"; + final String debeziumVersion = debeziumContainerImageVersion.startsWith("1.2") ? "1.2.5.Final" : connectorVersion; + String baseImageName = registry + "debezium/connect-base:" + debeziumContainerImageVersion; + DEBEZIUM_CONTAINER = new DebeziumContainer(new ImageFromDockerfile("debezium/connect-rest-test:" + debeziumVersion) + .withFileFromPath(".", Paths.get("target/")) + .withFileFromPath("Dockerfile", Paths.get("src/test/resources/Dockerfile.rest.test")) + .withBuildArg("BASE_IMAGE", baseImageName) + .withBuildArg("DEBEZIUM_VERSION", debeziumVersion)) + .withEnv("ENABLE_DEBEZIUM_SCRIPTING", "true") + .withEnv("CONNECT_REST_EXTENSION_CLASSES", restExtensionClasses) + .withEnv("ENABLE_JOLOKIA", "true") + .withExposedPorts(8083, 8778) + .withNetwork(NETWORK) + .withKafka(KAFKA_CONTAINER.getNetwork(), KAFKA_HOSTNAME + ":9092") + .withLogConsumer(new Slf4jLogConsumer(LOGGER)) + .dependsOn(KAFKA_CONTAINER); + } + + public static GenericContainer getKafkaContainer() { + return KAFKA_CONTAINER; + } + + public static DebeziumContainer getDebeziumContainer() { + return DEBEZIUM_CONTAINER; + } + + public static PostgreSQLContainer getPostgresContainer() { + return POSTGRES_CONTAINER; + } + + public static MySQLContainer getMySqlContainer() { + return MYSQL_CONTAINER; + } + + public static MongoDbReplicaSet getMongoDbContainer() { + return MONGODB_REPLICA; + } + + public static MSSQLServerContainer getSqlServerContainer() { + return SQL_SERVER_CONTAINER; + } + + public static void waitForConnectorTaskStatus(String connectorName, int taskNumber, Connector.State state) { + Awaitility.await() + // this needs to be set to at least a minimum of ~65-70 seconds because PostgreSQL now + // retries on certain failure conditions with a 10s between them. + .atMost(120, TimeUnit.SECONDS) + .until(() -> RestExtensionTestInfrastructure.getDebeziumContainer().getConnectorTaskState(connectorName, taskNumber) == state); + } +}