DBZ-784 Terminate execute loops during shutdown

This commit is contained in:
Jiri Pechanec 2018-09-18 08:54:03 +02:00 committed by Gunnar Morling
parent 2253eaa6a6
commit a459c2cc4d
2 changed files with 25 additions and 0 deletions

View File

@ -10,6 +10,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
@ -222,6 +223,7 @@ public static class MongoPrimary {
private final Supplier<MongoClient> primaryConnectionSupplier;
private final Filters filters;
private final BiConsumer<String, Throwable> errorHandler;
private final AtomicBoolean stop = new AtomicBoolean();
protected MongoPrimary(ConnectionContext context, ReplicaSet replicaSet, Filters filters, BiConsumer<String, Throwable> errorHandler) {
this.replicaSet = replicaSet;
@ -272,6 +274,9 @@ public void execute(String desc, Consumer<MongoClient> operation) {
return;
} catch (Throwable t) {
errorHandler.accept(desc, t);
if (!isRunning()) {
throw new ConnectException("Operation failed and MongoDB primary termination requested", t);
}
try {
errorMetronome.pause();
}
@ -299,6 +304,9 @@ public <T> T execute(String desc, Function<MongoClient, T> operation) {
}
catch (Throwable t) {
errorHandler.accept(desc, t);
if (!isRunning()) {
throw new ConnectException("Operation failed and MongoDB primary termination requested", t);
}
try {
errorMetronome.pause();
}
@ -330,6 +338,9 @@ public void executeBlocking(String desc, BlockingConsumer<MongoClient> operation
}
catch (Throwable t) {
errorHandler.accept(desc, t);
if (!isRunning()) {
throw new ConnectException("Operation failed and MongoDB primary termination requested", t);
}
errorMetronome.pause();
}
}
@ -386,6 +397,17 @@ public List<CollectionId> collections() {
return collections;
});
}
private boolean isRunning() {
return stop.get();
}
/**
* Terminates the execution loop of the current primary
*/
public void stop() {
stop.set(true);
}
}
/**

View File

@ -154,6 +154,9 @@ public void run() {
onFailure.accept(t);
}
finally {
if (primaryClient != null) {
primaryClient.stop();
}
this.running.set(false);
}
}