DBZ-7299 Test coverage for lookup and post-image full updates
This commit is contained in:
parent
4ea39b7d98
commit
b80b568e6c
@ -54,6 +54,7 @@
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.config.Field;
|
||||
import io.debezium.connector.mongodb.MongoDbConnectorConfig.FiltersMatchMode;
|
||||
import io.debezium.connector.mongodb.MongoDbConnectorConfig.FullUpdateType;
|
||||
import io.debezium.converters.CloudEventsConverterTest;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.data.Envelope.Operation;
|
||||
@ -465,6 +466,109 @@ public void shouldConsumePreImage() throws InterruptedException {
|
||||
assertThat(deleteId).isEqualTo(id.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 6, reason = "Pre-image support in Change Stream is officially released in Mongo 6.0.")
|
||||
public void shouldConsumeFullUpdateWithPostImage() throws InterruptedException {
|
||||
shouldConsumeFullUpdate(FullUpdateType.POST_IMAGE, "updated", "final");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldConsumeFullUpdateWithLookup() throws InterruptedException {
|
||||
shouldConsumeFullUpdate(FullUpdateType.LOOKUP, "updated", "final");
|
||||
}
|
||||
|
||||
public void shouldConsumeFullUpdate(FullUpdateType fullUpdateType, String firstUpdate, String secondUpdate) throws InterruptedException {
|
||||
config = TestHelper.getConfiguration(mongo).edit()
|
||||
.with(MongoDbConnectorConfig.CAPTURE_MODE, MongoDbConnectorConfig.CaptureMode.CHANGE_STREAMS_UPDATE_FULL)
|
||||
.with(MongoDbConnectorConfig.CAPTURE_MODE_FULL_UPDATE_TYPE, fullUpdateType)
|
||||
.with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
|
||||
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
|
||||
.with(CommonConnectorConfig.TOPIC_PREFIX, "mongo")
|
||||
.build();
|
||||
|
||||
// Set up the replication context for connections ...
|
||||
context = new MongoDbTaskContext(config);
|
||||
|
||||
// Cleanup database
|
||||
TestHelper.cleanDatabase(mongo, "dbit");
|
||||
|
||||
// Create database
|
||||
String collName = "images";
|
||||
try (var client = connect()) {
|
||||
MongoDatabase db1 = client.getDatabase("dbit");
|
||||
CreateCollectionOptions options = new CreateCollectionOptions();
|
||||
if (fullUpdateType.isPostImage()) {
|
||||
options.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
|
||||
}
|
||||
db1.createCollection(collName, options);
|
||||
}
|
||||
|
||||
var doc = new Document("_id", "0").append("val", "init");
|
||||
insertDocuments("dbit", collName, doc);
|
||||
|
||||
// Start the connector ...
|
||||
start(MongoDbConnector.class, config);
|
||||
|
||||
// Consume initial event
|
||||
SourceRecords records = consumeRecordsByTopic(1);
|
||||
records.topics().forEach(System.out::println);
|
||||
assertThat(records.recordsForTopic("mongo.dbit.images").size()).isEqualTo(1);
|
||||
assertThat(records.topics().size()).isEqualTo(1);
|
||||
AtomicBoolean foundLast = new AtomicBoolean(false);
|
||||
records.forEach(record -> {
|
||||
// Check that all records are valid, and can be serialized and deserialized ...
|
||||
validate(record);
|
||||
verifyFromInitialSnapshot(record, foundLast);
|
||||
verifyReadOperation(record);
|
||||
});
|
||||
assertThat(foundLast.get()).isTrue();
|
||||
|
||||
// stop the connector
|
||||
stopConnector();
|
||||
Testing.Debug.enable();
|
||||
|
||||
// update the document twice
|
||||
var selector = new Document("_id", "0");
|
||||
var update1 = new Document("$set", new Document("val", firstUpdate));
|
||||
var update2 = new Document("$set", new Document("val", secondUpdate));
|
||||
updateDocument("dbit", collName, selector, update1);
|
||||
updateDocument("dbit", collName, selector, update2);
|
||||
|
||||
// start the connector again
|
||||
start(MongoDbConnector.class, config);
|
||||
|
||||
// Wait until we can consume the 2 updates
|
||||
SourceRecords updateRecords = consumeRecordsByTopic(2);
|
||||
assertThat(updateRecords.recordsForTopic("mongo.dbit.images").size()).isEqualTo(2);
|
||||
assertThat(updateRecords.topics().size()).isEqualTo(1);
|
||||
updateRecords.forEach(record -> {
|
||||
// Check that all records are valid, and can be serialized and deserialized ...
|
||||
validate(record);
|
||||
verifyNotFromInitialSnapshot(record);
|
||||
});
|
||||
|
||||
// Verify after values
|
||||
SourceRecord updateRecord1 = updateRecords.allRecordsInOrder().get(0);
|
||||
verifyUpdateOperation(updateRecord1);
|
||||
SourceRecord updateRecord2 = updateRecords.allRecordsInOrder().get(1);
|
||||
verifyUpdateOperation(updateRecord2);
|
||||
|
||||
Testing.debug("Update event 1: " + updateRecord1);
|
||||
Testing.debug("Update event 2: " + updateRecord2);
|
||||
|
||||
Struct updateValue1 = (Struct) updateRecord1.value();
|
||||
Struct updateValue2 = (Struct) updateRecord2.value();
|
||||
|
||||
if (fullUpdateType.isPostImage()) {
|
||||
assertThat(updateValue1.getString("after")).contains(firstUpdate);
|
||||
assertThat(updateValue2.getString("after")).contains(secondUpdate);
|
||||
}
|
||||
else {
|
||||
assertThat(updateValue1.getString("after")).contains(secondUpdate);
|
||||
assertThat(updateValue2.getString("after")).contains(secondUpdate);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 6, reason = "Pre-image support in Change Stream is officially released in Mongo 6.0.")
|
||||
public void shouldConsumeLargeEvents() throws InterruptedException {
|
||||
|
Loading…
Reference in New Issue
Block a user