DBZ-6201: Add support for move and nested fields for HeaderToValue SMT

mfvitale 2023-03-17 14:44:08 +01:00 committed by Jiri Pechanec
parent c45bf39eb0
commit 039a3f04e8
2 changed files with 297 additions and 72 deletions
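
What the change enables, as a minimal usage sketch (illustrative only, not part of the diff; the header names h1, h2 and target fields f1, after.f2 are hypothetical and mirror the tests below). The operation property now accepts move, which also removes the copied headers from the record, and field names may use dot notation to target nested structs such as after or source:

// Illustrative sketch, assuming a SourceRecord that carries headers h1 and h2
HeaderToValue<SourceRecord> transform = new HeaderToValue<>();
transform.configure(Map.of(
        "headers", "h1,h2",
        "fields", "f1,after.f2",
        "operation", "move"));
SourceRecord transformed = transform.apply(sourceRecord);
// h1 is written to the top-level field f1, h2 into the nested "after" struct as f2,
// and with "move" both headers are removed from transformed.headers().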

io/debezium/transforms/HeaderToValue.java

@@ -5,9 +5,17 @@
*/
package io.debezium.transforms;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.util.BoundedConcurrentHashMap;
import static io.debezium.transforms.HeaderToValue.Operation.MOVE;
import static java.lang.String.format;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
@@ -15,17 +23,13 @@
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static java.lang.String.format;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.util.BoundedConcurrentHashMap;
public class HeaderToValue<R extends ConnectRecord<R>> implements Transformation<R> {
@@ -34,7 +38,9 @@ public class HeaderToValue<R extends ConnectRecord<R>> implements Transformation
public static final String OPERATION_CONF = "operation";
private static final String MOVE_OPERATION = "move";
private static final String COPY_OPERATION = "copy";
private static final int SCHEMA_CACHE_SIZE = 64;
private static final int CACHE_SIZE = 64;
public static final String NESTING_SEPARATOR = ".";
public static final String ROOT_FIELD_NAME = "payload";
enum Operation {
MOVE(MOVE_OPERATION),
@@ -73,7 +79,8 @@ public String toString() {
.withDisplayName("Field names list")
.withType(ConfigDef.Type.LIST)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("Field names, in the same order as the header names listed in the headers configuration property. Supports Struct nesting using dot notation.")
.withDescription(
"Field names, in the same order as the header names listed in the headers configuration property. Supports Struct nesting using dot notation.")
.required();
public static final Field OPERATION_FIELD = Field.create(OPERATION_CONF)
@@ -91,9 +98,9 @@ public String toString() {
private Operation operation;
private SmtManager<R> smtManager;
private final BoundedConcurrentHashMap<Schema, Schema> schemaUpdateCache = new BoundedConcurrentHashMap<>(CACHE_SIZE);
private final BoundedConcurrentHashMap<Headers, Headers> headersUpdateCache = new BoundedConcurrentHashMap<>(CACHE_SIZE);
private BoundedConcurrentHashMap<Schema, Schema> schemaUpdateCache = new BoundedConcurrentHashMap<>(SCHEMA_CACHE_SIZE);
@Override
public ConfigDef config() {
@@ -106,7 +113,7 @@ public ConfigDef config() {
public void configure(Map<String, ?> props) {
final Configuration config = Configuration.from(props);
smtManager = new SmtManager<>(config);
SmtManager<R> smtManager = new SmtManager<>(config);
smtManager.validate(config, Field.setOf(FIELDS_FIELD, HEADERS_FIELD, OPERATION_FIELD));
fields = config.getList(FIELDS_FIELD);
@@ -121,57 +128,143 @@ public void configure(Map<String, ?> props) {
@Override
public R apply(R record) {
final Struct value = requireStruct(record.value(), "");
final Struct value = requireStruct(record.value(), "Header field insertion");
Map<String, Header> headerToProcess = StreamSupport.stream(record.headers().spliterator(), false)
.filter(header -> headers.contains(header.key()))
.collect(Collectors.toMap(Header::key, Function.identity()));
Schema updatedSchema = schemaUpdateCache.computeIfAbsent(value.schema(),
s -> makeNewSchema(value.schema(), headerToProcess));
Schema updatedSchema = schemaUpdateCache.computeIfAbsent(value.schema(), valueSchema -> makeNewSchema(valueSchema, headerToProcess));
// Update the value with the new fields
Struct updatedValue = new Struct(updatedSchema);
for (org.apache.kafka.connect.data.Field field : value.schema().fields()) {
if (value.get(field) != null) {
updatedValue.put(field.name(), value.get(field));
}
}
for (int i = 0; i < headers.size(); i++) {
Struct updatedValue = makeUpdatedValue(value, headerToProcess, updatedSchema);
Header currentHeader = headerToProcess.get(headers.get(i));
updatedValue.put(fields.get(i), currentHeader.value());
Headers updatedHeaders = record.headers();
if (MOVE.equals(operation)) {
updatedHeaders = headersUpdateCache.computeIfAbsent(record.headers(), this::makeNewHeaders);
}
return record.newRecord(record.topic(), record.kafkaPartition(),
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
updatedSchema,
updatedValue,
record.timestamp(),
record.headers());
updatedHeaders);
}
private Schema makeNewSchema(Schema oldSchema, Map<String, Header> headerToProcess) {
private Headers makeNewHeaders(Headers originalHeaders) {
// Get fields from original schema
SchemaBuilder newSchemabuilder = SchemaUtil.copySchemaBasics(oldSchema, SchemaBuilder.struct());
for (org.apache.kafka.connect.data.Field field : oldSchema.fields()) {
newSchemabuilder.field(field.name(), field.schema());
Headers updatedHeaders = originalHeaders.duplicate();
headers.forEach(updatedHeaders::remove);
return updatedHeaders;
}
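// Note on the move handling above (illustrative comment, not part of the commit):
// headersUpdateCache is keyed by the record's Headers, so records whose header
// collections compare equal reuse the pruned copy computed here; duplicate() keeps the
// original record's headers intact while every configured header key is dropped from the copy.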
private Struct makeUpdatedValue(Struct originalValue, Map<String, Header> headerToProcess, Schema updatedSchema) {
List<String> nestedFields = fields.stream().filter(field -> field.contains(NESTING_SEPARATOR)).collect(Collectors.toList());
return buildUpdatedValue(ROOT_FIELD_NAME, originalValue, headerToProcess, updatedSchema, nestedFields, 0);
}
private Struct buildUpdatedValue(String fieldName, Struct originalValue, Map<String, Header> headerToProcess, Schema updatedSchema, List<String> nestedFields,
int level) {
Struct updatedValue = new Struct(updatedSchema);
for (org.apache.kafka.connect.data.Field field : originalValue.schema().fields()) {
if (originalValue.get(field) != null) {
if (isContainedIn(field.name(), nestedFields)) {
Struct nestedField = requireStruct(originalValue.get(field), "Nested field");
updatedValue.put(field.name(),
buildUpdatedValue(field.name(), nestedField, headerToProcess, updatedSchema.field(field.name()).schema(), nestedFields, ++level));
}
else {
updatedValue.put(field.name(), originalValue.get(field));
}
}
}
for (int i = 0; i < headers.size(); i++) {
Header currentHeader = headerToProcess.get(headers.get(i));
String destinationFieldName = fields.get(i);
newSchemabuilder = newSchemabuilder.field(destinationFieldName, currentHeader.schema());
if (currentHeader != null) {
Optional<String> fieldNameToAdd = getFieldName(fields.get(i), fieldName, level);
fieldNameToAdd.ifPresent(s -> updatedValue.put(s, currentHeader.value()));
}
}
return updatedValue;
}
private boolean isContainedIn(String fieldName, List<String> nestedFields) {
return nestedFields.stream().anyMatch(s -> s.contains(fieldName));
}
private Schema makeNewSchema(Schema oldSchema, Map<String, Header> headerToProcess) {
List<String> nestedFields = fields.stream().filter(field -> field.contains(NESTING_SEPARATOR)).collect(Collectors.toList());
return buildNewSchema(ROOT_FIELD_NAME, oldSchema, headerToProcess, nestedFields, 0);
}
private Schema buildNewSchema(String fieldName, Schema oldSchema, Map<String, Header> headerToProcess, List<String> nestedFields, int level) {
if (oldSchema.type().isPrimitive()) {
return oldSchema;
}
// Get fields from original schema
SchemaBuilder newSchemabuilder = SchemaUtil.copySchemaBasics(oldSchema, SchemaBuilder.struct());
for (org.apache.kafka.connect.data.Field field : oldSchema.fields()) {
if (isContainedIn(field.name(), nestedFields)) {
newSchemabuilder.field(field.name(), buildNewSchema(field.name(), field.schema(), headerToProcess, nestedFields, ++level));
}
else {
newSchemabuilder.field(field.name(), field.schema());
}
}
for (int i = 0; i < headers.size(); i++) {
Header currentHeader = headerToProcess.get(headers.get(i));
Optional<String> currentFieldName = getFieldName(fields.get(i), fieldName, level);
if (currentFieldName.isPresent() && currentHeader != null) {
newSchemabuilder = newSchemabuilder.field(currentFieldName.get(), currentHeader.schema());
}
}
return newSchemabuilder.build();
}
private Optional<String> getFieldName(String destinationFieldName, String fieldName, int level) {
String[] nestedNames = destinationFieldName.split("\\.");
if (isRootField(fieldName, nestedNames)) {
return Optional.of(nestedNames[0]);
}
if (isChildrenOf(fieldName, level, nestedNames)) {
return Optional.of(nestedNames[level]);
}
return Optional.empty();
}
private static boolean isChildrenOf(String fieldName, int level, String[] nestedNames) {
int parentLevel = level == 0 ? 0 : level - 1;
return nestedNames[parentLevel].equals(fieldName);
}
private static boolean isRootField(String fieldName, String[] nestedNames) {
return nestedNames.length == 1 && fieldName.equals(ROOT_FIELD_NAME);
}
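// Worked example of the name resolution above (illustrative, not part of the commit):
// for the configured field "after.f2", nestedNames is ["after", "f2"]. At recursion level 0
// the current struct is the envelope root ("payload"): isRootField is false because there are
// two segments, and isChildrenOf compares nestedNames[0] = "after" against "payload", so
// getFieldName returns Optional.empty() and nothing is added at the top level. When
// buildUpdatedValue/buildNewSchema recurse into the "after" struct (level 1), parentLevel is 0
// and nestedNames[0] = "after" matches, so getFieldName returns "f2" and the header value and
// its schema are attached there.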
@Override
public void close() {

io/debezium/transforms/HeaderToValueTest.java

@@ -6,22 +6,26 @@
package io.debezium.transforms;
import io.debezium.data.Envelope;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.junit.Test;
import io.debezium.data.Envelope;
public class HeaderToValueTest {
@@ -36,54 +40,50 @@ public class HeaderToValueTest {
@Test
public void whenOperationIsNotMoveOrCopyAConfigExceptionIsThrew() {
assertThatThrownBy(() ->
headerToValue.configure(Map.of(
assertThatThrownBy(() -> headerToValue.configure(Map.of(
"headers", "h1",
"fields", "f1",
"operation", "invalidOp")))
.isInstanceOf(ConfigException.class)
.hasMessageContaining(
"Invalid value invalidOp for configuration operation: The 'operation' value is invalid: Value must be one of move, copy");;
"Invalid value invalidOp for configuration operation: The 'operation' value is invalid: Value must be one of move, copy");
}
@Test
public void whenNoFieldsDeclaredAConfigExceptionIsThrew() {
assertThatThrownBy(() ->
headerToValue.configure(Map.of(
"headers", "h1",
"operation", "copy")))
assertThatThrownBy(() -> headerToValue.configure(Map.of(
"headers", "h1",
"operation", "copy")))
.isInstanceOf(ConfigException.class)
.hasMessageContaining(
"Invalid value null for configuration fields: The 'fields' value is invalid: A value is required");;
"Invalid value null for configuration fields: The 'fields' value is invalid: A value is required");
}
@Test
public void whenNoHeadersDeclaredAConfigExceptionIsThrew() {
assertThatThrownBy(() ->
headerToValue.configure(Map.of(
"fields", "f1",
"operation", "copy")))
assertThatThrownBy(() -> headerToValue.configure(Map.of(
"fields", "f1",
"operation", "copy")))
.isInstanceOf(ConfigException.class)
.hasMessageContaining(
"Invalid value null for configuration headers: The 'headers' value is invalid: A value is required");;
"Invalid value null for configuration headers: The 'headers' value is invalid: A value is required");
}
@Test
public void whenHeadersAndFieldsHaveDifferentSizeAConfigExceptionIsThrew() {
assertThatThrownBy(() ->
headerToValue.configure(Map.of(
"headers", "h1,h2",
"fields", "f1",
"operation", "copy")))
assertThatThrownBy(() -> headerToValue.configure(Map.of(
"headers", "h1,h2",
"fields", "f1",
"operation", "copy")))
.isInstanceOf(ConfigException.class)
.hasMessageContaining(
"'fields' config must have the same number of elements as 'headers' config.");;
"'fields' config must have the same number of elements as 'headers' config.");
}
@@ -106,7 +106,7 @@ public void whenARecordThatContainsADefinedHeaderItWillBeCopiedInTheDefinedField
.withSource(Schema.STRING_SCHEMA)
.build();
Struct payload = createEnvelope.create(row, null , Instant.now());
Struct payload = createEnvelope.create(row, null, Instant.now());
SourceRecord sourceRecord = new SourceRecord(new HashMap<>(), new HashMap<>(), "topic", createEnvelope.schema(), payload);
sourceRecord.headers().add("h1", "this is a value from h1 header", Schema.STRING_SCHEMA);
sourceRecord.headers().add("h2", "this is a value from h2 header", Schema.STRING_SCHEMA);
@@ -138,7 +138,7 @@ public void whenARecordThatContainsADefinedStructHeaderItWillBeCopiedInTheDefine
.withRecord(VALUE_SCHEMA)
.withSource(Schema.STRING_SCHEMA)
.build();
Struct payload = createEnvelope.create(row, null , Instant.now());
Struct payload = createEnvelope.create(row, null, Instant.now());
SourceRecord sourceRecord = new SourceRecord(new HashMap<>(), new HashMap<>(), "topic", createEnvelope.schema(), payload);
sourceRecord.headers().add("h1", List.of("v1", "v2"), headerSchema);
@@ -148,4 +148,136 @@ public void whenARecordThatContainsADefinedStructHeaderItWillBeCopiedInTheDefine
assertThat(payloadStruct.getArray("f1")).contains("v1", "v2");
}
}
@Test
public void whenARecordThatContainsADefinedHeaderAndOperationIsMoveItWillBeCopiedInTheDefinedFieldAndRemovedFromHeaders() {
Schema headerSchema = SchemaBuilder.array(SchemaBuilder.OPTIONAL_STRING_SCHEMA).optional().name("h1").build();
headerToValue.configure(Map.of(
"headers", "h1,h2,h3",
"fields", "f1,f2,f3",
"operation", "move"));
Struct row = new Struct(VALUE_SCHEMA)
.put("id", 101L)
.put("price", 20.0F)
.put("product", "a product");
Envelope createEnvelope = Envelope.defineSchema()
.withName("mysql-server-1.inventory.product.Envelope")
.withRecord(VALUE_SCHEMA)
.withSource(Schema.STRING_SCHEMA)
.build();
Struct payload = createEnvelope.create(row, null, Instant.now());
SourceRecord sourceRecord = new SourceRecord(new HashMap<>(), new HashMap<>(), "topic", createEnvelope.schema(), payload);
sourceRecord.headers().add("h1", List.of("v1", "v2"), headerSchema);
sourceRecord.headers().add("h2", List.of("v1", "v2"), headerSchema);
sourceRecord.headers().add("h3", List.of("v1", "v2"), headerSchema);
sourceRecord.headers().add("h4", List.of("v1", "v2"), headerSchema);
SourceRecord transformedRecord = headerToValue.apply(sourceRecord);
Struct payloadStruct = Requirements.requireStruct(transformedRecord.value(), "");
assertThat(payloadStruct.getArray("f1")).contains("v1", "v2");
assertThat(payloadStruct.getArray("f2")).contains("v1", "v2");
assertThat(payloadStruct.getArray("f3")).contains("v1", "v2");
assertThat(StreamSupport.stream(transformedRecord.headers().spliterator(), false)
.map(Header::key).collect(Collectors.toList())).containsExactly("h4");
}
@Test
public void supportNestedField() {
headerToValue.configure(Map.of(
"headers", "h1,h2,h3",
"fields", "f1,after.f2,source.f3",
"operation", "copy"));
Struct row = new Struct(VALUE_SCHEMA)
.put("id", 101L)
.put("price", 20.0F)
.put("product", "a product");
Envelope createEnvelope = Envelope.defineSchema()
.withName("mysql-server-1.inventory.product.Envelope")
.withRecord(VALUE_SCHEMA)
.withSource(Schema.STRING_SCHEMA)
.build();
Struct payload = createEnvelope.create(row, null, Instant.now());
SourceRecord sourceRecord = new SourceRecord(new HashMap<>(), new HashMap<>(), "topic", createEnvelope.schema(), payload);
sourceRecord.headers().add("h1", "this is a value from h1 header", Schema.STRING_SCHEMA);
sourceRecord.headers().add("h2", "this is a value from h2 header", Schema.STRING_SCHEMA);
SourceRecord transformedRecord = headerToValue.apply(sourceRecord);
Struct payloadStruct = Requirements.requireStruct(transformedRecord.value(), "");
assertThat(payloadStruct.get("f1")).isEqualTo("this is a value from h1 header");
Struct after = Requirements.requireStruct(payloadStruct.get("after"), "");
assertThat(after.get("f2")).isEqualTo("this is a value from h2 header");
}
@Test
public void notExistingHeader() {
headerToValue.configure(Map.of(
"headers", "h1,",
"fields", "f1",
"operation", "copy"));
Struct row = new Struct(VALUE_SCHEMA)
.put("id", 101L)
.put("price", 20.0F)
.put("product", "a product");
Envelope createEnvelope = Envelope.defineSchema()
.withName("mysql-server-1.inventory.product.Envelope")
.withRecord(VALUE_SCHEMA)
.withSource(Schema.STRING_SCHEMA)
.build();
Struct payload = createEnvelope.create(row, null, Instant.now());
SourceRecord sourceRecord = new SourceRecord(new HashMap<>(), new HashMap<>(), "topic", createEnvelope.schema(), payload);
sourceRecord.headers().add("h2", "this is a value from h2 header", Schema.STRING_SCHEMA);
SourceRecord transformedRecord = headerToValue.apply(sourceRecord);
Struct payloadStruct = Requirements.requireStruct(transformedRecord.value(), "");
assertThat(payloadStruct).isEqualTo(payload);
}
@Test
public void notExistingNestedField() {
headerToValue.configure(Map.of(
"headers", "h1,",
"fields", "parent.f1",
"operation", "copy"));
Struct row = new Struct(VALUE_SCHEMA)
.put("id", 101L)
.put("price", 20.0F)
.put("product", "a product");
Envelope createEnvelope = Envelope.defineSchema()
.withName("mysql-server-1.inventory.product.Envelope")
.withRecord(VALUE_SCHEMA)
.withSource(Schema.STRING_SCHEMA)
.build();
Struct payload = createEnvelope.create(row, null, Instant.now());
SourceRecord sourceRecord = new SourceRecord(new HashMap<>(), new HashMap<>(), "topic", createEnvelope.schema(), payload);
sourceRecord.headers().add("h2", "this is a value from h2 header", Schema.STRING_SCHEMA);
SourceRecord transformedRecord = headerToValue.apply(sourceRecord);
Struct payloadStruct = Requirements.requireStruct(transformedRecord.value(), "");
assertThat(payloadStruct).isEqualTo(payload);
}
}