DBZ-2066 Add logs when the table snapshot is taken and when emitting a kafka message for a single record/mutation
This commit is contained in:
parent
11d9a34a16
commit
a3de9c8c10
@ -52,7 +52,9 @@ public KafkaRecordEmitter(String kafkaTopicPrefix, String heartbeatPrefix, Prope
|
|||||||
public void emit(Record record) {
|
public void emit(Record record) {
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
ProducerRecord<byte[], byte[]> producerRecord = toProducerRecord(record);
|
ProducerRecord<byte[], byte[]> producerRecord = toProducerRecord(record);
|
||||||
|
LOGGER.debug("Sending the record '{}'", record.toString());
|
||||||
Future<RecordMetadata> future = producer.send(producerRecord);
|
Future<RecordMetadata> future = producer.send(producerRecord);
|
||||||
|
LOGGER.debug("The record '{}' has been sent", record.toString());
|
||||||
futures.put(record, future);
|
futures.put(record, future);
|
||||||
maybeFlushAndMarkOffset();
|
maybeFlushAndMarkOffset();
|
||||||
}
|
}
|
||||||
|
@ -155,6 +155,7 @@ private void takeTableSnapshot(TableMetadata tableMetadata) throws IOException {
|
|||||||
LOGGER.info("Executing snapshot query '{}' with consistency level {}", statement.getQueryString(), statement.getConsistencyLevel());
|
LOGGER.info("Executing snapshot query '{}' with consistency level {}", statement.getQueryString(), statement.getConsistencyLevel());
|
||||||
ResultSet resultSet = cassandraClient.execute(statement);
|
ResultSet resultSet = cassandraClient.execute(statement);
|
||||||
processResultSet(tableMetadata, resultSet);
|
processResultSet(tableMetadata, resultSet);
|
||||||
|
LOGGER.debug("The snapshot of table '{}' has been taken", tableName(tableMetadata));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
Reference in New Issue
Block a user