DBZ-5628 Add mongo 6.0 pre-image IT test

This commit is contained in:
Bin Huang 2022-10-05 17:27:53 -07:00 committed by Jiri Pechanec
parent c588d831ee
commit 92b3035457
6 changed files with 168 additions and 12 deletions

View File

@ -6,12 +6,8 @@
package io.debezium.connector.mongodb;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.fail;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;

View File

@ -6,6 +6,7 @@
package io.debezium.connector.mongodb;
import static io.debezium.connector.mongodb.JsonSerialization.COMPACT_JSON_SETTINGS;
import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.fail;
@ -26,6 +27,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.mongodb.client.model.ChangeStreamPreAndPostImagesOptions;
import com.mongodb.client.model.CreateCollectionOptions;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
@ -55,6 +58,7 @@
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Collect;
@ -68,7 +72,7 @@
public class MongoDbConnectorIT extends AbstractMongoConnectorIT {
/**
* Verifies that the connector doesn't run with an invalid configuration. This does not actually connect to the MySQL server.
* Verifies that the connector doesn't run with an invalid configuration. This does not actually connect to the Mongo server.
*/
@Test
public void shouldNotStartWithInvalidConfiguration() {
@ -237,6 +241,166 @@ public void shouldValidateAcceptableConfiguration() {
assertNoConfigurationErrors(result, MongoDbConnectorConfig.CAPTURE_MODE);
}
@Test
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 6, reason = "Pre-image support in Change Stream is officially released in Mongo 6.0.")
public void shouldConsumePreImage() throws InterruptedException {
config = TestHelper.getConfiguration().edit()
.with(MongoDbConnectorConfig.CAPTURE_MODE, MongoDbConnectorConfig.CaptureMode.CHANGE_STREAMS_UPDATE_FULL_WITH_PRE_IMAGE)
.with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
.with(CommonConnectorConfig.TOPIC_PREFIX, "mongo")
.build();
// Set up the replication context for connections ...
context = new MongoDbTaskContext(config);
// Cleanup database
TestHelper.cleanDatabase(primary(), "dbit");
// Before starting the connector, add data to the databases ...
storeDocuments("dbit", "simpletons", "simple_objects.json");
storeDocuments("dbit", "restaurants", "restaurants1.json");
// Start the connector ...
start(MongoDbConnector.class, config);
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
SourceRecords records = consumeRecordsByTopic(12);
records.topics().forEach(System.out::println);
assertThat(records.recordsForTopic("mongo.dbit.simpletons").size()).isEqualTo(6);
assertThat(records.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(6);
assertThat(records.topics().size()).isEqualTo(2);
AtomicBoolean foundLast = new AtomicBoolean(false);
records.forEach(record -> {
// Check that all records are valid, and can be serialized and deserialized ...
validate(record);
verifyFromInitialSync(record, foundLast);
verifyReadOperation(record);
});
assertThat(foundLast.get()).isTrue();
AtomicReference<String> id = new AtomicReference<>();
String collName = "preimage";
primary().execute("create collection with pre-image recording enabled", mongo -> {
MongoDatabase db1 = mongo.getDatabase("dbit");
CreateCollectionOptions options = new CreateCollectionOptions();
options.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true));
db1.createCollection(collName, options);
});
Testing.Debug.enable();
primary().execute("create", mongo -> {
MongoDatabase db1 = mongo.getDatabase("dbit");
MongoCollection<Document> coll = db1.getCollection(collName);
// Insert the document with a generated ID ...
Document doc = Document.parse("{\"a\": 1, \"b\": 2}");
InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(true);
coll.insertOne(doc, insertOptions);
// Find the document to get the generated ID ...
doc = coll.find().first();
Testing.debug("Document: " + doc);
id.set(doc.getObjectId("_id").toString());
Testing.debug("Document ID: " + id.get());
});
primary().execute("update", mongo -> {
MongoDatabase db1 = mongo.getDatabase("dbit");
MongoCollection<Document> coll = db1.getCollection(collName);
// Find the document ...
Document doc = coll.find().first();
Testing.debug("Document: " + doc);
Document filter = Document.parse("{\"a\": 1}");
Document operation = Document.parse("{ \"$set\": { \"b\": 10 } }");
coll.updateOne(filter, operation);
doc = coll.find().first();
Testing.debug("Document: " + doc);
});
primary().execute("replace", mongo -> {
MongoDatabase db1 = mongo.getDatabase("dbit");
MongoCollection<Document> coll = db1.getCollection(collName);
// Find the document ...
Document doc = coll.find().first();
Testing.debug("Document: " + doc);
Document filter = Document.parse("{\"a\": 1}");
Document operation = Document.parse("{\"a\": 1, \"b\": 50}");
coll.replaceOne(filter, operation);
doc = coll.find().first();
Testing.debug("Document: " + doc);
});
// Wait until we can consume the 1 insert and 1 update and 1 replace...
SourceRecords insertAndUpdateAndReplace = consumeRecordsByTopic(3);
assertThat(insertAndUpdateAndReplace.recordsForTopic("mongo.dbit." + collName).size()).isEqualTo(3);
assertThat(insertAndUpdateAndReplace.topics().size()).isEqualTo(1);
insertAndUpdateAndReplace.forEach(record -> {
// Check that all records are valid, and can be serialized and deserialized ...
validate(record);
verifyNotFromInitialSync(record);
});
SourceRecord insertRecord = insertAndUpdateAndReplace.allRecordsInOrder().get(0);
verifyCreateOperation(insertRecord);
SourceRecord updateRecord = insertAndUpdateAndReplace.allRecordsInOrder().get(1);
verifyUpdateOperation(updateRecord);
SourceRecord replaceRecord = insertAndUpdateAndReplace.allRecordsInOrder().get(2);
verifyUpdateOperation(replaceRecord);
Testing.debug("Insert event: " + insertRecord);
Testing.debug("Update event: " + updateRecord);
Testing.debug("Replace event: " + replaceRecord);
Struct insertValue = (Struct) insertRecord.value();
Struct updateValue = (Struct) updateRecord.value();
Struct replaceValue = (Struct) replaceRecord.value();
assertThat(insertValue.getString("before")).isNull();
assertThat(insertValue.getString("after")).isEqualTo(updateValue.getString("before"));
assertThat(updateValue.getString("after")).isEqualTo(replaceValue.getString("before"));
// ---------------------------------------------------------------------------------------------------------------
// Delete a document
// ---------------------------------------------------------------------------------------------------------------
primary().execute("delete", mongo -> {
MongoDatabase db1 = mongo.getDatabase("dbit");
MongoCollection<Document> coll = db1.getCollection(collName);
Document filter = Document.parse("{\"a\": 1}");
coll.deleteOne(filter);
});
// Wait until we can consume the 1 delete ...
SourceRecords delete = consumeRecordsByTopic(2);
assertThat(delete.recordsForTopic("mongo.dbit." + collName).size()).isEqualTo(2);
assertThat(delete.topics().size()).isEqualTo(1);
SourceRecord deleteRecord = delete.allRecordsInOrder().get(0);
validate(deleteRecord);
verifyNotFromInitialSync(deleteRecord);
verifyDeleteOperation(deleteRecord);
Struct deleteValue = (Struct) deleteRecord.value();
assertThat(replaceValue.getString("after")).isEqualTo(deleteValue.getString("before"));
SourceRecord tombStoneRecord = delete.allRecordsInOrder().get(1);
validate(tombStoneRecord);
Testing.debug("Delete event: " + deleteRecord);
Testing.debug("Tombstone event: " + tombStoneRecord);
Struct deleteKey = (Struct) deleteRecord.key();
String deleteId = toObjectId(deleteKey.getString("id")).toString();
assertThat(deleteId).isEqualTo(id.get());
}
@Test
public void shouldConsumeAllEventsFromDatabase() throws InterruptedException, IOException {
// Use the DB configuration to define the connector's configuration ...
@ -397,7 +561,7 @@ public void shouldConsumeAllEventsFromDatabase() throws InterruptedException, IO
// ---------------------------------------------------------------------------------------------------------------
// Delete a document
// ---------------------------------------------------------------------------------------------------------------
// -------------------------------------------------------------------------------------------------------------
primary().execute("delete", mongo -> {
MongoDatabase db1 = mongo.getDatabase("dbit");
MongoCollection<Document> coll = db1.getCollection("arbitrary");

View File

@ -14,9 +14,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

View File

@ -1,2 +0,0 @@
package io.debezium.connector.mongodb;public class PreImageIT {
}

View File

@ -15,7 +15,7 @@
/**
* Implementation of {@link DatabaseVersionResolver} specific for MySQL.
*
* @author Chris Cranford
* @author Xinbin Huang
*/
public class MongoDbDatabaseVersionResolver implements DatabaseVersionResolver {

View File

@ -129,7 +129,7 @@
<!-- Databases, should align with database drivers -->
<version.mysql.server>5.7</version.mysql.server>
<version.mongo.server>4.0</version.mongo.server>
<version.mongo.server>6.0</version.mongo.server>
<version.cassandra3>3.11.12</version.cassandra3>
<version.cassandra4>4.0.2</version.cassandra4>