From 0853eb90fcb93fe6377a41d75d7eeaab2b4331e8 Mon Sep 17 00:00:00 2001
From: Roman Kudryashov
Date: Sun, 29 Oct 2023 18:02:00 +0300
Subject: [PATCH] DBZ-7065 Add `ConvertCloudEventToSaveableForm` transform

---
 .../ConvertCloudEventToSaveableForm.java      | 156 ++++++++++++++++++
 .../ConvertCloudEventToSaveableFormTest.java  | 158 ++++++++++++++++++
 .../jdbc/util/SinkRecordBuilder.java          |  45 ++++-
 .../jdbc/util/SinkRecordFactory.java          |   6 +
 4 files changed, 364 insertions(+), 1 deletion(-)
 create mode 100644 debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/transform/ConvertCloudEventToSaveableForm.java
 create mode 100644 debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/transform/ConvertCloudEventToSaveableFormTest.java

diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/transform/ConvertCloudEventToSaveableForm.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/transform/ConvertCloudEventToSaveableForm.java
new file mode 100644
index 000000000..85af11867
--- /dev/null
+++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/transform/ConvertCloudEventToSaveableForm.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.connector.jdbc.transform;
+
+import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.converters.spi.CloudEventsMaker;
+import io.debezium.transforms.outbox.AdditionalFieldsValidator;
+
+/**
+ * The transform converts a CloudEvent to a structure suitable for {@code JdbcSinkConnector}. It uses a user-provided
+ * mapping between a CloudEvent's fields and database column names. The resulting value schema has no name.
+ * A CloudEvent's {@code data} field is flattened if needed.
+ *
+ * @author Roman Kudryashov
+ */
+public class ConvertCloudEventToSaveableForm implements Transformation<SinkRecord> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConvertCloudEventToSaveableForm.class);
+
+    private static final String FIELD_NAME_SEPARATOR = ":";
+
+    private static final String CLOUD_EVENTS_SCHEMA_NAME_SUFFIX = ".CloudEvents.Envelope";
+
+    private static final Field FIELDS_MAPPING = Field.create("fields.mapping")
+            .withDisplayName("CloudEvent fields mapping")
+            .withType(ConfigDef.Type.LIST)
+            .withValidation(AdditionalFieldsValidator::isListOfStringPairs)
+            .withWidth(ConfigDef.Width.MEDIUM)
+            .withImportance(ConfigDef.Importance.HIGH)
+            .withDescription("Specifies a list of pairs with mappings between a CloudEvent's fields and names of database columns");
+
+    private Map<String, String> fieldsMapping;
+
+    private final JsonConverter jsonDataConverter = new JsonConverter();
+
+    @Override
+    public ConfigDef config() {
+        final ConfigDef config = new ConfigDef();
+        Field.group(config, null, FIELDS_MAPPING);
+        return config;
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs) {
+        final Configuration config = Configuration.from(configs);
+
+        final List<String> rawFieldsMapping = config.getList(FIELDS_MAPPING);
+        fieldsMapping = Collections.unmodifiableMap(parseFieldsMapping(rawFieldsMapping));
+
+        Map<String, Object> jsonDataConverterConfig = new HashMap<>();
+        jsonDataConverterConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
+        jsonDataConverterConfig.put(JsonConverterConfig.TYPE_CONFIG, "value");
+        jsonDataConverter.configure(jsonDataConverterConfig);
+    }
+
+    private Map<String, String> parseFieldsMapping(List<String> rawFieldsMapping) {
+        final Map<String, String> parsedFieldsMapping = new HashMap<>();
+        for (String rawFieldMapping : rawFieldsMapping) {
+            final String[] parts = rawFieldMapping.split(FIELD_NAME_SEPARATOR);
+            final String cloudEventFieldName = parts[0];
+            final String databaseColumnName;
+            if (rawFieldMapping.contains(FIELD_NAME_SEPARATOR)) {
+                databaseColumnName = parts[1];
+            }
+            else {
+                databaseColumnName = cloudEventFieldName;
+            }
+            parsedFieldsMapping.put(cloudEventFieldName, databaseColumnName);
+        }
+        return parsedFieldsMapping;
+    }
+
+    @Override
+    public SinkRecord apply(final SinkRecord record) {
+        if (!record.valueSchema().name().endsWith(CLOUD_EVENTS_SCHEMA_NAME_SUFFIX) || fieldsMapping.isEmpty()) {
+            return null;
+        }
+
+        final org.apache.kafka.connect.data.Field dataField = record.valueSchema().field(CloudEventsMaker.FieldName.DATA);
+        final boolean cloudEventContainsDataAsStruct = dataField != null && dataField.schema().type() == Schema.Type.STRUCT;
+
+        final Schema newSchema = getSchema(record, cloudEventContainsDataAsStruct);
+        final Struct newValue = getValue(record, newSchema, cloudEventContainsDataAsStruct);
+
+        return record.newRecord(
+                record.topic(),
+                record.kafkaPartition(),
+                record.keySchema(),
+                record.key(),
+                newSchema,
+                newValue,
+                record.timestamp());
+    }
+
+    private Schema getSchema(SinkRecord record, boolean cloudEventContainsDataAsStruct) {
+        final SchemaBuilder schemaBuilder = SchemaBuilder.struct();
+        for (Map.Entry<String, String> fieldMapping : fieldsMapping.entrySet()) {
+            final String cloudEventFieldName = fieldMapping.getKey();
+            final String databaseColumnName = fieldMapping.getValue();
+            final org.apache.kafka.connect.data.Field cloudEventField = record.valueSchema().field(cloudEventFieldName);
+            final Schema databaseColumnSchema;
+            if (cloudEventFieldName.equals(CloudEventsMaker.FieldName.DATA) && cloudEventContainsDataAsStruct) {
+                databaseColumnSchema = Schema.STRING_SCHEMA;
+            }
+            else {
+                databaseColumnSchema = cloudEventField.schema();
+            }
+            schemaBuilder.field(databaseColumnName, databaseColumnSchema);
+        }
+        return schemaBuilder.build();
+    }
+
+    private Struct getValue(SinkRecord record, Schema schema, boolean cloudEventContainsDataAsStruct) {
+        final Struct struct = new Struct(schema);
+        final Struct cloudEvent = requireStruct(record.value(), "convert cloud event");
+        for (Map.Entry<String, String> fieldMapping : fieldsMapping.entrySet()) {
+            final String cloudEventFieldName = fieldMapping.getKey();
+            final String databaseColumnName = fieldMapping.getValue();
+            Object fieldValue = cloudEvent.get(cloudEventFieldName);
+            if (cloudEventFieldName.equals(CloudEventsMaker.FieldName.DATA) && cloudEventContainsDataAsStruct) {
+                final Struct data = (Struct) fieldValue;
+                final byte[] dataInJson = jsonDataConverter.fromConnectData(null, data.schema(), data);
+                fieldValue = new String(dataInJson, StandardCharsets.UTF_8);
+            }
+            struct.put(databaseColumnName, fieldValue);
+        }
+        return struct;
+    }
+
+    @Override
+    public void close() {
+    }
+}
diff --git a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/transform/ConvertCloudEventToSaveableFormTest.java b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/transform/ConvertCloudEventToSaveableFormTest.java
new file mode 100644
index 000000000..714fbda31
--- /dev/null
+++ b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/transform/ConvertCloudEventToSaveableFormTest.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.connector.jdbc.transform;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.connector.jdbc.util.DebeziumSinkRecordFactory;
+import io.debezium.connector.jdbc.util.SinkRecordFactory;
+import io.debezium.doc.FixFor;
+
+/**
+ * Unit tests for {@link ConvertCloudEventToSaveableForm}
+ *
+ * @author Roman Kudryashov
+ */
+class ConvertCloudEventToSaveableFormTest {
+
+    @Test
+    @FixFor("DBZ-7065")
+    void testConvertNotCloudEventRecord() {
+        try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
+            final Map<String, String> config = new HashMap<>();
+            transform.configure(config);
+
+            final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
+
+            final SinkRecord createRecord = factory.createRecord("test.topic");
+            assertThat(createRecord.valueSchema().name()).doesNotEndWith(".CloudEvents.Envelope");
+
+            final SinkRecord convertedRecord = transform.apply(createRecord);
+            assertThat(convertedRecord).isNull();
+        }
+    }
+
+    @Test
+    @FixFor("DBZ-7065")
+    void testConvertCloudEventRecordWithEmptyConfig() {
+        try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
+            final Map<String, String> config = new HashMap<>();
+            transform.configure(config);
+
+            final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
+
+            final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic");
+            assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope");
+            assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7);
+            assertThat(cloudEventRecord.valueSchema().field("id").schema()).isEqualTo(Schema.STRING_SCHEMA);
+
+            final SinkRecord convertedRecord = transform.apply(cloudEventRecord);
+            assertThat(convertedRecord).isNull();
+        }
+    }
+
+    @Test
+    @FixFor("DBZ-7065")
+    void testConvertCloudEventRecordWithMappingOfIdField() {
+        try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
+            final Map<String, String> config = new HashMap<>();
+            config.put("fields.mapping", "id");
+            transform.configure(config);
+
+            final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
+
+            final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic");
+            assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope");
+            assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7);
+            assertThat(cloudEventRecord.valueSchema().field("id").schema()).isEqualTo(Schema.STRING_SCHEMA);
+
+            final SinkRecord convertedRecord = transform.apply(cloudEventRecord);
+            assertThat(convertedRecord).isNotNull();
+            assertThat(convertedRecord.valueSchema().type()).isEqualTo(Schema.Type.STRUCT);
+            assertThat(convertedRecord.valueSchema().fields().size()).isEqualTo(1);
+            assertThat(convertedRecord.valueSchema().field("id").schema()).isEqualTo(Schema.STRING_SCHEMA);
+            assertThat(convertedRecord.value()).isInstanceOf(Struct.class);
+            assertThat(((Struct) convertedRecord.value()).getString("id")).isNotBlank();
+            checkParamsOfOriginalAndConvertedRecordsAreEqual(cloudEventRecord, convertedRecord);
+        }
+    }
+
+    @Test
+    @FixFor("DBZ-7065")
+    void testConvertCloudEventRecordWithMappingOfDataField() {
+        try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
+            final Map<String, String> config = new HashMap<>();
+            config.put("fields.mapping", "data");
+            transform.configure(config);
+
+            final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
+
+            final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic");
+            assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope");
+            assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7);
+            assertThat(cloudEventRecord.valueSchema().field("data").schema().type()).isEqualTo(Schema.Type.STRUCT);
+
+            final SinkRecord convertedRecord = transform.apply(cloudEventRecord);
+            assertThat(convertedRecord).isNotNull();
+            assertThat(convertedRecord.valueSchema().type()).isEqualTo(Schema.Type.STRUCT);
+            assertThat(convertedRecord.valueSchema().fields().size()).isEqualTo(1);
+            assertThat(convertedRecord.valueSchema().field("data").schema()).isEqualTo(Schema.STRING_SCHEMA);
+            assertThat(convertedRecord.value()).isInstanceOf(Struct.class);
+            assertThat(((Struct) convertedRecord.value()).getString("data")).isNotBlank();
+            checkParamsOfOriginalAndConvertedRecordsAreEqual(cloudEventRecord, convertedRecord);
+        }
+    }
+
+    @Test
+    @FixFor("DBZ-7065")
+    void testConvertCloudEventRecordWithMappingOfAllFieldsWithCustomNames() {
+        try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
+            final Map<String, String> config = new HashMap<>();
+            config.put("fields.mapping", "id,source:created_by,specversion:ce_spec_number,type,time:created_at,datacontenttype:payload_format,data:payload");
+            transform.configure(config);
+
+            final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
+
+            final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic");
+            assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope");
+            assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7);
+            assertThat(cloudEventRecord.valueSchema().field("data").schema().type()).isEqualTo(Schema.Type.STRUCT);
+
+            final SinkRecord convertedRecord = transform.apply(cloudEventRecord);
+            assertThat(convertedRecord).isNotNull();
+            assertThat(convertedRecord.valueSchema().type()).isEqualTo(Schema.Type.STRUCT);
+            assertThat(convertedRecord.valueSchema().fields().size()).isEqualTo(7);
+            assertThat(convertedRecord.value()).isInstanceOf(Struct.class);
+            Struct convertedRecordValue = (Struct) convertedRecord.value();
+            assertThat(convertedRecordValue.getString("id")).isNotBlank();
+            assertThat(convertedRecordValue.getString("created_by")).isNotBlank();
+            assertThat(convertedRecordValue.getString("ce_spec_number")).isNotBlank();
+            assertThat(convertedRecordValue.getString("type")).isNotBlank();
+            assertThat(convertedRecordValue.getString("created_at")).isNotBlank();
+            assertThat(convertedRecordValue.getString("payload_format")).isNotBlank();
+            assertThat(convertedRecordValue.getString("payload")).isNotBlank();
+            checkParamsOfOriginalAndConvertedRecordsAreEqual(cloudEventRecord, convertedRecord);
+        }
+    }
+
+    private void checkParamsOfOriginalAndConvertedRecordsAreEqual(SinkRecord original, SinkRecord converted) {
+        assertThat(converted.topic()).isEqualTo(original.topic());
+        assertThat(converted.kafkaPartition()).isEqualTo(original.originalKafkaPartition());
+        assertThat(converted.kafkaOffset()).isEqualTo(original.originalKafkaOffset());
+        assertThat(converted.keySchema()).isEqualTo(original.keySchema());
+        assertThat(converted.key()).isEqualTo(original.key());
+        assertThat(converted.headers()).isEqualTo(original.headers());
+        assertThat(converted.timestamp()).isEqualTo(original.timestamp());
+    }
+}
diff --git a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/util/SinkRecordBuilder.java b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/util/SinkRecordBuilder.java
index 8cc40dec4..0393e5589 100644
--- a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/util/SinkRecordBuilder.java
+++ b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/util/SinkRecordBuilder.java
@@ -6,14 +6,18 @@
 package io.debezium.connector.jdbc.util;
 
 import java.time.Instant;
+import java.time.LocalDateTime;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
 
+import io.debezium.converters.spi.CloudEventsMaker;
 import io.debezium.data.Envelope;
 import io.debezium.util.Strings;
 
@@ -47,6 +51,10 @@ public static SinkRecordTypeBuilder truncate() {
         return new SinkRecordTypeBuilder(Type.TRUNCATE);
     }
 
+    public static SinkRecordTypeBuilder cloudEvent() {
+        return new SinkRecordTypeBuilder(Type.CLOUD_EVENT);
+    }
+
     public static class SinkRecordTypeBuilder {
 
         private final Type type;
@@ -58,6 +66,7 @@ public static class SinkRecordTypeBuilder {
         private Schema sourceSchema;
         private int partition;
         private int offset;
+        private SinkRecord basicRecord;
         private Map<String, Object> keyValues = new HashMap<>();
         private Map<String, Object> beforeValues = new HashMap<>();
         private Map<String, Object> afterValues = new HashMap<>();
@@ -127,6 +136,11 @@ public SinkRecordTypeBuilder offset(int offset) {
             return this;
         }
 
+        public SinkRecordTypeBuilder basicRecord(SinkRecord basicRecord) {
+            this.basicRecord = basicRecord;
+            return this;
+        }
+
         public SinkRecord build() {
             switch (type) {
                 case CREATE:
                     return buildCreateSinkRecord();
@@ -139,6 +153,8 @@ public SinkRecord build() {
                     return buildTombstoneSinkRecord();
                 case TRUNCATE:
                     return buildTruncateSinkRecord();
+                case CLOUD_EVENT:
+                    return buildCloudEventRecord();
             }
             return null;
         }
@@ -215,6 +231,32 @@ private SinkRecord buildTruncateSinkRecord() {
             }
         }
 
+        private SinkRecord buildCloudEventRecord() {
+            final SchemaBuilder schemaBuilder = SchemaBuilder.struct()
+                    .name("test.CloudEvents.Envelope")
+                    .field(CloudEventsMaker.FieldName.ID, Schema.STRING_SCHEMA)
+                    .field(CloudEventsMaker.FieldName.SOURCE, Schema.STRING_SCHEMA)
+                    .field(CloudEventsMaker.FieldName.SPECVERSION, Schema.STRING_SCHEMA)
+                    .field(CloudEventsMaker.FieldName.TYPE, Schema.STRING_SCHEMA)
+                    .field(CloudEventsMaker.FieldName.TIME, Schema.STRING_SCHEMA)
+                    .field(CloudEventsMaker.FieldName.DATACONTENTTYPE, Schema.STRING_SCHEMA)
+                    .field(CloudEventsMaker.FieldName.DATA, basicRecord.valueSchema());
+
+            Schema ceSchema = schemaBuilder.build();
+
+            Struct ceValue = new Struct(ceSchema);
+            ceValue.put(CloudEventsMaker.FieldName.ID, Uuid.randomUuid().toString());
+            ceValue.put(CloudEventsMaker.FieldName.SOURCE, "test");
+            ceValue.put(CloudEventsMaker.FieldName.SPECVERSION, "1.0");
+            ceValue.put(CloudEventsMaker.FieldName.TYPE, "TestType");
+            ceValue.put(CloudEventsMaker.FieldName.TIME, LocalDateTime.now().toString());
+            ceValue.put(CloudEventsMaker.FieldName.DATACONTENTTYPE, "application/json");
+            ceValue.put(CloudEventsMaker.FieldName.DATA, basicRecord.value());
+
+            return new SinkRecord(basicRecord.topic(), basicRecord.kafkaPartition(), basicRecord.keySchema(), basicRecord.key(), ceSchema, ceValue,
+                    basicRecord.kafkaOffset());
+        }
+
     private Envelope createEnvelope() {
         return Envelope.defineSchema()
                 .withRecord(recordSchema)
@@ -243,6 +285,7 @@ private enum Type {
         UPDATE,
         DELETE,
         TOMBSTONE,
-        TRUNCATE
+        TRUNCATE,
+        CLOUD_EVENT
     }
 }
diff --git a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/util/SinkRecordFactory.java b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/util/SinkRecordFactory.java
index 6c9fa1f96..c6c840d6b 100644
--- a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/util/SinkRecordFactory.java
+++ b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/util/SinkRecordFactory.java
@@ -404,4 +404,10 @@ default SinkRecord truncateRecord(String topicName) {
                 .build();
     }
 
+    default SinkRecord cloudEventRecord(String topicName) {
+        final SinkRecord basicRecord = updateRecord(topicName);
+        return SinkRecordBuilder.cloudEvent()
+                .basicRecord(basicRecord)
+                .build();
+    }
 }
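
Reviewer note: below is a minimal usage sketch of the new transform, mirroring the unit tests above; it is not part of the patch. The ConvertCloudEventUsageSketch wrapper class and the "payload" column name are illustrative only, while DebeziumSinkRecordFactory and cloudEventRecord(...) are the test-scope helpers this patch extends.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.connect.sink.SinkRecord;

    import io.debezium.connector.jdbc.transform.ConvertCloudEventToSaveableForm;
    import io.debezium.connector.jdbc.util.DebeziumSinkRecordFactory;

    public class ConvertCloudEventUsageSketch {

        public static void main(String[] args) {
            try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
                // A mapping entry is either "field" (the column keeps the field name)
                // or "field:column" (the column gets a custom name).
                final Map<String, String> config = new HashMap<>();
                config.put("fields.mapping", "id,type,data:payload");
                transform.configure(config);

                // Build a CloudEvent-shaped record the same way the tests do.
                final SinkRecord cloudEventRecord = new DebeziumSinkRecordFactory().cloudEventRecord("test.topic");

                // Result: a flat, unnamed STRUCT with the columns id, type, payload;
                // the struct-typed "data" attribute is serialized to a JSON string.
                final SinkRecord saveable = transform.apply(cloudEventRecord);
                System.out.println(saveable.valueSchema().fields());
            }
        }
    }

Because a bare entry such as "id" reuses the CloudEvent field name as the column name, only renamed fields (e.g. "data:payload") need the two-part form.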