DBZ-1723 Encapsulating common poll() and stop() logic in BaseSourceTask

This commit is contained in:
Gunnar Morling 2020-03-02 11:25:15 +01:00 committed by Jiri Pechanec
parent 6318cf0f2b
commit 8ce69dc597
5 changed files with 15 additions and 4 deletions

View File

@ -124,7 +124,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
public List<SourceRecord> doPoll() throws InterruptedException {
List<DataChangeEvent> records = queue.poll();
return records.stream().map(DataChangeEvent::getRecord).collect(Collectors.toList());
}

View File

@ -423,7 +423,7 @@ private static Filters getAllFilters(Configuration config) {
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
public List<SourceRecord> doPoll() throws InterruptedException {
Reader currentReader = readers;
if (currentReader == null) {
return null;

View File

@ -200,7 +200,7 @@ public ReplicationConnection createReplicationConnection(PostgresTaskContext tas
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
public List<SourceRecord> doPoll() throws InterruptedException {
final List<DataChangeEvent> records = queue.poll();
final List<SourceRecord> sourceRecords = records.stream()

View File

@ -126,7 +126,7 @@ public ChangeEventSourceCoordinator start(Configuration config) {
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
public List<SourceRecord> doPoll() throws InterruptedException {
final List<DataChangeEvent> records = queue.poll();
final List<SourceRecord> sourceRecords = records.stream()

View File

@ -6,6 +6,7 @@
package io.debezium.connector.common;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@ -86,6 +87,16 @@ public final void start(Map<String, String> props) {
*/
protected abstract ChangeEventSourceCoordinator start(Configuration config);
@Override
public final List<SourceRecord> poll() throws InterruptedException {
return doPoll();
}
/**
* Returns the next batch of source records, if any are available.
*/
public abstract List<SourceRecord> doPoll() throws InterruptedException;
@Override
public void commitRecord(SourceRecord record) throws InterruptedException {
Map<String, ?> currentOffset = record.sourceOffset();