From 3b8645a5b87968fe6dfa8a5e307d42f4106878d4 Mon Sep 17 00:00:00 2001 From: Roman Kudryashov Date: Sat, 11 Nov 2023 06:51:57 +0300 Subject: [PATCH] DBZ-7130 `ConvertCloudEventToSaveableForm` can work with cloud events deserialized from JSON --- .../ConvertCloudEventToSaveableForm.java | 125 ++++++++++++++++-- ...ctJdbcSinkSaveConvertedCloudEventTest.java | 43 +++++- .../ConvertCloudEventToSaveableFormTest.java | 93 ++++++++----- .../jdbc/util/SinkRecordBuilder.java | 71 ++++++++-- .../jdbc/util/SinkRecordFactory.java | 5 +- 5 files changed, 286 insertions(+), 51 deletions(-) diff --git a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/transforms/ConvertCloudEventToSaveableForm.java b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/transforms/ConvertCloudEventToSaveableForm.java index 52dcf21a5..e700d7378 100644 --- a/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/transforms/ConvertCloudEventToSaveableForm.java +++ b/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/transforms/ConvertCloudEventToSaveableForm.java @@ -11,11 +11,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.sink.SinkRecord; @@ -26,6 +29,7 @@ import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.converters.spi.CloudEventsMaker; +import io.debezium.converters.spi.SerializerType; import io.debezium.transforms.outbox.AdditionalFieldsValidator; /** @@ -49,14 +53,29 @@ public class ConvertCloudEventToSaveableForm implements Transformation fieldsMapping; + private SerializerType serializerType; + private final JsonConverter jsonDataConverter = new JsonConverter(); + private final Set cloudEventsSpecRequiredFields = Set.of(CloudEventsMaker.FieldName.ID, CloudEventsMaker.FieldName.SOURCE, + CloudEventsMaker.FieldName.SPECVERSION, + CloudEventsMaker.FieldName.TYPE); + + private final Map cloudEventsFieldToColumnSchema = new HashMap<>(); + @Override public ConfigDef config() { final ConfigDef config = new ConfigDef(); - Field.group(config, null, FIELDS_MAPPING); + Field.group(config, null, FIELDS_MAPPING, SERIALIZER_TYPE); return config; } @@ -67,10 +86,24 @@ public void configure(final Map configs) { final List rawFieldsMapping = config.getList(FIELDS_MAPPING); fieldsMapping = Collections.unmodifiableMap(parseFieldsMapping(rawFieldsMapping)); + serializerType = SerializerType.withName(config.getString(SERIALIZER_TYPE)); + if (serializerType == null) { + throw new ConfigException(SERIALIZER_TYPE.name(), serializerType, "Serialization/deserialization type of CloudEvents converter is required"); + } + Map jsonDataConverterConfig = new HashMap<>(); jsonDataConverterConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false); jsonDataConverterConfig.put(JsonConverterConfig.TYPE_CONFIG, "value"); jsonDataConverter.configure(jsonDataConverterConfig); + + cloudEventsFieldToColumnSchema.put(CloudEventsMaker.FieldName.ID, Schema.STRING_SCHEMA); + cloudEventsFieldToColumnSchema.put(CloudEventsMaker.FieldName.SOURCE, Schema.STRING_SCHEMA); + cloudEventsFieldToColumnSchema.put(CloudEventsMaker.FieldName.SPECVERSION, Schema.STRING_SCHEMA); + cloudEventsFieldToColumnSchema.put(CloudEventsMaker.FieldName.TYPE, Schema.STRING_SCHEMA); + cloudEventsFieldToColumnSchema.put(CloudEventsMaker.FieldName.DATACONTENTTYPE, Schema.STRING_SCHEMA); + cloudEventsFieldToColumnSchema.put(CloudEventsMaker.FieldName.DATASCHEMA, Schema.STRING_SCHEMA); + cloudEventsFieldToColumnSchema.put(CloudEventsMaker.FieldName.TIME, Schema.STRING_SCHEMA); + cloudEventsFieldToColumnSchema.put(CloudEventsMaker.FieldName.DATA, Schema.STRING_SCHEMA); } private Map parseFieldsMapping(List rawFieldsMapping) { @@ -92,12 +125,19 @@ private Map parseFieldsMapping(List rawFieldsMapping) { @Override public SinkRecord apply(final SinkRecord record) { - if (record == null || !record.valueSchema().name().endsWith(CLOUD_EVENTS_SCHEMA_NAME_SUFFIX) || fieldsMapping.isEmpty()) { + if (record == null || !isCloudEvent(record) || fieldsMapping.isEmpty()) { return record; } - final org.apache.kafka.connect.data.Field dataField = record.valueSchema().field(CloudEventsMaker.FieldName.DATA); - final boolean cloudEventContainsDataAsStruct = dataField != null && dataField.schema().type() == Schema.Type.STRUCT; + final boolean cloudEventContainsDataAsStruct; + if (serializerType == SerializerType.JSON) { + Object dataFieldValue = getCloudEventFieldsMap(record).get(CloudEventsMaker.FieldName.DATA); + cloudEventContainsDataAsStruct = dataFieldValue instanceof Struct; + } + else { + final org.apache.kafka.connect.data.Field dataField = record.valueSchema().field(CloudEventsMaker.FieldName.DATA); + cloudEventContainsDataAsStruct = dataField != null && dataField.schema().type() == Schema.Type.STRUCT; + } final Schema newSchema = getSchema(record, cloudEventContainsDataAsStruct); final Struct newValue = getValue(record, newSchema, cloudEventContainsDataAsStruct); @@ -113,36 +153,105 @@ public SinkRecord apply(final SinkRecord record) { record.headers()); } + private boolean isCloudEvent(SinkRecord record) { + if (serializerType == SerializerType.JSON) { + boolean valueIsMap = record.value() instanceof Map; + if (valueIsMap && record.valueSchema() == null) { + final Map cloudEventMap = getCloudEventFieldsMap(record); + return cloudEventMap.size() >= 4 && cloudEventsSpecRequiredFields.stream().allMatch(cloudEventMap::containsKey); + } + return false; + } + else { + return record.valueSchema().name().endsWith(CLOUD_EVENTS_SCHEMA_NAME_SUFFIX); + } + } + + private Map getCloudEventFieldsMap(SinkRecord record) { + return (Map) record.value(); + } + private Schema getSchema(SinkRecord record, boolean cloudEventContainsDataAsStruct) { + Map cloudEventMap = null; + if (serializerType == SerializerType.JSON) { + cloudEventMap = getCloudEventFieldsMap(record); + } + final SchemaBuilder schemaBuilder = SchemaBuilder.struct(); for (Map.Entry fieldMapping : fieldsMapping.entrySet()) { final String cloudEventFieldName = fieldMapping.getKey(); final String databaseColumnName = fieldMapping.getValue(); - final org.apache.kafka.connect.data.Field cloudEventField = record.valueSchema().field(cloudEventFieldName); + + final Schema cloudEventFieldSchema; + if (serializerType == SerializerType.JSON) { + Object cloudEventFieldValue = cloudEventMap.get(cloudEventFieldName); + if (cloudEventFieldValue == null) { + // set default schemas + cloudEventFieldSchema = cloudEventsFieldToColumnSchema.get(cloudEventFieldName); + } + else { + cloudEventFieldSchema = determineCloudEventFieldSchema(cloudEventFieldValue); + } + } + else { + final org.apache.kafka.connect.data.Field cloudEventField = record.valueSchema().field(cloudEventFieldName); + cloudEventFieldSchema = cloudEventField.schema(); + } + final Schema databaseColumnSchema; if (cloudEventFieldName.equals(CloudEventsMaker.FieldName.DATA) && cloudEventContainsDataAsStruct) { databaseColumnSchema = Schema.STRING_SCHEMA; } else { - databaseColumnSchema = cloudEventField.schema(); + databaseColumnSchema = cloudEventFieldSchema; } schemaBuilder.field(databaseColumnName, databaseColumnSchema); } return schemaBuilder.build(); } + private Schema determineCloudEventFieldSchema(Object cloudEventFieldValue) { + final Schema cloudEventFieldSchema; + if (cloudEventFieldValue instanceof String) { + cloudEventFieldSchema = Schema.STRING_SCHEMA; + } + else if (cloudEventFieldValue instanceof Struct) { + cloudEventFieldSchema = ((Struct) cloudEventFieldValue).schema(); + } + else { + throw new DataException("Unsupported type of CloudEvent field: " + cloudEventFieldValue.getClass()); + } + return cloudEventFieldSchema; + } + private Struct getValue(SinkRecord record, Schema schema, boolean cloudEventContainsDataAsStruct) { + Map cloudEventMap = null; + Struct cloudEventStruct = null; + if (serializerType == SerializerType.JSON) { + cloudEventMap = getCloudEventFieldsMap(record); + } + else { + cloudEventStruct = requireStruct(record.value(), "convert cloud event"); + } + final Struct struct = new Struct(schema); - final Struct cloudEvent = requireStruct(record.value(), "convert cloud event"); for (Map.Entry fieldMapping : fieldsMapping.entrySet()) { final String cloudEventFieldName = fieldMapping.getKey(); final String databaseColumnName = fieldMapping.getValue(); - Object fieldValue = cloudEvent.get(cloudEventFieldName); + + Object fieldValue; + if (serializerType == SerializerType.JSON) { + fieldValue = cloudEventMap.get(cloudEventFieldName); + } + else { + fieldValue = cloudEventStruct.get(cloudEventFieldName); + } if (cloudEventFieldName.equals(CloudEventsMaker.FieldName.DATA) && cloudEventContainsDataAsStruct) { final Struct data = (Struct) fieldValue; final byte[] dataInJson = jsonDataConverter.fromConnectData(null, data.schema(), data); fieldValue = new String(dataInJson); } + struct.put(databaseColumnName, fieldValue); } return struct; diff --git a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkSaveConvertedCloudEventTest.java b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkSaveConvertedCloudEventTest.java index e8479a663..b06ad9b80 100644 --- a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkSaveConvertedCloudEventTest.java +++ b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkSaveConvertedCloudEventTest.java @@ -27,6 +27,7 @@ import io.debezium.connector.jdbc.junit.jupiter.SinkRecordFactoryArgumentsProvider; import io.debezium.connector.jdbc.transforms.ConvertCloudEventToSaveableForm; import io.debezium.connector.jdbc.util.SinkRecordFactory; +import io.debezium.converters.spi.SerializerType; /** * Common converted CloudEvent saving tests. @@ -41,10 +42,11 @@ public AbstractJdbcSinkSaveConvertedCloudEventTest(Sink sink) { @ParameterizedTest @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class) - public void testSaveConvertedCloudEventRecord(SinkRecordFactory factory) { + public void testSaveConvertedCloudEventRecordFromJson(SinkRecordFactory factory) { final ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm(); final Map config = new HashMap<>(); config.put("fields.mapping", "id,source:created_by,data:payload"); + config.put("serializer.type", "json"); transform.configure(config); final Map properties = getDefaultSinkConfig(); @@ -57,7 +59,44 @@ public void testSaveConvertedCloudEventRecord(SinkRecordFactory factory) { final String tableName = randomTableName(); final String topicName = topicName("server1", "schema", tableName); - final SinkRecord cloudEventRecord = factory.cloudEventRecord(topicName); + final SinkRecord cloudEventRecord = factory.cloudEventRecord(topicName, SerializerType.withName("json")); + final SinkRecord convertedRecord = transform.apply(cloudEventRecord); + consume(convertedRecord); + + final String destinationTableName = destinationTableName(convertedRecord); + + final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName); + tableAssert.exists().hasNumberOfRows(1).hasNumberOfColumns(3); + + getSink().assertColumnType(tableAssert, "id", ValueType.TEXT); + getSink().assertColumnType(tableAssert, "created_by", ValueType.TEXT, "test_ce_source"); + getSink().assertColumnType(tableAssert, "payload", ValueType.TEXT); + + assertHasPrimaryKeyColumns(destinationTableName, "id"); + + transform.close(); + } + + @ParameterizedTest + @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class) + public void testSaveConvertedCloudEventRecordFromAvro(SinkRecordFactory factory) { + final ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm(); + final Map config = new HashMap<>(); + config.put("fields.mapping", "id,source:created_by,data:payload"); + config.put("serializer.type", "avro"); + transform.configure(config); + + final Map properties = getDefaultSinkConfig(); + properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, SchemaEvolutionMode.BASIC.getValue()); + properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, PrimaryKeyMode.RECORD_VALUE.getValue()); + properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_FIELDS, "id"); + startSinkConnector(properties); + assertSinkConnectorIsRunning(); + + final String tableName = randomTableName(); + final String topicName = topicName("server1", "schema", tableName); + + final SinkRecord cloudEventRecord = factory.cloudEventRecord(topicName, SerializerType.withName("avro")); final SinkRecord convertedRecord = transform.apply(cloudEventRecord); consume(convertedRecord); diff --git a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/transforms/ConvertCloudEventToSaveableFormTest.java b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/transforms/ConvertCloudEventToSaveableFormTest.java index 21b5b4ab0..dc106635c 100644 --- a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/transforms/ConvertCloudEventToSaveableFormTest.java +++ b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/transforms/ConvertCloudEventToSaveableFormTest.java @@ -10,13 +10,18 @@ import java.util.HashMap; import java.util.Map; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.Assert; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import io.debezium.connector.jdbc.util.DebeziumSinkRecordFactory; import io.debezium.connector.jdbc.util.SinkRecordFactory; +import io.debezium.converters.spi.SerializerType; import io.debezium.doc.FixFor; /** @@ -27,10 +32,22 @@ class ConvertCloudEventToSaveableFormTest { @Test - @FixFor("DBZ-7065") - void testConvertNotCloudEventRecord() { + @FixFor({ "DBZ-7065", "DBZ-7130" }) + void testConvertCloudEventRecordWithEmptyConfig() { try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) { final Map config = new HashMap<>(); + Assert.assertThrows("Invalid value null for configuration serializer.type: Serialization/deserialization type of CloudEvents converter is required", + ConfigException.class, () -> transform.configure(config)); + } + } + + @ParameterizedTest + @ValueSource(strings = { "json", "avro" }) + @FixFor({ "DBZ-7065", "DBZ-7130" }) + void testConvertNotCloudEventRecord(String serializerType) { + try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) { + final Map config = new HashMap<>(); + config.put("serializer.type", serializerType); transform.configure(config); final SinkRecordFactory factory = new DebeziumSinkRecordFactory(); @@ -43,39 +60,47 @@ void testConvertNotCloudEventRecord() { } } - @Test - @FixFor("DBZ-7065") - void testConvertCloudEventRecordWithEmptyConfig() { + @ParameterizedTest + @ValueSource(strings = { "json", "avro" }) + @FixFor({ "DBZ-7065", "DBZ-7130" }) + void testConvertCloudEventRecordWithEmptyMapping(String serializerType) { try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) { final Map config = new HashMap<>(); + config.put("serializer.type", serializerType); transform.configure(config); final SinkRecordFactory factory = new DebeziumSinkRecordFactory(); - final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic"); - assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope"); - assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7); - assertThat(cloudEventRecord.valueSchema().field("id").schema()).isEqualTo(Schema.STRING_SCHEMA); + final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic", SerializerType.withName(serializerType)); + if (serializerType.equals("avro")) { + assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope"); + assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7); + assertThat(cloudEventRecord.valueSchema().field("id").schema()).isEqualTo(Schema.STRING_SCHEMA); + } final SinkRecord convertedRecord = transform.apply(cloudEventRecord); assertThat(convertedRecord).isEqualTo(cloudEventRecord); } } - @Test - @FixFor("DBZ-7065") - void testConvertCloudEventRecordWithMappingOfIdField() { + @ParameterizedTest + @ValueSource(strings = { "json", "avro" }) + @FixFor({ "DBZ-7065", "DBZ-7130" }) + void testConvertCloudEventRecordWithMappingOfIdField(String serializerType) { try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) { final Map config = new HashMap<>(); config.put("fields.mapping", "id"); + config.put("serializer.type", serializerType); transform.configure(config); final SinkRecordFactory factory = new DebeziumSinkRecordFactory(); - final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic"); - assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope"); - assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7); - assertThat(cloudEventRecord.valueSchema().field("id").schema()).isEqualTo(Schema.STRING_SCHEMA); + final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic", SerializerType.withName(serializerType)); + if (serializerType.equals("avro")) { + assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope"); + assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7); + assertThat(cloudEventRecord.valueSchema().field("id").schema()).isEqualTo(Schema.STRING_SCHEMA); + } final SinkRecord convertedRecord = transform.apply(cloudEventRecord); assertThat(convertedRecord).isNotNull(); @@ -89,20 +114,24 @@ void testConvertCloudEventRecordWithMappingOfIdField() { } } - @Test - @FixFor("DBZ-7065") - void testConvertCloudEventRecordWithMappingOfDataField() { + @ParameterizedTest + @ValueSource(strings = { "json", "avro" }) + @FixFor({ "DBZ-7065", "DBZ-7130" }) + void testConvertCloudEventRecordWithMappingOfDataField(String serializerType) { try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) { final Map config = new HashMap<>(); config.put("fields.mapping", "data"); + config.put("serializer.type", serializerType); transform.configure(config); final SinkRecordFactory factory = new DebeziumSinkRecordFactory(); - final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic"); - assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope"); - assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7); - assertThat(cloudEventRecord.valueSchema().field("data").schema().type()).isEqualTo(Schema.Type.STRUCT); + final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic", SerializerType.withName(serializerType)); + if (serializerType.equals("avro")) { + assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope"); + assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7); + assertThat(cloudEventRecord.valueSchema().field("data").schema().type()).isEqualTo(Schema.Type.STRUCT); + } final SinkRecord convertedRecord = transform.apply(cloudEventRecord); assertThat(convertedRecord).isNotNull(); @@ -116,20 +145,24 @@ void testConvertCloudEventRecordWithMappingOfDataField() { } } - @Test - @FixFor("DBZ-7065") - void testConvertCloudEventRecordWithMappingOfAllFieldsWithCustomNames() { + @ParameterizedTest + @ValueSource(strings = { "json", "avro" }) + @FixFor({ "DBZ-7065", "DBZ-7130" }) + void testConvertCloudEventRecordWithMappingOfAllFieldsWithCustomNames(String serializerType) { try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) { final Map config = new HashMap<>(); config.put("fields.mapping", "id,source:created_by,specversion:ce_spec_number,type,time:created_at,datacontenttype:payload_format,data:payload"); + config.put("serializer.type", serializerType); transform.configure(config); final SinkRecordFactory factory = new DebeziumSinkRecordFactory(); - final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic"); - assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope"); - assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7); - assertThat(cloudEventRecord.valueSchema().field("data").schema().type()).isEqualTo(Schema.Type.STRUCT); + final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic", SerializerType.withName(serializerType)); + if (serializerType.equals("avro")) { + assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope"); + assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7); + assertThat(cloudEventRecord.valueSchema().field("data").schema().type()).isEqualTo(Schema.Type.STRUCT); + } final SinkRecord convertedRecord = transform.apply(cloudEventRecord); assertThat(convertedRecord).isNotNull(); diff --git a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/util/SinkRecordBuilder.java b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/util/SinkRecordBuilder.java index cb9c7349b..390813fe4 100644 --- a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/util/SinkRecordBuilder.java +++ b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/util/SinkRecordBuilder.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.jdbc.util; +import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; import java.util.HashMap; @@ -15,9 +16,16 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; import org.apache.kafka.connect.sink.SinkRecord; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + import io.debezium.converters.spi.CloudEventsMaker; +import io.debezium.converters.spi.SerializerType; import io.debezium.data.Envelope; import io.debezium.util.Strings; @@ -67,6 +75,7 @@ public static class SinkRecordTypeBuilder { private int partition; private int offset; private SinkRecord basicRecord; + private SerializerType cloudEventSerializerType; private Map keyValues = new HashMap<>(); private Map beforeValues = new HashMap<>(); private Map afterValues = new HashMap<>(); @@ -141,6 +150,11 @@ public SinkRecordTypeBuilder basicRecord(SinkRecord basicRecord) { return this; } + public SinkRecordTypeBuilder cloudEventSerializerType(SerializerType serializerType) { + this.cloudEventSerializerType = serializerType; + return this; + } + public SinkRecord build() { switch (type) { case CREATE: @@ -244,17 +258,27 @@ private SinkRecord buildCloudEventRecord() { Schema ceSchema = schemaBuilder.build(); - Struct ceValue = new Struct(ceSchema); - ceValue.put(CloudEventsMaker.FieldName.ID, Uuid.randomUuid().toString()); - ceValue.put(CloudEventsMaker.FieldName.SOURCE, "test_ce_source"); - ceValue.put(CloudEventsMaker.FieldName.SPECVERSION, "1.0"); - ceValue.put(CloudEventsMaker.FieldName.TYPE, "TestType"); - ceValue.put(CloudEventsMaker.FieldName.TIME, LocalDateTime.now().toString()); - ceValue.put(CloudEventsMaker.FieldName.DATACONTENTTYPE, "application/json"); - ceValue.put(CloudEventsMaker.FieldName.DATA, basicRecord.value()); + Struct ceValueStruct = new Struct(ceSchema); + ceValueStruct.put(CloudEventsMaker.FieldName.ID, Uuid.randomUuid().toString()); + ceValueStruct.put(CloudEventsMaker.FieldName.SOURCE, "test_ce_source"); + ceValueStruct.put(CloudEventsMaker.FieldName.SPECVERSION, "1.0"); + ceValueStruct.put(CloudEventsMaker.FieldName.TYPE, "TestType"); + ceValueStruct.put(CloudEventsMaker.FieldName.TIME, LocalDateTime.now().toString()); + ceValueStruct.put(CloudEventsMaker.FieldName.DATACONTENTTYPE, "application/json"); + ceValueStruct.put(CloudEventsMaker.FieldName.DATA, basicRecord.value()); - return new SinkRecord(basicRecord.topic(), basicRecord.kafkaPartition(), basicRecord.keySchema(), basicRecord.key(), ceSchema, ceValue, - basicRecord.kafkaOffset()); + final Object ceValue; + if (cloudEventSerializerType == SerializerType.JSON) { + ceValue = convertCloudEventToMap(ceSchema, ceValueStruct); + ceSchema = null; + } + else { + ceValue = ceValueStruct; + } + + return new SinkRecord(basicRecord.topic(), basicRecord.kafkaPartition(), basicRecord.keySchema(), basicRecord.key(), + ceSchema, ceValue, + basicRecord.kafkaOffset(), basicRecord.timestamp(), basicRecord.timestampType(), basicRecord.headers()); } private Envelope createEnvelope() { @@ -278,6 +302,33 @@ private Struct populateStructForKey() { } return null; } + + private Map convertCloudEventToMap(Schema ceSchema, Struct ceValueStruct) { + byte[] cloudEventJson; + try (JsonConverter jsonConverter = new JsonConverter()) { + final Map jsonDataConverterConfig = new HashMap<>(); + jsonDataConverterConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false); + jsonDataConverterConfig.put(JsonConverterConfig.TYPE_CONFIG, "value"); + jsonConverter.configure(jsonDataConverterConfig); + + cloudEventJson = jsonConverter.fromConnectData(null, ceSchema, ceValueStruct); + } + + ObjectMapper objectMapper = new ObjectMapper(); + Map map; + try { + map = objectMapper.readValue(cloudEventJson, new TypeReference<>() { + }); + } + catch (IOException e) { + throw new DataException("Failed to instantiate map from CloudEvent JSON"); + } + final Object dataMap = map.get(CloudEventsMaker.FieldName.DATA); + if (dataMap != null) { + map.put(CloudEventsMaker.FieldName.DATA, dataMap.toString()); + } + return map; + } } private enum Type { diff --git a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/util/SinkRecordFactory.java b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/util/SinkRecordFactory.java index e91f89911..4b1eda3ef 100644 --- a/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/util/SinkRecordFactory.java +++ b/debezium-connector-jdbc/src/test/java/io/debezium/connector/jdbc/util/SinkRecordFactory.java @@ -14,6 +14,8 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.sink.SinkRecord; +import io.debezium.converters.spi.SerializerType; + /** * @author Chris Cranford */ @@ -413,10 +415,11 @@ default SinkRecord truncateRecord(String topicName) { .build(); } - default SinkRecord cloudEventRecord(String topicName) { + default SinkRecord cloudEventRecord(String topicName, SerializerType serializerType) { final SinkRecord basicRecord = updateRecord(topicName); return SinkRecordBuilder.cloudEvent() .basicRecord(basicRecord) + .cloudEventSerializerType(serializerType) .build(); } }