DBZ-7065 Add ConvertCloudEventToSaveableForm transform

Roman Kudryashov 2023-10-29 18:02:00 +03:00 committed by Jiri Pechanec
parent 67ac47f65d
commit 0853eb90fc
4 changed files with 363 additions and 1 deletion


@@ -0,0 +1,155 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.jdbc.transform;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.transforms.outbox.AdditionalFieldsValidator;
/**
* The transform converts a CloudEvent to a structure suitable for `JdbcSinkConnector`. It uses a user-provided
* mapping between a CloudEvent's fields and database column names. The resulting value schema has no name. A
* CloudEvent's `data` field is flattened if needed.
*
* @author Roman Kudryashov
*/
public class ConvertCloudEventToSaveableForm implements Transformation<SinkRecord> {
private static final Logger LOGGER = LoggerFactory.getLogger(ConvertCloudEventToSaveableForm.class);
private static final String FIELD_NAME_SEPARATOR = ":";
private static final String CLOUD_EVENTS_SCHEMA_NAME_SUFFIX = ".CloudEvents.Envelope";
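// The "fields.mapping" value is a comma-separated list of entries of the form
// "<cloudEventField>" or "<cloudEventField>:<columnName>". A hypothetical example, for
// illustration only: "id,type:event_type,data:payload" keeps "id" as a column named "id",
// maps "type" to a column named "event_type", and stores "data" in a column named "payload".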
private static final Field FIELDS_MAPPING = Field.create("fields.mapping")
.withDisplayName("Specifies a list of pairs with mappings between a CloudEvent's fields and names of database columns")
.withType(ConfigDef.Type.LIST)
.withValidation(AdditionalFieldsValidator::isListOfStringPairs)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("Specifies a list of pairs with mappings between a CloudEvent's fields and names of database columns");
private Map<String, String> fieldsMapping;
private final JsonConverter jsonDataConverter = new JsonConverter();
@Override
public ConfigDef config() {
final ConfigDef config = new ConfigDef();
Field.group(config, null, FIELDS_MAPPING);
return config;
}
@Override
public void configure(final Map<String, ?> configs) {
final Configuration config = Configuration.from(configs);
final List<String> rawFieldsMapping = config.getList(FIELDS_MAPPING);
fieldsMapping = Collections.unmodifiableMap(parseFieldsMapping(rawFieldsMapping));
Map<String, Object> jsonDataConverterConfig = new HashMap<>();
jsonDataConverterConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
jsonDataConverterConfig.put(JsonConverterConfig.TYPE_CONFIG, "value");
jsonDataConverter.configure(jsonDataConverterConfig);
}
private Map<String, String> parseFieldsMapping(List<String> rawFieldsMapping) {
final Map<String, String> parsedFieldsMapping = new HashMap<>();
for (String rawFieldMapping : rawFieldsMapping) {
final String[] parts = rawFieldMapping.split(FIELD_NAME_SEPARATOR);
final String cloudEventFieldName = parts[0];
final String databaseColumnName;
if (rawFieldMapping.contains(FIELD_NAME_SEPARATOR)) {
databaseColumnName = parts[1];
}
else {
databaseColumnName = cloudEventFieldName;
}
parsedFieldsMapping.put(cloudEventFieldName, databaseColumnName);
}
return parsedFieldsMapping;
}
@Override
public SinkRecord apply(final SinkRecord record) {
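// Returning null drops the record from the transform chain: records whose value schema is
// not a CloudEvents envelope, and any record processed while no mapping is configured, are
// filtered out rather than converted.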
if (!record.valueSchema().name().endsWith(CLOUD_EVENTS_SCHEMA_NAME_SUFFIX) || fieldsMapping.isEmpty()) {
return null;
}
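// The CloudEvent "data" attribute may be carried either as a Connect Struct or as an
// already serialized string; only the Struct form needs flattening to a string column.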
final org.apache.kafka.connect.data.Field dataField = record.valueSchema().field(CloudEventsMaker.FieldName.DATA);
final boolean cloudEventContainsDataAsStruct = dataField != null && dataField.schema().type() == Schema.Type.STRUCT;
final Schema newSchema = getSchema(record, cloudEventContainsDataAsStruct);
final Struct newValue = getValue(record, newSchema, cloudEventContainsDataAsStruct);
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
newSchema,
newValue,
record.timestamp());
}
private Schema getSchema(SinkRecord record, boolean cloudEventContainsDataAsStruct) {
final SchemaBuilder schemaBuilder = SchemaBuilder.struct();
for (Map.Entry<String, String> fieldMapping : fieldsMapping.entrySet()) {
final String cloudEventFieldName = fieldMapping.getKey();
final String databaseColumnName = fieldMapping.getValue();
final org.apache.kafka.connect.data.Field cloudEventField = record.valueSchema().field(cloudEventFieldName);
final Schema databaseColumnSchema;
if (cloudEventFieldName.equals(CloudEventsMaker.FieldName.DATA) && cloudEventContainsDataAsStruct) {
databaseColumnSchema = Schema.STRING_SCHEMA;
}
else {
databaseColumnSchema = cloudEventField.schema();
}
schemaBuilder.field(databaseColumnName, databaseColumnSchema);
}
return schemaBuilder.build();
}
private Struct getValue(SinkRecord record, Schema schema, boolean cloudEventContainsDataAsStruct) {
final Struct struct = new Struct(schema);
final Struct cloudEvent = requireStruct(record.value(), "convert cloud event");
for (Map.Entry<String, String> fieldMapping : fieldsMapping.entrySet()) {
final String cloudEventFieldName = fieldMapping.getKey();
final String databaseColumnName = fieldMapping.getValue();
Object fieldValue = cloudEvent.get(cloudEventFieldName);
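// A structured "data" value cannot be written to a single column as-is, so it is
// serialized to its JSON text representation first.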
if (cloudEventFieldName.equals(CloudEventsMaker.FieldName.DATA) && cloudEventContainsDataAsStruct) {
final Struct data = (Struct) fieldValue;
final byte[] dataInJson = jsonDataConverter.fromConnectData(null, data.schema(), data);
fieldValue = new String(dataInJson, StandardCharsets.UTF_8);
}
struct.put(databaseColumnName, fieldValue);
}
return struct;
}
@Override
public void close() {
}
}
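For orientation, a minimal sketch (not part of this commit) of the sink-connector properties that would wire in the new transform. The "transforms.*" keys follow the standard Kafka Connect convention and "fields.mapping" is the option introduced above; the connector class, topic name, and mapping value shown here are illustrative assumptions.

import java.util.LinkedHashMap;
import java.util.Map;

public class CloudEventTransformConfigSketch {

    // Builds the hypothetical property map a user might pass to Kafka Connect.
    public static Map<String, String> connectorProps() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("connector.class", "io.debezium.connector.jdbc.JdbcSinkConnector");
        props.put("topics", "orders.events");
        props.put("transforms", "convertCloudEvent");
        props.put("transforms.convertCloudEvent.type",
                "io.debezium.connector.jdbc.transform.ConvertCloudEventToSaveableForm");
        // "id" and "type" keep their names; the CloudEvent's "data" goes to a "payload" column.
        props.put("transforms.convertCloudEvent.fields.mapping", "id,type,data:payload");
        return props;
    }

    public static void main(String[] args) {
        connectorProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}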


@@ -0,0 +1,158 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.jdbc.transform;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Test;
import io.debezium.connector.jdbc.util.DebeziumSinkRecordFactory;
import io.debezium.connector.jdbc.util.SinkRecordFactory;
import io.debezium.doc.FixFor;
/**
* Unit tests for {@link ConvertCloudEventToSaveableForm}
*
* @author Roman Kudryashov
*/
class ConvertCloudEventToSaveableFormTest {
@Test
@FixFor("DBZ-7065")
void testConvertNotCloudEventRecord() {
try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
final Map<String, String> config = new HashMap<>();
transform.configure(config);
final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
final SinkRecord createRecord = factory.createRecord("test.topic");
assertThat(createRecord.valueSchema().name()).doesNotEndWith(".CloudEvents.Envelope");
final SinkRecord convertedRecord = transform.apply(createRecord);
assertThat(convertedRecord).isNull();
}
}
@Test
@FixFor("DBZ-7065")
void testConvertCloudEventRecordWithEmptyConfig() {
try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
final Map<String, String> config = new HashMap<>();
transform.configure(config);
final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic");
assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope");
assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7);
assertThat(cloudEventRecord.valueSchema().field("id").schema()).isEqualTo(Schema.STRING_SCHEMA);
final SinkRecord convertedRecord = transform.apply(cloudEventRecord);
assertThat(convertedRecord).isNull();
}
}
@Test
@FixFor("DBZ-7065")
void testConvertCloudEventRecordWithMappingOfIdField() {
try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
final Map<String, String> config = new HashMap<>();
config.put("fields.mapping", "id");
transform.configure(config);
final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic");
assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope");
assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7);
assertThat(cloudEventRecord.valueSchema().field("id").schema()).isEqualTo(Schema.STRING_SCHEMA);
final SinkRecord convertedRecord = transform.apply(cloudEventRecord);
assertThat(convertedRecord).isNotNull();
assertThat(convertedRecord.valueSchema().type()).isEqualTo(Schema.Type.STRUCT);
assertThat(convertedRecord.valueSchema().fields().size()).isEqualTo(1);
assertThat(convertedRecord.valueSchema().field("id").schema()).isEqualTo(Schema.STRING_SCHEMA);
assertThat(convertedRecord.value()).isInstanceOf(Struct.class);
assertThat(((Struct) convertedRecord.value()).getString("id")).isNotBlank();
checkParamsOfOriginalAndConvertedRecordsAreEqual(cloudEventRecord, convertedRecord);
}
}
@Test
@FixFor("DBZ-7065")
void testConvertCloudEventRecordWithMappingOfDataField() {
try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
final Map<String, String> config = new HashMap<>();
config.put("fields.mapping", "data");
transform.configure(config);
final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic");
assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope");
assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7);
assertThat(cloudEventRecord.valueSchema().field("data").schema().type()).isEqualTo(Schema.Type.STRUCT);
final SinkRecord convertedRecord = transform.apply(cloudEventRecord);
assertThat(convertedRecord).isNotNull();
assertThat(convertedRecord.valueSchema().type()).isEqualTo(Schema.Type.STRUCT);
assertThat(convertedRecord.valueSchema().fields().size()).isEqualTo(1);
assertThat(convertedRecord.valueSchema().field("data").schema()).isEqualTo(Schema.STRING_SCHEMA);
assertThat(convertedRecord.value()).isInstanceOf(Struct.class);
assertThat(((Struct) convertedRecord.value()).getString("data")).isNotBlank();
checkParamsOfOriginalAndConvertedRecordsAreEqual(cloudEventRecord, convertedRecord);
}
}
@Test
@FixFor("DBZ-7065")
void testConvertCloudEventRecordWithMappingOfAllFieldsWithCustomNames() {
try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
final Map<String, String> config = new HashMap<>();
config.put("fields.mapping", "id,source:created_by,specversion:ce_spec_number,type,time:created_at,datacontenttype:payload_format,data:payload");
transform.configure(config);
final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic");
assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope");
assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7);
assertThat(cloudEventRecord.valueSchema().field("data").schema().type()).isEqualTo(Schema.Type.STRUCT);
final SinkRecord convertedRecord = transform.apply(cloudEventRecord);
assertThat(convertedRecord).isNotNull();
assertThat(convertedRecord.valueSchema().type()).isEqualTo(Schema.Type.STRUCT);
assertThat(convertedRecord.valueSchema().fields().size()).isEqualTo(7);
assertThat(convertedRecord.value()).isInstanceOf(Struct.class);
Struct convertedRecordValue = (Struct) convertedRecord.value();
assertThat(convertedRecordValue.getString("id")).isNotBlank();
assertThat(convertedRecordValue.getString("created_by")).isNotBlank();
assertThat(convertedRecordValue.getString("ce_spec_number")).isNotBlank();
assertThat(convertedRecordValue.getString("type")).isNotBlank();
assertThat(convertedRecordValue.getString("created_at")).isNotBlank();
assertThat(convertedRecordValue.getString("payload_format")).isNotBlank();
assertThat(convertedRecordValue.getString("payload")).isNotBlank();
checkParamsOfOriginalAndConvertedRecordsAreEqual(cloudEventRecord, convertedRecord);
}
}
private void checkParamsOfOriginalAndConvertedRecordsAreEqual(SinkRecord original, SinkRecord converted) {
assertThat(converted.topic()).isEqualTo(original.topic());
assertThat(converted.kafkaPartition()).isEqualTo(original.originalKafkaPartition());
assertThat(converted.kafkaOffset()).isEqualTo(original.originalKafkaOffset());
assertThat(converted.keySchema()).isEqualTo(original.keySchema());
assertThat(converted.key()).isEqualTo(original.key());
assertThat(converted.headers()).isEqualTo(original.headers());
assertThat(converted.timestamp()).isEqualTo(original.timestamp());
}
}


@@ -6,14 +6,18 @@
package io.debezium.connector.jdbc.util;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.data.Envelope;
import io.debezium.util.Strings;
@@ -47,6 +51,10 @@ public static SinkRecordTypeBuilder truncate() {
return new SinkRecordTypeBuilder(Type.TRUNCATE);
}
public static SinkRecordTypeBuilder cloudEvent() {
return new SinkRecordTypeBuilder(Type.CLOUD_EVENT);
}
public static class SinkRecordTypeBuilder {
private final Type type;
@@ -58,6 +66,7 @@ public static class SinkRecordTypeBuilder {
private Schema sourceSchema;
private int partition;
private int offset;
private SinkRecord basicRecord;
private Map<String, Object> keyValues = new HashMap<>();
private Map<String, Object> beforeValues = new HashMap<>();
private Map<String, Object> afterValues = new HashMap<>();
@@ -127,6 +136,11 @@ public SinkRecordTypeBuilder offset(int offset) {
return this;
}
public SinkRecordTypeBuilder basicRecord(SinkRecord basicRecord) {
this.basicRecord = basicRecord;
return this;
}
public SinkRecord build() {
switch (type) {
case CREATE:
@@ -139,6 +153,8 @@ public SinkRecord build() {
return buildTombstoneSinkRecord();
case TRUNCATE:
return buildTruncateSinkRecord();
case CLOUD_EVENT:
return buildCloudEventRecord();
}
return null;
}
@@ -215,6 +231,32 @@ private SinkRecord buildTruncateSinkRecord() {
}
}
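// Wraps the value of a previously built sink record in a minimal CloudEvents envelope,
// giving the transform tests a realistic CloudEvent input whose "data" field is a Struct.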
private SinkRecord buildCloudEventRecord() {
final SchemaBuilder schemaBuilder = SchemaBuilder.struct()
.name("test.CloudEvents.Envelope")
.field(CloudEventsMaker.FieldName.ID, Schema.STRING_SCHEMA)
.field(CloudEventsMaker.FieldName.SOURCE, Schema.STRING_SCHEMA)
.field(CloudEventsMaker.FieldName.SPECVERSION, Schema.STRING_SCHEMA)
.field(CloudEventsMaker.FieldName.TYPE, Schema.STRING_SCHEMA)
.field(CloudEventsMaker.FieldName.TIME, Schema.STRING_SCHEMA)
.field(CloudEventsMaker.FieldName.DATACONTENTTYPE, Schema.STRING_SCHEMA)
.field(CloudEventsMaker.FieldName.DATA, basicRecord.valueSchema());
Schema ceSchema = schemaBuilder.build();
Struct ceValue = new Struct(ceSchema);
ceValue.put(CloudEventsMaker.FieldName.ID, Uuid.randomUuid().toString());
ceValue.put(CloudEventsMaker.FieldName.SOURCE, "test");
ceValue.put(CloudEventsMaker.FieldName.SPECVERSION, "1.0");
ceValue.put(CloudEventsMaker.FieldName.TYPE, "TestType");
ceValue.put(CloudEventsMaker.FieldName.TIME, LocalDateTime.now().toString());
ceValue.put(CloudEventsMaker.FieldName.DATACONTENTTYPE, "application/json");
ceValue.put(CloudEventsMaker.FieldName.DATA, basicRecord.value());
return new SinkRecord(basicRecord.topic(), basicRecord.kafkaPartition(), basicRecord.keySchema(), basicRecord.key(), ceSchema, ceValue,
basicRecord.kafkaOffset());
}
private Envelope createEnvelope() {
return Envelope.defineSchema()
.withRecord(recordSchema)
@@ -243,6 +285,7 @@ private enum Type {
UPDATE,
DELETE,
TOMBSTONE,
TRUNCATE
TRUNCATE,
CLOUD_EVENT
}
}


@@ -404,4 +404,10 @@ default SinkRecord truncateRecord(String topicName) {
.build();
}
default SinkRecord cloudEventRecord(String topicName) {
final SinkRecord basicRecord = updateRecord(topicName);
return SinkRecordBuilder.cloudEvent()
.basicRecord(basicRecord)
.build();
}
}