DBZ-1723 Adjusting to changes in BaseSourceTask

This commit is contained in:
Gunnar Morling 2020-03-11 10:50:49 +01:00 committed by Jiri Pechanec
parent e84db3030b
commit f290606633
2 changed files with 6 additions and 44 deletions

View File

@ -100,7 +100,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
.loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
.build();
errorHandler = new ErrorHandler(Db2Connector.class, connectorConfig.getLogicalName(), queue, this::cleanupResources);
errorHandler = new ErrorHandler(Db2Connector.class, connectorConfig.getLogicalName(), queue);
final Db2EventMetadataProvider metadataProvider = new Db2EventMetadataProvider();
@ -149,7 +149,7 @@ protected OffsetContext getPreviousOffset(OffsetContext.Loader loader) {
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
public List<SourceRecord> doPoll() throws InterruptedException {
final List<DataChangeEvent> records = queue.poll();
final List<SourceRecord> sourceRecords = records.stream()
@ -160,16 +160,7 @@ public List<SourceRecord> poll() throws InterruptedException {
}
@Override
public void stop() {
cleanupResources();
}
private void cleanupResources() {
if (!state.compareAndSet(State.RUNNING, State.STOPPED)) {
LOGGER.info("Connector has already been stopped");
return;
}
public void doStop() {
try {
if (coordinator != null) {
coordinator.stop();
@ -181,16 +172,6 @@ private void cleanupResources() {
throw new ConnectException("Interrupted while stopping coordinator, failing the task");
}
try {
if (errorHandler != null) {
errorHandler.stop();
}
}
catch (InterruptedException e) {
Thread.interrupted();
LOGGER.error("Interrupted while stopping", e);
}
try {
if (dataConnection != null) {
dataConnection.close();

View File

@ -73,7 +73,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
.loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
.build();
errorHandler = new ErrorHandler(OracleConnector.class, connectorConfig.getLogicalName(), queue, this::cleanupResources);
errorHandler = new ErrorHandler(OracleConnector.class, connectorConfig.getLogicalName(), queue);
final OracleEventMetadataProvider metadataProvider = new OracleEventMetadataProvider();
@ -95,7 +95,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
public List<SourceRecord> doPoll() throws InterruptedException {
List<DataChangeEvent> records = queue.poll();
List<SourceRecord> sourceRecords = records.stream()
@ -106,16 +106,7 @@ public List<SourceRecord> poll() throws InterruptedException {
}
@Override
public void stop() {
cleanupResources();
}
private void cleanupResources() {
if (!state.compareAndSet(State.RUNNING, State.STOPPED)) {
LOGGER.info("Connector has already been stopped");
return;
}
public void doStop() {
try {
if (coordinator != null) {
coordinator.stop();
@ -128,16 +119,6 @@ private void cleanupResources() {
throw new ConnectException("Interrupted while stopping coordinator, failing the task");
}
try {
if (errorHandler != null) {
errorHandler.stop();
}
}
catch (InterruptedException e) {
Thread.interrupted();
LOGGER.error("Interrupted while stopping", e);
}
try {
if (jdbcConnection != null) {
jdbcConnection.close();