DBZ-2345 Add sub abstract class.

This commit is contained in:
Bingqin Zhou 2020-07-28 17:31:40 -07:00 committed by Gunnar Morling
parent 75cbbaa3df
commit 9b6d24825d
7 changed files with 38 additions and 36 deletions

View File

@ -26,6 +26,7 @@
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.ComplexColumnData;
import org.apache.cassandra.db.rows.Row;
@ -423,13 +424,14 @@ private void populateRegularColumns(RowData after, Row row, RowType rowType, Sch
try {
Object value;
Object deletionTs = null;
if (cd.isComplex()) {
AbstractType abstractType = cd.type;
if (abstractType.isCollection()) {
ComplexColumnData ccd = row.getComplexColumnData(cd);
value = CassandraTypeDeserializer.deserialize(cd.type, ccd);
value = CassandraTypeDeserializer.deserialize((CollectionType) abstractType, ccd);
}
else {
org.apache.cassandra.db.rows.Cell cell = row.getCell(cd);
value = cell.isTombstone() ? null : CassandraTypeDeserializer.deserialize(cd.type, cell.value());
value = cell.isTombstone() ? null : CassandraTypeDeserializer.deserialize(abstractType, cell.value());
deletionTs = cell.isExpiring() ? TimeUnit.MICROSECONDS.convert(cell.localDeletionTime(), TimeUnit.SECONDS) : null;
}
String name = cd.name.toString();

View File

@ -15,6 +15,7 @@
import org.apache.cassandra.db.marshal.BooleanType;
import org.apache.cassandra.db.marshal.ByteType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.CounterColumnType;
import org.apache.cassandra.db.marshal.DecimalType;
import org.apache.cassandra.db.marshal.DoubleType;
@ -44,6 +45,7 @@
import io.debezium.annotation.Immutable;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.cassandra.transforms.type.deserializer.BasicTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.CollectionTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.DurationTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.InetAddressDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.ListTypeDeserializer;
@ -132,18 +134,13 @@ public static Object deserialize(AbstractType<?> abstractType, ByteBuffer bb) {
/**
* Deserialize from cdc-log-sourced cassandra data.
*
* @param abstractType the {@link AbstractType} of the collection column
* @param collectionType the {@link CollectionType} of the collection column
* @param ccd the ComplexColumnData of the collection column to deserialize
* @return the deserialized object.
*/
public static Object deserialize(AbstractType<?> abstractType, ComplexColumnData ccd) {
// Check if abstract type is reversed, if yes, use the base type for deserialization.
if (abstractType.isReversed()) {
abstractType = ((ReversedType) abstractType).baseType;
}
TypeDeserializer typeDeserializer = TYPE_MAP.get(abstractType.getClass());
return typeDeserializer.deserialize(abstractType, ccd);
public static Object deserialize(CollectionType<?> collectionType, ComplexColumnData ccd) {
TypeDeserializer typeDeserializer = TYPE_MAP.get(collectionType.getClass());
return ((CollectionTypeDeserializer) typeDeserializer).deserialize(collectionType, ccd);
}
/**

View File

@ -0,0 +1,8 @@
package io.debezium.connector.cassandra.transforms.type.deserializer;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.rows.ComplexColumnData;
public abstract class CollectionTypeDeserializer extends TypeDeserializer {
public abstract Object deserialize(CollectionType<?> collectionType, ComplexColumnData ccd);
}

View File

@ -10,6 +10,7 @@
import java.util.List;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.rows.ComplexColumnData;
import org.apache.kafka.connect.data.Schema;
@ -18,7 +19,7 @@
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
public class ListTypeDeserializer extends TypeDeserializer {
public class ListTypeDeserializer extends CollectionTypeDeserializer {
@Override
@SuppressWarnings("unchecked")
@ -36,13 +37,13 @@ public SchemaBuilder getSchemaBuilder(AbstractType<?> abstractType) {
}
@Override
public Object deserialize(AbstractType<?> abstractType, ComplexColumnData ccd) {
List<ByteBuffer> bbList = ((ListType) abstractType).serializedValues(ccd.iterator());
AbstractType innerType = ((ListType) abstractType).getElementsType();
public Object deserialize(CollectionType<?> collectionType, ComplexColumnData ccd) {
List<ByteBuffer> bbList = ((ListType) collectionType).serializedValues(ccd.iterator());
AbstractType innerType = ((ListType) collectionType).getElementsType();
List<Object> deserializedList = new ArrayList<>();
for (ByteBuffer bb : bbList) {
deserializedList.add(super.deserialize(innerType, bb));
}
return Values.convertToList(getSchemaBuilder(abstractType).build(), deserializedList);
return Values.convertToList(getSchemaBuilder(collectionType).build(), deserializedList);
}
}

View File

@ -11,6 +11,7 @@
import java.util.Map;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.rows.ComplexColumnData;
import org.apache.kafka.connect.data.Schema;
@ -19,7 +20,7 @@
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
public class MapTypeDeserializer extends TypeDeserializer {
public class MapTypeDeserializer extends CollectionTypeDeserializer {
@Override
public Object deserialize(AbstractType<?> abstractType, ByteBuffer bb) {
@ -38,10 +39,10 @@ public SchemaBuilder getSchemaBuilder(AbstractType<?> abstractType) {
}
@Override
public Object deserialize(AbstractType<?> abstractType, ComplexColumnData ccd) {
List<ByteBuffer> bbList = ((MapType) abstractType).serializedValues(ccd.iterator());
AbstractType keyType = ((MapType) abstractType).getKeysType();
AbstractType valueType = ((MapType) abstractType).getValuesType();
public Object deserialize(CollectionType<?> collectionType, ComplexColumnData ccd) {
List<ByteBuffer> bbList = ((MapType) collectionType).serializedValues(ccd.iterator());
AbstractType keyType = ((MapType) collectionType).getKeysType();
AbstractType valueType = ((MapType) collectionType).getValuesType();
Map<Object, Object> deserializedMap = new HashMap<>();
int i = 0;
while (i < bbList.size()) {
@ -49,6 +50,6 @@ public Object deserialize(AbstractType<?> abstractType, ComplexColumnData ccd) {
ByteBuffer vbb = bbList.get(i++);
deserializedMap.put(super.deserialize(keyType, kbb), super.deserialize(valueType, vbb));
}
return Values.convertToMap(getSchemaBuilder(abstractType).build(), deserializedMap);
return Values.convertToMap(getSchemaBuilder(collectionType).build(), deserializedMap);
}
}

View File

@ -12,6 +12,7 @@
import java.util.Set;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.db.rows.ComplexColumnData;
import org.apache.kafka.connect.data.Schema;
@ -20,7 +21,7 @@
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
public class SetTypeDeserializer extends TypeDeserializer {
public class SetTypeDeserializer extends CollectionTypeDeserializer {
@Override
@SuppressWarnings("unchecked")
@ -39,14 +40,14 @@ public SchemaBuilder getSchemaBuilder(AbstractType<?> abstractType) {
}
@Override
public Object deserialize(AbstractType<?> abstractType, ComplexColumnData ccd) {
List<ByteBuffer> bbList = ((SetType) abstractType).serializedValues(ccd.iterator());
AbstractType innerType = ((SetType) abstractType).getElementsType();
public Object deserialize(CollectionType<?> collectionType, ComplexColumnData ccd) {
List<ByteBuffer> bbList = ((SetType) collectionType).serializedValues(ccd.iterator());
AbstractType innerType = ((SetType) collectionType).getElementsType();
Set<Object> deserializedSet = new HashSet<>();
for (ByteBuffer bb : bbList) {
deserializedSet.add(super.deserialize(innerType, bb));
}
List<Object> deserializedList = new ArrayList<>(deserializedSet);
return Values.convertToList(getSchemaBuilder(abstractType).build(), deserializedList);
return Values.convertToList(getSchemaBuilder(collectionType).build(), deserializedList);
}
}

View File

@ -8,7 +8,6 @@
import java.nio.ByteBuffer;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.rows.ComplexColumnData;
import org.apache.kafka.connect.data.SchemaBuilder;
public abstract class TypeDeserializer {
@ -17,12 +16,5 @@ public Object deserialize(AbstractType<?> abstractType, ByteBuffer bb) {
return abstractType.getSerializer().deserialize(bb);
}
// This method will be overwritten in all complex-type deserializers
// including MapTypeDeserializer, SetTypeDeserializer and ListTypeDeserializer,
// and is not supposed to be called for deserialization of any non-complex type column.
public Object deserialize(AbstractType<?> abstractType, ComplexColumnData ccd) {
return null;
}
public abstract SchemaBuilder getSchemaBuilder(AbstractType<?> abstractType);
}