From 9b6d24825ddc9cd622e909cc07dbb7ffc18afe34 Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Tue, 28 Jul 2020 17:31:40 -0700 Subject: [PATCH] DBZ-2345 Add sub abstract class. --- .../cassandra/CommitLogReadHandlerImpl.java | 8 +++++--- .../transforms/CassandraTypeDeserializer.java | 15 ++++++--------- .../deserializer/CollectionTypeDeserializer.java | 8 ++++++++ .../type/deserializer/ListTypeDeserializer.java | 11 ++++++----- .../type/deserializer/MapTypeDeserializer.java | 13 +++++++------ .../type/deserializer/SetTypeDeserializer.java | 11 ++++++----- .../type/deserializer/TypeDeserializer.java | 8 -------- 7 files changed, 38 insertions(+), 36 deletions(-) create mode 100644 debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/CollectionTypeDeserializer.java diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java index 95417c14c..437bf9401 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java @@ -26,6 +26,7 @@ import org.apache.cassandra.db.commitlog.CommitLogDescriptor; import org.apache.cassandra.db.commitlog.CommitLogReadHandler; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CollectionType; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.ComplexColumnData; import org.apache.cassandra.db.rows.Row; @@ -423,13 +424,14 @@ private void populateRegularColumns(RowData after, Row row, RowType rowType, Sch try { Object value; Object deletionTs = null; - if (cd.isComplex()) { + AbstractType abstractType = cd.type; + if (abstractType.isCollection()) { ComplexColumnData ccd = row.getComplexColumnData(cd); - value = CassandraTypeDeserializer.deserialize(cd.type, ccd); + value = CassandraTypeDeserializer.deserialize((CollectionType) abstractType, ccd); } else { org.apache.cassandra.db.rows.Cell cell = row.getCell(cd); - value = cell.isTombstone() ? null : CassandraTypeDeserializer.deserialize(cd.type, cell.value()); + value = cell.isTombstone() ? null : CassandraTypeDeserializer.deserialize(abstractType, cell.value()); deletionTs = cell.isExpiring() ? TimeUnit.MICROSECONDS.convert(cell.localDeletionTime(), TimeUnit.SECONDS) : null; } String name = cd.name.toString(); diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java index ff43f2451..f011a12a3 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java @@ -15,6 +15,7 @@ import org.apache.cassandra.db.marshal.BooleanType; import org.apache.cassandra.db.marshal.ByteType; import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.CollectionType; import org.apache.cassandra.db.marshal.CounterColumnType; import org.apache.cassandra.db.marshal.DecimalType; import org.apache.cassandra.db.marshal.DoubleType; @@ -44,6 +45,7 @@ import io.debezium.annotation.Immutable; import io.debezium.annotation.ThreadSafe; import io.debezium.connector.cassandra.transforms.type.deserializer.BasicTypeDeserializer; +import io.debezium.connector.cassandra.transforms.type.deserializer.CollectionTypeDeserializer; import io.debezium.connector.cassandra.transforms.type.deserializer.DurationTypeDeserializer; import io.debezium.connector.cassandra.transforms.type.deserializer.InetAddressDeserializer; import io.debezium.connector.cassandra.transforms.type.deserializer.ListTypeDeserializer; @@ -132,18 +134,13 @@ public static Object deserialize(AbstractType abstractType, ByteBuffer bb) { /** * Deserialize from cdc-log-sourced cassandra data. * - * @param abstractType the {@link AbstractType} of the collection column + * @param collectionType the {@link CollectionType} of the collection column * @param ccd the ComplexColumnData of the collection column to deserialize * @return the deserialized object. */ - public static Object deserialize(AbstractType abstractType, ComplexColumnData ccd) { - // Check if abstract type is reversed, if yes, use the base type for deserialization. - if (abstractType.isReversed()) { - abstractType = ((ReversedType) abstractType).baseType; - } - - TypeDeserializer typeDeserializer = TYPE_MAP.get(abstractType.getClass()); - return typeDeserializer.deserialize(abstractType, ccd); + public static Object deserialize(CollectionType collectionType, ComplexColumnData ccd) { + TypeDeserializer typeDeserializer = TYPE_MAP.get(collectionType.getClass()); + return ((CollectionTypeDeserializer) typeDeserializer).deserialize(collectionType, ccd); } /** diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/CollectionTypeDeserializer.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/CollectionTypeDeserializer.java new file mode 100644 index 000000000..bff538079 --- /dev/null +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/CollectionTypeDeserializer.java @@ -0,0 +1,8 @@ +package io.debezium.connector.cassandra.transforms.type.deserializer; + +import org.apache.cassandra.db.marshal.CollectionType; +import org.apache.cassandra.db.rows.ComplexColumnData; + +public abstract class CollectionTypeDeserializer extends TypeDeserializer { + public abstract Object deserialize(CollectionType collectionType, ComplexColumnData ccd); +} diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/ListTypeDeserializer.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/ListTypeDeserializer.java index 4918597ff..23abed7fb 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/ListTypeDeserializer.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/ListTypeDeserializer.java @@ -10,6 +10,7 @@ import java.util.List; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CollectionType; import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.db.rows.ComplexColumnData; import org.apache.kafka.connect.data.Schema; @@ -18,7 +19,7 @@ import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer; -public class ListTypeDeserializer extends TypeDeserializer { +public class ListTypeDeserializer extends CollectionTypeDeserializer { @Override @SuppressWarnings("unchecked") @@ -36,13 +37,13 @@ public SchemaBuilder getSchemaBuilder(AbstractType abstractType) { } @Override - public Object deserialize(AbstractType abstractType, ComplexColumnData ccd) { - List bbList = ((ListType) abstractType).serializedValues(ccd.iterator()); - AbstractType innerType = ((ListType) abstractType).getElementsType(); + public Object deserialize(CollectionType collectionType, ComplexColumnData ccd) { + List bbList = ((ListType) collectionType).serializedValues(ccd.iterator()); + AbstractType innerType = ((ListType) collectionType).getElementsType(); List deserializedList = new ArrayList<>(); for (ByteBuffer bb : bbList) { deserializedList.add(super.deserialize(innerType, bb)); } - return Values.convertToList(getSchemaBuilder(abstractType).build(), deserializedList); + return Values.convertToList(getSchemaBuilder(collectionType).build(), deserializedList); } } diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/MapTypeDeserializer.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/MapTypeDeserializer.java index c26676b16..8c49098af 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/MapTypeDeserializer.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/MapTypeDeserializer.java @@ -11,6 +11,7 @@ import java.util.Map; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CollectionType; import org.apache.cassandra.db.marshal.MapType; import org.apache.cassandra.db.rows.ComplexColumnData; import org.apache.kafka.connect.data.Schema; @@ -19,7 +20,7 @@ import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer; -public class MapTypeDeserializer extends TypeDeserializer { +public class MapTypeDeserializer extends CollectionTypeDeserializer { @Override public Object deserialize(AbstractType abstractType, ByteBuffer bb) { @@ -38,10 +39,10 @@ public SchemaBuilder getSchemaBuilder(AbstractType abstractType) { } @Override - public Object deserialize(AbstractType abstractType, ComplexColumnData ccd) { - List bbList = ((MapType) abstractType).serializedValues(ccd.iterator()); - AbstractType keyType = ((MapType) abstractType).getKeysType(); - AbstractType valueType = ((MapType) abstractType).getValuesType(); + public Object deserialize(CollectionType collectionType, ComplexColumnData ccd) { + List bbList = ((MapType) collectionType).serializedValues(ccd.iterator()); + AbstractType keyType = ((MapType) collectionType).getKeysType(); + AbstractType valueType = ((MapType) collectionType).getValuesType(); Map deserializedMap = new HashMap<>(); int i = 0; while (i < bbList.size()) { @@ -49,6 +50,6 @@ public Object deserialize(AbstractType abstractType, ComplexColumnData ccd) { ByteBuffer vbb = bbList.get(i++); deserializedMap.put(super.deserialize(keyType, kbb), super.deserialize(valueType, vbb)); } - return Values.convertToMap(getSchemaBuilder(abstractType).build(), deserializedMap); + return Values.convertToMap(getSchemaBuilder(collectionType).build(), deserializedMap); } } diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/SetTypeDeserializer.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/SetTypeDeserializer.java index 6c0558fa4..e81ce3116 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/SetTypeDeserializer.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/SetTypeDeserializer.java @@ -12,6 +12,7 @@ import java.util.Set; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CollectionType; import org.apache.cassandra.db.marshal.SetType; import org.apache.cassandra.db.rows.ComplexColumnData; import org.apache.kafka.connect.data.Schema; @@ -20,7 +21,7 @@ import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer; -public class SetTypeDeserializer extends TypeDeserializer { +public class SetTypeDeserializer extends CollectionTypeDeserializer { @Override @SuppressWarnings("unchecked") @@ -39,14 +40,14 @@ public SchemaBuilder getSchemaBuilder(AbstractType abstractType) { } @Override - public Object deserialize(AbstractType abstractType, ComplexColumnData ccd) { - List bbList = ((SetType) abstractType).serializedValues(ccd.iterator()); - AbstractType innerType = ((SetType) abstractType).getElementsType(); + public Object deserialize(CollectionType collectionType, ComplexColumnData ccd) { + List bbList = ((SetType) collectionType).serializedValues(ccd.iterator()); + AbstractType innerType = ((SetType) collectionType).getElementsType(); Set deserializedSet = new HashSet<>(); for (ByteBuffer bb : bbList) { deserializedSet.add(super.deserialize(innerType, bb)); } List deserializedList = new ArrayList<>(deserializedSet); - return Values.convertToList(getSchemaBuilder(abstractType).build(), deserializedList); + return Values.convertToList(getSchemaBuilder(collectionType).build(), deserializedList); } } diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/TypeDeserializer.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/TypeDeserializer.java index 9a291160b..513299555 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/TypeDeserializer.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/TypeDeserializer.java @@ -8,7 +8,6 @@ import java.nio.ByteBuffer; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.rows.ComplexColumnData; import org.apache.kafka.connect.data.SchemaBuilder; public abstract class TypeDeserializer { @@ -17,12 +16,5 @@ public Object deserialize(AbstractType abstractType, ByteBuffer bb) { return abstractType.getSerializer().deserialize(bb); } - // This method will be overwritten in all complex-type deserializers - // including MapTypeDeserializer, SetTypeDeserializer and ListTypeDeserializer, - // and is not supposed to be called for deserialization of any non-complex type column. - public Object deserialize(AbstractType abstractType, ComplexColumnData ccd) { - return null; - } - public abstract SchemaBuilder getSchemaBuilder(AbstractType abstractType); }