DBZ-6522 Reorder assert statements

This commit is contained in:
ani-sha 2024-06-13 12:55:54 +05:30 committed by Jakub Cechacek
parent 774edcacfa
commit b26aec4e9b

View File

@ -34,7 +34,6 @@
import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.AssertionsForInterfaceTypes;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
@ -2723,7 +2722,7 @@ public void shouldConsumeDocumentsWithComplexIds() throws Exception {
insertDocuments("dbit", "colA", doc);
SourceRecords records = consumeRecordsByTopic(1);
AssertionsForInterfaceTypes.assertThat(records.recordsForTopic("mongo.dbit.colA")).hasSize(1);
assertThat(records.recordsForTopic("mongo.dbit.colA")).hasSize(1);
stopConnector();
@ -2734,7 +2733,7 @@ public void shouldConsumeDocumentsWithComplexIds() throws Exception {
insertDocuments("dbit", "colA", doc1);
records = consumeRecordsByTopic(1);
AssertionsForInterfaceTypes.assertThat(records.recordsForTopic("mongo.dbit.colA")).hasSize(1);
assertThat(records.recordsForTopic("mongo.dbit.colA")).hasSize(1);
stopConnector();
}
@ -2745,8 +2744,10 @@ public void shouldConsumeEventsFromOffsetWithDataResumeToken() throws Interrupte
LogInterceptor logInterceptor = new LogInterceptor(MongoDbOffsetContext.class);
config = TestHelper.getConfiguration(mongo).edit()
.with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
.with(CommonConnectorConfig.TOPIC_PREFIX, "mongo")
.with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.ALWAYS)
.build();
Map<Map<String, ?>, Map<String, ?>> offset = Map.of(
@ -2766,18 +2767,18 @@ public void shouldConsumeEventsFromOffsetWithDataResumeToken() throws Interrupte
// Before starting the connector, add data to the databases ...
insertDocuments("dbit", "colA", new Document("_id", 1).append("name", "John"));
insertDocuments("dbit", "colA", new Document("_id", 24734982398L).append("name", "Jane"));
// Start the connector ...
start(MongoDbConnector.class, config);
waitForStreamingRunning("mongodb", "mongo");
insertDocuments("dbit", "colA", new Document("_id", 24734982398L).append("name", "Jane"));
// Consume the records ...
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic("mongo.dbit.colA")).hasSize(1);
SourceRecords records = consumeRecordsByTopic(2);
assertThat(logInterceptor.containsMessage("Old resume token format detected, attempting to parse as string 826666EDD6000000032B0229296E04")).isTrue();
assertThat(records.recordsForTopic("mongo.dbit.colA").size()).isEqualTo(2);
assertNoRecordsToConsume();
// Stop the connector ...
stopConnector();