DBZ-6522 Obtain offset _data string using mongo router to validate tests

This commit is contained in:
ani-sha 2024-06-19 11:05:01 +05:30 committed by Jakub Cechacek
parent b26aec4e9b
commit 5f6f40d591

View File

@ -26,6 +26,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -2747,16 +2748,34 @@ public void shouldConsumeEventsFromOffsetWithDataResumeToken() throws Interrupte
.with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
.with(CommonConnectorConfig.TOPIC_PREFIX, "mongo")
.with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.ALWAYS)
.build();
MongoDbConnectorConfig connectorConfig = new MongoDbConnectorConfig(config);
// Create a change stream pipeline
var pipelineFactory = new ChangeStreamPipelineFactory(connectorConfig, new Filters.FilterConfig(config));
ChangeStreamPipeline pipeline = pipelineFactory.create();
var stages = pipeline.getStages();
String resumeToken;
// Insert a document using router and obtain resume token
try (var router = connect()) {
var routerStream = router.watch(stages, BsonDocument.class);
try (var rc = routerStream.cursor()) {
insertDocuments("dbit", "colA", new Document("_id", 1).append("name", "John"));
rc.next();
resumeToken = Objects.requireNonNull(rc.getResumeToken()).get("_data").asString().getValue();
}
}
Map<Map<String, ?>, Map<String, ?>> offset = Map.of(
Collect.hashMapOf("server_id", "mongo"),
Collect.hashMapOf(
SourceInfo.TIMESTAMP, 0,
SourceInfo.ORDER, -1,
SourceInfo.RESUME_TOKEN,
"826666EDD6000000032B0229296E04"));
resumeToken));
storeOffsets(config, offset);
// Set up the replication context for connections ...
@ -2766,7 +2785,6 @@ public void shouldConsumeEventsFromOffsetWithDataResumeToken() throws Interrupte
TestHelper.cleanDatabase(mongo, "dbit");
// Before starting the connector, add data to the databases ...
insertDocuments("dbit", "colA", new Document("_id", 1).append("name", "John"));
insertDocuments("dbit", "colA", new Document("_id", 24734982398L).append("name", "Jane"));
// Start the connector ...
@ -2774,23 +2792,11 @@ public void shouldConsumeEventsFromOffsetWithDataResumeToken() throws Interrupte
waitForStreamingRunning("mongodb", "mongo");
// Consume the records ...
SourceRecords records = consumeRecordsByTopic(2);
assertThat(logInterceptor.containsMessage("Old resume token format detected, attempting to parse as string 826666EDD6000000032B0229296E04")).isTrue();
assertThat(records.recordsForTopic("mongo.dbit.colA").size()).isEqualTo(2);
SourceRecords records = consumeRecordsByTopic(1);
assertThat(logInterceptor.containsMessage("Old resume token format detected, attempting to parse as string " + resumeToken)).isTrue();
assertThat(records.recordsForTopic("mongo.dbit.colA").size()).isEqualTo(1);
assertNoRecordsToConsume();
// Stop the connector ...
stopConnector();
// Restart the connector ...
start(MongoDbConnector.class, config);
insertDocuments("dbit", "colA", new Document("_id", 24734982399L).append("name", "Jack"));
// Consume the records ...
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic("mongo.dbit.colA")).hasSize(1);
}
@Test