DBZ-6201: Add new HeaderToValue SMT to copy headers to value

This commit is contained in:
mfvitale 2023-03-16 17:48:25 +01:00 committed by Jiri Pechanec
parent 4e544debdc
commit c45bf39eb0
2 changed files with 330 additions and 0 deletions

View File

@ -0,0 +1,179 @@
* Copyright Debezium Authors.
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
package io.debezium.transforms;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.util.BoundedConcurrentHashMap;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static java.lang.String.format;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
public class HeaderToValue<R extends ConnectRecord<R>> implements Transformation<R> {
public static final String FIELDS_CONF = "fields";
public static final String HEADERS_CONF = "headers";
public static final String OPERATION_CONF = "operation";
private static final String MOVE_OPERATION = "move";
private static final String COPY_OPERATION = "copy";
private static final int SCHEMA_CACHE_SIZE = 64;
enum Operation {
private final String name;
Operation(String name) {
this.name = name;
static Operation fromName(String name) {
switch (name) {
return MOVE;
return COPY;
throw new IllegalArgumentException();
public String toString() {
return name;
public static final Field HEADERS_FIELD = Field.create(HEADERS_CONF)
.withDisplayName("Header names list")
.withDescription("Header names in the record whose values are to be copied or moved to value.")
public static final Field FIELDS_FIELD = Field.create(FIELDS_CONF)
.withDisplayName("Field names list")
.withDescription("Field names, in the same order as the header names listed in the headers configuration property. Supports Struct nesting using dot notation.")
public static final Field OPERATION_FIELD = Field.create(OPERATION_CONF)
.withDisplayName("Operation: mover or copy")
.withDescription("Either <code>move</code> if the fields are to be moved to the value (removed from the headers), " +
"or <code>copy</code> if the fields are to be copied to the value (retained in the headers).")
private List<String> fields;
private List<String> headers;
private Operation operation;
private SmtManager<R> smtManager;
private BoundedConcurrentHashMap<Schema, Schema> schemaUpdateCache = new BoundedConcurrentHashMap<>(SCHEMA_CACHE_SIZE);
public ConfigDef config() {
final ConfigDef config = new ConfigDef();
return config;
public void configure(Map<String, ?> props) {
final Configuration config = Configuration.from(props);
smtManager = new SmtManager<>(config);
smtManager.validate(config, Field.setOf(FIELDS_FIELD, HEADERS_FIELD, OPERATION_FIELD));
fields = config.getList(FIELDS_FIELD);
headers = config.getList(HEADERS_FIELD);
if (headers.size() != fields.size()) {
throw new ConfigException(format("'%s' config must have the same number of elements as '%s' config.",
operation = Operation.fromName(config.getString(OPERATION_FIELD));
public R apply(R record) {
final Struct value = requireStruct(record.value(), "");
Map<String, Header> headerToProcess = StreamSupport.stream(record.headers().spliterator(), false)
.filter(header -> headers.contains(header.key()))
.collect(Collectors.toMap(Header::key, Function.identity()));
Schema updatedSchema = schemaUpdateCache.computeIfAbsent(value.schema(),
s -> makeNewSchema(value.schema(), headerToProcess));
// Update the value with the new fields
Struct updatedValue = new Struct(updatedSchema);
for (org.apache.kafka.connect.data.Field field : value.schema().fields()) {
if (value.get(field) != null) {
updatedValue.put(field.name(), value.get(field));
for (int i = 0; i < headers.size(); i++) {
Header currentHeader = headerToProcess.get(headers.get(i));
updatedValue.put(fields.get(i), currentHeader.value());
return record.newRecord(record.topic(), record.kafkaPartition(),
private Schema makeNewSchema(Schema oldSchema, Map<String, Header> headerToProcess) {
// Get fields from original schema
SchemaBuilder newSchemabuilder = SchemaUtil.copySchemaBasics(oldSchema, SchemaBuilder.struct());
for (org.apache.kafka.connect.data.Field field : oldSchema.fields()) {
newSchemabuilder.field(field.name(), field.schema());
for (int i = 0; i < headers.size(); i++) {
Header currentHeader = headerToProcess.get(headers.get(i));
String destinationFieldName = fields.get(i);
newSchemabuilder = newSchemabuilder.field(destinationFieldName, currentHeader.schema());
return newSchemabuilder.build();
public void close() {

View File

@ -0,0 +1,151 @@
* Copyright Debezium Authors.
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
package io.debezium.transforms;
import io.debezium.data.Envelope;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.junit.Test;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class HeaderToValueTest {
public static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
.field("id", Schema.INT64_SCHEMA)
.field("price", Schema.FLOAT32_SCHEMA)
.field("product", Schema.STRING_SCHEMA)
private final HeaderToValue<SourceRecord> headerToValue = new HeaderToValue<>();
public void whenOperationIsNotMoveOrCopyAConfigExceptionIsThrew() {
assertThatThrownBy(() ->
"headers", "h1",
"fields", "f1",
"operation", "invalidOp")))
"Invalid value invalidOp for configuration operation: The 'operation' value is invalid: Value must be one of move, copy");;
public void whenNoFieldsDeclaredAConfigExceptionIsThrew() {
assertThatThrownBy(() ->
"headers", "h1",
"operation", "copy")))
"Invalid value null for configuration fields: The 'fields' value is invalid: A value is required");;
public void whenNoHeadersDeclaredAConfigExceptionIsThrew() {
assertThatThrownBy(() ->
"fields", "f1",
"operation", "copy")))
"Invalid value null for configuration headers: The 'headers' value is invalid: A value is required");;
public void whenHeadersAndFieldsHaveDifferentSizeAConfigExceptionIsThrew() {
assertThatThrownBy(() ->
"headers", "h1,h2",
"fields", "f1",
"operation", "copy")))
"'fields' config must have the same number of elements as 'headers' config.");;
public void whenARecordThatContainsADefinedHeaderItWillBeCopiedInTheDefinedField() {
"headers", "h1,h2",
"fields", "f1, f2",
"operation", "copy"));
Struct row = new Struct(VALUE_SCHEMA)
.put("id", 101L)
.put("price", 20.0F)
.put("product", "a product");
Envelope createEnvelope = Envelope.defineSchema()
Struct payload = createEnvelope.create(row, null , Instant.now());
SourceRecord sourceRecord = new SourceRecord(new HashMap<>(), new HashMap<>(), "topic", createEnvelope.schema(), payload);
sourceRecord.headers().add("h1", "this is a value from h1 header", Schema.STRING_SCHEMA);
sourceRecord.headers().add("h2", "this is a value from h2 header", Schema.STRING_SCHEMA);
SourceRecord transformedRecord = headerToValue.apply(sourceRecord);
Struct payloadStruct = Requirements.requireStruct(transformedRecord.value(), "");
assertThat(payloadStruct.get("f1")).isEqualTo("this is a value from h1 header");
assertThat(payloadStruct.get("f2")).isEqualTo("this is a value from h2 header");
public void whenARecordThatContainsADefinedStructHeaderItWillBeCopiedInTheDefinedField() {
Schema headerSchema = SchemaBuilder.array(SchemaBuilder.OPTIONAL_STRING_SCHEMA).optional().name("h1").build();
"headers", "h1",
"fields", "f1",
"operation", "copy"));
Struct row = new Struct(VALUE_SCHEMA)
.put("id", 101L)
.put("price", 20.0F)
.put("product", "a product");
Envelope createEnvelope = Envelope.defineSchema()
Struct payload = createEnvelope.create(row, null , Instant.now());
SourceRecord sourceRecord = new SourceRecord(new HashMap<>(), new HashMap<>(), "topic", createEnvelope.schema(), payload);
sourceRecord.headers().add("h1", List.of("v1", "v2"), headerSchema);
SourceRecord transformedRecord = headerToValue.apply(sourceRecord);
Struct payloadStruct = Requirements.requireStruct(transformedRecord.value(), "");
assertThat(payloadStruct.getArray("f1")).contains("v1", "v2");