DBZ-5136: Refactored SnapshotContext setters

This commit is contained in:
Andrew Walker 2022-05-25 14:57:36 +10:00 committed by Jiri Pechanec
parent b6222d42bf
commit 01a81f8bc1
3 changed files with 23 additions and 37 deletions

View File

@ -14,8 +14,8 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.connector.SnapshotRecord;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;

View File

@ -18,9 +18,7 @@ public BaseSourceInfo(CommonConnectorConfig config) {
}
public boolean isSnapshot() {
return snapshotRecord == SnapshotRecord.TRUE ||
snapshotRecord == SnapshotRecord.FIRST || snapshotRecord == SnapshotRecord.FIRST_IN_DATA_COLLECTION ||
snapshotRecord == SnapshotRecord.LAST || snapshotRecord == SnapshotRecord.LAST_IN_DATA_COLLECTION;
return snapshotRecord != SnapshotRecord.INCREMENTAL && snapshotRecord != SnapshotRecord.FALSE;
}
/**

View File

@ -356,13 +356,6 @@ private void createDataEventsForTable(ChangeEventSourceContext sourceContext,
Timer logTimer = getTableScanLogTimer();
snapshotContext.lastRecordInTable = false;
if (snapshotContext.firstTable) {
firstSnapshotRecord(snapshotContext);
}
else {
firstRecordInDataCollection(snapshotContext);
}
if (rs.next()) {
while (!snapshotContext.lastRecordInTable) {
if (!sourceContext.isRunning()) {
@ -372,7 +365,6 @@ private void createDataEventsForTable(ChangeEventSourceContext sourceContext,
rows++;
final Object[] row = jdbcConnection.rowToArray(table, schema(), rs, columnArray);
snapshotContext.lastRecordInTable = !rs.next();
if (logTimer.expired()) {
long stop = clock.currentTimeInMillis();
if (rowCount.isPresent()) {
@ -387,18 +379,10 @@ private void createDataEventsForTable(ChangeEventSourceContext sourceContext,
logTimer = getTableScanLogTimer();
}
if (rows > 1) {
snapshotRecord(snapshotContext);
}
snapshotContext.firstRecordInTable = rows == 1;
snapshotContext.lastRecordInTable = !rs.next();
setSnapshotMarker(snapshotContext);
if (snapshotContext.lastRecordInTable) {
if (snapshotContext.lastTable) {
lastSnapshotRecord(snapshotContext);
}
else {
lastRecordInDataCollection(snapshotContext);
}
}
dispatcher.dispatchSnapshotEvent(snapshotContext.partition, table.id(),
getChangeRecordEmitter(snapshotContext, table.id(), row), snapshotReceiver);
}
@ -416,20 +400,23 @@ else if (snapshotContext.lastTable) {
}
}
protected void snapshotRecord(RelationalSnapshotContext<P, O> snapshotContext) {
snapshotContext.offset.markSnapshotRecord();
}
protected void firstSnapshotRecord(RelationalSnapshotContext<P, O> snapshotContext) {
snapshotContext.offset.markFirstSnapshotRecord();
}
protected void firstRecordInDataCollection(RelationalSnapshotContext<P, O> snapshotContext) {
snapshotContext.offset.markFirstRecordInDataCollection();
}
protected void lastRecordInDataCollection(RelationalSnapshotContext<P, O> snapshotContext) {
snapshotContext.offset.markLastRecordInDataCollection();
private void setSnapshotMarker(RelationalSnapshotContext<P, O> snapshotContext) {
if (snapshotContext.lastRecordInTable) {
if (snapshotContext.lastTable) {
lastSnapshotRecord(snapshotContext);
}
else {
snapshotContext.offset.markLastRecordInDataCollection();
}
} else if (snapshotContext.firstRecordInTable) {
if (snapshotContext.firstTable) {
snapshotContext.offset.markFirstSnapshotRecord();
} else {
snapshotContext.offset.markFirstRecordInDataCollection();
}
} else {
snapshotContext.offset.markSnapshotRecord();
}
}
protected void lastSnapshotRecord(RelationalSnapshotContext<P, O> snapshotContext) {
@ -572,6 +559,7 @@ public static class RelationalSnapshotContext<P extends Partition, O extends Off
public Set<TableId> capturedTables;
public Set<TableId> capturedSchemaTables;
public boolean firstTable;
public boolean firstRecordInTable;
public boolean lastTable;
public boolean lastRecordInTable;