DBZ-7770 Added test verifying invalid log position is properly logged for MongoDB

This commit is contained in:
Jakub Cechacek 2024-04-11 12:47:38 +02:00 committed by Jiri Pechanec
parent 9242d45240
commit 4061a02f8b

View File

@ -638,6 +638,46 @@ public void shouldConsumeLargeEvents() throws InterruptedException {
}
@Test
public void shouldLogInvalidOffsetWithSnapshotModeInitial() throws InterruptedException {
final LogInterceptor logInterceptor = new LogInterceptor(MongoDbConnectorTask.class);
// Configuration to capture changes from both databases
config = TestHelper.getConfiguration(mongo).edit()
.with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
.with(CommonConnectorConfig.TOPIC_PREFIX, "mongo")
.build();
// Create invalid offset captured from some previous run
// > unless time travel is involved it should be effectively globally unique
Map<Map<String, ?>, Map<String, ?>> offset = Map.of(
Collect.hashMapOf("server_id", "mongo"),
Collect.hashMapOf(
SourceInfo.TIMESTAMP, 0,
SourceInfo.ORDER, -1,
SourceInfo.RESUME_TOKEN, "826617AF13000000012B022C0100296E5A1004DF2AABE77A714014A9E60EFF70AFF1DC46645F696400646617AF13E337477353A2CE830004"));
storeOffsets(config, offset);
// Set up the replication context for connections ...
context = new MongoDbTaskContext(config);
// Cleanup databases
TestHelper.cleanDatabase(mongo, "dbit");
// Before starting the connector, add data to the databases ...
insertDocuments("dbit", "colA", new Document("value", "foo"));
insertDocuments("dbit", "colB", new Document("value", "bar"));
// ---------------------------------------------------------------------------------------------------------------
// Start the connector
// ---------------------------------------------------------------------------------------------------------------
start(MongoDbConnector.class, config);
// Snapshot not performed due to SnapshotMode.INITIAL, warning in the logs
logInterceptor.containsMessage("Last recorded offset is no longer available on the server");
}
@Test
public void shouldConsumeAllEventsFromDatabase() throws InterruptedException, IOException {
// Use the DB configuration to define the connector's configuration ...