DBZ-117 Improved logging when MongoDB connector is reading oplog

The MongoDB connector now outputs an INFO log message whenever its task's `poll()` method returns a non-empty list of `SourceRecord` objects, where the message includes the number of records and the offset of the last record.
This commit is contained in:
Randall Hauch 2016-09-06 10:32:52 -05:00
parent de1edce895
commit 63e2e48ab3
2 changed files with 66 additions and 3 deletions

View File

@ -8,6 +8,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@ -19,6 +20,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
@ -52,6 +54,7 @@ public final class MongoDbConnectorTask extends SourceTask {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final AtomicBoolean running = new AtomicBoolean(false);
private final Deque<Replicator> replicators = new ConcurrentLinkedDeque<>();
private final RecordBatchSummarizer recordSummarizer = new RecordBatchSummarizer();
// These are all effectively constants between start(...) and stop(...)
private volatile TaskRecordQueue queue;
@ -80,7 +83,6 @@ public void start(Map<String, String> props) {
throw new ConnectException("Unexpected null context");
}
// Read the configuration and set up the replication context ...
final Configuration config = Configuration.from(props);
this.taskName = "task" + config.getInteger(MongoDbConnectorConfig.TASK_ID);
@ -107,7 +109,7 @@ public void start(Map<String, String> props) {
final ReplicaSets replicaSets = ReplicaSets.parse(hosts);
// Set up the task record queue ...
this.queue = new TaskRecordQueue(config, replicaSets.replicaSetCount(), running::get);
this.queue = new TaskRecordQueue(config, replicaSets.replicaSetCount(), running::get, recordSummarizer);
// Get the offsets for each of replica set partition ...
SourceInfo source = replicationContext.source();
@ -191,14 +193,17 @@ protected static class TaskRecordQueue {
private final Metronome metronome;
private final BlockingQueue<SourceRecord> records;
private final BooleanSupplier isRunning;
private final Consumer<List<SourceRecord>> batchConsumer;
protected TaskRecordQueue(Configuration config, int numThreads, BooleanSupplier isRunning) {
protected TaskRecordQueue(Configuration config, int numThreads, BooleanSupplier isRunning,
Consumer<List<SourceRecord>> batchConsumer) {
final int maxQueueSize = config.getInteger(MongoDbConnectorConfig.MAX_QUEUE_SIZE);
final long pollIntervalMs = config.getLong(MongoDbConnectorConfig.POLL_INTERVAL_MS);
maxBatchSize = config.getInteger(MongoDbConnectorConfig.MAX_BATCH_SIZE);
metronome = Metronome.parker(pollIntervalMs, TimeUnit.MILLISECONDS, Clock.SYSTEM);
records = new LinkedBlockingDeque<>(maxQueueSize);
this.isRunning = isRunning;
this.batchConsumer = batchConsumer != null ? batchConsumer : (records) -> {};
}
public List<SourceRecord> poll() throws InterruptedException {
@ -207,6 +212,7 @@ public List<SourceRecord> poll() throws InterruptedException {
// No events to process, so sleep for a bit ...
metronome.pause();
}
this.batchConsumer.accept(batch);
return batch;
}
@ -222,4 +228,50 @@ public void enqueue(SourceRecord record) throws InterruptedException {
}
}
}
protected final class RecordBatchSummarizer implements Consumer<List<SourceRecord>> {
private final Map<String, ReplicaSetSummary> summaryByReplicaSet = new HashMap<>();
@Override
public void accept(List<SourceRecord> records) {
if (records.isEmpty()) return;
if (!logger.isInfoEnabled()) return;
summaryByReplicaSet.clear();
records.forEach(record -> {
String replicaSetName = SourceInfo.replicaSetNameForPartition(record.sourcePartition());
if (replicaSetName != null) {
summaryByReplicaSet.computeIfAbsent(replicaSetName, rsName -> new ReplicaSetSummary()).add(record);
}
});
if (!summaryByReplicaSet.isEmpty()) {
PreviousContext prevContext = replContext.configureLoggingContext("task");
try {
summaryByReplicaSet.forEach((rsName, summary) -> {
logger.info("{} records sent for replica set '{}', last offset: {}",
summary.recordCount(), rsName, summary.lastOffset());
});
} finally {
prevContext.restore();
}
}
}
}
protected static final class ReplicaSetSummary {
private int numRecords = 0;
private Map<String, ?> lastOffset;
public void add(SourceRecord record) {
++numRecords;
lastOffset = record.sourceOffset();
}
public int recordCount() {
return numRecords;
}
public Map<String, ?> lastOffset() {
return lastOffset;
}
}
}

View File

@ -131,6 +131,17 @@ public Long getOperationId() {
}
}
/**
* Get the replica set name for the given partition.
*
* @param partition the partition map
* @return the replica set name (when the partition is valid), or {@code null} if the partition is null or has no replica
* set name entry
*/
public static String replicaSetNameForPartition(Map<String, ?> partition) {
return partition != null ? (String) partition.get(REPLICA_SET_NAME) : null;
}
private final String serverName;
public SourceInfo(String serverName) {