DBZ-7016 Change metadata.location property syntax

This commit is contained in:
Roman Kudryashov 2023-10-14 20:26:28 +03:00 committed by Jiri Pechanec
parent ee64c70546
commit 22f6b083b1
7 changed files with 119 additions and 238 deletions

View File

@ -115,7 +115,7 @@ public void shouldConvertToCloudEventsInJsonWithoutExtensionAttributes() throws
@Test @Test
@FixFor({ "DBZ-3642" }) @FixFor({ "DBZ-3642" })
public void shouldConvertToCloudEventsInJsonWithIdAndTypeAndMetadataInHeadersAfterOutboxEventRouter() throws Exception { public void shouldConvertToCloudEventsInJsonWithMetadataAndIdAndTypeInHeadersAfterOutboxEventRouter() throws Exception {
HeaderFrom<SourceRecord> headerFrom = new HeaderFrom.Value<>(); HeaderFrom<SourceRecord> headerFrom = new HeaderFrom.Value<>();
Map<String, String> headerFromConfig = new LinkedHashMap<>(); Map<String, String> headerFromConfig = new LinkedHashMap<>();
headerFromConfig.put("fields", "source,op,transaction"); headerFromConfig.put("fields", "source,op,transaction");
@ -157,7 +157,7 @@ public void shouldConvertToCloudEventsInJsonWithIdAndTypeAndMetadataInHeadersAft
assertThat(routedEvent.key()).isEqualTo("10711fa5"); assertThat(routedEvent.key()).isEqualTo("10711fa5");
assertThat(routedEvent.value()).isInstanceOf(Struct.class); assertThat(routedEvent.value()).isInstanceOf(Struct.class);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithIdAndTypeAndMetadataInHeaders(routedEvent, "mongodb", "mongo1"); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithMetadataAndIdAndTypeInHeaders(routedEvent, "mongodb", "mongo1");
headerFrom.close(); headerFrom.close();
outboxEventRouter.close(); outboxEventRouter.close();
@ -165,7 +165,7 @@ public void shouldConvertToCloudEventsInJsonWithIdAndTypeAndMetadataInHeadersAft
@Test @Test
@FixFor({ "DBZ-7016" }) @FixFor({ "DBZ-7016" })
public void shouldConvertToCloudEventsInJsonWithTypeInHeader() throws Exception { public void shouldConvertToCloudEventsInJsonWithGeneratedIdAndTypeFromHeader() throws Exception {
InsertHeader<SourceRecord> insertHeader = new InsertHeader<>(); InsertHeader<SourceRecord> insertHeader = new InsertHeader<>();
Map<String, String> insertHeaderConfig = new LinkedHashMap<>(); Map<String, String> insertHeaderConfig = new LinkedHashMap<>();
insertHeaderConfig.put("header", "type"); insertHeaderConfig.put("header", "type");
@ -188,7 +188,7 @@ public void shouldConvertToCloudEventsInJsonWithTypeInHeader() throws Exception
assertThat(recordWithTypeInHeader).isNotNull(); assertThat(recordWithTypeInHeader).isNotNull();
assertThat(recordWithTypeInHeader.value()).isInstanceOf(Struct.class); assertThat(recordWithTypeInHeader.value()).isInstanceOf(Struct.class);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithTypeInHeader(recordWithTypeInHeader, "mongodb", "mongo1"); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithGeneratedIdAndTypeFromHeader(recordWithTypeInHeader, "mongodb", "mongo1");
insertHeader.close(); insertHeader.close();
} }

View File

@ -48,10 +48,8 @@
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.Instantiator; import io.debezium.config.Instantiator;
import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.AbstractSourceInfo;
import io.debezium.converters.CloudEventsConverterConfig.IdSource;
import io.debezium.converters.CloudEventsConverterConfig.MetadataLocation; import io.debezium.converters.CloudEventsConverterConfig.MetadataLocation;
import io.debezium.converters.CloudEventsConverterConfig.TypeSource; import io.debezium.converters.CloudEventsConverterConfig.MetadataLocationValue;
import io.debezium.converters.recordandmetadata.CloudEventFieldsSources;
import io.debezium.converters.recordandmetadata.RecordAndMetadata; import io.debezium.converters.recordandmetadata.RecordAndMetadata;
import io.debezium.converters.recordandmetadata.RecordAndMetadataBaseImpl; import io.debezium.converters.recordandmetadata.RecordAndMetadataBaseImpl;
import io.debezium.converters.recordandmetadata.RecordAndMetadataHeaderImpl; import io.debezium.converters.recordandmetadata.RecordAndMetadataHeaderImpl;
@ -157,8 +155,6 @@ public class CloudEventsConverter implements Converter {
private boolean extensionAttributesEnable; private boolean extensionAttributesEnable;
private IdSource idSource;
private TypeSource typeSource;
private MetadataLocation metadataLocation; private MetadataLocation metadataLocation;
public CloudEventsConverter() { public CloudEventsConverter() {
@ -181,8 +177,6 @@ public void configure(Map<String, ?> configs, boolean isKey) {
dataSerializerType = ceConfig.cloudeventsDataSerializerTypeConfig(); dataSerializerType = ceConfig.cloudeventsDataSerializerTypeConfig();
schemaNameAdjuster = ceConfig.schemaNameAdjustmentMode().createAdjuster(); schemaNameAdjuster = ceConfig.schemaNameAdjustmentMode().createAdjuster();
extensionAttributesEnable = ceConfig.extensionAttributesEnable(); extensionAttributesEnable = ceConfig.extensionAttributesEnable();
idSource = ceConfig.idSource();
typeSource = ceConfig.typeSource();
metadataLocation = ceConfig.metadataLocation(); metadataLocation = ceConfig.metadataLocation();
Map<String, Object> jsonHeaderConverterConfig = new HashMap<>(); Map<String, Object> jsonHeaderConverterConfig = new HashMap<>();
@ -255,7 +249,7 @@ public byte[] fromConnectData(String topic, Headers headers, Schema schema, Obje
return null; return null;
} }
if (this.metadataLocation == MetadataLocation.VALUE) { if (this.metadataLocation.global() == MetadataLocationValue.VALUE) {
if (!Envelope.isEnvelopeSchema(schema)) { if (!Envelope.isEnvelopeSchema(schema)) {
// TODO Handling of non-data messages like schema change or transaction metadata // TODO Handling of non-data messages like schema change or transaction metadata
return null; return null;
@ -276,12 +270,13 @@ public byte[] fromConnectData(String topic, Headers headers, Schema schema, Obje
CloudEventsProvider provider = lookupCloudEventsProvider(source); CloudEventsProvider provider = lookupCloudEventsProvider(source);
RecordAndMetadata recordAndMetadata; RecordAndMetadata recordAndMetadata;
if (idSource == IdSource.HEADER || typeSource == TypeSource.HEADER || metadataLocation == MetadataLocation.HEADER) { boolean useBaseImpl = metadataLocation.global() != MetadataLocationValue.HEADER && metadataLocation.id() != MetadataLocationValue.HEADER
recordAndMetadata = new RecordAndMetadataHeaderImpl(record, schema, headers, jsonHeaderConverter, && metadataLocation.type() != MetadataLocationValue.HEADER;
new CloudEventFieldsSources(idSource, typeSource, metadataLocation)); if (useBaseImpl) {
recordAndMetadata = new RecordAndMetadataBaseImpl(record, schema);
} }
else { else {
recordAndMetadata = new RecordAndMetadataBaseImpl(record, schema); recordAndMetadata = new RecordAndMetadataHeaderImpl(record, schema, headers, metadataLocation, jsonHeaderConverter);
} }
RecordParser parser = provider.createParser(recordAndMetadata); RecordParser parser = provider.createParser(recordAndMetadata);
@ -341,7 +336,7 @@ private static CloudEventsProvider lookupCloudEventsProvider(Struct source) {
} }
private Struct getSource(Struct record, Headers headers) { private Struct getSource(Struct record, Headers headers) {
if (this.metadataLocation == MetadataLocation.VALUE) { if (this.metadataLocation.global() == MetadataLocationValue.VALUE) {
return record.getStruct(Envelope.FieldName.SOURCE); return record.getStruct(Envelope.FieldName.SOURCE);
} }
else { else {
@ -505,8 +500,8 @@ private SchemaAndValue convertToCloudEventsFormat(RecordParser parser, CloudEven
Schema ceSchema = ceSchemaBuilder.build(); Schema ceSchema = ceSchemaBuilder.build();
String ceId = this.idSource == IdSource.GENERATE ? maker.ceId() : parser.id(); String ceId = this.metadataLocation.id() == MetadataLocationValue.GENERATE ? maker.ceId() : parser.id();
String ceType = this.typeSource == TypeSource.GENERATE ? maker.ceType() : parser.type(); String ceType = this.metadataLocation.type() == MetadataLocationValue.GENERATE ? maker.ceType() : parser.type();
// construct value of CloudEvents Envelope // construct value of CloudEvents Envelope
CEValueBuilder ceValueBuilder = withValue(ceSchema) CEValueBuilder ceValueBuilder = withValue(ceSchema)

View File

@ -5,13 +5,18 @@
*/ */
package io.debezium.converters; package io.debezium.converters;
import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.storage.ConverterConfig; import org.apache.kafka.connect.storage.ConverterConfig;
import io.debezium.config.CommonConnectorConfig.SchemaNameAdjustmentMode; import io.debezium.config.CommonConnectorConfig.SchemaNameAdjustmentMode;
import io.debezium.config.EnumeratedValue; import io.debezium.config.EnumeratedValue;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.converters.spi.SerializerType; import io.debezium.converters.spi.SerializerType;
/** /**
@ -37,16 +42,8 @@ public class CloudEventsConverterConfig extends ConverterConfig {
+ "'avro' replaces the characters that cannot be used in the Avro type name with underscore (default)" + "'avro' replaces the characters that cannot be used in the Avro type name with underscore (default)"
+ "'none' does not apply any adjustment"; + "'none' does not apply any adjustment";
public static final String CLOUDEVENTS_ID_SOURCE_CONFIG = "id.source";
public static final String CLOUDEVENTS_ID_SOURCE_DEFAULT = "generate";
private static final String CLOUDEVENTS_ID_SOURCE_DOC = "Specify how to get id of CloudEvent";
public static final String CLOUDEVENTS_TYPE_SOURCE_CONFIG = "type.source";
public static final String CLOUDEVENTS_TYPE_SOURCE_DEFAULT = "generate";
private static final String CLOUDEVENTS_TYPE_SOURCE_DOC = "Specify how to get type of CloudEvent";
public static final String CLOUDEVENTS_METADATA_LOCATION_CONFIG = "metadata.location"; public static final String CLOUDEVENTS_METADATA_LOCATION_CONFIG = "metadata.location";
public static final String CLOUDEVENTS_METADATA_LOCATION_DEFAULT = "value"; public static final String CLOUDEVENTS_METADATA_LOCATION_DEFAULT = "value,id:generate,type:generate";
private static final String CLOUDEVENTS_METADATA_LOCATION_DOC = "Specify from where to retrieve metadata"; private static final String CLOUDEVENTS_METADATA_LOCATION_DOC = "Specify from where to retrieve metadata";
private static final ConfigDef CONFIG; private static final ConfigDef CONFIG;
@ -62,11 +59,7 @@ public class CloudEventsConverterConfig extends ConverterConfig {
CLOUDEVENTS_EXTENSION_ATTRIBUTES_ENABLE_DOC); CLOUDEVENTS_EXTENSION_ATTRIBUTES_ENABLE_DOC);
CONFIG.define(CLOUDEVENTS_SCHEMA_NAME_ADJUSTMENT_MODE_CONFIG, ConfigDef.Type.STRING, CLOUDEVENTS_SCHEMA_NAME_ADJUSTMENT_MODE_DEFAULT, ConfigDef.Importance.LOW, CONFIG.define(CLOUDEVENTS_SCHEMA_NAME_ADJUSTMENT_MODE_CONFIG, ConfigDef.Type.STRING, CLOUDEVENTS_SCHEMA_NAME_ADJUSTMENT_MODE_DEFAULT, ConfigDef.Importance.LOW,
CLOUDEVENTS_SCHEMA_NAME_ADJUSTMENT_MODE_DOC); CLOUDEVENTS_SCHEMA_NAME_ADJUSTMENT_MODE_DOC);
CONFIG.define(CLOUDEVENTS_ID_SOURCE_CONFIG, ConfigDef.Type.STRING, CLOUDEVENTS_ID_SOURCE_DEFAULT, ConfigDef.Importance.HIGH, CONFIG.define(CLOUDEVENTS_METADATA_LOCATION_CONFIG, ConfigDef.Type.LIST, CLOUDEVENTS_METADATA_LOCATION_DEFAULT, ConfigDef.Importance.HIGH,
CLOUDEVENTS_ID_SOURCE_DOC);
CONFIG.define(CLOUDEVENTS_TYPE_SOURCE_CONFIG, ConfigDef.Type.STRING, CLOUDEVENTS_TYPE_SOURCE_DEFAULT, ConfigDef.Importance.HIGH,
CLOUDEVENTS_TYPE_SOURCE_DOC);
CONFIG.define(CLOUDEVENTS_METADATA_LOCATION_CONFIG, ConfigDef.Type.STRING, CLOUDEVENTS_METADATA_LOCATION_DEFAULT, ConfigDef.Importance.HIGH,
CLOUDEVENTS_METADATA_LOCATION_DOC); CLOUDEVENTS_METADATA_LOCATION_DOC);
} }
@ -114,129 +107,68 @@ public SchemaNameAdjustmentMode schemaNameAdjustmentMode() {
return SchemaNameAdjustmentMode.parse(getString(CLOUDEVENTS_SCHEMA_NAME_ADJUSTMENT_MODE_CONFIG)); return SchemaNameAdjustmentMode.parse(getString(CLOUDEVENTS_SCHEMA_NAME_ADJUSTMENT_MODE_CONFIG));
} }
/**
* Return from where to retrieve id of a CloudEvent
*
* @return source of id field of a CloudEvent
*/
public IdSource idSource() {
return IdSource.parse(getString(CLOUDEVENTS_ID_SOURCE_CONFIG));
}
/**
* Return from where to retrieve type of a CloudEvent
*
* @return source of type field of a CloudEvent
*/
public TypeSource typeSource() {
return TypeSource.parse(getString(CLOUDEVENTS_TYPE_SOURCE_CONFIG));
}
/** /**
* Return from where to retrieve metadata * Return from where to retrieve metadata
* *
* @return metadata location * @return metadata location
*/ */
public MetadataLocation metadataLocation() { public MetadataLocation metadataLocation() {
return MetadataLocation.parse(getString(CLOUDEVENTS_METADATA_LOCATION_CONFIG)); List<String> metadataLocations = getList(CLOUDEVENTS_METADATA_LOCATION_CONFIG);
// get global metadata source
Set<MetadataLocationValue> globalMetadataLocationAllowedValues = Set.of(MetadataLocationValue.VALUE, MetadataLocationValue.HEADER);
MetadataLocationValue global = MetadataLocationValue.parse(metadataLocations.get(0));
if (!globalMetadataLocationAllowedValues.contains(global)) {
throw new ConfigException("Global metadata location can't be " + global.name());
}
// get sources for customizable fields
Set<String> cloudEventsFieldsAllowedToCustomizeSource = Set.of(CloudEventsMaker.FieldName.ID, CloudEventsMaker.FieldName.TYPE);
final Map<String, MetadataLocationValue> cloudEventsFieldsCustomSources = new HashMap<>();
for (int i = 1; i < metadataLocations.size(); i++) {
final String[] parts = metadataLocations.get(i).split(":");
final String fieldName = parts[0];
if (!cloudEventsFieldsAllowedToCustomizeSource.contains(fieldName)) {
throw new ConfigException("Field `" + fieldName + "` is not allowed to set custom source");
}
final MetadataLocationValue fieldSource = MetadataLocationValue.parse(parts[1]);
cloudEventsFieldsCustomSources.put(fieldName, fieldSource);
}
MetadataLocationValue idCustomSource = cloudEventsFieldsCustomSources.get(CloudEventsMaker.FieldName.ID);
MetadataLocationValue typeCustomSource = cloudEventsFieldsCustomSources.get(CloudEventsMaker.FieldName.TYPE);
return new MetadataLocation(global, idCustomSource != null ? idCustomSource : global, typeCustomSource != null ? typeCustomSource : global);
} }
/** public class MetadataLocation {
* The set of predefined IdSource options private final MetadataLocationValue global;
*/ private final MetadataLocationValue id;
public enum IdSource implements EnumeratedValue { private final MetadataLocationValue type;
/** public MetadataLocation(MetadataLocationValue global, MetadataLocationValue id, MetadataLocationValue type) {
* Generate id of CloudEvent this.global = global;
*/ this.id = id;
GENERATE("generate"), this.type = type;
/**
* Get type of CloudEvent from the header
*/
HEADER("header");
private final String value;
IdSource(String value) {
this.value = value;
} }
@Override public MetadataLocationValue global() {
public String getValue() { return global;
return value;
} }
/** public MetadataLocationValue id() {
* Determine if the supplied values is one of the predefined options return id;
* }
* @param value the configuration property value ; may not be null
* @return the matching option, or null if the match is not found public MetadataLocationValue type() {
*/ return type;
public static IdSource parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (IdSource option : IdSource.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
} }
} }
/** /**
* The set of predefined TypeSource options * The set of predefined MetadataLocationValue options
*/ */
public enum TypeSource implements EnumeratedValue { public enum MetadataLocationValue implements EnumeratedValue {
/**
* Generate type of CloudEvent
*/
GENERATE("generate"),
/**
* Get type of CloudEvent from the header
*/
HEADER("header");
private final String value;
TypeSource(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
/**
* Determine if the supplied values is one of the predefined options
*
* @param value the configuration property value ; may not be null
* @return the matching option, or null if the match is not found
*/
public static TypeSource parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (TypeSource option : TypeSource.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}
}
/**
* The set of predefined MetadataLocation options
*/
public enum MetadataLocation implements EnumeratedValue {
/** /**
* Get metadata from the value * Get metadata from the value
@ -246,11 +178,16 @@ public enum MetadataLocation implements EnumeratedValue {
/** /**
* Get metadata from the header * Get metadata from the header
*/ */
HEADER("header"); HEADER("header"),
/**
* Generate a field's value
*/
GENERATE("generate");
private final String value; private final String value;
MetadataLocation(String value) { MetadataLocationValue(String value) {
this.value = value; this.value = value;
} }
@ -265,12 +202,12 @@ public String getValue() {
* @param value the configuration property value ; may not be null * @param value the configuration property value ; may not be null
* @return the matching option, or null if the match is not found * @return the matching option, or null if the match is not found
*/ */
public static MetadataLocation parse(String value) { public static MetadataLocationValue parse(String value) {
if (value == null) { if (value == null) {
return null; return null;
} }
value = value.trim(); value = value.trim();
for (MetadataLocation option : MetadataLocation.values()) { for (MetadataLocationValue option : MetadataLocationValue.values()) {
if (option.getValue().equalsIgnoreCase(value)) { if (option.getValue().equalsIgnoreCase(value)) {
return option; return option;
} }

View File

@ -1,41 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.converters.recordandmetadata;
import io.debezium.converters.CloudEventsConverterConfig.IdSource;
import io.debezium.converters.CloudEventsConverterConfig.MetadataLocation;
import io.debezium.converters.CloudEventsConverterConfig.TypeSource;
/**
* The structure containing information from where to retrieve value for some CloudEvents fields
*
* @author Roman Kudryashov
*/
public class CloudEventFieldsSources {
private final IdSource idSource;
private final TypeSource typeSource;
private final MetadataLocation metadataSource;
public CloudEventFieldsSources(IdSource idSource, TypeSource typeSource, MetadataLocation metadataSource) {
this.idSource = idSource;
this.typeSource = typeSource;
this.metadataSource = metadataSource;
}
public IdSource getIdSource() {
return idSource;
}
public TypeSource getTypeSource() {
return typeSource;
}
public MetadataLocation getMetadataSource() {
return metadataSource;
}
}

View File

@ -13,51 +13,22 @@
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.json.JsonConverter;
import io.debezium.converters.CloudEventsConverterConfig; import io.debezium.converters.CloudEventsConverterConfig.MetadataLocation;
import io.debezium.converters.CloudEventsConverterConfig.MetadataLocationValue;
import io.debezium.converters.spi.CloudEventsMaker; import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.data.Envelope; import io.debezium.data.Envelope;
public class RecordAndMetadataHeaderImpl extends RecordAndMetadataBaseImpl implements RecordAndMetadata { public class RecordAndMetadataHeaderImpl extends RecordAndMetadataBaseImpl implements RecordAndMetadata {
private final String id; private final Headers headers;
private final String type; private final MetadataLocation metadataLocation;
private final Struct source; private final JsonConverter jsonHeaderConverter;
private final String operation;
private final Struct transaction;
private final SchemaAndValue ts_ms;
public RecordAndMetadataHeaderImpl(Struct record, Schema dataSchema, Headers headers, JsonConverter jsonHeaderConverter, public RecordAndMetadataHeaderImpl(Struct record, Schema dataSchema, Headers headers, MetadataLocation metadataLocation, JsonConverter jsonHeaderConverter) {
CloudEventFieldsSources cloudEventFieldsSources) {
super(record, dataSchema); super(record, dataSchema);
this.headers = headers;
if (cloudEventFieldsSources.getIdSource() == CloudEventsConverterConfig.IdSource.HEADER) { this.metadataLocation = metadataLocation;
this.id = (String) getHeaderSchemaAndValue(headers, CloudEventsMaker.FieldName.ID, jsonHeaderConverter).value(); this.jsonHeaderConverter = jsonHeaderConverter;
}
else {
this.id = super.id();
}
if (cloudEventFieldsSources.getTypeSource() == CloudEventsConverterConfig.TypeSource.HEADER) {
this.type = (String) getHeaderSchemaAndValue(headers, CloudEventsMaker.FieldName.TYPE, jsonHeaderConverter).value();
}
else {
this.type = super.type();
}
if (cloudEventFieldsSources.getMetadataSource() == CloudEventsConverterConfig.MetadataLocation.HEADER) {
this.source = (Struct) getHeaderSchemaAndValue(headers, Envelope.FieldName.SOURCE, jsonHeaderConverter).value();
this.operation = (String) getHeaderSchemaAndValue(headers, Envelope.FieldName.OPERATION, jsonHeaderConverter).value();
this.transaction = (Struct) getHeaderSchemaAndValue(headers, Envelope.FieldName.TRANSACTION, jsonHeaderConverter).value();
}
else {
this.source = super.source();
this.operation = super.operation();
this.transaction = super.transaction();
}
String ts_ms = source.getInt64(Envelope.FieldName.TIMESTAMP).toString();
Schema ts_msSchema = source.schema().field(Envelope.FieldName.TIMESTAMP).schema();
this.ts_ms = new SchemaAndValue(ts_msSchema, ts_ms);
} }
@Override @Override
@ -67,37 +38,58 @@ public Schema dataSchema(String... dataFields) {
@Override @Override
public String id() { public String id() {
return this.id; if (metadataLocation.id() == MetadataLocationValue.HEADER) {
return (String) getHeaderSchemaAndValue(headers, CloudEventsMaker.FieldName.ID, false, jsonHeaderConverter).value();
}
return super.id();
} }
@Override @Override
public String type() { public String type() {
return this.type; if (metadataLocation.type() == MetadataLocationValue.HEADER) {
return (String) getHeaderSchemaAndValue(headers, CloudEventsMaker.FieldName.TYPE, false, jsonHeaderConverter).value();
}
return super.type();
} }
@Override @Override
public Struct source() { public Struct source() {
return this.source; if (metadataLocation.global() == MetadataLocationValue.HEADER) {
return (Struct) getHeaderSchemaAndValue(headers, Envelope.FieldName.SOURCE, false, jsonHeaderConverter).value();
}
return super.source();
} }
@Override @Override
public String operation() { public String operation() {
return this.operation; if (metadataLocation.global() == MetadataLocationValue.HEADER) {
return (String) getHeaderSchemaAndValue(headers, Envelope.FieldName.OPERATION, false, jsonHeaderConverter).value();
}
return super.operation();
} }
@Override @Override
public Struct transaction() { public Struct transaction() {
return this.transaction; if (metadataLocation.global() == MetadataLocationValue.HEADER) {
return (Struct) getHeaderSchemaAndValue(headers, Envelope.FieldName.TRANSACTION, true, jsonHeaderConverter).value();
}
return super.transaction();
} }
@Override @Override
public SchemaAndValue timestamp() { public SchemaAndValue timestamp() {
return this.ts_ms; if (metadataLocation.global() == MetadataLocationValue.HEADER) {
String ts_ms = this.source().getInt64(Envelope.FieldName.TIMESTAMP).toString();
Schema ts_msSchema = this.source().schema().field(Envelope.FieldName.TIMESTAMP).schema();
return new SchemaAndValue(ts_msSchema, ts_ms);
}
return super.timestamp();
} }
private static SchemaAndValue getHeaderSchemaAndValue(Headers headers, String headerName, JsonConverter jsonHeaderConverter) { private static SchemaAndValue getHeaderSchemaAndValue(Headers headers, String headerName, boolean isOptional, JsonConverter jsonHeaderConverter) {
Header header = headers.lastHeader(headerName); Header header = headers.lastHeader(headerName);
if (header == null) { if (header == null && !isOptional) {
throw new RuntimeException("Header `" + headerName + "` was not provided"); throw new RuntimeException("Header `" + headerName + "` was not provided");
} }
return jsonHeaderConverter.toConnectData(null, header.value()); return jsonHeaderConverter.toConnectData(null, header.value());

View File

@ -325,12 +325,10 @@ public static void shouldConvertToCloudEventsInAvro(SourceRecord record, String
} }
} }
public static void shouldConvertToCloudEventsInJsonWithIdAndTypeAndMetadataInHeaders(SourceRecord record, String connectorName, String serverName) throws Exception { public static void shouldConvertToCloudEventsInJsonWithMetadataAndIdAndTypeInHeaders(SourceRecord record, String connectorName, String serverName) throws Exception {
Map<String, Object> config = new HashMap<>(); Map<String, Object> config = new HashMap<>();
config.put("serializer.type", "json"); config.put("serializer.type", "json");
config.put("data.serializer.type", "json"); config.put("data.serializer.type", "json");
config.put("id.source", "header");
config.put("type.source", "header");
config.put("metadata.location", "header"); config.put("metadata.location", "header");
CloudEventsConverter cloudEventsConverter = new CloudEventsConverter(); CloudEventsConverter cloudEventsConverter = new CloudEventsConverter();
@ -390,11 +388,11 @@ public static void shouldConvertToCloudEventsInJsonWithIdAndTypeAndMetadataInHea
} }
} }
public static void shouldConvertToCloudEventsInJsonWithTypeInHeader(SourceRecord record, String connectorName, String serverName) throws Exception { public static void shouldConvertToCloudEventsInJsonWithGeneratedIdAndTypeFromHeader(SourceRecord record, String connectorName, String serverName) throws Exception {
Map<String, Object> config = new HashMap<>(); Map<String, Object> config = new HashMap<>();
config.put("serializer.type", "json"); config.put("serializer.type", "json");
config.put("data.serializer.type", "json"); config.put("data.serializer.type", "json");
config.put("type.source", "header"); config.put("metadata.location", "value,id:generate,type:header");
CloudEventsConverter cloudEventsConverter = new CloudEventsConverter(); CloudEventsConverter cloudEventsConverter = new CloudEventsConverter();
cloudEventsConverter.configure(config, false); cloudEventsConverter.configure(config, false);

View File

@ -90,7 +90,7 @@ public void shouldConvertToCloudEventsInJsonWithoutExtensionAttributes() throws
@Test @Test
@FixFor({ "DBZ-3642", "DBZ-7016" }) @FixFor({ "DBZ-3642", "DBZ-7016" })
public void shouldConvertToCloudEventsInJsonWithIdAndTypeAndMetadataInHeadersAfterOutboxEventRouter() throws Exception { public void shouldConvertToCloudEventsInJsonWithMetadataAndIdAndTypeInHeadersAfterOutboxEventRouter() throws Exception {
HeaderFrom<SourceRecord> headerFrom = new HeaderFrom.Value<>(); HeaderFrom<SourceRecord> headerFrom = new HeaderFrom.Value<>();
Map<String, String> headerFromConfig = new LinkedHashMap<>(); Map<String, String> headerFromConfig = new LinkedHashMap<>();
headerFromConfig.put("fields", "source,op,transaction"); headerFromConfig.put("fields", "source,op,transaction");
@ -132,7 +132,7 @@ public void shouldConvertToCloudEventsInJsonWithIdAndTypeAndMetadataInHeadersAft
assertThat(routedEvent.key()).isEqualTo("10711fa5"); assertThat(routedEvent.key()).isEqualTo("10711fa5");
assertThat(routedEvent.value()).isInstanceOf(Struct.class); assertThat(routedEvent.value()).isInstanceOf(Struct.class);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithIdAndTypeAndMetadataInHeaders(routedEvent, getConnectorName(), getServerName()); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithMetadataAndIdAndTypeInHeaders(routedEvent, getConnectorName(), getServerName());
headerFrom.close(); headerFrom.close();
outboxEventRouter.close(); outboxEventRouter.close();
@ -140,7 +140,7 @@ public void shouldConvertToCloudEventsInJsonWithIdAndTypeAndMetadataInHeadersAft
@Test @Test
@FixFor({ "DBZ-7016" }) @FixFor({ "DBZ-7016" })
public void shouldConvertToCloudEventsInJsonWithTypeInHeader() throws Exception { public void shouldConvertToCloudEventsInJsonWithGeneratedIdAndTypeFromHeader() throws Exception {
InsertHeader<SourceRecord> insertHeader = new InsertHeader<>(); InsertHeader<SourceRecord> insertHeader = new InsertHeader<>();
Map<String, String> insertHeaderConfig = new LinkedHashMap<>(); Map<String, String> insertHeaderConfig = new LinkedHashMap<>();
insertHeaderConfig.put("header", "type"); insertHeaderConfig.put("header", "type");
@ -161,7 +161,7 @@ public void shouldConvertToCloudEventsInJsonWithTypeInHeader() throws Exception
assertThat(recordWithTypeInHeader.topic()).isEqualTo(topicName()); assertThat(recordWithTypeInHeader.topic()).isEqualTo(topicName());
assertThat(recordWithTypeInHeader.value()).isInstanceOf(Struct.class); assertThat(recordWithTypeInHeader.value()).isInstanceOf(Struct.class);
CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithTypeInHeader(recordWithTypeInHeader, getConnectorName(), getServerName()); CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithGeneratedIdAndTypeFromHeader(recordWithTypeInHeader, getConnectorName(), getServerName());
insertHeader.close(); insertHeader.close();
} }