DBZ-1766 Unifying handling of offset commits; updating lastOffset to commit only after a record is committed

This commit is contained in:
Gunnar Morling 2020-02-10 12:16:16 +01:00
parent c78f1f62df
commit f5704322cc
3 changed files with 6 additions and 56 deletions

View File

@ -9,7 +9,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
@ -45,13 +44,6 @@ public class Db2ConnectorTask extends BaseSourceTask {
private static final Logger LOGGER = LoggerFactory.getLogger(Db2ConnectorTask.class);
private static final String CONTEXT_NAME = "db2-server-connector-task";
private static enum State {
RUNNING,
STOPPED;
}
private final AtomicReference<State> state = new AtomicReference<State>(State.STOPPED);
private volatile Db2TaskContext taskContext;
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile Db2Connection dataConnection;
@ -59,7 +51,6 @@ private static enum State {
private volatile ChangeEventSourceCoordinator coordinator;
private volatile ErrorHandler errorHandler;
private volatile Db2DatabaseSchema schema;
private volatile Map<String, ?> lastOffset;
@Override
public String version() {
@ -67,12 +58,7 @@ public String version() {
}
@Override
public void start(Configuration config) {
if (!state.compareAndSet(State.STOPPED, State.RUNNING)) {
LOGGER.info("Connector has already been started");
return;
}
public ChangeEventSourceCoordinator start(Configuration config) {
final Db2ConnectorConfig connectorConfig = new Db2ConnectorConfig(config);
final TopicSelector<TableId> topicSelector = Db2TopicSelector.defaultSelector(connectorConfig);
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
@ -137,6 +123,8 @@ public void start(Configuration config) {
schema);
coordinator.start(taskContext, this.queue, metadataProvider);
return coordinator;
}
/**
@ -168,20 +156,10 @@ public List<SourceRecord> poll() throws InterruptedException {
.map(DataChangeEvent::getRecord)
.collect(Collectors.toList());
if (!sourceRecords.isEmpty()) {
this.lastOffset = sourceRecords.get(sourceRecords.size() - 1).sourceOffset();
}
return sourceRecords;
}
@Override
public void commit() throws InterruptedException {
if (coordinator != null) {
coordinator.commitOffset(lastOffset);
}
}
@Override
public void stop() {
cleanupResources();

View File

@ -336,10 +336,6 @@ private ChangeTable[] getCdcTablesToQuery() throws SQLException, InterruptedExce
return tables.toArray(new ChangeTable[tables.size()]);
}
@Override
public void commitOffset(Map<String, ?> offset) {
}
/**
* The logical representation of a position for the change in the transaction log.
* During each sourcing cycle it is necessary to query all change tables and then

View File

@ -7,8 +7,6 @@
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
@ -35,20 +33,12 @@ public class OracleConnectorTask extends BaseSourceTask {
private static final Logger LOGGER = LoggerFactory.getLogger(OracleConnectorTask.class);
private static final String CONTEXT_NAME = "oracle-connector-task";
private static enum State {
RUNNING,
STOPPED;
}
private final AtomicReference<State> state = new AtomicReference<State>(State.STOPPED);
private volatile OracleTaskContext taskContext;
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile OracleConnection jdbcConnection;
private volatile ChangeEventSourceCoordinator coordinator;
private volatile ErrorHandler errorHandler;
private volatile OracleDatabaseSchema schema;
private volatile Map<String, ?> lastOffset;
@Override
public String version() {
@ -56,12 +46,7 @@ public String version() {
}
@Override
public void start(Configuration config) {
if (!state.compareAndSet(State.STOPPED, State.RUNNING)) {
LOGGER.info("Connector has already been started");
return;
}
public ChangeEventSourceCoordinator start(Configuration config) {
OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
@ -105,6 +90,8 @@ public void start(Configuration config) {
schema);
coordinator.start(taskContext, this.queue, metadataProvider);
return coordinator;
}
@Override
@ -115,20 +102,9 @@ public List<SourceRecord> poll() throws InterruptedException {
.map(DataChangeEvent::getRecord)
.collect(Collectors.toList());
if (!sourceRecords.isEmpty()) {
this.lastOffset = sourceRecords.get(sourceRecords.size() - 1).sourceOffset();
}
return sourceRecords;
}
@Override
public void commit() throws InterruptedException {
if (lastOffset != null) {
coordinator.commitOffset(lastOffset);
}
}
@Override
public void stop() {
cleanupResources();