DBZ-580 Using ChangeEventQueue in MongoDB connector

This commit is contained in:
Gunnar Morling 2018-01-29 14:42:35 +01:00 committed by Jiri Pechanec
parent 45a9847d42
commit 1626f385ad

View File

@ -12,13 +12,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import org.apache.kafka.connect.errors.ConnectException;
@ -27,16 +24,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.Immutable;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.ConfigurationDefaults;
import io.debezium.time.Temporals;
import io.debezium.util.Clock;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.util.LoggingContext;
import io.debezium.util.LoggingContext.PreviousContext;
import io.debezium.util.Metronome;
import io.debezium.util.Threads;
import io.debezium.util.Threads.Timer;
/**
* A Kafka Connect source task that replicates the changes from one or more MongoDB replica sets, using one {@link Replicator}
@ -54,13 +47,15 @@
@ThreadSafe
public final class MongoDbConnectorTask extends SourceTask {
private static final String CONTEXT_NAME = "mongodb-connector-task";
private final Logger logger = LoggerFactory.getLogger(getClass());
private final AtomicBoolean running = new AtomicBoolean(false);
private final Deque<Replicator> replicators = new ConcurrentLinkedDeque<>();
private final RecordBatchSummarizer recordSummarizer = new RecordBatchSummarizer();
// These are all effectively constants between start(...) and stop(...)
private volatile TaskRecordQueue queue;
private volatile ChangeEventQueue<SourceRecord> queue;
private volatile String taskName;
private volatile ReplicationContext replContext;
@ -116,7 +111,12 @@ public void start(Map<String, String> props) {
}
// Set up the task record queue ...
this.queue = new TaskRecordQueue(config, replicaSets.replicaSetCount(), running::get, recordSummarizer);
this.queue = new ChangeEventQueue.Builder<SourceRecord>()
.pollInterval(Duration.ofMillis(config.getLong(MongoDbConnectorConfig.POLL_INTERVAL_MS)))
.maxBatchSize(config.getInteger(MongoDbConnectorConfig.MAX_BATCH_SIZE))
.maxQueueSize(config.getInteger(MongoDbConnectorConfig.MAX_QUEUE_SIZE))
.loggingContextSupplier(this::getLoggingContext)
.build();
// Get the offsets for each of replica set partition ...
SourceInfo source = replicationContext.source();
@ -170,7 +170,9 @@ public void start(Map<String, String> props) {
@Override
public List<SourceRecord> poll() throws InterruptedException {
return this.queue.poll();
List<SourceRecord> records = queue.poll();
recordSummarizer.accept(records);
return records;
}
@Override
@ -196,52 +198,8 @@ public void stop() {
}
}
@Immutable
protected static class TaskRecordQueue {
// These are all effectively constants between start(...) and stop(...)
private final int maxBatchSize;
private final Metronome metronome;
private final BlockingQueue<SourceRecord> records;
private final BooleanSupplier isRunning;
private final Consumer<List<SourceRecord>> batchConsumer;
private final Duration pollInterval;
protected TaskRecordQueue(Configuration config, int numThreads, BooleanSupplier isRunning,
Consumer<List<SourceRecord>> batchConsumer) {
final int maxQueueSize = config.getInteger(MongoDbConnectorConfig.MAX_QUEUE_SIZE);
pollInterval = Duration.ofMillis(config.getLong(MongoDbConnectorConfig.POLL_INTERVAL_MS));
maxBatchSize = config.getInteger(MongoDbConnectorConfig.MAX_BATCH_SIZE);
metronome = Metronome.parker(pollInterval, Clock.SYSTEM);
records = new LinkedBlockingDeque<>(maxQueueSize);
this.isRunning = isRunning;
this.batchConsumer = batchConsumer != null ? batchConsumer : (records) -> {};
}
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> batch = new ArrayList<>(maxBatchSize);
final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
while (isRunning.getAsBoolean() && records.drainTo(batch, maxBatchSize) == 0) {
// No events to process, so sleep for a bit ...
metronome.pause();
if (timeout.expired()) {
break;
}
}
this.batchConsumer.accept(batch);
return batch;
}
/**
* Adds the event into the queue for subsequent batch processing.
*
* @param record a record from the MongoDB oplog
* @throws InterruptedException if the thread is interrupted while waiting to enqueue the record
*/
public void enqueue(SourceRecord record) throws InterruptedException {
if (record != null) {
records.put(record);
}
}
private LoggingContext.PreviousContext getLoggingContext() {
return replContext.configureLoggingContext(CONTEXT_NAME);
}
protected final class RecordBatchSummarizer implements Consumer<List<SourceRecord>> {