From 2d72dae649a948d4ce53eec063962b951e0d6010 Mon Sep 17 00:00:00 2001 From: Mark Bereznitsky Date: Sat, 28 May 2022 15:24:02 +1000 Subject: [PATCH] DBZ-5136 further refining of the algorithm for determining position in snapshot and IT test fixes --- .../mongodb/MongoDbOffsetContext.java | 14 +-- .../MongoDbSnapshotChangeEventSource.java | 5 +- .../mongodb/ReplicaSetOffsetContext.java | 33 +----- .../connector/mysql/MySqlOffsetContext.java | 23 ++-- .../connector/mysql/SnapshotSourceIT.java | 7 +- .../connector/oracle/OracleOffsetContext.java | 25 ++-- .../postgresql/PostgresOffsetContext.java | 23 ++-- .../AbstractRecordsProducerTest.java | 36 ++++-- .../postgresql/PublicGeometryIT.java | 3 +- .../postgresql/RecordsSnapshotProducerIT.java | 107 ++++++++++-------- .../postgresql/RecordsStreamProducerIT.java | 5 +- .../sqlserver/SqlServerOffsetContext.java | 24 ++-- .../connector/sqlserver/SnapshotIT.java | 3 +- .../pipeline/CommonOffsetContext.java | 43 ------- .../debezium/pipeline/spi/OffsetContext.java | 25 +--- .../RelationalSnapshotChangeEventSource.java | 32 +++--- jenkins-jobs/scripts/config/Aliases.txt | 3 +- 17 files changed, 193 insertions(+), 218 deletions(-) delete mode 100644 debezium-core/src/main/java/io/debezium/pipeline/CommonOffsetContext.java diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java index d120e5fee..8eaad16e1 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbOffsetContext.java @@ -16,9 +16,8 @@ import org.bson.BsonDocument; import io.debezium.connector.SnapshotRecord; -import io.debezium.connector.common.BaseSourceInfo; -import io.debezium.pipeline.CommonOffsetContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; +import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.schema.DataCollectionId; @@ -27,7 +26,7 @@ * * @author Chris Cranford */ -public class MongoDbOffsetContext extends CommonOffsetContext { +public class MongoDbOffsetContext implements OffsetContext { private final SourceInfo sourceInfo; private final TransactionContext transactionContext; @@ -72,10 +71,6 @@ public Struct getSourceInfo() { return sourceInfo.struct(); } - public BaseSourceInfo getSourceInfoObject() { - return sourceInfo; - } - @Override public boolean isSnapshotRunning() { return sourceInfo.isSnapshot() && sourceInfo.isSnapshotRunning(); @@ -95,6 +90,11 @@ public void postSnapshotCompletion() { sourceInfo.setSnapshot(SnapshotRecord.FALSE); } + @Override + public void markSnapshotRecord(SnapshotRecord record) { + sourceInfo.setSnapshot(record); + } + @Override public TransactionContext getTransactionContext() { return transactionContext; diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java index d994c682b..a3c8a3b99 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java @@ -31,6 +31,7 @@ import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; +import io.debezium.connector.SnapshotRecord; import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; @@ -464,7 +465,7 @@ private void createDataEventsForCollection(ChangeEventSourceContext sourceContex snapshotContext.lastRecordInCollection = !cursor.hasNext(); if (snapshotContext.lastCollection && snapshotContext.lastRecordInCollection) { - snapshotContext.offset.markLastSnapshotRecord(); + snapshotContext.offset.markSnapshotRecord(SnapshotRecord.LAST); } dispatcher.dispatchSnapshotEvent(snapshotContext.partition, collectionId, @@ -474,7 +475,7 @@ private void createDataEventsForCollection(ChangeEventSourceContext sourceContex } else if (snapshotContext.lastCollection) { // if the last collection does not contain any records we still need to mark the last processed event as last one - snapshotContext.offset.markLastSnapshotRecord(); + snapshotContext.offset.markSnapshotRecord(SnapshotRecord.LAST); } LOGGER.info("\t Finished snapshotting {} records for collection '{}'; total duration '{}'", docs, collectionId, diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java index e34d6df3d..9447f4922 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetOffsetContext.java @@ -16,8 +16,7 @@ import com.mongodb.client.model.changestream.ChangeStreamDocument; import io.debezium.annotation.ThreadSafe; -import io.debezium.connector.common.BaseSourceInfo; -import io.debezium.pipeline.CommonOffsetContext; +import io.debezium.connector.SnapshotRecord; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; @@ -33,7 +32,7 @@ * @author Chris Cranford */ @ThreadSafe -public class ReplicaSetOffsetContext extends CommonOffsetContext { +public class ReplicaSetOffsetContext implements OffsetContext { private final MongoDbOffsetContext offsetContext; private final String replicaSetName; @@ -66,38 +65,14 @@ public Struct getSourceInfo() { return offsetContext.getSourceInfo(); } - public BaseSourceInfo getSourceInfoObject() { - return offsetContext.getSourceInfoObject(); - } - @Override public boolean isSnapshotRunning() { return offsetContext.isSnapshotRunning(); } @Override - public void markSnapshotRecord() { - offsetContext.markSnapshotRecord(); - } - - @Override - public void markFirstRecordInDataCollection() { - offsetContext.markFirstRecordInDataCollection(); - } - - @Override - public void markFirstSnapshotRecord() { - offsetContext.markFirstSnapshotRecord(); - } - - @Override - public void markLastRecordInDataCollection() { - offsetContext.markLastRecordInDataCollection(); - } - - @Override - public void markLastSnapshotRecord() { - offsetContext.markLastSnapshotRecord(); + public void markSnapshotRecord(SnapshotRecord record) { + offsetContext.markSnapshotRecord(record); } @Override diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java index 0358f68d5..01f9b60e6 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlOffsetContext.java @@ -15,8 +15,6 @@ import org.apache.kafka.connect.errors.ConnectException; import io.debezium.connector.SnapshotRecord; -import io.debezium.connector.common.BaseSourceInfo; -import io.debezium.pipeline.CommonOffsetContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; @@ -24,7 +22,7 @@ import io.debezium.relational.TableId; import io.debezium.schema.DataCollectionId; -public class MySqlOffsetContext extends CommonOffsetContext { +public class MySqlOffsetContext implements OffsetContext { private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; public static final String EVENTS_TO_SKIP_OFFSET_KEY = "event"; @@ -116,10 +114,6 @@ public Struct getSourceInfo() { return sourceInfo.struct(); } - public BaseSourceInfo getSourceInfoObject() { - return sourceInfo; - } - @Override public boolean isSnapshotRunning() { return sourceInfo.isSnapshot() && !snapshotCompleted; @@ -140,6 +134,11 @@ public void preSnapshotCompletion() { snapshotCompleted = true; } + @Override + public void postSnapshotCompletion() { + sourceInfo.setSnapshot(SnapshotRecord.FALSE); + } + private void setTransactionId() { // use GTID if it is available if (sourceInfo.getCurrentGtid() != null) { @@ -222,6 +221,11 @@ private long longOffsetValue(Map values, String key) { } } + @Override + public void markSnapshotRecord(SnapshotRecord record) { + sourceInfo.setSnapshot(record); + } + @Override public void event(DataCollectionId tableId, Instant timestamp) { sourceInfo.setSourceTime(timestamp); @@ -245,6 +249,11 @@ public TransactionContext getTransactionContext() { return transactionContext; } + @Override + public void incrementalSnapshotEvents() { + sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL); + } + @Override public IncrementalSnapshotContext getIncrementalSnapshotContext() { return incrementalSnapshotContext; diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotSourceIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotSourceIT.java index e21e1de8e..5ae1b0f19 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotSourceIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotSourceIT.java @@ -8,7 +8,6 @@ import static io.debezium.junit.EqualityCheck.LESS_THAN; import static org.fest.assertions.Assertions.assertThat; import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertTrue; import java.nio.file.Path; import java.sql.Connection; @@ -181,11 +180,7 @@ private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyCa else if (Objects.equals(snapshotSourceField, "first_in_data_collection")) { assertThat(previousRecordTable).isNotEqualTo(currentRecordTable); } - else { - assertTrue(Objects.equals(snapshotSourceField, "true") || Objects.equals(snapshotSourceField, "last_in_data_collection")); - } - - if (Objects.equals(previousSnapshotSourceField, "last_in_data_collection")) { + else if (Objects.equals(previousSnapshotSourceField, "last_in_data_collection")) { assertThat(previousRecordTable).isNotEqualTo(currentRecordTable); } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java index 491445069..a1c14effc 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleOffsetContext.java @@ -16,14 +16,13 @@ import org.apache.kafka.connect.data.Struct; import io.debezium.connector.SnapshotRecord; -import io.debezium.connector.common.BaseSourceInfo; -import io.debezium.pipeline.CommonOffsetContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; +import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.relational.TableId; import io.debezium.schema.DataCollectionId; -public class OracleOffsetContext extends CommonOffsetContext { +public class OracleOffsetContext implements OffsetContext { public static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; public static final String SNAPSHOT_PENDING_TRANSACTIONS_KEY = "snapshot_pending_tx"; @@ -205,10 +204,7 @@ public Schema getSourceInfoSchema() { return sourceInfoSchema; } - public BaseSourceInfo getSourceInfoObject() { - return sourceInfo; - } - + @Override public Struct getSourceInfo() { return sourceInfo.struct(); } @@ -277,6 +273,11 @@ public void preSnapshotCompletion() { snapshotCompleted = true; } + @Override + public void postSnapshotCompletion() { + sourceInfo.setSnapshot(SnapshotRecord.FALSE); + } + @Override public String toString() { StringBuilder sb = new StringBuilder("OracleOffsetContext [scn=").append(getScn()); @@ -293,6 +294,11 @@ public String toString() { return sb.toString(); } + @Override + public void markSnapshotRecord(SnapshotRecord record) { + sourceInfo.setSnapshot(record); + } + @Override public void event(DataCollectionId tableId, Instant timestamp) { sourceInfo.tableEvent((TableId) tableId); @@ -314,6 +320,11 @@ public TransactionContext getTransactionContext() { return transactionContext; } + @Override + public void incrementalSnapshotEvents() { + sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL); + } + @Override public IncrementalSnapshotContext getIncrementalSnapshotContext() { return incrementalSnapshotContext; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java index 641c8b426..16394d9b0 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java @@ -17,11 +17,9 @@ import org.slf4j.LoggerFactory; import io.debezium.connector.SnapshotRecord; -import io.debezium.connector.common.BaseSourceInfo; import io.debezium.connector.postgresql.connection.Lsn; import io.debezium.connector.postgresql.connection.PostgresConnection; import io.debezium.connector.postgresql.spi.OffsetState; -import io.debezium.pipeline.CommonOffsetContext; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext; import io.debezium.pipeline.spi.OffsetContext; @@ -31,7 +29,7 @@ import io.debezium.time.Conversions; import io.debezium.util.Clock; -public class PostgresOffsetContext extends CommonOffsetContext { +public class PostgresOffsetContext implements OffsetContext { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSnapshotChangeEventSource.class); public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc"; @@ -107,10 +105,6 @@ public Struct getSourceInfo() { return sourceInfo.struct(); } - public BaseSourceInfo getSourceInfoObject() { - return sourceInfo; - } - @Override public boolean isSnapshotRunning() { return sourceInfo.isSnapshot(); @@ -127,6 +121,11 @@ public void preSnapshotCompletion() { lastSnapshotRecord = true; } + @Override + public void postSnapshotCompletion() { + sourceInfo.setSnapshot(SnapshotRecord.FALSE); + } + public void updateWalPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, TableId tableId) { this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn; sourceInfo.update(lsn, commitTime, txId, xmin, tableId); @@ -259,6 +258,11 @@ public OffsetState asOffsetState() { sourceInfo.isSnapshot()); } + @Override + public void markSnapshotRecord(SnapshotRecord record) { + sourceInfo.setSnapshot(record); + } + @Override public void event(DataCollectionId tableId, Instant instant) { sourceInfo.update(instant, (TableId) tableId); @@ -269,6 +273,11 @@ public TransactionContext getTransactionContext() { return transactionContext; } + @Override + public void incrementalSnapshotEvents() { + sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL); + } + @Override public IncrementalSnapshotContext getIncrementalSnapshotContext() { return incrementalSnapshotContext; diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java index 02b2e80df..e673c25d8 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java @@ -1045,7 +1045,27 @@ protected void assertRecordSchemaAndValues(List expectedSch } } - protected void assertRecordOffsetAndSnapshotSource(SourceRecord record, boolean shouldBeSnapshot, boolean shouldBeLastSnapshotRecord) { + protected SnapshotRecord expectedSnapshotRecordFromPosition(int totalPosition, int totalCount, int topicPosition, int topicCount) { + if (totalPosition == totalCount) { + return SnapshotRecord.LAST; + } + + if (totalPosition == 1) { + return SnapshotRecord.FIRST; + } + + if (topicPosition == topicCount) { + return SnapshotRecord.LAST_IN_DATA_COLLECTION; + } + + if (topicPosition == 1) { + return SnapshotRecord.FIRST_IN_DATA_COLLECTION; + } + + return SnapshotRecord.TRUE; + } + + protected void assertRecordOffsetAndSnapshotSource(SourceRecord record, SnapshotRecord expectedType) { Map offset = record.sourceOffset(); assertNotNull(offset.get(SourceInfo.TXID_KEY)); assertNotNull(offset.get(SourceInfo.TIMESTAMP_USEC_KEY)); @@ -1054,9 +1074,8 @@ protected void assertRecordOffsetAndSnapshotSource(SourceRecord record, boolean Object lastSnapshotRecord = offset.get(SourceInfo.LAST_SNAPSHOT_RECORD_KEY); - if (shouldBeSnapshot) { + if (expectedType != SnapshotRecord.FALSE) { assertTrue("Snapshot marker expected but not found", (Boolean) snapshot); - assertEquals("Last snapshot record marker mismatch", shouldBeLastSnapshotRecord, lastSnapshotRecord); } else { assertNull("Snapshot marker not expected, but found", snapshot); @@ -1066,16 +1085,11 @@ protected void assertRecordOffsetAndSnapshotSource(SourceRecord record, boolean if (envelope != null && Envelope.isEnvelopeSchema(envelope.schema())) { final Struct source = (Struct) envelope.get("source"); final SnapshotRecord sourceSnapshot = SnapshotRecord.fromSource(source); - if (shouldBeSnapshot) { - if (shouldBeLastSnapshotRecord) { - assertEquals("Expected snapshot last record", SnapshotRecord.LAST, sourceSnapshot); - } - else { - assertEquals("Expected snapshot intermediary record", SnapshotRecord.TRUE, sourceSnapshot); - } + if (sourceSnapshot != null) { + assertEquals("Expected snapshot of type, but found", expectedType, sourceSnapshot); } else { - assertNull("Source snapshot marker not expected, but found", sourceSnapshot); + assertEquals("Source snapshot marker not expected, but found", expectedType, SnapshotRecord.FALSE); } } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PublicGeometryIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PublicGeometryIT.java index fd37bc7da..624fbcd46 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PublicGeometryIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PublicGeometryIT.java @@ -21,6 +21,7 @@ import org.junit.rules.TestRule; import io.debezium.config.Configuration; +import io.debezium.connector.SnapshotRecord; import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; import io.debezium.connector.postgresql.connection.PostgresConnection; import io.debezium.connector.postgresql.connection.ReplicationConnection; @@ -103,7 +104,7 @@ private void assertInsert(String statement, Integer pk, List> expectedValuesByTopicName = super.schemaAndValuesByTopicName(); - consumer.process(record -> assertReadRecord(record, expectedValuesByTopicName)); - Testing.Print.enable(); - // check the offset information for each record - while (!consumer.isEmpty()) { - SourceRecord record = consumer.remove(); - assertRecordOffsetAndSnapshotSource(record, true, consumer.isEmpty()); + AtomicInteger totalCount = new AtomicInteger(0); + consumer.process(record -> { + assertReadRecord(record, expectedValuesByTopicName); assertSourceInfo(record); - } + + SnapshotRecord expected = expectedSnapshotRecordFromPosition( + totalCount.incrementAndGet(), expectedValuesByTopicName.size(), + 1, 1); + assertRecordOffsetAndSnapshotSource(record, expected); + }); } public static class CustomDatatypeConverter implements CustomConverter { @@ -204,13 +208,13 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception { SourceRecord first = consumer.remove(); VerifyRecord.isValidInsert(first, PK_FIELD, 2); assertEquals(topicName("s1.a"), first.topic()); - assertRecordOffsetAndSnapshotSource(first, false, false); + assertRecordOffsetAndSnapshotSource(first, SnapshotRecord.FALSE); assertSourceInfo(first, TestHelper.TEST_DATABASE, "s1", "a"); SourceRecord second = consumer.remove(); VerifyRecord.isValidInsert(second, PK_FIELD, 2); assertEquals(topicName("s2.a"), second.topic()); - assertRecordOffsetAndSnapshotSource(second, false, false); + assertRecordOffsetAndSnapshotSource(second, SnapshotRecord.FALSE); assertSourceInfo(second, TestHelper.TEST_DATABASE, "s2", "a"); // now shut down the producers and insert some more records @@ -231,7 +235,10 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception { int counterVal = counter.getAndIncrement(); int expectedPk = (counterVal % 3) + 1; // each table has 3 entries keyed 1-3 VerifyRecord.isValidRead(record, PK_FIELD, expectedPk); - assertRecordOffsetAndSnapshotSource(record, true, counterVal == (expectedRecordsCount - 1)); + SnapshotRecord expectedType = (counterVal % 3) == 0 ? SnapshotRecord.FIRST_IN_DATA_COLLECTION + : (counterVal % 3) == 1 ? SnapshotRecord.TRUE + : (counterVal == expectedRecordsCount - 1) ? SnapshotRecord.LAST : SnapshotRecord.LAST_IN_DATA_COLLECTION; + assertRecordOffsetAndSnapshotSource(record, expectedType); assertSourceInfo(record); }); consumer.clear(); @@ -244,12 +251,12 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception { consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); first = consumer.remove(); VerifyRecord.isValidInsert(first, PK_FIELD, 4); - assertRecordOffsetAndSnapshotSource(first, false, false); + assertRecordOffsetAndSnapshotSource(first, SnapshotRecord.FALSE); assertSourceInfo(first, TestHelper.TEST_DATABASE, "s1", "a"); second = consumer.remove(); VerifyRecord.isValidInsert(second, PK_FIELD, 4); - assertRecordOffsetAndSnapshotSource(second, false, false); + assertRecordOffsetAndSnapshotSource(second, SnapshotRecord.FALSE); assertSourceInfo(second, TestHelper.TEST_DATABASE, "s2", "a"); } @@ -305,10 +312,10 @@ public void shouldGenerateSnapshotAndSendHeartBeat() throws Exception { final SourceRecord first = consumer.remove(); VerifyRecord.isValidRead(first, PK_FIELD, 1); - assertRecordOffsetAndSnapshotSource(first, true, true); + assertRecordOffsetAndSnapshotSource(first, SnapshotRecord.LAST); final SourceRecord second = consumer.remove(); assertThat(second.topic()).startsWith("__debezium-heartbeat"); - assertRecordOffsetAndSnapshotSource(second, false, false); + assertRecordOffsetAndSnapshotSource(second, SnapshotRecord.FALSE); } private void assertReadRecord(SourceRecord record, Map> expectedValuesByTopicName) { @@ -334,14 +341,16 @@ public void shouldGenerateSnapshotsForDefaultDatatypesAdaptiveMicroseconds() thr consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); Map> expectedValuesByTopicName = super.schemaAndValuesByTopicNameAdaptiveTimeMicroseconds(); - consumer.process(record -> assertReadRecord(record, expectedValuesByTopicName)); - - // check the offset information for each record - while (!consumer.isEmpty()) { - SourceRecord record = consumer.remove(); - assertRecordOffsetAndSnapshotSource(record, true, consumer.isEmpty()); + AtomicInteger totalCount = new AtomicInteger(0); + consumer.process(record -> { + assertReadRecord(record, expectedValuesByTopicName); assertSourceInfo(record); - } + + SnapshotRecord expected = expectedSnapshotRecordFromPosition( + totalCount.incrementAndGet(), expectedValuesByTopicName.size(), + 1, 1); + assertRecordOffsetAndSnapshotSource(record, expected); + }); } @Test @@ -362,14 +371,12 @@ public void shouldGenerateSnapshotsForDecimalDatatypesUsingStringEncoding() thro consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); Map> expectedValuesByTopicName = super.schemaAndValuesByTopicNameStringEncodedDecimals(); - consumer.process(record -> assertReadRecord(record, expectedValuesByTopicName)); - - // check the offset information for each record - while (!consumer.isEmpty()) { - SourceRecord record = consumer.remove(); - assertRecordOffsetAndSnapshotSource(record, true, consumer.isEmpty()); + consumer.process(record -> { + assertReadRecord(record, expectedValuesByTopicName); assertSourceInfo(record); - } + + assertRecordOffsetAndSnapshotSource(record, SnapshotRecord.LAST_IN_DATA_COLLECTION); + }); } @Test @@ -407,42 +414,44 @@ public void shouldGenerateSnapshotsForPartitionedTables() throws Exception { // then start the producer and validate all records are there buildNoStreamProducer(TestHelper.defaultConfig()); - TestConsumer consumer = testConsumer(1 + 30); - consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); - Set ids = new HashSet<>(); - Map topicCounts = Collect.hashMapOf( - "test_server.public.first_table", 0, + Map expectedTopicCounts = Collect.hashMapOf( + "test_server.public.first_table", 1, "test_server.public.partitioned", 0, - "test_server.public.partitioned_1_100", 0, - "test_server.public.partitioned_101_200", 0); + "test_server.public.partitioned_1_100", 10, + "test_server.public.partitioned_101_200", 20); + int expectedTotalCount = expectedTopicCounts.values().stream().mapToInt(Integer::intValue).sum(); + + TestConsumer consumer = testConsumer(expectedTotalCount); + consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); + + Map actualTopicCounts = new HashMap<>(); + AtomicInteger actualTotalCount = new AtomicInteger(0); consumer.process(record -> { + assertSourceInfo(record); Struct key = (Struct) record.key(); if (key != null) { final Integer id = key.getInt32("pk"); Assertions.assertThat(ids).excludes(id); ids.add(id); } - topicCounts.put(record.topic(), topicCounts.get(record.topic()) + 1); + + actualTopicCounts.put(record.topic(), actualTopicCounts.getOrDefault(record.topic(), 0) + 1); + + SnapshotRecord expected = expectedSnapshotRecordFromPosition( + actualTotalCount.incrementAndGet(), expectedTotalCount, + actualTopicCounts.get(record.topic()), expectedTopicCounts.get(record.topic())); + assertRecordOffsetAndSnapshotSource(record, expected); }); // verify distinct records - assertEquals(31, ids.size()); + assertEquals(expectedTotalCount, actualTotalCount.get()); + assertEquals(expectedTotalCount, ids.size()); // verify each topic contains exactly the number of input records - assertEquals(1, topicCounts.get("test_server.public.first_table").intValue()); - assertEquals(0, topicCounts.get("test_server.public.partitioned").intValue()); - assertEquals(10, topicCounts.get("test_server.public.partitioned_1_100").intValue()); - assertEquals(20, topicCounts.get("test_server.public.partitioned_101_200").intValue()); - - // check the offset information for each record - while (!consumer.isEmpty()) { - SourceRecord record = consumer.remove(); - assertRecordOffsetAndSnapshotSource(record, true, consumer.isEmpty()); - assertSourceInfo(record); - } + assertTrue("Expected counts per topic don't match", expectedTopicCounts.entrySet().containsAll(actualTopicCounts.entrySet())); } @Test diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index 9f29346e0..4d92872a3 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -60,6 +60,7 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode; import io.debezium.config.Configuration; +import io.debezium.connector.SnapshotRecord; import io.debezium.connector.postgresql.PostgresConnectorConfig.IntervalHandlingMode; import io.debezium.connector.postgresql.PostgresConnectorConfig.SchemaRefreshMode; import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; @@ -2923,7 +2924,7 @@ private void assertInsert(String statement, Integer pk, List { private final SqlServerConnectorConfig connectorConfig; @@ -168,6 +168,11 @@ public String toString() { "]"; } + @Override + public void markSnapshotRecord(SnapshotRecord record) { + sourceInfo.setSnapshot(record); + } + @Override public void event(DataCollectionId tableId, Instant timestamp) { sourceInfo.setSourceTime(timestamp); @@ -179,6 +184,11 @@ public TransactionContext getTransactionContext() { return transactionContext; } + @Override + public void incrementalSnapshotEvents() { + sourceInfo.setSnapshot(SnapshotRecord.INCREMENTAL); + } + @Override public IncrementalSnapshotContext getIncrementalSnapshotContext() { return incrementalSnapshotContext; diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java index d39d15bd0..08a643ca9 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java @@ -155,7 +155,8 @@ public void takeSnapshotAndStartStreaming() throws Exception { // Ignore initial records final SourceRecords records = consumeRecordsByTopic(INITIAL_RECORDS_PER_TABLE); final List table1 = records.recordsForTopic("server1.dbo.table1"); - table1.subList(0, INITIAL_RECORDS_PER_TABLE - 1).forEach(record -> { + assertThat(((Struct) table1.get(0).value()).getStruct("source").getString("snapshot")).isEqualTo("first"); + table1.subList(1, INITIAL_RECORDS_PER_TABLE - 1).forEach(record -> { assertThat(((Struct) record.value()).getStruct("source").getString("snapshot")).isEqualTo("true"); }); assertThat(((Struct) table1.get(INITIAL_RECORDS_PER_TABLE - 1).value()).getStruct("source").getString("snapshot")).isEqualTo("last"); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/CommonOffsetContext.java b/debezium-core/src/main/java/io/debezium/pipeline/CommonOffsetContext.java deleted file mode 100644 index ef8dc1875..000000000 --- a/debezium-core/src/main/java/io/debezium/pipeline/CommonOffsetContext.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.pipeline; - -import io.debezium.connector.SnapshotRecord; -import io.debezium.connector.common.BaseSourceInfo; -import io.debezium.pipeline.spi.OffsetContext; - -public abstract class CommonOffsetContext implements OffsetContext { - - public abstract BaseSourceInfo getSourceInfoObject(); - - public void markSnapshotRecord() { - getSourceInfoObject().setSnapshot(SnapshotRecord.TRUE); - } - - public void markFirstSnapshotRecord() { - getSourceInfoObject().setSnapshot(SnapshotRecord.FIRST); - } - - public void markFirstRecordInDataCollection() { - getSourceInfoObject().setSnapshot(SnapshotRecord.FIRST_IN_DATA_COLLECTION); - } - - public void markLastSnapshotRecord() { - getSourceInfoObject().setSnapshot(SnapshotRecord.LAST); - } - - public void markLastRecordInDataCollection() { - getSourceInfoObject().setSnapshot(SnapshotRecord.LAST_IN_DATA_COLLECTION); - } - - public void incrementalSnapshotEvents() { - getSourceInfoObject().setSnapshot(SnapshotRecord.INCREMENTAL); - } - - public void postSnapshotCompletion() { - getSourceInfoObject().setSnapshot(SnapshotRecord.FALSE); - } -} diff --git a/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java b/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java index c86acedc7..e4c66da96 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/spi/OffsetContext.java @@ -11,6 +11,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; +import io.debezium.connector.SnapshotRecord; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.pipeline.txmetadata.TransactionContext; @@ -46,29 +47,9 @@ interface Loader { boolean isSnapshotRunning(); /** - * mark current record as a regular snapshot record that is not last in a table or collection + * Mark the position of the record in the snapshot. */ - void markSnapshotRecord(); - - /** - * mark current record as the last one in the table or collection - */ - void markLastRecordInDataCollection(); - - /** - * mark current record as the first one in the snapshot - */ - void markFirstSnapshotRecord(); - - /** - * mark current record as the first one in the table or collection - */ - void markFirstRecordInDataCollection(); - - /** - * mark current record as the last one in the snapshot - */ - void markLastSnapshotRecord(); + void markSnapshotRecord(SnapshotRecord record); /** * Signals that a snapshot will begin, which should reflect in an updated offset state. diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java index 11fb99c7d..f416b9937 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java @@ -25,9 +25,9 @@ import org.slf4j.LoggerFactory; import io.debezium.DebeziumException; +import io.debezium.connector.SnapshotRecord; import io.debezium.jdbc.CancellableResultSet; import io.debezium.jdbc.JdbcConnection; -import io.debezium.pipeline.CommonOffsetContext; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.EventDispatcher.SnapshotReceiver; import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource; @@ -53,7 +53,7 @@ * * @author Gunnar Morling */ -public abstract class RelationalSnapshotChangeEventSource

extends AbstractSnapshotChangeEventSource { +public abstract class RelationalSnapshotChangeEventSource

extends AbstractSnapshotChangeEventSource { private static final Logger LOGGER = LoggerFactory.getLogger(RelationalSnapshotChangeEventSource.class); @@ -402,29 +402,29 @@ else if (snapshotContext.lastTable) { } private void setSnapshotMarker(RelationalSnapshotContext snapshotContext) { + if (snapshotContext.lastRecordInTable && snapshotContext.lastTable) { + snapshotContext.offset.markSnapshotRecord(SnapshotRecord.LAST); // Absolute last record + return; + } + + if (snapshotContext.firstRecordInTable && snapshotContext.firstTable) { + snapshotContext.offset.markSnapshotRecord(SnapshotRecord.FIRST); // Absolute first record + return; + } + if (snapshotContext.lastRecordInTable) { - if (snapshotContext.lastTable) { - lastSnapshotRecord(snapshotContext); - } - else { - snapshotContext.offset.markLastRecordInDataCollection(); - } + snapshotContext.offset.markSnapshotRecord(SnapshotRecord.LAST_IN_DATA_COLLECTION); } else if (snapshotContext.firstRecordInTable) { - if (snapshotContext.firstTable) { - snapshotContext.offset.markFirstSnapshotRecord(); - } - else { - snapshotContext.offset.markFirstRecordInDataCollection(); - } + snapshotContext.offset.markSnapshotRecord(SnapshotRecord.FIRST_IN_DATA_COLLECTION); } else { - snapshotContext.offset.markSnapshotRecord(); + snapshotContext.offset.markSnapshotRecord(SnapshotRecord.TRUE); } } protected void lastSnapshotRecord(RelationalSnapshotContext snapshotContext) { - snapshotContext.offset.markLastSnapshotRecord(); + snapshotContext.offset.markSnapshotRecord(SnapshotRecord.LAST); } /** diff --git a/jenkins-jobs/scripts/config/Aliases.txt b/jenkins-jobs/scripts/config/Aliases.txt index 149f5e9cd..e5e03c288 100644 --- a/jenkins-jobs/scripts/config/Aliases.txt +++ b/jenkins-jobs/scripts/config/Aliases.txt @@ -139,4 +139,5 @@ troeselereos,Timo Roeseler Himanshu-LT,Himanshu Mishra Chrisss93,Chris Lee connorszczepaniak-wk,Connor Szczepaniak -ajunwalker,Andrew Walker \ No newline at end of file +ajunwalker,Andrew Walker +alwaysbemark,Mark Bereznitsky \ No newline at end of file