diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java index db7a27f6a..f81b05b76 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java @@ -638,6 +638,46 @@ public void shouldConsumeLargeEvents() throws InterruptedException { } + @Test + public void shouldLogInvalidOffsetWithSnapshotModeInitial() throws InterruptedException { + final LogInterceptor logInterceptor = new LogInterceptor(MongoDbConnectorTask.class); + + // Configuration to capture changes from both databases + config = TestHelper.getConfiguration(mongo).edit() + .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") + .with(CommonConnectorConfig.TOPIC_PREFIX, "mongo") + .build(); + + // Create invalid offset captured from some previous run + // > unless time travel is involved it should be effectively globally unique + Map, Map> offset = Map.of( + Collect.hashMapOf("server_id", "mongo"), + Collect.hashMapOf( + SourceInfo.TIMESTAMP, 0, + SourceInfo.ORDER, -1, + SourceInfo.RESUME_TOKEN, "826617AF13000000012B022C0100296E5A1004DF2AABE77A714014A9E60EFF70AFF1DC46645F696400646617AF13E337477353A2CE830004")); + storeOffsets(config, offset); + + // Set up the replication context for connections ... + context = new MongoDbTaskContext(config); + + // Cleanup databases + TestHelper.cleanDatabase(mongo, "dbit"); + + // Before starting the connector, add data to the databases ... + insertDocuments("dbit", "colA", new Document("value", "foo")); + insertDocuments("dbit", "colB", new Document("value", "bar")); + + // --------------------------------------------------------------------------------------------------------------- + // Start the connector + // --------------------------------------------------------------------------------------------------------------- + start(MongoDbConnector.class, config); + + // Snapshot not performed due to SnapshotMode.INITIAL, warning in the logs + logInterceptor.containsMessage("Last recorded offset is no longer available on the server"); + } + @Test public void shouldConsumeAllEventsFromDatabase() throws InterruptedException, IOException { // Use the DB configuration to define the connector's configuration ...