DBZ-1292 Wrapping up work on schema and value conversion;

* There were a few attributes missing in the schema of the CE event
* Conversion of extension attributes now correctly handles optional attributes
* Removed guards against missing and duplicate fields, as the only caller is this class itself, so it seems superfluous.
This commit is contained in:
Gunnar Morling 2020-01-09 13:14:27 +01:00
parent de2655a89e
commit 390c8f8090

View File

@ -12,16 +12,13 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Collections;
import java.util.Set;import java.util.concurrent.ConcurrentHashMap;
import io.debezium.data.Envelope;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
@ -43,6 +40,8 @@
import io.confluent.connect.avro.AvroConverter;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.debezium.annotation.VisibleForTesting;
import io.debezium.data.Envelope;
import io.debezium.util.SchemaNameAdjuster;
/**
@ -341,29 +340,25 @@ private SchemaAndValue convertToCloudEventsFormat(RecordParser parser, CloudEven
// construct schema of CloudEvents envelope
CESchemaBuilder ceSchemaBuilder = defineSchema()
.withName(schemaNameAdjuster.adjust(maker.ceEnvelopeSchemaName()))
.withSchema(CloudEventsMaker.FieldName.DATACONTENTTYPE, Schema.STRING_SCHEMA)
.withSchema(CloudEventsMaker.FieldName.ID, Schema.STRING_SCHEMA)
.withSchema(CloudEventsMaker.FieldName.SOURCE, Schema.STRING_SCHEMA)
.withSchema(CloudEventsMaker.FieldName.SPECVERSION, Schema.STRING_SCHEMA)
.withSchema(CloudEventsMaker.FieldName.TYPE, Schema.STRING_SCHEMA)
.withSchema(CloudEventsMaker.FieldName.TIME, Schema.STRING_SCHEMA)
.withSchema(CloudEventsMaker.FieldName.DATA, dataSchemaType)
.withSchema(attributeAdjuster.adjust(Envelope.FieldName.OPERATION), Schema.STRING_SCHEMA)
.withSchema(attributeAdjuster.adjust(Envelope.FieldName.TIMESTAMP), Schema.STRING_SCHEMA);
.withSchema(CloudEventsMaker.FieldName.DATACONTENTTYPE, Schema.STRING_SCHEMA);
if (dataSchema != null) {
ceSchemaBuilder.withSchema(CloudEventsMaker.FieldName.DATASCHEMA, Schema.STRING_SCHEMA);
}
ceSchemaBuilder.withSchema(attributeAdjuster.adjust(Envelope.FieldName.OPERATION), Schema.STRING_SCHEMA);
for (Field field : sourceSchema.fields()) {
Schema schema = field.schema();
if (Schema.BOOLEAN_SCHEMA.equals(schema)) {
ceSchemaBuilder.withSchema(attributeAdjuster.adjust(field.name()), Schema.BOOLEAN_SCHEMA);
}
else if (Schema.INT8_SCHEMA.equals(schema) || Schema.INT16_SCHEMA.equals(schema) || Schema.INT32_SCHEMA.equals(schema)) {
ceSchemaBuilder.withSchema(attributeAdjuster.adjust(field.name()), Schema.INT32_SCHEMA);
}
else {
ceSchemaBuilder.withSchema(attributeAdjuster.adjust(field.name()), Schema.STRING_SCHEMA);
}
ceSchemaBuilder.withSchema(attributeAdjuster.adjust(field.name()), convertToCeExtensionSchema(field.schema()));
}
ceSchemaBuilder.withSchema(CloudEventsMaker.FieldName.DATA, dataSchemaType);
Schema ceSchema = ceSchemaBuilder.build();
// construct value of CloudEvents Envelope
@ -372,23 +367,58 @@ else if (Schema.INT8_SCHEMA.equals(schema) || Schema.INT16_SCHEMA.equals(schema)
.withValue(CloudEventsMaker.FieldName.SOURCE, maker.ceSource())
.withValue(CloudEventsMaker.FieldName.SPECVERSION, maker.ceSpecversion())
.withValue(CloudEventsMaker.FieldName.TYPE, maker.ceType())
.withValue(CloudEventsMaker.FieldName.DATACONTENTTYPE, maker.ceDatacontenttype())
.withValue(CloudEventsMaker.FieldName.TIME, maker.ceTime())
.withValue(CloudEventsMaker.FieldName.DATA, serializedData)
.withValue(attributeAdjuster.adjust(Envelope.FieldName.OPERATION), parser.op())
.withValue(attributeAdjuster.adjust(Envelope.FieldName.TIMESTAMP), parser.ts_ms());
.withValue(CloudEventsMaker.FieldName.DATACONTENTTYPE, maker.ceDatacontenttype());
if (dataSchema != null) {
ceValueBuilder.withValue(CloudEventsMaker.FieldName.DATASCHEMA, dataSchema);
}
ceValueBuilder.withValue(attributeAdjuster.adjust(Envelope.FieldName.OPERATION), parser.op());
for (Field field : sourceSchema.fields()) {
ceValueBuilder.withValue(attributeAdjuster.adjust(field.name()), source.get(field));
Object value = source.get(field);
if (field.schema().type() == Type.INT64 && value != null) {
value = String.valueOf((long) value);
}
ceValueBuilder.withValue(attributeAdjuster.adjust(field.name()), value);
}
ceValueBuilder.withValue(CloudEventsMaker.FieldName.DATA, serializedData);
return new SchemaAndValue(ceSchema, ceValueBuilder.build());
}
/**
* Converts the given source attribute schema into a corresponding CE extension schema.
* The types supported there are limited, e.g. int64 can only be represented as string.
*/
private Schema convertToCeExtensionSchema(Schema schema) {
SchemaBuilder ceExtensionSchema;
if (schema.type() == Type.BOOLEAN) {
ceExtensionSchema = SchemaBuilder.bool();
}
// all numbers up to int32 go as int32
else if (schema.type() == Type.INT8 || schema.type() == Type.INT16 || schema.type() == Type.INT16) {
ceExtensionSchema = SchemaBuilder.int32();
}
// int64 isn't supported as per CE spec
else if (schema.type() == Type.STRING || schema.type() == Type.INT64) {
ceExtensionSchema = SchemaBuilder.string();
}
// further attribute types may be supported in the future, but the ones above are the ones
// currently used in the "source" block of Debezium events
else {
throw new IllegalArgumentException("Source field of type " + schema.type() + " cannot be converted into CloudEvents extension attribute.");
}
if (schema.isOptional()) {
ceExtensionSchema.optional();
}
return ceExtensionSchema.build();
}
private JsonNode convertToJsonNode(Schema schema, Object value, boolean enableJsonSchemas) {
try {
JsonNode withoutSchemaNode = (JsonNode) CONVERT_TO_JSON_METHOD.invoke(jsonCEConverter, schema, value);
@ -409,10 +439,9 @@ private JsonNode convertToJsonNode(Schema schema, Object value, boolean enableJs
}
}
public static CESchemaBuilder defineSchema() {
private static CESchemaBuilder defineSchema() {
return new CESchemaBuilder() {
private final SchemaBuilder builder = SchemaBuilder.struct();
private final Set<String> missingFields = new HashSet<>();
@Override
public CESchemaBuilder withName(String name) {
@ -428,34 +457,12 @@ public CESchemaBuilder withSchema(String fieldName, Schema fieldSchema) {
@Override
public Schema build() {
// Required attributes
builder.field(CloudEventsMaker.FieldName.ID, Schema.STRING_SCHEMA);
builder.field(CloudEventsMaker.FieldName.SOURCE, Schema.STRING_SCHEMA);
builder.field(CloudEventsMaker.FieldName.SPECVERSION, Schema.STRING_SCHEMA);
builder.field(CloudEventsMaker.FieldName.TYPE, Schema.STRING_SCHEMA);
// Check required extension attribute names
checkFieldIsDefined(adjustExtensionName(Envelope.FieldName.OPERATION));
checkFieldIsDefined(adjustExtensionName(Envelope.FieldName.TIMESTAMP));
// Check the data attribute
checkFieldIsDefined(CloudEventsMaker.FieldName.DATA);
if (!missingFields.isEmpty()) {
throw new IllegalStateException("The envelope schema is missing field(s) " + String.join(", ", missingFields));
}
return builder.build();
}
private void checkFieldIsDefined(String fieldName) {
if (builder.field(fieldName) == null) {
missingFields.add(fieldName);
}
}
};
}
public static CEValueBuilder withValue(Schema schema) {
private static CEValueBuilder withValue(Schema schema) {
return new CEValueBuilder() {
private final Schema ceSchema = schema;
private final Struct ceValue = new Struct(ceSchema);
@ -519,13 +526,8 @@ public interface AttributeNameAdjuster {
* @return the new function; never null
*/
default AttributeNameAdjuster uniqueName() {
Set<String> alreadySeen = Collections.newSetFromMap(new ConcurrentHashMap<>());
return original -> {
String replacement = this.adjust(original);
if (!alreadySeen.add(replacement)) {
throw new DataException("Duplicated CloudEvents extension attribute name " + replacement);
}
return replacement;
return this.adjust(original);
};
}
}
@ -541,7 +543,8 @@ default AttributeNameAdjuster uniqueName() {
* @param original the original field name
* @return the valid extension attribute name
*/
public static String adjustExtensionName(String original) {
@VisibleForTesting
static String adjustExtensionName(String original) {
if (original.length() == 0) {
return original;
}