DBZ-1892 Fix sourceInfo bug.

Author: Bingqin Zhou, 2020-03-20 13:52:05 -07:00 (committed by Gunnar Morling)
parent d23676a7fb
commit 393758f75b
7 changed files with 51 additions and 39 deletions
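
The bug: RecordMaker previously held a single mutable SourceInfo that callers updated in place (recordMaker.getSourceInfo().update(...)) before emitting each record, and every ChangeRecord kept a reference to that one shared instance, so a record still sitting in the queue could end up carrying the source coordinates of a later event. With this commit the per-event coordinates (cluster, offset position, keyspace/table, snapshot flag, timestamp) are passed directly into insert/update/delete, and RecordMaker builds a fresh SourceInfo for each record.

A minimal sketch of the new call shape. The variable names (config, ts, rowData, keySchema, valueSchema, queue) and the keyspace/table and commit-log values are placeholders for illustration, not code from this commit:

    // Construct RecordMaker with the connector config instead of a shared SourceInfo.
    RecordMaker recordMaker = new RecordMaker(config.tombstonesOnDelete(),
            new Filters(config.fieldBlacklist()),
            config);

    // Each event now carries its own source coordinates; RecordMaker creates a
    // fresh SourceInfo per record, so queued records can no longer be overwritten.
    recordMaker.insert(DatabaseDescriptor.getClusterName(),
            new OffsetPosition("CommitLog-6-123.log", 0),   // commit log file and position (placeholder)
            new KeyspaceTable("my_keyspace", "my_table"),   // placeholder keyspace/table
            false,                                          // snapshot = false (commit log event)
            Conversions.toInstantFromMicros(ts),            // event timestamp in microseconds
            rowData, keySchema, valueSchema,
            true,                                           // markOffset
            queue::enqueue);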

View File

@@ -51,7 +51,7 @@ public CommitLogProcessor(CassandraConnectorContext context) throws IOException
                 context.getOffsetWriter(),
                 new RecordMaker(context.getCassandraConnectorConfig().tombstonesOnDelete(),
                         new Filters(context.getCassandraConnectorConfig().fieldBlacklist()),
-                        new SourceInfo(context.getCassandraConnectorConfig())),
+                        context.getCassandraConnectorConfig()),
                 metrics);
         cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
         watcher = new AbstractDirectoryWatcher(cdcDir.toPath(), context.getCassandraConnectorConfig().cdcDirPollIntervalMs(), Collections.singleton(ENTRY_CREATE)) {

View File

@@ -331,9 +331,9 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo
                 after.addCell(cellData);
             }
-            recordMaker.getSourceInfo().update(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
-                    Conversions.toInstantFromMicros(pu.maxTimestamp()));
-            recordMaker.delete(after, keySchema, valueSchema, MARK_OFFSET, queue::enqueue);
+            recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
+                    Conversions.toInstantFromMicros(pu.maxTimestamp()), after, keySchema, valueSchema,
+                    MARK_OFFSET, queue::enqueue);
         }
         catch (Exception e) {
             LOGGER.error("Fail to delete partition at {}. Reason: {}", offsetPosition, e);
@@ -367,19 +367,21 @@ private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu
             populateRegularColumns(after, row, rowType, schema);
             long ts = rowType == DELETE ? row.deletion().time().markedForDeleteAt() : pu.maxTimestamp();
-            recordMaker.getSourceInfo().update(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false, Conversions.toInstantFromMicros(ts));
             switch (rowType) {
                 case INSERT:
-                    recordMaker.insert(after, keySchema, valueSchema, MARK_OFFSET, queue::enqueue);
+                    recordMaker.insert(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
+                            Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET, queue::enqueue);
                     break;
                 case UPDATE:
-                    recordMaker.update(after, keySchema, valueSchema, MARK_OFFSET, queue::enqueue);
+                    recordMaker.update(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
+                            Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET, queue::enqueue);
                     break;
                 case DELETE:
-                    recordMaker.delete(after, keySchema, valueSchema, MARK_OFFSET, queue::enqueue);
+                    recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false,
+                            Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET, queue::enqueue);
                     break;
                 default:

View File

@@ -12,6 +12,8 @@
 import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
 import io.debezium.function.BlockingConsumer;
 
+import java.time.Instant;
+
 /**
  * Responsible for generating ChangeRecord and/or TombstoneRecord for create/update/delete events, as well as EOF events.
  */
@@ -19,28 +21,39 @@ public class RecordMaker {
     private static final Logger LOGGER = LoggerFactory.getLogger(RecordMaker.class);
     private final boolean emitTombstoneOnDelete;
     private final Filters filters;
-    private final SourceInfo sourceInfo;
+    private final CassandraConnectorConfig config;
 
-    public RecordMaker(boolean emitTombstoneOnDelete, Filters filters, SourceInfo sourceInfo) {
+    public RecordMaker(boolean emitTombstoneOnDelete, Filters filters, CassandraConnectorConfig config) {
         this.emitTombstoneOnDelete = emitTombstoneOnDelete;
         this.filters = filters;
-        this.sourceInfo = sourceInfo;
+        this.config = config;
     }
 
-    public void insert(RowData data, Schema keySchema, Schema valueSchema, boolean markOffset, BlockingConsumer<Record> consumer) {
-        createRecord(data, keySchema, valueSchema, markOffset, consumer, Record.Operation.INSERT);
+    public void insert(String cluster, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable, boolean snapshot,
+                       Instant tsMicro, RowData data, Schema keySchema, Schema valueSchema,
+                       boolean markOffset, BlockingConsumer<Record> consumer) {
+        createRecord(cluster, offsetPosition, keyspaceTable, snapshot, tsMicro,
+                data, keySchema, valueSchema, markOffset, consumer, Record.Operation.INSERT);
     }
 
-    public void update(RowData data, Schema keySchema, Schema valueSchema, boolean markOffset, BlockingConsumer<Record> consumer) {
-        createRecord(data, keySchema, valueSchema, markOffset, consumer, Record.Operation.UPDATE);
+    public void update(String cluster, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable, boolean snapshot,
+                       Instant tsMicro, RowData data, Schema keySchema, Schema valueSchema,
+                       boolean markOffset, BlockingConsumer<Record> consumer) {
+        createRecord(cluster, offsetPosition, keyspaceTable, snapshot, tsMicro,
+                data, keySchema, valueSchema, markOffset, consumer, Record.Operation.UPDATE);
     }
 
-    public void delete(RowData data, Schema keySchema, Schema valueSchema, boolean markOffset, BlockingConsumer<Record> consumer) {
-        createRecord(data, keySchema, valueSchema, markOffset, consumer, Record.Operation.DELETE);
+    public void delete(String cluster, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable, boolean snapshot,
+                       Instant tsMicro, RowData data, Schema keySchema, Schema valueSchema,
+                       boolean markOffset, BlockingConsumer<Record> consumer) {
+        createRecord(cluster, offsetPosition, keyspaceTable, snapshot, tsMicro,
+                data, keySchema, valueSchema, markOffset, consumer, Record.Operation.DELETE);
     }
 
-    private void createRecord(RowData data, Schema keySchema, Schema valueSchema, boolean markOffset, BlockingConsumer<Record> consumer, Record.Operation operation) {
-        FieldFilterSelector.FieldFilter fieldFilter = filters.getFieldFilter(sourceInfo.keyspaceTable);
+    private void createRecord(String cluster, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable, boolean snapshot,
+                              Instant tsMicro, RowData data, Schema keySchema, Schema valueSchema,
+                              boolean markOffset, BlockingConsumer<Record> consumer, Record.Operation operation) {
+        FieldFilterSelector.FieldFilter fieldFilter = filters.getFieldFilter(keyspaceTable);
         RowData filteredData;
         switch (operation) {
             case INSERT:
@@ -53,7 +66,8 @@ private void createRecord(RowData data, Schema keySchema, Schema valueSchema, bo
                 break;
         }
 
-        ChangeRecord record = new ChangeRecord(sourceInfo, filteredData, keySchema, valueSchema, operation, markOffset);
+        SourceInfo source = new SourceInfo(config, cluster, offsetPosition, keyspaceTable, snapshot, tsMicro);
+        ChangeRecord record = new ChangeRecord(source, filteredData, keySchema, valueSchema, operation, markOffset);
         try {
             consumer.accept(record);
         }
@@ -64,7 +78,7 @@ private void createRecord(RowData data, Schema keySchema, Schema valueSchema, bo
 
         if (operation == Record.Operation.DELETE && emitTombstoneOnDelete) {
             // generate kafka tombstone event
-            TombstoneRecord tombstoneRecord = new TombstoneRecord(sourceInfo, filteredData, keySchema);
+            TombstoneRecord tombstoneRecord = new TombstoneRecord(source, filteredData, keySchema);
             try {
                 consumer.accept(tombstoneRecord);
             }
@@ -75,8 +89,4 @@ private void createRecord(RowData data, Schema keySchema, Schema valueSchema, bo
             }
         }
     }
-
-    public SourceInfo getSourceInfo() {
-        return this.sourceInfo;
-    }
 }

View File

@@ -70,7 +70,7 @@ public SnapshotProcessor(CassandraConnectorContext context) {
         schemaHolder = context.getSchemaHolder();
         recordMaker = new RecordMaker(context.getCassandraConnectorConfig().tombstonesOnDelete(),
                 new Filters(context.getCassandraConnectorConfig().fieldBlacklist()),
-                new SourceInfo(context.getCassandraConnectorConfig()));
+                context.getCassandraConnectorConfig());
         snapshotMode = context.getCassandraConnectorConfig().snapshotMode();
         consistencyLevel = context.getCassandraConnectorConfig().snapshotConsistencyLevel();
     }
@@ -212,9 +212,9 @@ private void processResultSet(TableMetadata tableMetadata, ResultSet resultSet)
                 RowData after = extractRowData(row, tableMetadata.getColumns(), partitionKeyNames, clusteringKeyNames, writeTimeHolder);
                 // only mark offset if there are no more rows left
                 boolean markOffset = !rowIter.hasNext();
-                recordMaker.getSourceInfo().update(DatabaseDescriptor.getClusterName(), OffsetPosition.defaultOffsetPosition(), keyspaceTable, true,
-                        Conversions.toInstantFromMicros(writeTimeHolder.get()));
-                recordMaker.insert(after, keySchema, valueSchema, markOffset, queue::enqueue);
+                recordMaker.insert(DatabaseDescriptor.getClusterName(), OffsetPosition.defaultOffsetPosition(),
+                        keyspaceTable, true, Conversions.toInstantFromMicros(writeTimeHolder.get()),
+                        after, keySchema, valueSchema, markOffset, queue::enqueue);
                 rowNum++;
                 if (rowNum % 10_000 == 0) {
                     LOGGER.info("Queued {} snapshot records from table {}", rowNum, tableName);

View File

@@ -35,11 +35,9 @@ public class SourceInfo extends AbstractSourceInfo {
     public boolean snapshot;
     public Instant tsMicro;
 
-    public SourceInfo(CommonConnectorConfig config) {
+    public SourceInfo(CommonConnectorConfig config, String cluster, OffsetPosition offsetPosition,
+                      KeyspaceTable keyspaceTable, boolean snapshot, Instant tsMicro) {
         super(config);
-    }
-
-    public void update(String cluster, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable, boolean snapshot, Instant tsMicro) {
         this.cluster = cluster;
         this.offsetPosition = offsetPosition;
         this.keyspaceTable = keyspaceTable;

View File

@@ -129,8 +129,8 @@ public void testTwoFileWriterCannotCoexist() throws IOException {
     }
 
     private ChangeRecord generateRecord(boolean markOffset, boolean isSnapshot, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) {
         CassandraConnectorConfig config = new CassandraConnectorConfig(Configuration.from(new Properties()));
-        SourceInfo sourceInfo = new SourceInfo(config);
-        sourceInfo.update("test-cluster", offsetPosition, keyspaceTable, isSnapshot, Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000));
+        SourceInfo sourceInfo = new SourceInfo(config, "test-cluster", offsetPosition, keyspaceTable,
+                isSnapshot, Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000));
         return new ChangeRecord(sourceInfo, new RowData(), Schema.INT32_SCHEMA, Schema.INT32_SCHEMA, Record.Operation.INSERT, markOffset);
     }

View File

@@ -50,8 +50,9 @@ public void testProcessChangeRecords() throws Exception {
         ChangeEventQueue<Event> queue = context.getQueue();
         for (int i = 0; i < recordSize; i++) {
             CassandraConnectorConfig config = new CassandraConnectorConfig(Configuration.from(new Properties()));
-            SourceInfo sourceInfo = new SourceInfo(config);
-            sourceInfo.update(DatabaseDescriptor.getClusterName(), new OffsetPosition("CommitLog-6-123.log", i), new KeyspaceTable(TEST_KEYSPACE, "cdc_table"), false,
+            SourceInfo sourceInfo = new SourceInfo(config, DatabaseDescriptor.getClusterName(),
+                    new OffsetPosition("CommitLog-6-123.log", i),
+                    new KeyspaceTable(TEST_KEYSPACE, "cdc_table"), false,
                     Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000));
             Record record = new ChangeRecord(sourceInfo, new RowData(), Schema.INT32_SCHEMA, Schema.INT32_SCHEMA, Record.Operation.INSERT, false);
             queue.enqueue(record);
@@ -71,8 +72,9 @@ public void testProcessTombstoneRecords() throws Exception {
         ChangeEventQueue<Event> queue = context.getQueue();
         for (int i = 0; i < recordSize; i++) {
             CassandraConnectorConfig config = new CassandraConnectorConfig(Configuration.from(new Properties()));
-            SourceInfo sourceInfo = new SourceInfo(config);
-            sourceInfo.update(DatabaseDescriptor.getClusterName(), new OffsetPosition("CommitLog-6-123.log", i), new KeyspaceTable(TEST_KEYSPACE, "cdc_table"), false,
+            SourceInfo sourceInfo = new SourceInfo(config, DatabaseDescriptor.getClusterName(),
+                    new OffsetPosition("CommitLog-6-123.log", i),
+                    new KeyspaceTable(TEST_KEYSPACE, "cdc_table"), false,
                     Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000));
             Record record = new TombstoneRecord(sourceInfo, new RowData(), Schema.INT32_SCHEMA);
             queue.enqueue(record);