DBZ-2117 Fixing ttl() and write_time() error for collection-type column during snapshot
This commit is contained in:
parent
c689d99415
commit
65006361b6
@ -33,6 +33,7 @@
|
|||||||
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
|
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
|
||||||
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
|
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
|
||||||
import io.debezium.time.Conversions;
|
import io.debezium.time.Conversions;
|
||||||
|
import io.debezium.util.Collect;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This reader is responsible for initial bootstrapping of a table,
|
* This reader is responsible for initial bootstrapping of a table,
|
||||||
@ -50,6 +51,7 @@ public class SnapshotProcessor extends AbstractProcessor {
|
|||||||
private static final String NAME = "Snapshot Processor";
|
private static final String NAME = "Snapshot Processor";
|
||||||
private static final String CASSANDRA_NOW_UNIXTIMESTAMP = "UNIXTIMESTAMPOF(NOW())";
|
private static final String CASSANDRA_NOW_UNIXTIMESTAMP = "UNIXTIMESTAMPOF(NOW())";
|
||||||
private static final String EXECUTION_TIME_ALIAS = "execution_time";
|
private static final String EXECUTION_TIME_ALIAS = "execution_time";
|
||||||
|
private static final Set<DataType.Name> collectionTypes = Collect.unmodifiableSet(DataType.Name.LIST, DataType.Name.SET, DataType.Name.MAP);
|
||||||
|
|
||||||
private final CassandraClient cassandraClient;
|
private final CassandraClient cassandraClient;
|
||||||
private final ChangeEventQueue<Event> queue;
|
private final ChangeEventQueue<Event> queue;
|
||||||
@ -171,14 +173,15 @@ private void takeTableSnapshot(TableMetadata tableMetadata) throws IOException {
|
|||||||
private static BuiltStatement generateSnapshotStatement(TableMetadata tableMetadata) {
|
private static BuiltStatement generateSnapshotStatement(TableMetadata tableMetadata) {
|
||||||
List<String> allCols = tableMetadata.getColumns().stream().map(ColumnMetadata::getName).collect(Collectors.toList());
|
List<String> allCols = tableMetadata.getColumns().stream().map(ColumnMetadata::getName).collect(Collectors.toList());
|
||||||
Set<String> primaryCols = tableMetadata.getPrimaryKey().stream().map(ColumnMetadata::getName).collect(Collectors.toSet());
|
Set<String> primaryCols = tableMetadata.getPrimaryKey().stream().map(ColumnMetadata::getName).collect(Collectors.toSet());
|
||||||
|
List<String> collectionCols = tableMetadata.getColumns().stream().filter(cm -> collectionTypes.contains(cm.getType().getName()))
|
||||||
|
.map(ColumnMetadata::getName).collect(Collectors.toList());
|
||||||
|
|
||||||
Select.Selection selection = QueryBuilder.select().raw(CASSANDRA_NOW_UNIXTIMESTAMP).as(EXECUTION_TIME_ALIAS);
|
Select.Selection selection = QueryBuilder.select().raw(CASSANDRA_NOW_UNIXTIMESTAMP).as(EXECUTION_TIME_ALIAS);
|
||||||
for (String col : allCols) {
|
for (String col : allCols) {
|
||||||
selection.column(withQuotes(col));
|
selection.column(withQuotes(col));
|
||||||
|
|
||||||
if (!primaryCols.contains(col)) {
|
if (!primaryCols.contains(col) && !collectionCols.contains(col)) {
|
||||||
selection.ttl(withQuotes(col)).as(ttlAlias(col));
|
selection.ttl(withQuotes(col)).as(ttlAlias(col));
|
||||||
selection.writeTime(withQuotes(col)).as(writetimeAlias(col));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return selection.from(tableMetadata.getKeyspace().getName(), tableMetadata.getName());
|
return selection.from(tableMetadata.getKeyspace().getName(), tableMetadata.getName());
|
||||||
@ -209,12 +212,12 @@ private void processResultSet(TableMetadata tableMetadata, ResultSet resultSet)
|
|||||||
while (rowIter.hasNext()) {
|
while (rowIter.hasNext()) {
|
||||||
if (isRunning()) {
|
if (isRunning()) {
|
||||||
Row row = rowIter.next();
|
Row row = rowIter.next();
|
||||||
WriteTimeHolder writeTimeHolder = new WriteTimeHolder();
|
Object executionTime = readExecutionTime(row);
|
||||||
RowData after = extractRowData(row, tableMetadata.getColumns(), partitionKeyNames, clusteringKeyNames, writeTimeHolder);
|
RowData after = extractRowData(row, tableMetadata.getColumns(), partitionKeyNames, clusteringKeyNames, executionTime);
|
||||||
// only mark offset if there are no more rows left
|
// only mark offset if there are no more rows left
|
||||||
boolean markOffset = !rowIter.hasNext();
|
boolean markOffset = !rowIter.hasNext();
|
||||||
recordMaker.insert(DatabaseDescriptor.getClusterName(), OffsetPosition.defaultOffsetPosition(),
|
recordMaker.insert(DatabaseDescriptor.getClusterName(), OffsetPosition.defaultOffsetPosition(),
|
||||||
keyspaceTable, true, Conversions.toInstantFromMicros(writeTimeHolder.get()),
|
keyspaceTable, true, Conversions.toInstantFromMicros(TimeUnit.MICROSECONDS.convert((long) executionTime, TimeUnit.MILLISECONDS)),
|
||||||
after, keySchema, valueSchema, markOffset, queue::enqueue);
|
after, keySchema, valueSchema, markOffset, queue::enqueue);
|
||||||
rowNum++;
|
rowNum++;
|
||||||
if (rowNum % 10_000 == 0) {
|
if (rowNum % 10_000 == 0) {
|
||||||
@ -234,27 +237,20 @@ private void processResultSet(TableMetadata tableMetadata, ResultSet resultSet)
|
|||||||
/**
|
/**
|
||||||
* This function extracts the relevant row data from {@link Row} and updates the maximum writetime for each row.
|
* This function extracts the relevant row data from {@link Row} and updates the maximum writetime for each row.
|
||||||
*/
|
*/
|
||||||
private static RowData extractRowData(Row row, List<ColumnMetadata> columns, Set<String> partitionKeyNames, Set<String> clusteringKeyNames,
|
private static RowData extractRowData(Row row, List<ColumnMetadata> columns, Set<String> partitionKeyNames, Set<String> clusteringKeyNames, Object executionTime) {
|
||||||
WriteTimeHolder writeTimeHolder) {
|
|
||||||
RowData rowData = new RowData();
|
RowData rowData = new RowData();
|
||||||
|
|
||||||
Object executionTime = readExecutionTime(row);
|
|
||||||
for (ColumnMetadata columnMetadata : columns) {
|
for (ColumnMetadata columnMetadata : columns) {
|
||||||
String name = columnMetadata.getName();
|
String name = columnMetadata.getName();
|
||||||
Object value = readCol(row, name, columnMetadata);
|
Object value = readCol(row, name, columnMetadata);
|
||||||
Object deletionTs = null;
|
Object deletionTs = null;
|
||||||
CellData.ColumnType type = getType(name, partitionKeyNames, clusteringKeyNames);
|
CellData.ColumnType type = getType(name, partitionKeyNames, clusteringKeyNames);
|
||||||
|
|
||||||
if (type == CellData.ColumnType.REGULAR && value != null) {
|
if (type == CellData.ColumnType.REGULAR && value != null && !collectionTypes.contains(columnMetadata.getType().getName())) {
|
||||||
Object ttl = readColTtl(row, name);
|
Object ttl = readColTtl(row, name);
|
||||||
if (ttl != null && executionTime != null) {
|
if (ttl != null && executionTime != null) {
|
||||||
deletionTs = calculateDeletionTs(executionTime, ttl);
|
deletionTs = calculateDeletionTs(executionTime, ttl);
|
||||||
}
|
}
|
||||||
|
|
||||||
Object writeTime = readColWritetime(row, name);
|
|
||||||
if (writeTime != null) {
|
|
||||||
writeTimeHolder.setIfMax((long) writeTime);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
CellData cellData = new CellData(name, value, deletionTs, type);
|
CellData cellData = new CellData(name, value, deletionTs, type);
|
||||||
@ -284,10 +280,6 @@ private static Object readCol(Row row, String col, ColumnMetadata cm) {
|
|||||||
return CassandraTypeDeserializer.deserialize(cm.getType(), row.getBytesUnsafe(col));
|
return CassandraTypeDeserializer.deserialize(cm.getType(), row.getBytesUnsafe(col));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Object readColWritetime(Row row, String col) {
|
|
||||||
return CassandraTypeDeserializer.deserialize(DataType.bigint(), row.getBytesUnsafe(writetimeAlias(col)));
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Object readColTtl(Row row, String col) {
|
private static Object readColTtl(Row row, String col) {
|
||||||
return CassandraTypeDeserializer.deserialize(DataType.cint(), row.getBytesUnsafe(ttlAlias(col)));
|
return CassandraTypeDeserializer.deserialize(DataType.cint(), row.getBytesUnsafe(ttlAlias(col)));
|
||||||
}
|
}
|
||||||
@ -299,10 +291,6 @@ private static long calculateDeletionTs(Object executionTime, Object ttl) {
|
|||||||
return TimeUnit.MICROSECONDS.convert((long) executionTime, TimeUnit.MILLISECONDS) + TimeUnit.MICROSECONDS.convert((int) ttl, TimeUnit.SECONDS);
|
return TimeUnit.MICROSECONDS.convert((long) executionTime, TimeUnit.MILLISECONDS) + TimeUnit.MICROSECONDS.convert((int) ttl, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String writetimeAlias(String colName) {
|
|
||||||
return colName + "_writetime";
|
|
||||||
}
|
|
||||||
|
|
||||||
private static String ttlAlias(String colName) {
|
private static String ttlAlias(String colName) {
|
||||||
return colName + "_ttl";
|
return colName + "_ttl";
|
||||||
}
|
}
|
||||||
@ -314,21 +302,4 @@ private static String withQuotes(String s) {
|
|||||||
private static String tableName(TableMetadata tm) {
|
private static String tableName(TableMetadata tm) {
|
||||||
return tm.getKeyspace().getName() + "." + tm.getName();
|
return tm.getKeyspace().getName() + "." + tm.getName();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* A mutable structure which is used to hold the maximum writetime value of a given row.
|
|
||||||
*/
|
|
||||||
private static class WriteTimeHolder {
|
|
||||||
private long maxTs = -1;
|
|
||||||
|
|
||||||
void setIfMax(long ts) {
|
|
||||||
if (ts > maxTs) {
|
|
||||||
maxTs = ts;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
long get() {
|
|
||||||
return maxTs;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user