DBZ-1405 Replace Avro GenericRecord with Kafka Connect Struct.

Bingqin Zhou 2019-09-09 13:05:03 -07:00 committed by Gunnar Morling
parent f6d25e6fb8
commit 58f43b4cae
15 changed files with 105 additions and 174 deletions
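At its core, the commit swaps Avro's GenericRecordBuilder.set(...).build() idiom for Kafka Connect's Struct, whose put(...) calls chain and need no terminal build(). A minimal, self-contained sketch of the new idiom (class and field names here are illustrative, not taken from the commit):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class StructIdiomSketch {
    public static void main(String[] args) {
        // Connect schemas are built fluently, much like Avro's SchemaBuilder.
        Schema schema = SchemaBuilder.struct().name("example")
                .field("name", Schema.STRING_SCHEMA)
                .build();
        // Struct.put(...) returns the Struct, so calls chain like
        // GenericRecordBuilder.set(...), but no build() step is needed.
        Struct value = new Struct(schema).put("name", "cassandra");
        value.validate(); // throws DataException if a required field is unset
        System.out.println(value);
    }
}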

View File

@@ -1,21 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.cassandra;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
/**
* An interface that indicates the record can be converted to a {@link GenericRecord}.
*/
public interface AvroRecord {
/**
* return an Avro GenericRecord based on the schema passed into the method
* @param schema of the generic record
* @return a GenericRecord
*/
GenericRecord record(Schema schema);
}
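The commit deletes this Avro-oriented contract outright rather than porting it; CellData, RowData, and SourceInfo below simply keep record(Schema) as a plain method. A Connect-flavored equivalent, had one been kept, might have looked like this (hypothetical sketch, not part of the commit):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

// Hypothetical replacement interface; the commit instead leaves
// record(Schema) as an ordinary method on each class.
public interface StructRecord {
    Struct record(Schema schema);
}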

View File

@@ -7,11 +7,10 @@
import com.datastax.driver.core.ColumnMetadata;
import io.debezium.connector.cassandra.transforms.CassandraTypeConverter;
import io.debezium.connector.cassandra.transforms.CassandraTypeToAvroSchemaMapper;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.cassandra.db.marshal.AbstractType;
import java.util.Objects;
@@ -20,7 +19,7 @@
* Cell-level data about the source event. Each cell contains the name, value and
* type of a column in a Cassandra table.
*/
public class CellData implements AvroRecord {
public class CellData {
/**
* The type of a column in a Cassandra table
*/
@@ -63,30 +62,30 @@ public boolean isPrimary() {
return columnType == ColumnType.PARTITION || columnType == ColumnType.CLUSTERING;
}
@Override
public GenericRecord record(Schema schema) {
return new GenericRecordBuilder(schema)
.set(CELL_VALUE_KEY, value)
.set(CELL_DELETION_TS_KEY, deletionTs)
.set(CELL_SET_KEY, true)
.build();
public Struct record(Schema schema) {
return new Struct(schema)
.put(CELL_VALUE_KEY, value)
.put(CELL_DELETION_TS_KEY, deletionTs)
.put(CELL_SET_KEY, true);
}
static Schema cellSchema(ColumnMetadata cm) {
static Schema cellSchema(ColumnMetadata cm, boolean optional) {
AbstractType<?> convertedType = CassandraTypeConverter.convert(cm.getType());
Schema valueSchema = CassandraTypeToAvroSchemaMapper.getSchema(convertedType, true);
Schema valueSchema = CassandraTypeDeserializer.getSchemaBuilder(convertedType).optional().build();
if (valueSchema != null) {
return SchemaBuilder.builder().record(cm.getName()).fields()
.name(CELL_VALUE_KEY).type(valueSchema).noDefault()
.name(CELL_DELETION_TS_KEY).type(CassandraTypeToAvroSchemaMapper.nullable(CassandraTypeToAvroSchemaMapper.LONG_TYPE)).withDefault(null)
.name(CELL_SET_KEY).type().booleanType().noDefault()
.endRecord();
SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(cm.getName())
.field(CELL_VALUE_KEY, valueSchema)
.field(CELL_DELETION_TS_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(CELL_SET_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA);
if (optional) {
schemaBuilder.optional();
}
return schemaBuilder.build();
} else {
return null;
}
}
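The reworked cellSchema builds a STRUCT with value, deletion-timestamp, and set-flag fields, and marks the whole struct optional when asked. A hedged sketch of building and filling such a cell for a text column (the literal field names "value", "deletion_ts", and "set" are assumptions; the real strings come from the CELL_*_KEY constants):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class CellSchemaSketch {
    public static void main(String[] args) {
        // Mirrors cellSchema(cm, true) for a text column named "col".
        Schema cell = SchemaBuilder.struct().name("col")
                .field("value", Schema.OPTIONAL_STRING_SCHEMA)
                .field("deletion_ts", Schema.OPTIONAL_INT64_SCHEMA)
                .field("set", Schema.OPTIONAL_BOOLEAN_SCHEMA)
                .optional()
                .build();
        Struct data = new Struct(cell)
                .put("value", "hello")
                .put("deletion_ts", null) // allowed: the field is optional
                .put("set", true);
        System.out.println(data);
    }
}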
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
@@ -101,12 +100,10 @@ public boolean equals(Object o) {
&& columnType == that.columnType;
}
@Override
public int hashCode() {
return Objects.hash(name, value, deletionTs, columnType);
}
@Override
public String toString() {
return "{"
+ "name=" + name

View File

@@ -5,7 +5,7 @@
*/
package io.debezium.connector.cassandra;
import org.apache.avro.Schema;
import org.apache.kafka.connect.data.Schema;
/**
* An internal representation of a create/update/delete event.

View File

@@ -10,7 +7,7 @@
import io.debezium.connector.cassandra.exceptions.CassandraConnectorSchemaException;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import org.apache.avro.Schema;
import org.apache.kafka.connect.data.Schema;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.LivenessInfo;

View File

@@ -5,7 +5,7 @@
*/
package io.debezium.connector.cassandra;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -25,7 +25,7 @@
public class KafkaRecordEmitter implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRecordEmitter.class);
private final KafkaProducer<GenericRecord, GenericRecord> producer;
private final KafkaProducer<Struct, Struct> producer;
private final CassandraTopicSelector topicSelector;
private final OffsetWriter offsetWriter;
private final OffsetFlushPolicy offsetFlushPolicy;
@@ -43,14 +43,14 @@ public KafkaRecordEmitter(String kafkaTopicPrefix, Properties kafkaProperties, O
public void emit(Record record) {
synchronized (lock) {
ProducerRecord<GenericRecord, GenericRecord> producerRecord = toProducerRecord(record);
ProducerRecord<Struct, Struct> producerRecord = toProducerRecord(record);
Future<RecordMetadata> future = producer.send(producerRecord);
futures.put(record, future);
maybeFlushAndMarkOffset();
}
}
private ProducerRecord<GenericRecord, GenericRecord> toProducerRecord(Record record) {
private ProducerRecord<Struct, Struct> toProducerRecord(Record record) {
String topic = topicSelector.topicNameFor(record.getSource().keyspaceTable);
return new ProducerRecord<>(topic, record.buildKey(), record.buildValue());
}
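With the producer's generic types now <Struct, Struct>, whatever key and value serializers are configured in kafkaProperties must be able to handle Connect Structs (for example, a Connect converter wrapped as a serializer). A small sketch of the record shape toProducerRecord now produces (topic name illustrative; the emitter derives it via CassandraTopicSelector):

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class ProducerRecordSketch {
    public static void main(String[] args) {
        Schema keySchema = SchemaBuilder.struct().name("key")
                .field("id", Schema.INT32_SCHEMA).build();
        Struct key = new Struct(keySchema).put("id", 1);
        // A null value would correspond to a tombstone record.
        ProducerRecord<Struct, Struct> pr =
                new ProducerRecord<>("prefix.keyspace.table", key, null);
        System.out.println(pr);
    }
}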

View File

@@ -8,11 +8,10 @@
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.TableMetadata;
import io.debezium.connector.cassandra.transforms.CassandraTypeConverter;
import io.debezium.connector.cassandra.transforms.CassandraTypeToAvroSchemaMapper;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.cassandra.db.marshal.AbstractType;
import java.util.List;
@@ -22,7 +21,7 @@
/**
* An immutable data structure representing a change event, and can be converted
* to a GenericRecord representing key/value of the change event.
* to a Kafka Connect Struct representing key/value of the change event.
*/
public abstract class Record implements Event {
static final String NAMESPACE = "io.debezium.connector.cassandra";
@@ -70,57 +69,56 @@ public String toString() {
this.ts = ts;
}
public GenericRecord buildKey() {
public Struct buildKey() {
if (keySchema == null) {
return null;
}
List<CellData> primary = rowData.getPrimary();
GenericRecordBuilder builder = new GenericRecordBuilder(keySchema);
Struct struct = new Struct(keySchema);
for (CellData cellData : primary) {
builder.set(cellData.name, cellData.value);
struct.put(cellData.name, cellData.value);
}
return builder.build();
return struct;
}
public GenericRecord buildValue() {
public Struct buildValue() {
if (valueSchema == null) {
return null;
}
return new GenericRecordBuilder(valueSchema)
.set(TIMESTAMP, ts)
.set(OPERATION, op.getValue())
.set(SOURCE, source.record(getFieldSchema(SOURCE, valueSchema)))
.set(AFTER, rowData.record(getFieldSchema(AFTER, valueSchema)))
.build();
return new Struct(valueSchema)
.put(TIMESTAMP, ts)
.put(OPERATION, op.getValue())
.put(SOURCE, source.record(getFieldSchema(SOURCE, valueSchema)))
.put(AFTER, rowData.record(getFieldSchema(AFTER, valueSchema)));
}
public static Schema keySchema(String connectorName, TableMetadata tm) {
if (tm == null) {
return null;
}
SchemaBuilder.FieldAssembler assembler = SchemaBuilder.builder().record(getKeyName(connectorName, tm)).namespace(NAMESPACE).fields();
SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(NAMESPACE + "." + getKeyName(connectorName, tm));
for (ColumnMetadata cm : tm.getPrimaryKey()) {
AbstractType<?> convertedType = CassandraTypeConverter.convert(cm.getType());
Schema colSchema = CassandraTypeToAvroSchemaMapper.getSchema(convertedType, false);
Schema colSchema = CassandraTypeDeserializer.getSchemaBuilder(convertedType).build();
if (colSchema != null) {
assembler.name(cm.getName()).type(colSchema).noDefault();
schemaBuilder.field(cm.getName(), colSchema);
}
}
return (Schema) assembler.endRecord();
return schemaBuilder.build();
}
public static Schema valueSchema(String connectorName, TableMetadata tm) {
if (tm == null) {
return null;
}
return SchemaBuilder.builder().record(getValueName(connectorName, tm)).namespace(NAMESPACE).fields()
.name(TIMESTAMP).type().longType().noDefault()
.name(OPERATION).type().stringType().noDefault()
.name(SOURCE).type(SourceInfo.SOURCE_SCHEMA).noDefault()
.name(AFTER).type(RowData.rowSchema(tm)).noDefault()
.endRecord();
return SchemaBuilder.struct().name(NAMESPACE + "." + getValueName(connectorName, tm))
.field(TIMESTAMP, Schema.INT64_SCHEMA)
.field(OPERATION, Schema.STRING_SCHEMA)
.field(SOURCE, SourceInfo.SOURCE_SCHEMA)
.field(AFTER, RowData.rowSchema(tm))
.build();
}
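buildValue() fills nested fields against schemas fetched from the parent via getFieldSchema. A compact sketch of that pattern with a cut-down envelope (names and values illustrative):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class EnvelopeSketch {
    public static void main(String[] args) {
        Schema source = SchemaBuilder.struct().name("source")
                .field("cluster", Schema.STRING_SCHEMA).build();
        Schema value = SchemaBuilder.struct().name("envelope")
                .field("ts", Schema.INT64_SCHEMA)
                .field("op", Schema.STRING_SCHEMA)
                .field("source", source)
                .build();
        // Nested structs are populated against the field's own schema,
        // looked up on the parent, as buildValue() does via getFieldSchema.
        Struct sourceStruct = new Struct(value.field("source").schema())
                .put("cluster", "test-cluster");
        Struct envelope = new Struct(value)
                .put("ts", System.currentTimeMillis())
                .put("op", "i")
                .put("source", sourceStruct);
        System.out.println(envelope);
    }
}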
@Override

View File

@@ -5,7 +5,7 @@
*/
package io.debezium.connector.cassandra;
import org.apache.avro.Schema;
import org.apache.kafka.connect.data.Schema;
import java.util.function.Consumer;

View File

@@ -7,10 +7,10 @@
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.TableMetadata;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Field;
import java.util.LinkedHashMap;
import java.util.List;
@@ -18,14 +18,13 @@
import java.util.Objects;
import java.util.stream.Collectors;
import static io.debezium.connector.cassandra.Record.AFTER;
import static io.debezium.connector.cassandra.SchemaHolder.getFieldSchema;
/**
* Row-level data about the source event. Contains a map where the key is the table column
* name and the value is the {@link CellData}.
*/
public class RowData implements AvroRecord {
public class RowData {
private final Map<String, CellData> cellMap = new LinkedHashMap<>();
public void addCell(CellData cellData) {
@@ -42,23 +41,17 @@ public boolean hasCell(String columnName) {
return cellMap.containsKey(columnName);
}
@Override
public GenericRecord record(Schema schema) {
GenericRecordBuilder builder = new GenericRecordBuilder(schema);
for (Schema.Field field : schema.getFields()) {
// note: calling getFieldSchema twice since the field has UNION type,
// and we only want to extract the non-null field schema in the UNION type.
// they currently have the same name
Schema unionSchema = getFieldSchema(field.name(), schema);
Schema cellSchema = getFieldSchema(field.name(), unionSchema);
public Struct record(Schema schema) {
Struct struct = new Struct(schema);
for (Field field : schema.fields()) {
Schema cellSchema = getFieldSchema(field.name(), schema);
CellData cellData = cellMap.get(field.name());
// only add the cell if it is not null
if (cellData != null) {
builder.set(field.name(), cellData.record(cellSchema));
struct.put(field.name(), cellData.record(cellSchema));
}
}
return builder.build();
return struct;
}
public RowData copy() {
@@ -70,41 +63,30 @@ public RowData copy() {
}
/**
* Assemble the Avro {@link Schema} for the "after" field of the change event
* Assemble the Kafka Connect {@link Schema} for the "after" field of the change event
* based on the Cassandra table schema.
* @param tm metadata of a table that contains the Cassandra table schema
* @return a schema for the "after" field of a change event
*/
static Schema rowSchema(TableMetadata tm) {
SchemaBuilder.FieldAssembler assembler = SchemaBuilder.builder().record(AFTER).fields();
SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(Record.AFTER);
for (ColumnMetadata cm : tm.getColumns()) {
Schema nullableCellSchema = nullableCellSchema(cm);
if (nullableCellSchema != null) {
assembler.name(cm.getName()).type(nullableCellSchema).withDefault(null);
Schema optionalCellSchema = CellData.cellSchema(cm, true);
if (optionalCellSchema != null) {
schemaBuilder.field(cm.getName(), optionalCellSchema);
}
}
return (Schema) assembler.endRecord();
}
private static Schema nullableCellSchema(ColumnMetadata cm) {
Schema cellSchema = CellData.cellSchema(cm);
if (cellSchema != null) {
return SchemaBuilder.builder().unionOf().nullType().and().type(cellSchema).endUnion();
} else {
return null;
}
return schemaBuilder.build();
}
List<CellData> getPrimary() {
return this.cellMap.values().stream().filter(CellData::isPrimary).collect(Collectors.toList());
}
@Override
public String toString() {
return this.cellMap.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
@@ -116,7 +98,6 @@ public boolean equals(Object o) {
return Objects.equals(cellMap, rowData.cellMap);
}
@Override
public int hashCode() {
return Objects.hash(cellMap);
}
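Where the Avro version wrapped each cell schema in a union of null and the cell record, Connect expresses nullability as a property of the schema itself, which is why nullableCellSchema disappears and cellSchema(cm, true) suffices. A sketch of the difference (names illustrative):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class OptionalFieldSketch {
    public static void main(String[] args) {
        // Avro: union { null, cell }  ->  Connect: cell schema marked optional
        Schema cell = SchemaBuilder.struct().name("cell")
                .field("value", Schema.OPTIONAL_STRING_SCHEMA)
                .optional()
                .build();
        Schema row = SchemaBuilder.struct().name("after")
                .field("col", cell)
                .build();
        Struct after = new Struct(row); // "col" may simply be left unset
        after.validate();               // passes: the field is optional
        System.out.println(after);
    }
}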

View File

@@ -7,12 +7,11 @@
import com.datastax.driver.core.TableMetadata;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorSchemaException;
import org.apache.avro.Schema;
import org.apache.kafka.connect.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -65,19 +64,10 @@ public Set<TableMetadata> getCdcEnabledTableMetadataSet() {
* @return Schema
*/
public static Schema getFieldSchema(String fieldName, Schema schema) {
if (schema.getType().equals(Schema.Type.UNION)) {
List<Schema> unionOfSchemas = schema.getTypes();
for (Schema innerSchema : unionOfSchemas) {
if (innerSchema.getName().equals(fieldName)) {
return innerSchema;
}
}
throw new CassandraConnectorSchemaException("Union type does not contain field " + fieldName);
} else if (schema.getType().equals(Schema.Type.RECORD)) {
return schema.getField(fieldName).schema();
} else {
throw new CassandraConnectorSchemaException("Only UNION and RECORD types are supported for this method, but encountered " + schema.getType());
if (schema.type().equals(Schema.Type.STRUCT)) {
return schema.field(fieldName).schema();
}
throw new CassandraConnectorSchemaException("Only STRUCT type is supported for this method, but encountered " + schema.type());
}
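With unions gone, field-schema lookup collapses to Schema.field(name).schema() on STRUCT schemas. A sketch of the lookup (note, as an aside, that Schema.field(...) returns null for an unknown name, so callers rely on the field existing):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class FieldLookupSketch {
    public static void main(String[] args) {
        Schema value = SchemaBuilder.struct().name("envelope")
                .field("after", SchemaBuilder.struct().name("after")
                        .field("col", Schema.OPTIONAL_STRING_SCHEMA)
                        .build())
                .build();
        // Equivalent of getFieldSchema("after", value).
        Schema after = value.field("after").schema();
        System.out.println(after.name()); // prints "after"
    }
}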
private void refreshSchema(KeyspaceTable keyspaceTable) {

View File

@@ -16,7 +16,7 @@
import com.datastax.driver.core.querybuilder.Select;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import org.apache.avro.Schema;
import org.apache.kafka.connect.data.Schema;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@@ -5,22 +5,19 @@
*/
package io.debezium.connector.cassandra;
import io.debezium.connector.cassandra.transforms.CassandraTypeToAvroSchemaMapper;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import io.debezium.connector.cassandra.transforms.CassandraTypeKafkaSchemaBuilders;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static io.debezium.connector.cassandra.Record.SOURCE;
/**
* Metadata about the source of the change event
*/
public class SourceInfo implements AvroRecord {
public class SourceInfo {
public static final String DEBEZIUM_VERSION_KEY = "version";
public static final String DEBEZIUM_CONNECTOR_KEY = "connector";
public static final String CLUSTER_KEY = "cluster";
@@ -31,17 +28,15 @@ public class SourceInfo implements AvroRecord {
public static final String SNAPSHOT_KEY = "snapshot";
public static final String TIMESTAMP_KEY = "ts_micro";
public static final Schema SOURCE_SCHEMA = SchemaBuilder.builder().record(SOURCE).fields()
.requiredString(DEBEZIUM_VERSION_KEY)
.requiredString(DEBEZIUM_CONNECTOR_KEY)
.requiredString(CLUSTER_KEY)
.requiredString(COMMITLOG_FILENAME_KEY)
.requiredInt(COMMITLOG_POSITION_KEY)
.requiredBoolean(SNAPSHOT_KEY)
.requiredString(KEYSPACE_NAME_KEY)
.requiredString(TABLE_NAME_KEY)
.name(TIMESTAMP_KEY).type(CassandraTypeToAvroSchemaMapper.TIMESTAMP_MICRO_TYPE).noDefault()
.endRecord();
public static final Schema SOURCE_SCHEMA = SchemaBuilder.struct().name(Record.SOURCE)
.field(CLUSTER_KEY, Schema.STRING_SCHEMA)
.field(COMMITLOG_FILENAME_KEY, Schema.STRING_SCHEMA)
.field(COMMITLOG_POSITION_KEY, Schema.INT32_SCHEMA)
.field(SNAPSHOT_KEY, Schema.BOOLEAN_SCHEMA)
.field(KEYSPACE_NAME_KEY, Schema.STRING_SCHEMA)
.field(TABLE_NAME_KEY, Schema.STRING_SCHEMA)
.field(TIMESTAMP_KEY, CassandraTypeKafkaSchemaBuilders.TIMESTAMP_MICRO_TYPE)
.build();
public final String version = Module.version();
public final String connector = Module.name();
@@ -59,22 +54,18 @@ public SourceInfo(String cluster, OffsetPosition offsetPosition, KeyspaceTable k
this.snapshot = snapshot;
}
@Override
public GenericRecord record(Schema schema) {
return new GenericRecordBuilder(schema)
.set(DEBEZIUM_VERSION_KEY, version)
.set(DEBEZIUM_CONNECTOR_KEY, connector)
.set(CLUSTER_KEY, cluster)
.set(COMMITLOG_FILENAME_KEY, offsetPosition.fileName)
.set(COMMITLOG_POSITION_KEY, offsetPosition.filePosition)
.set(SNAPSHOT_KEY, snapshot)
.set(KEYSPACE_NAME_KEY, keyspaceTable.keyspace)
.set(TABLE_NAME_KEY, keyspaceTable.table)
.set(TIMESTAMP_KEY, tsMicro)
.build();
public Struct record(Schema schema) {
return new Struct(schema)
.put(CLUSTER_KEY, cluster)
.put(COMMITLOG_FILENAME_KEY, offsetPosition.fileName)
.put(COMMITLOG_POSITION_KEY, offsetPosition.filePosition)
.put(SNAPSHOT_KEY, snapshot)
.put(KEYSPACE_NAME_KEY, keyspaceTable.keyspace)
.put(TABLE_NAME_KEY, keyspaceTable.table)
.put(TIMESTAMP_KEY, tsMicro);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
@@ -92,12 +83,10 @@ public boolean equals(Object o) {
&& tsMicro == that.tsMicro;
}
@Override
public int hashCode() {
return Objects.hash(cluster, snapshot, offsetPosition, keyspaceTable, tsMicro);
}
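Note that the new SOURCE_SCHEMA no longer carries the version and connector fields the Avro schema declared, and record() stops setting them accordingly. A sketch of populating a source struct of this shape (field-name strings other than "ts_micro" are assumptions standing in for the *_KEY constants, and plain INT64 stands in for TIMESTAMP_MICRO_TYPE):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class SourceStructSketch {
    public static void main(String[] args) {
        // Assumption: TIMESTAMP_MICRO_TYPE is an int64-based schema for
        // microsecond timestamps; plain INT64 stands in for it here.
        Schema source = SchemaBuilder.struct().name("source")
                .field("cluster", Schema.STRING_SCHEMA)
                .field("commitlog_filename", Schema.STRING_SCHEMA)
                .field("commitlog_position", Schema.INT32_SCHEMA)
                .field("snapshot", Schema.BOOLEAN_SCHEMA)
                .field("keyspace", Schema.STRING_SCHEMA)
                .field("table", Schema.STRING_SCHEMA)
                .field("ts_micro", Schema.INT64_SCHEMA)
                .build();
        Struct s = new Struct(source)
                .put("cluster", "test-cluster")
                .put("commitlog_filename", "CommitLog-6-123.log")
                .put("commitlog_position", 0)
                .put("snapshot", false)
                .put("keyspace", "test_keyspace")
                .put("table", "cdc_table")
                .put("ts_micro", System.currentTimeMillis() * 1000);
        System.out.println(s);
    }
}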
@Override
public String toString() {
Map<String, Object> map = new HashMap<>();
map.put(DEBEZIUM_VERSION_KEY, version);

View File

@@ -5,7 +5,7 @@
*/
package io.debezium.connector.cassandra;
import org.apache.avro.Schema;
import org.apache.kafka.connect.data.Schema;
/**
* A TombstoneRecord is a record which has the same key as a delete event but has null value.

View File

@@ -15,10 +15,7 @@
public class DurationTypeDeserializer extends BasicTypeDeserializer {
/*
* According to the official spec, Avro has a duration type of (almost) the same format of the cassandra
* duration type, but sadly it's not actually represented in the code anywhere!
* issue: https://issues.apache.org/jira/browse/AVRO-2123
* So, for now at least, duration is serialized into a record with fields months, days, and nanos.
* Cassandra's Duration type is serialized into a struct with fields months, days, and nanos.
*/
public DurationTypeDeserializer() {
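The comment above pins down the struct shape for Cassandra's Duration. A sketch of such a schema (field types assumed from Cassandra's Duration of int months, int days, long nanoseconds; names come from the comment):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class DurationStructSketch {
    public static void main(String[] args) {
        Schema duration = SchemaBuilder.struct().name("duration")
                .field("months", Schema.INT32_SCHEMA)
                .field("days", Schema.INT32_SCHEMA)
                .field("nanos", Schema.INT64_SCHEMA)
                .build();
        Struct oneWeek = new Struct(duration)
                .put("months", 0).put("days", 7).put("nanos", 0L);
        System.out.println(oneWeek);
    }
}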

View File

@@ -6,7 +6,7 @@
package io.debezium.connector.cassandra;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import org.apache.avro.SchemaBuilder;
import org.apache.kafka.connect.data.Schema;
import org.junit.Before;
import org.junit.Test;
@@ -126,7 +126,7 @@ public void testTwoFileWriterCannotCoexist() throws IOException {
private ChangeRecord generateRecord(boolean markOffset, boolean isSnapshot, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {
SourceInfo source = new SourceInfo("test-cluster", offsetPosition, keyspaceTable, isSnapshot, System.currentTimeMillis() * 1000);
return new ChangeRecord(source, new RowData(), SchemaBuilder.builder().intType(), SchemaBuilder.builder().intType(), Record.Operation.INSERT, markOffset);
return new ChangeRecord(source, new RowData(), Schema.INT32_SCHEMA, Schema.INT32_SCHEMA, Record.Operation.INSERT, markOffset);
}
private boolean isProcessed(ChangeRecord record) {

View File

@@ -5,8 +5,8 @@
*/
package io.debezium.connector.cassandra;
import io.debezium.connector.cassandra.transforms.CassandraTypeToAvroSchemaMapper;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.kafka.connect.data.Schema;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -46,7 +46,7 @@ public void testProcessChangeRecords() throws Exception {
BlockingEventQueue<Event> queue = context.getQueue();
for (int i = 0; i < recordSize; i++) {
SourceInfo sourceInfo = new SourceInfo(DatabaseDescriptor.getClusterName(), new OffsetPosition("CommitLog-6-123.log", i), new KeyspaceTable(TEST_KEYSPACE, "cdc_table"), false, System.currentTimeMillis() * 1000);
Record record = new ChangeRecord(sourceInfo, new RowData(), CassandraTypeToAvroSchemaMapper.INT_TYPE, CassandraTypeToAvroSchemaMapper.INT_TYPE, Record.Operation.INSERT, false);
Record record = new ChangeRecord(sourceInfo, new RowData(), Schema.INT32_SCHEMA, Schema.INT32_SCHEMA, Record.Operation.INSERT, false);
queue.enqueue(record);
}
@@ -64,7 +64,7 @@ public void testProcessTombstoneRecords() throws Exception {
BlockingEventQueue<Event> queue = context.getQueue();
for (int i = 0; i < recordSize; i++) {
SourceInfo sourceInfo = new SourceInfo(DatabaseDescriptor.getClusterName(), new OffsetPosition("CommitLog-6-123.log", i), new KeyspaceTable(TEST_KEYSPACE, "cdc_table"), false, System.currentTimeMillis() * 1000);
Record record = new TombstoneRecord(sourceInfo, new RowData(), CassandraTypeToAvroSchemaMapper.INT_TYPE);
Record record = new TombstoneRecord(sourceInfo, new RowData(), Schema.INT32_SCHEMA);
queue.enqueue(record);
}