diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnector.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnector.java index d182462d0..94b6958b8 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnector.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnector.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.mongodb; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -118,7 +119,7 @@ public void start(Map props) { // Set up and start the thread that monitors the members of all of the replica sets ... replicaSetMonitorExecutor = Threads.newSingleThreadExecutor(MongoDbConnector.class, taskContext.serverName(), "replica-set-monitor"); ReplicaSetDiscovery monitor = new ReplicaSetDiscovery(taskContext); - monitorThread = new ReplicaSetMonitorThread(monitor::getReplicaSets, connectionContext.pollPeriodInSeconds(), TimeUnit.SECONDS, + monitorThread = new ReplicaSetMonitorThread(monitor::getReplicaSets, Duration.ofSeconds(connectionContext.pollPeriodInSeconds()), Clock.SYSTEM, () -> taskContext.configureLoggingContext("disc"), this::replicaSetsChanged); replicaSetMonitorExecutor.execute(monitorThread); logger.info("Successfully started MongoDB connector, and continuing to discover changes in replica sets", connectionContext.hosts()); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetMonitorThread.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetMonitorThread.java index e3ca2c507..3018030b4 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetMonitorThread.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetMonitorThread.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.mongodb; +import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -21,7 +22,7 @@ /** * A thread that can be used to when new replica sets are added or existing replica sets are removed. The logic does not evaluate * membership changes of individual replica sets, since that is handled independently by each task. - * + * * @author Randall Hauch */ public final class ReplicaSetMonitorThread implements Runnable { @@ -37,16 +38,15 @@ public final class ReplicaSetMonitorThread implements Runnable { /** * @param monitor the component used to periodically obtain the replica set specifications; may not be null * @param period the time period between polling checks; must be non-negative - * @param unit the time unit for the {@code period}; may not be null * @param clock the clock to use; may be null if the system clock should be used * @param onStartup the function to call when the thread is started; may be null if not needed * @param onChange the function to call when the set of replica set specifications has changed; may be null if not needed */ - public ReplicaSetMonitorThread(Supplier monitor, long period, TimeUnit unit, Clock clock, Runnable onStartup, + public ReplicaSetMonitorThread(Supplier monitor, Duration period, Clock clock, Runnable onStartup, Consumer onChange) { if (clock == null) clock = Clock.system(); this.monitor = monitor; - this.metronome = Metronome.sleeper(period, unit, clock); + this.metronome = Metronome.sleeper(period, clock); this.onChange = onChange != null ? onChange : (rsSpecs) -> {}; this.onStartup = onStartup != null ? onStartup : () -> {}; } @@ -97,7 +97,7 @@ public void run() { /** * Get the information about each of the replica sets. - * + * * @param timeout the time to block until the replica sets are first obtained from MongoDB; may not be negative * @param unit the time unit for the {@code timeout}; may not be null * @return the replica sets, or {@code null} if the timeout occurred before the replica set information was obtained diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/ReplicationConnectionIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/ReplicationConnectionIT.java index 674cfaa38..5cfe95bb2 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/ReplicationConnectionIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/ReplicationConnectionIT.java @@ -11,6 +11,7 @@ import static org.junit.Assert.fail; import java.sql.SQLException; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -282,7 +283,7 @@ private List expectedMessagesFromStream(ReplicationStream st ExecutorService executorService = Executors.newSingleThreadExecutor(); Semaphore latch = new Semaphore(0); - Metronome metronome = Metronome.sleeper(50, TimeUnit.MILLISECONDS, Clock.SYSTEM); + Metronome metronome = Metronome.sleeper(Duration.ofMillis(50), Clock.SYSTEM); Future result = executorService.submit(() -> { while (!Thread.interrupted()) { for(;;) { diff --git a/debezium-core/src/main/java/io/debezium/util/Metronome.java b/debezium-core/src/main/java/io/debezium/util/Metronome.java index 8bde8c093..bc58f75a7 100644 --- a/debezium-core/src/main/java/io/debezium/util/Metronome.java +++ b/debezium-core/src/main/java/io/debezium/util/Metronome.java @@ -39,12 +39,11 @@ public interface Metronome { * be used when specifying a {@code period} of 20 milliseconds or smaller. * * @param period the period of time that the metronome ticks and for which {@link #pause()} waits - * @param unit the unit of time; may not be null * @param timeSystem the time system that will provide the current time; may not be null * @return the new metronome; never null */ - public static Metronome sleeper(long period, TimeUnit unit, Clock timeSystem) { - long periodInMillis = unit.toMillis(period); + public static Metronome sleeper(Duration period, Clock timeSystem) { + long periodInMillis = period.toMillis(); return new Metronome() { private long next = timeSystem.currentTimeInMillis() + periodInMillis; @@ -67,10 +66,6 @@ public String toString() { }; } - public static Metronome sleeper(Duration period, Clock timeSystem) { - return sleeper(period.toNanos(), TimeUnit.NANOSECONDS, timeSystem); - } - /** * Create a new metronome that starts ticking immediately and that uses {@link LockSupport#parkNanos(long)} to wait. *

@@ -83,12 +78,11 @@ public static Metronome sleeper(Duration period, Clock timeSystem) { * be used when specifying a {@code period} of 10-15 milliseconds or smaller. * * @param period the period of time that the metronome ticks and for which {@link #pause()} waits - * @param unit the unit of time; may not be null * @param timeSystem the time system that will provide the current time; may not be null * @return the new metronome; never null */ - public static Metronome parker(long period, TimeUnit unit, Clock timeSystem) { - long periodInNanos = unit.toNanos(period); + public static Metronome parker(Duration period, Clock timeSystem) { + long periodInNanos = period.toNanos(); return new Metronome() { private long next = timeSystem.currentTimeInNanos() + periodInNanos; @@ -109,8 +103,4 @@ public String toString() { } }; } - - public static Metronome parker(Duration period, Clock timeSystem) { - return parker(period.toNanos(), TimeUnit.NANOSECONDS, timeSystem); - } }