From e6944c55075f8f1a30a991c1ea90bcb43ede0bf9 Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Sat, 25 Jul 2020 01:42:25 -0700 Subject: [PATCH] DBZ-2103 Fix for data mismatch error --- .../connector/cassandra/CellData.java | 29 +++---------------- .../CassandraTypeKafkaSchemaBuilders.java | 28 +++++++++--------- .../deserializer/ListTypeDeserializer.java | 2 +- .../deserializer/MapTypeDeserializer.java | 2 +- .../deserializer/SetTypeDeserializer.java | 2 +- .../deserializer/TupleTypeDeserializer.java | 2 +- .../deserializer/UserTypeDeserializer.java | 2 +- 7 files changed, 22 insertions(+), 45 deletions(-) diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CellData.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CellData.java index afe5d7431..1fc27ad8c 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CellData.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CellData.java @@ -8,7 +8,6 @@ import java.util.Objects; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; @@ -23,6 +22,7 @@ * type of a column in a Cassandra table. */ public class CellData implements KafkaRecord { + /** * The type of a column in a Cassandra table */ @@ -69,35 +69,14 @@ public boolean isPrimary() { public Struct record(Schema schema) { Struct cellStruct = new Struct(schema) .put(CELL_DELETION_TS_KEY, deletionTs) - .put(CELL_SET_KEY, true); - - if (value instanceof Struct) { - Schema valueSchema = schema.field(CELL_VALUE_KEY).schema(); - Struct clonedValue = cloneValue(valueSchema, (Struct) value); - cellStruct.put(CELL_VALUE_KEY, clonedValue); - } - else { - cellStruct.put(CELL_VALUE_KEY, value); - } - + .put(CELL_SET_KEY, true) + .put(CELL_VALUE_KEY, value); return cellStruct; } - // Encountered DataException("Struct schemas do not match.") when value is a Struct. - // The error is because the valueSchema is optional, but the schema of value formed during deserialization is not. - // This is a temporary workaround to fix this problem. - private Struct cloneValue(Schema valueSchema, Struct value) { - Struct clonedValue = new Struct(valueSchema); - for (Field field : valueSchema.fields()) { - String fieldName = field.name(); - clonedValue.put(fieldName, value.get(fieldName)); - } - return clonedValue; - } - static Schema cellSchema(ColumnMetadata cm, boolean optional) { AbstractType convertedType = CassandraTypeConverter.convert(cm.getType()); - Schema valueSchema = CassandraTypeDeserializer.getSchemaBuilder(convertedType).optional().build(); + Schema valueSchema = CassandraTypeDeserializer.getSchemaBuilder(convertedType).build(); if (valueSchema != null) { SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(cm.getName()) .field(CELL_VALUE_KEY, valueSchema) diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeKafkaSchemaBuilders.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeKafkaSchemaBuilders.java index 23393e963..f528869ab 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeKafkaSchemaBuilders.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeKafkaSchemaBuilders.java @@ -11,7 +11,6 @@ import io.debezium.data.Uuid; import io.debezium.time.Date; -import io.debezium.time.MicroTimestamp; import io.debezium.time.NanoDuration; import io.debezium.time.Timestamp; @@ -21,20 +20,19 @@ public final class CassandraTypeKafkaSchemaBuilders { // native types - public static final SchemaBuilder STRING_TYPE = SchemaBuilder.string(); - public static final SchemaBuilder BOOLEAN_TYPE = SchemaBuilder.bool(); - public static final SchemaBuilder BYTES_TYPE = SchemaBuilder.bytes(); - public static final SchemaBuilder BYTE_TYPE = SchemaBuilder.int8(); - public static final SchemaBuilder SHORT_TYPE = SchemaBuilder.int16(); - public static final SchemaBuilder INT_TYPE = SchemaBuilder.int32(); - public static final SchemaBuilder LONG_TYPE = SchemaBuilder.int64(); - public static final SchemaBuilder FLOAT_TYPE = SchemaBuilder.float32(); - public static final SchemaBuilder DOUBLE_TYPE = SchemaBuilder.float64(); + public static final SchemaBuilder STRING_TYPE = SchemaBuilder.string().optional(); + public static final SchemaBuilder BOOLEAN_TYPE = SchemaBuilder.bool().optional(); + public static final SchemaBuilder BYTES_TYPE = SchemaBuilder.bytes().optional(); + public static final SchemaBuilder BYTE_TYPE = SchemaBuilder.int8().optional(); + public static final SchemaBuilder SHORT_TYPE = SchemaBuilder.int16().optional(); + public static final SchemaBuilder INT_TYPE = SchemaBuilder.int32().optional(); + public static final SchemaBuilder LONG_TYPE = SchemaBuilder.int64().optional(); + public static final SchemaBuilder FLOAT_TYPE = SchemaBuilder.float32().optional(); + public static final SchemaBuilder DOUBLE_TYPE = SchemaBuilder.float64().optional(); // logical types - public static final SchemaBuilder DATE_TYPE = Date.builder(); - public static final SchemaBuilder TIMESTAMP_MILLI_TYPE = Timestamp.builder(); - public static final SchemaBuilder TIMESTAMP_MICRO_TYPE = MicroTimestamp.builder(); - public static final SchemaBuilder UUID_TYPE = Uuid.builder(); - public static final SchemaBuilder DURATION_TYPE = NanoDuration.builder(); + public static final SchemaBuilder DATE_TYPE = Date.builder().optional(); + public static final SchemaBuilder TIMESTAMP_MILLI_TYPE = Timestamp.builder().optional(); + public static final SchemaBuilder UUID_TYPE = Uuid.builder().optional(); + public static final SchemaBuilder DURATION_TYPE = NanoDuration.builder().optional(); } diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/ListTypeDeserializer.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/ListTypeDeserializer.java index c5f33c328..5692736ea 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/ListTypeDeserializer.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/ListTypeDeserializer.java @@ -30,6 +30,6 @@ public SchemaBuilder getSchemaBuilder(AbstractType abstractType) { ListType listType = (ListType) abstractType; AbstractType elementsType = listType.getElementsType(); Schema innerSchema = CassandraTypeDeserializer.getSchemaBuilder(elementsType).build(); - return SchemaBuilder.array(innerSchema); + return SchemaBuilder.array(innerSchema).optional(); } } diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/MapTypeDeserializer.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/MapTypeDeserializer.java index 617748802..e808ded55 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/MapTypeDeserializer.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/MapTypeDeserializer.java @@ -32,6 +32,6 @@ public SchemaBuilder getSchemaBuilder(AbstractType abstractType) { AbstractType valuesType = mapType.getValuesType(); Schema keySchema = CassandraTypeDeserializer.getSchemaBuilder(keysType).build(); Schema valuesSchema = CassandraTypeDeserializer.getSchemaBuilder(valuesType).build(); - return SchemaBuilder.map(keySchema, valuesSchema); + return SchemaBuilder.map(keySchema, valuesSchema).optional(); } } diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/SetTypeDeserializer.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/SetTypeDeserializer.java index b8f423f38..9652052db 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/SetTypeDeserializer.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/SetTypeDeserializer.java @@ -33,6 +33,6 @@ public SchemaBuilder getSchemaBuilder(AbstractType abstractType) { SetType listType = (SetType) abstractType; AbstractType elementsType = listType.getElementsType(); Schema innerSchema = CassandraTypeDeserializer.getSchemaBuilder(elementsType).build(); - return SchemaBuilder.array(innerSchema); + return SchemaBuilder.array(innerSchema).optional(); } } diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/TupleTypeDeserializer.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/TupleTypeDeserializer.java index 2b1d87267..8916d4d9b 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/TupleTypeDeserializer.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/TupleTypeDeserializer.java @@ -53,7 +53,7 @@ public SchemaBuilder getSchemaBuilder(AbstractType abstractType) { schemaBuilder.field(createFieldNameForIndex(i), CassandraTypeDeserializer.getSchemaBuilder(innerType).build()); } - return schemaBuilder; + return schemaBuilder.optional(); } private String createTupleName(List> innerTypes) { diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/UserTypeDeserializer.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/UserTypeDeserializer.java index 68c6fddb8..009c67412 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/UserTypeDeserializer.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/UserTypeDeserializer.java @@ -47,6 +47,6 @@ public SchemaBuilder getSchemaBuilder(AbstractType abstractType) { Schema fieldSchema = CassandraTypeDeserializer.getSchemaBuilder(fieldTypes.get(i)).build(); schemaBuilder.field(fieldIdentifiers.get(i).toString(), fieldSchema); } - return schemaBuilder; + return schemaBuilder.optional(); } }