From 48c67e40fb41ec8f9ba500e8fe7dc10b34657ade Mon Sep 17 00:00:00 2001 From: harveyyue Date: Sat, 9 Mar 2024 15:32:37 +0800 Subject: [PATCH] DBZ-7618 Implement Versioned interfaces in Transformation and Converter plugins --- .../DebeziumConnectRestExtension.java | 3 +-- .../transforms/outbox/MongoEventRouter.java | 9 ++++++++- .../mysql/transforms/ReadToInsertEvent.java | 9 ++++++++- .../transforms/timescaledb/TimescaleDb.java | 9 ++++++++- .../src/main/java/io/debezium/Module.java | 19 +++++++++++++++++++ .../converters/BinaryDataConverter.java | 9 ++++++++- .../converters/ByteArrayConverter.java | 9 ++++++++- .../converters/CloudEventsConverter.java | 17 ++++++++++++----- .../AbstractExtractNewRecordState.java | 9 ++++++++- .../transforms/ByLogicalTableRouter.java | 9 ++++++++- .../transforms/ExtractChangedRecordState.java | 9 ++++++++- .../transforms/ExtractSchemaToNewRecord.java | 9 ++++++++- .../io/debezium/transforms/HeaderToValue.java | 9 ++++++++- .../transforms/SchemaChangeEventFilter.java | 9 ++++++++- .../transforms/TimezoneConverter.java | 9 ++++++++- .../transforms/outbox/EventRouter.java | 10 +++++++++- .../partitions/PartitionRouting.java | 9 ++++++++- .../tracing/ActivateTracingSpan.java | 9 ++++++++- .../main/resources/io/debezium/build.version | 1 + debezium-microbenchmark/pom.xml | 1 + .../java/io/debezium/performance/Module.java | 18 ++++++++++++++++++ .../performance/core/FilterSmtPerf.java | 9 ++++++++- .../performance/core/SmtOverheadPerf.java | 17 +++++++++++++++-- .../io/debezium/performance/build.version | 1 + debezium-scripting/debezium-scripting/pom.xml | 1 + .../java/io/debezium/transforms/Module.java | 19 +++++++++++++++++++ .../transforms/ScriptingTransformation.java | 8 +++++++- .../io/debezium/scripting/build.version | 1 + 28 files changed, 225 insertions(+), 26 deletions(-) create mode 100644 debezium-core/src/main/java/io/debezium/Module.java create mode 100644 debezium-core/src/main/resources/io/debezium/build.version create mode 100644 debezium-microbenchmark/src/main/java/io/debezium/performance/Module.java create mode 100644 debezium-microbenchmark/src/main/resources/io/debezium/performance/build.version create mode 100644 debezium-scripting/debezium-scripting/src/main/java/io/debezium/transforms/Module.java create mode 100644 debezium-scripting/debezium-scripting/src/main/resources/io/debezium/scripting/build.version diff --git a/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/DebeziumConnectRestExtension.java b/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/DebeziumConnectRestExtension.java index b3162d8da..88fa75721 100644 --- a/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/DebeziumConnectRestExtension.java +++ b/debezium-connect-rest-extension/src/main/java/io/debezium/kcrestextension/DebeziumConnectRestExtension.java @@ -7,7 +7,6 @@ import java.util.Map; -import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; @@ -43,6 +42,6 @@ public void configure(Map configs) { @Override public String version() { - return AppInfoParser.getVersion(); + return Module.version(); } } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/outbox/MongoEventRouter.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/outbox/MongoEventRouter.java index 0161c322a..8b73c17d4 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/outbox/MongoEventRouter.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/outbox/MongoEventRouter.java @@ -9,6 +9,7 @@ import java.util.Map; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; @@ -25,6 +26,7 @@ import io.debezium.common.annotation.Incubating; import io.debezium.config.Configuration; +import io.debezium.connector.mongodb.Module; import io.debezium.connector.mongodb.transforms.ExtractNewDocumentState; import io.debezium.connector.mongodb.transforms.MongoDataConverter; import io.debezium.time.Timestamp; @@ -38,7 +40,7 @@ * @author Sungho Hwang */ @Incubating -public class MongoEventRouter> implements Transformation { +public class MongoEventRouter> implements Transformation, Versioned { private static final Logger LOGGER = LoggerFactory.getLogger(MongoEventRouter.class); @@ -95,6 +97,11 @@ public void configure(Map configMap) { eventRouterDelegate.configure(convertedConfigMap); } + @Override + public String version() { + return Module.version(); + } + /** * Replaces after field by parsing and expanding original JSON string to Struct type. * diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/transforms/ReadToInsertEvent.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/transforms/ReadToInsertEvent.java index b00745424..edf7e38e7 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/transforms/ReadToInsertEvent.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/transforms/ReadToInsertEvent.java @@ -8,6 +8,7 @@ import java.util.Map; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.transforms.Transformation; @@ -15,6 +16,7 @@ import org.slf4j.LoggerFactory; import io.debezium.config.Configuration; +import io.debezium.connector.mysql.Module; import io.debezium.data.Envelope; import io.debezium.transforms.SmtManager; @@ -25,7 +27,7 @@ * @param the subtype of {@link ConnectRecord} on which this transformation will operate * @author Anisha Mohanty */ -public class ReadToInsertEvent> implements Transformation { +public class ReadToInsertEvent> implements Transformation, Versioned { private static final Logger LOGGER = LoggerFactory.getLogger(ReadToInsertEvent.class); @@ -72,4 +74,9 @@ public void configure(Map props) { final Configuration config = Configuration.from(props); smtManager = new SmtManager<>(config); } + + @Override + public String version() { + return Module.version(); + } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/timescaledb/TimescaleDb.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/timescaledb/TimescaleDb.java index 427f0d1d7..c1348492e 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/timescaledb/TimescaleDb.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/timescaledb/TimescaleDb.java @@ -10,6 +10,7 @@ import java.util.Optional; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.header.Headers; @@ -20,6 +21,7 @@ import io.debezium.annotation.VisibleForTesting; import io.debezium.config.Configuration; import io.debezium.config.Field; +import io.debezium.connector.postgresql.Module; import io.debezium.connector.postgresql.SourceInfo; import io.debezium.data.Envelope; import io.debezium.relational.TableId; @@ -37,7 +39,7 @@ * * @param */ -public class TimescaleDb> implements Transformation { +public class TimescaleDb> implements Transformation, Versioned { private static final Logger LOGGER = LoggerFactory.getLogger(TimescaleDb.class); @@ -150,6 +152,11 @@ public void close() { } } + @Override + public String version() { + return Module.version(); + } + @VisibleForTesting void setMetadata(TimescaleDbMetadata metadata) { this.metadata = metadata; diff --git a/debezium-core/src/main/java/io/debezium/Module.java b/debezium-core/src/main/java/io/debezium/Module.java new file mode 100644 index 000000000..cfb9bbf38 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/Module.java @@ -0,0 +1,19 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium; + +import java.util.Properties; + +import io.debezium.util.IoUtil; + +public class Module { + + private static final Properties INFO = IoUtil.loadProperties(Module.class, "io/debezium/build.version"); + + public static String version() { + return INFO.getProperty("version"); + } +} diff --git a/debezium-core/src/main/java/io/debezium/converters/BinaryDataConverter.java b/debezium-core/src/main/java/io/debezium/converters/BinaryDataConverter.java index c3c35eeae..25fd8eee2 100644 --- a/debezium-core/src/main/java/io/debezium/converters/BinaryDataConverter.java +++ b/debezium-core/src/main/java/io/debezium/converters/BinaryDataConverter.java @@ -11,6 +11,7 @@ import java.util.Map; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.DataException; @@ -20,6 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.Module; import io.debezium.config.Configuration; import io.debezium.config.Instantiator; @@ -37,7 +39,7 @@ * * @author Nathan Bradshaw */ -public class BinaryDataConverter implements Converter, HeaderConverter { +public class BinaryDataConverter implements Converter, HeaderConverter, Versioned { private static final Logger LOGGER = LoggerFactory.getLogger(BinaryDataConverter.class); private static final ConfigDef CONFIG_DEF; @@ -109,6 +111,11 @@ public void close() throws IOException { } + @Override + public String version() { + return Module.version(); + } + private void assertDelegateProvided(String name, Object type) { if (delegateConverter == null) { throw new DataException("A " + name + " of type '" + type + "' requires a delegate.converter.type to be configured"); diff --git a/debezium-core/src/main/java/io/debezium/converters/ByteArrayConverter.java b/debezium-core/src/main/java/io/debezium/converters/ByteArrayConverter.java index eb6780dc6..75f7a2d04 100644 --- a/debezium-core/src/main/java/io/debezium/converters/ByteArrayConverter.java +++ b/debezium-core/src/main/java/io/debezium/converters/ByteArrayConverter.java @@ -10,6 +10,7 @@ import java.util.Map; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.DataException; @@ -19,6 +20,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.Module; import io.debezium.config.Configuration; import io.debezium.config.Instantiator; @@ -31,7 +33,7 @@ * * @author Nathan Bradshaw */ -public class ByteArrayConverter implements Converter, HeaderConverter { +public class ByteArrayConverter implements Converter, HeaderConverter, Versioned { private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayConverter.class); private static final ConfigDef CONFIG_DEF; @@ -99,6 +101,11 @@ public void close() throws IOException { } + @Override + public String version() { + return Module.version(); + } + private void assertDelegateProvided(String name, Object type) { if (delegateConverter == null) { throw new DataException("A " + name + " of type '" + type + "' requires a delegate.converter.type to be configured"); diff --git a/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java b/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java index d793616ab..2ec3229b3 100644 --- a/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java +++ b/debezium-core/src/main/java/io/debezium/converters/CloudEventsConverter.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema.Type; @@ -44,6 +45,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import io.debezium.Module; import io.debezium.annotation.Immutable; import io.debezium.annotation.VisibleForTesting; import io.debezium.config.Configuration; @@ -85,7 +87,7 @@ * Since Kafka converters has not support headers yet, right now CloudEvents converter use structured mode as the * default. */ -public class CloudEventsConverter implements Converter { +public class CloudEventsConverter implements Converter, Versioned { private static final String EXTENSION_NAME_PREFIX = "iodebezium"; private static final String TX_ATTRIBUTE_PREFIX = "tx"; @@ -96,7 +98,7 @@ public class CloudEventsConverter implements Converter { private static final String CONFLUENT_AVRO_CONVERTER_CLASS = "io.confluent.connect.avro.AvroConverter"; private static final String CONFLUENT_SCHEMA_REGISTRY_URL_CONFIG = "schema.registry.url"; - private static String APICURIO_AVRO_CONVERTER_CLASS = "io.apicurio.registry.utils.converter.AvroConverter"; + private static final String APICURIO_AVRO_CONVERTER_CLASS = "io.apicurio.registry.utils.converter.AvroConverter"; private static final String APICURIO_SCHEMA_REGISTRY_URL_CONFIG = "apicurio.registry.url"; /** @@ -109,7 +111,7 @@ public class CloudEventsConverter implements Converter { private static Method CONVERT_TO_CONNECT_METHOD; @Immutable - private static Map providers = new HashMap<>(); + private static final Map PROVIDERS; static { try { @@ -135,7 +137,7 @@ public class CloudEventsConverter implements Converter { tmp.put(provider.getName(), provider); } - providers = Collections.unmodifiableMap(tmp); + PROVIDERS = Collections.unmodifiableMap(tmp); } private SerializerType ceSerializerType = withName(CloudEventsConverterConfig.CLOUDEVENTS_SERIALIZER_TYPE_DEFAULT); @@ -169,6 +171,11 @@ public CloudEventsConverter(Converter avroConverter) { this.avroConverter = avroConverter; } + @Override + public String version() { + return Module.version(); + } + @Override public void configure(Map configs, boolean isKey) { Map conf = new HashMap<>(configs); @@ -335,7 +342,7 @@ public byte[] fromConnectData(String topic, Headers headers, Schema schema, Obje */ private static CloudEventsProvider lookupCloudEventsProvider(Struct source) { String connectorType = source.getString(AbstractSourceInfo.DEBEZIUM_CONNECTOR_KEY); - CloudEventsProvider provider = providers.get(connectorType); + CloudEventsProvider provider = PROVIDERS.get(connectorType); if (provider != null) { return provider; } diff --git a/debezium-core/src/main/java/io/debezium/transforms/AbstractExtractNewRecordState.java b/debezium-core/src/main/java/io/debezium/transforms/AbstractExtractNewRecordState.java index 1b51dc522..814a8bef4 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/AbstractExtractNewRecordState.java +++ b/debezium-core/src/main/java/io/debezium/transforms/AbstractExtractNewRecordState.java @@ -32,6 +32,7 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; @@ -44,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.Module; import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.data.Envelope; @@ -60,7 +62,7 @@ * @param the subtype of {@link ConnectRecord} on which this transformation will operate * @author Harvey Yue */ -public abstract class AbstractExtractNewRecordState> implements Transformation { +public abstract class AbstractExtractNewRecordState> implements Transformation, Versioned { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractExtractNewRecordState.class); private static final Pattern FIELD_SEPARATOR = Pattern.compile("\\."); @@ -75,6 +77,11 @@ public abstract class AbstractExtractNewRecordState> protected List additionalFields; protected String routeByField; + @Override + public String version() { + return Module.version(); + } + @Override public void configure(final Map configs) { config = Configuration.from(configs); diff --git a/debezium-core/src/main/java/io/debezium/transforms/ByLogicalTableRouter.java b/debezium-core/src/main/java/io/debezium/transforms/ByLogicalTableRouter.java index 86337180a..eaab32c14 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/ByLogicalTableRouter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/ByLogicalTableRouter.java @@ -16,6 +16,7 @@ import org.apache.kafka.common.cache.LRUCache; import org.apache.kafka.common.cache.SynchronizedCache; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -25,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.Module; import io.debezium.config.CommonConnectorConfig.SchemaNameAdjustmentMode; import io.debezium.config.Configuration; import io.debezium.config.Field; @@ -53,7 +55,7 @@ * @author David Leibovic * @author Mario Mueller */ -public class ByLogicalTableRouter> implements Transformation { +public class ByLogicalTableRouter> implements Transformation, Versioned { private static final Field TOPIC_REGEX = Field.create("topic.regex") .withDisplayName("Topic regex") @@ -288,6 +290,11 @@ public ConfigDef config() { return config; } + @Override + public String version() { + return Module.version(); + } + /** * Determine the new topic name. * diff --git a/debezium-core/src/main/java/io/debezium/transforms/ExtractChangedRecordState.java b/debezium-core/src/main/java/io/debezium/transforms/ExtractChangedRecordState.java index 994d93ae9..fd24cd165 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/ExtractChangedRecordState.java +++ b/debezium-core/src/main/java/io/debezium/transforms/ExtractChangedRecordState.java @@ -13,12 +13,14 @@ import java.util.Objects; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.transforms.Transformation; +import io.debezium.Module; import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.util.Strings; @@ -30,7 +32,7 @@ * @param the subtype of {@link ConnectRecord} on which this transformation will operate * @author Harvey Yue */ -public class ExtractChangedRecordState> implements Transformation { +public class ExtractChangedRecordState> implements Transformation, Versioned { public static final Field HEADER_CHANGED_NAME = Field.create("header.changed.name") .withDisplayName("Header change name.") @@ -110,4 +112,9 @@ public ConfigDef config() { Field.group(config, null, HEADER_CHANGED_NAME, HEADER_UNCHANGED_NAME); return config; } + + @Override + public String version() { + return Module.version(); + } } diff --git a/debezium-core/src/main/java/io/debezium/transforms/ExtractSchemaToNewRecord.java b/debezium-core/src/main/java/io/debezium/transforms/ExtractSchemaToNewRecord.java index a1fa6d462..c187ed65a 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/ExtractSchemaToNewRecord.java +++ b/debezium-core/src/main/java/io/debezium/transforms/ExtractSchemaToNewRecord.java @@ -29,6 +29,7 @@ import java.util.Optional; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory; import io.debezium.DebeziumException; +import io.debezium.Module; import io.debezium.config.CommonConnectorConfig.SchemaNameAdjustmentMode; import io.debezium.config.Configuration; import io.debezium.config.Field; @@ -47,7 +49,7 @@ import io.debezium.schema.SchemaNameAdjuster; import io.debezium.util.BoundedConcurrentHashMap; -public class ExtractSchemaToNewRecord> implements Transformation { +public class ExtractSchemaToNewRecord> implements Transformation, Versioned { private static final Logger LOGGER = LoggerFactory.getLogger(ExtractSchemaToNewRecord.class); public static final String SOURCE_SCHEMA_KEY = "sourceSchema"; @@ -127,6 +129,11 @@ public void configure(Map configs) { .createAdjuster(); } + @Override + public String version() { + return Module.version(); + } + private Iterable validateConfigFields() { return configFields; } diff --git a/debezium-core/src/main/java/io/debezium/transforms/HeaderToValue.java b/debezium-core/src/main/java/io/debezium/transforms/HeaderToValue.java index 5bbea69bb..cd18f6f71 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/HeaderToValue.java +++ b/debezium-core/src/main/java/io/debezium/transforms/HeaderToValue.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -29,11 +30,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.Module; import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.util.BoundedConcurrentHashMap; -public class HeaderToValue> implements Transformation { +public class HeaderToValue> implements Transformation, Versioned { private static final Logger LOGGER = LoggerFactory.getLogger(HeaderToValue.class); public static final String FIELDS_CONF = "fields"; @@ -309,4 +311,9 @@ private String headersToString(Map map) { @Override public void close() { } + + @Override + public String version() { + return Module.version(); + } } diff --git a/debezium-core/src/main/java/io/debezium/transforms/SchemaChangeEventFilter.java b/debezium-core/src/main/java/io/debezium/transforms/SchemaChangeEventFilter.java index 9a79561b0..ddcb85c36 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/SchemaChangeEventFilter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/SchemaChangeEventFilter.java @@ -14,12 +14,14 @@ import java.util.stream.Collectors; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.transforms.Transformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.Module; import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.relational.history.ConnectTableChangeSerializer; @@ -30,7 +32,7 @@ * This SMT to filter schema change event * @param */ -public class SchemaChangeEventFilter> implements Transformation { +public class SchemaChangeEventFilter> implements Transformation, Versioned { private static final Logger LOGGER = LoggerFactory.getLogger(SchemaChangeEventFilter.class); private static final Field SCHEMA_CHANGE_EVENT_EXCLUDE_LIST = Field.create("schema.change.event.exclude.list") @@ -91,4 +93,9 @@ public ConfigDef config() { @Override public void close() { } + + @Override + public String version() { + return Module.version(); + } } diff --git a/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java b/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java index d9e4fab8d..ebf16f92f 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java @@ -25,6 +25,7 @@ import java.util.regex.Pattern; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; @@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory; import io.debezium.DebeziumException; +import io.debezium.Module; import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.data.Envelope; @@ -50,7 +52,7 @@ * */ -public class TimezoneConverter> implements Transformation { +public class TimezoneConverter> implements Transformation, Versioned { private static final Logger LOGGER = LoggerFactory.getLogger(TimezoneConverter.class); private static final Field CONVERTED_TIMEZONE = Field.create("converted.timezone") @@ -254,6 +256,11 @@ else if (ZoneId.getAvailableZoneIds().contains(convertedTimezone)) { public void close() { } + @Override + public String version() { + return Module.version(); + } + private enum Type { ALL, INCLUDE, diff --git a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java index 308d77e4c..dcbbc0d95 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java @@ -8,15 +8,18 @@ import java.util.Map; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.transforms.Transformation; +import io.debezium.Module; + /** * Debezium Outbox Transform Event Router * * @author Renato mefi (gh@mefi.in) */ -public class EventRouter> implements Transformation { +public class EventRouter> implements Transformation, Versioned { EventRouterDelegate eventRouterDelegate = new EventRouterDelegate<>(); @@ -39,4 +42,9 @@ public void close() { public void configure(Map configMap) { eventRouterDelegate.configure(configMap); } + + @Override + public String version() { + return Module.version(); + } } diff --git a/debezium-core/src/main/java/io/debezium/transforms/partitions/PartitionRouting.java b/debezium-core/src/main/java/io/debezium/transforms/partitions/PartitionRouting.java index d6e18256f..91a72e806 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/partitions/PartitionRouting.java +++ b/debezium-core/src/main/java/io/debezium/transforms/partitions/PartitionRouting.java @@ -17,6 +17,7 @@ import java.util.stream.Collectors; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; @@ -26,6 +27,7 @@ import org.slf4j.LoggerFactory; import io.debezium.DebeziumException; +import io.debezium.Module; import io.debezium.config.Configuration; import io.debezium.config.EnumeratedValue; import io.debezium.config.Field; @@ -39,7 +41,7 @@ * @param the subtype of {@link ConnectRecord} on which this transformation will operate * @author Mario Fiore Vitale */ -public class PartitionRouting> implements Transformation { +public class PartitionRouting> implements Transformation, Versioned { private static final Logger LOGGER = LoggerFactory.getLogger(PartitionRouting.class); private static final MurmurHash3 MURMUR_HASH_3 = MurmurHash3.getInstance(); @@ -250,4 +252,9 @@ protected int computePartition(Integer partitionNumber, List values) { @Override public void close() { } + + @Override + public String version() { + return Module.version(); + } } diff --git a/debezium-core/src/main/java/io/debezium/transforms/tracing/ActivateTracingSpan.java b/debezium-core/src/main/java/io/debezium/transforms/tracing/ActivateTracingSpan.java index 5dc73abb1..24cd40838 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/tracing/ActivateTracingSpan.java +++ b/debezium-core/src/main/java/io/debezium/transforms/tracing/ActivateTracingSpan.java @@ -8,6 +8,7 @@ import java.util.Map; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; @@ -16,6 +17,7 @@ import org.slf4j.LoggerFactory; import io.debezium.DebeziumException; +import io.debezium.Module; import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.data.Envelope; @@ -36,7 +38,7 @@ * @param the subtype of {@link ConnectRecord} on which this transformation will operate * @author Jiri Pechanec */ -public class ActivateTracingSpan> implements Transformation { +public class ActivateTracingSpan> implements Transformation, Versioned { private static final Logger LOGGER = LoggerFactory.getLogger(ActivateTracingSpan.class); @@ -146,6 +148,11 @@ public ConfigDef config() { return config; } + @Override + public String version() { + return Module.version(); + } + public static boolean isOpenTelemetryAvailable() { return OPEN_TELEMETRY_AVAILABLE; } diff --git a/debezium-core/src/main/resources/io/debezium/build.version b/debezium-core/src/main/resources/io/debezium/build.version new file mode 100644 index 000000000..e5683df88 --- /dev/null +++ b/debezium-core/src/main/resources/io/debezium/build.version @@ -0,0 +1 @@ +version=${project.version} \ No newline at end of file diff --git a/debezium-microbenchmark/pom.xml b/debezium-microbenchmark/pom.xml index fbef8b959..c9d1e03d1 100644 --- a/debezium-microbenchmark/pom.xml +++ b/debezium-microbenchmark/pom.xml @@ -71,6 +71,7 @@ src/main/resources **/build.properties + **/* diff --git a/debezium-microbenchmark/src/main/java/io/debezium/performance/Module.java b/debezium-microbenchmark/src/main/java/io/debezium/performance/Module.java new file mode 100644 index 000000000..6f40c861c --- /dev/null +++ b/debezium-microbenchmark/src/main/java/io/debezium/performance/Module.java @@ -0,0 +1,18 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.performance; + +import java.util.Properties; + +import io.debezium.util.IoUtil; + +public class Module { + private static final Properties INFO = IoUtil.loadProperties(Module.class, "io/debezium/performance/build.version"); + + public static String version() { + return INFO.getProperty("version"); + } +} diff --git a/debezium-microbenchmark/src/main/java/io/debezium/performance/core/FilterSmtPerf.java b/debezium-microbenchmark/src/main/java/io/debezium/performance/core/FilterSmtPerf.java index 98ea90501..9ec1b1edf 100644 --- a/debezium-microbenchmark/src/main/java/io/debezium/performance/core/FilterSmtPerf.java +++ b/debezium-microbenchmark/src/main/java/io/debezium/performance/core/FilterSmtPerf.java @@ -10,6 +10,7 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; @@ -28,6 +29,7 @@ import org.openjdk.jmh.annotations.Warmup; import io.debezium.data.Envelope; +import io.debezium.performance.Module; import io.debezium.transforms.Filter; import io.debezium.util.Collect; @@ -39,7 +41,7 @@ */ public class FilterSmtPerf { - private static class NativeFilter implements Transformation { + private static class NativeFilter implements Transformation, Versioned { @Override public void configure(Map configs) { @@ -64,6 +66,11 @@ public ConfigDef config() { @Override public void close() { } + + @Override + public String version() { + return Module.version(); + } } @State(Scope.Thread) diff --git a/debezium-microbenchmark/src/main/java/io/debezium/performance/core/SmtOverheadPerf.java b/debezium-microbenchmark/src/main/java/io/debezium/performance/core/SmtOverheadPerf.java index 5bc67cfec..d907fd785 100644 --- a/debezium-microbenchmark/src/main/java/io/debezium/performance/core/SmtOverheadPerf.java +++ b/debezium-microbenchmark/src/main/java/io/debezium/performance/core/SmtOverheadPerf.java @@ -10,6 +10,7 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; @@ -27,6 +28,8 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; +import io.debezium.performance.Module; + /** * A basic test to calculate overhead of using SMTs. * @@ -35,7 +38,7 @@ */ public class SmtOverheadPerf { - private static class NewRecord implements Transformation { + private static class NewRecord implements Transformation, Versioned { @Override public void configure(Map configs) { @@ -62,9 +65,14 @@ public ConfigDef config() { @Override public void close() { } + + @Override + public String version() { + return Module.version(); + } } - private static class NoOp implements Transformation { + private static class NoOp implements Transformation, Versioned { @Override public void configure(Map configs) { @@ -83,6 +91,11 @@ public ConfigDef config() { @Override public void close() { } + + @Override + public String version() { + return Module.version(); + } } @State(Scope.Thread) diff --git a/debezium-microbenchmark/src/main/resources/io/debezium/performance/build.version b/debezium-microbenchmark/src/main/resources/io/debezium/performance/build.version new file mode 100644 index 000000000..e5683df88 --- /dev/null +++ b/debezium-microbenchmark/src/main/resources/io/debezium/performance/build.version @@ -0,0 +1 @@ +version=${project.version} \ No newline at end of file diff --git a/debezium-scripting/debezium-scripting/pom.xml b/debezium-scripting/debezium-scripting/pom.xml index b75189827..34bdcb966 100644 --- a/debezium-scripting/debezium-scripting/pom.xml +++ b/debezium-scripting/debezium-scripting/pom.xml @@ -79,6 +79,7 @@ true src/main/resources + **/* **/build.properties diff --git a/debezium-scripting/debezium-scripting/src/main/java/io/debezium/transforms/Module.java b/debezium-scripting/debezium-scripting/src/main/java/io/debezium/transforms/Module.java new file mode 100644 index 000000000..0ef1b393f --- /dev/null +++ b/debezium-scripting/debezium-scripting/src/main/java/io/debezium/transforms/Module.java @@ -0,0 +1,19 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.transforms; + +import java.util.Properties; + +import io.debezium.util.IoUtil; + +public class Module { + + private static final Properties INFO = IoUtil.loadProperties(Module.class, "io/debezium/scripting/build.version"); + + public static String version() { + return INFO.getProperty("version"); + } +} diff --git a/debezium-scripting/debezium-scripting/src/main/java/io/debezium/transforms/ScriptingTransformation.java b/debezium-scripting/debezium-scripting/src/main/java/io/debezium/transforms/ScriptingTransformation.java index dcba045d3..65690bbca 100644 --- a/debezium-scripting/debezium-scripting/src/main/java/io/debezium/transforms/ScriptingTransformation.java +++ b/debezium-scripting/debezium-scripting/src/main/java/io/debezium/transforms/ScriptingTransformation.java @@ -9,6 +9,7 @@ import java.util.regex.Pattern; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.transforms.Transformation; import org.slf4j.Logger; @@ -37,7 +38,7 @@ * @author Jiri Pechanec */ @Incubating -public abstract class ScriptingTransformation> implements Transformation { +public abstract class ScriptingTransformation> implements Transformation, Versioned { private final Logger LOGGER = LoggerFactory.getLogger(getClass()); @@ -203,4 +204,9 @@ public ConfigDef config() { @Override public void close() { } + + @Override + public String version() { + return Module.version(); + } } diff --git a/debezium-scripting/debezium-scripting/src/main/resources/io/debezium/scripting/build.version b/debezium-scripting/debezium-scripting/src/main/resources/io/debezium/scripting/build.version new file mode 100644 index 000000000..e5683df88 --- /dev/null +++ b/debezium-scripting/debezium-scripting/src/main/resources/io/debezium/scripting/build.version @@ -0,0 +1 @@ +version=${project.version} \ No newline at end of file