From 2a35eae1789ccbcfa58ba97843974e8003f6513d Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Fri, 9 Mar 2018 12:42:44 +0100 Subject: [PATCH] DBZ-648 InterrupedException handled by MongoDB --- .../connector/mongodb/ReplicaSetDiscovery.java | 6 ++++++ .../mongodb/ReplicaSetMonitorThread.java | 18 +++++++++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetDiscovery.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetDiscovery.java index f6aec5456..d3dee4920 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetDiscovery.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetDiscovery.java @@ -13,6 +13,7 @@ import com.mongodb.MongoClient; import com.mongodb.MongoException; +import com.mongodb.MongoInterruptedException; import com.mongodb.ReplicaSetStatus; import io.debezium.annotation.ThreadSafe; @@ -70,6 +71,11 @@ public ReplicaSets getReplicaSets() { String replicaSetName = MongoUtil.replicaSetUsedIn(hostStr); replicaSetSpecs.add(new ReplicaSet(hostStr, replicaSetName, shardName)); }); + } + catch (MongoInterruptedException e) { + logger.error("Interrupted while reading the '{}' collection in the '{}' database: {}", + shardsCollection, CONFIG_DATABASE_NAME, e.getMessage(), e); + Thread.currentThread().interrupt(); } catch (MongoException e) { logger.error("Error while reading the '{}' collection in the '{}' database: {}", shardsCollection, CONFIG_DATABASE_NAME, e.getMessage(), e); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetMonitorThread.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetMonitorThread.java index 82a293c62..e3ca2c507 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetMonitorThread.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetMonitorThread.java @@ -13,6 +13,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.mongodb.MongoInterruptedException; + import io.debezium.util.Clock; import io.debezium.util.Metronome; @@ -65,11 +67,21 @@ public void run() { // At least one of the replica sets been added or been removed ... try { onChange.accept(replicaSets); - } catch (Throwable t) { + } + catch (MongoInterruptedException t) { + logger.error("Interrupted while calling the function with the new replica set specifications", t); + Thread.currentThread().interrupt(); + } + catch (Throwable t) { logger.error("Error while calling the function with the new replica set specifications", t); } } - } catch (Throwable t) { + } + catch (MongoInterruptedException t) { + logger.error("interrupted while trying to get information about the replica sets", t); + Thread.currentThread().interrupt(); + } + catch (Throwable t) { logger.error("Error while trying to get information about the replica sets", t); } // Check again whether we are running before we pause ... @@ -77,7 +89,7 @@ public void run() { try { metronome.pause(); } catch (InterruptedException e) { - Thread.interrupted(); + Thread.currentThread().interrupt(); } } }