DBZ-1405 Using NanoDuration instead of MicroDuration

Also making CassandraTypeDeserializer immutable.
This commit is contained in:
Gunnar Morling 2019-09-24 12:39:14 +02:00
parent 2ce6c7d1e3
commit 3e424beb53
4 changed files with 92 additions and 80 deletions

View File

@ -5,19 +5,11 @@
*/ */
package io.debezium.connector.cassandra.transforms; package io.debezium.connector.cassandra.transforms;
import com.datastax.driver.core.DataType; import java.nio.ByteBuffer;
import io.debezium.connector.cassandra.transforms.type.deserializer.BasicTypeDeserializer; import java.util.Collections;
import io.debezium.connector.cassandra.transforms.type.deserializer.DurationTypeDeserializer; import java.util.HashMap;
import io.debezium.connector.cassandra.transforms.type.deserializer.InetAddressDeserializer; import java.util.Map;
import io.debezium.connector.cassandra.transforms.type.deserializer.ListTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.MapTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.SetTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.TimeUUIDTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.TimestampTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.TupleTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.TypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.UUIDTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.UserTypeDeserializer;
import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BooleanType; import org.apache.cassandra.db.marshal.BooleanType;
@ -45,41 +37,60 @@
import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.db.marshal.UserType;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import java.nio.ByteBuffer; import com.datastax.driver.core.DataType;
import java.util.HashMap;
import java.util.Map;
import io.debezium.annotation.Immutable;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.cassandra.transforms.type.deserializer.BasicTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.DurationTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.InetAddressDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.ListTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.MapTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.SetTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.TimeUUIDTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.TimestampTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.TupleTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.TypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.UUIDTypeDeserializer;
import io.debezium.connector.cassandra.transforms.type.deserializer.UserTypeDeserializer;
@ThreadSafe
@Immutable
public final class CassandraTypeDeserializer { public final class CassandraTypeDeserializer {
private CassandraTypeDeserializer() { } private CassandraTypeDeserializer() { }
private static final Map<Class<? extends AbstractType>, TypeDeserializer> typeMap = new HashMap<>(); private static final Map<Class<? extends AbstractType>, TypeDeserializer> TYPE_MAP;
static { static {
typeMap.put(AsciiType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.STRING_TYPE)); Map<Class<? extends AbstractType>, TypeDeserializer> tmp = new HashMap<>();
typeMap.put(BooleanType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.BOOLEAN_TYPE));
typeMap.put(BytesType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.BYTES_TYPE)); tmp.put(AsciiType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.STRING_TYPE));
typeMap.put(ByteType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.BYTE_TYPE)); tmp.put(BooleanType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.BOOLEAN_TYPE));
typeMap.put(CounterColumnType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.LONG_TYPE)); tmp.put(BytesType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.BYTES_TYPE));
typeMap.put(DecimalType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.DOUBLE_TYPE)); tmp.put(ByteType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.BYTE_TYPE));
typeMap.put(DoubleType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.DOUBLE_TYPE)); tmp.put(CounterColumnType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.LONG_TYPE));
typeMap.put(DurationType.class, new DurationTypeDeserializer()); tmp.put(DecimalType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.DOUBLE_TYPE));
typeMap.put(FloatType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.FLOAT_TYPE)); tmp.put(DoubleType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.DOUBLE_TYPE));
typeMap.put(InetAddressType.class, new InetAddressDeserializer()); tmp.put(DurationType.class, new DurationTypeDeserializer());
typeMap.put(Int32Type.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.INT_TYPE)); tmp.put(FloatType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.FLOAT_TYPE));
typeMap.put(ListType.class, new ListTypeDeserializer()); tmp.put(InetAddressType.class, new InetAddressDeserializer());
typeMap.put(LongType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.LONG_TYPE)); tmp.put(Int32Type.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.INT_TYPE));
typeMap.put(MapType.class, new MapTypeDeserializer()); tmp.put(ListType.class, new ListTypeDeserializer());
typeMap.put(SetType.class, new SetTypeDeserializer()); tmp.put(LongType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.LONG_TYPE));
typeMap.put(ShortType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.SHORT_TYPE)); tmp.put(MapType.class, new MapTypeDeserializer());
typeMap.put(SimpleDateType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.DATE_TYPE)); tmp.put(SetType.class, new SetTypeDeserializer());
typeMap.put(TimeType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.LONG_TYPE)); tmp.put(ShortType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.SHORT_TYPE));
typeMap.put(TimestampType.class, new TimestampTypeDeserializer()); tmp.put(SimpleDateType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.DATE_TYPE));
typeMap.put(TimeUUIDType.class, new TimeUUIDTypeDeserializer()); tmp.put(TimeType.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.LONG_TYPE));
typeMap.put(TupleType.class, new TupleTypeDeserializer()); tmp.put(TimestampType.class, new TimestampTypeDeserializer());
typeMap.put(UserType.class, new UserTypeDeserializer()); tmp.put(TimeUUIDType.class, new TimeUUIDTypeDeserializer());
typeMap.put(UTF8Type.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.STRING_TYPE)); tmp.put(TupleType.class, new TupleTypeDeserializer());
typeMap.put(UUIDType.class, new UUIDTypeDeserializer()); tmp.put(UserType.class, new UserTypeDeserializer());
tmp.put(UTF8Type.class, new BasicTypeDeserializer(CassandraTypeKafkaSchemaBuilders.STRING_TYPE));
tmp.put(UUIDType.class, new UUIDTypeDeserializer());
TYPE_MAP = Collections.unmodifiableMap(tmp);
} }
/** /**
@ -106,7 +117,7 @@ public static Object deserialize(AbstractType<?> abstractType, ByteBuffer bb) {
return null; return null;
} }
TypeDeserializer typeDeserializer = typeMap.get(abstractType.getClass()); TypeDeserializer typeDeserializer = TYPE_MAP.get(abstractType.getClass());
return typeDeserializer.deserialize(abstractType, bb); return typeDeserializer.deserialize(abstractType, bb);
} }
@ -116,8 +127,7 @@ public static Object deserialize(AbstractType<?> abstractType, ByteBuffer bb) {
* @return the kafka connect SchemaBuilder object * @return the kafka connect SchemaBuilder object
*/ */
public static SchemaBuilder getSchemaBuilder(AbstractType<?> abstractType) { public static SchemaBuilder getSchemaBuilder(AbstractType<?> abstractType) {
TypeDeserializer typeDeserializer = typeMap.get(abstractType.getClass()); TypeDeserializer typeDeserializer = TYPE_MAP.get(abstractType.getClass());
return typeDeserializer.getSchemaBuilder(abstractType); return typeDeserializer.getSchemaBuilder(abstractType);
} }
} }

View File

@ -5,17 +5,18 @@
*/ */
package io.debezium.connector.cassandra.transforms; package io.debezium.connector.cassandra.transforms;
import com.datastax.driver.core.DataType;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import com.datastax.driver.core.DataType;
import io.debezium.data.Uuid; import io.debezium.data.Uuid;
import io.debezium.time.Date; import io.debezium.time.Date;
import io.debezium.time.Timestamp;
import io.debezium.time.MicroTimestamp; import io.debezium.time.MicroTimestamp;
import io.debezium.time.MicroDuration; import io.debezium.time.NanoDuration;
import io.debezium.time.Timestamp;
/** /**
* Class that maps Cassandra's {@link DataType} to its corresponding kafka connect {@link SchemaBuilder}. * Class that maps Cassandra's {@link DataType} to its corresponding Kafka Connect {@link SchemaBuilder}.
*/ */
public final class CassandraTypeKafkaSchemaBuilders { public final class CassandraTypeKafkaSchemaBuilders {
@ -35,6 +36,5 @@ public final class CassandraTypeKafkaSchemaBuilders {
public static final SchemaBuilder TIMESTAMP_MILLI_TYPE = Timestamp.builder(); public static final SchemaBuilder TIMESTAMP_MILLI_TYPE = Timestamp.builder();
public static final SchemaBuilder TIMESTAMP_MICRO_TYPE = MicroTimestamp.builder(); public static final SchemaBuilder TIMESTAMP_MICRO_TYPE = MicroTimestamp.builder();
public static final SchemaBuilder UUID_TYPE = Uuid.builder(); public static final SchemaBuilder UUID_TYPE = Uuid.builder();
public static final SchemaBuilder DURATION_TYPE = MicroDuration.builder(); public static final SchemaBuilder DURATION_TYPE = NanoDuration.builder();
} }

View File

@ -5,12 +5,13 @@
*/ */
package io.debezium.connector.cassandra.transforms.type.deserializer; package io.debezium.connector.cassandra.transforms.type.deserializer;
import io.debezium.connector.cassandra.transforms.CassandraTypeKafkaSchemaBuilders; import java.nio.ByteBuffer;
import io.debezium.time.MicroDuration;
import org.apache.cassandra.cql3.Duration; import org.apache.cassandra.cql3.Duration;
import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.AbstractType;
import java.nio.ByteBuffer; import io.debezium.connector.cassandra.transforms.CassandraTypeKafkaSchemaBuilders;
import io.debezium.time.NanoDuration;
public class DurationTypeDeserializer extends BasicTypeDeserializer { public class DurationTypeDeserializer extends BasicTypeDeserializer {
/* /*
@ -27,6 +28,6 @@ public Object deserialize(AbstractType<?> abstractType, ByteBuffer bb) {
int months = duration.getMonths(); int months = duration.getMonths();
int days = duration.getDays(); int days = duration.getDays();
long nanoSec = duration.getNanoseconds(); long nanoSec = duration.getNanoseconds();
return MicroDuration.durationMicros(0, months, days, 0, 0, 0, (int) nanoSec/1000, 0.0D); return NanoDuration.durationNanos(0, months, days, 0, 0, 0, nanoSec);
} }
} }

View File

@ -5,11 +5,22 @@
*/ */
package io.debezium.connector.cassandra.transforms; package io.debezium.connector.cassandra.transforms;
import com.datastax.driver.core.DataType; import java.math.BigDecimal;
import io.debezium.time.MicroDuration; import java.net.InetAddress;
import org.apache.kafka.connect.data.Struct; import java.net.UnknownHostException;
import org.apache.kafka.connect.data.Schema; import java.nio.ByteBuffer;
import org.apache.kafka.connect.data.Values; import java.nio.charset.Charset;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.cassandra.cql3.Duration; import org.apache.cassandra.cql3.Duration;
import org.apache.cassandra.cql3.FieldIdentifier; import org.apache.cassandra.cql3.FieldIdentifier;
import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.QueryOptions;
@ -39,23 +50,13 @@
import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.db.marshal.UserType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Values;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.math.BigDecimal; import com.datastax.driver.core.DataType;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
/** /**
* This class ONLY tests the {@link CassandraTypeDeserializer#deserialize(AbstractType, ByteBuffer)} * This class ONLY tests the {@link CassandraTypeDeserializer#deserialize(AbstractType, ByteBuffer)}
@ -149,13 +150,13 @@ public void testDoubleType() {
public void testDurationType() { public void testDurationType() {
Duration sourceDuration = Duration.newInstance(1, 3, 500); Duration sourceDuration = Duration.newInstance(1, 3, 500);
double expectedMicroDuration = MicroDuration.durationMicros(0, 1, 3, 0, 0, 0, 500/1000, 0.0D); long expectedNanoDuration = (30 + 3) * ChronoUnit.DAYS.getDuration().toNanos() + 500;
ByteBuffer serializedDuration = DurationType.instance.decompose(sourceDuration); ByteBuffer serializedDuration = DurationType.instance.decompose(sourceDuration);
Object deserializedDuration = CassandraTypeDeserializer.deserialize(DurationType.instance, serializedDuration); Object deserializedDuration = CassandraTypeDeserializer.deserialize(DurationType.instance, serializedDuration);
Assert.assertEquals(expectedMicroDuration, deserializedDuration); Assert.assertEquals(expectedNanoDuration, deserializedDuration);
} }
@Test @Test
@ -269,7 +270,7 @@ public void testSetType() {
// non-frozen // non-frozen
SetType<Float> nonFrozenSetType = SetType.getInstance(FloatType.instance, true); SetType<Float> nonFrozenSetType = SetType.getInstance(FloatType.instance, true);
ByteBuffer serializedSet = nonFrozenSetType.decompose(sourceSet); ByteBuffer serializedSet = nonFrozenSetType.decompose(sourceSet);
Collection<?> deserializedSet = (Collection) CassandraTypeDeserializer.deserialize(nonFrozenSetType, serializedSet); Collection<?> deserializedSet = (Collection<?>) CassandraTypeDeserializer.deserialize(nonFrozenSetType, serializedSet);
// order may be different in the resulting collection. // order may be different in the resulting collection.
Assert.assertTrue(sourceSet.containsAll(deserializedSet)); Assert.assertTrue(sourceSet.containsAll(deserializedSet));
Assert.assertTrue(deserializedSet.containsAll(sourceSet)); Assert.assertTrue(deserializedSet.containsAll(sourceSet));
@ -277,7 +278,7 @@ public void testSetType() {
// frozen // frozen
SetType<Float> frozenSetType = SetType.getInstance(FloatType.instance, false); SetType<Float> frozenSetType = SetType.getInstance(FloatType.instance, false);
serializedSet = frozenSetType.decompose(sourceSet); serializedSet = frozenSetType.decompose(sourceSet);
deserializedSet = (Collection) CassandraTypeDeserializer.deserialize(frozenSetType, serializedSet); deserializedSet = (Collection<?>) CassandraTypeDeserializer.deserialize(frozenSetType, serializedSet);
Assert.assertTrue(sourceSet.containsAll(deserializedSet)); Assert.assertTrue(sourceSet.containsAll(deserializedSet));
Assert.assertTrue(deserializedSet.containsAll(sourceSet)); Assert.assertTrue(deserializedSet.containsAll(sourceSet));
} }
@ -380,12 +381,12 @@ public void testUserType() {
Schema userSchema = CassandraTypeDeserializer.getSchemaBuilder(userType).build(); Schema userSchema = CassandraTypeDeserializer.getSchemaBuilder(userType).build();
double expectedMicroDuration = MicroDuration.durationMicros(0, 1, 2, 0, 0, 0, (int) 3L/1000, 0.0D); long expectedNanoDuration = (30 + 2) * ChronoUnit.DAYS.getDuration().toNanos() + 3;
Struct expectedUserTypeData = new Struct(userSchema) Struct expectedUserTypeData = new Struct(userSchema)
.put("asciiField", "foobar") .put("asciiField", "foobar")
.put("doubleField", 1.5d) .put("doubleField", 1.5d)
.put("durationField", expectedMicroDuration); .put("durationField", expectedNanoDuration);
Map<String, Object> jsonObject = new HashMap<>(3); Map<String, Object> jsonObject = new HashMap<>(3);
jsonObject.put("\"asciiField\"", "foobar"); jsonObject.put("\"asciiField\"", "foobar");