DBZ-7130 ConvertCloudEventToSaveableForm can work with CloudEvents deserialized from JSON

Roman Kudryashov 2023-11-11 06:51:57 +03:00 committed by Jiri Pechanec
parent 1cd501abd7
commit 3b8645a5b8
5 changed files with 286 additions and 51 deletions
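In practice, the transform must now be told which serialization the upstream CloudEvents converter used, since that determines whether the record value arrives as a Connect Struct (Avro) or as a schemaless Map (JSON). A minimal programmatic sketch, mirroring the tests in this commit; the cloudEventRecord variable is a placeholder for a sink record carrying a JSON-deserialized CloudEvent:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;

import io.debezium.connector.jdbc.transforms.ConvertCloudEventToSaveableForm;

// Sketch only: configure the SMT for CloudEvents serialized/deserialized as JSON.
final ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm();
final Map<String, String> config = new HashMap<>();
config.put("fields.mapping", "id,source:created_by,data:payload");
config.put("serializer.type", "json"); // new in this commit; "avro" selects the pre-existing Struct-based path
transform.configure(config);
final SinkRecord converted = transform.apply(cloudEventRecord); // cloudEventRecord: placeholder input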

View File: ConvertCloudEventToSaveableForm.java

@@ -11,11 +11,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -26,6 +29,7 @@
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.converters.spi.SerializerType;
import io.debezium.transforms.outbox.AdditionalFieldsValidator;
/**
@@ -49,14 +53,29 @@ public class ConvertCloudEventToSaveableForm implements Transformation<SinkRecord> {
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("Specifies a list of pairs with mappings between a CloudEvent's fields and names of database columns");
private static final Field SERIALIZER_TYPE = Field.create("serializer.type")
.withDisplayName("CloudEvents serializer type")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("Specifies the serialization type with which the provided CloudEvent was serialized and deserialized");
private Map<String, String> fieldsMapping;
private SerializerType serializerType;
private final JsonConverter jsonDataConverter = new JsonConverter();
private final Set<String> cloudEventsSpecRequiredFields = Set.of(CloudEventsMaker.FieldName.ID, CloudEventsMaker.FieldName.SOURCE,
CloudEventsMaker.FieldName.SPECVERSION,
CloudEventsMaker.FieldName.TYPE);
private final Map<String, Schema> cloudEventsFieldToColumnSchema = new HashMap<>();
@Override
public ConfigDef config() {
final ConfigDef config = new ConfigDef();
Field.group(config, null, FIELDS_MAPPING);
Field.group(config, null, FIELDS_MAPPING, SERIALIZER_TYPE);
return config;
}
@@ -67,10 +86,24 @@ public void configure(final Map<String, ?> configs) {
final List<String> rawFieldsMapping = config.getList(FIELDS_MAPPING);
fieldsMapping = Collections.unmodifiableMap(parseFieldsMapping(rawFieldsMapping));
serializerType = SerializerType.withName(config.getString(SERIALIZER_TYPE));
if (serializerType == null) {
throw new ConfigException(SERIALIZER_TYPE.name(), serializerType, "Serialization/deserialization type of CloudEvents converter is required");
}
Map<String, Object> jsonDataConverterConfig = new HashMap<>();
jsonDataConverterConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
jsonDataConverterConfig.put(JsonConverterConfig.TYPE_CONFIG, "value");
jsonDataConverter.configure(jsonDataConverterConfig);
cloudEventsFieldToColumnSchema.put(CloudEventsMaker.FieldName.ID, Schema.STRING_SCHEMA);
cloudEventsFieldToColumnSchema.put(CloudEventsMaker.FieldName.SOURCE, Schema.STRING_SCHEMA);
cloudEventsFieldToColumnSchema.put(CloudEventsMaker.FieldName.SPECVERSION, Schema.STRING_SCHEMA);
cloudEventsFieldToColumnSchema.put(CloudEventsMaker.FieldName.TYPE, Schema.STRING_SCHEMA);
cloudEventsFieldToColumnSchema.put(CloudEventsMaker.FieldName.DATACONTENTTYPE, Schema.STRING_SCHEMA);
cloudEventsFieldToColumnSchema.put(CloudEventsMaker.FieldName.DATASCHEMA, Schema.STRING_SCHEMA);
cloudEventsFieldToColumnSchema.put(CloudEventsMaker.FieldName.TIME, Schema.STRING_SCHEMA);
cloudEventsFieldToColumnSchema.put(CloudEventsMaker.FieldName.DATA, Schema.STRING_SCHEMA);
}
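For context, a standalone sketch (not part of the commit) of what the JsonConverter configured above does: with schemas.enable=false it renders a Struct as bare JSON, which is how the transform stringifies a structured data field later in apply():

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;

public class JsonDataConverterSketch {
    public static void main(String[] args) {
        try (JsonConverter converter = new JsonConverter()) {
            converter.configure(Map.of(
                    JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false,
                    JsonConverterConfig.TYPE_CONFIG, "value"));
            final Schema schema = SchemaBuilder.struct().field("title", Schema.STRING_SCHEMA).build();
            final Struct data = new Struct(schema).put("title", "Dune");
            final byte[] json = converter.fromConnectData("any-topic", schema, data);
            // Prints {"title":"Dune"} without the {"schema":...,"payload":...} envelope.
            System.out.println(new String(json, StandardCharsets.UTF_8));
        }
    }
}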
private Map<String, String> parseFieldsMapping(List<String> rawFieldsMapping) {
@@ -92,12 +125,19 @@ private Map<String, String> parseFieldsMapping(List<String> rawFieldsMapping) {
@Override
public SinkRecord apply(final SinkRecord record) {
if (record == null || !record.valueSchema().name().endsWith(CLOUD_EVENTS_SCHEMA_NAME_SUFFIX) || fieldsMapping.isEmpty()) {
if (record == null || !isCloudEvent(record) || fieldsMapping.isEmpty()) {
return record;
}
final org.apache.kafka.connect.data.Field dataField = record.valueSchema().field(CloudEventsMaker.FieldName.DATA);
final boolean cloudEventContainsDataAsStruct = dataField != null && dataField.schema().type() == Schema.Type.STRUCT;
final boolean cloudEventContainsDataAsStruct;
if (serializerType == SerializerType.JSON) {
Object dataFieldValue = getCloudEventFieldsMap(record).get(CloudEventsMaker.FieldName.DATA);
cloudEventContainsDataAsStruct = dataFieldValue instanceof Struct;
}
else {
final org.apache.kafka.connect.data.Field dataField = record.valueSchema().field(CloudEventsMaker.FieldName.DATA);
cloudEventContainsDataAsStruct = dataField != null && dataField.schema().type() == Schema.Type.STRUCT;
}
final Schema newSchema = getSchema(record, cloudEventContainsDataAsStruct);
final Struct newValue = getValue(record, newSchema, cloudEventContainsDataAsStruct);
@@ -113,36 +153,105 @@ public SinkRecord apply(final SinkRecord record) {
record.headers());
}
private boolean isCloudEvent(SinkRecord record) {
if (serializerType == SerializerType.JSON) {
boolean valueIsMap = record.value() instanceof Map;
if (valueIsMap && record.valueSchema() == null) {
final Map<String, Object> cloudEventMap = getCloudEventFieldsMap(record);
return cloudEventMap.size() >= cloudEventsSpecRequiredFields.size() && cloudEventsSpecRequiredFields.stream().allMatch(cloudEventMap::containsKey);
}
return false;
}
else {
return record.valueSchema().name().endsWith(CLOUD_EVENTS_SCHEMA_NAME_SUFFIX);
}
}
@SuppressWarnings("unchecked")
private Map<String, Object> getCloudEventFieldsMap(SinkRecord record) {
return (Map<String, Object>) record.value();
}
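To make the JSON branch concrete: when the upstream converter ran in JSON mode and the sink consumed the topic schemalessly, record.value() is a plain Map and record.valueSchema() is null, which is exactly what isCloudEvent() probes. An illustrative value, with all field contents made up:

import java.util.HashMap;
import java.util.Map;

// Hypothetical shape of record.value() for serializer.type=json; at minimum the four
// required CloudEvents fields (id, source, specversion, type) must be present.
final Map<String, Object> value = new HashMap<>();
value.put("id", "9a0bf12e-8d8e-4e46-a5bd-7331c3e1bc9b");
value.put("source", "/debezium/postgresql/inventory");
value.put("specversion", "1.0");
value.put("type", "OrderCreated");
value.put("data", "{\"order_id\":42}"); // may also arrive as a Struct; see determineCloudEventFieldSchema below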
private Schema getSchema(SinkRecord record, boolean cloudEventContainsDataAsStruct) {
Map<String, Object> cloudEventMap = null;
if (serializerType == SerializerType.JSON) {
cloudEventMap = getCloudEventFieldsMap(record);
}
final SchemaBuilder schemaBuilder = SchemaBuilder.struct();
for (Map.Entry<String, String> fieldMapping : fieldsMapping.entrySet()) {
final String cloudEventFieldName = fieldMapping.getKey();
final String databaseColumnName = fieldMapping.getValue();
final org.apache.kafka.connect.data.Field cloudEventField = record.valueSchema().field(cloudEventFieldName);
final Schema cloudEventFieldSchema;
if (serializerType == SerializerType.JSON) {
Object cloudEventFieldValue = cloudEventMap.get(cloudEventFieldName);
if (cloudEventFieldValue == null) {
// field missing from the event: fall back to the default column schema
cloudEventFieldSchema = cloudEventsFieldToColumnSchema.get(cloudEventFieldName);
}
else {
cloudEventFieldSchema = determineCloudEventFieldSchema(cloudEventFieldValue);
}
}
else {
final org.apache.kafka.connect.data.Field cloudEventField = record.valueSchema().field(cloudEventFieldName);
cloudEventFieldSchema = cloudEventField.schema();
}
final Schema databaseColumnSchema;
if (cloudEventFieldName.equals(CloudEventsMaker.FieldName.DATA) && cloudEventContainsDataAsStruct) {
databaseColumnSchema = Schema.STRING_SCHEMA;
}
else {
databaseColumnSchema = cloudEventField.schema();
databaseColumnSchema = cloudEventFieldSchema;
}
schemaBuilder.field(databaseColumnName, databaseColumnSchema);
}
return schemaBuilder.build();
}
private Schema determineCloudEventFieldSchema(Object cloudEventFieldValue) {
final Schema cloudEventFieldSchema;
if (cloudEventFieldValue instanceof String) {
cloudEventFieldSchema = Schema.STRING_SCHEMA;
}
else if (cloudEventFieldValue instanceof Struct) {
cloudEventFieldSchema = ((Struct) cloudEventFieldValue).schema();
}
else {
throw new DataException("Unsupported type of CloudEvent field: " + cloudEventFieldValue.getClass());
}
return cloudEventFieldSchema;
}
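In short, the helper above admits only the two value shapes the JSON path is expected to produce; a behavior trace with illustrative inputs:

// determineCloudEventFieldSchema("1.0")       -> Schema.STRING_SCHEMA
// determineCloudEventFieldSchema(someStruct)  -> someStruct.schema()
// determineCloudEventFieldSchema(42)          -> throws DataException (unsupported type)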
private Struct getValue(SinkRecord record, Schema schema, boolean cloudEventContainsDataAsStruct) {
Map<String, Object> cloudEventMap = null;
Struct cloudEventStruct = null;
if (serializerType == SerializerType.JSON) {
cloudEventMap = getCloudEventFieldsMap(record);
}
else {
cloudEventStruct = requireStruct(record.value(), "convert cloud event");
}
final Struct struct = new Struct(schema);
final Struct cloudEvent = requireStruct(record.value(), "convert cloud event");
for (Map.Entry<String, String> fieldMapping : fieldsMapping.entrySet()) {
final String cloudEventFieldName = fieldMapping.getKey();
final String databaseColumnName = fieldMapping.getValue();
Object fieldValue = cloudEvent.get(cloudEventFieldName);
Object fieldValue;
if (serializerType == SerializerType.JSON) {
fieldValue = cloudEventMap.get(cloudEventFieldName);
}
else {
fieldValue = cloudEventStruct.get(cloudEventFieldName);
}
if (cloudEventFieldName.equals(CloudEventsMaker.FieldName.DATA) && cloudEventContainsDataAsStruct) {
final Struct data = (Struct) fieldValue;
final byte[] dataInJson = jsonDataConverter.fromConnectData(null, data.schema(), data);
fieldValue = new String(dataInJson);
}
struct.put(databaseColumnName, fieldValue);
}
return struct;
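Putting getSchema() and getValue() together: for the mapping exercised by this commit's tests (id,source:created_by,data:payload), the transform emits a flat, row-like Struct. A sketch of the expected output shape; the id and payload values are hypothetical, the source value comes from the test factory:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

// Hypothetical result of apply() for fields.mapping=id,source:created_by,data:payload.
final Schema rowSchema = SchemaBuilder.struct()
        .field("id", Schema.STRING_SCHEMA)
        .field("created_by", Schema.STRING_SCHEMA)
        .field("payload", Schema.STRING_SCHEMA) // structured data is re-serialized to a JSON string
        .build();
final Struct row = new Struct(rowSchema)
        .put("id", "9a0bf12e-8d8e-4e46-a5bd-7331c3e1bc9b")
        .put("created_by", "test_ce_source")
        .put("payload", "{\"title\":\"Dune\"}");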

View File: AbstractJdbcSinkSaveConvertedCloudEventTest.java

@@ -27,6 +27,7 @@
import io.debezium.connector.jdbc.junit.jupiter.SinkRecordFactoryArgumentsProvider;
import io.debezium.connector.jdbc.transforms.ConvertCloudEventToSaveableForm;
import io.debezium.connector.jdbc.util.SinkRecordFactory;
import io.debezium.converters.spi.SerializerType;
/**
* Common tests for saving converted CloudEvents.
@@ -41,10 +42,11 @@ public AbstractJdbcSinkSaveConvertedCloudEventTest(Sink sink) {
@ParameterizedTest
@ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
public void testSaveConvertedCloudEventRecord(SinkRecordFactory factory) {
public void testSaveConvertedCloudEventRecordFromJson(SinkRecordFactory factory) {
final ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm();
final Map<String, String> config = new HashMap<>();
config.put("fields.mapping", "id,source:created_by,data:payload");
config.put("serializer.type", "json");
transform.configure(config);
final Map<String, String> properties = getDefaultSinkConfig();
@@ -57,7 +59,44 @@ public void testSaveConvertedCloudEventRecord(SinkRecordFactory factory) {
final String tableName = randomTableName();
final String topicName = topicName("server1", "schema", tableName);
final SinkRecord cloudEventRecord = factory.cloudEventRecord(topicName);
final SinkRecord cloudEventRecord = factory.cloudEventRecord(topicName, SerializerType.withName("json"));
final SinkRecord convertedRecord = transform.apply(cloudEventRecord);
consume(convertedRecord);
final String destinationTableName = destinationTableName(convertedRecord);
final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName);
tableAssert.exists().hasNumberOfRows(1).hasNumberOfColumns(3);
getSink().assertColumnType(tableAssert, "id", ValueType.TEXT);
getSink().assertColumnType(tableAssert, "created_by", ValueType.TEXT, "test_ce_source");
getSink().assertColumnType(tableAssert, "payload", ValueType.TEXT);
assertHasPrimaryKeyColumns(destinationTableName, "id");
transform.close();
}
@ParameterizedTest
@ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
public void testSaveConvertedCloudEventRecordFromAvro(SinkRecordFactory factory) {
final ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm();
final Map<String, String> config = new HashMap<>();
config.put("fields.mapping", "id,source:created_by,data:payload");
config.put("serializer.type", "avro");
transform.configure(config);
final Map<String, String> properties = getDefaultSinkConfig();
properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, SchemaEvolutionMode.BASIC.getValue());
properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, PrimaryKeyMode.RECORD_VALUE.getValue());
properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_FIELDS, "id");
startSinkConnector(properties);
assertSinkConnectorIsRunning();
final String tableName = randomTableName();
final String topicName = topicName("server1", "schema", tableName);
final SinkRecord cloudEventRecord = factory.cloudEventRecord(topicName, SerializerType.withName("avro"));
final SinkRecord convertedRecord = transform.apply(cloudEventRecord);
consume(convertedRecord);

View File: ConvertCloudEventToSaveableFormTest.java

@@ -10,13 +10,18 @@
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import io.debezium.connector.jdbc.util.DebeziumSinkRecordFactory;
import io.debezium.connector.jdbc.util.SinkRecordFactory;
import io.debezium.converters.spi.SerializerType;
import io.debezium.doc.FixFor;
/**
@@ -27,10 +32,22 @@
class ConvertCloudEventToSaveableFormTest {
@Test
@FixFor("DBZ-7065")
void testConvertNotCloudEventRecord() {
@FixFor({ "DBZ-7065", "DBZ-7130" })
void testConvertCloudEventRecordWithEmptyConfig() {
try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
final Map<String, String> config = new HashMap<>();
Assert.assertThrows("Invalid value null for configuration serializer.type: Serialization/deserialization type of CloudEvents converter is required",
ConfigException.class, () -> transform.configure(config));
}
}
@ParameterizedTest
@ValueSource(strings = { "json", "avro" })
@FixFor({ "DBZ-7065", "DBZ-7130" })
void testConvertNotCloudEventRecord(String serializerType) {
try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
final Map<String, String> config = new HashMap<>();
config.put("serializer.type", serializerType);
transform.configure(config);
final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
@@ -43,39 +60,47 @@ void testConvertNotCloudEventRecord() {
}
}
@Test
@FixFor("DBZ-7065")
void testConvertCloudEventRecordWithEmptyConfig() {
@ParameterizedTest
@ValueSource(strings = { "json", "avro" })
@FixFor({ "DBZ-7065", "DBZ-7130" })
void testConvertCloudEventRecordWithEmptyMapping(String serializerType) {
try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
final Map<String, String> config = new HashMap<>();
config.put("serializer.type", serializerType);
transform.configure(config);
final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic");
assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope");
assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7);
assertThat(cloudEventRecord.valueSchema().field("id").schema()).isEqualTo(Schema.STRING_SCHEMA);
final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic", SerializerType.withName(serializerType));
if (serializerType.equals("avro")) {
assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope");
assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7);
assertThat(cloudEventRecord.valueSchema().field("id").schema()).isEqualTo(Schema.STRING_SCHEMA);
}
final SinkRecord convertedRecord = transform.apply(cloudEventRecord);
assertThat(convertedRecord).isEqualTo(cloudEventRecord);
}
}
@Test
@FixFor("DBZ-7065")
void testConvertCloudEventRecordWithMappingOfIdField() {
@ParameterizedTest
@ValueSource(strings = { "json", "avro" })
@FixFor({ "DBZ-7065", "DBZ-7130" })
void testConvertCloudEventRecordWithMappingOfIdField(String serializerType) {
try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
final Map<String, String> config = new HashMap<>();
config.put("fields.mapping", "id");
config.put("serializer.type", serializerType);
transform.configure(config);
final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic");
assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope");
assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7);
assertThat(cloudEventRecord.valueSchema().field("id").schema()).isEqualTo(Schema.STRING_SCHEMA);
final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic", SerializerType.withName(serializerType));
if (serializerType.equals("avro")) {
assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope");
assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7);
assertThat(cloudEventRecord.valueSchema().field("id").schema()).isEqualTo(Schema.STRING_SCHEMA);
}
final SinkRecord convertedRecord = transform.apply(cloudEventRecord);
assertThat(convertedRecord).isNotNull();
@@ -89,20 +114,24 @@ void testConvertCloudEventRecordWithMappingOfIdField() {
}
}
@Test
@FixFor("DBZ-7065")
void testConvertCloudEventRecordWithMappingOfDataField() {
@ParameterizedTest
@ValueSource(strings = { "json", "avro" })
@FixFor({ "DBZ-7065", "DBZ-7130" })
void testConvertCloudEventRecordWithMappingOfDataField(String serializerType) {
try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
final Map<String, String> config = new HashMap<>();
config.put("fields.mapping", "data");
config.put("serializer.type", serializerType);
transform.configure(config);
final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic");
assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope");
assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7);
assertThat(cloudEventRecord.valueSchema().field("data").schema().type()).isEqualTo(Schema.Type.STRUCT);
final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic", SerializerType.withName(serializerType));
if (serializerType.equals("avro")) {
assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope");
assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7);
assertThat(cloudEventRecord.valueSchema().field("data").schema().type()).isEqualTo(Schema.Type.STRUCT);
}
final SinkRecord convertedRecord = transform.apply(cloudEventRecord);
assertThat(convertedRecord).isNotNull();
@@ -116,20 +145,24 @@ void testConvertCloudEventRecordWithMappingOfDataField() {
}
}
@Test
@FixFor("DBZ-7065")
void testConvertCloudEventRecordWithMappingOfAllFieldsWithCustomNames() {
@ParameterizedTest
@ValueSource(strings = { "json", "avro" })
@FixFor({ "DBZ-7065", "DBZ-7130" })
void testConvertCloudEventRecordWithMappingOfAllFieldsWithCustomNames(String serializerType) {
try (ConvertCloudEventToSaveableForm transform = new ConvertCloudEventToSaveableForm()) {
final Map<String, String> config = new HashMap<>();
config.put("fields.mapping", "id,source:created_by,specversion:ce_spec_number,type,time:created_at,datacontenttype:payload_format,data:payload");
config.put("serializer.type", serializerType);
transform.configure(config);
final SinkRecordFactory factory = new DebeziumSinkRecordFactory();
final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic");
assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope");
assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7);
assertThat(cloudEventRecord.valueSchema().field("data").schema().type()).isEqualTo(Schema.Type.STRUCT);
final SinkRecord cloudEventRecord = factory.cloudEventRecord("test.topic", SerializerType.withName(serializerType));
if (serializerType.equals("avro")) {
assertThat(cloudEventRecord.valueSchema().name()).endsWith(".CloudEvents.Envelope");
assertThat(cloudEventRecord.valueSchema().fields().size()).isEqualTo(7);
assertThat(cloudEventRecord.valueSchema().field("data").schema().type()).isEqualTo(Schema.Type.STRUCT);
}
final SinkRecord convertedRecord = transform.apply(cloudEventRecord);
assertThat(convertedRecord).isNotNull();

View File: SinkRecordBuilder.java

@@ -5,6 +5,7 @@
*/
package io.debezium.connector.jdbc.util;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
@@ -15,9 +16,16 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.sink.SinkRecord;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.converters.spi.SerializerType;
import io.debezium.data.Envelope;
import io.debezium.util.Strings;
@@ -67,6 +75,7 @@ public static class SinkRecordTypeBuilder {
private int partition;
private int offset;
private SinkRecord basicRecord;
private SerializerType cloudEventSerializerType;
private Map<String, Object> keyValues = new HashMap<>();
private Map<String, Object> beforeValues = new HashMap<>();
private Map<String, Object> afterValues = new HashMap<>();
@@ -141,6 +150,11 @@ public SinkRecordTypeBuilder basicRecord(SinkRecord basicRecord) {
return this;
}
public SinkRecordTypeBuilder cloudEventSerializerType(SerializerType serializerType) {
this.cloudEventSerializerType = serializerType;
return this;
}
public SinkRecord build() {
switch (type) {
case CREATE:
@@ -244,17 +258,27 @@ private SinkRecord buildCloudEventRecord() {
Schema ceSchema = schemaBuilder.build();
Struct ceValue = new Struct(ceSchema);
ceValue.put(CloudEventsMaker.FieldName.ID, Uuid.randomUuid().toString());
ceValue.put(CloudEventsMaker.FieldName.SOURCE, "test_ce_source");
ceValue.put(CloudEventsMaker.FieldName.SPECVERSION, "1.0");
ceValue.put(CloudEventsMaker.FieldName.TYPE, "TestType");
ceValue.put(CloudEventsMaker.FieldName.TIME, LocalDateTime.now().toString());
ceValue.put(CloudEventsMaker.FieldName.DATACONTENTTYPE, "application/json");
ceValue.put(CloudEventsMaker.FieldName.DATA, basicRecord.value());
Struct ceValueStruct = new Struct(ceSchema);
ceValueStruct.put(CloudEventsMaker.FieldName.ID, Uuid.randomUuid().toString());
ceValueStruct.put(CloudEventsMaker.FieldName.SOURCE, "test_ce_source");
ceValueStruct.put(CloudEventsMaker.FieldName.SPECVERSION, "1.0");
ceValueStruct.put(CloudEventsMaker.FieldName.TYPE, "TestType");
ceValueStruct.put(CloudEventsMaker.FieldName.TIME, LocalDateTime.now().toString());
ceValueStruct.put(CloudEventsMaker.FieldName.DATACONTENTTYPE, "application/json");
ceValueStruct.put(CloudEventsMaker.FieldName.DATA, basicRecord.value());
return new SinkRecord(basicRecord.topic(), basicRecord.kafkaPartition(), basicRecord.keySchema(), basicRecord.key(), ceSchema, ceValue,
basicRecord.kafkaOffset());
final Object ceValue;
if (cloudEventSerializerType == SerializerType.JSON) {
ceValue = convertCloudEventToMap(ceSchema, ceValueStruct);
ceSchema = null;
}
else {
ceValue = ceValueStruct;
}
return new SinkRecord(basicRecord.topic(), basicRecord.kafkaPartition(), basicRecord.keySchema(), basicRecord.key(),
ceSchema, ceValue,
basicRecord.kafkaOffset(), basicRecord.timestamp(), basicRecord.timestampType(), basicRecord.headers());
}
private Envelope createEnvelope() {
@@ -278,6 +302,33 @@ private Struct populateStructForKey() {
}
return null;
}
private Map<String, Object> convertCloudEventToMap(Schema ceSchema, Struct ceValueStruct) {
byte[] cloudEventJson;
try (JsonConverter jsonConverter = new JsonConverter()) {
final Map<String, Object> jsonDataConverterConfig = new HashMap<>();
jsonDataConverterConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
jsonDataConverterConfig.put(JsonConverterConfig.TYPE_CONFIG, "value");
jsonConverter.configure(jsonDataConverterConfig);
cloudEventJson = jsonConverter.fromConnectData(null, ceSchema, ceValueStruct);
}
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> map;
try {
map = objectMapper.readValue(cloudEventJson, new TypeReference<>() {
});
}
catch (IOException e) {
throw new DataException("Failed to instantiate map from CloudEvent JSON", e);
}
final Object dataMap = map.get(CloudEventsMaker.FieldName.DATA);
if (dataMap != null) {
map.put(CloudEventsMaker.FieldName.DATA, dataMap.toString());
}
return map;
}
}
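One caveat worth flagging in convertCloudEventToMap() above: dataMap.toString() yields Java's Map rendering, not JSON. A standalone sketch of the difference, assuming the event's data field deserialized into a nested Map:

import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DataFieldRenderingSketch {
    public static void main(String[] args) throws JsonProcessingException {
        final Map<String, Object> data = Map.of("title", "Dune");
        System.out.println(data.toString());                             // {title=Dune}
        System.out.println(new ObjectMapper().writeValueAsString(data)); // {"title":"Dune"}
    }
}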
private enum Type {

View File: SinkRecordFactory.java

@@ -14,6 +14,8 @@
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.sink.SinkRecord;
import io.debezium.converters.spi.SerializerType;
/**
* @author Chris Cranford
*/
@@ -413,10 +415,11 @@ default SinkRecord truncateRecord(String topicName) {
.build();
}
default SinkRecord cloudEventRecord(String topicName) {
default SinkRecord cloudEventRecord(String topicName, SerializerType serializerType) {
final SinkRecord basicRecord = updateRecord(topicName);
return SinkRecordBuilder.cloudEvent()
.basicRecord(basicRecord)
.cloudEventSerializerType(serializerType)
.build();
}
}
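With the signature change above, every call site now states the wire format explicitly, as the updated tests do; for example, with factory and topicName as in the tests:

final SinkRecord jsonEvent = factory.cloudEventRecord(topicName, SerializerType.withName("json"));
final SinkRecord avroEvent = factory.cloudEventRecord(topicName, SerializerType.withName("avro"));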