DBZ-7618 Implement Versioned interfaces in Transformation and Converter plugins

This commit is contained in:
harveyyue 2024-03-09 15:32:37 +08:00 committed by Jiri Pechanec
parent a5cb8b4596
commit 48c67e40fb
28 changed files with 225 additions and 26 deletions

View File

@ -7,7 +7,6 @@
import java.util.Map; import java.util.Map;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext; import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
@ -43,6 +42,6 @@ public void configure(Map<String, ?> configs) {
@Override @Override
public String version() { public String version() {
return AppInfoParser.getVersion(); return Module.version();
} }
} }

View File

@ -9,6 +9,7 @@
import java.util.Map; import java.util.Map;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
@ -25,6 +26,7 @@
import io.debezium.common.annotation.Incubating; import io.debezium.common.annotation.Incubating;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.Module;
import io.debezium.connector.mongodb.transforms.ExtractNewDocumentState; import io.debezium.connector.mongodb.transforms.ExtractNewDocumentState;
import io.debezium.connector.mongodb.transforms.MongoDataConverter; import io.debezium.connector.mongodb.transforms.MongoDataConverter;
import io.debezium.time.Timestamp; import io.debezium.time.Timestamp;
@ -38,7 +40,7 @@
* @author Sungho Hwang * @author Sungho Hwang
*/ */
@Incubating @Incubating
public class MongoEventRouter<R extends ConnectRecord<R>> implements Transformation<R> { public class MongoEventRouter<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoEventRouter.class); private static final Logger LOGGER = LoggerFactory.getLogger(MongoEventRouter.class);
@ -95,6 +97,11 @@ public void configure(Map<String, ?> configMap) {
eventRouterDelegate.configure(convertedConfigMap); eventRouterDelegate.configure(convertedConfigMap);
} }
@Override
public String version() {
return Module.version();
}
/** /**
* Replaces <i>after</i> field by parsing and expanding original JSON string to Struct type. * Replaces <i>after</i> field by parsing and expanding original JSON string to Struct type.
* *

View File

@ -8,6 +8,7 @@
import java.util.Map; import java.util.Map;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.Transformation;
@ -15,6 +16,7 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.mysql.Module;
import io.debezium.data.Envelope; import io.debezium.data.Envelope;
import io.debezium.transforms.SmtManager; import io.debezium.transforms.SmtManager;
@ -25,7 +27,7 @@
* @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate * @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate
* @author Anisha Mohanty * @author Anisha Mohanty
*/ */
public class ReadToInsertEvent<R extends ConnectRecord<R>> implements Transformation<R> { public class ReadToInsertEvent<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger LOGGER = LoggerFactory.getLogger(ReadToInsertEvent.class); private static final Logger LOGGER = LoggerFactory.getLogger(ReadToInsertEvent.class);
@ -72,4 +74,9 @@ public void configure(Map<String, ?> props) {
final Configuration config = Configuration.from(props); final Configuration config = Configuration.from(props);
smtManager = new SmtManager<>(config); smtManager = new SmtManager<>(config);
} }
@Override
public String version() {
return Module.version();
}
} }

View File

@ -10,6 +10,7 @@
import java.util.Optional; import java.util.Optional;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.header.Headers;
@ -20,6 +21,7 @@
import io.debezium.annotation.VisibleForTesting; import io.debezium.annotation.VisibleForTesting;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Field; import io.debezium.config.Field;
import io.debezium.connector.postgresql.Module;
import io.debezium.connector.postgresql.SourceInfo; import io.debezium.connector.postgresql.SourceInfo;
import io.debezium.data.Envelope; import io.debezium.data.Envelope;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
@ -37,7 +39,7 @@
* *
* @param <R> * @param <R>
*/ */
public class TimescaleDb<R extends ConnectRecord<R>> implements Transformation<R> { public class TimescaleDb<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger LOGGER = LoggerFactory.getLogger(TimescaleDb.class); private static final Logger LOGGER = LoggerFactory.getLogger(TimescaleDb.class);
@ -150,6 +152,11 @@ public void close() {
} }
} }
@Override
public String version() {
return Module.version();
}
@VisibleForTesting @VisibleForTesting
void setMetadata(TimescaleDbMetadata metadata) { void setMetadata(TimescaleDbMetadata metadata) {
this.metadata = metadata; this.metadata = metadata;

View File

@ -0,0 +1,19 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium;
import java.util.Properties;
import io.debezium.util.IoUtil;
/**
 * Exposes the version of the Debezium core module, read from the build-time
 * properties file ({@code io/debezium/build.version}) bundled with the artifact.
 */
public class Module {

    /** Build properties loaded once when this class is initialized. */
    private static final Properties BUILD_INFO = IoUtil.loadProperties(Module.class, "io/debezium/build.version");

    /**
     * Returns the version of this module.
     *
     * @return the version string recorded in the build properties
     */
    public static String version() {
        return BUILD_INFO.getProperty("version");
    }
}

View File

@ -11,6 +11,7 @@
import java.util.Map; import java.util.Map;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.errors.DataException;
@ -20,6 +21,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.Module;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Instantiator; import io.debezium.config.Instantiator;
@ -37,7 +39,7 @@
* *
* @author Nathan Bradshaw * @author Nathan Bradshaw
*/ */
public class BinaryDataConverter implements Converter, HeaderConverter { public class BinaryDataConverter implements Converter, HeaderConverter, Versioned {
private static final Logger LOGGER = LoggerFactory.getLogger(BinaryDataConverter.class); private static final Logger LOGGER = LoggerFactory.getLogger(BinaryDataConverter.class);
private static final ConfigDef CONFIG_DEF; private static final ConfigDef CONFIG_DEF;
@ -109,6 +111,11 @@ public void close() throws IOException {
} }
@Override
public String version() {
return Module.version();
}
private void assertDelegateProvided(String name, Object type) { private void assertDelegateProvided(String name, Object type) {
if (delegateConverter == null) { if (delegateConverter == null) {
throw new DataException("A " + name + " of type '" + type + "' requires a delegate.converter.type to be configured"); throw new DataException("A " + name + " of type '" + type + "' requires a delegate.converter.type to be configured");

View File

@ -10,6 +10,7 @@
import java.util.Map; import java.util.Map;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.errors.DataException;
@ -19,6 +20,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.Module;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Instantiator; import io.debezium.config.Instantiator;
@ -31,7 +33,7 @@
* *
* @author Nathan Bradshaw * @author Nathan Bradshaw
*/ */
public class ByteArrayConverter implements Converter, HeaderConverter { public class ByteArrayConverter implements Converter, HeaderConverter, Versioned {
private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayConverter.class); private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayConverter.class);
private static final ConfigDef CONFIG_DEF; private static final ConfigDef CONFIG_DEF;
@ -99,6 +101,11 @@ public void close() throws IOException {
} }
@Override
public String version() {
return Module.version();
}
private void assertDelegateProvided(String name, Object type) { private void assertDelegateProvided(String name, Object type) {
if (delegateConverter == null) { if (delegateConverter == null) {
throw new DataException("A " + name + " of type '" + type + "' requires a delegate.converter.type to be configured"); throw new DataException("A " + name + " of type '" + type + "' requires a delegate.converter.type to be configured");

View File

@ -24,6 +24,7 @@
import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type; import org.apache.kafka.connect.data.Schema.Type;
@ -44,6 +45,7 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import io.debezium.Module;
import io.debezium.annotation.Immutable; import io.debezium.annotation.Immutable;
import io.debezium.annotation.VisibleForTesting; import io.debezium.annotation.VisibleForTesting;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
@ -85,7 +87,7 @@
* Since Kafka converters do not support headers yet, the CloudEvents converter currently uses structured mode as the * Since Kafka converters do not support headers yet, the CloudEvents converter currently uses structured mode as the
* default. * default.
*/ */
public class CloudEventsConverter implements Converter { public class CloudEventsConverter implements Converter, Versioned {
private static final String EXTENSION_NAME_PREFIX = "iodebezium"; private static final String EXTENSION_NAME_PREFIX = "iodebezium";
private static final String TX_ATTRIBUTE_PREFIX = "tx"; private static final String TX_ATTRIBUTE_PREFIX = "tx";
@ -96,7 +98,7 @@ public class CloudEventsConverter implements Converter {
private static final String CONFLUENT_AVRO_CONVERTER_CLASS = "io.confluent.connect.avro.AvroConverter"; private static final String CONFLUENT_AVRO_CONVERTER_CLASS = "io.confluent.connect.avro.AvroConverter";
private static final String CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url"; private static final String CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url";
private static String APICURIO_AVRO_CONVERTER_CLASS = "io.apicurio.registry.utils.converter.AvroConverter"; private static final String APICURIO_AVRO_CONVERTER_CLASS = "io.apicurio.registry.utils.converter.AvroConverter";
private static final String APICURIO_SCHEMA_REGISTRY_URL_CONFIG = "apicurio.registry.url"; private static final String APICURIO_SCHEMA_REGISTRY_URL_CONFIG = "apicurio.registry.url";
/** /**
@ -109,7 +111,7 @@ public class CloudEventsConverter implements Converter {
private static Method CONVERT_TO_CONNECT_METHOD; private static Method CONVERT_TO_CONNECT_METHOD;
@Immutable @Immutable
private static Map<String, CloudEventsProvider> providers = new HashMap<>(); private static final Map<String, CloudEventsProvider> PROVIDERS;
static { static {
try { try {
@ -135,7 +137,7 @@ public class CloudEventsConverter implements Converter {
tmp.put(provider.getName(), provider); tmp.put(provider.getName(), provider);
} }
providers = Collections.unmodifiableMap(tmp); PROVIDERS = Collections.unmodifiableMap(tmp);
} }
private SerializerType ceSerializerType = withName(CloudEventsConverterConfig.CLOUDEVENTS_SERIALIZER_TYPE_DEFAULT); private SerializerType ceSerializerType = withName(CloudEventsConverterConfig.CLOUDEVENTS_SERIALIZER_TYPE_DEFAULT);
@ -169,6 +171,11 @@ public CloudEventsConverter(Converter avroConverter) {
this.avroConverter = avroConverter; this.avroConverter = avroConverter;
} }
@Override
public String version() {
return Module.version();
}
@Override @Override
public void configure(Map<String, ?> configs, boolean isKey) { public void configure(Map<String, ?> configs, boolean isKey) {
Map<String, Object> conf = new HashMap<>(configs); Map<String, Object> conf = new HashMap<>(configs);
@ -335,7 +342,7 @@ public byte[] fromConnectData(String topic, Headers headers, Schema schema, Obje
*/ */
private static CloudEventsProvider lookupCloudEventsProvider(Struct source) { private static CloudEventsProvider lookupCloudEventsProvider(Struct source) {
String connectorType = source.getString(AbstractSourceInfo.DEBEZIUM_CONNECTOR_KEY); String connectorType = source.getString(AbstractSourceInfo.DEBEZIUM_CONNECTOR_KEY);
CloudEventsProvider provider = providers.get(connectorType); CloudEventsProvider provider = PROVIDERS.get(connectorType);
if (provider != null) { if (provider != null) {
return provider; return provider;
} }

View File

@ -32,6 +32,7 @@
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
@ -44,6 +45,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.Module;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Field; import io.debezium.config.Field;
import io.debezium.data.Envelope; import io.debezium.data.Envelope;
@ -60,7 +62,7 @@
* @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate * @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate
* @author Harvey Yue * @author Harvey Yue
*/ */
public abstract class AbstractExtractNewRecordState<R extends ConnectRecord<R>> implements Transformation<R> { public abstract class AbstractExtractNewRecordState<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractExtractNewRecordState.class); private static final Logger LOGGER = LoggerFactory.getLogger(AbstractExtractNewRecordState.class);
private static final Pattern FIELD_SEPARATOR = Pattern.compile("\\."); private static final Pattern FIELD_SEPARATOR = Pattern.compile("\\.");
@ -75,6 +77,11 @@ public abstract class AbstractExtractNewRecordState<R extends ConnectRecord<R>>
protected List<FieldReference> additionalFields; protected List<FieldReference> additionalFields;
protected String routeByField; protected String routeByField;
@Override
public String version() {
return Module.version();
}
@Override @Override
public void configure(final Map<String, ?> configs) { public void configure(final Map<String, ?> configs) {
config = Configuration.from(configs); config = Configuration.from(configs);

View File

@ -16,6 +16,7 @@
import org.apache.kafka.common.cache.LRUCache; import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache; import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
@ -25,6 +26,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.Module;
import io.debezium.config.CommonConnectorConfig.SchemaNameAdjustmentMode; import io.debezium.config.CommonConnectorConfig.SchemaNameAdjustmentMode;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Field; import io.debezium.config.Field;
@ -53,7 +55,7 @@
* @author David Leibovic * @author David Leibovic
* @author Mario Mueller * @author Mario Mueller
*/ */
public class ByLogicalTableRouter<R extends ConnectRecord<R>> implements Transformation<R> { public class ByLogicalTableRouter<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Field TOPIC_REGEX = Field.create("topic.regex") private static final Field TOPIC_REGEX = Field.create("topic.regex")
.withDisplayName("Topic regex") .withDisplayName("Topic regex")
@ -288,6 +290,11 @@ public ConfigDef config() {
return config; return config;
} }
@Override
public String version() {
return Module.version();
}
/** /**
* Determine the new topic name. * Determine the new topic name.
* *

View File

@ -13,12 +13,14 @@
import java.util.Objects; import java.util.Objects;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.Transformation;
import io.debezium.Module;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Field; import io.debezium.config.Field;
import io.debezium.util.Strings; import io.debezium.util.Strings;
@ -30,7 +32,7 @@
* @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate * @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate
* @author Harvey Yue * @author Harvey Yue
*/ */
public class ExtractChangedRecordState<R extends ConnectRecord<R>> implements Transformation<R> { public class ExtractChangedRecordState<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
public static final Field HEADER_CHANGED_NAME = Field.create("header.changed.name") public static final Field HEADER_CHANGED_NAME = Field.create("header.changed.name")
.withDisplayName("Header change name.") .withDisplayName("Header change name.")
@ -110,4 +112,9 @@ public ConfigDef config() {
Field.group(config, null, HEADER_CHANGED_NAME, HEADER_UNCHANGED_NAME); Field.group(config, null, HEADER_CHANGED_NAME, HEADER_UNCHANGED_NAME);
return config; return config;
} }
@Override
public String version() {
return Module.version();
}
} }

View File

@ -29,6 +29,7 @@
import java.util.Optional; import java.util.Optional;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
@ -39,6 +40,7 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
import io.debezium.Module;
import io.debezium.config.CommonConnectorConfig.SchemaNameAdjustmentMode; import io.debezium.config.CommonConnectorConfig.SchemaNameAdjustmentMode;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Field; import io.debezium.config.Field;
@ -47,7 +49,7 @@
import io.debezium.schema.SchemaNameAdjuster; import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.util.BoundedConcurrentHashMap; import io.debezium.util.BoundedConcurrentHashMap;
public class ExtractSchemaToNewRecord<R extends ConnectRecord<R>> implements Transformation<R> { public class ExtractSchemaToNewRecord<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger LOGGER = LoggerFactory.getLogger(ExtractSchemaToNewRecord.class); private static final Logger LOGGER = LoggerFactory.getLogger(ExtractSchemaToNewRecord.class);
public static final String SOURCE_SCHEMA_KEY = "sourceSchema"; public static final String SOURCE_SCHEMA_KEY = "sourceSchema";
@ -127,6 +129,11 @@ public void configure(Map<String, ?> configs) {
.createAdjuster(); .createAdjuster();
} }
@Override
public String version() {
return Module.version();
}
private Iterable<Field> validateConfigFields() { private Iterable<Field> validateConfigFields() {
return configFields; return configFields;
} }

View File

@ -18,6 +18,7 @@
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
@ -29,11 +30,12 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.Module;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Field; import io.debezium.config.Field;
import io.debezium.util.BoundedConcurrentHashMap; import io.debezium.util.BoundedConcurrentHashMap;
public class HeaderToValue<R extends ConnectRecord<R>> implements Transformation<R> { public class HeaderToValue<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger LOGGER = LoggerFactory.getLogger(HeaderToValue.class); private static final Logger LOGGER = LoggerFactory.getLogger(HeaderToValue.class);
public static final String FIELDS_CONF = "fields"; public static final String FIELDS_CONF = "fields";
@ -309,4 +311,9 @@ private String headersToString(Map<?, ?> map) {
@Override @Override
public void close() { public void close() {
} }
@Override
public String version() {
return Module.version();
}
} }

View File

@ -14,12 +14,14 @@
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.Module;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Field; import io.debezium.config.Field;
import io.debezium.relational.history.ConnectTableChangeSerializer; import io.debezium.relational.history.ConnectTableChangeSerializer;
@ -30,7 +32,7 @@
* This SMT to filter schema change event * This SMT to filter schema change event
* @param <R> * @param <R>
*/ */
public class SchemaChangeEventFilter<R extends ConnectRecord<R>> implements Transformation<R> { public class SchemaChangeEventFilter<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger LOGGER = LoggerFactory.getLogger(SchemaChangeEventFilter.class); private static final Logger LOGGER = LoggerFactory.getLogger(SchemaChangeEventFilter.class);
private static final Field SCHEMA_CHANGE_EVENT_EXCLUDE_LIST = Field.create("schema.change.event.exclude.list") private static final Field SCHEMA_CHANGE_EVENT_EXCLUDE_LIST = Field.create("schema.change.event.exclude.list")
@ -91,4 +93,9 @@ public ConfigDef config() {
@Override @Override
public void close() { public void close() {
} }
@Override
public String version() {
return Module.version();
}
} }

View File

@ -25,6 +25,7 @@
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
@ -35,6 +36,7 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
import io.debezium.Module;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Field; import io.debezium.config.Field;
import io.debezium.data.Envelope; import io.debezium.data.Envelope;
@ -50,7 +52,7 @@
* *
*/ */
public class TimezoneConverter<R extends ConnectRecord<R>> implements Transformation<R> { public class TimezoneConverter<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger LOGGER = LoggerFactory.getLogger(TimezoneConverter.class); private static final Logger LOGGER = LoggerFactory.getLogger(TimezoneConverter.class);
private static final Field CONVERTED_TIMEZONE = Field.create("converted.timezone") private static final Field CONVERTED_TIMEZONE = Field.create("converted.timezone")
@ -254,6 +256,11 @@ else if (ZoneId.getAvailableZoneIds().contains(convertedTimezone)) {
public void close() { public void close() {
} }
@Override
public String version() {
return Module.version();
}
private enum Type { private enum Type {
ALL, ALL,
INCLUDE, INCLUDE,

View File

@ -8,15 +8,18 @@
import java.util.Map; import java.util.Map;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.Transformation;
import io.debezium.Module;
/** /**
* Debezium Outbox Transform Event Router * Debezium Outbox Transform Event Router
* *
* @author Renato mefi (gh@mefi.in) * @author Renato mefi (gh@mefi.in)
*/ */
public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R> { public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
EventRouterDelegate<R> eventRouterDelegate = new EventRouterDelegate<>(); EventRouterDelegate<R> eventRouterDelegate = new EventRouterDelegate<>();
@ -39,4 +42,9 @@ public void close() {
public void configure(Map<String, ?> configMap) { public void configure(Map<String, ?> configMap) {
eventRouterDelegate.configure(configMap); eventRouterDelegate.configure(configMap);
} }
@Override
public String version() {
return Module.version();
}
} }

View File

@ -17,6 +17,7 @@
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.errors.DataException;
@ -26,6 +27,7 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
import io.debezium.Module;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue; import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field; import io.debezium.config.Field;
@ -39,7 +41,7 @@
* @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate * @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate
* @author Mario Fiore Vitale * @author Mario Fiore Vitale
*/ */
public class PartitionRouting<R extends ConnectRecord<R>> implements Transformation<R> { public class PartitionRouting<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionRouting.class); private static final Logger LOGGER = LoggerFactory.getLogger(PartitionRouting.class);
private static final MurmurHash3 MURMUR_HASH_3 = MurmurHash3.getInstance(); private static final MurmurHash3 MURMUR_HASH_3 = MurmurHash3.getInstance();
@ -250,4 +252,9 @@ protected int computePartition(Integer partitionNumber, List<Object> values) {
@Override @Override
public void close() { public void close() {
} }
@Override
public String version() {
// Kafka Connect Versioned contract: expose the Debezium module version
// so the plugin reports its own release rather than Kafka's.
return Module.version();
}
} }

View File

@ -8,6 +8,7 @@
import java.util.Map; import java.util.Map;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.ConnectException;
@ -16,6 +17,7 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
import io.debezium.Module;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Field; import io.debezium.config.Field;
import io.debezium.data.Envelope; import io.debezium.data.Envelope;
@ -36,7 +38,7 @@
* @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate * @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate
* @author Jiri Pechanec * @author Jiri Pechanec
*/ */
public class ActivateTracingSpan<R extends ConnectRecord<R>> implements Transformation<R> { public class ActivateTracingSpan<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private static final Logger LOGGER = LoggerFactory.getLogger(ActivateTracingSpan.class); private static final Logger LOGGER = LoggerFactory.getLogger(ActivateTracingSpan.class);
@ -146,6 +148,11 @@ public ConfigDef config() {
return config; return config;
} }
@Override
public String version() {
// Kafka Connect Versioned contract: delegate to the Debezium module
// descriptor for this plugin's version string.
return Module.version();
}
public static boolean isOpenTelemetryAvailable() { public static boolean isOpenTelemetryAvailable() {
return OPEN_TELEMETRY_AVAILABLE; return OPEN_TELEMETRY_AVAILABLE;
} }

View File

@ -0,0 +1 @@
version=${project.version}

View File

@ -71,6 +71,7 @@
<directory>src/main/resources</directory> <directory>src/main/resources</directory>
<includes> <includes>
<include>**/build.properties</include> <include>**/build.properties</include>
<include>**/*</include>
</includes> </includes>
</resource> </resource>
</resources> </resources>

View File

@ -0,0 +1,18 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.performance;
import java.util.Properties;
import io.debezium.util.IoUtil;
/**
 * Exposes the version of this Debezium module.
 *
 * <p>The version string is resolved once, at class-load time, from the
 * {@code io/debezium/performance/build.version} resource that Maven resource
 * filtering populates with {@code ${project.version}}.
 */
public class Module {

    /** Build metadata loaded from the filtered build.version resource. */
    private static final Properties BUILD_INFO;

    static {
        BUILD_INFO = IoUtil.loadProperties(Module.class, "io/debezium/performance/build.version");
    }

    /**
     * Returns the module version recorded at build time.
     *
     * @return the {@code version} property from the build metadata
     */
    public static String version() {
        return BUILD_INFO.getProperty("version");
    }
}

View File

@ -10,6 +10,7 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
@ -28,6 +29,7 @@
import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.annotations.Warmup;
import io.debezium.data.Envelope; import io.debezium.data.Envelope;
import io.debezium.performance.Module;
import io.debezium.transforms.Filter; import io.debezium.transforms.Filter;
import io.debezium.util.Collect; import io.debezium.util.Collect;
@ -39,7 +41,7 @@
*/ */
public class FilterSmtPerf { public class FilterSmtPerf {
private static class NativeFilter implements Transformation<SourceRecord> { private static class NativeFilter implements Transformation<SourceRecord>, Versioned {
@Override @Override
public void configure(Map<String, ?> configs) { public void configure(Map<String, ?> configs) {
@ -64,6 +66,11 @@ public ConfigDef config() {
@Override @Override
public void close() { public void close() {
} }
@Override
public String version() {
// Kafka Connect Versioned contract: report the benchmark module's
// Debezium version from the generated build metadata.
return Module.version();
}
} }
@State(Scope.Thread) @State(Scope.Thread)

View File

@ -10,6 +10,7 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
@ -27,6 +28,8 @@
import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.annotations.Warmup;
import io.debezium.performance.Module;
/** /**
* A basic test to calculate overhead of using SMTs. * A basic test to calculate overhead of using SMTs.
* *
@ -35,7 +38,7 @@
*/ */
public class SmtOverheadPerf { public class SmtOverheadPerf {
private static class NewRecord implements Transformation<SourceRecord> { private static class NewRecord implements Transformation<SourceRecord>, Versioned {
@Override @Override
public void configure(Map<String, ?> configs) { public void configure(Map<String, ?> configs) {
@ -62,9 +65,14 @@ public ConfigDef config() {
@Override @Override
public void close() { public void close() {
} }
@Override
public String version() {
// Kafka Connect Versioned contract: delegate to the Debezium module
// descriptor for the version string.
return Module.version();
}
} }
private static class NoOp implements Transformation<SourceRecord> { private static class NoOp implements Transformation<SourceRecord>, Versioned {
@Override @Override
public void configure(Map<String, ?> configs) { public void configure(Map<String, ?> configs) {
@ -83,6 +91,11 @@ public ConfigDef config() {
@Override @Override
public void close() { public void close() {
} }
@Override
public String version() {
// Kafka Connect Versioned contract: same module version as the other
// benchmark transformations in this file.
return Module.version();
}
} }
@State(Scope.Thread) @State(Scope.Thread)

View File

@ -0,0 +1 @@
version=${project.version}

View File

@ -79,6 +79,7 @@
<filtering>true</filtering> <filtering>true</filtering>
<directory>src/main/resources</directory> <directory>src/main/resources</directory>
<includes> <includes>
<include>**/*</include>
<include>**/build.properties</include> <include>**/build.properties</include>
</includes> </includes>
</resource> </resource>

View File

@ -0,0 +1,19 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms;
import java.util.Properties;
import io.debezium.util.IoUtil;
/**
 * Exposes the version of this Debezium module.
 *
 * <p>Resolved once at class-load time from the
 * {@code io/debezium/scripting/build.version} resource generated by Maven
 * resource filtering.
 *
 * <p>NOTE(review): the resource path says {@code scripting} while the package
 * is {@code io.debezium.transforms} — presumably this class lives in the
 * debezium-scripting artifact; confirm the resource path matches that module's
 * resources.
 */
public class Module {

    /** Build metadata loaded from the filtered build.version resource. */
    private static final Properties BUILD_INFO;

    static {
        BUILD_INFO = IoUtil.loadProperties(Module.class, "io/debezium/scripting/build.version");
    }

    /**
     * Returns the module version recorded at build time.
     *
     * @return the {@code version} property from the build metadata
     */
    public static String version() {
        return BUILD_INFO.getProperty("version");
    }
}

View File

@ -9,6 +9,7 @@
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -37,7 +38,7 @@
* @author Jiri Pechanec * @author Jiri Pechanec
*/ */
@Incubating @Incubating
public abstract class ScriptingTransformation<R extends ConnectRecord<R>> implements Transformation<R> { public abstract class ScriptingTransformation<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
private final Logger LOGGER = LoggerFactory.getLogger(getClass()); private final Logger LOGGER = LoggerFactory.getLogger(getClass());
@ -203,4 +204,9 @@ public ConfigDef config() {
@Override @Override
public void close() { public void close() {
} }
@Override
public String version() {
// Kafka Connect Versioned contract: report the Debezium scripting module
// version for this transformation.
return Module.version();
}
} }

View File

@ -0,0 +1 @@
version=${project.version}