diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java index b74ea4bda..4b12f4174 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java @@ -57,6 +57,7 @@ public class RecordsStreamProducer extends RecordsProducer { private final ReplicationConnection replicationConnection; private final AtomicReference replicationStream; private PgConnection typeResolverConnection = null; + /** * Creates new producer instance for the given task context * @@ -146,11 +147,25 @@ protected synchronized void commit() { @Override protected synchronized void stop() { LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME); + try { if (replicationStream.get() == null) { logger.debug("already stopped...."); return; } + + closeConnections(); + } finally { + replicationStream.set(null); + executorService.shutdownNow(); + previousContext.restore(); + } + } + + private void closeConnections() { + Exception closingException = null; + + try { if (replicationConnection != null) { logger.debug("stopping streaming..."); //TODO author=Horia Chiorean date=08/11/2016 description=Ideally we'd close the stream, but it's not reliable atm (see javadoc) @@ -158,12 +173,24 @@ protected synchronized void stop() { // close the connection - this should also disconnect the current stream even if it's blocking replicationConnection.close(); } - } catch (Exception e) { - throw new ConnectException(e.getCause() != null ? e.getCause() : e); - } finally { - replicationStream.set(null); - executorService.shutdownNow(); - previousContext.restore(); + } + catch(Exception e) { + closingException = e; + } + finally { + try { + if (typeResolverConnection != null) { + typeResolverConnection.close(); + } + } + catch(Exception e) { + ConnectException rethrown = new ConnectException(e); + if (closingException != null) { + rethrown.addSuppressed(closingException); + } + + throw rethrown; + } } }