DBZ-5136 Further refinement of the algorithm for determining the position in the snapshot, and IT test fixes

Mark Bereznitsky 2022-05-28 15:24:02 +10:00 committed by Jiri Pechanec
parent 08e8c3d795
commit 2d72dae649
17 changed files with 193 additions and 218 deletions
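
At a glance, the change collapses the family of position-specific marker methods that CommonOffsetContext used to provide (markSnapshotRecord(), markFirstSnapshotRecord(), markFirstRecordInDataCollection(), markLastRecordInDataCollection(), markLastSnapshotRecord()) into a single OffsetContext.markSnapshotRecord(SnapshotRecord record); CommonOffsetContext itself is deleted and each connector's offset context forwards the enum value straight to its SourceInfo. A minimal sketch of the new shape (the class names below are illustrative, not the actual Debezium sources):

import io.debezium.connector.SnapshotRecord;

// Sketch of the reworked contract: the caller states the record's position explicitly.
interface OffsetContextSketch {
    void markSnapshotRecord(SnapshotRecord record); // replaces the five mark*() methods
}

class ExampleOffsetContext implements OffsetContextSketch {
    private SnapshotRecord snapshot = SnapshotRecord.FALSE;

    @Override
    public void markSnapshotRecord(SnapshotRecord record) {
        // the real implementations in this commit boil down to sourceInfo.setSnapshot(record)
        this.snapshot = record;
    }
}

// Call sites now pass the position directly, for example:
//   snapshotContext.offset.markSnapshotRecord(SnapshotRecord.LAST);
//   snapshotContext.offset.markSnapshotRecord(SnapshotRecord.FIRST_IN_DATA_COLLECTION);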

View File

@ -16,9 +16,8 @@
import org.bson.BsonDocument;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.schema.DataCollectionId;
@ -27,7 +26,7 @@
*
* @author Chris Cranford
*/
public class MongoDbOffsetContext extends CommonOffsetContext {
public class MongoDbOffsetContext implements OffsetContext {
private final SourceInfo sourceInfo;
private final TransactionContext transactionContext;
@ -72,10 +71,6 @@ public Struct getSourceInfo() {
return sourceInfo.struct();
}
public BaseSourceInfo getSourceInfoObject() {
return sourceInfo;
}
@Override
public boolean isSnapshotRunning() {
return sourceInfo.isSnapshot() && sourceInfo.isSnapshotRunning();
@ -95,6 +90,11 @@ public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
}
@Override
public void markSnapshotRecord(SnapshotRecord record) {
sourceInfo.setSnapshot(record);
}
@Override
public TransactionContext getTransactionContext() {
return transactionContext;

View File

@ -31,6 +31,7 @@
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
@ -464,7 +465,7 @@ private void createDataEventsForCollection(ChangeEventSourceContext sourceContex
snapshotContext.lastRecordInCollection = !cursor.hasNext();
if (snapshotContext.lastCollection && snapshotContext.lastRecordInCollection) {
snapshotContext.offset.markLastSnapshotRecord();
snapshotContext.offset.markSnapshotRecord(SnapshotRecord.LAST);
}
dispatcher.dispatchSnapshotEvent(snapshotContext.partition, collectionId,
@ -474,7 +475,7 @@ private void createDataEventsForCollection(ChangeEventSourceContext sourceContex
}
else if (snapshotContext.lastCollection) {
// if the last collection does not contain any records, we still need to mark the last processed event as the last one
snapshotContext.offset.markLastSnapshotRecord();
snapshotContext.offset.markSnapshotRecord(SnapshotRecord.LAST);
}
LOGGER.info("\t Finished snapshotting {} records for collection '{}'; total duration '{}'", docs, collectionId,

View File

@ -16,8 +16,7 @@
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.connector.SnapshotRecord;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
@ -33,7 +32,7 @@
* @author Chris Cranford
*/
@ThreadSafe
public class ReplicaSetOffsetContext extends CommonOffsetContext {
public class ReplicaSetOffsetContext implements OffsetContext {
private final MongoDbOffsetContext offsetContext;
private final String replicaSetName;
@ -66,38 +65,14 @@ public Struct getSourceInfo() {
return offsetContext.getSourceInfo();
}
public BaseSourceInfo getSourceInfoObject() {
return offsetContext.getSourceInfoObject();
}
@Override
public boolean isSnapshotRunning() {
return offsetContext.isSnapshotRunning();
}
@Override
public void markSnapshotRecord() {
offsetContext.markSnapshotRecord();
}
@Override
public void markFirstRecordInDataCollection() {
offsetContext.markFirstRecordInDataCollection();
}
@Override
public void markFirstSnapshotRecord() {
offsetContext.markFirstSnapshotRecord();
}
@Override
public void markLastRecordInDataCollection() {
offsetContext.markLastRecordInDataCollection();
}
@Override
public void markLastSnapshotRecord() {
offsetContext.markLastSnapshotRecord();
public void markSnapshotRecord(SnapshotRecord record) {
offsetContext.markSnapshotRecord(record);
}
@Override

View File

@ -15,8 +15,6 @@
import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
@ -24,7 +22,7 @@
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
public class MySqlOffsetContext extends CommonOffsetContext {
public class MySqlOffsetContext implements OffsetContext {
private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event";
@ -116,10 +114,6 @@ public Struct getSourceInfo() {
return sourceInfo.struct();
}
public BaseSourceInfo getSourceInfoObject() {
return sourceInfo;
}
@Override
public boolean isSnapshotRunning() {
return sourceInfo.isSnapshot() && !snapshotCompleted;
@ -140,6 +134,11 @@ public void preSnapshotCompletion() {
snapshotCompleted = true;
}
@Override
public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
}
private void setTransactionId() {
// use GTID if it is available
if (sourceInfo.getCurrentGtid() != null) {
@ -222,6 +221,11 @@ private long longOffsetValue(Map<String, ?> values, String key) {
}
}
@Override
public void markSnapshotRecord(SnapshotRecord record) {
sourceInfo.setSnapshot(record);
}
@Override
public void event(DataCollectionId tableId, Instant timestamp) {
sourceInfo.setSourceTime(timestamp);
@ -245,6 +249,11 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}
@Override
public void incrementalSnapshotEvents() {
sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
}
@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;

View File

@ -8,7 +8,6 @@
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;
import java.nio.file.Path;
import java.sql.Connection;
@ -181,11 +180,7 @@ private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyCa
else if (Objects.equals(snapshotSourceField, "first_in_data_collection")) {
assertThat(previousRecordTable).isNotEqualTo(currentRecordTable);
}
else {
assertTrue(Objects.equals(snapshotSourceField, "true") || Objects.equals(snapshotSourceField, "last_in_data_collection"));
}
if (Objects.equals(previousSnapshotSourceField, "last_in_data_collection")) {
else if (Objects.equals(previousSnapshotSourceField, "last_in_data_collection")) {
assertThat(previousRecordTable).isNotEqualTo(currentRecordTable);
}
}

View File

@ -16,14 +16,13 @@
import org.apache.kafka.connect.data.Struct;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
public class OracleOffsetContext extends CommonOffsetContext {
public class OracleOffsetContext implements OffsetContext {
public static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
public static final String SNAPSHOT_PENDING_TRANSACTIONS_KEY = "snapshot_pending_tx";
@ -205,10 +204,7 @@ public Schema getSourceInfoSchema() {
return sourceInfoSchema;
}
public BaseSourceInfo getSourceInfoObject() {
return sourceInfo;
}
@Override
public Struct getSourceInfo() {
return sourceInfo.struct();
}
@ -277,6 +273,11 @@ public void preSnapshotCompletion() {
snapshotCompleted = true;
}
@Override
public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("OracleOffsetContext [scn=").append(getScn());
@ -293,6 +294,11 @@ public String toString() {
return sb.toString();
}
@Override
public void markSnapshotRecord(SnapshotRecord record) {
sourceInfo.setSnapshot(record);
}
@Override
public void event(DataCollectionId tableId, Instant timestamp) {
sourceInfo.tableEvent((TableId) tableId);
@ -314,6 +320,11 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}
@Override
public void incrementalSnapshotEvents() {
sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
}
@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;

View File

@ -17,11 +17,9 @@
import org.slf4j.LoggerFactory;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
@ -31,7 +29,7 @@
import io.debezium.time.Conversions;
import io.debezium.util.Clock;
public class PostgresOffsetContext extends CommonOffsetContext {
public class PostgresOffsetContext implements OffsetContext {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSnapshotChangeEventSource.class);
public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";
@ -107,10 +105,6 @@ public Struct getSourceInfo() {
return sourceInfo.struct();
}
public BaseSourceInfo getSourceInfoObject() {
return sourceInfo;
}
@Override
public boolean isSnapshotRunning() {
return sourceInfo.isSnapshot();
@ -127,6 +121,11 @@ public void preSnapshotCompletion() {
lastSnapshotRecord = true;
}
@Override
public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
}
public void updateWalPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, TableId tableId) {
this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn;
sourceInfo.update(lsn, commitTime, txId, xmin, tableId);
@ -259,6 +258,11 @@ public OffsetState asOffsetState() {
sourceInfo.isSnapshot());
}
@Override
public void markSnapshotRecord(SnapshotRecord record) {
sourceInfo.setSnapshot(record);
}
@Override
public void event(DataCollectionId tableId, Instant instant) {
sourceInfo.update(instant, (TableId) tableId);
@ -269,6 +273,11 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}
@Override
public void incrementalSnapshotEvents() {
sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
}
@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;

View File

@ -1045,7 +1045,27 @@ protected void assertRecordSchemaAndValues(List<SchemaAndValueField> expectedSch
}
}
protected void assertRecordOffsetAndSnapshotSource(SourceRecord record, boolean shouldBeSnapshot, boolean shouldBeLastSnapshotRecord) {
protected SnapshotRecord expectedSnapshotRecordFromPosition(int totalPosition, int totalCount, int topicPosition, int topicCount) {
if (totalPosition == totalCount) {
return SnapshotRecord.LAST;
}
if (totalPosition == 1) {
return SnapshotRecord.FIRST;
}
if (topicPosition == topicCount) {
return SnapshotRecord.LAST_IN_DATA_COLLECTION;
}
if (topicPosition == 1) {
return SnapshotRecord.FIRST_IN_DATA_COLLECTION;
}
return SnapshotRecord.TRUE;
}
protected void assertRecordOffsetAndSnapshotSource(SourceRecord record, SnapshotRecord expectedType) {
Map<String, ?> offset = record.sourceOffset();
assertNotNull(offset.get(SourceInfo.TXID_KEY));
assertNotNull(offset.get(SourceInfo.TIMESTAMP_USEC_KEY));
@ -1054,9 +1074,8 @@ protected void assertRecordOffsetAndSnapshotSource(SourceRecord record, boolean
Object lastSnapshotRecord = offset.get(SourceInfo.LAST_SNAPSHOT_RECORD_KEY);
if (shouldBeSnapshot) {
if (expectedType != SnapshotRecord.FALSE) {
assertTrue("Snapshot marker expected but not found", (Boolean) snapshot);
assertEquals("Last snapshot record marker mismatch", shouldBeLastSnapshotRecord, lastSnapshotRecord);
}
else {
assertNull("Snapshot marker not expected, but found", snapshot);
@ -1066,16 +1085,11 @@ protected void assertRecordOffsetAndSnapshotSource(SourceRecord record, boolean
if (envelope != null && Envelope.isEnvelopeSchema(envelope.schema())) {
final Struct source = (Struct) envelope.get("source");
final SnapshotRecord sourceSnapshot = SnapshotRecord.fromSource(source);
if (shouldBeSnapshot) {
if (shouldBeLastSnapshotRecord) {
assertEquals("Expected snapshot last record", SnapshotRecord.LAST, sourceSnapshot);
}
else {
assertEquals("Expected snapshot intermediary record", SnapshotRecord.TRUE, sourceSnapshot);
}
if (sourceSnapshot != null) {
assertEquals("Expected snapshot of type, but found", expectedType, sourceSnapshot);
}
else {
assertNull("Source snapshot marker not expected, but found", sourceSnapshot);
assertEquals("Source snapshot marker not expected, but found", expectedType, SnapshotRecord.FALSE);
}
}
}
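
The helper added above, expectedSnapshotRecordFromPosition(...), encodes the refined position algorithm that the tests now share: the absolute last record of the whole snapshot wins, then the absolute first, then the last and first records within a topic, and everything in between is a plain TRUE marker. A small self-contained walk-through (a sketch using a hypothetical two-topic snapshot, not part of the commit):

import io.debezium.connector.SnapshotRecord;

class SnapshotPositionExample {

    // Same logic as expectedSnapshotRecordFromPosition(...) in the test base class above.
    static SnapshotRecord fromPosition(int totalPosition, int totalCount, int topicPosition, int topicCount) {
        if (totalPosition == totalCount) {
            return SnapshotRecord.LAST;
        }
        if (totalPosition == 1) {
            return SnapshotRecord.FIRST;
        }
        if (topicPosition == topicCount) {
            return SnapshotRecord.LAST_IN_DATA_COLLECTION;
        }
        if (topicPosition == 1) {
            return SnapshotRecord.FIRST_IN_DATA_COLLECTION;
        }
        return SnapshotRecord.TRUE;
    }

    public static void main(String[] args) {
        // Hypothetical snapshot: topic A has 3 rows, topic B has 2 rows, 5 records in total.
        System.out.println(fromPosition(1, 5, 1, 3)); // FIRST (absolute first record)
        System.out.println(fromPosition(2, 5, 2, 3)); // TRUE (middle of topic A)
        System.out.println(fromPosition(3, 5, 3, 3)); // LAST_IN_DATA_COLLECTION (end of topic A)
        System.out.println(fromPosition(4, 5, 1, 2)); // FIRST_IN_DATA_COLLECTION (start of topic B)
        System.out.println(fromPosition(5, 5, 2, 2)); // LAST (absolute last wins over last-in-collection)
    }
}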

View File

@ -21,6 +21,7 @@
import org.junit.rules.TestRule;
import io.debezium.config.Configuration;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
@ -103,7 +104,7 @@ private void assertInsert(String statement, Integer pk, List<SchemaAndValueField
try {
executeAndWait(statement);
SourceRecord record = assertRecordInserted(expectedTopicName, pk != null ? PK_FIELD : null, pk);
assertRecordOffsetAndSnapshotSource(record, false, false);
assertRecordOffsetAndSnapshotSource(record, SnapshotRecord.FALSE);
assertSourceInfo(record, "postgres", table.schema(), table.table());
assertRecordSchemaAndValues(expectedSchemaAndValuesByColumn, record, Envelope.FieldName.AFTER);
}

View File

@ -12,9 +12,11 @@
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -35,6 +37,7 @@
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.config.Configuration;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.data.Bits;
import io.debezium.data.Enum;
@ -49,7 +52,6 @@
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
/**
* Integration test for {@link RecordsSnapshotProducerIT}
@ -83,14 +85,16 @@ public void shouldGenerateSnapshotsForDefaultDatatypes() throws Exception {
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
Map<String, List<SchemaAndValueField>> expectedValuesByTopicName = super.schemaAndValuesByTopicName();
consumer.process(record -> assertReadRecord(record, expectedValuesByTopicName));
Testing.Print.enable();
// check the offset information for each record
while (!consumer.isEmpty()) {
SourceRecord record = consumer.remove();
assertRecordOffsetAndSnapshotSource(record, true, consumer.isEmpty());
AtomicInteger totalCount = new AtomicInteger(0);
consumer.process(record -> {
assertReadRecord(record, expectedValuesByTopicName);
assertSourceInfo(record);
}
SnapshotRecord expected = expectedSnapshotRecordFromPosition(
totalCount.incrementAndGet(), expectedValuesByTopicName.size(),
1, 1);
assertRecordOffsetAndSnapshotSource(record, expected);
});
}
public static class CustomDatatypeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
@ -204,13 +208,13 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
SourceRecord first = consumer.remove();
VerifyRecord.isValidInsert(first, PK_FIELD, 2);
assertEquals(topicName("s1.a"), first.topic());
assertRecordOffsetAndSnapshotSource(first, false, false);
assertRecordOffsetAndSnapshotSource(first, SnapshotRecord.FALSE);
assertSourceInfo(first, TestHelper.TEST_DATABASE, "s1", "a");
SourceRecord second = consumer.remove();
VerifyRecord.isValidInsert(second, PK_FIELD, 2);
assertEquals(topicName("s2.a"), second.topic());
assertRecordOffsetAndSnapshotSource(second, false, false);
assertRecordOffsetAndSnapshotSource(second, SnapshotRecord.FALSE);
assertSourceInfo(second, TestHelper.TEST_DATABASE, "s2", "a");
// now shut down the producers and insert some more records
@ -231,7 +235,10 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
int counterVal = counter.getAndIncrement();
int expectedPk = (counterVal % 3) + 1; // each table has 3 entries keyed 1-3
VerifyRecord.isValidRead(record, PK_FIELD, expectedPk);
assertRecordOffsetAndSnapshotSource(record, true, counterVal == (expectedRecordsCount - 1));
SnapshotRecord expectedType = (counterVal % 3) == 0 ? SnapshotRecord.FIRST_IN_DATA_COLLECTION
: (counterVal % 3) == 1 ? SnapshotRecord.TRUE
: (counterVal == expectedRecordsCount - 1) ? SnapshotRecord.LAST : SnapshotRecord.LAST_IN_DATA_COLLECTION;
assertRecordOffsetAndSnapshotSource(record, expectedType);
assertSourceInfo(record);
});
consumer.clear();
@ -244,12 +251,12 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
first = consumer.remove();
VerifyRecord.isValidInsert(first, PK_FIELD, 4);
assertRecordOffsetAndSnapshotSource(first, false, false);
assertRecordOffsetAndSnapshotSource(first, SnapshotRecord.FALSE);
assertSourceInfo(first, TestHelper.TEST_DATABASE, "s1", "a");
second = consumer.remove();
VerifyRecord.isValidInsert(second, PK_FIELD, 4);
assertRecordOffsetAndSnapshotSource(second, false, false);
assertRecordOffsetAndSnapshotSource(second, SnapshotRecord.FALSE);
assertSourceInfo(second, TestHelper.TEST_DATABASE, "s2", "a");
}
@ -305,10 +312,10 @@ public void shouldGenerateSnapshotAndSendHeartBeat() throws Exception {
final SourceRecord first = consumer.remove();
VerifyRecord.isValidRead(first, PK_FIELD, 1);
assertRecordOffsetAndSnapshotSource(first, true, true);
assertRecordOffsetAndSnapshotSource(first, SnapshotRecord.LAST);
final SourceRecord second = consumer.remove();
assertThat(second.topic()).startsWith("__debezium-heartbeat");
assertRecordOffsetAndSnapshotSource(second, false, false);
assertRecordOffsetAndSnapshotSource(second, SnapshotRecord.FALSE);
}
private void assertReadRecord(SourceRecord record, Map<String, List<SchemaAndValueField>> expectedValuesByTopicName) {
@ -334,14 +341,16 @@ public void shouldGenerateSnapshotsForDefaultDatatypesAdaptiveMicroseconds() thr
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
Map<String, List<SchemaAndValueField>> expectedValuesByTopicName = super.schemaAndValuesByTopicNameAdaptiveTimeMicroseconds();
consumer.process(record -> assertReadRecord(record, expectedValuesByTopicName));
// check the offset information for each record
while (!consumer.isEmpty()) {
SourceRecord record = consumer.remove();
assertRecordOffsetAndSnapshotSource(record, true, consumer.isEmpty());
AtomicInteger totalCount = new AtomicInteger(0);
consumer.process(record -> {
assertReadRecord(record, expectedValuesByTopicName);
assertSourceInfo(record);
}
SnapshotRecord expected = expectedSnapshotRecordFromPosition(
totalCount.incrementAndGet(), expectedValuesByTopicName.size(),
1, 1);
assertRecordOffsetAndSnapshotSource(record, expected);
});
}
@Test
@ -362,14 +371,12 @@ public void shouldGenerateSnapshotsForDecimalDatatypesUsingStringEncoding() thro
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
Map<String, List<SchemaAndValueField>> expectedValuesByTopicName = super.schemaAndValuesByTopicNameStringEncodedDecimals();
consumer.process(record -> assertReadRecord(record, expectedValuesByTopicName));
// check the offset information for each record
while (!consumer.isEmpty()) {
SourceRecord record = consumer.remove();
assertRecordOffsetAndSnapshotSource(record, true, consumer.isEmpty());
consumer.process(record -> {
assertReadRecord(record, expectedValuesByTopicName);
assertSourceInfo(record);
}
assertRecordOffsetAndSnapshotSource(record, SnapshotRecord.LAST_IN_DATA_COLLECTION);
});
}
@Test
@ -407,42 +414,44 @@ public void shouldGenerateSnapshotsForPartitionedTables() throws Exception {
// then start the producer and validate all records are there
buildNoStreamProducer(TestHelper.defaultConfig());
TestConsumer consumer = testConsumer(1 + 30);
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
Set<Integer> ids = new HashSet<>();
Map<String, Integer> topicCounts = Collect.hashMapOf(
"test_server.public.first_table", 0,
Map<String, Integer> expectedTopicCounts = Collect.hashMapOf(
"test_server.public.first_table", 1,
"test_server.public.partitioned", 0,
"test_server.public.partitioned_1_100", 0,
"test_server.public.partitioned_101_200", 0);
"test_server.public.partitioned_1_100", 10,
"test_server.public.partitioned_101_200", 20);
int expectedTotalCount = expectedTopicCounts.values().stream().mapToInt(Integer::intValue).sum();
TestConsumer consumer = testConsumer(expectedTotalCount);
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
Map<String, Integer> actualTopicCounts = new HashMap<>();
AtomicInteger actualTotalCount = new AtomicInteger(0);
consumer.process(record -> {
assertSourceInfo(record);
Struct key = (Struct) record.key();
if (key != null) {
final Integer id = key.getInt32("pk");
Assertions.assertThat(ids).excludes(id);
ids.add(id);
}
topicCounts.put(record.topic(), topicCounts.get(record.topic()) + 1);
actualTopicCounts.put(record.topic(), actualTopicCounts.getOrDefault(record.topic(), 0) + 1);
SnapshotRecord expected = expectedSnapshotRecordFromPosition(
actualTotalCount.incrementAndGet(), expectedTotalCount,
actualTopicCounts.get(record.topic()), expectedTopicCounts.get(record.topic()));
assertRecordOffsetAndSnapshotSource(record, expected);
});
// verify distinct records
assertEquals(31, ids.size());
assertEquals(expectedTotalCount, actualTotalCount.get());
assertEquals(expectedTotalCount, ids.size());
// verify each topic contains exactly the number of input records
assertEquals(1, topicCounts.get("test_server.public.first_table").intValue());
assertEquals(0, topicCounts.get("test_server.public.partitioned").intValue());
assertEquals(10, topicCounts.get("test_server.public.partitioned_1_100").intValue());
assertEquals(20, topicCounts.get("test_server.public.partitioned_101_200").intValue());
// check the offset information for each record
while (!consumer.isEmpty()) {
SourceRecord record = consumer.remove();
assertRecordOffsetAndSnapshotSource(record, true, consumer.isEmpty());
assertSourceInfo(record);
}
assertTrue("Expected counts per topic don't match", expectedTopicCounts.entrySet().containsAll(actualTopicCounts.entrySet()));
}
@Test

View File

@ -60,6 +60,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.config.Configuration;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.postgresql.PostgresConnectorConfig.IntervalHandlingMode;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SchemaRefreshMode;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
@ -2923,7 +2924,7 @@ private void assertInsert(String statement, Integer pk, List<SchemaAndValueField
try {
executeAndWait(statement);
SourceRecord record = assertRecordInserted(expectedTopicName, pk != null ? PK_FIELD : null, pk);
assertRecordOffsetAndSnapshotSource(record, false, false);
assertRecordOffsetAndSnapshotSource(record, SnapshotRecord.FALSE);
assertSourceInfo(record, "postgres", table.schema(), table.table());
assertRecordSchemaAndValues(expectedSchemaAndValuesByColumn, record, Envelope.FieldName.AFTER);
}
@ -2941,7 +2942,7 @@ private void assertDelete(String statement, Integer pk,
try {
executeAndWait(statement);
SourceRecord record = assertRecordDeleted(expectedTopicName, pk != null ? PK_FIELD : null, pk);
assertRecordOffsetAndSnapshotSource(record, false, false);
assertRecordOffsetAndSnapshotSource(record, SnapshotRecord.FALSE);
assertSourceInfo(record, "postgres", table.schema(), table.table());
assertRecordSchemaAndValues(expectedSchemaAndValuesByColumn, record, Envelope.FieldName.BEFORE);
assertRecordSchemaAndValues(null, record, Envelope.FieldName.AFTER);

View File

@ -12,8 +12,6 @@
import org.apache.kafka.connect.data.Struct;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
@ -22,7 +20,7 @@
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Collect;
public class SqlServerOffsetContext extends CommonOffsetContext {
public class SqlServerOffsetContext implements OffsetContext {
private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
@ -84,10 +82,7 @@ public Schema getSourceInfoSchema() {
return sourceInfoSchema;
}
public BaseSourceInfo getSourceInfoObject() {
return sourceInfo;
}
@Override
public Struct getSourceInfo() {
return sourceInfo.struct();
}
@ -132,6 +127,11 @@ public void preSnapshotCompletion() {
snapshotCompleted = true;
}
@Override
public void postSnapshotCompletion() {
sourceInfo.setSnapshot(SnapshotRecord.FALSE);
}
public static class Loader implements OffsetContext.Loader<SqlServerOffsetContext> {
private final SqlServerConnectorConfig connectorConfig;
@ -168,6 +168,11 @@ public String toString() {
"]";
}
@Override
public void markSnapshotRecord(SnapshotRecord record) {
sourceInfo.setSnapshot(record);
}
@Override
public void event(DataCollectionId tableId, Instant timestamp) {
sourceInfo.setSourceTime(timestamp);
@ -179,6 +184,11 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}
@Override
public void incrementalSnapshotEvents() {
sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL);
}
@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;

View File

@ -155,7 +155,8 @@ public void takeSnapshotAndStartStreaming() throws Exception {
// Ignore initial records
final SourceRecords records = consumeRecordsByTopic(INITIAL_RECORDS_PER_TABLE);
final List<SourceRecord> table1 = records.recordsForTopic("server1.dbo.table1");
table1.subList(0, INITIAL_RECORDS_PER_TABLE - 1).forEach(record -> {
assertThat(((Struct) table1.get(0).value()).getStruct("source").getString("snapshot")).isEqualTo("first");
table1.subList(1, INITIAL_RECORDS_PER_TABLE - 1).forEach(record -> {
assertThat(((Struct) record.value()).getStruct("source").getString("snapshot")).isEqualTo("true");
});
assertThat(((Struct) table1.get(INITIAL_RECORDS_PER_TABLE - 1).value()).getStruct("source").getString("snapshot")).isEqualTo("last");

View File

@ -1,43 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.pipeline.spi.OffsetContext;
public abstract class CommonOffsetContext implements OffsetContext {
public abstract BaseSourceInfo getSourceInfoObject();
public void markSnapshotRecord() {
getSourceInfoObject().setSnapshot(SnapshotRecord.TRUE);
}
public void markFirstSnapshotRecord() {
getSourceInfoObject().setSnapshot(SnapshotRecord.FIRST);
}
public void markFirstRecordInDataCollection() {
getSourceInfoObject().setSnapshot(SnapshotRecord.FIRST_IN_DATA_COLLECTION);
}
public void markLastSnapshotRecord() {
getSourceInfoObject().setSnapshot(SnapshotRecord.LAST);
}
public void markLastRecordInDataCollection() {
getSourceInfoObject().setSnapshot(SnapshotRecord.LAST_IN_DATA_COLLECTION);
}
public void incrementalSnapshotEvents() {
getSourceInfoObject().setSnapshot(SnapshotRecord.INCREMENTAL);
}
public void postSnapshotCompletion() {
getSourceInfoObject().setSnapshot(SnapshotRecord.FALSE);
}
}

View File

@ -11,6 +11,7 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import io.debezium.connector.SnapshotRecord;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
@ -46,29 +47,9 @@ interface Loader<O extends OffsetContext> {
boolean isSnapshotRunning();
/**
* mark current record as a regular snapshot record that is not last in a table or collection
* Mark the position of the record in the snapshot.
*/
void markSnapshotRecord();
/**
* mark current record as the last one in the table or collection
*/
void markLastRecordInDataCollection();
/**
* mark current record as the first one in the snapshot
*/
void markFirstSnapshotRecord();
/**
* mark current record as the first one in the table or collection
*/
void markFirstRecordInDataCollection();
/**
* mark current record as the last one in the snapshot
*/
void markLastSnapshotRecord();
void markSnapshotRecord(SnapshotRecord record);
/**
* Signals that a snapshot will begin, which should reflect in an updated offset state.

View File

@ -25,9 +25,9 @@
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.connector.SnapshotRecord;
import io.debezium.jdbc.CancellableResultSet;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.CommonOffsetContext;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.EventDispatcher.SnapshotReceiver;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
@ -53,7 +53,7 @@
*
* @author Gunnar Morling
*/
public abstract class RelationalSnapshotChangeEventSource<P extends Partition, O extends CommonOffsetContext> extends AbstractSnapshotChangeEventSource<P, O> {
public abstract class RelationalSnapshotChangeEventSource<P extends Partition, O extends OffsetContext> extends AbstractSnapshotChangeEventSource<P, O> {
private static final Logger LOGGER = LoggerFactory.getLogger(RelationalSnapshotChangeEventSource.class);
@ -402,29 +402,29 @@ else if (snapshotContext.lastTable) {
}
private void setSnapshotMarker(RelationalSnapshotContext<P, O> snapshotContext) {
if (snapshotContext.lastRecordInTable && snapshotContext.lastTable) {
snapshotContext.offset.markSnapshotRecord(SnapshotRecord.LAST); // Absolute last record
return;
}
if (snapshotContext.firstRecordInTable && snapshotContext.firstTable) {
snapshotContext.offset.markSnapshotRecord(SnapshotRecord.FIRST); // Absolute first record
return;
}
if (snapshotContext.lastRecordInTable) {
if (snapshotContext.lastTable) {
lastSnapshotRecord(snapshotContext);
}
else {
snapshotContext.offset.markLastRecordInDataCollection();
}
snapshotContext.offset.markSnapshotRecord(SnapshotRecord.LAST_IN_DATA_COLLECTION);
}
else if (snapshotContext.firstRecordInTable) {
if (snapshotContext.firstTable) {
snapshotContext.offset.markFirstSnapshotRecord();
}
else {
snapshotContext.offset.markFirstRecordInDataCollection();
}
snapshotContext.offset.markSnapshotRecord(SnapshotRecord.FIRST_IN_DATA_COLLECTION);
}
else {
snapshotContext.offset.markSnapshotRecord();
snapshotContext.offset.markSnapshotRecord(SnapshotRecord.TRUE);
}
}
protected void lastSnapshotRecord(RelationalSnapshotContext<P, O> snapshotContext) {
snapshotContext.offset.markLastSnapshotRecord();
snapshotContext.offset.markSnapshotRecord(SnapshotRecord.LAST);
}
/**

View File

@ -140,3 +140,4 @@ Himanshu-LT,Himanshu Mishra
Chrisss93,Chris Lee
connorszczepaniak-wk,Connor Szczepaniak
ajunwalker,Andrew Walker
alwaysbemark,Mark Bereznitsky