DBZ-5136: Add first flag and refactor OffsetContext

This commit is contained in:
Andrew Walker 2022-05-25 13:50:08 +10:00 committed by Jiri Pechanec
parent 593c2769e6
commit b6222d42bf
12 changed files with 154 additions and 111 deletions

View File

@ -11,6 +11,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.debezium.connector.common.BaseSourceInfo;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.bson.BsonDocument;
@ -71,6 +72,10 @@ public Struct getSourceInfo() {
return sourceInfo.struct();
}
/**
 * Returns the source info object itself, as opposed to {@link #getSourceInfo()},
 * which returns the result of calling {@code struct()} on it.
 *
 * @return the source info instance held by this offset context
 */
public BaseSourceInfo getSourceInfoObject() {
return sourceInfo;
}
@Override
public boolean isSnapshotRunning() {
return sourceInfo.isSnapshot() && sourceInfo.isSnapshotRunning();
@ -95,6 +100,16 @@ public void markSnapshotRecord() {
sourceInfo.setSnapshot(SnapshotRecord.TRUE);
}
/**
 * Marks the current record as the first one of its table or collection by setting the
 * source info's snapshot marker to {@link SnapshotRecord#FIRST_IN_DATA_COLLECTION}.
 */
@Override
public void markFirstRecordInDataCollection() {
sourceInfo.setSnapshot(SnapshotRecord.FIRST_IN_DATA_COLLECTION);
}
/**
 * Marks the current record as the first one of the snapshot by setting the
 * source info's snapshot marker to {@link SnapshotRecord#FIRST}.
 */
@Override
public void markFirstSnapshotRecord() {
sourceInfo.setSnapshot(SnapshotRecord.FIRST);
}
@Override
public void markLastRecordInDataCollection() {
sourceInfo.setSnapshot(SnapshotRecord.LAST_IN_DATA_COLLECTION);

View File

@ -8,6 +8,7 @@
import java.time.Instant;
import java.util.Map;
import io.debezium.connector.common.BaseSourceInfo;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.bson.BsonDocument;
@ -64,6 +65,10 @@ public Struct getSourceInfo() {
return offsetContext.getSourceInfo();
}
/**
 * Returns the source info object of the wrapped offset context.
 *
 * @return whatever the delegate's {@code getSourceInfoObject()} returns
 */
public BaseSourceInfo getSourceInfoObject() {
return offsetContext.getSourceInfoObject();
}
@Override
public boolean isSnapshotRunning() {
return offsetContext.isSnapshotRunning();
@ -74,6 +79,16 @@ public void markSnapshotRecord() {
offsetContext.markSnapshotRecord();
}
/**
 * Marks the current record as the first one of its table or collection;
 * the call is forwarded unchanged to the wrapped offset context.
 */
@Override
public void markFirstRecordInDataCollection() {
offsetContext.markFirstRecordInDataCollection();
}
/**
 * Marks the current record as the first one of the snapshot;
 * the call is forwarded unchanged to the wrapped offset context.
 */
@Override
public void markFirstSnapshotRecord() {
offsetContext.markFirstSnapshotRecord();
}
@Override
public void markLastRecordInDataCollection() {
offsetContext.markLastRecordInDataCollection();

View File

@ -15,14 +15,16 @@
import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalOffsetContext;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
public class MySqlOffsetContext implements OffsetContext {
public class MySqlOffsetContext extends RelationalOffsetContext {
private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event";
@ -114,6 +116,10 @@ public Struct getSourceInfo() {
return sourceInfo.struct();
}
/**
 * Supplies the source info object that the snapshot-marker methods inherited from
 * {@link RelationalOffsetContext} update.
 *
 * @return the source info instance held by this offset context
 */
@Override
public BaseSourceInfo getSourceInfoObject() {
    return sourceInfo;
}
@Override
public boolean isSnapshotRunning() {
return sourceInfo.isSnapshot() && !snapshotCompleted;
@ -134,11 +140,6 @@ public void preSnapshotCompletion() {
snapshotCompleted = true;
}
@Override
public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
}
private void setTransactionId() {
// use GTID if it is available
if (sourceInfo.getCurrentGtid() != null) {
@ -221,21 +222,6 @@ private long longOffsetValue(Map<String, ?> values, String key) {
}
}
@Override
public void markSnapshotRecord() {
sourceInfo.setSnapshot(SnapshotRecord.TRUE);
}
@Override
public void markLastRecordInDataCollection() {
sourceInfo.setSnapshot(SnapshotRecord.LAST_IN_DATA_COLLECTION);
}
@Override
public void markLastSnapshotRecord() {
sourceInfo.setSnapshot(SnapshotRecord.LAST);
}
@Override
public void event(DataCollectionId tableId, Instant timestamp) {
sourceInfo.setSourceTime(timestamp);
@ -259,11 +245,6 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}
@Override
public void incrementalSnapshotEvents() {
sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
}
@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;

View File

@ -174,7 +174,16 @@ private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyCa
if (i.hasNext()) {
final Object snapshotOffsetField = record.sourceOffset().get("snapshot");
assertThat(snapshotOffsetField).isEqualTo(true);
assertTrue(Objects.equals(snapshotSourceField, "true") || Objects.equals(snapshotSourceField, "last_in_data_collection"));
if (Objects.equals(snapshotSourceField, "first")) {
assertThat(previousRecordTable).isNull();
}
else if (Objects.equals(snapshotSourceField, "first_in_data_collection")) {
assertThat(previousRecordTable).isNotEqualTo(currentRecordTable);
}
else {
assertTrue(Objects.equals(snapshotSourceField, "true") || Objects.equals(snapshotSourceField, "last_in_data_collection"));
}
if (Objects.equals(previousSnapshotSourceField, "last_in_data_collection")) {
assertThat(previousRecordTable).isNotEqualTo(currentRecordTable);

View File

@ -19,10 +19,11 @@
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalOffsetContext;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
public class OracleOffsetContext implements OffsetContext {
public class OracleOffsetContext extends RelationalOffsetContext {
public static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
public static final String SNAPSHOT_PENDING_TRANSACTIONS_KEY = "snapshot_pending_tx";
@ -204,7 +205,10 @@ public Schema getSourceInfoSchema() {
return sourceInfoSchema;
}
/**
 * Supplies the source info object that the snapshot-marker methods inherited from
 * {@link RelationalOffsetContext} update.
 *
 * @return the source info instance held by this offset context
 */
@Override
public BaseSourceInfo getSourceInfoObject() {
    return sourceInfo;
}

/**
 * Returns the source info in its {@code Struct} form.
 *
 * @return the result of {@code sourceInfo.struct()}
 */
@Override
public Struct getSourceInfo() {
    return sourceInfo.struct();
}
@ -273,11 +277,6 @@ public void preSnapshotCompletion() {
snapshotCompleted = true;
}
@Override
public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("OracleOffsetContext [scn=").append(getScn());
@ -294,21 +293,6 @@ public String toString() {
return sb.toString();
}
@Override
public void markSnapshotRecord() {
sourceInfo.setSnapshot(SnapshotRecord.TRUE);
}
@Override
public void markLastRecordInDataCollection() {
sourceInfo.setSnapshot(SnapshotRecord.LAST_IN_DATA_COLLECTION);
}
@Override
public void markLastSnapshotRecord() {
sourceInfo.setSnapshot(SnapshotRecord.LAST);
}
@Override
public void event(DataCollectionId tableId, Instant timestamp) {
sourceInfo.tableEvent((TableId) tableId);
@ -330,11 +314,6 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}
@Override
public void incrementalSnapshotEvents() {
sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
}
@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;

View File

@ -17,6 +17,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.spi.OffsetState;
@ -24,12 +25,13 @@
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalOffsetContext;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.time.Conversions;
import io.debezium.util.Clock;
public class PostgresOffsetContext implements OffsetContext {
public class PostgresOffsetContext extends RelationalOffsetContext {
// Named after this class so that log output is attributed to the offset context
// rather than to the snapshot change event source.
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresOffsetContext.class);
public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";
@ -105,6 +107,10 @@ public Struct getSourceInfo() {
return sourceInfo.struct();
}
/**
 * Supplies the source info object that the snapshot-marker methods inherited from
 * {@link RelationalOffsetContext} update.
 *
 * @return the source info instance held by this offset context
 */
@Override
public BaseSourceInfo getSourceInfoObject() {
    return sourceInfo;
}
@Override
public boolean isSnapshotRunning() {
return sourceInfo.isSnapshot();
@ -121,11 +127,6 @@ public void preSnapshotCompletion() {
lastSnapshotRecord = true;
}
@Override
public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
}
public void updateWalPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, TableId tableId) {
this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn;
sourceInfo.update(lsn, commitTime, txId, xmin, tableId);
@ -258,21 +259,6 @@ public OffsetState asOffsetState() {
sourceInfo.isSnapshot());
}
@Override
public void markSnapshotRecord() {
sourceInfo.setSnapshot(SnapshotRecord.TRUE);
}
@Override
public void markLastRecordInDataCollection() {
sourceInfo.setSnapshot(SnapshotRecord.LAST_IN_DATA_COLLECTION);
}
@Override
public void markLastSnapshotRecord() {
sourceInfo.setSnapshot(SnapshotRecord.LAST);
}
@Override
public void event(DataCollectionId tableId, Instant instant) {
sourceInfo.update(instant, (TableId) tableId);
@ -283,11 +269,6 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}
@Override
public void incrementalSnapshotEvents() {
sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
}
@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;

View File

@ -8,6 +8,8 @@
import java.time.Instant;
import java.util.Map;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.relational.RelationalOffsetContext;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
@ -20,7 +22,7 @@
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Collect;
public class SqlServerOffsetContext implements OffsetContext {
public class SqlServerOffsetContext extends RelationalOffsetContext {
private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
@ -82,7 +84,10 @@ public Schema getSourceInfoSchema() {
return sourceInfoSchema;
}
/**
 * Supplies the source info object that the snapshot-marker methods inherited from
 * {@link RelationalOffsetContext} update.
 *
 * @return the source info instance held by this offset context
 */
@Override
public BaseSourceInfo getSourceInfoObject() {
    return sourceInfo;
}

/**
 * Returns the source info in its {@code Struct} form.
 *
 * @return the result of {@code sourceInfo.struct()}
 */
@Override
public Struct getSourceInfo() {
    return sourceInfo.struct();
}
@ -127,11 +132,6 @@ public void preSnapshotCompletion() {
snapshotCompleted = true;
}
@Override
public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
}
public static class Loader implements OffsetContext.Loader<SqlServerOffsetContext> {
private final SqlServerConnectorConfig connectorConfig;
@ -168,21 +168,6 @@ public String toString() {
"]";
}
@Override
public void markSnapshotRecord() {
sourceInfo.setSnapshot(SnapshotRecord.TRUE);
}
@Override
public void markLastRecordInDataCollection() {
sourceInfo.setSnapshot(SnapshotRecord.LAST_IN_DATA_COLLECTION);
}
@Override
public void markLastSnapshotRecord() {
sourceInfo.setSnapshot(SnapshotRecord.LAST);
}
@Override
public void event(DataCollectionId tableId, Instant timestamp) {
sourceInfo.setSourceTime(timestamp);
@ -194,11 +179,6 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}
@Override
public void incrementalSnapshotEvents() {
sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
}
@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;

View File

@ -18,6 +18,14 @@ public enum SnapshotRecord {
* Record is from snapshot and is not the last one.
*/
TRUE,
/**
* Record is from snapshot and is the first record generated in the snapshot phase.
*/
FIRST,
/**
* Record is from snapshot and the first record generated from the table, but not in the entire snapshot.
*/
FIRST_IN_DATA_COLLECTION,
/**
* Record is from snapshot and the last record generated from the table, but not in the entire snapshot.
*/

View File

@ -18,7 +18,9 @@ public BaseSourceInfo(CommonConnectorConfig config) {
}
public boolean isSnapshot() {
return snapshotRecord == SnapshotRecord.TRUE || snapshotRecord == SnapshotRecord.LAST || snapshotRecord == SnapshotRecord.LAST_IN_DATA_COLLECTION;
return snapshotRecord == SnapshotRecord.TRUE ||
snapshotRecord == SnapshotRecord.FIRST || snapshotRecord == SnapshotRecord.FIRST_IN_DATA_COLLECTION ||
snapshotRecord == SnapshotRecord.LAST || snapshotRecord == SnapshotRecord.LAST_IN_DATA_COLLECTION;
}
/**

View File

@ -55,6 +55,16 @@ interface Loader<O extends OffsetContext> {
*/
void markLastRecordInDataCollection();
/**
* mark current record as the first one in the snapshot
*/
void markFirstSnapshotRecord();
/**
* mark current record as the first one in the table or collection
*/
void markFirstRecordInDataCollection();
/**
* mark current record as the last one in the snapshot
*/

View File

@ -0,0 +1,43 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.pipeline.spi.OffsetContext;
public abstract class RelationalOffsetContext implements OffsetContext {
public abstract BaseSourceInfo getSourceInfoObject();
public void markSnapshotRecord() {
getSourceInfoObject().setSnapshot(SnapshotRecord.TRUE);
}
public void markFirstSnapshotRecord() {
getSourceInfoObject().setSnapshot(SnapshotRecord.FIRST);
}
public void markFirstRecordInDataCollection() {
getSourceInfoObject().setSnapshot(SnapshotRecord.FIRST_IN_DATA_COLLECTION);
}
public void markLastSnapshotRecord() {
getSourceInfoObject().setSnapshot(SnapshotRecord.LAST);
}
public void markLastRecordInDataCollection() {
getSourceInfoObject().setSnapshot(SnapshotRecord.LAST_IN_DATA_COLLECTION);
}
public void incrementalSnapshotEvents() {
getSourceInfoObject().setSnapshot(SnapshotRecord.INCREMENTAL);
}
public void postSnapshotCompletion() {
getSourceInfoObject().setSnapshot(SnapshotRecord.FALSE);
}
}

View File

@ -52,7 +52,7 @@
*
* @author Gunnar Morling
*/
public abstract class RelationalSnapshotChangeEventSource<P extends Partition, O extends OffsetContext> extends AbstractSnapshotChangeEventSource<P, O> {
public abstract class RelationalSnapshotChangeEventSource<P extends Partition, O extends RelationalOffsetContext> extends AbstractSnapshotChangeEventSource<P, O> {
private static final Logger LOGGER = LoggerFactory.getLogger(RelationalSnapshotChangeEventSource.class);
@ -303,6 +303,7 @@ private void createDataEvents(ChangeEventSourceContext sourceContext,
LOGGER.info("Snapshotting contents of {} tables while still in transaction", tableCount);
for (Iterator<TableId> tableIdIterator = snapshotContext.capturedTables.iterator(); tableIdIterator.hasNext();) {
final TableId tableId = tableIdIterator.next();
snapshotContext.firstTable = tableOrder == 1;
snapshotContext.lastTable = !tableIdIterator.hasNext();
if (!sourceContext.isRunning()) {
@ -348,13 +349,19 @@ private void createDataEventsForTable(ChangeEventSourceContext sourceContext,
final OptionalLong rowCount = rowCountForTable(table.id());
try (Statement statement = readTableStatement(rowCount);
ResultSet rs = CancellableResultSet.from(statement.executeQuery(selectStatement.get()))) {
ResultSet rs = CancellableResultSet.from(statement.executeQuery(selectStatement.get()))) {
ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);
long rows = 0;
Timer logTimer = getTableScanLogTimer();
snapshotContext.lastRecordInTable = false;
snapshotRecord(snapshotContext);
if (snapshotContext.firstTable) {
firstSnapshotRecord(snapshotContext);
}
else {
firstRecordInDataCollection(snapshotContext);
}
if (rs.next()) {
while (!snapshotContext.lastRecordInTable) {
@ -380,6 +387,10 @@ private void createDataEventsForTable(ChangeEventSourceContext sourceContext,
logTimer = getTableScanLogTimer();
}
if (rows > 1) {
snapshotRecord(snapshotContext);
}
if (snapshotContext.lastRecordInTable) {
if (snapshotContext.lastTable) {
lastSnapshotRecord(snapshotContext);
@ -409,6 +420,14 @@ protected void snapshotRecord(RelationalSnapshotContext<P, O> snapshotContext) {
snapshotContext.offset.markSnapshotRecord();
}
/**
 * Flags the current record as the first one of the snapshot by calling
 * {@code markFirstSnapshotRecord()} on the snapshot context's offset.
 *
 * @param snapshotContext the snapshot context whose offset is updated
 */
protected void firstSnapshotRecord(RelationalSnapshotContext<P, O> snapshotContext) {
snapshotContext.offset.markFirstSnapshotRecord();
}
/**
 * Flags the current record as the first one of its table by calling
 * {@code markFirstRecordInDataCollection()} on the snapshot context's offset.
 *
 * @param snapshotContext the snapshot context whose offset is updated
 */
protected void firstRecordInDataCollection(RelationalSnapshotContext<P, O> snapshotContext) {
snapshotContext.offset.markFirstRecordInDataCollection();
}
/**
 * Flags the current record as the last one of its table by calling
 * {@code markLastRecordInDataCollection()} on the snapshot context's offset.
 *
 * @param snapshotContext the snapshot context whose offset is updated
 */
protected void lastRecordInDataCollection(RelationalSnapshotContext<P, O> snapshotContext) {
snapshotContext.offset.markLastRecordInDataCollection();
}
@ -552,6 +571,7 @@ public static class RelationalSnapshotContext<P extends Partition, O extends Off
public Set<TableId> capturedTables;
public Set<TableId> capturedSchemaTables;
public boolean firstTable;
public boolean lastTable;
public boolean lastRecordInTable;