DBZ-516 Using Duration instead of long in a few more places
This commit is contained in:
parent
24bdcaf059
commit
0c4190c493
@ -17,7 +17,6 @@
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BooleanSupplier;
|
||||
@ -33,6 +32,7 @@
|
||||
import io.debezium.annotation.ThreadSafe;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.config.ConfigurationDefaults;
|
||||
import io.debezium.time.Temporals;
|
||||
import io.debezium.util.Clock;
|
||||
import io.debezium.util.LoggingContext.PreviousContext;
|
||||
import io.debezium.util.Metronome;
|
||||
@ -47,7 +47,7 @@
|
||||
* replica sets will be assigned to each task when the maximum number of tasks is limited. Regardless, every task will use a
|
||||
* separate thread to replicate the contents of each replica set, and each replication thread may use multiple threads
|
||||
* to perform an initial sync of the replica set.
|
||||
*
|
||||
*
|
||||
* @see MongoDbConnector
|
||||
* @see MongoDbConnectorConfig
|
||||
* @author Randall Hauch
|
||||
@ -205,14 +205,14 @@ protected static class TaskRecordQueue {
|
||||
private final BlockingQueue<SourceRecord> records;
|
||||
private final BooleanSupplier isRunning;
|
||||
private final Consumer<List<SourceRecord>> batchConsumer;
|
||||
private final long pollIntervalMs;
|
||||
private final Duration pollInterval;
|
||||
|
||||
protected TaskRecordQueue(Configuration config, int numThreads, BooleanSupplier isRunning,
|
||||
Consumer<List<SourceRecord>> batchConsumer) {
|
||||
final int maxQueueSize = config.getInteger(MongoDbConnectorConfig.MAX_QUEUE_SIZE);
|
||||
pollIntervalMs = config.getLong(MongoDbConnectorConfig.POLL_INTERVAL_MS);
|
||||
pollInterval = Duration.ofMillis(config.getLong(MongoDbConnectorConfig.POLL_INTERVAL_MS));
|
||||
maxBatchSize = config.getInteger(MongoDbConnectorConfig.MAX_BATCH_SIZE);
|
||||
metronome = Metronome.parker(pollIntervalMs, TimeUnit.MILLISECONDS, Clock.SYSTEM);
|
||||
metronome = Metronome.parker(pollInterval, Clock.SYSTEM);
|
||||
records = new LinkedBlockingDeque<>(maxQueueSize);
|
||||
this.isRunning = isRunning;
|
||||
this.batchConsumer = batchConsumer != null ? batchConsumer : (records) -> {};
|
||||
@ -220,7 +220,7 @@ protected TaskRecordQueue(Configuration config, int numThreads, BooleanSupplier
|
||||
|
||||
public List<SourceRecord> poll() throws InterruptedException {
|
||||
List<SourceRecord> batch = new ArrayList<>(maxBatchSize);
|
||||
final Timer timeout = Threads.timer(Clock.SYSTEM, Duration.ofMillis(Math.max(pollIntervalMs, ConfigurationDefaults.RETURN_CONTROL_INTERVAL.toMillis())));
|
||||
final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
|
||||
while (isRunning.getAsBoolean() && records.drainTo(batch, maxBatchSize) == 0) {
|
||||
// No events to process, so sleep for a bit ...
|
||||
metronome.pause();
|
||||
@ -234,7 +234,7 @@ public List<SourceRecord> poll() throws InterruptedException {
|
||||
|
||||
/**
|
||||
* Adds the event into the queue for subsequent batch processing.
|
||||
*
|
||||
*
|
||||
* @param record a record from the MongoDB oplog
|
||||
* @throws InterruptedException if the thread is interrupted while waiting to enqueue the record
|
||||
*/
|
||||
|
@ -23,6 +23,7 @@
|
||||
import com.github.shyiko.mysql.binlog.network.ServerException;
|
||||
|
||||
import io.debezium.config.ConfigurationDefaults;
|
||||
import io.debezium.time.Temporals;
|
||||
import io.debezium.util.Clock;
|
||||
import io.debezium.util.Metronome;
|
||||
import io.debezium.util.Threads;
|
||||
@ -219,7 +220,7 @@ public List<SourceRecord> poll() throws InterruptedException {
|
||||
|
||||
logger.trace("Polling for next batch of records");
|
||||
List<SourceRecord> batch = new ArrayList<>(maxBatchSize);
|
||||
final Timer timeout = Threads.timer(Clock.SYSTEM, Duration.ofMillis(Math.max(context.pollIntervalInMillseconds(), ConfigurationDefaults.RETURN_CONTROL_INTERVAL.toMillis())));
|
||||
final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.max(Duration.ofMillis(context.pollIntervalInMillseconds()), ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
|
||||
while (running.get() && (records.drainTo(batch, maxBatchSize) == 0) && !success.get()) {
|
||||
// No records are available even though the snapshot has not yet completed, so sleep for a bit ...
|
||||
metronome.pause();
|
||||
|
@ -25,6 +25,7 @@
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.config.ConfigurationDefaults;
|
||||
import io.debezium.connector.postgresql.connection.PostgresConnection;
|
||||
import io.debezium.time.Temporals;
|
||||
import io.debezium.util.Clock;
|
||||
import io.debezium.util.LoggingContext;
|
||||
import io.debezium.util.Metronome;
|
||||
@ -32,8 +33,8 @@
|
||||
import io.debezium.util.Threads.Timer;
|
||||
|
||||
/**
|
||||
* Kafka connect source task which uses Postgres logical decoding over a streaming replication connection to process DB changes.
|
||||
*
|
||||
* Kafka connect source task which uses Postgres logical decoding over a streaming replication connection to process DB changes.
|
||||
*
|
||||
* @author Horia Chiorean (hchiorea@redhat.com)
|
||||
*/
|
||||
public class PostgresConnectorTask extends SourceTask {
|
||||
@ -47,7 +48,7 @@ public class PostgresConnectorTask extends SourceTask {
|
||||
private int maxBatchSize;
|
||||
private RecordsProducer producer;
|
||||
private Metronome metronome;
|
||||
private long pollIntervalMs;
|
||||
private Duration pollInterval;
|
||||
|
||||
public PostgresConnectorTask() {
|
||||
}
|
||||
@ -58,26 +59,25 @@ public void start(Map<String, String> props) {
|
||||
// already running
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
if (context == null) {
|
||||
throw new ConnectException("Unexpected null context");
|
||||
}
|
||||
|
||||
|
||||
// Validate the configuration ...
|
||||
PostgresConnectorConfig config = new PostgresConnectorConfig(Configuration.from(props));
|
||||
if (!config.validateAndRecord(logger::error)) {
|
||||
throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
|
||||
}
|
||||
|
||||
|
||||
|
||||
// create the task context and schema...
|
||||
PostgresSchema schema = new PostgresSchema(config);
|
||||
this.taskContext = new PostgresTaskContext(config, schema);
|
||||
|
||||
|
||||
// create the queue in which records will be produced
|
||||
this.queue = new LinkedBlockingDeque<>(config.maxQueueSize());
|
||||
this.maxBatchSize = config.maxBatchSize();
|
||||
|
||||
|
||||
SourceInfo sourceInfo = new SourceInfo(config.serverName());
|
||||
Map<String, Object> existingOffset = context.offsetStorageReader().offset(sourceInfo.partition());
|
||||
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
|
||||
@ -86,7 +86,7 @@ public void start(Map<String, String> props) {
|
||||
try (PostgresConnection connection = taskContext.createConnection()) {
|
||||
logger.info(connection.serverInfo().toString());
|
||||
}
|
||||
|
||||
|
||||
if (existingOffset == null) {
|
||||
logger.info("No previous offset found");
|
||||
if (config.snapshotNeverAllowed()) {
|
||||
@ -118,9 +118,9 @@ public void start(Map<String, String> props) {
|
||||
producer = new RecordsStreamProducer(taskContext, sourceInfo);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
metronome = Metronome.sleeper(config.pollIntervalMs(), TimeUnit.MILLISECONDS, Clock.SYSTEM);
|
||||
pollIntervalMs = config.pollIntervalMs();
|
||||
pollInterval = Duration.ofMillis(config.pollIntervalMs());
|
||||
producer.start(this::enqueueRecord);
|
||||
running.compareAndSet(false, true);
|
||||
} catch (SQLException e) {
|
||||
@ -129,11 +129,11 @@ public void start(Map<String, String> props) {
|
||||
previousContext.restore();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void enqueueRecord(SourceRecord record) {
|
||||
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
|
||||
try {
|
||||
queue.put(record);
|
||||
queue.put(record);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Placed source record '{}' into queue", record);
|
||||
}
|
||||
@ -145,31 +145,31 @@ private void enqueueRecord(SourceRecord record) {
|
||||
previousContext.restore();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void createSnapshotProducer(PostgresTaskContext taskContext, SourceInfo sourceInfo, boolean initialOnlySnapshot) {
|
||||
if (initialOnlySnapshot) {
|
||||
logger.info("Taking only a snapshot of the DB without streaming any changes afterwards...");
|
||||
producer = new RecordsSnapshotProducer(taskContext, sourceInfo, false);
|
||||
producer = new RecordsSnapshotProducer(taskContext, sourceInfo, false);
|
||||
} else {
|
||||
logger.info("Taking a new snapshot of the DB and streaming logical changes once the snapshot is finished...");
|
||||
producer = new RecordsSnapshotProducer(taskContext, sourceInfo, true);
|
||||
producer = new RecordsSnapshotProducer(taskContext, sourceInfo, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void commit() throws InterruptedException {
|
||||
if (running.get()) {
|
||||
producer.commit();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public List<SourceRecord> poll() throws InterruptedException {
|
||||
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
|
||||
try {
|
||||
logger.debug("polling records...");
|
||||
List<SourceRecord> records = new ArrayList<>();
|
||||
final Timer timeout = Threads.timer(Clock.SYSTEM, Duration.ofMillis(Math.max(pollIntervalMs, ConfigurationDefaults.RETURN_CONTROL_INTERVAL.toMillis())));
|
||||
final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
|
||||
while (running.get() && queue.drainTo(records, maxBatchSize) == 0) {
|
||||
if (taskContext.getTaskFailure() != null) {
|
||||
throw new ConnectException(taskContext.getTaskFailure());
|
||||
@ -190,17 +190,17 @@ public List<SourceRecord> poll() throws InterruptedException {
|
||||
}
|
||||
return records;
|
||||
} finally {
|
||||
previousContext.restore();
|
||||
previousContext.restore();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (running.compareAndSet(true, false)) {
|
||||
producer.stop();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String version() {
|
||||
return Module.version();
|
||||
|
25
debezium-core/src/main/java/io/debezium/time/Temporals.java
Normal file
25
debezium-core/src/main/java/io/debezium/time/Temporals.java
Normal file
@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.time;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
/**
|
||||
* Misc. functionality dealing with temporal data types.
|
||||
*
|
||||
* @author Gunnar Morling
|
||||
*/
|
||||
public class Temporals {
|
||||
|
||||
/**
|
||||
* Returns that duration from the given ones which represents the larger amount
|
||||
* of time ("is longer"). If both durations are equal, that same value will be
|
||||
* returned.
|
||||
*/
|
||||
public static Duration max(Duration d1, Duration d2) {
|
||||
return d1.compareTo(d2) == 1 ? d1 : d2;
|
||||
}
|
||||
}
|
@ -5,6 +5,7 @@
|
||||
*/
|
||||
package io.debezium.util;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
|
||||
@ -62,6 +63,10 @@ public String toString() {
|
||||
};
|
||||
}
|
||||
|
||||
public static Metronome sleeper(Duration period, Clock timeSystem) {
|
||||
return sleeper(period.toNanos(), TimeUnit.NANOSECONDS, timeSystem);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new metronome that starts ticking immediately and that uses {@link LockSupport#parkNanos(long)} to wait.
|
||||
* <p>
|
||||
@ -100,4 +105,8 @@ public String toString() {
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
public static Metronome parker(Duration period, Clock timeSystem) {
|
||||
return parker(period.toNanos(), TimeUnit.NANOSECONDS, timeSystem);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,41 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.time;
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Unit test for {@code Temporals}.
|
||||
*
|
||||
* @author Gunnar Morling
|
||||
*/
|
||||
public class TemporalsTest {
|
||||
|
||||
@Test
|
||||
public void maxHandlesSameUnit() {
|
||||
Duration hundredMillis = Duration.ofMillis(100);
|
||||
Duration thousandMillis = Duration.ofMillis(1000);
|
||||
assertThat(Temporals.max(hundredMillis, thousandMillis)).isEqualTo(thousandMillis);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void maxHandlesDifferentUnits() {
|
||||
Duration sixtyOneMinutes = Duration.ofMinutes(61);
|
||||
Duration oneHour = Duration.ofHours(1);
|
||||
assertThat(Temporals.max(sixtyOneMinutes, oneHour)).isEqualTo(sixtyOneMinutes);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void maxHandlesEqualValue() {
|
||||
Duration oneMilli = Duration.ofMillis(1);
|
||||
Duration oneMillionNanos = Duration.ofNanos(1_000_000);
|
||||
assertThat(Temporals.max(oneMilli, oneMillionNanos)).isEqualTo(oneMilli);
|
||||
assertThat(Temporals.max(oneMilli, oneMillionNanos)).isEqualTo(oneMillionNanos);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user