From a459c2cc4db017774952e3eb04730eafaf5a0468 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Tue, 18 Sep 2018 08:54:03 +0200 Subject: [PATCH] DBZ-784 Terminate execute loops during shutdown --- .../connector/mongodb/ConnectionContext.java | 22 +++++++++++++++++++ .../connector/mongodb/Replicator.java | 3 +++ 2 files changed, 25 insertions(+) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ConnectionContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ConnectionContext.java index bfee6b5d7..274c98150 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ConnectionContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ConnectionContext.java @@ -10,6 +10,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -222,6 +223,7 @@ public static class MongoPrimary { private final Supplier primaryConnectionSupplier; private final Filters filters; private final BiConsumer errorHandler; + private final AtomicBoolean stop = new AtomicBoolean(); protected MongoPrimary(ConnectionContext context, ReplicaSet replicaSet, Filters filters, BiConsumer errorHandler) { this.replicaSet = replicaSet; @@ -272,6 +274,9 @@ public void execute(String desc, Consumer operation) { return; } catch (Throwable t) { errorHandler.accept(desc, t); + if (!isRunning()) { + throw new ConnectException("Operation failed and MongoDB primary termination requested", t); + } try { errorMetronome.pause(); } @@ -299,6 +304,9 @@ public T execute(String desc, Function operation) { } catch (Throwable t) { errorHandler.accept(desc, t); + if (!isRunning()) { + throw new ConnectException("Operation failed and MongoDB primary termination requested", t); + } try { errorMetronome.pause(); } @@ -330,6 +338,9 @@ public void executeBlocking(String desc, BlockingConsumer operation } catch (Throwable t) { errorHandler.accept(desc, t); + if (!isRunning()) { + throw new ConnectException("Operation failed and MongoDB primary termination requested", t); + } errorMetronome.pause(); } } @@ -386,6 +397,17 @@ public List collections() { return collections; }); } + + private boolean isRunning() { + return stop.get(); + } + + /** + * Terminates the execution loop of the current primary + */ + public void stop() { + stop.set(true); + } } /** diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java index 8b00b8b2a..ac8d5d450 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Replicator.java @@ -154,6 +154,9 @@ public void run() { onFailure.accept(t); } finally { + if (primaryClient != null) { + primaryClient.stop(); + } this.running.set(false); } }