diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java index 49f60cf06..1efd5f070 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java @@ -34,7 +34,6 @@ import org.apache.kafka.common.config.Config; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; -import org.assertj.core.api.AssertionsForInterfaceTypes; import org.bson.BsonDocument; import org.bson.Document; import org.bson.conversions.Bson; @@ -2723,7 +2722,7 @@ public void shouldConsumeDocumentsWithComplexIds() throws Exception { insertDocuments("dbit", "colA", doc); SourceRecords records = consumeRecordsByTopic(1); - AssertionsForInterfaceTypes.assertThat(records.recordsForTopic("mongo.dbit.colA")).hasSize(1); + assertThat(records.recordsForTopic("mongo.dbit.colA")).hasSize(1); stopConnector(); @@ -2734,7 +2733,7 @@ public void shouldConsumeDocumentsWithComplexIds() throws Exception { insertDocuments("dbit", "colA", doc1); records = consumeRecordsByTopic(1); - AssertionsForInterfaceTypes.assertThat(records.recordsForTopic("mongo.dbit.colA")).hasSize(1); + assertThat(records.recordsForTopic("mongo.dbit.colA")).hasSize(1); stopConnector(); } @@ -2745,8 +2744,10 @@ public void shouldConsumeEventsFromOffsetWithDataResumeToken() throws Interrupte LogInterceptor logInterceptor = new LogInterceptor(MongoDbOffsetContext.class); config = TestHelper.getConfiguration(mongo).edit() + .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(CommonConnectorConfig.TOPIC_PREFIX, "mongo") + .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.ALWAYS) .build(); Map, Map> offset = Map.of( @@ -2766,18 +2767,18 @@ public void shouldConsumeEventsFromOffsetWithDataResumeToken() throws Interrupte // Before starting the connector, add data to the databases ... insertDocuments("dbit", "colA", new Document("_id", 1).append("name", "John")); + insertDocuments("dbit", "colA", new Document("_id", 24734982398L).append("name", "Jane")); // Start the connector ... start(MongoDbConnector.class, config); - waitForStreamingRunning("mongodb", "mongo"); - insertDocuments("dbit", "colA", new Document("_id", 24734982398L).append("name", "Jane")); - // Consume the records ... - SourceRecords records = consumeRecordsByTopic(1); - assertThat(records.recordsForTopic("mongo.dbit.colA")).hasSize(1); + SourceRecords records = consumeRecordsByTopic(2); assertThat(logInterceptor.containsMessage("Old resume token format detected, attempting to parse as string 826666EDD6000000032B0229296E04")).isTrue(); + assertThat(records.recordsForTopic("mongo.dbit.colA").size()).isEqualTo(2); + + assertNoRecordsToConsume(); // Stop the connector ... stopConnector();