From b6222d42bf3e4aaced5a52cc4e5e16ac52ad5f40 Mon Sep 17 00:00:00 2001 From: Andrew Walker Date: Wed, 25 May 2022 13:50:08 +1000 Subject: [PATCH] DBZ-5136: Add first flag and refactor OffsetContext --- .../mongodb/MongoDbOffsetContext.java | 15 +++++++ .../mongodb/ReplicaSetOffsetContext.java | 15 +++++++ .../connector/mysql/MySqlOffsetContext.java | 33 +++----------- .../connector/mysql/SnapshotSourceIT.java | 11 ++++- .../connector/oracle/OracleOffsetContext.java | 33 +++----------- .../postgresql/PostgresOffsetContext.java | 33 +++----------- .../sqlserver/SqlServerOffsetContext.java | 34 +++------------ .../io/debezium/connector/SnapshotRecord.java | 8 ++++ .../connector/common/BaseSourceInfo.java | 4 +- .../debezium/pipeline/spi/OffsetContext.java | 10 +++++ .../relational/RelationalOffsetContext.java | 43 +++++++++++++++++++ .../RelationalSnapshotChangeEventSource.java | 26 +++++++++-- 12 files changed, 154 insertions(+), 111 deletions(-) create mode 100644 debezium-core/src/main/java/io/debezium/relational/RelationalOffsetContext.java diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java index 079058afe..471562ee4 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java @@ -11,6 +11,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import io.debezium.connector.common.BaseSourceInfo; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.bson.BsonDocument; @@ -71,6 +72,10 @@ public Struct getSourceInfo() { return sourceInfo.struct(); } + public BaseSourceInfo getSourceInfoObject() { + return sourceInfo; + } + @Override public boolean isSnapshotRunning() { return sourceInfo.isSnapshot() && sourceInfo.isSnapshotRunning(); @@ -95,6 +100,16 @@ public void markSnapshotRecord() { sourceInfo.setSnapshot(SnapshotRecord.TRUE); } + @Override + public void markFirstRecordInDataCollection() { + sourceInfo.setSnapshot(SnapshotRecord.FIRST_IN_DATA_COLLECTION); + } + + @Override + public void markFirstSnapshotRecord() { + sourceInfo.setSnapshot(SnapshotRecord.FIRST); + } + @Override public void markLastRecordInDataCollection() { sourceInfo.setSnapshot(SnapshotRecord.LAST_IN_DATA_COLLECTION); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java index bd6cdd704..db026457d 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java @@ -8,6 +8,7 @@ import java.time.Instant; import java.util.Map; +import io.debezium.connector.common.BaseSourceInfo; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.bson.BsonDocument; @@ -64,6 +65,10 @@ public Struct getSourceInfo() { return offsetContext.getSourceInfo(); } + public BaseSourceInfo getSourceInfoObject() { + return offsetContext.getSourceInfoObject(); + } + @Override public boolean isSnapshotRunning() { return offsetContext.isSnapshotRunning(); @@ -74,6 +79,16 @@ public void markSnapshotRecord() { offsetContext.markSnapshotRecord(); } + @Override + public void markFirstRecordInDataCollection() { + offsetContext.markFirstRecordInDataCollection(); + } + + @Override + public void markFirstSnapshotRecord() { + offsetContext.markFirstSnapshotRecord(); + } + @Override public void markLastRecordInDataCollection() { offsetContext.markLastRecordInDataCollection(); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java index 325394701..9f9ab4fb2 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java @@ -15,14 +15,16 @@ import org.apache.kafka.connect.errors.ConnectException; import io.debezium.connector.SnapshotRecord; +import io.debezium.connector.common.BaseSourceInfo; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; +import io.debezium.relational.RelationalOffsetContext; import io.debezium.relational.TableId; import io.debezium.schema.DataCollectionId; -public class MySqlOffsetContext implements OffsetContext { +public class MySqlOffsetContext extends RelationalOffsetContext { private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event"; @@ -114,6 +116,10 @@ public Struct getSourceInfo() { return sourceInfo.struct(); } + public BaseSourceInfo getSourceInfoObject() { + return sourceInfo; + } + @Override public boolean isSnapshotRunning() { return sourceInfo.isSnapshot() && !snapshotCompleted; @@ -134,11 +140,6 @@ public void preSnapshotCompletion() { snapshotCompleted = true; } - @Override - public void postSnapshotCompletion() { - sourceInfo.setSnapshot(SnapshotRecord.FALSE); - } - private void setTransactionId() { // use GTID if it is available if (sourceInfo.getCurrentGtid() != null) { @@ -221,21 +222,6 @@ private long longOffsetValue(Map values, String key) { } } - @Override - public void markSnapshotRecord() { - sourceInfo.setSnapshot(SnapshotRecord.TRUE); - } - - @Override - public void markLastRecordInDataCollection() { - sourceInfo.setSnapshot(SnapshotRecord.LAST_IN_DATA_COLLECTION); - } - - @Override - public void markLastSnapshotRecord() { - sourceInfo.setSnapshot(SnapshotRecord.LAST); - } - @Override public void event(DataCollectionId tableId, Instant timestamp) { sourceInfo.setSourceTime(timestamp); @@ -259,11 +245,6 @@ public TransactionContext getTransactionContext() { return transactionContext; } - @Override - public void incrementalSnapshotEvents() { - sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL); - } - @Override public IncrementalSnapshotContext getIncrementalSnapshotContext() { return incrementalSnapshotContext; diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotSourceIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotSourceIT.java index dd608dc77..e21e1de8e 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotSourceIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotSourceIT.java @@ -174,7 +174,16 @@ private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyCa if (i.hasNext()) { final Object snapshotOffsetField = record.sourceOffset().get("snapshot"); assertThat(snapshotOffsetField).isEqualTo(true); - assertTrue(Objects.equals(snapshotSourceField, "true") || Objects.equals(snapshotSourceField, "last_in_data_collection")); + + if (Objects.equals(snapshotSourceField, "first")) { + assertThat(previousRecordTable).isNull(); + } + else if (Objects.equals(snapshotSourceField, "first_in_data_collection")) { + assertThat(previousRecordTable).isNotEqualTo(currentRecordTable); + } + else { + assertTrue(Objects.equals(snapshotSourceField, "true") || Objects.equals(snapshotSourceField, "last_in_data_collection")); + } if (Objects.equals(previousSnapshotSourceField, "last_in_data_collection")) { assertThat(previousRecordTable).isNotEqualTo(currentRecordTable); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java index f0814da28..f9de936bc 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java @@ -19,10 +19,11 @@ import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; +import io.debezium.relational.RelationalOffsetContext; import io.debezium.relational.TableId; import io.debezium.schema.DataCollectionId; -public class OracleOffsetContext implements OffsetContext { +public class OracleOffsetContext extends RelationalOffsetContext { public static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; public static final String SNAPSHOT_PENDING_TRANSACTIONS_KEY = "snapshot_pending_tx"; @@ -204,7 +205,10 @@ public Schema getSourceInfoSchema() { return sourceInfoSchema; } - @Override + public BaseSourceInfo getSourceInfoObject() { + return sourceInfo; + } + public Struct getSourceInfo() { return sourceInfo.struct(); } @@ -273,11 +277,6 @@ public void preSnapshotCompletion() { snapshotCompleted = true; } - @Override - public void postSnapshotCompletion() { - sourceInfo.setSnapshot(SnapshotRecord.FALSE); - } - @Override public String toString() { StringBuilder sb = new StringBuilder("OracleOffsetContext [scn=").append(getScn()); @@ -294,21 +293,6 @@ public String toString() { return sb.toString(); } - @Override - public void markSnapshotRecord() { - sourceInfo.setSnapshot(SnapshotRecord.TRUE); - } - - @Override - public void markLastRecordInDataCollection() { - sourceInfo.setSnapshot(SnapshotRecord.LAST_IN_DATA_COLLECTION); - } - - @Override - public void markLastSnapshotRecord() { - sourceInfo.setSnapshot(SnapshotRecord.LAST); - } - @Override public void event(DataCollectionId tableId, Instant timestamp) { sourceInfo.tableEvent((TableId) tableId); @@ -330,11 +314,6 @@ public TransactionContext getTransactionContext() { return transactionContext; } - @Override - public void incrementalSnapshotEvents() { - sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL); - } - @Override public IncrementalSnapshotContext getIncrementalSnapshotContext() { return incrementalSnapshotContext; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java index 222d0fe65..5994f1ea7 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java @@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory; import io.debezium.connector.SnapshotRecord; +import io.debezium.connector.common.BaseSourceInfo; import io.debezium.connector.postgresql.connection.Lsn; import io.debezium.connector.postgresql.connection.PostgresConnection; import io.debezium.connector.postgresql.spi.OffsetState; @@ -24,12 +25,13 @@ import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; +import io.debezium.relational.RelationalOffsetContext; import io.debezium.relational.TableId; import io.debezium.schema.DataCollectionId; import io.debezium.time.Conversions; import io.debezium.util.Clock; -public class PostgresOffsetContext implements OffsetContext { +public class PostgresOffsetContext extends RelationalOffsetContext { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSnapshotChangeEventSource.class); public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc"; @@ -105,6 +107,10 @@ public Struct getSourceInfo() { return sourceInfo.struct(); } + public BaseSourceInfo getSourceInfoObject() { + return sourceInfo; + } + @Override public boolean isSnapshotRunning() { return sourceInfo.isSnapshot(); @@ -121,11 +127,6 @@ public void preSnapshotCompletion() { lastSnapshotRecord = true; } - @Override - public void postSnapshotCompletion() { - sourceInfo.setSnapshot(SnapshotRecord.FALSE); - } - public void updateWalPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, TableId tableId) { this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn; sourceInfo.update(lsn, commitTime, txId, xmin, tableId); @@ -258,21 +259,6 @@ public OffsetState asOffsetState() { sourceInfo.isSnapshot()); } - @Override - public void markSnapshotRecord() { - sourceInfo.setSnapshot(SnapshotRecord.TRUE); - } - - @Override - public void markLastRecordInDataCollection() { - sourceInfo.setSnapshot(SnapshotRecord.LAST_IN_DATA_COLLECTION); - } - - @Override - public void markLastSnapshotRecord() { - sourceInfo.setSnapshot(SnapshotRecord.LAST); - } - @Override public void event(DataCollectionId tableId, Instant instant) { sourceInfo.update(instant, (TableId) tableId); @@ -283,11 +269,6 @@ public TransactionContext getTransactionContext() { return transactionContext; } - @Override - public void incrementalSnapshotEvents() { - sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL); - } - @Override public IncrementalSnapshotContext getIncrementalSnapshotContext() { return incrementalSnapshotContext; diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java index d1c496d35..9ea840666 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerOffsetContext.java @@ -8,6 +8,8 @@ import java.time.Instant; import java.util.Map; +import io.debezium.connector.common.BaseSourceInfo; +import io.debezium.relational.RelationalOffsetContext; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; @@ -20,7 +22,7 @@ import io.debezium.schema.DataCollectionId; import io.debezium.util.Collect; -public class SqlServerOffsetContext implements OffsetContext { +public class SqlServerOffsetContext extends RelationalOffsetContext { private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; @@ -82,7 +84,10 @@ public Schema getSourceInfoSchema() { return sourceInfoSchema; } - @Override + public BaseSourceInfo getSourceInfoObject() { + return sourceInfo; + } + public Struct getSourceInfo() { return sourceInfo.struct(); } @@ -127,11 +132,6 @@ public void preSnapshotCompletion() { snapshotCompleted = true; } - @Override - public void postSnapshotCompletion() { - sourceInfo.setSnapshot(SnapshotRecord.FALSE); - } - public static class Loader implements OffsetContext.Loader { private final SqlServerConnectorConfig connectorConfig; @@ -168,21 +168,6 @@ public String toString() { "]"; } - @Override - public void markSnapshotRecord() { - sourceInfo.setSnapshot(SnapshotRecord.TRUE); - } - - @Override - public void markLastRecordInDataCollection() { - sourceInfo.setSnapshot(SnapshotRecord.LAST_IN_DATA_COLLECTION); - } - - @Override - public void markLastSnapshotRecord() { - sourceInfo.setSnapshot(SnapshotRecord.LAST); - } - @Override public void event(DataCollectionId tableId, Instant timestamp) { sourceInfo.setSourceTime(timestamp); @@ -194,11 +179,6 @@ public TransactionContext getTransactionContext() { return transactionContext; } - @Override - public void incrementalSnapshotEvents() { - sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL); - } - @Override public IncrementalSnapshotContext getIncrementalSnapshotContext() { return incrementalSnapshotContext; diff --git a/debezium-core/src/main/java/io/debezium/connector/SnapshotRecord.java b/debezium-core/src/main/java/io/debezium/connector/SnapshotRecord.java index ba763aa90..68cb361e5 100644 --- a/debezium-core/src/main/java/io/debezium/connector/SnapshotRecord.java +++ b/debezium-core/src/main/java/io/debezium/connector/SnapshotRecord.java @@ -18,6 +18,14 @@ public enum SnapshotRecord { * Record is from snapshot is not the last one. */ TRUE, + /** + * Record is from snapshot is the first record generated in snapshot phase. + */ + FIRST, + /** + * Record is from snapshot and the first record generated from the table, but not in the entire snapshot. + */ + FIRST_IN_DATA_COLLECTION, /** * Record is from snapshot and the last record generated from the table, but not in the entire snapshot. */ diff --git a/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceInfo.java b/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceInfo.java index f9520c8d7..7655ee3cd 100644 --- a/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceInfo.java +++ b/debezium-core/src/main/java/io/debezium/connector/common/BaseSourceInfo.java @@ -18,7 +18,9 @@ public BaseSourceInfo(CommonConnectorConfig config) { } public boolean isSnapshot() { - return snapshotRecord == SnapshotRecord.TRUE || snapshotRecord == SnapshotRecord.LAST || snapshotRecord == SnapshotRecord.LAST_IN_DATA_COLLECTION; + return snapshotRecord == SnapshotRecord.TRUE || + snapshotRecord == SnapshotRecord.FIRST || snapshotRecord == SnapshotRecord.FIRST_IN_DATA_COLLECTION || + snapshotRecord == SnapshotRecord.LAST || snapshotRecord == SnapshotRecord.LAST_IN_DATA_COLLECTION; } /** diff --git a/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java b/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java index 22c361a69..c86acedc7 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java @@ -55,6 +55,16 @@ interface Loader { */ void markLastRecordInDataCollection(); + /** + * mark current record as the first one in the snapshot + */ + void markFirstSnapshotRecord(); + + /** + * mark current record as the first one in the table or collection + */ + void markFirstRecordInDataCollection(); + /** * mark current record as the last one in the snapshot */ diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalOffsetContext.java b/debezium-core/src/main/java/io/debezium/relational/RelationalOffsetContext.java new file mode 100644 index 000000000..fd25c5a00 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalOffsetContext.java @@ -0,0 +1,43 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational; + +import io.debezium.connector.SnapshotRecord; +import io.debezium.connector.common.BaseSourceInfo; +import io.debezium.pipeline.spi.OffsetContext; + +public abstract class RelationalOffsetContext implements OffsetContext { + + public abstract BaseSourceInfo getSourceInfoObject(); + + public void markSnapshotRecord() { + getSourceInfoObject().setSnapshot(SnapshotRecord.TRUE); + } + + public void markFirstSnapshotRecord() { + getSourceInfoObject().setSnapshot(SnapshotRecord.FIRST); + } + + public void markFirstRecordInDataCollection() { + getSourceInfoObject().setSnapshot(SnapshotRecord.FIRST_IN_DATA_COLLECTION); + } + + public void markLastSnapshotRecord() { + getSourceInfoObject().setSnapshot(SnapshotRecord.LAST); + } + + public void markLastRecordInDataCollection() { + getSourceInfoObject().setSnapshot(SnapshotRecord.LAST_IN_DATA_COLLECTION); + } + + public void incrementalSnapshotEvents() { + getSourceInfoObject().setSnapshot(SnapshotRecord.INCREMENTAL); + } + + public void postSnapshotCompletion() { + getSourceInfoObject().setSnapshot(SnapshotRecord.FALSE); + } +} diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java index d6f4f7f43..62671f780 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java @@ -52,7 +52,7 @@ * * @author Gunnar Morling */ -public abstract class RelationalSnapshotChangeEventSource

extends AbstractSnapshotChangeEventSource { +public abstract class RelationalSnapshotChangeEventSource

extends AbstractSnapshotChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(RelationalSnapshotChangeEventSource.class); @@ -303,6 +303,7 @@ private void createDataEvents(ChangeEventSourceContext sourceContext, LOGGER.info("Snapshotting contents of {} tables while still in transaction", tableCount); for (Iterator tableIdIterator = snapshotContext.capturedTables.iterator(); tableIdIterator.hasNext();) { final TableId tableId = tableIdIterator.next(); + snapshotContext.firstTable = tableOrder == 1; snapshotContext.lastTable = !tableIdIterator.hasNext(); if (!sourceContext.isRunning()) { @@ -348,13 +349,19 @@ private void createDataEventsForTable(ChangeEventSourceContext sourceContext, final OptionalLong rowCount = rowCountForTable(table.id()); try (Statement statement = readTableStatement(rowCount); - ResultSet rs = CancellableResultSet.from(statement.executeQuery(selectStatement.get()))) { + ResultSet rs = CancellableResultSet.from(statement.executeQuery(selectStatement.get()))) { ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table); long rows = 0; Timer logTimer = getTableScanLogTimer(); snapshotContext.lastRecordInTable = false; - snapshotRecord(snapshotContext); + + if (snapshotContext.firstTable) { + firstSnapshotRecord(snapshotContext); + } + else { + firstRecordInDataCollection(snapshotContext); + } if (rs.next()) { while (!snapshotContext.lastRecordInTable) { @@ -380,6 +387,10 @@ private void createDataEventsForTable(ChangeEventSourceContext sourceContext, logTimer = getTableScanLogTimer(); } + if (rows > 1) { + snapshotRecord(snapshotContext); + } + if (snapshotContext.lastRecordInTable) { if (snapshotContext.lastTable) { lastSnapshotRecord(snapshotContext); @@ -409,6 +420,14 @@ protected void snapshotRecord(RelationalSnapshotContext snapshotContext) { snapshotContext.offset.markSnapshotRecord(); } + protected void firstSnapshotRecord(RelationalSnapshotContext snapshotContext) { + snapshotContext.offset.markFirstSnapshotRecord(); + } + + protected void firstRecordInDataCollection(RelationalSnapshotContext snapshotContext) { + snapshotContext.offset.markFirstRecordInDataCollection(); + } + protected void lastRecordInDataCollection(RelationalSnapshotContext snapshotContext) { snapshotContext.offset.markLastRecordInDataCollection(); } @@ -552,6 +571,7 @@ public static class RelationalSnapshotContext

capturedTables; public Set capturedSchemaTables; + public boolean firstTable; public boolean lastTable; public boolean lastRecordInTable;