diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java index 1344da042..9ea9b105d 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java @@ -21,8 +21,12 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.RetriableException; @@ -176,6 +180,17 @@ public final class EmbeddedEngine implements DebeziumEngine { .withDescription("The Converter class that should be used to serialize and deserialize value data for offsets.") .withDefault(JsonConverter.class.getName()); + /** + * A list of SMTs to be applied on the messages generated by the engine.. + */ + public static final Field TRANSFORMS = Field.create("transforms") + .withDisplayName("List of prefixes defining transformations.") + .withType(Type.STRING) + .withWidth(Width.MEDIUM) + .withImportance(Importance.LOW) + .withDescription("Optional list of single message transformations applied on the messages. " + + "The transforms are defined using '.type' config option and configured using options '.