DBZ-4028 Add a Debezium Kafka Connect REST Extension

+ fetch available transforms and their configuration documentation
+ report whether topic auto-creation is available and enabled

closes https://issues.redhat.com/browse/DBZ-4028
rkerner 2021-09-22 18:19:46 +02:00 committed by Gunnar Morling
parent 4a1d89c429
commit 5a8b5e00f1
8 changed files with 567 additions and 0 deletions

@@ -0,0 +1,33 @@
Copyright Debezium Authors.
Licensed under the [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0).
# Debezium Kafka Connect Distributed (KCD) REST Extension
Debezium is an open source distributed platform for change data capture (CDC).
This repository contains extensions to Kafka Connect's REST API.
## Setup
1. Install or mount the Debezium Kafka Connect REST Extension jar into a separate
   Kafka Connect plugin directory.

   For example with `docker-compose`:

   ```yaml
   volumes:
     - debezium-kcd-rest-extension-1.0.0.jar:/kafka/connect/dbz-rest-extension/debezium-kcd-rest-extension-1.0.0.jar
   ```

2. Register the REST extension with Kafka Connect:

   ```yaml
   environment:
     - CONNECT_REST_EXTENSION_CLASSES=io.debezium.kcdrestextension.DebeziumConnectRestExtension
   ```

   or set `rest.extension.classes=io.debezium.kcdrestextension.DebeziumConnectRestExtension` in your Kafka Connect properties file.
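Once the worker has been restarted with the extension registered, the new endpoints are plain HTTP `GET` resources under `/debezium` on the Kafka Connect REST port. The following is a minimal, illustrative client sketch using the JDK 11 `HttpClient`; the class name is made up for the example, and it assumes the Connect REST API is reachable at `http://localhost:8083` (adjust host and port to your deployment):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DebeziumRestExtensionSmokeTest {

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Lists the available transforms and their configuration documentation.
        System.out.println(get(client, "http://localhost:8083/debezium/transforms"));

        // Reports whether topic auto-creation is available and enabled.
        System.out.println(get(client, "http://localhost:8083/debezium/topic-creation"));
    }

    private static String get(HttpClient client, String uri) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(uri))
                .header("Accept", "application/json")
                .GET()
                .build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }
}
```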
## Contribution
This project is under active development; any contributions are very welcome.

@@ -0,0 +1,119 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.debezium</groupId>
<artifactId>debezium-parent</artifactId>
<version>1.7.0-SNAPSHOT</version>
<relativePath>../debezium-parent/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>debezium-connect-rest-extension</artifactId>
<name>Debezium Kafka Connect REST Extension</name>
<properties>
<maven.build.timestamp.format>yyyy-MM-dd HH:mm:ss</maven.build.timestamp.format>
<timestamp>${maven.build.timestamp}</timestamp>
<maven.compiler.parameters>true</maven.compiler.parameters>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.release>11</maven.compiler.release>
<!-- Dependency versions -->
<kafka.version>2.8.0</kafka.version>
<debezium.base.version>1.6.1.Final</debezium.base.version>
<!-- Plug-in versions -->
<frontend.maven.plugin.version>1.11.2</frontend.maven.plugin.version>
<maven.clean.plugin.version>3.1.0</maven.clean.plugin.version>
<maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
<maven.jar.plugin.version>3.2.0</maven.jar.plugin.version>
<maven.resources.plugin.version>3.1.0</maven.resources.plugin.version>
<maven.surefire.plugin.version>3.0.0-M5</maven.surefire.plugin.version>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
<version.checkstyle>8.32</version.checkstyle>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<scope>provided</scope>
<version>${kafka.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.compiler.plugin.version}</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven.surefire.plugin.version}</version>
<configuration>
<systemPropertyVariables>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
<id>default-test</id>
<phase>test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<systemProperties>
<maven.home>${maven.home}</maven.home>
</systemProperties>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>${maven.surefire.plugin.version}</version>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
<configuration>
<skipTests>${skipITs}</skipTests>
<enableAssertions>true</enableAssertions>
<systemPropertyVariables>
<skipLongRunningTests>${skipLongRunningTests}</skipLongRunningTests>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<resources>
<!-- Apply the properties set in the POM to the resource files -->
<resource>
<filtering>true</filtering>
<directory>src/main/resources</directory>
<includes>
<include>**/META-INF/services/*</include>
</includes>
</resource>
</resources>
</build>
<profiles>
<profile>
<id>skip-integration-tests</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>skipITs</name>
</property>
</activation>
</profile>
</profiles>
</project>

@@ -0,0 +1,48 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kcdrestextension;
import java.util.Map;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
/**
* A Kafka Connect REST extension that enables some advanced features over
* Kafka Connect's REST interface:
* + report available transformations and their configuration
* + report whether topic auto-creation is available and enabled
*
* To install this extension, put the jar file into a separate Kafka Connect
* plugin directory and configure your Kafka Connect properties file with:
*
* `rest.extension.classes=io.debezium.kcdrestextension.DebeziumConnectRestExtension`
*
*/
public class DebeziumConnectRestExtension implements ConnectRestExtension {
private Map<String, ?> config;
@Override
public void register(ConnectRestExtensionContext restPluginContext) {
restPluginContext.configurable().register(new DebeziumResource(restPluginContext.clusterState(), config));
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
this.config = configs;
}
@Override
public String version() {
return AppInfoParser.getVersion();
}
}

@@ -0,0 +1,138 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kcdrestextension;
import java.lang.Runtime.Version;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.health.ConnectClusterState;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.HasHeaderKey;
import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
import org.apache.kafka.connect.transforms.predicates.TopicNameMatches;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.kcdrestextension.entities.TransformsInfo;
/**
* A JAX-RS Resource class defining endpoints that enable some advanced features
* over Kafka Connect's REST interface:
* + report available transformations and their configuration
* + report whether topic auto-creation is available and enabled
*
*/
@Path("/debezium")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class DebeziumResource {
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumResource.class);
// TODO: This should not be so long. However, a rebalance may have to wait a full session timeout to
// complete, during which we cannot serve some requests. Ideally we could reduce this, but we need to
// consider all the scenarios in which this could fail. It might be ok to fail with a timeout in rare cases,
// but currently a worker simply leaving the group can take this long as well.
public static final Duration REQUEST_TIMEOUT_MS = Duration.ofSeconds(90);
// Mutable for integration testing; otherwise, some tests would take at least REQUEST_TIMEOUT_MS
// to run
private static Duration requestTimeoutMs = REQUEST_TIMEOUT_MS;
private final List<TransformsInfo> transforms;
private final Boolean isTopicCreationEnabled;
private final Herder herder;
private final Map<String, ?> config;
private static final Version TOPIC_CREATION_KAFKA_VERSION = Version.parse("2.6.0");
@javax.ws.rs.core.Context
private ServletContext context;
public DebeziumResource(ConnectClusterState clusterState, Map<String, ?> config) {
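// The Herder is needed to list the installed transformation plug-ins, but ConnectClusterState
// does not expose it, so it is read via reflection from the runtime's ConnectClusterStateImpl.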
Field herderField;
try {
herderField = ConnectClusterStateImpl.class.getDeclaredField("herder");
}
catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
herderField.setAccessible(true);
try {
this.herder = (Herder) herderField.get(clusterState);
}
catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
this.transforms = new ArrayList<>();
this.config = config;
this.isTopicCreationEnabled = isTopicCreationEnabled();
}
// For testing purposes only
public static void setRequestTimeout(long requestTimeoutMs) {
DebeziumResource.requestTimeoutMs = Duration.ofMillis(requestTimeoutMs);
}
public static void resetRequestTimeout() {
DebeziumResource.requestTimeoutMs = REQUEST_TIMEOUT_MS;
}
@GET
@Path("/transforms")
public List<TransformsInfo> listTransforms() {
return this.getTransforms();
}
private synchronized List<TransformsInfo> getTransforms() {
if (this.transforms.isEmpty()) {
for (PluginDesc<Transformation> plugin : herder.plugins().transformations()) {
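// The plug-in list contains the runtime-internal PredicatedTransformation wrapper; replace it
// with the concrete built-in predicates so that their configuration is reported instead.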
if ("org.apache.kafka.connect.runtime.PredicatedTransformation".equals(plugin.className())) {
this.transforms.add(new TransformsInfo(HasHeaderKey.class.getName(), (new HasHeaderKey<>().config())));
this.transforms.add(new TransformsInfo(RecordIsTombstone.class.getName(), (new RecordIsTombstone<>().config())));
this.transforms.add(new TransformsInfo(TopicNameMatches.class.getName(), (new TopicNameMatches<>().config())));
}
else {
this.transforms.add(new TransformsInfo(plugin));
}
}
}
return Collections.unmodifiableList(this.transforms);
}
@GET
@Path("/topic-creation")
public boolean getTopicCreationEnabled() {
return this.isTopicCreationEnabled;
}
private synchronized Boolean isTopicCreationEnabled() {
Version kafkaConnectVersion = Version.parse(AppInfoParser.getVersion());
String topicCreationProperty = (String) config.get("topic.creation.enable");
if (null == topicCreationProperty) { // when config is not set, default to true
topicCreationProperty = "true";
}
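// Topic auto-creation for source connectors is only available since Kafka Connect 2.6.0 and,
// when available, it is enabled unless topic.creation.enable is explicitly set to false.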
return TOPIC_CREATION_KAFKA_VERSION.compareTo(kafkaConnectVersion) <= 0
&& Boolean.parseBoolean(topicCreationProperty);
}
}

@@ -0,0 +1,126 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kcdrestextension.entities;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.config.ConfigDef;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* JSON model that describes a property of a Single Message Transform (SMT).
*/
public class PropertyDescriptor {
@JsonProperty
public String title;
@JsonProperty("x-name")
public String name;
@JsonProperty
public String description;
@JsonProperty
public String type;
@JsonProperty("enum")
@JsonInclude(JsonInclude.Include.NON_NULL)
public List<String> allowedValues;
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public String format;
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public String defaultValue;
public PropertyDescriptor(String className, ConfigDef.ConfigKey configKey) {
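// Some Debezium SMT options are declared as plain strings in their ConfigDef but only accept
// a fixed set of values; list those values explicitly so they are exposed as enums in the JSON.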
Map<String, List<String>> specialEnums = new HashMap<>();
specialEnums.put(
"io.debezium.connector.mongodb.transforms.ExtractNewDocumentState#array.encoding",
Arrays.asList("array", "document"));
specialEnums.put(
"io.debezium.connector.mongodb.transforms.ExtractNewDocumentState#delete.handling.mode",
Arrays.asList("drop", "rewrite", "none"));
specialEnums.put(
"io.debezium.transforms.ExtractNewRecordState#delete.handling.mode",
Arrays.asList("drop", "rewrite", "none"));
specialEnums.put(
"io.debezium.transforms.outbox.EventRouter#debezium.op.invalid.behavior",
Arrays.asList("warn", "error", "fatal"));
specialEnums.put(
"io.debezium.transforms.ScriptingTransformation#null.handling.mode",
Arrays.asList("keep", "drop", "evaluate"));
this.title = configKey.displayName;
this.name = configKey.name;
this.description = configKey.documentation;
if (!Objects.equals(ConfigDef.NO_DEFAULT_VALUE, configKey.defaultValue)
&& null != configKey.defaultValue) {
this.defaultValue = String.valueOf(configKey.defaultValue);
}
JsonType jsonType = toJsonType(configKey.type());
this.type = jsonType.schemaType;
this.format = jsonType.format;
String classPropertyName = className + "#" + configKey.name;
if (specialEnums.containsKey(classPropertyName)) {
this.allowedValues = specialEnums.get(classPropertyName);
}
}
public static class JsonType {
@JsonProperty
public final String schemaType;
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public final String format;
public JsonType(String schemaType, String format) {
this.schemaType = schemaType;
this.format = format;
}
public JsonType(String schemaType) {
this.schemaType = schemaType;
this.format = null;
}
}
private static JsonType toJsonType(ConfigDef.Type type) {
switch (type) {
case BOOLEAN:
return new JsonType("BOOLEAN");
case CLASS:
return new JsonType("STRING", "class");
case DOUBLE:
return new JsonType("NUMBER", "double");
case INT:
case SHORT:
return new JsonType("INTEGER", "int32");
case LIST:
return new JsonType("STRING", "list,regex");
case LONG:
return new JsonType("INTEGER", "int64");
case PASSWORD:
return new JsonType("STRING", "password");
case STRING:
return new JsonType("STRING");
default:
throw new IllegalArgumentException("Unsupported property type: " + type);
}
}
}

@@ -0,0 +1,100 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.kcdrestextension.entities;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* JSON model that describes a Single Message Transform (SMT) entry.
*/
public class TransformsInfo {
private static final Logger LOGGER = LoggerFactory.getLogger(TransformsInfo.class);
private final String className;
private final Map<String, PropertyDescriptor> properties;
@JsonCreator
public TransformsInfo(String className, ConfigDef config) {
this.className = className;
this.properties = getConfigProperties(className, config);
}
@JsonCreator
public TransformsInfo(String className, Class<? extends Transformation> transformationClass) {
this.className = className;
try {
LOGGER.info("Loading config for TRANSFORM: " + className + "...");
this.properties = getConfigProperties(transformationClass.getName(), transformationClass.newInstance().config());
}
catch (InstantiationException | IllegalAccessException e) {
LOGGER.error("Unable to load TRANSFORM: " + className
+ "\n\t Reason: " + e.toString());
throw new RuntimeException(e);
}
}
private static Map<String, PropertyDescriptor> getConfigProperties(String className, ConfigDef configDef) {
Map<String, PropertyDescriptor> configProperties = new HashMap<>();
configDef.configKeys().forEach((fieldName, configKey) -> {
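// Skip options that are undocumented, deprecated or internal to the Connect runtime.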
if (null != configKey.documentation
&& !configKey.documentation.startsWith("Deprecated")
&& !configKey.internalConfig) {
configProperties.put(fieldName, new PropertyDescriptor(className, configKey));
}
});
return configProperties;
}
public TransformsInfo(PluginDesc<Transformation> transform) {
this(transform.className(), transform.pluginClass());
}
@JsonProperty("transform")
public String className() {
return this.className;
}
@JsonProperty
public Map<String, PropertyDescriptor> properties() {
return this.properties;
}
public boolean equals(Object o) {
if (this == o) {
return true;
}
else if (o != null && this.getClass() == o.getClass()) {
TransformsInfo that = (TransformsInfo) o;
return Objects.equals(this.className, that.className)
&& Objects.equals(this.properties, that.properties);
}
else {
return false;
}
}
public int hashCode() {
return Objects.hash(this.className, this.properties);
}
public String toString() {
return "ConnectorPluginInfo{" + "className='" + this.className + '\'' +
", documentation='" + this.properties + '\'' +
'}';
}
}

@@ -0,0 +1 @@
io.debezium.kcdrestextension.DebeziumConnectRestExtension

@@ -53,6 +53,7 @@
</developers>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
@@ -148,6 +149,7 @@
<module>debezium-scripting</module>
<module>debezium-server</module>
<module>debezium-testing</module>
<module>connect-rest-extension</module>
</modules>
<distributionManagement>