From b61319feb771855c52d0a4ebb9ba52dfd6dc7951 Mon Sep 17 00:00:00 2001 From: jcechace Date: Wed, 15 Jun 2022 11:58:09 +0200 Subject: [PATCH] DBZ-4733 Basic integration test for connection string support --- .../mongodb/ReplicaSetDiscovery.java | 2 +- ...ongoDbConnectorWithConnectionStringIT.java | 383 ++++++++++++++++++ 2 files changed, 384 insertions(+), 1 deletion(-) create mode 100644 debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorWithConnectionStringIT.java diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetDiscovery.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetDiscovery.java index 93b9afa7d..e7e2edf91 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetDiscovery.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ReplicaSetDiscovery.java @@ -18,8 +18,8 @@ import com.mongodb.ServerAddress; import com.mongodb.client.MongoClient; import com.mongodb.connection.ClusterDescription; -import com.mongodb.connection.ServerConnectionState; import com.mongodb.connection.ClusterType; +import com.mongodb.connection.ServerConnectionState; import com.mongodb.connection.ServerDescription; import io.debezium.annotation.ThreadSafe; diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorWithConnectionStringIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorWithConnectionStringIT.java new file mode 100644 index 000000000..dc39ba4c4 --- /dev/null +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorWithConnectionStringIT.java @@ -0,0 +1,383 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mongodb; + +import static org.fest.assertions.Assertions.assertThat; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; + +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.bson.Document; +import org.bson.types.ObjectId; +import org.junit.Assume; +import org.junit.Test; + +import com.mongodb.client.ClientSession; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.InsertOneOptions; + +import io.debezium.config.Configuration; +import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary; +import io.debezium.data.Envelope; +import io.debezium.data.Envelope.Operation; +import io.debezium.util.IoUtil; +import io.debezium.util.Testing; + +/** + * @author Randall Hauch + * + */ +public class MongoDbConnectorWithConnectionStringIT extends AbstractMongoConnectorIT { + + @Test + public void shouldConsumeAllEventsFromSingleReplicaWithMongoProtocol() throws InterruptedException { + shouldConsumeAllEventsFromDatabase("mongodb://localhost:27017/", false); + } + + @Test + public void shouldConsumeAllEventsFromSingleReplicaWithMongoSrvProtocol() throws InterruptedException { + var connectionString = System.getProperty("mongodb.connection.string"); + Assume.assumeThat(connectionString, notNullValue()); + shouldConsumeAllEventsFromDatabase(connectionString, true); + } + + public void shouldConsumeAllEventsFromDatabase(String connectionString, boolean ssl) throws InterruptedException { + // Use the DB configuration to define the connector's configuration ... + var properties = TestHelper.getConfiguration().asProperties(); + properties.remove(MongoDbConnectorConfig.HOSTS.name()); + + config = Configuration.from(properties).edit() + .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") + .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") + .with(MongoDbConnectorConfig.CONNECTION_STRING, connectionString) + .with(MongoDbConnectorConfig.SSL_ENABLED, ssl) + .with(MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS, true) + .build(); + + // Set up the replication context for connections ... + context = new MongoDbTaskContext(config); + + // Cleanup database + TestHelper.cleanDatabase(primary(), "dbit"); + + // Before starting the connector, add data to the databases ... + storeDocuments("dbit", "simpletons", "simple_objects.json"); + storeDocuments("dbit", "restaurants", "restaurants1.json"); + + // Start the connector ... + start(MongoDbConnector.class, config); + + // --------------------------------------------------------------------------------------------------------------- + // Consume all of the events due to startup and initialization of the database + // --------------------------------------------------------------------------------------------------------------- + SourceRecords records = consumeRecordsByTopic(12); + records.topics().forEach(System.out::println); + assertThat(records.recordsForTopic("mongo.dbit.simpletons").size()).isEqualTo(6); + assertThat(records.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(6); + assertThat(records.topics().size()).isEqualTo(2); + AtomicBoolean foundLast = new AtomicBoolean(false); + records.forEach(record -> { + // Check that all records are valid, and can be serialized and deserialized ... + validate(record); + verifyFromInitialSync(record, foundLast); + verifyReadOperation(record); + }); + assertThat(foundLast.get()).isTrue(); + + // At this point, the connector has performed the initial sync and awaits changes ... + + // --------------------------------------------------------------------------------------------------------------- + // Store more documents while the connector is still running + // --------------------------------------------------------------------------------------------------------------- + storeDocuments("dbit", "restaurants", "restaurants2.json"); + + // Wait until we can consume the 4 documents we just added ... + SourceRecords records2 = consumeRecordsByTopic(4); + assertThat(records2.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(4); + assertThat(records2.topics().size()).isEqualTo(1); + records2.forEach(record -> { + // Check that all records are valid, and can be serialized and deserialized ... + validate(record); + verifyNotFromInitialSync(record); + verifyCreateOperation(record); + }); + + // --------------------------------------------------------------------------------------------------------------- + // Stop the connector + // --------------------------------------------------------------------------------------------------------------- + stopConnector(); + + // --------------------------------------------------------------------------------------------------------------- + // Store more documents while the connector is NOT running + // --------------------------------------------------------------------------------------------------------------- + storeDocuments("dbit", "restaurants", "restaurants3.json"); + + // --------------------------------------------------------------------------------------------------------------- + // Start the connector and we should only see the documents added since it was stopped + // --------------------------------------------------------------------------------------------------------------- + start(MongoDbConnector.class, config); + + // Wait until we can consume the 4 documents we just added ... + SourceRecords records3 = consumeRecordsByTopic(5); + assertThat(records3.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(5); + assertThat(records3.topics().size()).isEqualTo(1); + records3.forEach(record -> { + // Check that all records are valid, and can be serialized and deserialized ... + validate(record); + verifyNotFromInitialSync(record); + verifyCreateOperation(record); + }); + + // --------------------------------------------------------------------------------------------------------------- + // Store more documents while the connector is still running + // --------------------------------------------------------------------------------------------------------------- + storeDocuments("dbit", "restaurants", "restaurants4.json"); + + // Wait until we can consume the 4 documents we just added ... + SourceRecords records4 = consumeRecordsByTopic(8); + assertThat(records4.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(8); + assertThat(records4.topics().size()).isEqualTo(1); + records4.forEach(record -> { + // Check that all records are valid, and can be serialized and deserialized ... + validate(record); + verifyNotFromInitialSync(record); + verifyCreateOperation(record); + }); + + // --------------------------------------------------------------------------------------------------------------- + // Create and then update a document + // --------------------------------------------------------------------------------------------------------------- + // Testing.Debug.enable(); + AtomicReference id = new AtomicReference<>(); + primary().execute("create", mongo -> { + MongoDatabase db1 = mongo.getDatabase("dbit"); + MongoCollection coll = db1.getCollection("arbitrary"); + coll.drop(); + + // Insert the document with a generated ID ... + Document doc = Document.parse("{\"a\": 1, \"b\": 2}"); + InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(true); + coll.insertOne(doc, insertOptions); + + // Find the document to get the generated ID ... + doc = coll.find().first(); + Testing.debug("Document: " + doc); + id.set(doc.getObjectId("_id").toString()); + Testing.debug("Document ID: " + id.get()); + }); + + primary().execute("update", mongo -> { + MongoDatabase db1 = mongo.getDatabase("dbit"); + MongoCollection coll = db1.getCollection("arbitrary"); + + // Find the document ... + Document doc = coll.find().first(); + Testing.debug("Document: " + doc); + Document filter = Document.parse("{\"a\": 1}"); + Document operation = Document.parse("{ \"$set\": { \"b\": 10 } }"); + coll.updateOne(filter, operation); + + doc = coll.find().first(); + Testing.debug("Document: " + doc); + }); + + // Wait until we can consume the 1 insert and 1 update ... + SourceRecords insertAndUpdate = consumeRecordsByTopic(2); + assertThat(insertAndUpdate.recordsForTopic("mongo.dbit.arbitrary").size()).isEqualTo(2); + assertThat(insertAndUpdate.topics().size()).isEqualTo(1); + records4.forEach(record -> { + // Check that all records are valid, and can be serialized and deserialized ... + validate(record); + verifyNotFromInitialSync(record); + verifyCreateOperation(record); + }); + SourceRecord insertRecord = insertAndUpdate.allRecordsInOrder().get(0); + SourceRecord updateRecord = insertAndUpdate.allRecordsInOrder().get(1); + Testing.debug("Insert event: " + insertRecord); + Testing.debug("Update event: " + updateRecord); + Struct insertKey = (Struct) insertRecord.key(); + Struct updateKey = (Struct) updateRecord.key(); + String insertId = toObjectId(insertKey.getString("id")).toString(); + String updateId = toObjectId(updateKey.getString("id")).toString(); + assertThat(insertId).isEqualTo(id.get()); + assertThat(updateId).isEqualTo(id.get()); + + // --------------------------------------------------------------------------------------------------------------- + // Delete a document + // --------------------------------------------------------------------------------------------------------------- + primary().execute("delete", mongo -> { + MongoDatabase db1 = mongo.getDatabase("dbit"); + MongoCollection coll = db1.getCollection("arbitrary"); + Document filter = Document.parse("{\"a\": 1}"); + coll.deleteOne(filter); + }); + + // Wait until we can consume the 1 delete ... + SourceRecords delete = consumeRecordsByTopic(2); + assertThat(delete.recordsForTopic("mongo.dbit.arbitrary").size()).isEqualTo(2); + assertThat(delete.topics().size()).isEqualTo(1); + + SourceRecord deleteRecord = delete.allRecordsInOrder().get(0); + validate(deleteRecord); + verifyNotFromInitialSync(deleteRecord); + verifyDeleteOperation(deleteRecord); + + SourceRecord tombStoneRecord = delete.allRecordsInOrder().get(1); + validate(tombStoneRecord); + + Testing.debug("Delete event: " + deleteRecord); + Testing.debug("Tombstone event: " + tombStoneRecord); + Struct deleteKey = (Struct) deleteRecord.key(); + String deleteId = toObjectId(deleteKey.getString("id")).toString(); + assertThat(deleteId).isEqualTo(id.get()); + } + + protected void verifyFromInitialSync(SourceRecord record, AtomicBoolean foundLast) { + if (record.sourceOffset().containsKey(SourceInfo.INITIAL_SYNC)) { + assertThat(record.sourceOffset().containsKey(SourceInfo.INITIAL_SYNC)).isTrue(); + Struct value = (Struct) record.value(); + assertThat(value.getStruct(Envelope.FieldName.SOURCE).getString(SourceInfo.SNAPSHOT_KEY)).isEqualTo("true"); + } + else { + // Only the last record in the initial sync should be marked as not being part of the initial sync ... + assertThat(foundLast.getAndSet(true)).isFalse(); + Struct value = (Struct) record.value(); + assertThat(value.getStruct(Envelope.FieldName.SOURCE).getString(SourceInfo.SNAPSHOT_KEY)).isEqualTo("last"); + } + } + + protected void verifyNotFromInitialSync(SourceRecord record) { + assertThat(record.sourceOffset().containsKey(SourceInfo.INITIAL_SYNC)).isFalse(); + Struct value = (Struct) record.value(); + assertThat(value.getStruct(Envelope.FieldName.SOURCE).getString(SourceInfo.SNAPSHOT_KEY)).isNull(); + } + + protected void verifyCreateOperation(SourceRecord record) { + verifyOperation(record, Operation.CREATE); + } + + protected void verifyReadOperation(SourceRecord record) { + verifyOperation(record, Operation.READ); + } + + protected void verifyDeleteOperation(SourceRecord record) { + verifyOperation(record, Operation.DELETE); + } + + protected void verifyOperation(SourceRecord record, Operation expected) { + Struct value = (Struct) record.value(); + assertThat(value.getString(Envelope.FieldName.OPERATION)).isEqualTo(expected.code()); + } + + @Override + protected MongoPrimary primary() { + var discovery = new ReplicaSetDiscovery(context); + var sets = discovery.getReplicaSets(); + var replicaSet = sets.all().get(0); + return context.getConnectionContext().primaryFor(replicaSet, context.filters(), connectionErrorHandler(3)); + } + + protected void storeDocuments(String dbName, String collectionName, String pathOnClasspath) { + primary().execute("storing documents", mongo -> { + Testing.debug("Storing in '" + dbName + "." + collectionName + "' documents loaded from from '" + pathOnClasspath + "'"); + MongoDatabase db1 = mongo.getDatabase(dbName); + MongoCollection coll = db1.getCollection(collectionName); + coll.drop(); + storeDocuments(coll, pathOnClasspath); + }); + } + + protected void storeDocuments(MongoCollection collection, String pathOnClasspath) { + InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(true); + loadTestDocuments(pathOnClasspath).forEach(doc -> { + assertThat(doc).isNotNull(); + assertThat(doc.size()).isGreaterThan(0); + collection.insertOne(doc, insertOptions); + }); + } + + protected void storeDocumentsInTx(String dbName, String collectionName, String pathOnClasspath) { + primary().execute("storing documents", mongo -> { + Testing.debug("Storing in '" + dbName + "." + collectionName + "' documents loaded from from '" + pathOnClasspath + "'"); + MongoDatabase db1 = mongo.getDatabase(dbName); + MongoCollection coll = db1.getCollection(collectionName); + coll.drop(); + db1.createCollection(collectionName); + final ClientSession session = mongo.startSession(); + + MongoDatabase admin = mongo.getDatabase("admin"); + if (admin != null) { + int timeout = Integer.parseInt(System.getProperty("mongo.transaction.lock.request.timeout.ms", "1000")); + Testing.debug("Setting MongoDB transaction lock request timeout as '" + timeout + "ms'"); + admin.runCommand(session, new Document().append("setParameter", 1).append("maxTransactionLockRequestTimeoutMillis", timeout)); + } + + session.startTransaction(); + storeDocuments(session, coll, pathOnClasspath); + session.commitTransaction(); + }); + } + + protected void storeDocuments(ClientSession session, MongoCollection collection, String pathOnClasspath) { + InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(true); + loadTestDocuments(pathOnClasspath).forEach(doc -> { + assertThat(doc).isNotNull(); + assertThat(doc.size()).isGreaterThan(0); + if (session == null) { + collection.insertOne(doc, insertOptions); + } + else { + collection.insertOne(session, doc, insertOptions); + } + }); + } + + protected List loadTestDocuments(String pathOnClasspath) { + List results = new ArrayList<>(); + try (InputStream stream = Files.readResourceAsStream(pathOnClasspath)) { + assertThat(stream).isNotNull(); + IoUtil.readLines(stream, line -> { + Document doc = Document.parse(line); + assertThat(doc.size()).isGreaterThan(0); + results.add(doc); + }); + } + catch (IOException e) { + fail("Unable to find or read file '" + pathOnClasspath + "': " + e.getMessage()); + } + return results; + } + + protected BiConsumer connectionErrorHandler(int numErrorsBeforeFailing) { + AtomicInteger attempts = new AtomicInteger(); + return (desc, error) -> { + if (attempts.incrementAndGet() > numErrorsBeforeFailing) { + fail("Unable to connect to primary after " + numErrorsBeforeFailing + " errors trying to " + desc + ": " + error); + } + logger.error("Error while attempting to {}: {}", desc, error.getMessage(), error); + }; + } + + private String formatObjectId(ObjectId objId) { + return "{\"$oid\": \"" + objId + "\"}"; + } + + private ObjectId toObjectId(String oid) { + return new ObjectId(oid.substring(10, oid.length() - 2)); + } +}