diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CommitLogProcessor.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CommitLogProcessor.java index 8dc2cc94d..da78d51e1 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CommitLogProcessor.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CommitLogProcessor.java @@ -51,7 +51,7 @@ public CommitLogProcessor(CassandraConnectorContext context) throws IOException context.getOffsetWriter(), new RecordMaker(context.getCassandraConnectorConfig().tombstonesOnDelete(), new Filters(context.getCassandraConnectorConfig().fieldBlacklist()), - new SourceInfo(context.getCassandraConnectorConfig())), + context.getCassandraConnectorConfig()), metrics); cdcDir = new File(DatabaseDescriptor.getCDCLogLocation()); watcher = new AbstractDirectoryWatcher(cdcDir.toPath(), context.getCassandraConnectorConfig().cdcDirPollIntervalMs(), Collections.singleton(ENTRY_CREATE)) { diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java index cac781b48..f75f2df0d 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/CommitLogReadHandlerImpl.java @@ -331,9 +331,9 @@ private void handlePartitionDeletion(PartitionUpdate pu, OffsetPosition offsetPo after.addCell(cellData); } - recordMaker.getSourceInfo().update(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false, - Conversions.toInstantFromMicros(pu.maxTimestamp())); - recordMaker.delete(after, keySchema, valueSchema, MARK_OFFSET, queue::enqueue); + recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false, + Conversions.toInstantFromMicros(pu.maxTimestamp()), after, keySchema, valueSchema, + MARK_OFFSET, queue::enqueue); } catch (Exception e) { LOGGER.error("Fail to delete partition at {}. Reason: {}", offsetPosition, e); @@ -367,19 +367,21 @@ private void handleRowModifications(Row row, RowType rowType, PartitionUpdate pu populateRegularColumns(after, row, rowType, schema); long ts = rowType == DELETE ? row.deletion().time().markedForDeleteAt() : pu.maxTimestamp(); - recordMaker.getSourceInfo().update(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false, Conversions.toInstantFromMicros(ts)); switch (rowType) { case INSERT: - recordMaker.insert(after, keySchema, valueSchema, MARK_OFFSET, queue::enqueue); + recordMaker.insert(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false, + Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET, queue::enqueue); break; case UPDATE: - recordMaker.update(after, keySchema, valueSchema, MARK_OFFSET, queue::enqueue); + recordMaker.update(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false, + Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET, queue::enqueue); break; case DELETE: - recordMaker.delete(after, keySchema, valueSchema, MARK_OFFSET, queue::enqueue); + recordMaker.delete(DatabaseDescriptor.getClusterName(), offsetPosition, keyspaceTable, false, + Conversions.toInstantFromMicros(ts), after, keySchema, valueSchema, MARK_OFFSET, queue::enqueue); break; default: diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/RecordMaker.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/RecordMaker.java index 41d42ec7a..58993e247 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/RecordMaker.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/RecordMaker.java @@ -12,6 +12,8 @@ import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException; import io.debezium.function.BlockingConsumer; +import java.time.Instant; + /** * Responsible for generating ChangeRecord and/or TombstoneRecord for create/update/delete events, as well as EOF events. */ @@ -19,28 +21,39 @@ public class RecordMaker { private static final Logger LOGGER = LoggerFactory.getLogger(RecordMaker.class); private final boolean emitTombstoneOnDelete; private final Filters filters; - private final SourceInfo sourceInfo; + private final CassandraConnectorConfig config; - public RecordMaker(boolean emitTombstoneOnDelete, Filters filters, SourceInfo sourceInfo) { + public RecordMaker(boolean emitTombstoneOnDelete, Filters filters, CassandraConnectorConfig config) { this.emitTombstoneOnDelete = emitTombstoneOnDelete; this.filters = filters; - this.sourceInfo = sourceInfo; + this.config = config; } - public void insert(RowData data, Schema keySchema, Schema valueSchema, boolean markOffset, BlockingConsumer consumer) { - createRecord(data, keySchema, valueSchema, markOffset, consumer, Record.Operation.INSERT); + public void insert(String cluster, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable, boolean snapshot, + Instant tsMicro, RowData data, Schema keySchema, Schema valueSchema, + boolean markOffset, BlockingConsumer consumer) { + createRecord(cluster, offsetPosition, keyspaceTable, snapshot, tsMicro, + data, keySchema, valueSchema, markOffset, consumer, Record.Operation.INSERT); } - public void update(RowData data, Schema keySchema, Schema valueSchema, boolean markOffset, BlockingConsumer consumer) { - createRecord(data, keySchema, valueSchema, markOffset, consumer, Record.Operation.UPDATE); + public void update(String cluster, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable, boolean snapshot, + Instant tsMicro, RowData data, Schema keySchema, Schema valueSchema, + boolean markOffset, BlockingConsumer consumer) { + createRecord(cluster, offsetPosition, keyspaceTable, snapshot, tsMicro, + data, keySchema, valueSchema, markOffset, consumer, Record.Operation.UPDATE); } - public void delete(RowData data, Schema keySchema, Schema valueSchema, boolean markOffset, BlockingConsumer consumer) { - createRecord(data, keySchema, valueSchema, markOffset, consumer, Record.Operation.DELETE); + public void delete(String cluster, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable, boolean snapshot, + Instant tsMicro, RowData data, Schema keySchema, Schema valueSchema, + boolean markOffset, BlockingConsumer consumer) { + createRecord(cluster, offsetPosition, keyspaceTable, snapshot, tsMicro, + data, keySchema, valueSchema, markOffset, consumer, Record.Operation.DELETE); } - private void createRecord(RowData data, Schema keySchema, Schema valueSchema, boolean markOffset, BlockingConsumer consumer, Record.Operation operation) { - FieldFilterSelector.FieldFilter fieldFilter = filters.getFieldFilter(sourceInfo.keyspaceTable); + private void createRecord(String cluster, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable, boolean snapshot, + Instant tsMicro, RowData data, Schema keySchema, Schema valueSchema, + boolean markOffset, BlockingConsumer consumer, Record.Operation operation) { + FieldFilterSelector.FieldFilter fieldFilter = filters.getFieldFilter(keyspaceTable); RowData filteredData; switch (operation) { case INSERT: @@ -53,7 +66,8 @@ private void createRecord(RowData data, Schema keySchema, Schema valueSchema, bo break; } - ChangeRecord record = new ChangeRecord(sourceInfo, filteredData, keySchema, valueSchema, operation, markOffset); + SourceInfo source = new SourceInfo(config, cluster, offsetPosition, keyspaceTable, snapshot, tsMicro); + ChangeRecord record = new ChangeRecord(source, filteredData, keySchema, valueSchema, operation, markOffset); try { consumer.accept(record); } @@ -64,7 +78,7 @@ private void createRecord(RowData data, Schema keySchema, Schema valueSchema, bo if (operation == Record.Operation.DELETE && emitTombstoneOnDelete) { // generate kafka tombstone event - TombstoneRecord tombstoneRecord = new TombstoneRecord(sourceInfo, filteredData, keySchema); + TombstoneRecord tombstoneRecord = new TombstoneRecord(source, filteredData, keySchema); try { consumer.accept(tombstoneRecord); } @@ -75,8 +89,4 @@ private void createRecord(RowData data, Schema keySchema, Schema valueSchema, bo } } - public SourceInfo getSourceInfo() { - return this.sourceInfo; - } - } diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java index 93b9f0b54..1a803a479 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/SnapshotProcessor.java @@ -70,7 +70,7 @@ public SnapshotProcessor(CassandraConnectorContext context) { schemaHolder = context.getSchemaHolder(); recordMaker = new RecordMaker(context.getCassandraConnectorConfig().tombstonesOnDelete(), new Filters(context.getCassandraConnectorConfig().fieldBlacklist()), - new SourceInfo(context.getCassandraConnectorConfig())); + context.getCassandraConnectorConfig()); snapshotMode = context.getCassandraConnectorConfig().snapshotMode(); consistencyLevel = context.getCassandraConnectorConfig().snapshotConsistencyLevel(); } @@ -212,9 +212,9 @@ private void processResultSet(TableMetadata tableMetadata, ResultSet resultSet) RowData after = extractRowData(row, tableMetadata.getColumns(), partitionKeyNames, clusteringKeyNames, writeTimeHolder); // only mark offset if there are no more rows left boolean markOffset = !rowIter.hasNext(); - recordMaker.getSourceInfo().update(DatabaseDescriptor.getClusterName(), OffsetPosition.defaultOffsetPosition(), keyspaceTable, true, - Conversions.toInstantFromMicros(writeTimeHolder.get())); - recordMaker.insert(after, keySchema, valueSchema, markOffset, queue::enqueue); + recordMaker.insert(DatabaseDescriptor.getClusterName(), OffsetPosition.defaultOffsetPosition(), + keyspaceTable, true, Conversions.toInstantFromMicros(writeTimeHolder.get()), + after, keySchema, valueSchema, markOffset, queue::enqueue); rowNum++; if (rowNum % 10_000 == 0) { LOGGER.info("Queued {} snapshot records from table {}", rowNum, tableName); diff --git a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/SourceInfo.java b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/SourceInfo.java index 2750e8e1c..798a47137 100644 --- a/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/SourceInfo.java +++ b/debezium-connector-cassandra/src/main/java/io/debezium/connector/cassandra/SourceInfo.java @@ -35,11 +35,9 @@ public class SourceInfo extends AbstractSourceInfo { public boolean snapshot; public Instant tsMicro; - public SourceInfo(CommonConnectorConfig config) { + public SourceInfo(CommonConnectorConfig config, String cluster, OffsetPosition offsetPosition, + KeyspaceTable keyspaceTable, boolean snapshot, Instant tsMicro) { super(config); - } - - public void update(String cluster, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable, boolean snapshot, Instant tsMicro) { this.cluster = cluster; this.offsetPosition = offsetPosition; this.keyspaceTable = keyspaceTable; diff --git a/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/FileOffsetWriterTest.java b/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/FileOffsetWriterTest.java index ad112592c..1693c4fd6 100644 --- a/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/FileOffsetWriterTest.java +++ b/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/FileOffsetWriterTest.java @@ -129,8 +129,8 @@ public void testTwoFileWriterCannotCoexist() throws IOException { private ChangeRecord generateRecord(boolean markOffset, boolean isSnapshot, OffsetPosition offsetPosition, KeyspaceTable keyspaceTable) { CassandraConnectorConfig config = new CassandraConnectorConfig(Configuration.from(new Properties())); - SourceInfo sourceInfo = new SourceInfo(config); - sourceInfo.update("test-cluster", offsetPosition, keyspaceTable, isSnapshot, Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000)); + SourceInfo sourceInfo = new SourceInfo(config, "test-cluster", offsetPosition, keyspaceTable, + isSnapshot, Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000)); return new ChangeRecord(sourceInfo, new RowData(), Schema.INT32_SCHEMA, Schema.INT32_SCHEMA, Record.Operation.INSERT, markOffset); } diff --git a/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/QueueProcessorTest.java b/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/QueueProcessorTest.java index a3795b041..d85125dd0 100644 --- a/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/QueueProcessorTest.java +++ b/debezium-connector-cassandra/src/test/java/io/debezium/connector/cassandra/QueueProcessorTest.java @@ -50,8 +50,9 @@ public void testProcessChangeRecords() throws Exception { ChangeEventQueue queue = context.getQueue(); for (int i = 0; i < recordSize; i++) { CassandraConnectorConfig config = new CassandraConnectorConfig(Configuration.from(new Properties())); - SourceInfo sourceInfo = new SourceInfo(config); - sourceInfo.update(DatabaseDescriptor.getClusterName(), new OffsetPosition("CommitLog-6-123.log", i), new KeyspaceTable(TEST_KEYSPACE, "cdc_table"), false, + SourceInfo sourceInfo = new SourceInfo(config, DatabaseDescriptor.getClusterName(), + new OffsetPosition("CommitLog-6-123.log", i), + new KeyspaceTable(TEST_KEYSPACE, "cdc_table"), false, Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000)); Record record = new ChangeRecord(sourceInfo, new RowData(), Schema.INT32_SCHEMA, Schema.INT32_SCHEMA, Record.Operation.INSERT, false); queue.enqueue(record); @@ -71,8 +72,9 @@ public void testProcessTombstoneRecords() throws Exception { ChangeEventQueue queue = context.getQueue(); for (int i = 0; i < recordSize; i++) { CassandraConnectorConfig config = new CassandraConnectorConfig(Configuration.from(new Properties())); - SourceInfo sourceInfo = new SourceInfo(config); - sourceInfo.update(DatabaseDescriptor.getClusterName(), new OffsetPosition("CommitLog-6-123.log", i), new KeyspaceTable(TEST_KEYSPACE, "cdc_table"), false, + SourceInfo sourceInfo = new SourceInfo(config, DatabaseDescriptor.getClusterName(), + new OffsetPosition("CommitLog-6-123.log", i), + new KeyspaceTable(TEST_KEYSPACE, "cdc_table"), false, Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000)); Record record = new TombstoneRecord(sourceInfo, new RowData(), Schema.INT32_SCHEMA); queue.enqueue(record);