diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/cluster/MongoDbContainer.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/cluster/MongoDbContainer.java
index c9d6dbf66..360bd2d7e 100644
--- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/cluster/MongoDbContainer.java
+++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/cluster/MongoDbContainer.java
@@ -28,10 +28,10 @@
/**
* A container for running a single MongoDB {@code mongod} or {@code mongos} process.
*
- * In order to interact with a running container from the host using a client driver, the container's network alias
- * ({@link #name}) must be resolvable from the host. On most systems this will require configuring {@code /etc/hosts}
- * to have an entry that maps {@link #name} to {@code 127.0.0.1}. To make this portable across systems, fixed ports are
- * used on the host and are mapped exactly to the container. Random free ports are assigned to minimize the chance of
+ * In order to interact with a running container from the host running Docker Desktop using a client driver, the
+ * container's network alias ({@link #name}) must be resolvable from the host. On most systems this will require
+ * configuring {@code /etc/hosts} to have an entry that maps {@link #name} to {@code 127.0.0.1}. Fixed ports are used on
+ * the host and are mapped 1:1 exactly with the container. Random free ports are assigned to minimize the chance of
* conflicts.
*/
public class MongoDbContainer extends GenericContainer {
@@ -55,7 +55,7 @@ public static Builder node() {
public static final class Builder {
private String name;
- private int port = -1;
+ private int port = 27017;
private String replicaSet;
private Network network = Network.SHARED;
@@ -88,12 +88,18 @@ public MongoDbContainer build() {
private MongoDbContainer(Builder builder) {
super(IMAGE_NAME);
this.name = builder.name;
- this.port = builder.port == -1 ? findFreePort() : builder.port;
this.replicaSet = builder.replicaSet;
+ if (isDockerDesktop()) {
+ // See class-level Java Docs
+ this.port = findFreePort();
+ addFixedExposedPort(port, port);
+ }
+ else {
+ this.port = builder.port;
+ }
+
withNetwork(builder.network);
- addFixedExposedPort(port, port);
- withCreateContainerCmdModifier(cmd -> cmd.withName(name));
withNetworkAliases(name);
withCommand(
"--replSet", replicaSet,
@@ -134,6 +140,13 @@ public ServerAddress getNamedAddress() {
return new ServerAddress(name, port);
}
+ /**
+ * Invokes rs.initiate on the
+ * container.
+ *
+ * @param configServer whether this replica set is a used for a sharded cluster's config server.
+ * @param serverAddresses the list of hostname / port numbers of the set members
+ */
public void initReplicaSet(boolean configServer, ServerAddress... serverAddresses) {
LOGGER.info("[{}:{}] Initializing replica set...", replicaSet, name);
eval("rs.initiate({_id:'" + replicaSet + "',configsvr:" + configServer + ",members:[" +
@@ -144,6 +157,10 @@ public void initReplicaSet(boolean configServer, ServerAddress... serverAddresse
"]});");
}
+ /**
+ * Invokes rs.stepDown on the
+ * container to instruct the primary of the replica set to become the primary.
+ */
public void stepDown() {
LOGGER.info("[{}:{}] Stepping down...", replicaSet, name);
eval("rs.stepDown();");
diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/cluster/MongoDbReplicaSet.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/cluster/MongoDbReplicaSet.java
index 3bc0fb25c..e06976705 100644
--- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/cluster/MongoDbReplicaSet.java
+++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/cluster/MongoDbReplicaSet.java
@@ -112,6 +112,9 @@ public String getName() {
return name;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public Set getDependencies() {
return new HashSet<>(members);
@@ -128,10 +131,18 @@ public String getConnectionString() {
.collect(joining(","));
}
+ /**
+ * Returns the replica set member containers.
+ *
+ * @return the replica set members
+ */
public List getMembers() {
return members;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void start() {
// `start` needs to be reentrant for `Startables.deepStart` or it will be sad
@@ -159,6 +170,9 @@ public void start() {
started = true;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void stop() {
LOGGER.info("[{}] Stopping...", name);
@@ -168,7 +182,10 @@ public void stop() {
private void initializeReplicaSet() {
var arbitraryNode = members.get(0);
- var serverAddresses = members.stream().map(MongoDbContainer::getClientAddress).toArray(ServerAddress[]::new);
+ var serverAddresses = members.stream()
+ .map(MongoDbContainer::getClientAddress)
+ .toArray(ServerAddress[]::new);
+
arbitraryNode.initReplicaSet(configServer, serverAddresses);
}
@@ -199,7 +216,7 @@ public Optional tryPrimary() {
private Optional findMember(ServerDescription serverDescription) {
return members.stream()
- .filter(node -> node.getNamedAddress().equals(serverDescription.getAddress()) ||
+ .filter(node -> node.getNamedAddress().equals(serverDescription.getAddress()) || // Match by name or possibly IP
node.getClientAddress().equals(serverDescription.getAddress()))
.findFirst();
}
@@ -220,6 +237,7 @@ private ClusterDescription getClusterDescription() {
try (var client = MongoClients.create(getConnectionString())) {
// Force an actual connection via `first` since `listDatabaseNames` is lazily evaluated
client.listDatabaseNames().first();
+
return client.getClusterDescription();
}
}
diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/cluster/MongoDbShardedCluster.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/cluster/MongoDbShardedCluster.java
index fb41b252d..aebe7cfec 100644
--- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/cluster/MongoDbShardedCluster.java
+++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/cluster/MongoDbShardedCluster.java
@@ -9,9 +9,11 @@
import static io.debezium.connector.mongodb.cluster.MongoDbReplicaSet.replicaSet;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.rangeClosed;
+import static org.awaitility.Awaitility.await;
import java.util.List;
import java.util.Objects;
@@ -25,6 +27,9 @@
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;
+import com.mongodb.BasicDBObject;
+import com.mongodb.client.MongoClients;
+
/**
* A MongoDB sharded cluster.
*/
@@ -91,6 +96,9 @@ private MongoDbShardedCluster(Builder builder) {
this.routers = createRouters();
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void start() {
// `start` needs to be reentrant for `Startables.deepStart` or it will be sad
@@ -109,6 +117,9 @@ public void start() {
started = true;
}
+ /**
+ * {@inheritDoc}
+ */
@Override
public void stop() {
// Idempotent
@@ -160,7 +171,8 @@ private MongoDbReplicaSet createShard(int i) {
var shard = replicaSet()
.network(network)
.namespace("test-mongo-shard" + i + "-replica")
- .name("shard" + i).memberCount(replicaCount)
+ .name("shard" + i)
+ .memberCount(replicaCount)
.build();
shard.getMembers().forEach(node -> node.setCommand(
@@ -208,13 +220,17 @@ private MongoDbContainer createRouter(Network network, int i) {
"mongos",
"--port", String.valueOf(router.getNamedAddress().getPort()),
"--bind_ip", "localhost," + router.getNamedAddress().getHost(),
- "--configdb", formatReplicaSetAddress(configServers, true));
+ "--configdb", formatReplicaSetAddress(configServers, /* namedAddess= */ true));
router.getDependencies().addAll(shards);
router.getDependencies().add(configServers);
return router;
}
+ /**
+ * Invokes sh.addShard
+ * on a router to add a new shard replica set to the sharded cluster.
+ */
public void addShard() {
var shard = createShard(shards.size() + 1);
shard.start();
@@ -222,9 +238,28 @@ public void addShard() {
addShard(shard);
}
+ /**
+ * Invokes the removeShard command to
+ * remove the last added {@code shard} from the sharded cluster.
+ *
+ * Waits until the state of the shard is {@code completed} before shutting down the shard replica set.
+ */
public void removeShard() {
var shard = shards.remove(shards.size() - 1);
- removeShard(shard);
+ LOGGER.info("Removing shard: {}", shard.getName());
+
+ // See https://www.mongodb.com/docs/manual/reference/command/removeShard/
+ try (var client = MongoClients.create(getConnectionString())) {
+ var admin = client.getDatabase("admin");
+ var command = new BasicDBObject("removeShard", shard.getName());
+ await()
+ .atMost(30, SECONDS)
+ .pollInterval(1, SECONDS)
+ .until(() -> admin.runCommand(command)
+ .get("state", String.class)
+ .equals("completed"));
+ }
+
shard.stop();
}
@@ -234,29 +269,21 @@ private void addShards() {
private void addShard(MongoDbReplicaSet shard) {
// See https://www.mongodb.com/docs/v6.0/tutorial/deploy-shard-cluster/#add-shards-to-the-cluster
- var shardAddress = formatReplicaSetAddress(shard, false);
+ var shardAddress = formatReplicaSetAddress(shard, /* namedAddess= */ false);
LOGGER.info("Adding shard: {}", shardAddress);
var arbitraryRouter = routers.get(0);
arbitraryRouter.eval(
"sh.addShard('" + shardAddress + "');");
}
- private void removeShard(MongoDbReplicaSet shard) {
- // See https://www.mongodb.com/docs/manual/reference/command/removeShard/
- LOGGER.info("Removing shard: {}", shard.getName());
- var arbitraryRouter = routers.get(0);
- arbitraryRouter.eval(
- "db.adminCommand({removeShard: '" + shard.getName() + "'});");
- }
-
private Stream stream() {
return Stream.concat(Stream.concat(shards.stream(), Stream.of(configServers)), routers.stream());
}
- private static String formatReplicaSetAddress(MongoDbReplicaSet replicaSet, boolean named) {
+ private static String formatReplicaSetAddress(MongoDbReplicaSet replicaSet, boolean namedAddress) {
// See https://www.mongodb.com/docs/v6.0/reference/method/sh.addShard/#mongodb-method-sh.addShard
return replicaSet.getName() + "/" + replicaSet.getMembers().stream()
- .map(named ? MongoDbContainer::getNamedAddress : MongoDbContainer::getClientAddress)
+ .map(namedAddress ? MongoDbContainer::getNamedAddress : MongoDbContainer::getClientAddress)
.map(Object::toString)
.collect(joining(","));
}
diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/cluster/MongoDbShardedClusterIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/cluster/MongoDbShardedClusterIT.java
index 43f12ada3..c3eea8fff 100644
--- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/cluster/MongoDbShardedClusterIT.java
+++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/cluster/MongoDbShardedClusterIT.java
@@ -16,8 +16,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.mongodb.BasicDBObject;
import com.mongodb.ConnectionString;
import com.mongodb.ReadPreference;
+import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
/**
@@ -47,7 +49,10 @@ public void testCluster() {
var collectionName = "docs";
cluster.shardCollection(databaseName, collectionName, "name");
- var collection = client.getDatabase(databaseName).getCollection(collectionName);
+ var database = client.getDatabase(databaseName);
+ assertShardCount(client, 1);
+
+ var collection = database.getCollection(collectionName);
rangeClosed(1, 10)
.mapToObj(i -> Document.parse("{name:" + i + "}"))
.forEach(collection::insertOne);
@@ -58,11 +63,21 @@ public void testCluster() {
logger.info("Connected to cluster: {}", client.getClusterDescription());
cluster.addShard();
+ assertShardCount(client, 2);
logger.info("Connected to cluster: {}", client.getClusterDescription());
cluster.removeShard();
+ assertShardCount(client, 1);
}
}
}
+ private static void assertShardCount(MongoClient client, int expectedShardCount) {
+ assertThat(client
+ .getDatabase("admin")
+ .runCommand(new BasicDBObject("listShards", 1))
+ .getList("shards", Document.class))
+ .hasSize(expectedShardCount);
+ }
+
}