DBZ-4339 Remove need for container names and fixed ports on Linux.

This commit is contained in:
Bobby Tiernay 2022-11-26 15:32:21 -05:00 committed by Jiri Pechanec
parent b6712c7ccc
commit 8f6abd4428
4 changed files with 102 additions and 25 deletions

View File

@ -28,10 +28,10 @@
/**
* A container for running a single MongoDB {@code mongod} or {@code mongos} process.
* <p>
* In order to interact with a running container from the host using a client driver, the container's network alias
* ({@link #name}) must be resolvable from the host. On most systems this will require configuring {@code /etc/hosts}
* to have an entry that maps {@link #name} to {@code 127.0.0.1}. To make this portable across systems, fixed ports are
* used on the host and are mapped exactly to the container. Random free ports are assigned to minimize the chance of
* In order to interact with a running container from the host running Docker Desktop using a client driver, the
* container's network alias ({@link #name}) must be resolvable from the host. On most systems this will require
* configuring {@code /etc/hosts} to have an entry that maps {@link #name} to {@code 127.0.0.1}. Fixed ports are used on
* the host and are mapped 1:1 exactly with the container. Random free ports are assigned to minimize the chance of
* conflicts.
*/
public class MongoDbContainer extends GenericContainer<MongoDbContainer> {
@ -55,7 +55,7 @@ public static Builder node() {
public static final class Builder {
private String name;
private int port = -1;
private int port = 27017;
private String replicaSet;
private Network network = Network.SHARED;
@ -88,12 +88,18 @@ public MongoDbContainer build() {
private MongoDbContainer(Builder builder) {
super(IMAGE_NAME);
this.name = builder.name;
this.port = builder.port == -1 ? findFreePort() : builder.port;
this.replicaSet = builder.replicaSet;
if (isDockerDesktop()) {
// See class-level Java Docs
this.port = findFreePort();
addFixedExposedPort(port, port);
}
else {
this.port = builder.port;
}
withNetwork(builder.network);
addFixedExposedPort(port, port);
withCreateContainerCmdModifier(cmd -> cmd.withName(name));
withNetworkAliases(name);
withCommand(
"--replSet", replicaSet,
@ -134,6 +140,13 @@ public ServerAddress getNamedAddress() {
return new ServerAddress(name, port);
}
/**
* Invokes <a href="https://www.mongodb.com/docs/manual/reference/method/rs.initiate/">rs.initiate</a> on the
* container.
*
* @param configServer whether this replica set is a used for a <a href="https://www.mongodb.com/docs/manual/reference/replica-configuration/#mongodb-rsconf-rsconf.configsvr">sharded cluster's config server</a>.
* @param serverAddresses the list of <a href="https://www.mongodb.com/docs/manual/reference/replica-configuration/#mongodb-rsconf-rsconf.members-n-.host">hostname / port numbers</a> of the set members
*/
public void initReplicaSet(boolean configServer, ServerAddress... serverAddresses) {
LOGGER.info("[{}:{}] Initializing replica set...", replicaSet, name);
eval("rs.initiate({_id:'" + replicaSet + "',configsvr:" + configServer + ",members:[" +
@ -144,6 +157,10 @@ public void initReplicaSet(boolean configServer, ServerAddress... serverAddresse
"]});");
}
/**
* Invokes <a href="https://www.mongodb.com/docs/manual/reference/method/rs.stepDown/">rs.stepDown</a> on the
* container to instruct the primary of the replica set to become the primary.
*/
public void stepDown() {
LOGGER.info("[{}:{}] Stepping down...", replicaSet, name);
eval("rs.stepDown();");

View File

@ -112,6 +112,9 @@ public String getName() {
return name;
}
/**
* {@inheritDoc}
*/
@Override
public Set<Startable> getDependencies() {
return new HashSet<>(members);
@ -128,10 +131,18 @@ public String getConnectionString() {
.collect(joining(","));
}
/**
* Returns the replica set member containers.
*
* @return the replica set members
*/
public List<MongoDbContainer> getMembers() {
return members;
}
/**
* {@inheritDoc}
*/
@Override
public void start() {
// `start` needs to be reentrant for `Startables.deepStart` or it will be sad
@ -159,6 +170,9 @@ public void start() {
started = true;
}
/**
* {@inheritDoc}
*/
@Override
public void stop() {
LOGGER.info("[{}] Stopping...", name);
@ -168,7 +182,10 @@ public void stop() {
private void initializeReplicaSet() {
var arbitraryNode = members.get(0);
var serverAddresses = members.stream().map(MongoDbContainer::getClientAddress).toArray(ServerAddress[]::new);
var serverAddresses = members.stream()
.map(MongoDbContainer::getClientAddress)
.toArray(ServerAddress[]::new);
arbitraryNode.initReplicaSet(configServer, serverAddresses);
}
@ -199,7 +216,7 @@ public Optional<MongoDbContainer> tryPrimary() {
private Optional<MongoDbContainer> findMember(ServerDescription serverDescription) {
return members.stream()
.filter(node -> node.getNamedAddress().equals(serverDescription.getAddress()) ||
.filter(node -> node.getNamedAddress().equals(serverDescription.getAddress()) || // Match by name or possibly IP
node.getClientAddress().equals(serverDescription.getAddress()))
.findFirst();
}
@ -220,6 +237,7 @@ private ClusterDescription getClusterDescription() {
try (var client = MongoClients.create(getConnectionString())) {
// Force an actual connection via `first` since `listDatabaseNames` is lazily evaluated
client.listDatabaseNames().first();
return client.getClusterDescription();
}
}

View File

@ -9,9 +9,11 @@
import static io.debezium.connector.mongodb.cluster.MongoDbReplicaSet.replicaSet;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.rangeClosed;
import static org.awaitility.Awaitility.await;
import java.util.List;
import java.util.Objects;
@ -25,6 +27,9 @@
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;
import com.mongodb.BasicDBObject;
import com.mongodb.client.MongoClients;
/**
* A MongoDB sharded cluster.
*/
@ -91,6 +96,9 @@ private MongoDbShardedCluster(Builder builder) {
this.routers = createRouters();
}
/**
* {@inheritDoc}
*/
@Override
public void start() {
// `start` needs to be reentrant for `Startables.deepStart` or it will be sad
@ -109,6 +117,9 @@ public void start() {
started = true;
}
/**
* {@inheritDoc}
*/
@Override
public void stop() {
// Idempotent
@ -160,7 +171,8 @@ private MongoDbReplicaSet createShard(int i) {
var shard = replicaSet()
.network(network)
.namespace("test-mongo-shard" + i + "-replica")
.name("shard" + i).memberCount(replicaCount)
.name("shard" + i)
.memberCount(replicaCount)
.build();
shard.getMembers().forEach(node -> node.setCommand(
@ -208,13 +220,17 @@ private MongoDbContainer createRouter(Network network, int i) {
"mongos",
"--port", String.valueOf(router.getNamedAddress().getPort()),
"--bind_ip", "localhost," + router.getNamedAddress().getHost(),
"--configdb", formatReplicaSetAddress(configServers, true));
"--configdb", formatReplicaSetAddress(configServers, /* namedAddess= */ true));
router.getDependencies().addAll(shards);
router.getDependencies().add(configServers);
return router;
}
/**
* Invokes <a hrer="https://www.mongodb.com/docs/v6.0/reference/method/sh.addShard/#mongodb-method-sh.addShard">sh.addShard</a>
* on a router to add a new shard replica set to the sharded cluster.
*/
public void addShard() {
var shard = createShard(shards.size() + 1);
shard.start();
@ -222,9 +238,28 @@ public void addShard() {
addShard(shard);
}
/**
* Invokes the <a href="https://www.mongodb.com/docs/manual/reference/command/removeShard/">removeShard</a> command to
* remove the last added {@code shard} from the sharded cluster.
* <p>
* Waits until the state of the shard is {@code completed} before shutting down the shard replica set.
*/
public void removeShard() {
var shard = shards.remove(shards.size() - 1);
removeShard(shard);
LOGGER.info("Removing shard: {}", shard.getName());
// See https://www.mongodb.com/docs/manual/reference/command/removeShard/
try (var client = MongoClients.create(getConnectionString())) {
var admin = client.getDatabase("admin");
var command = new BasicDBObject("removeShard", shard.getName());
await()
.atMost(30, SECONDS)
.pollInterval(1, SECONDS)
.until(() -> admin.runCommand(command)
.get("state", String.class)
.equals("completed"));
}
shard.stop();
}
@ -234,29 +269,21 @@ private void addShards() {
private void addShard(MongoDbReplicaSet shard) {
// See https://www.mongodb.com/docs/v6.0/tutorial/deploy-shard-cluster/#add-shards-to-the-cluster
var shardAddress = formatReplicaSetAddress(shard, false);
var shardAddress = formatReplicaSetAddress(shard, /* namedAddess= */ false);
LOGGER.info("Adding shard: {}", shardAddress);
var arbitraryRouter = routers.get(0);
arbitraryRouter.eval(
"sh.addShard('" + shardAddress + "');");
}
private void removeShard(MongoDbReplicaSet shard) {
// See https://www.mongodb.com/docs/manual/reference/command/removeShard/
LOGGER.info("Removing shard: {}", shard.getName());
var arbitraryRouter = routers.get(0);
arbitraryRouter.eval(
"db.adminCommand({removeShard: '" + shard.getName() + "'});");
}
private Stream<Startable> stream() {
return Stream.concat(Stream.concat(shards.stream(), Stream.of(configServers)), routers.stream());
}
private static String formatReplicaSetAddress(MongoDbReplicaSet replicaSet, boolean named) {
private static String formatReplicaSetAddress(MongoDbReplicaSet replicaSet, boolean namedAddress) {
// See https://www.mongodb.com/docs/v6.0/reference/method/sh.addShard/#mongodb-method-sh.addShard
return replicaSet.getName() + "/" + replicaSet.getMembers().stream()
.map(named ? MongoDbContainer::getNamedAddress : MongoDbContainer::getClientAddress)
.map(namedAddress ? MongoDbContainer::getNamedAddress : MongoDbContainer::getClientAddress)
.map(Object::toString)
.collect(joining(","));
}

View File

@ -16,8 +16,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.BasicDBObject;
import com.mongodb.ConnectionString;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
/**
@ -47,7 +49,10 @@ public void testCluster() {
var collectionName = "docs";
cluster.shardCollection(databaseName, collectionName, "name");
var collection = client.getDatabase(databaseName).getCollection(collectionName);
var database = client.getDatabase(databaseName);
assertShardCount(client, 1);
var collection = database.getCollection(collectionName);
rangeClosed(1, 10)
.mapToObj(i -> Document.parse("{name:" + i + "}"))
.forEach(collection::insertOne);
@ -58,11 +63,21 @@ public void testCluster() {
logger.info("Connected to cluster: {}", client.getClusterDescription());
cluster.addShard();
assertShardCount(client, 2);
logger.info("Connected to cluster: {}", client.getClusterDescription());
cluster.removeShard();
assertShardCount(client, 1);
}
}
}
private static void assertShardCount(MongoClient client, int expectedShardCount) {
assertThat(client
.getDatabase("admin")
.runCommand(new BasicDBObject("listShards", 1))
.getList("shards", Document.class))
.hasSize(expectedShardCount);
}
}