DBZ-1019 Removing legacy methods from Metronome

This commit is contained in:
Gunnar Morling 2019-02-11 17:44:06 +01:00
parent fa6fcd8487
commit 2a486ecfff
4 changed files with 13 additions and 21 deletions

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mongodb;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -118,7 +119,7 @@ public void start(Map<String, String> props) {
// Set up and start the thread that monitors the members of all of the replica sets ...
replicaSetMonitorExecutor = Threads.newSingleThreadExecutor(MongoDbConnector.class, taskContext.serverName(), "replica-set-monitor");
ReplicaSetDiscovery monitor = new ReplicaSetDiscovery(taskContext);
monitorThread = new ReplicaSetMonitorThread(monitor::getReplicaSets, connectionContext.pollPeriodInSeconds(), TimeUnit.SECONDS,
monitorThread = new ReplicaSetMonitorThread(monitor::getReplicaSets, Duration.ofSeconds(connectionContext.pollPeriodInSeconds()),
Clock.SYSTEM, () -> taskContext.configureLoggingContext("disc"), this::replicaSetsChanged);
replicaSetMonitorExecutor.execute(monitorThread);
logger.info("Successfully started MongoDB connector, and continuing to discover changes in replica sets", connectionContext.hosts());

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mongodb;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@ -21,7 +22,7 @@
/**
* A thread that can be used to when new replica sets are added or existing replica sets are removed. The logic does not evaluate
* membership changes of individual replica sets, since that is handled independently by each task.
*
*
* @author Randall Hauch
*/
public final class ReplicaSetMonitorThread implements Runnable {
@ -37,16 +38,15 @@ public final class ReplicaSetMonitorThread implements Runnable {
/**
* @param monitor the component used to periodically obtain the replica set specifications; may not be null
* @param period the time period between polling checks; must be non-negative
* @param unit the time unit for the {@code period}; may not be null
* @param clock the clock to use; may be null if the system clock should be used
* @param onStartup the function to call when the thread is started; may be null if not needed
* @param onChange the function to call when the set of replica set specifications has changed; may be null if not needed
*/
public ReplicaSetMonitorThread(Supplier<ReplicaSets> monitor, long period, TimeUnit unit, Clock clock, Runnable onStartup,
public ReplicaSetMonitorThread(Supplier<ReplicaSets> monitor, Duration period, Clock clock, Runnable onStartup,
Consumer<ReplicaSets> onChange) {
if (clock == null) clock = Clock.system();
this.monitor = monitor;
this.metronome = Metronome.sleeper(period, unit, clock);
this.metronome = Metronome.sleeper(period, clock);
this.onChange = onChange != null ? onChange : (rsSpecs) -> {};
this.onStartup = onStartup != null ? onStartup : () -> {};
}
@ -97,7 +97,7 @@ public void run() {
/**
* Get the information about each of the replica sets.
*
*
* @param timeout the time to block until the replica sets are first obtained from MongoDB; may not be negative
* @param unit the time unit for the {@code timeout}; may not be null
* @return the replica sets, or {@code null} if the timeout occurred before the replica set information was obtained

View File

@ -11,6 +11,7 @@
import static org.junit.Assert.fail;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@ -282,7 +283,7 @@ private List<ReplicationMessage> expectedMessagesFromStream(ReplicationStream st
ExecutorService executorService = Executors.newSingleThreadExecutor();
Semaphore latch = new Semaphore(0);
Metronome metronome = Metronome.sleeper(50, TimeUnit.MILLISECONDS, Clock.SYSTEM);
Metronome metronome = Metronome.sleeper(Duration.ofMillis(50), Clock.SYSTEM);
Future<?> result = executorService.submit(() -> {
while (!Thread.interrupted()) {
for(;;) {

View File

@ -39,12 +39,11 @@ public interface Metronome {
* be used when specifying a {@code period} of 20 milliseconds or smaller.
*
* @param period the period of time that the metronome ticks and for which {@link #pause()} waits
* @param unit the unit of time; may not be null
* @param timeSystem the time system that will provide the current time; may not be null
* @return the new metronome; never null
*/
public static Metronome sleeper(long period, TimeUnit unit, Clock timeSystem) {
long periodInMillis = unit.toMillis(period);
public static Metronome sleeper(Duration period, Clock timeSystem) {
long periodInMillis = period.toMillis();
return new Metronome() {
private long next = timeSystem.currentTimeInMillis() + periodInMillis;
@ -67,10 +66,6 @@ public String toString() {
};
}
public static Metronome sleeper(Duration period, Clock timeSystem) {
return sleeper(period.toNanos(), TimeUnit.NANOSECONDS, timeSystem);
}
/**
* Create a new metronome that starts ticking immediately and that uses {@link LockSupport#parkNanos(long)} to wait.
* <p>
@ -83,12 +78,11 @@ public static Metronome sleeper(Duration period, Clock timeSystem) {
* be used when specifying a {@code period} of 10-15 milliseconds or smaller.
*
* @param period the period of time that the metronome ticks and for which {@link #pause()} waits
* @param unit the unit of time; may not be null
* @param timeSystem the time system that will provide the current time; may not be null
* @return the new metronome; never null
*/
public static Metronome parker(long period, TimeUnit unit, Clock timeSystem) {
long periodInNanos = unit.toNanos(period);
public static Metronome parker(Duration period, Clock timeSystem) {
long periodInNanos = period.toNanos();
return new Metronome() {
private long next = timeSystem.currentTimeInNanos() + periodInNanos;
@ -109,8 +103,4 @@ public String toString() {
}
};
}
public static Metronome parker(Duration period, Clock timeSystem) {
return parker(period.toNanos(), TimeUnit.NANOSECONDS, timeSystem);
}
}