DBZ-2103 Fix for data mismatch error

This commit is contained in:
Bingqin Zhou 2020-07-25 01:42:25 -07:00 committed by GitHub
parent 5561e97f6d
commit e6944c5507
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 22 additions and 45 deletions

View File

@ -8,7 +8,6 @@
import java.util.Objects; import java.util.Objects;
import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
@ -23,6 +22,7 @@
* type of a column in a Cassandra table. * type of a column in a Cassandra table.
*/ */
public class CellData implements KafkaRecord { public class CellData implements KafkaRecord {
/** /**
* The type of a column in a Cassandra table * The type of a column in a Cassandra table
*/ */
@ -69,35 +69,14 @@ public boolean isPrimary() {
public Struct record(Schema schema) { public Struct record(Schema schema) {
Struct cellStruct = new Struct(schema) Struct cellStruct = new Struct(schema)
.put(CELL_DELETION_TS_KEY, deletionTs) .put(CELL_DELETION_TS_KEY, deletionTs)
.put(CELL_SET_KEY, true); .put(CELL_SET_KEY, true)
.put(CELL_VALUE_KEY, value);
if (value instanceof Struct) {
Schema valueSchema = schema.field(CELL_VALUE_KEY).schema();
Struct clonedValue = cloneValue(valueSchema, (Struct) value);
cellStruct.put(CELL_VALUE_KEY, clonedValue);
}
else {
cellStruct.put(CELL_VALUE_KEY, value);
}
return cellStruct; return cellStruct;
} }
// Encountered DataException("Struct schemas do not match.") when value is a Struct.
// The error is because the valueSchema is optional, but the schema of value formed during deserialization is not.
// This is a temporary workaround to fix this problem.
private Struct cloneValue(Schema valueSchema, Struct value) {
Struct clonedValue = new Struct(valueSchema);
for (Field field : valueSchema.fields()) {
String fieldName = field.name();
clonedValue.put(fieldName, value.get(fieldName));
}
return clonedValue;
}
static Schema cellSchema(ColumnMetadata cm, boolean optional) { static Schema cellSchema(ColumnMetadata cm, boolean optional) {
AbstractType<?> convertedType = CassandraTypeConverter.convert(cm.getType()); AbstractType<?> convertedType = CassandraTypeConverter.convert(cm.getType());
Schema valueSchema = CassandraTypeDeserializer.getSchemaBuilder(convertedType).optional().build(); Schema valueSchema = CassandraTypeDeserializer.getSchemaBuilder(convertedType).build();
if (valueSchema != null) { if (valueSchema != null) {
SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(cm.getName()) SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(cm.getName())
.field(CELL_VALUE_KEY, valueSchema) .field(CELL_VALUE_KEY, valueSchema)

View File

@ -11,7 +11,6 @@
import io.debezium.data.Uuid; import io.debezium.data.Uuid;
import io.debezium.time.Date; import io.debezium.time.Date;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.NanoDuration; import io.debezium.time.NanoDuration;
import io.debezium.time.Timestamp; import io.debezium.time.Timestamp;
@ -21,20 +20,19 @@
public final class CassandraTypeKafkaSchemaBuilders { public final class CassandraTypeKafkaSchemaBuilders {
// native types // native types
public static final SchemaBuilder STRING_TYPE = SchemaBuilder.string(); public static final SchemaBuilder STRING_TYPE = SchemaBuilder.string().optional();
public static final SchemaBuilder BOOLEAN_TYPE = SchemaBuilder.bool(); public static final SchemaBuilder BOOLEAN_TYPE = SchemaBuilder.bool().optional();
public static final SchemaBuilder BYTES_TYPE = SchemaBuilder.bytes(); public static final SchemaBuilder BYTES_TYPE = SchemaBuilder.bytes().optional();
public static final SchemaBuilder BYTE_TYPE = SchemaBuilder.int8(); public static final SchemaBuilder BYTE_TYPE = SchemaBuilder.int8().optional();
public static final SchemaBuilder SHORT_TYPE = SchemaBuilder.int16(); public static final SchemaBuilder SHORT_TYPE = SchemaBuilder.int16().optional();
public static final SchemaBuilder INT_TYPE = SchemaBuilder.int32(); public static final SchemaBuilder INT_TYPE = SchemaBuilder.int32().optional();
public static final SchemaBuilder LONG_TYPE = SchemaBuilder.int64(); public static final SchemaBuilder LONG_TYPE = SchemaBuilder.int64().optional();
public static final SchemaBuilder FLOAT_TYPE = SchemaBuilder.float32(); public static final SchemaBuilder FLOAT_TYPE = SchemaBuilder.float32().optional();
public static final SchemaBuilder DOUBLE_TYPE = SchemaBuilder.float64(); public static final SchemaBuilder DOUBLE_TYPE = SchemaBuilder.float64().optional();
// logical types // logical types
public static final SchemaBuilder DATE_TYPE = Date.builder(); public static final SchemaBuilder DATE_TYPE = Date.builder().optional();
public static final SchemaBuilder TIMESTAMP_MILLI_TYPE = Timestamp.builder(); public static final SchemaBuilder TIMESTAMP_MILLI_TYPE = Timestamp.builder().optional();
public static final SchemaBuilder TIMESTAMP_MICRO_TYPE = MicroTimestamp.builder(); public static final SchemaBuilder UUID_TYPE = Uuid.builder().optional();
public static final SchemaBuilder UUID_TYPE = Uuid.builder(); public static final SchemaBuilder DURATION_TYPE = NanoDuration.builder().optional();
public static final SchemaBuilder DURATION_TYPE = NanoDuration.builder();
} }

View File

@ -30,6 +30,6 @@ public SchemaBuilder getSchemaBuilder(AbstractType<?> abstractType) {
ListType<?> listType = (ListType<?>) abstractType; ListType<?> listType = (ListType<?>) abstractType;
AbstractType<?> elementsType = listType.getElementsType(); AbstractType<?> elementsType = listType.getElementsType();
Schema innerSchema = CassandraTypeDeserializer.getSchemaBuilder(elementsType).build(); Schema innerSchema = CassandraTypeDeserializer.getSchemaBuilder(elementsType).build();
return SchemaBuilder.array(innerSchema); return SchemaBuilder.array(innerSchema).optional();
} }
} }

View File

@ -32,6 +32,6 @@ public SchemaBuilder getSchemaBuilder(AbstractType<?> abstractType) {
AbstractType<?> valuesType = mapType.getValuesType(); AbstractType<?> valuesType = mapType.getValuesType();
Schema keySchema = CassandraTypeDeserializer.getSchemaBuilder(keysType).build(); Schema keySchema = CassandraTypeDeserializer.getSchemaBuilder(keysType).build();
Schema valuesSchema = CassandraTypeDeserializer.getSchemaBuilder(valuesType).build(); Schema valuesSchema = CassandraTypeDeserializer.getSchemaBuilder(valuesType).build();
return SchemaBuilder.map(keySchema, valuesSchema); return SchemaBuilder.map(keySchema, valuesSchema).optional();
} }
} }

View File

@ -33,6 +33,6 @@ public SchemaBuilder getSchemaBuilder(AbstractType<?> abstractType) {
SetType<?> listType = (SetType<?>) abstractType; SetType<?> listType = (SetType<?>) abstractType;
AbstractType<?> elementsType = listType.getElementsType(); AbstractType<?> elementsType = listType.getElementsType();
Schema innerSchema = CassandraTypeDeserializer.getSchemaBuilder(elementsType).build(); Schema innerSchema = CassandraTypeDeserializer.getSchemaBuilder(elementsType).build();
return SchemaBuilder.array(innerSchema); return SchemaBuilder.array(innerSchema).optional();
} }
} }

View File

@ -53,7 +53,7 @@ public SchemaBuilder getSchemaBuilder(AbstractType<?> abstractType) {
schemaBuilder.field(createFieldNameForIndex(i), CassandraTypeDeserializer.getSchemaBuilder(innerType).build()); schemaBuilder.field(createFieldNameForIndex(i), CassandraTypeDeserializer.getSchemaBuilder(innerType).build());
} }
return schemaBuilder; return schemaBuilder.optional();
} }
private String createTupleName(List<AbstractType<?>> innerTypes) { private String createTupleName(List<AbstractType<?>> innerTypes) {

View File

@ -47,6 +47,6 @@ public SchemaBuilder getSchemaBuilder(AbstractType<?> abstractType) {
Schema fieldSchema = CassandraTypeDeserializer.getSchemaBuilder(fieldTypes.get(i)).build(); Schema fieldSchema = CassandraTypeDeserializer.getSchemaBuilder(fieldTypes.get(i)).build();
schemaBuilder.field(fieldIdentifiers.get(i).toString(), fieldSchema); schemaBuilder.field(fieldIdentifiers.get(i).toString(), fieldSchema);
} }
return schemaBuilder; return schemaBuilder.optional();
} }
} }