DBZ-1863 Centralizing coordinator shutdown

This commit is contained in:
Gunnar Morling 2020-03-11 14:09:16 +01:00 committed by Jiri Pechanec
parent 9c2bd0cc23
commit 449154beaa
4 changed files with 14 additions and 39 deletions

View File

@ -52,7 +52,6 @@ public final class MongoDbConnectorTask extends BaseSourceTask {
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile String taskName;
private volatile MongoDbTaskContext taskContext;
private volatile ChangeEventSourceCoordinator coordinator;
private volatile ErrorHandler errorHandler;
private volatile MongoDbSchema schema;
@ -99,7 +98,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
DataChangeEvent::new,
metadataProvider);
coordinator = new ChangeEventSourceCoordinator(
ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator(
previousOffsets,
errorHandler,
MongoDbConnector.class,
@ -133,17 +132,6 @@ public List<SourceRecord> doPoll() throws InterruptedException {
public void doStop() {
PreviousContext previousLogContext = this.taskContext.configureLoggingContext(taskName);
try {
try {
if (coordinator != null) {
coordinator.stop();
}
}
catch (InterruptedException e) {
Thread.interrupted();
logger.error("Interrupted while stopping coordinator", e);
throw new ConnectException("Interrupted while stopping coordinator, failing the task");
}
if (schema != null) {
schema.close();
}

View File

@ -49,7 +49,6 @@ public class PostgresConnectorTask extends BaseSourceTask {
private volatile PostgresTaskContext taskContext;
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile PostgresConnection jdbcConnection;
private volatile ChangeEventSourceCoordinator coordinator;
private volatile ErrorHandler errorHandler;
private volatile PostgresSchema schema;
@ -140,7 +139,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
PostgresChangeRecordEmitter::updateSchema,
metadataProvider);
coordinator = new ChangeEventSourceCoordinator(
ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator(
previousOffset,
errorHandler,
PostgresConnector.class,
@ -212,17 +211,6 @@ public List<SourceRecord> doPoll() throws InterruptedException {
@Override
protected void doStop() {
try {
if (coordinator != null) {
coordinator.stop();
}
}
catch (InterruptedException e) {
Thread.interrupted();
LOGGER.error("Interrupted while stopping coordinator", e);
throw new ConnectException("Interrupted while stopping coordinator, failing the task");
}
if (jdbcConnection != null) {
jdbcConnection.close();
}

View File

@ -46,7 +46,6 @@ public class SqlServerConnectorTask extends BaseSourceTask {
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile SqlServerConnection dataConnection;
private volatile SqlServerConnection metadataConnection;
private volatile ChangeEventSourceCoordinator coordinator;
private volatile ErrorHandler errorHandler;
private volatile SqlServerDatabaseSchema schema;
@ -111,7 +110,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
DataChangeEvent::new,
metadataProvider);
coordinator = new ChangeEventSourceCoordinator(
ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator(
previousOffset,
errorHandler,
SqlServerConnector.class,
@ -138,17 +137,6 @@ public List<SourceRecord> doPoll() throws InterruptedException {
@Override
protected void doStop() {
try {
if (coordinator != null) {
coordinator.stop();
}
}
catch (InterruptedException e) {
Thread.interrupted();
LOGGER.error("Interrupted while stopping coordinator", e);
throw new ConnectException("Interrupted while stopping coordinator, failing the task");
}
try {
if (dataConnection != null) {
dataConnection.close();

View File

@ -180,6 +180,17 @@ private void stop(boolean restart) {
LOGGER.info("Stopping down connector");
}
try {
if (coordinator != null) {
coordinator.stop();
}
}
catch (InterruptedException e) {
Thread.interrupted();
LOGGER.error("Interrupted while stopping coordinator", e);
throw new ConnectException("Interrupted while stopping coordinator, failing the task");
}
doStop();
if (restart && restartDelay == null) {