DBZ-356 Closing typeResolverConnection upon RecordsStreamProducer#close()

This commit is contained in:
Gunnar Morling 2017-09-18 16:51:02 +02:00 committed by Jiri Pechanec
parent 70672525ad
commit abaf233066

View File

@ -57,6 +57,7 @@ public class RecordsStreamProducer extends RecordsProducer {
private final ReplicationConnection replicationConnection;
private final AtomicReference<ReplicationStream> replicationStream;
private PgConnection typeResolverConnection = null;
/**
* Creates new producer instance for the given task context
*
@ -146,11 +147,25 @@ protected synchronized void commit() {
@Override
protected synchronized void stop() {
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
try {
if (replicationStream.get() == null) {
logger.debug("already stopped....");
return;
}
closeConnections();
} finally {
replicationStream.set(null);
executorService.shutdownNow();
previousContext.restore();
}
}
private void closeConnections() {
Exception closingException = null;
try {
if (replicationConnection != null) {
logger.debug("stopping streaming...");
//TODO author=Horia Chiorean date=08/11/2016 description=Ideally we'd close the stream, but it's not reliable atm (see javadoc)
@ -158,12 +173,24 @@ protected synchronized void stop() {
// close the connection - this should also disconnect the current stream even if it's blocking
replicationConnection.close();
}
} catch (Exception e) {
throw new ConnectException(e.getCause() != null ? e.getCause() : e);
} finally {
replicationStream.set(null);
executorService.shutdownNow();
previousContext.restore();
}
catch(Exception e) {
closingException = e;
}
finally {
try {
if (typeResolverConnection != null) {
typeResolverConnection.close();
}
}
catch(Exception e) {
ConnectException rethrown = new ConnectException(e);
if (closingException != null) {
rethrown.addSuppressed(closingException);
}
throw rethrown;
}
}
}