DBZ-1863 Centralizing coordinator shutdown
This commit is contained in:
parent
cc0edc10c3
commit
d23676a7fb
@ -48,7 +48,6 @@ public class Db2ConnectorTask extends BaseSourceTask {
|
|||||||
private volatile ChangeEventQueue<DataChangeEvent> queue;
|
private volatile ChangeEventQueue<DataChangeEvent> queue;
|
||||||
private volatile Db2Connection dataConnection;
|
private volatile Db2Connection dataConnection;
|
||||||
private volatile Db2Connection metadataConnection;
|
private volatile Db2Connection metadataConnection;
|
||||||
private volatile ChangeEventSourceCoordinator coordinator;
|
|
||||||
private volatile ErrorHandler errorHandler;
|
private volatile ErrorHandler errorHandler;
|
||||||
private volatile Db2DatabaseSchema schema;
|
private volatile Db2DatabaseSchema schema;
|
||||||
|
|
||||||
@ -113,7 +112,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
|
|||||||
DataChangeEvent::new,
|
DataChangeEvent::new,
|
||||||
metadataProvider);
|
metadataProvider);
|
||||||
|
|
||||||
coordinator = new ChangeEventSourceCoordinator(
|
ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator(
|
||||||
previousOffset,
|
previousOffset,
|
||||||
errorHandler,
|
errorHandler,
|
||||||
Db2Connector.class,
|
Db2Connector.class,
|
||||||
@ -161,17 +160,6 @@ public List<SourceRecord> doPoll() throws InterruptedException {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void doStop() {
|
public void doStop() {
|
||||||
try {
|
|
||||||
if (coordinator != null) {
|
|
||||||
coordinator.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (InterruptedException e) {
|
|
||||||
Thread.interrupted();
|
|
||||||
LOGGER.error("Interrupted while stopping coordinator", e);
|
|
||||||
throw new ConnectException("Interrupted while stopping coordinator, failing the task");
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (dataConnection != null) {
|
if (dataConnection != null) {
|
||||||
dataConnection.close();
|
dataConnection.close();
|
||||||
|
@ -9,7 +9,6 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.kafka.connect.errors.ConnectException;
|
|
||||||
import org.apache.kafka.connect.source.SourceRecord;
|
import org.apache.kafka.connect.source.SourceRecord;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -36,7 +35,6 @@ public class OracleConnectorTask extends BaseSourceTask {
|
|||||||
private volatile OracleTaskContext taskContext;
|
private volatile OracleTaskContext taskContext;
|
||||||
private volatile ChangeEventQueue<DataChangeEvent> queue;
|
private volatile ChangeEventQueue<DataChangeEvent> queue;
|
||||||
private volatile OracleConnection jdbcConnection;
|
private volatile OracleConnection jdbcConnection;
|
||||||
private volatile ChangeEventSourceCoordinator coordinator;
|
|
||||||
private volatile ErrorHandler errorHandler;
|
private volatile ErrorHandler errorHandler;
|
||||||
private volatile OracleDatabaseSchema schema;
|
private volatile OracleDatabaseSchema schema;
|
||||||
|
|
||||||
@ -80,7 +78,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
|
|||||||
EventDispatcher<TableId> dispatcher = new EventDispatcher<>(connectorConfig, topicSelector, schema, queue,
|
EventDispatcher<TableId> dispatcher = new EventDispatcher<>(connectorConfig, topicSelector, schema, queue,
|
||||||
connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new, metadataProvider);
|
connectorConfig.getTableFilters().dataCollectionFilter(), DataChangeEvent::new, metadataProvider);
|
||||||
|
|
||||||
coordinator = new ChangeEventSourceCoordinator(
|
ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator(
|
||||||
previousOffset,
|
previousOffset,
|
||||||
errorHandler,
|
errorHandler,
|
||||||
OracleConnector.class,
|
OracleConnector.class,
|
||||||
@ -107,18 +105,6 @@ public List<SourceRecord> doPoll() throws InterruptedException {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void doStop() {
|
public void doStop() {
|
||||||
try {
|
|
||||||
if (coordinator != null) {
|
|
||||||
coordinator.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (InterruptedException e) {
|
|
||||||
Thread.interrupted();
|
|
||||||
LOGGER.error("Interrupted while stopping coordinator", e);
|
|
||||||
// XStream code can end in SIGSEGV so fail the task instead of JVM crash
|
|
||||||
throw new ConnectException("Interrupted while stopping coordinator, failing the task");
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (jdbcConnection != null) {
|
if (jdbcConnection != null) {
|
||||||
jdbcConnection.close();
|
jdbcConnection.close();
|
||||||
|
Loading…
Reference in New Issue
Block a user