DBZ-6032 Connection management refactoring

This commit is contained in:
jcechace 2022-11-23 00:31:41 +01:00 committed by Jiri Pechanec
parent 2f44fcc064
commit 922191cdd7
34 changed files with 709 additions and 1375 deletions

View File

@ -6,36 +6,19 @@
package io.debezium.connector.mongodb;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.ConnectionString;
import com.mongodb.MongoCredential;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterType;
import io.debezium.config.Configuration;
import io.debezium.function.BlockingConsumer;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
/**
* @author Randall Hauch
@ -43,18 +26,10 @@
*/
public class ConnectionContext implements AutoCloseable {
/**
* A pause between failed MongoDB operations to prevent CPU throttling and DoS of
* target MongoDB database.
*/
private static final Duration PAUSE_AFTER_ERROR = Duration.ofMillis(500);
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionContext.class);
protected final Configuration config;
protected final MongoClients pool;
protected final DelayStrategy backoffStrategy;
protected final boolean useHostsAsSeeds;
/**
* @param config the configuration
@ -62,7 +37,8 @@ public class ConnectionContext implements AutoCloseable {
public ConnectionContext(Configuration config) {
this.config = config;
this.useHostsAsSeeds = config.getBoolean(MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS);
final ConnectionString connectionString = connectionString();
final String username = config.getString(MongoDbConnectorConfig.USER);
final String password = config.getString(MongoDbConnectorConfig.PASSWORD);
final String adminDbName = config.getString(MongoDbConnectorConfig.AUTH_SOURCE);
@ -75,9 +51,9 @@ public ConnectionContext(Configuration config) {
final int serverSelectionTimeoutMs = config.getInteger(MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS);
// Set up the client pool so that it ...
MongoClients.Builder clientBuilder = MongoClients.create();
MongoClients.Builder poolBuilder = MongoClients.create();
clientBuilder.settings()
poolBuilder.settings()
.applyToSocketSettings(builder -> builder.connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS)
.readTimeout(socketTimeoutMs, TimeUnit.MILLISECONDS))
.applyToClusterSettings(
@ -85,35 +61,79 @@ public ConnectionContext(Configuration config) {
.applyToServerSettings(
builder -> builder.heartbeatFrequency(heartbeatFrequencyMs, TimeUnit.MILLISECONDS));
// Use credentials if provided as part of connection String
var cs = connectionString();
cs
.map(ConnectionString::getCredential)
.ifPresent(clientBuilder::withCredential);
// Use credential if provided as properties
if (username != null || password != null) {
clientBuilder.withCredential(MongoCredential.createCredential(username, adminDbName, password.toCharArray()));
poolBuilder.withCredential(MongoCredential.createCredential(username, adminDbName, password.toCharArray()));
}
if (useSSL) {
clientBuilder.settings().applyToSslSettings(
poolBuilder.settings().applyToSslSettings(
builder -> builder.enabled(true).invalidHostNameAllowed(sslAllowInvalidHostnames));
}
clientBuilder.settings()
poolBuilder.settings()
.applyToSocketSettings(builder -> builder.connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS)
.readTimeout(socketTimeoutMs, TimeUnit.MILLISECONDS))
.applyToClusterSettings(
builder -> builder.serverSelectionTimeout(serverSelectionTimeoutMs, TimeUnit.MILLISECONDS));
pool = clientBuilder.build();
final int initialDelayInMs = config.getInteger(MongoDbConnectorConfig.CONNECT_BACKOFF_INITIAL_DELAY_MS);
final long maxDelayInMs = config.getLong(MongoDbConnectorConfig.CONNECT_BACKOFF_MAX_DELAY_MS);
this.backoffStrategy = DelayStrategy.exponential(Duration.ofMillis(initialDelayInMs), Duration.ofMillis(maxDelayInMs));
pool = poolBuilder.build();
}
public void shutdown() {
protected Logger logger() {
return LOGGER;
}
/**
* Initial connection string which is either a host specification or connection string
*
* @return hosts or connection string
*/
public String connectionSeed() {
String seed = config.getString(MongoDbConnectorConfig.CONNECTION_STRING);
if (seed == null) {
String hosts = config.getString(MongoDbConnectorConfig.HOSTS);
seed = ConnectionStrings.buildFromHosts(hosts);
}
return seed;
}
public ConnectionString connectionString() {
return new ConnectionString(connectionSeed());
}
/**
* Same as {@link #connectionSeed()} but masks sensitive information
*
* @return masked connection seed
*/
public String maskedConnectionSeed() {
return ConnectionStrings.mask(connectionSeed());
}
public Duration pollInterval() {
return Duration.ofMillis(config.getLong(MongoDbConnectorConfig.MONGODB_POLL_INTERVAL_MS));
}
public MongoClient connect() {
return pool.client(connectionString());
}
/**
* Obtain a client scoped to specific replica set.
*
* @param replicaSet the replica set information; may not be null
* @param filters the filter configuration
* @param errorHandler the function to be called whenever the node is unable to
* {@link RetryingMongoClient#execute(String, BlockingConsumer)} execute} an operation to completion; may be null
* @return the client, or {@code null} if no primary could be found for the replica set
*/
public RetryingMongoClient connect(ReplicaSet replicaSet, ReadPreference preference, Filters filters,
BiConsumer<String, Throwable> errorHandler) {
return new RetryingMongoClient(replicaSet, preference, pool::client, filters, errorHandler);
}
@Override
public final void close() {
try {
// Closing all connections ...
logger().info("Closing all connections to {}", maskedConnectionSeed());
@ -123,421 +143,4 @@ public void shutdown() {
logger().error("Unexpected error shutting down the MongoDB clients", e);
}
}
@Override
public final void close() {
shutdown();
}
protected Logger logger() {
return LOGGER;
}
public MongoClients clients() {
return pool;
}
public boolean performSnapshotEvenIfNotNeeded() {
return false;
}
public MongoClient clientFor(ReplicaSet replicaSet) {
return clientFor(replicaSet.addresses());
}
public MongoClient clientFor(String seedAddresses) {
List<ServerAddress> addresses = MongoUtil.parseAddresses(seedAddresses);
return clientFor(addresses);
}
public MongoClient clientForSeedConnection() {
return connectionString()
.map(this::clientFor)
.orElseGet(() -> clientFor(hosts()));
}
public MongoClient clientFor(ConnectionString connectionString) {
return pool.clientForMembers(connectionString);
}
public MongoClient clientFor(List<ServerAddress> addresses) {
if (this.useHostsAsSeeds || addresses.isEmpty()) {
return pool.clientForMembers(addresses);
}
return pool.clientFor(addresses.get(0));
}
public String hosts() {
return config.getString(MongoDbConnectorConfig.HOSTS);
}
/**
* Initial connection seed which is either a host specification or connection string
*
* @return hosts or connection string
*/
public String connectionSeed() {
return connectionString()
.map(ConnectionString::toString)
.orElse(config.getString(MongoDbConnectorConfig.HOSTS));
}
/**
* Same as {@link #connectionSeed()} but masks sensitive information
*
* @return masked connection seed
*/
public String maskedConnectionSeed() {
var connectionSeed = connectionSeed();
return connectionString()
.map(ConnectionString::getCredential)
.map(creds -> Strings.mask(
connectionSeed,
creds.getUserName(),
creds.getSource(),
creds.getPassword() != null ? String.valueOf(creds.getPassword()) : null))
.orElse(connectionSeed);
}
public Optional<ConnectionString> connectionString() {
String raw = config.getString(MongoDbConnectorConfig.CONNECTION_STRING);
return Optional.ofNullable(raw).map(ConnectionString::new);
}
public Duration pollInterval() {
return Duration.ofMillis(config.getLong(MongoDbConnectorConfig.MONGODB_POLL_INTERVAL_MS));
}
/**
* Obtain a client that will repeatedly try to obtain a client to the primary node of the replica set, waiting (and using
* this context's back-off strategy) if required until the primary becomes available.
*
* @param replicaSet the replica set information; may not be null
* @param filters the filter configuration
* @param errorHandler the function to be called whenever the primary is unable to
* {@link MongoPrimary#execute(String, Consumer) execute} an operation to completion; may be null
* @return the client, or {@code null} if no primary could be found for the replica set
*/
public ConnectionContext.MongoPrimary primaryFor(ReplicaSet replicaSet, Filters filters, BiConsumer<String, Throwable> errorHandler) {
return new ConnectionContext.MongoPrimary(this, replicaSet, filters, errorHandler);
}
/**
* Obtain a client that will repeatedly try to obtain a client to a node of preferred type of the replica set, waiting (and using
* this context's back-off strategy) if required until the node becomes available.
*
* @param replicaSet the replica set information; may not be null
* @param filters the filter configuration
* @param errorHandler the function to be called whenever the node is unable to
* {@link MongoPreferredNode#execute(String, Consumer) execute} an operation to completion; may be null
* @return the client, or {@code null} if no primary could be found for the replica set
*/
public ConnectionContext.MongoPreferredNode preferredFor(ReplicaSet replicaSet, ReadPreference preference, Filters filters,
BiConsumer<String, Throwable> errorHandler) {
return new ConnectionContext.MongoPreferredNode(this, replicaSet, preference, filters, errorHandler);
}
/**
* Obtain a client that will repeatedly try to obtain a client to a node of preferred type of the replica set, waiting (and using
* this context's back-off strategy) if required until the node becomes available.
*
* @param replicaSet the replica set information; may not be null
* @return the client, or {@code null} if no node of preferred type could be found for the replica set
*/
protected Supplier<MongoClient> preferredClientFor(ReplicaSet replicaSet, ReadPreference preference) {
return preferredClientFor(replicaSet, preference, (attempts, remaining, error) -> {
if (error == null) {
logger().info("Unable to connect to {} node of '{}' after attempt #{} ({} remaining)", preference.getName(),
replicaSet, attempts, remaining);
}
else {
logger().error("Error while attempting to connect to {} node of '{}' after attempt #{} ({} remaining): {}", preference.getName(),
replicaSet, attempts, remaining, error.getMessage(), error);
}
});
}
/**
* Obtain a client that will repeated try to obtain a client to the primary node of the replica set, waiting (and using
* this context's back-off strategy) if required until the primary becomes available.
*
* @param replicaSet the replica set information; may not be null
* @param handler the function that will be called when the primary could not be obtained; may not be null
* @return the client, or {@code null} if no primary could be found for the replica set
*/
protected Supplier<MongoClient> preferredClientFor(ReplicaSet replicaSet, ReadPreference preference, PreferredConnectFailed handler) {
Supplier<MongoClient> factory = () -> clientForPreferred(replicaSet, preference);
int maxAttempts = config.getInteger(MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS);
return () -> {
int attempts = 0;
MongoClient client = null;
while (client == null) {
++attempts;
try {
// Try to get the primary
client = factory.get();
if (client != null) {
break;
}
}
catch (Throwable t) {
handler.failed(attempts, maxAttempts - attempts, t);
}
if (attempts >= maxAttempts) {
throw new ConnectException("Unable to connect to " + preference.getName() + " node of '" + replicaSet +
"' after " + attempts + " failed attempts");
}
handler.failed(attempts, maxAttempts - attempts, null);
backoffStrategy.sleepWhen(true);
}
return client;
};
}
@FunctionalInterface
public interface PreferredConnectFailed {
void failed(int attemptNumber, int attemptsRemaining, Throwable error);
}
/**
* A supplier of a client that connects only to the primary of a replica set. Operations on the primary will continue
*/
public static class MongoPrimary extends MongoPreferredNode {
protected MongoPrimary(ConnectionContext context, ReplicaSet replicaSet, Filters filters, BiConsumer<String, Throwable> errorHandler) {
super(context, replicaSet, ReadPreference.primary(), filters, errorHandler);
}
}
/**
* A supplier of a client that connects only to node of preferred type from a replica set. Operations on this node will continue
*/
public static class MongoPreferredNode {
private final ReplicaSet replicaSet;
private final Supplier<MongoClient> connectionSupplier;
private final Filters filters;
private final BiConsumer<String, Throwable> errorHandler;
private final AtomicBoolean running = new AtomicBoolean(true);
private final ReadPreference preference;
protected MongoPreferredNode(
ConnectionContext context,
ReplicaSet replicaSet,
ReadPreference preference,
Filters filters,
BiConsumer<String, Throwable> errorHandler) {
this.replicaSet = replicaSet;
this.preference = preference;
this.connectionSupplier = context.preferredClientFor(replicaSet, preference);
this.filters = filters;
this.errorHandler = errorHandler;
}
/**
* Get the replica set.
*
* @return the replica set; never null
*/
public ReplicaSet replicaSet() {
return replicaSet;
}
/**
* Get read preference of
*
* @return the read preference
*/
public ReadPreference getPreference() {
return preference;
}
/**
* Get the address of the node with preferred type, if there is one.
*
* @return the address of the replica set's preferred node, or {@code null} if there is currently no node of preferred type
*/
public ServerAddress address() {
return execute("get replica set " + preference.getName(), client -> {
return MongoUtil.getPreferredAddress(client, preference);
});
}
/**
* Execute the supplied operation using the preferred node, blocking until it is available. Whenever the operation stops
* (e.g., if the node is no longer of preferred type), then restart the operation using a current node of that type.
*
* @param desc the description of the operation, for logging purposes
* @param operation the operation to be performed on a node of preferred type.
*/
public void execute(String desc, Consumer<MongoClient> operation) {
final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM);
while (true) {
MongoClient client = connectionSupplier.get();
try {
operation.accept(client);
return;
}
catch (Throwable t) {
errorHandler.accept(desc, t);
if (!isRunning()) {
throw new ConnectException("Operation failed and MongoDB " + preference.getName() + " termination requested", t);
}
try {
errorMetronome.pause();
}
catch (InterruptedException e) {
// Interruption is not propagated
}
}
}
}
/**
* Execute the supplied operation using the preferred node, blocking until it is available. Whenever the operation stops
* (e.g., if the node is no longer of preferred type), then restart the operation using a current node of that type.
*
* @param desc the description of the operation, for logging purposes
* @param operation the operation to be performed on a node of preferred type
* @return return value of the executed operation
*/
public <T> T execute(String desc, Function<MongoClient, T> operation) {
final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM);
while (true) {
MongoClient client = connectionSupplier.get();
try {
return operation.apply(client);
}
catch (Throwable t) {
errorHandler.accept(desc, t);
if (!isRunning()) {
throw new ConnectException("Operation failed and MongoDB " + preference.getName() + " termination requested", t);
}
try {
errorMetronome.pause();
}
catch (InterruptedException e) {
// Interruption is not propagated
}
}
}
}
/**
* Execute the supplied operation using the preferred node, blocking until it is available. Whenever the operation stops
* (e.g., if the node is no longer of preferred type), then restart the operation using a current node of that type.
*
* @param desc the description of the operation, for logging purposes
* @param operation the operation to be performed on a node of preferred type.
* @throws InterruptedException if the operation was interrupted
*/
public void executeBlocking(String desc, BlockingConsumer<MongoClient> operation) throws InterruptedException {
final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM);
while (true) {
MongoClient client = connectionSupplier.get();
try {
operation.accept(client);
return;
}
catch (InterruptedException e) {
throw e;
}
catch (Throwable t) {
errorHandler.accept(desc, t);
if (!isRunning()) {
throw new ConnectException("Operation failed and MongoDB " + preference.getName() + " termination requested", t);
}
errorMetronome.pause();
}
}
}
/**
* Use a node of preferred type to get the names of all the databases in the replica set, applying the current database
* filter configuration. This method will block until a node of preferred type can be obtained to get the names of all
* databases in the replica set.
*
* @return the database names; never null but possibly empty
*/
public Set<String> databaseNames() {
return execute("get database names", client -> {
Set<String> databaseNames = new HashSet<>();
MongoUtil.forEachDatabaseName(
client,
dbName -> {
if (filters.databaseFilter().test(dbName)) {
databaseNames.add(dbName);
}
});
return databaseNames;
});
}
/**
* Use a node of preferred type to get the identifiers of all the collections in the replica set, applying the current
* collection filter configuration. This method will block until a primary can be obtained to get the
* identifiers of all collections in the replica set.
*
* @return the collection identifiers; never null
*/
public List<CollectionId> collections() {
String replicaSetName = replicaSet.replicaSetName();
// For each database, get the list of collections ...
return execute("get collections in databases", client -> {
List<CollectionId> collections = new ArrayList<>();
Set<String> databaseNames = databaseNames();
for (String dbName : databaseNames) {
MongoUtil.forEachCollectionNameInDatabase(client, dbName, collectionName -> {
CollectionId collectionId = new CollectionId(replicaSetName, dbName, collectionName);
if (filters.collectionFilter().test(collectionId)) {
collections.add(collectionId);
}
});
}
return collections;
});
}
private boolean isRunning() {
return running.get();
}
/**
* Terminates the execution loop of the current primary
*/
public void stop() {
running.set(false);
}
}
/**
* Obtain a client that talks only to the primary node of the replica set.
*
* @param replicaSet the replica set information; may not be null
* @return the client, or {@code null} if no primary could be found for the replica set
*/
protected MongoClient clientForPreferred(ReplicaSet replicaSet, ReadPreference preference) {
MongoClient replicaSetClient = clientFor(replicaSet);
final ClusterDescription clusterDescription = MongoUtil.clusterDescription(replicaSetClient);
if (clusterDescription.getType() == ClusterType.UNKNOWN) {
if (!this.useHostsAsSeeds) {
// No replica set status is available, but it may still be a replica set ...
return replicaSetClient;
}
// This is not a replica set, so there will be no oplog to read ...
throw new ConnectException("The MongoDB server(s) at '" + replicaSet +
"' is not a valid replica set and cannot be used");
}
// It is a replica set ...
ServerAddress preferredAddress = MongoUtil.getPreferredAddress(replicaSetClient, preference);
if (preferredAddress != null) {
return pool.clientFor(preferredAddress);
}
return null;
}
}

View File

@ -0,0 +1,91 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.mongodb.ConnectionString;
import io.debezium.DebeziumException;
import io.debezium.util.Strings;
/**
* Host string parsing utilities
*/
public final class ConnectionStrings {
private ConnectionStrings() {
// intentionally private;
}
/**
* Regular expression that extracts the hosts for the replica sets. The raw expression is
* {@code (([^/]+)\/))?(.+)}.
*/
private static final Pattern HOST_PATTERN = Pattern.compile("(([^/]+)\\/)?(.+)");
public static Optional<String> parseFromHosts(String hosts) {
return matcher(hosts).map(m -> connectionString(m.group(2), m.group(3)));
}
public static String buildFromHosts(String hosts) {
return parseFromHosts(hosts).orElseThrow(() -> new DebeziumException("Unable to build connection string"));
}
public static String appendParameter(String connectionString, String name, String value) {
var param = name + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8);
if (connectionString.endsWith("?")) {
return connectionString + param;
}
if (connectionString.endsWith("/")) {
return connectionString + "?" + param;
}
var pos = connectionString.lastIndexOf("?");
if (pos == -1) {
return connectionString + "/?" + param;
}
return connectionString + "&" + param;
}
public static String mask(String connectionString) {
var cs = new ConnectionString(connectionString);
var credentials = cs.getCredential();
return credentials == null ? connectionString
: Strings.mask(
connectionString,
credentials.getUserName(),
credentials.getSource(),
credentials.getPassword() != null ? String.valueOf(credentials.getPassword()) : null);
}
private static String connectionString(String rsName, String host) {
if (rsName == null) {
return String.format("mongodb://%s/", host);
}
else {
return String.format("mongodb://%s/?replicaSet=%s", host, rsName);
}
}
private static Optional<Matcher> matcher(String hosts) {
if (hosts == null || hosts.isBlank()) {
return Optional.empty();
}
Matcher matcher = HOST_PATTERN.matcher(hosts);
if (!matcher.matches()) {
return Optional.empty();
}
return Optional.of(matcher);
}
}

View File

@ -5,18 +5,15 @@
*/
package io.debezium.connector.mongodb;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.function.Consumer;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoClient;
import com.mongodb.connection.ClusterConnectionMode;
import io.debezium.annotation.ThreadSafe;
@ -76,136 +73,44 @@ public MongoClients build() {
}
}
protected static Supplier<MongoClientSettings.Builder> createSettingsSupplier(MongoClientSettings settings) {
return () -> MongoClientSettings.builder(settings);
}
private final Map<MongoClientSettings, MongoClient> connections = new ConcurrentHashMap<>();
private final Map<ServerAddress, MongoClient> directConnections = new ConcurrentHashMap<>();
private final Map<List<ServerAddress>, MongoClient> connections = new ConcurrentHashMap<>();
private final Map<ConnectionString, MongoClient> stringConnections = new ConcurrentHashMap<>();
private final Supplier<MongoClientSettings.Builder> settingsSupplier;
private final MongoClientSettings defaultSettings;
private MongoClients(MongoClientSettings.Builder settings) {
this.settingsSupplier = createSettingsSupplier(settings.build());
this.defaultSettings = settings.build();
}
/**
* Creates fresh {@link MongoClientSettings.Builder} from {@link #settingsSupplier}
* Creates fresh {@link MongoClientSettings.Builder} from {@link #defaultSettings}
* @return connection settings builder
*/
protected MongoClientSettings.Builder settings() {
return settingsSupplier.get();
return MongoClientSettings.builder(defaultSettings);
}
/**
* Clear out and close any open connections.
*/
public void clear() {
directConnections.values().forEach(MongoClient::close);
connections.values().forEach(MongoClient::close);
stringConnections.values().forEach(MongoClient::close);
directConnections.clear();
connections.clear();
stringConnections.clear();
}
/**
* Obtain a direct client connection to the specified server. This is typically used to connect to a standalone server,
* but it also can be used to obtain a client that will only use this server, even if the server is a member of a replica
* set or sharded cluster.
* <p>
* The format of the supplied string is one of the following:
*
* <pre>
* host:port
* host
* </pre>
*
* where {@code host} contains the resolvable hostname or IP address of the server, and {@code port} is the integral port
* number. If the port is not provided, the {@link ServerAddress#defaultPort() default port} is used. If neither the host
* or port are provided (or {@code addressString} is {@code null}), then an address will use the
* {@link ServerAddress#defaultHost() default host} and {@link ServerAddress#defaultPort() default port}.
*
* @param addressString the string that contains the host and port of the server
* @return the MongoClient instance; never null
*/
public MongoClient clientFor(String addressString) {
return clientFor(MongoUtil.parseAddress(addressString));
public MongoClient client(ConnectionString connectionString) {
return client(settings -> settings.applyConnectionString(connectionString));
}
/**
* Obtain a direct client connection to the specified server. This is typically used to connect to a standalone server,
* but it also can be used to obtain a client that will only use this server, even if the server is a member of a replica
* set or sharded cluster.
*
* @param address the address of the server to use
* @return the MongoClient instance; never null
*/
public MongoClient clientFor(ServerAddress address) {
return directConnections.computeIfAbsent(address, this::directConnection);
public MongoClient client(ReplicaSet replicaSet, ReadPreference preference) {
return client(settings -> settings
.applyConnectionString(replicaSet.connectionString())
.readPreference(preference));
}
/**
* Obtain a client connection to the replica set or cluster. The supplied addresses are used as seeds, and once a connection
* is established it will discover all of the members.
* <p>
* The format of the supplied string is one of the following:
*
* <pre>
* replicaSetName/host:port
* replicaSetName/host:port,host2:port2
* replicaSetName/host:port,host2:port2,host3:port3
* host:port
* host:port,host2:port2
* host:port,host2:port2,host3:port3
* </pre>
*
* where {@code replicaSetName} is the name of the replica set, {@code host} contains the resolvable hostname or IP address of
* the server, and {@code port} is the integral port number. If the port is not provided, the
* {@link ServerAddress#defaultPort() default port} is used. If neither the host or port are provided (or
* {@code addressString} is {@code null}), then an address will use the {@link ServerAddress#defaultHost() default host} and
* {@link ServerAddress#defaultPort() default port}.
* <p>
* This method does not use the replica set name.
*
* @param addressList the string containing a comma-separated list of host and port pairs, optionally preceded by a
* replica set name
* @return the MongoClient instance; never null
*/
public MongoClient clientForMembers(String addressList) {
return clientForMembers(MongoUtil.parseAddresses(addressList));
}
protected MongoClient client(Consumer<MongoClientSettings.Builder> configurator) {
MongoClientSettings.Builder settings = settings();
configurator.accept(settings);
public MongoClient clientForMembers(ConnectionString connectionString) {
return stringConnections.computeIfAbsent(connectionString, this::connection);
}
/**
* Obtain a client connection to the replica set or cluster. The supplied addresses are used as seeds, and once a connection
* is established it will discover all of the members.
*
* @param seedAddresses the seed addresses
* @return the MongoClient instance; never null
*/
public MongoClient clientForMembers(List<ServerAddress> seedAddresses) {
return connections.computeIfAbsent(seedAddresses, this::connection);
}
protected MongoClient directConnection(ServerAddress address) {
MongoClientSettings.Builder settings = settings().applyToClusterSettings(builder -> builder.hosts(Collections.singletonList(address)));
return com.mongodb.client.MongoClients.create(settings.build());
}
protected MongoClient connection(List<ServerAddress> addresses) {
MongoClientSettings.Builder settings = settings().applyToClusterSettings(builder -> builder.hosts(addresses));
if (addresses.size() > 1) {
settings.applyToClusterSettings(builder -> builder.mode(ClusterConnectionMode.MULTIPLE));
}
return com.mongodb.client.MongoClients.create(settings.build());
}
protected MongoClient connection(ConnectionString connectionString) {
MongoClientSettings.Builder settings = settings().applyConnectionString(connectionString);
return com.mongodb.client.MongoClients.create(settings.build());
return connections.computeIfAbsent(settings.build(), com.mongodb.client.MongoClients::create);
}
}

View File

@ -9,22 +9,23 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.ConnectionString;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext.PreviousContext;
@ -45,10 +46,6 @@
* with the host addresses of the replica set. When the connector starts, it will discover the primary node and use it to
* replicate the contents of the replica set.
* <p>
* If necessary, a {@link MongoDbConnectorConfig#AUTO_DISCOVER_MEMBERS configuration property} can be used to disable the
* logic used to discover the primary node, an in this case the connector will use the first host address specified in the
* configuration as the primary node. Obviously this may cause problems when the replica set elects a different node as the
* primary, since the connector will continue to read the oplog using the same node that may no longer be the primary.
*
* <h2>Parallel Replication</h2>
* The connector will concurrently and independently replicate each of the replica sets. When the connector is asked to
@ -102,10 +99,10 @@ public Class<? extends Task> taskClass() {
public void start(Map<String, String> props) {
// Validate the configuration ...
final Configuration config = Configuration.from(props);
if (!config.validateAndRecord(MongoDbConnectorConfig.ALL_FIELDS, logger::error)) {
throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
}
if (!config.validateAndRecord(MongoDbConnectorConfig.ALL_FIELDS, logger::error)) {
throw new DebeziumException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
}
this.config = config;
// Set up the replication context ...
@ -152,15 +149,17 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
ReplicaSets replicaSets = monitorThread.getReplicaSets(10, TimeUnit.SECONDS);
if (replicaSets != null) {
logger.info("Subdividing {} MongoDB replica set(s) into at most {} task(s)",
replicaSets.replicaSetCount(), maxTasks);
replicaSets.size(), maxTasks);
replicaSets.subdivide(maxTasks, replicaSetsForTask -> {
// Create the configuration for each task ...
int taskId = taskConfigs.size();
logger.info("Configuring MongoDB connector task {} to capture events for replica set(s) at {}", taskId, replicaSetsForTask.hosts());
Properties configProps = config.asProperties();
configProps.remove(MongoDbConnectorConfig.CONNECTION_STRING.name());
String rsConnectionStrings = replicaSetsForTask.all().stream()
.map(ReplicaSet::connectionString)
.map(ConnectionString::toString)
.collect(Collectors.joining(";"));
logger.info("Configuring MongoDB connector task {} to capture events for replica set(s): {}", taskId, rsConnectionStrings);
taskConfigs.add(config.edit()
.with(MongoDbConnectorConfig.HOSTS, replicaSetsForTask.hosts())
.with(MongoDbConnectorConfig.REPLICA_SETS, rsConnectionStrings)
.with(MongoDbConnectorConfig.TASK_ID, taskId)
.build()
.asMap());
@ -187,7 +186,7 @@ public void stop() {
}
try {
if (this.connectionContext != null) {
this.connectionContext.shutdown();
this.connectionContext.close();
}
}
finally {
@ -227,7 +226,7 @@ public Config validate(Map<String, String> connectorConfigs) {
// Try to connect to the database ...
try (ConnectionContext connContext = new ConnectionContext(config)) {
try (MongoClient client = connContext.clientForSeedConnection()) {
try (MongoClient client = connContext.connect()) {
client.listDatabaseNames();
}
}

View File

@ -226,36 +226,16 @@ public boolean isIncludePreImage() {
.withDescription("Database connection string.");
/**
* The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the
* replica set.
* The comma-separated list of replica set names
*/
public static final Field HOSTS = Field.create("mongodb.hosts")
.withDisplayName("Hosts")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 2))
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withValidation(MongoDbConnectorConfig::validateHosts)
.withDescription("The hostname and port pairs (in the form 'host' or 'host:port') "
+ "of the MongoDB server(s) in the replica set.");
public static final Field AUTO_DISCOVER_MEMBERS = Field.create("mongodb.members.auto.discover")
.withDisplayName("Auto-discovery")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 3))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDefault(true)
.withValidation(Field::isBoolean, MongoDbConnectorConfig::validateAutodiscovery)
.withDescription("Specifies whether the addresses in 'hosts' are seeds that should be "
+ "used to discover all members of the cluster or replica set ('true'), "
+ "or whether the address(es) in 'hosts' should be used as is ('false'). "
+ "The default is 'true'.");
public static final Field REPLICA_SETS = Field.create("mongodb.replica.sets")
.withDescription("Internal use only")
.withType(Type.LIST);
public static final Field USER = Field.create("mongodb.user")
.withDisplayName("User")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 4))
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 2))
.withWidth(Width.SHORT)
.withImportance(Importance.HIGH)
.withDescription("Database user for connecting to MongoDB, if necessary.");
@ -263,7 +243,7 @@ public boolean isIncludePreImage() {
public static final Field PASSWORD = Field.create("mongodb.password")
.withDisplayName("Password")
.withType(Type.PASSWORD)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 5))
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 3))
.withWidth(Width.SHORT)
.withImportance(Importance.HIGH)
.withDescription("Password to be used when connecting to MongoDB, if necessary.");
@ -271,7 +251,7 @@ public boolean isIncludePreImage() {
public static final Field MONGODB_POLL_INTERVAL_MS = Field.create("mongodb.poll.interval.ms")
.withDisplayName("Replica membership poll interval (ms)")
.withType(Type.LONG)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 6))
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 4))
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDefault(30_000L)
@ -529,6 +509,35 @@ public boolean isIncludePreImage() {
"for data change, schema change, transaction, heartbeat event etc.")
.withDefault(DefaultTopicNamingStrategy.class.getName());
/**
* The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the
* replica set.
*/
@Deprecated
public static final Field HOSTS = Field.create("mongodb.hosts")
.withDisplayName("Hosts")
.withType(Type.LIST)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 5))
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
.withValidation(MongoDbConnectorConfig::validateHosts)
.withDescription("The hostname and port pairs (in the form 'host' or 'host:port') "
+ "of the MongoDB server(s) in the replica set.");
@Deprecated
public static final Field AUTO_DISCOVER_MEMBERS = Field.create("mongodb.members.auto.discover")
.withDisplayName("Auto-discovery")
.withType(Type.BOOLEAN)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 6))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDefault(true)
.withValidation(Field::isBoolean)
.withDescription("Specifies whether the addresses in 'hosts' are seeds that should be "
+ "used to discover all members of the cluster or replica set ('true'), "
+ "or whether the address(es) in 'hosts' should be used as is ('false'). "
+ "The default is 'true'.");
private static final ConfigDefinition CONFIG_DEFINITION = CommonConnectorConfig.CONFIG_DEFINITION.edit()
.name("MongoDB")
.type(
@ -597,31 +606,22 @@ private static int validateHosts(Configuration config, Field field, ValidationOu
String hosts = config.getString(field);
String connectionString = config.getString(CONNECTION_STRING);
if (hosts == null && connectionString == null) {
problems.accept(field, hosts, "Host specification or connection string is required");
if (hosts == null) {
return 0;
}
LOGGER.warn("Config property '{}' will be removed in the future, use '{}' instead", field.name(), CONNECTION_STRING.name());
if (connectionString != null) {
LOGGER.warn("Config property '{}' is ignored, property '{}' takes precedence", field.name(), CONNECTION_STRING.name());
return 0;
}
if (ConnectionStrings.parseFromHosts(hosts).isEmpty()) {
problems.accept(field, null, "Invalid host specification");
return 1;
}
int count = 0;
if (hosts != null && ReplicaSets.parse(hosts).all().isEmpty()) {
problems.accept(field, hosts, "Invalid host specification");
++count;
}
return count;
}
private static int validateConnectionString(Configuration config, Field field, ValidationOutput problems) {
String value = config.getString(field);
try {
if (value != null) {
ConnectionString cs = new ConnectionString(value);
}
}
catch (Exception e) {
problems.accept(field, value, "Connection string is invalid");
return 1;
}
return 0;
}
@ -638,10 +638,23 @@ private static int validateChangeStreamPipeline(Configuration config, Field fiel
return 0;
}
private static int validateAutodiscovery(Configuration config, Field field, ValidationOutput problems) {
boolean value = config.getBoolean(field);
if (!value && config.hasKey(CONNECTION_STRING)) {
problems.accept(field, value, "Connection string requires autodiscovery");
private static int validateConnectionString(Configuration config, Field field, ValidationOutput problems) {
String connectionStringValue = config.getString(field);
String hostValue = config.getString(HOSTS);
if (connectionStringValue == null) {
if (hostValue == null) {
problems.accept(field, null, "Missing connection string");
return 1;
}
return 0;
}
try {
ConnectionString cs = new ConnectionString(connectionStringValue);
}
catch (Exception e) {
problems.accept(field, connectionStringValue, "Invalid connection string");
return 1;
}
return 0;

View File

@ -13,11 +13,11 @@
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
@ -174,10 +174,11 @@ private MongoDbOffsetContext getPreviousOffset(MongoDbConnectorConfig connectorC
}
private ReplicaSets getReplicaSets(Configuration config) {
final String hosts = config.getString(MongoDbConnectorConfig.HOSTS);
final ReplicaSets replicaSets = ReplicaSets.parse(hosts);
if (replicaSets.validReplicaSetCount() == 0) {
throw new ConnectException("Unable to start MongoDB connector task since no replica sets were found at " + hosts);
var replicas = config.getList(MongoDbConnectorConfig.REPLICA_SETS, ";", ReplicaSet::new);
final ReplicaSets replicaSets = new ReplicaSets(replicas);
if (replicaSets.size() == 0) {
throw new DebeziumException("Unable to start MongoDB connector task since no replica sets were found");
}
return replicaSets;
}

View File

@ -14,7 +14,6 @@
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.bson.BsonDocument;
import org.bson.Document;
import org.slf4j.Logger;
@ -26,7 +25,6 @@
import io.debezium.DebeziumException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.mongodb.ConnectionContext.MongoPreferredNode;
import io.debezium.connector.mongodb.recordemitter.MongoDbSnapshotRecordEmitter;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
@ -71,8 +69,9 @@ public class MongoDbIncrementalSnapshotChangeEventSource
protected EventDispatcher<MongoDbPartition, CollectionId> dispatcher;
protected IncrementalSnapshotContext<CollectionId> context = null;
protected final Map<Struct, Object[]> window = new LinkedHashMap<>();
private MongoPreferredNode primary;
private MongoPreferredNode secondary;
private RetryingMongoClient primary;
private RetryingMongoClient secondary;
private CollectionId signallingCollectionId;
public MongoDbIncrementalSnapshotChangeEventSource(MongoDbConnectorConfig config,
@ -260,7 +259,7 @@ protected void readChunk(MongoDbPartition partition) throws InterruptedException
final CollectionId currentDataCollectionId = context.currentDataCollectionId().getId();
currentCollection = (MongoDbCollectionSchema) collectionSchema.schemaFor(currentDataCollectionId);
if (replicaSets.all().size() > 1) {
LOGGER.warn("Incremental snapshotting supported only for single result set topology, skipping collection '{}', known collections {}",
LOGGER.warn("Incremental snapshotting supported only for single replica set topology, skipping collection '{}', known collections {}",
currentDataCollectionId);
nextDataCollection(partition);
continue;
@ -562,16 +561,16 @@ public void processMessage(MongoDbPartition partition, DataCollectionId dataColl
}
}
private MongoPreferredNode establishConnection(MongoDbPartition partition, ReadPreference preference, ReplicaSet replicaSet) {
return connectionContext.preferredFor(replicaSet, preference, taskContext.filters(), (desc, error) -> {
private RetryingMongoClient establishConnection(MongoDbPartition partition, ReadPreference preference, ReplicaSet replicaSet) {
return connectionContext.connect(replicaSet, preference, taskContext.filters(), (desc, error) -> {
// propagate authorization failures
if (error.getMessage() != null && error.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) {
throw new ConnectException("Error while attempting to " + desc, error);
throw new DebeziumException("Error while attempting to " + desc, error);
}
else {
dispatcher.dispatchConnectorEvent(partition, new DisconnectEvent());
LOGGER.error("Error while attempting to {}: {}", desc, error.getMessage(), error);
throw new ConnectException("Error while attempting to " + desc, error);
throw new DebeziumException("Error while attempting to " + desc, error);
}
});
}

View File

@ -19,7 +19,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.Document;
@ -32,8 +31,8 @@
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import io.debezium.DebeziumException;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.mongodb.ConnectionContext.MongoPreferredNode;
import io.debezium.connector.mongodb.recordemitter.MongoDbSnapshotRecordEmitter;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
@ -108,7 +107,6 @@ protected SnapshotResult<MongoDbOffsetContext> doExecute(ChangeEventSourceContex
final ExecutorService executor = Threads.newFixedThreadPool(MongoDbConnector.class, taskContext.serverName(), "replicator-snapshot", threads);
final CountDownLatch latch = new CountDownLatch(threads);
LOGGER.info("Ignoring unnamed replica sets: {}", replicaSets.unnamedReplicaSets());
LOGGER.info("Starting {} thread(s) to snapshot replica sets: {}", threads, replicaSetsToSnapshot);
LOGGER.info("Snapshot step 3 - Snapshotting data");
@ -151,7 +149,7 @@ protected SnapshotResult<MongoDbOffsetContext> doExecute(ChangeEventSourceContex
}
finally {
LOGGER.info("Stopping mongodb connections");
taskContext.getConnectionContext().shutdown();
taskContext.getConnectionContext().close();
}
if (aborted.get()) {
@ -183,7 +181,7 @@ protected SnapshottingTask getSnapshottingTask(MongoDbPartition partition, Mongo
final MongoDbOffsetContext offsetContext = (MongoDbOffsetContext) previousOffset;
try {
replicaSets.onEachReplicaSet(replicaSet -> {
MongoPreferredNode mongo = null;
RetryingMongoClient mongo = null;
try {
mongo = establishConnection(partition, replicaSet, ReadPreference.primaryPreferred());
final ReplicaSetOffsetContext rsOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet);
@ -199,7 +197,7 @@ protected SnapshottingTask getSnapshottingTask(MongoDbPartition partition, Mongo
});
}
finally {
taskContext.getConnectionContext().shutdown();
taskContext.getConnectionContext().close();
}
return new MongoDbSnapshottingTask(replicaSetSnapshots);
@ -212,7 +210,7 @@ protected SnapshotContext<MongoDbPartition, MongoDbOffsetContext> prepare(MongoD
}
private void snapshotReplicaSet(ChangeEventSourceContext sourceContext, MongoDbSnapshotContext ctx, ReplicaSet replicaSet) throws InterruptedException {
MongoPreferredNode mongo = null;
RetryingMongoClient mongo = null;
try {
mongo = establishConnection(ctx.partition, replicaSet, ReadPreference.secondaryPreferred());
if (mongo != null) {
@ -226,21 +224,21 @@ private void snapshotReplicaSet(ChangeEventSourceContext sourceContext, MongoDbS
}
}
private MongoPreferredNode establishConnection(MongoDbPartition partition, ReplicaSet replicaSet, ReadPreference preference) {
return connectionContext.preferredFor(replicaSet, preference, taskContext.filters(), (desc, error) -> {
private RetryingMongoClient establishConnection(MongoDbPartition partition, ReplicaSet replicaSet, ReadPreference preference) {
return connectionContext.connect(replicaSet, preference, taskContext.filters(), (desc, error) -> {
// propagate authorization failures
if (error.getMessage() != null && error.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) {
throw new ConnectException("Error while attempting to " + desc, error);
throw new DebeziumException("Error while attempting to " + desc, error);
}
else {
dispatcher.dispatchConnectorEvent(partition, new DisconnectEvent());
LOGGER.error("Error while attempting to {}: {}", desc, error.getMessage(), error);
throw new ConnectException("Error while attempting to " + desc, error);
throw new DebeziumException("Error while attempting to " + desc, error);
}
});
}
private boolean isSnapshotExpected(MongoPreferredNode mongo, ReplicaSetOffsetContext offsetContext) {
private boolean isSnapshotExpected(RetryingMongoClient mongo, ReplicaSetOffsetContext offsetContext) {
boolean performSnapshot = true;
if (offsetContext.hasOffset()) {
if (LOGGER.isInfoEnabled()) {
@ -291,7 +289,7 @@ protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets
final Map<ReplicaSet, BsonDocument> positions = new LinkedHashMap<>();
replicaSets.onEachReplicaSet(replicaSet -> {
LOGGER.info("Determine Snapshot Offset for replica-set {}", replicaSet.replicaSetName());
MongoPreferredNode mongo = establishConnection(ctx.partition, replicaSet, ReadPreference.primaryPreferred());
RetryingMongoClient mongo = establishConnection(ctx.partition, replicaSet, ReadPreference.primaryPreferred());
if (mongo != null) {
try {
mongo.execute("get oplog position", client -> {
@ -310,7 +308,7 @@ protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets
}
private void createDataEvents(ChangeEventSourceContext sourceContext, MongoDbSnapshotContext snapshotContext, ReplicaSet replicaSet,
MongoPreferredNode mongo)
RetryingMongoClient mongo)
throws InterruptedException {
SnapshotReceiver<MongoDbPartition> snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver();
snapshotContext.offset.preSnapshotStart();
@ -328,7 +326,7 @@ private void createDataEvents(ChangeEventSourceContext sourceContext, MongoDbSna
private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContext,
MongoDbSnapshotContext snapshotContext,
SnapshotReceiver<MongoDbPartition> snapshotReceiver,
ReplicaSet replicaSet, MongoPreferredNode mongo)
ReplicaSet replicaSet, RetryingMongoClient mongo)
throws InterruptedException {
final String rsName = replicaSet.replicaSetName();
@ -434,7 +432,7 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex
private void createDataEventsForCollection(ChangeEventSourceContext sourceContext,
MongoDbSnapshotContext snapshotContext,
SnapshotReceiver<MongoDbPartition> snapshotReceiver,
ReplicaSet replicaSet, CollectionId collectionId, MongoPreferredNode mongo)
ReplicaSet replicaSet, CollectionId collectionId, RetryingMongoClient mongo)
throws InterruptedException {
long exportStart = clock.currentTimeInMillis();

View File

@ -14,7 +14,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.errors.ConnectException;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.BsonTimestamp;
@ -22,7 +21,6 @@
import org.slf4j.LoggerFactory;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
@ -30,7 +28,7 @@
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import io.debezium.connector.mongodb.ConnectionContext.MongoPreferredNode;
import io.debezium.DebeziumException;
import io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
@ -78,7 +76,7 @@ public MongoDbStreamingChangeEventSource(MongoDbConnectorConfig connectorConfig,
@Override
public void execute(ChangeEventSourceContext context, MongoDbPartition partition, MongoDbOffsetContext offsetContext)
throws InterruptedException {
final List<ReplicaSet> validReplicaSets = replicaSets.validReplicaSets();
final List<ReplicaSet> validReplicaSets = replicaSets.all();
if (offsetContext == null) {
offsetContext = initializeOffsets(connectorConfig, partition, replicaSets);
@ -95,17 +93,17 @@ else if (validReplicaSets.size() > 1) {
}
}
finally {
taskContext.getConnectionContext().shutdown();
taskContext.getConnectionContext().close();
}
}
private void streamChangesForReplicaSet(ChangeEventSourceContext context, MongoDbPartition partition,
ReplicaSet replicaSet, MongoDbOffsetContext offsetContext) {
MongoPreferredNode mongo = null;
RetryingMongoClient mongo = null;
try {
mongo = establishConnection(partition, replicaSet, ReadPreference.secondaryPreferred());
if (mongo != null) {
final AtomicReference<MongoPreferredNode> mongoReference = new AtomicReference<>(mongo);
final AtomicReference<RetryingMongoClient> mongoReference = new AtomicReference<>(mongo);
mongo.execute("read from change stream on '" + replicaSet + "'", client -> {
readChangeStream(client, mongoReference.get(), replicaSet, context, offsetContext);
});
@ -152,21 +150,21 @@ private void streamChangesForReplicaSets(ChangeEventSourceContext context, Mongo
executor.shutdown();
}
private MongoPreferredNode establishConnection(MongoDbPartition partition, ReplicaSet replicaSet, ReadPreference preference) {
return connectionContext.preferredFor(replicaSet, preference, taskContext.filters(), (desc, error) -> {
private RetryingMongoClient establishConnection(MongoDbPartition partition, ReplicaSet replicaSet, ReadPreference preference) {
return connectionContext.connect(replicaSet, preference, taskContext.filters(), (desc, error) -> {
// propagate authorization failures
if (error.getMessage() != null && error.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) {
throw new ConnectException("Error while attempting to " + desc, error);
throw new DebeziumException("Error while attempting to " + desc, error);
}
else {
dispatcher.dispatchConnectorEvent(partition, new DisconnectEvent());
LOGGER.error("Error while attempting to {}: {}", desc, error.getMessage(), error);
throw new ConnectException("Error while attempting to " + desc, error);
throw new DebeziumException("Error while attempting to " + desc, error);
}
});
}
private void readChangeStream(MongoClient client, MongoPreferredNode mongo, ReplicaSet replicaSet, ChangeEventSourceContext context,
private void readChangeStream(MongoClient client, RetryingMongoClient mongo, ReplicaSet replicaSet, ChangeEventSourceContext context,
MongoDbOffsetContext offsetContext) {
final ReplicaSetPartition rsPartition = offsetContext.getReplicaSetPartition(replicaSet);
final ReplicaSetOffsetContext rsOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet);
@ -175,8 +173,7 @@ private void readChangeStream(MongoClient client, MongoPreferredNode mongo, Repl
ReplicaSetChangeStreamsContext oplogContext = new ReplicaSetChangeStreamsContext(rsPartition, rsOffsetContext, mongo, replicaSet);
final ServerAddress nodeAddress = MongoUtil.getPreferredAddress(client, mongo.getPreference());
LOGGER.info("Reading change stream for '{}'/{} from {} starting at {}", replicaSet, mongo.getPreference().getName(), nodeAddress, oplogStart);
LOGGER.info("Reading change stream for '{}' starting at {}", replicaSet, oplogStart);
final ChangeStreamPipeline pipeline = new ChangeStreamPipelineFactory(rsOffsetContext, taskContext.getConnectorConfig(), taskContext.filters().getConfig())
.create();
@ -271,7 +268,7 @@ protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connecto
final Map<ReplicaSet, BsonDocument> positions = new LinkedHashMap<>();
replicaSets.onEachReplicaSet(replicaSet -> {
LOGGER.info("Determine Snapshot Offset for replica-set {}", replicaSet.replicaSetName());
MongoPreferredNode mongo = establishConnection(partition, replicaSet, ReadPreference.primaryPreferred());
RetryingMongoClient mongo = establishConnection(partition, replicaSet, ReadPreference.primaryPreferred());
if (mongo != null) {
try {
mongo.execute("get oplog position", client -> {
@ -295,11 +292,11 @@ protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connecto
private static class ReplicaSetChangeStreamsContext {
private final ReplicaSetPartition partition;
private final ReplicaSetOffsetContext offset;
private final MongoPreferredNode mongo;
private final RetryingMongoClient mongo;
private final ReplicaSet replicaSet;
ReplicaSetChangeStreamsContext(ReplicaSetPartition partition, ReplicaSetOffsetContext offsetContext,
MongoPreferredNode mongo, ReplicaSet replicaSet) {
RetryingMongoClient mongo, ReplicaSet replicaSet) {
this.partition = partition;
this.offset = offsetContext;
this.mongo = mongo;
@ -314,7 +311,7 @@ ReplicaSetOffsetContext getOffset() {
return offset;
}
MongoPreferredNode getMongo() {
RetryingMongoClient getMongo() {
return mongo;
}

View File

@ -19,7 +19,6 @@
import org.slf4j.Logger;
import com.mongodb.MongoQueryException;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
@ -31,7 +30,6 @@
import com.mongodb.connection.ClusterType;
import com.mongodb.connection.ServerDescription;
import io.debezium.DebeziumException;
import io.debezium.function.BlockingConsumer;
import io.debezium.util.Strings;
@ -350,27 +348,6 @@ protected static String toString(List<ServerAddress> addresses) {
return Strings.join(ADDRESS_DELIMITER, addresses);
}
protected static ServerAddress getPreferredAddress(MongoClient client, ReadPreference preference) {
ClusterDescription clusterDescription = clusterDescription(client);
if (!clusterDescription.hasReadableServer(preference)) {
throw new DebeziumException("Unable to use cluster description from MongoDB connection: " + clusterDescription);
}
List<ServerDescription> serverDescriptions = preference.choose(clusterDescription);
if (serverDescriptions.size() == 0) {
throw new DebeziumException("Unable to read server descriptions from MongoDB connection (Null or empty list).");
}
Optional<ServerDescription> preferredDescription = serverDescriptions.stream().findFirst();
return preferredDescription
.map(ServerDescription::getAddress)
.map(address -> new ServerAddress(address.getHost(), address.getPort()))
.orElseThrow(() -> new DebeziumException("Unable to find primary from MongoDB connection, got '" + serverDescriptions + "'"));
}
/**
* Retrieves cluster description, forcing a connection if not yet available
*
@ -389,6 +366,15 @@ public static ClusterDescription clusterDescription(MongoClient client) {
return description;
}
public static Optional<String> replicaSetName(ClusterDescription clusterDescription) {
var servers = clusterDescription.getServerDescriptions();
return servers.stream()
.map(ServerDescription::getSetName)
.filter(Objects::nonNull)
.findFirst();
}
private MongoUtil() {
}
}

View File

@ -5,74 +5,31 @@
*/
package io.debezium.connector.mongodb;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.mongodb.ServerAddress;
import com.mongodb.ConnectionString;
import io.debezium.annotation.Immutable;
@Immutable
public final class ReplicaSet implements Comparable<ReplicaSet> {
/**
* Regular expression that extracts the hosts for the replica sets. The raw expression is
* {@code ((([^=]+)[=])?(([^/]+)\/))?(.+)}.
*/
private static final Pattern HOST_PATTERN = Pattern.compile("((([^=]+)[=])?(([^/]+)\\/))?(.+)");
/**
* Parse the supplied string for the information about the hosts for a replica set. The string is a shard host
* specification (e.g., "{@code shard01=replicaSet1/host1:27017,host2:27017}"), replica set hosts (e.g.,
* "{@code replicaSet1/host1:27017,host2:27017}"), or standalone host (e.g., "{@code host1:27017}" or
* "{@code 1.2.3.4:27017}").
*
* @param hosts the hosts string; may be null
* @return the replica set; or {@code null} if the host string could not be parsed
*/
public static ReplicaSet parse(String hosts) {
if (hosts != null) {
Matcher matcher = HOST_PATTERN.matcher(hosts);
if (matcher.matches()) {
String shard = matcher.group(3);
String replicaSetName = matcher.group(5);
String host = matcher.group(6);
if (host != null && host.trim().length() != 0) {
return new ReplicaSet(host, replicaSetName, shard);
}
}
}
return null;
}
private final List<ServerAddress> addresses;
private final String replicaSetName;
private final String shardName;
private final ConnectionString connectionString;
private final int hc;
public ReplicaSet(List<ServerAddress> addresses, String replicaSetName, String shardName) {
this.addresses = new ArrayList<>(addresses);
this.addresses.sort(ReplicaSet::compareServerAddresses);
this.replicaSetName = replicaSetName != null ? replicaSetName.trim() : null;
this.shardName = shardName != null ? shardName.trim() : null;
this.hc = addresses.hashCode();
public ReplicaSet(String connectionString) {
this(new ConnectionString(connectionString));
}
public ReplicaSet(String addresses, String replicaSetName, String shardName) {
this(MongoUtil.parseAddresses(addresses), replicaSetName, shardName);
public ReplicaSet(ConnectionString connectionString) {
this(connectionString.getRequiredReplicaSetName(), connectionString);
}
/**
* Get the immutable list of server addresses.
*
* @return the server addresses; never null
*/
public List<ServerAddress> addresses() {
return addresses;
private ReplicaSet(String replicaSetName, ConnectionString connectionString) {
this.connectionString = Objects.requireNonNull(connectionString, "Connection string cannot be null");
this.replicaSetName = Objects.requireNonNull(replicaSetName, "Replica set name cannot be null");
this.hc = Objects.hash(connectionString);
}
/**
@ -85,33 +42,20 @@ public String replicaSetName() {
}
/**
* Get the shard name for this replica set.
* Get connection string
*
* @return the shard name, or {@code null} if this replica set is not used as a shard
* @return connection string for this replica set
*/
public String shardName() {
return shardName;
}
/**
* Return whether the address(es) represent a standalone server, where the {@link #replicaSetName() replica set name} is
* {@code null}. This method returns the opposite of {@link #hasReplicaSetName()}.
*
* @return {@code true} if this represents the address of a standalone server, or {@code false} if it represents the
* address of a replica set
* @see #hasReplicaSetName()
*/
public boolean isStandaloneServer() {
return replicaSetName == null;
public ConnectionString connectionString() {
return connectionString;
}
/**
* Return whether the address(es) represents a replica set, where the {@link #replicaSetName() replica set name} is
* not {@code null}. This method returns the opposite of {@link #isStandaloneServer()}.
* not {@code null}.
*
* @return {@code true} if this represents the address of a replica set, or {@code false} if it represents the
* address of a standalone server
* @see #isStandaloneServer()
*/
public boolean hasReplicaSetName() {
return replicaSetName != null;
@ -129,8 +73,7 @@ public boolean equals(Object obj) {
}
if (obj instanceof ReplicaSet) {
ReplicaSet that = (ReplicaSet) obj;
return Objects.equals(this.shardName, that.shardName) && Objects.equals(this.replicaSetName, that.replicaSetName) &&
this.addresses.equals(that.addresses);
return Objects.equals(this.replicaSetName, that.replicaSetName);
}
return false;
}
@ -140,82 +83,13 @@ public int compareTo(ReplicaSet that) {
if (that == this) {
return 0;
}
int diff = compareNullable(this.shardName, that.shardName);
if (diff != 0) {
return diff;
}
diff = compareNullable(this.replicaSetName, that.replicaSetName);
if (diff != 0) {
return diff;
}
Iterator<ServerAddress> thisIter = this.addresses.iterator();
Iterator<ServerAddress> thatIter = that.addresses.iterator();
while (thisIter.hasNext() && thatIter.hasNext()) {
diff = compare(thisIter.next(), thatIter.next());
if (diff != 0) {
return diff;
}
}
if (thisIter.hasNext()) {
return 1;
}
if (thatIter.hasNext()) {
return -1;
}
return 0;
return replicaSetName.compareTo(that.replicaSetName);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
if (this.shardName != null && !this.shardName.isEmpty()) {
sb.append(shardName).append('=');
}
if (this.replicaSetName != null && !this.replicaSetName.isEmpty()) {
sb.append(replicaSetName).append('/');
}
Iterator<ServerAddress> iter = addresses.iterator();
if (iter.hasNext()) {
sb.append(MongoUtil.toString(iter.next()));
}
while (iter.hasNext()) {
sb.append(',').append(MongoUtil.toString(iter.next()));
}
return sb.toString();
}
protected static int compareServerAddresses(ServerAddress one, ServerAddress two) {
if (one == two) {
return 0;
}
if (one == null) {
return two == null ? 0 : -1;
}
if (two == null) {
return 1;
}
return compare(one, two);
}
protected static int compareNullable(String str1, String str2) {
if (str1 == str2) {
return 0;
}
if (str1 == null) {
return str2 == null ? 0 : -1;
}
if (str2 == null) {
return 1;
}
return str1.compareTo(str2);
}
protected static int compare(ServerAddress address1, ServerAddress address2) {
int diff = address1.getHost().compareTo(address2.getHost());
if (diff != 0) {
return diff;
}
return address1.getPort() - address2.getPort();
return replicaSetName;
}
}

View File

@ -6,21 +6,16 @@
package io.debezium.connector.mongodb;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.MongoException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterType;
import com.mongodb.connection.ServerConnectionState;
import com.mongodb.connection.ServerDescription;
import io.debezium.annotation.ThreadSafe;
@ -65,21 +60,26 @@ public ReplicaSetDiscovery(MongoDbTaskContext context) {
*/
public ReplicaSets getReplicaSets() {
ConnectionContext connectionContext = context.getConnectionContext();
MongoClient client = connectionContext.clientForSeedConnection();
MongoClient client = connectionContext.connect();
Set<ReplicaSet> replicaSetSpecs = new HashSet<>();
final ClusterDescription clusterDescription = MongoUtil.clusterDescription(client);
if (clusterDescription.getType() == ClusterType.SHARDED) {
// First see if the addresses are for a config server replica set ...
LOGGER.info("Cluster at {} identified as sharded cluster", maskedConnectionSeed);
// Gather connection details to each shard ...
String shardsCollection = "shards";
try {
MongoUtil.onCollectionDocuments(client, CONFIG_DATABASE_NAME, shardsCollection, doc -> {
LOGGER.info("Checking shard details from configuration replica set {}", maskedConnectionSeed);
String shardName = doc.getString("_id");
String hostStr = doc.getString("host");
String replicaSetName = MongoUtil.replicaSetUsedIn(hostStr);
replicaSetSpecs.add(new ReplicaSet(hostStr, replicaSetName, shardName));
LOGGER.info("Reading shard details for {}", shardName);
ConnectionStrings.parseFromHosts(hostStr).ifPresentOrElse(
cs -> replicaSetSpecs.add(new ReplicaSet(cs)),
() -> LOGGER.info("Shard {} is not a valid replica set", shardName));
});
}
catch (MongoInterruptedException e) {
@ -94,17 +94,30 @@ public ReplicaSets getReplicaSets() {
}
if (clusterDescription.getType() == ClusterType.REPLICA_SET) {
LOGGER.info("Checking current members of replica set at {}", maskedConnectionSeed);
final List<ServerDescription> serverDescriptions = clusterDescription.getServerDescriptions().stream()
.filter(x -> x.getState() == ServerConnectionState.CONNECTED).collect(Collectors.toList());
if (serverDescriptions.size() == 0) {
LOGGER.warn("Server descriptions not available, got '{}'", serverDescriptions);
}
else {
List<ServerAddress> addresses = serverDescriptions.stream().map(ServerDescription::getAddress).collect(Collectors.toList());
String replicaSetName = serverDescriptions.get(0).getSetName();
replicaSetSpecs.add(new ReplicaSet(addresses, replicaSetName, null));
LOGGER.info("Cluster at '{}' identified as replicaSet", maskedConnectionSeed);
var connectionString = connectionContext.connectionSeed();
var cs = connectionContext.connectionString();
if (cs.getRequiredReplicaSetName() == null) {
// Java driver is smart enough to connect correctly
// However replicaSet parameter is mandatory, and we need the name for offset storage
LOGGER.warn("Replica set not specified in '{}'", maskedConnectionSeed);
LOGGER.warn("Parameter 'replicaSet' should be added to connection string");
LOGGER.warn("Trying to determine replica set name for '{}'", maskedConnectionSeed);
var rsName = MongoUtil.replicaSetName(clusterDescription);
if (rsName.isPresent()) {
LOGGER.info("Found '{}' replica set for '{}'", rsName.get(), maskedConnectionSeed);
connectionString = ConnectionStrings.appendParameter(connectionString, "replicaSet", rsName.get());
}
else {
LOGGER.warn("Unable to find replica set name for '{}'", maskedConnectionSeed);
}
}
LOGGER.info("Using '{}' as replica set connection string", ConnectionStrings.mask(connectionString));
replicaSetSpecs.add(new ReplicaSet(connectionString));
}
if (replicaSetSpecs.isEmpty()) {

View File

@ -6,17 +6,11 @@
package io.debezium.connector.mongodb;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.util.ConnectorUtils;
@ -31,37 +25,6 @@
@Immutable
public class ReplicaSets {
private static final Pattern REPLICA_DELIMITER_PATTERN = Pattern.compile(";");
/**
* Parse the supplied string for the information about the replica set hosts. The string is a semicolon-delimited list of
* shard hosts (e.g., "{@code shard01=replicaSet1/host1:27017,host2:27017}"), replica set hosts (e.g.,
* "{@code replicaSet1/host1:27017,host2:27017}"), and standalone hosts (e.g., "{@code host1:27017}" or
* "{@code 1.2.3.4:27017}").
*
* @param hosts the hosts string; may be null
* @return the replica sets; never null but possibly empty
* @see ReplicaSets#hosts()
*/
public static ReplicaSets parse(String hosts) {
Set<ReplicaSet> replicaSets = splitHosts(hosts).stream()
.map(ReplicaSet::parse)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
return new ReplicaSets(replicaSets);
}
public static List<String> splitHosts(String hosts) {
if (hosts == null) {
return List.of();
}
return Stream.of(REPLICA_DELIMITER_PATTERN
.split(hosts.trim()))
.filter(h -> !h.isBlank())
.collect(Collectors.toList());
}
/**
* Get an instance that contains no replica sets.
*
@ -71,8 +34,11 @@ public static ReplicaSets empty() {
return new ReplicaSets(null);
}
private final Map<String, ReplicaSet> replicaSetsByName = new HashMap<>();
private final List<ReplicaSet> nonReplicaSets = new ArrayList<>();
public static ReplicaSets of(ReplicaSet... replicaSets) {
return new ReplicaSets(Arrays.asList(replicaSets));
}
private final List<ReplicaSet> replicaSets = new ArrayList<>();
/**
* Create a set of replica set specifications.
@ -81,16 +47,9 @@ public static ReplicaSets empty() {
*/
public ReplicaSets(Collection<ReplicaSet> rsSpecs) {
if (rsSpecs != null) {
rsSpecs.forEach(replicaSet -> {
if (replicaSet.hasReplicaSetName()) {
replicaSetsByName.put(replicaSet.replicaSetName(), replicaSet);
}
else {
nonReplicaSets.add(replicaSet);
}
});
replicaSets.addAll(rsSpecs);
}
Collections.sort(nonReplicaSets);
Collections.sort(replicaSets);
}
/**
@ -98,17 +57,8 @@ public ReplicaSets(Collection<ReplicaSet> rsSpecs) {
*
* @return the replica set count
*/
public int replicaSetCount() {
return replicaSetsByName.size() + nonReplicaSets.size();
}
/**
* Get the number of replica sets with names.
*
* @return the valid replica set count
*/
public int validReplicaSetCount() {
return replicaSetsByName.size();
public int size() {
return replicaSets.size();
}
/**
@ -117,8 +67,7 @@ public int validReplicaSetCount() {
* @param function the consumer function; may not be null
*/
public void onEachReplicaSet(Consumer<ReplicaSet> function) {
this.replicaSetsByName.values().forEach(function);
this.nonReplicaSets.forEach(function);
this.replicaSets.forEach(function);
}
/**
@ -128,15 +77,10 @@ public void onEachReplicaSet(Consumer<ReplicaSet> function) {
* @param subdivisionConsumer the function to be called with each subdivision; may not be null
*/
public void subdivide(int maxSubdivisionCount, Consumer<ReplicaSets> subdivisionConsumer) {
int numGroups = Math.min(replicaSetCount(), maxSubdivisionCount);
if (numGroups <= 1) {
// Just one replica set or subdivision ...
subdivisionConsumer.accept(this);
return;
}
ConnectorUtils.groupPartitions(all(), numGroups).forEach(rsList -> {
subdivisionConsumer.accept(new ReplicaSets(rsList));
});
int numGroups = Math.min(size(), maxSubdivisionCount);
ConnectorUtils.groupPartitions(all(), numGroups).stream()
.map(ReplicaSets::new)
.forEach(subdivisionConsumer);
}
/**
@ -145,32 +89,7 @@ public void subdivide(int maxSubdivisionCount, Consumer<ReplicaSets> subdivision
* @return the replica set objects; never null but possibly empty
*/
public List<ReplicaSet> all() {
List<ReplicaSet> replicaSets = new ArrayList<>();
replicaSets.addAll(replicaSetsByName.values());
replicaSets.addAll(nonReplicaSets);
return replicaSets;
}
/**
* Get a copy of all of the valid {@link ReplicaSet} objects that have names.
*
* @return the valid replica set objects; never null but possibly empty
*/
public List<ReplicaSet> validReplicaSets() {
List<ReplicaSet> replicaSets = new ArrayList<>();
replicaSets.addAll(replicaSetsByName.values());
return replicaSets;
}
/**
* Get a copy of all of the {@link ReplicaSet} objects that have no names.
*
* @return the unnamed replica set objects; never null but possibly empty
*/
public List<ReplicaSet> unnamedReplicaSets() {
List<ReplicaSet> replicaSets = new ArrayList<>();
replicaSets.addAll(nonReplicaSets);
return replicaSets;
return new ArrayList<>(this.replicaSets);
}
/**
@ -180,40 +99,12 @@ public List<ReplicaSet> unnamedReplicaSets() {
* @return {@code true} if the replica sets have changed since the prior state, or {@code false} otherwise
*/
public boolean haveChangedSince(ReplicaSets priorState) {
if (priorState.replicaSetCount() != this.replicaSetCount()) {
// At least one replica set has been added or removed ...
return true;
}
if (this.replicaSetsByName.size() != priorState.replicaSetsByName.size()) {
// The total number of replica sets hasn't changed, but the number of named replica sets has changed ...
return true;
}
// We have the same number of named replica sets ...
if (!this.replicaSetsByName.isEmpty()) {
if (!this.replicaSetsByName.keySet().equals(priorState.replicaSetsByName.keySet())) {
// The replica sets have different names ...
return true;
}
// Otherwise, they have the same names and we don't care about the members ...
}
// None of the named replica sets has changed, so we have no choice to be compare the non-replica set members ...
return this.nonReplicaSets.equals(priorState.nonReplicaSets) ? false : true;
}
/**
* Get the string containing the host names for the replica sets. The result is a string with each replica set hosts
* separated by a semicolon.
*
* @return the host names; never null
* @see #parse(String)
*/
public String hosts() {
return Strings.join(";", all());
return !this.replicaSets.equals(priorState.replicaSets);
}
@Override
public int hashCode() {
return Objects.hash(replicaSetsByName, nonReplicaSets);
return replicaSets.hashCode();
}
@Override
@ -223,14 +114,14 @@ public boolean equals(Object obj) {
}
if (obj instanceof ReplicaSets) {
ReplicaSets that = (ReplicaSets) obj;
return this.replicaSetsByName.equals(that.replicaSetsByName) && this.nonReplicaSets.equals(that.nonReplicaSets);
return this.replicaSets.equals(that.replicaSets);
}
return false;
}
@Override
public String toString() {
return hosts();
return Strings.join(";", all());
}
}

View File

@ -0,0 +1,189 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoClient;
import io.debezium.DebeziumException;
import io.debezium.function.BlockingConsumer;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
/**
* Client scoped to specified replica set.
* Internally this wrapper attempts to obtain regular {@link MongoClient} instance using given back-off strategy
*/
public class RetryingMongoClient {
/**
* A pause between failed MongoDB operations to prevent CPU throttling and DoS of
* target MongoDB database.
*/
private static final Duration PAUSE_AFTER_ERROR = Duration.ofMillis(500);
private final Filters filters;
private final BiConsumer<String, Throwable> errorHandler;
private final AtomicBoolean running = new AtomicBoolean(true);
private final String replicaSetName;
private final Supplier<MongoClient> connectionSupplier;
protected RetryingMongoClient(ReplicaSet replicaSet,
ReadPreference readPreference,
BiFunction<ReplicaSet, ReadPreference, MongoClient> connectionSupplier,
Filters filters,
BiConsumer<String, Throwable> errorHandler) {
this.replicaSetName = replicaSet.replicaSetName();
this.connectionSupplier = () -> connectionSupplier.apply(replicaSet, readPreference);
this.filters = filters;
this.errorHandler = errorHandler;
}
/**
* Execute the supplied operation using the preferred node, blocking until it is available. Whenever the operation stops
* (e.g., if the node is no longer of preferred type), then restart the operation using a current node of that type.
*
* @param desc the description of the operation, for logging purposes
* @param operation the operation to be performed on a node of preferred type.
*/
public void execute(String desc, Consumer<MongoClient> operation) {
execute(desc, client -> {
operation.accept(client);
return null;
});
}
/**
* Execute the supplied operation using the preferred node, blocking until it is available. Whenever the operation stops
* (e.g., if the node is no longer of preferred type), then restart the operation using a current node of that type.
*
* @param desc the description of the operation, for logging purposes
* @param operation the operation to be performed on a node of preferred type
* @return return value of the executed operation
*/
public <T> T execute(String desc, Function<MongoClient, T> operation) {
final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM);
MongoClient client = connectionSupplier.get();
while (true) {
try {
return operation.apply(client);
}
catch (Throwable t) {
errorHandler.accept(desc, t);
if (!isRunning()) {
throw new DebeziumException("Operation failed and MongoDB connection '" + replicaSetName + "' termination requested", t);
}
try {
errorMetronome.pause();
}
catch (InterruptedException e) {
// Interruption is not propagated
}
}
}
}
/**
* Execute the supplied operation using the preferred node, blocking until it is available. Whenever the operation stops
* (e.g., if the node is no longer of preferred type), then restart the operation using a current node of that type.
*
* @param desc the description of the operation, for logging purposes
* @param operation the operation to be performed on a node of preferred type.
* @throws InterruptedException if the operation was interrupted
*/
public void executeBlocking(String desc, BlockingConsumer<MongoClient> operation) throws InterruptedException {
final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM);
while (true) {
MongoClient client = connectionSupplier.get();
try {
operation.accept(client);
return;
}
catch (InterruptedException e) {
throw e;
}
catch (Throwable t) {
errorHandler.accept(desc, t);
if (!isRunning()) {
throw new DebeziumException("Operation failed and MongoDB connection '" + replicaSetName + "' termination requested", t);
}
errorMetronome.pause();
}
}
}
/**
* Use a node of preferred type to get the names of all the databases in the replica set, applying the current database
* filter configuration. This method will block until a node of preferred type can be obtained to get the names of all
* databases in the replica set.
*
* @return the database names; never null but possibly empty
*/
public Set<String> databaseNames() {
return execute("get database names", client -> {
Set<String> databaseNames = new HashSet<>();
MongoUtil.forEachDatabaseName(
client,
dbName -> {
if (filters.databaseFilter().test(dbName)) {
databaseNames.add(dbName);
}
});
return databaseNames;
});
}
/**
* Use a node of preferred type to get the identifiers of all the collections in the replica set, applying the current
* collection filter configuration. This method will block until a primary can be obtained to get the
* identifiers of all collections in the replica set.
*
* @return the collection identifiers; never null
*/
public List<CollectionId> collections() {
// For each database, get the list of collections ...
return execute("get collections in databases", client -> {
List<CollectionId> collections = new ArrayList<>();
Set<String> databaseNames = databaseNames();
for (String dbName : databaseNames) {
MongoUtil.forEachCollectionNameInDatabase(client, dbName, collectionName -> {
CollectionId collectionId = new CollectionId(replicaSetName, dbName, collectionName);
if (filters.collectionFilter().test(collectionId)) {
collections.add(collectionId);
}
});
}
return collections;
});
}
private boolean isRunning() {
return running.get();
}
/**
* Terminates the execution loop of the current primary
*/
public void stop() {
running.set(false);
}
}

View File

@ -14,7 +14,6 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.types.BSONTimestamp;
@ -22,6 +21,7 @@
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.debezium.DebeziumException;
import io.debezium.annotation.Immutable;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.SnapshotRecord;
@ -367,7 +367,7 @@ public boolean hasOffset(String replicaSetName) {
* @param replicaSetName the name of the replica set name for which the new offset is to be obtained; may not be null
* @param sourceOffset the previously-recorded Kafka Connect source offset; may be null
* @return {@code true} if the offset was recorded, or {@code false} if the source offset is null
* @throws ConnectException if any offset parameter values are missing, invalid, or of the wrong type
* @throws DebeziumException if any offset parameter values are missing, invalid, or of the wrong type
*/
public boolean setOffsetFor(String replicaSetName, Map<String, ?> sourceOffset) {
if (replicaSetName == null) {
@ -402,7 +402,7 @@ public boolean setOffsetFor(String replicaSetName, Map<String, ?> sourceOffset)
* @param partition the partition information; may not be null
* @param sourceOffset the previously-recorded Kafka Connect source offset; may be null
* @return {@code true} if the offset was recorded, or {@code false} if the source offset is null
* @throws ConnectException if any offset parameter values are missing, invalid, or of the wrong type
* @throws DebeziumException if any offset parameter values are missing, invalid, or of the wrong type
*/
public boolean setOffsetFor(Map<String, String> partition, Map<String, ?> sourceOffset) {
String replicaSetName = partition.get(REPLICA_SET_NAME);
@ -457,7 +457,7 @@ private static int intOffsetValue(Map<String, ?> values, String key) {
return Integer.parseInt(obj.toString());
}
catch (NumberFormatException e) {
throw new ConnectException("Source offset '" + key + "' parameter value " + obj + " could not be converted to an integer");
throw new DebeziumException("Source offset '" + key + "' parameter value " + obj + " could not be converted to an integer");
}
}
@ -473,7 +473,7 @@ private static long longOffsetValue(Map<String, ?> values, String key) {
return Long.parseLong(obj.toString());
}
catch (NumberFormatException e) {
throw new ConnectException("Source offset '" + key + "' parameter value " + obj + " could not be converted to a long");
throw new DebeziumException("Source offset '" + key + "' parameter value " + obj + " could not be converted to a long");
}
}

View File

@ -22,7 +22,6 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.ExtractField;
@ -35,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.CommonConnectorConfig.FieldNameAdjustmentMode;
import io.debezium.config.Configuration;
@ -385,7 +385,7 @@ public void configure(final Map<String, ?> map) {
ExtractNewRecordStateConfigDefinition.ADD_FIELDS);
if (!config.validateAndRecord(configFields, LOGGER::error)) {
throw new ConnectException("Unable to validate config.");
throw new DebeziumException("Unable to validate config.");
}
FieldNameAdjustmentMode fieldNameAdjustmentMode = FieldNameAdjustmentMode.parse(

View File

@ -17,13 +17,13 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonType;
import org.bson.BsonValue;
import io.debezium.DebeziumException;
import io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.ArrayEncoding;
import io.debezium.schema.FieldNameSelector;
import io.debezium.schema.FieldNameSelector.FieldNamer;
@ -454,7 +454,7 @@ private void subSchema(SchemaBuilder documentSchemaBuilder, Map<String, BsonType
addFieldSchema(arrayDoc, documentSchemaBuilder);
}
else if (prevType != arrayDoc.getValue().getBsonType()) {
throw new ConnectException("Field " + key + " of schema " + documentSchemaBuilder.name()
throw new DebeziumException("Field " + key + " of schema " + documentSchemaBuilder.name()
+ " is not the same type for all documents in the array.\n"
+ "Check option 'struct' of parameter 'array.encoding'");
}
@ -470,7 +470,7 @@ private void testType(SchemaBuilder builder, String key, BsonValue value, BsonTy
final String docKey = fieldNamer.fieldNameFor(arrayDoc.getKey());
final BsonType prevType = union.putIfAbsent(docKey, arrayDoc.getValue().getBsonType());
if (prevType != null && prevType != arrayDoc.getValue().getBsonType()) {
throw new ConnectException("Field " + docKey + " of schema " + builder.name()
throw new DebeziumException("Field " + docKey + " of schema " + builder.name()
+ " is not the same type for all documents in the array.\n"
+ "Check option 'struct' of parameter 'array.encoding'");
}
@ -486,7 +486,7 @@ else if (valueType == BsonType.ARRAY) {
else {
for (BsonValue element : value.asArray()) {
if (element.getBsonType() != valueType) {
throw new ConnectException("Field " + key + " of schema " + builder.name() + " is not a homogenous array.\n"
throw new DebeziumException("Field " + key + " of schema " + builder.name() + " is not a homogenous array.\n"
+ "Check option 'struct' of parameter 'array.encoding'");
}
}

View File

@ -79,7 +79,7 @@ public void afterEach() {
}
finally {
if (context != null) {
context.getConnectionContext().shutdown();
context.getConnectionContext().close();
}
}
}

View File

@ -14,8 +14,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.ReadPreference;
import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary;
import io.debezium.util.Testing;
public abstract class AbstractMongoIT extends AbstractBaseMongoIT {
@ -24,8 +25,8 @@ public abstract class AbstractMongoIT extends AbstractBaseMongoIT {
protected Configuration config;
protected MongoDbTaskContext context;
protected MongoPrimary primary;
protected ReplicaSet replicaSet;
protected RetryingMongoClient primary;
@Before
public void beforeEach() {
@ -71,9 +72,9 @@ private void initialize(boolean restartFromBeginning) {
}
context = new MongoDbTaskContext(config);
assertThat(context.getConnectionContext().hosts()).isNotEmpty();
assertThat(context.getConnectionContext().connectionSeed()).isNotEmpty();
replicaSet = ReplicaSet.parse(context.getConnectionContext().hosts());
replicaSet = new ReplicaSet(mongo.getConnectionString());
context.configureLoggingContext(replicaSet.replicaSetName());
// Restore Source position (if there are some) ...
@ -82,14 +83,15 @@ private void initialize(boolean restartFromBeginning) {
}
// Get a connection to the primary ...
primary = context.getConnectionContext().primaryFor(replicaSet, context.filters(), TestHelper.connectionErrorHandler(3));
primary = context.getConnectionContext().connect(
replicaSet, ReadPreference.primary(), context.filters(), TestHelper.connectionErrorHandler(3));
}
@After
public void afterEach() {
if (context != null) {
// close all connections
context.getConnectionContext().shutdown();
context.getConnectionContext().close();
}
}
}

View File

@ -13,7 +13,6 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.errors.ConnectException;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
@ -28,6 +27,7 @@
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertOneOptions;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.util.Testing;
@ -40,7 +40,7 @@ public void setUp() {
TestHelper.cleanDatabase(mongo, "dbC");
}
@Test(expected = ConnectException.class)
@Test(expected = DebeziumException.class)
public void shouldUseSSL() throws InterruptedException, IOException {
// Use the DB configuration to define the connector's configuration ...
useConfiguration(config.edit()

View File

@ -1,92 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
import io.debezium.config.Configuration;
/**
* @author Randall Hauch
*
*/
public class MongoClientsIT {
private static List<ServerAddress> addresses;
@BeforeClass
public static void beforeAll() {
Configuration config = TestHelper.getConfiguration();
String host = config.getString(MongoDbConnectorConfig.HOSTS);
addresses = MongoUtil.parseAddresses(host);
}
private MongoClients clients;
@Before
public void beforeEach() {
clients = MongoClients.create().build();
}
@After
public void afterEach() {
if (clients != null) {
try {
clients.clear();
}
finally {
clients = null;
}
}
}
@Test
public void shouldReturnSameInstanceForSameAddress() {
addresses.forEach(address -> {
MongoClient client1 = clients.clientFor(address);
MongoClient client2 = clients.clientFor(address);
assertThat(client1).isSameAs(client2);
MongoClient client3 = clients.clientFor(address.toString());
MongoClient client4 = clients.clientFor(address);
assertThat(client3).isSameAs(client4);
assertThat(client3).isSameAs(client1);
MongoClient client5 = clients.clientFor(address.toString());
MongoClient client6 = clients.clientFor(address.toString());
assertThat(client5).isSameAs(client6);
assertThat(client5).isSameAs(client1);
});
}
@Test
public void shouldReturnSameInstanceForSameAddresses() {
MongoClient client1 = clients.clientForMembers(addresses);
MongoClient client2 = clients.clientForMembers(addresses);
assertThat(client1).isSameAs(client2);
MongoClient client3 = clients.clientForMembers(addresses);
MongoClient client4 = clients.clientForMembers(addresses);
assertThat(client3).isSameAs(client4);
assertThat(client3).isSameAs(client1);
String addressesStr = MongoUtil.toString(addresses);
MongoClient client5 = clients.clientForMembers(addressesStr);
MongoClient client6 = clients.clientForMembers(addressesStr);
assertThat(client5).isSameAs(client6);
assertThat(client5).isSameAs(client1);
}
}

View File

@ -77,7 +77,7 @@ public class MongoDbConnectorIT extends AbstractMongoConnectorIT {
@Test
public void shouldNotStartWithInvalidConfiguration() {
config = Configuration.create()
.with(MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS, "true")
.with(MongoDbConnectorConfig.SSL_ENABLED, "true")
.build();
// we expect the engine will log at least one error, so preface it ...
@ -95,11 +95,10 @@ public void shouldFailToValidateInvalidConfiguration() {
MongoDbConnector connector = new MongoDbConnector();
Config result = connector.validate(config.asMap());
assertConfigurationErrors(result, MongoDbConnectorConfig.HOSTS, 1);
assertConfigurationErrors(result, MongoDbConnectorConfig.CONNECTION_STRING, 1);
assertConfigurationErrors(result, CommonConnectorConfig.TOPIC_PREFIX, 1);
assertNoConfigurationErrors(result, MongoDbConnectorConfig.USER);
assertNoConfigurationErrors(result, MongoDbConnectorConfig.PASSWORD);
assertNoConfigurationErrors(result, MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS);
assertNoConfigurationErrors(result, MongoDbConnectorConfig.DATABASE_INCLUDE_LIST);
assertNoConfigurationErrors(result, MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST);
assertNoConfigurationErrors(result, MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST);
@ -222,7 +221,6 @@ public void shouldValidateAcceptableConfiguration() {
assertNoConfigurationErrors(result, CommonConnectorConfig.TOPIC_PREFIX);
assertNoConfigurationErrors(result, MongoDbConnectorConfig.USER);
assertNoConfigurationErrors(result, MongoDbConnectorConfig.PASSWORD);
assertNoConfigurationErrors(result, MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS);
assertNoConfigurationErrors(result, MongoDbConnectorConfig.DATABASE_INCLUDE_LIST);
assertNoConfigurationErrors(result, MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST);
assertNoConfigurationErrors(result, MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST);

View File

@ -50,7 +50,6 @@ private Configuration getConfig(String connectionString, boolean ssl) {
.with(CommonConnectorConfig.TOPIC_PREFIX, "mongo")
.with(MongoDbConnectorConfig.CONNECTION_STRING, connectionString)
.with(MongoDbConnectorConfig.SSL_ENABLED, ssl)
.with(MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS, true)
.build();
}

View File

@ -24,7 +24,7 @@ public class MongoDbSchemaIT {
@After
public void afterEach() {
if (taskContext != null) {
taskContext.getConnectionContext().shutdown();
taskContext.getConnectionContext().close();
}
}

View File

@ -21,10 +21,9 @@ public class MongoDbTaskContextTest implements Testing {
@Before
public void setup() {
this.config = Configuration.create()
.with(MongoDbConnectorConfig.HOSTS, "rs0/localhost:27017")
.with(MongoDbConnectorConfig.CONNECTION_STRING, "mongodb://dummy:27017")
.with(MongoDbConnectorConfig.TASK_ID, 42)
.with(MongoDbConnectorConfig.TOPIC_PREFIX, "bistromath")
.with(MongoDbConnectorConfig.HOSTS, "dummy")
.with(MongoDbConnectorConfig.CAPTURE_MODE, MongoDbConnectorConfig.CaptureMode.CHANGE_STREAMS)
.build();
this.context = new MongoDbTaskContext(config);

View File

@ -1,51 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Optional;
import org.junit.Test;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
import com.mongodb.connection.ServerDescription;
/**
* @author Chris Collingwood
*/
public class MongoUtilIT extends AbstractMongoIT {
@Test
public void testGetPrimaryAddress() {
useConfiguration(config.edit()
.with(MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS, true)
.build());
Optional<ServerAddress> expectedPrimaryAddress;
try (var client = connect()) {
client.listDatabaseNames().first();
var servers = client.getClusterDescription().getServerDescriptions();
expectedPrimaryAddress = servers.stream()
.filter(ServerDescription::isPrimary)
.findFirst()
.map(ServerDescription::getAddress);
}
assertThat(expectedPrimaryAddress).isPresent();
primary.execute("shouldConnect", mongo -> {
ServerAddress primaryAddress = MongoUtil.getPreferredAddress(mongo, ReadPreference.primary());
assertThat(primaryAddress.getHost()).isEqualTo(expectedPrimaryAddress.map(ServerAddress::getHost).get());
assertThat(primaryAddress.getPort()).isEqualTo(expectedPrimaryAddress.map(ServerAddress::getPort).get());
});
}
}

View File

@ -14,6 +14,7 @@
import org.junit.Before;
import org.junit.Test;
import com.mongodb.ConnectionString;
import com.mongodb.MongoException;
import com.mongodb.MongoSocketOpenException;
import com.mongodb.ServerAddress;
@ -37,7 +38,7 @@ public void setup() {
connectionContext = mock(ConnectionContext.class);
mongoClient = mock(MongoClient.class);
when(context.getConnectionContext()).thenReturn(connectionContext);
when(connectionContext.clientForSeedConnection()).thenReturn(mongoClient);
when(connectionContext.connect()).thenReturn(mongoClient);
replicaSetDiscovery = new ReplicaSetDiscovery(context);
}
@ -53,6 +54,10 @@ public void shouldGetFirstValidReplicaSetName() {
ServerAddress host1Address = new ServerAddress("host1");
ServerAddress host2Address = new ServerAddress("host2");
var cs = "mongodb://" + host1Address + "," + host1Address;
when(connectionContext.connectionSeed()).thenReturn(cs);
when(connectionContext.connectionString()).thenReturn(new ConnectionString(cs));
List<ServerDescription> serverDescriptions = List.of(
ServerDescription.builder()
.address(host1Address)
@ -73,8 +78,7 @@ public void shouldGetFirstValidReplicaSetName() {
when(mongoClient.getClusterDescription()).thenReturn(clusterDescription);
ReplicaSets replicaSets = replicaSetDiscovery.getReplicaSets();
assertThat(replicaSets.validReplicaSets().size()).isEqualTo(1);
assertThat(replicaSets.validReplicaSets().get(0).replicaSetName()).isEqualTo("my_rs");
assertThat(replicaSets.validReplicaSets().get(0).addresses()).isEqualTo(List.of(host2Address));
assertThat(replicaSets.all().size()).isEqualTo(1);
assertThat(replicaSets.all().get(0).replicaSetName()).isEqualTo("my_rs");
}
}

View File

@ -12,7 +12,9 @@
import org.junit.Test;
import com.mongodb.ServerAddress;
import com.mongodb.ConnectionString;
import io.debezium.connector.mongodb.connection.ReplicaSet;
/**
* @author Randall Hauch
@ -20,259 +22,162 @@
*/
public class ReplicaSetsTest {
private ReplicaSets sets;
private ReplicaSet rs;
@Test
public void shouldHaveNoReplicaSetsInEmptyInstance() {
assertThat(ReplicaSets.empty().replicaSetCount()).isEqualTo(0);
assertThat(ReplicaSets.empty().size()).isEqualTo(0);
}
@Test
public void shouldParseNullHostString() {
assertThat(ReplicaSets.parse(null)).isEqualTo(ReplicaSets.empty());
assertThat(ConnectionStrings.parseFromHosts(null)).isEmpty();
}
@Test
public void shouldParseEmptyHostString() {
assertThat(ReplicaSets.parse("")).isEqualTo(ReplicaSets.empty());
assertThat(ConnectionStrings.parseFromHosts("")).isEmpty();
}
@Test
public void shouldParseBlankHostString() {
assertThat(ReplicaSets.parse(" ")).isEqualTo(ReplicaSets.empty());
assertThat(ConnectionStrings.parseFromHosts("")).isEmpty();
}
@Test
public void shouldParseHostStringWithStandaloneAddress() {
sets = ReplicaSets.parse("localhost:27017");
assertThat(sets.replicaSetCount()).isEqualTo(1);
assertThat(sets.hosts()).isEqualTo("localhost:27017");
rs = sets.all().get(0);
assertThat(rs.hasReplicaSetName()).isFalse();
assertThat(rs.isStandaloneServer()).isTrue();
assertThat(rs.replicaSetName()).isNull();
assertThat(rs.shardName()).isNull();
ServerAddress expected = new ServerAddress("localhost", 27017);
assertThat(rs.addresses().size()).isEqualTo(1);
assertThat(rs.addresses()).containsOnly(expected);
public void shouldParseSingleHostStringWithStandaloneAddress() {
var cs = ConnectionStrings.parseFromHosts("localhost:27017");
assertThat(cs).hasValue("mongodb://localhost:27017/");
}
@Test
public void shouldParseHostStringWithStandaloneAddresses() {
sets = ReplicaSets.parse("localhost:27017,1.2.3.4:27017,localhost:28017,[fe80::601:9bff:feab:ec01]:27017");
assertThat(sets.replicaSetCount()).isEqualTo(1);
assertThat(sets.hosts()).isEqualTo("1.2.3.4:27017,[fe80::601:9bff:feab:ec01]:27017,localhost:27017,localhost:28017");
rs = sets.all().get(0);
assertThat(rs.hasReplicaSetName()).isFalse();
assertThat(rs.isStandaloneServer()).isTrue();
assertThat(rs.replicaSetName()).isNull();
assertThat(rs.shardName()).isNull();
ServerAddress expected1 = new ServerAddress("1.2.3.4", 27017);
ServerAddress expected2 = new ServerAddress("[fe80::601:9bff:feab:ec01]", 27017);
ServerAddress expected3 = new ServerAddress("localhost", 27017);
ServerAddress expected4 = new ServerAddress("localhost", 28017);
assertThat(rs.addresses().size()).isEqualTo(4);
assertThat(rs.addresses()).containsOnly(expected1, expected2, expected3, expected4);
var hosts = "localhost:27017,1.2.3.4:27017,localhost:28017,[fe80::601:9bff:feab:ec01]:27017";
var cs = ConnectionStrings.parseFromHosts(hosts);
assertThat(cs).hasValue("mongodb://" + hosts + "/");
}
@Test
public void shouldParseHostStringWithAddressForOneReplicaSet() {
sets = ReplicaSets.parse("myReplicaSet/localhost:27017");
assertThat(sets.replicaSetCount()).isEqualTo(1);
assertThat(sets.hosts()).isEqualTo("myReplicaSet/localhost:27017");
rs = sets.all().get(0);
assertThat(rs.hasReplicaSetName()).isTrue();
assertThat(rs.isStandaloneServer()).isFalse();
assertThat(rs.replicaSetName()).isEqualTo("myReplicaSet");
assertThat(rs.shardName()).isNull();
ServerAddress expected = new ServerAddress("localhost", 27017);
assertThat(rs.addresses().size()).isEqualTo(1);
assertThat(rs.addresses()).containsOnly(expected);
public void shouldParseHostStringWithAddressAndReplicaSet() {
var cs = ConnectionStrings.parseFromHosts("myReplicaSet/localhost:27017");
assertThat(cs).hasValue("mongodb://localhost:27017/?replicaSet=myReplicaSet");
}
@Test
public void shouldParseHostStringWithIpv6AddressForOneReplicaSet() {
sets = ReplicaSets.parse("myReplicaSet/[fe80::601:9bff:feab:ec01]:27017");
assertThat(sets.replicaSetCount()).isEqualTo(1);
assertThat(sets.hosts()).isEqualTo("myReplicaSet/[fe80::601:9bff:feab:ec01]:27017");
rs = sets.all().get(0);
assertThat(rs.hasReplicaSetName()).isTrue();
assertThat(rs.isStandaloneServer()).isFalse();
assertThat(rs.replicaSetName()).isEqualTo("myReplicaSet");
assertThat(rs.shardName()).isNull();
ServerAddress expected = new ServerAddress("[fe80::601:9bff:feab:ec01]", 27017);
assertThat(rs.addresses().size()).isEqualTo(1);
assertThat(rs.addresses()).containsOnly(expected);
public void shouldParseHostStringWithIpv6AddressAndReplicaSet() {
var cs = ConnectionStrings.parseFromHosts("myReplicaSet/[fe80::601:9bff:feab:ec01]:27017");
assertThat(cs).hasValue("mongodb://[fe80::601:9bff:feab:ec01]:27017/?replicaSet=myReplicaSet");
}
@Test
public void shouldParseHostStringWithAddressesForOneReplicaSet() {
sets = ReplicaSets.parse("myReplicaSet/localhost:27017,1.2.3.4:27017,localhost:28017,[fe80::601:9bff:feab:ec01]:27017");
assertThat(sets.replicaSetCount()).isEqualTo(1);
assertThat(sets.hosts()).isEqualTo("myReplicaSet/1.2.3.4:27017,[fe80::601:9bff:feab:ec01]:27017,localhost:27017,localhost:28017");
rs = sets.all().get(0);
assertThat(rs.hasReplicaSetName()).isTrue();
assertThat(rs.isStandaloneServer()).isFalse();
assertThat(rs.replicaSetName()).isEqualTo("myReplicaSet");
assertThat(rs.shardName()).isNull();
ServerAddress expected1 = new ServerAddress("1.2.3.4", 27017);
ServerAddress expected2 = new ServerAddress("[fe80::601:9bff:feab:ec01]", 27017);
ServerAddress expected3 = new ServerAddress("localhost", 27017);
ServerAddress expected4 = new ServerAddress("localhost", 28017);
assertThat(rs.addresses().size()).isEqualTo(4);
assertThat(rs.addresses()).containsOnly(expected1, expected2, expected3, expected4);
public void shouldParseHostStringWithAddressesAndReplicaSet() {
var hosts = "localhost:27017,1.2.3.4:27017,localhost:28017,[fe80::601:9bff:feab:ec01]:27017";
var cs = ConnectionStrings.parseFromHosts("myReplicaSet/" + hosts);
assertThat(cs).hasValue("mongodb://" + hosts + "/?replicaSet=myReplicaSet");
}
@Test
public void shouldParseHostStringWithAddressesForMultipleReplicaSet() {
sets = ReplicaSets.parse("myReplicaSet/host1:27017,[fe80::601:9bff:feab:ec01]:27017;otherReplicaset/1.2.3.4:27017,localhost:28017");
assertThat(sets.replicaSetCount()).isEqualTo(2);
assertThat(sets.hosts()).isEqualTo("myReplicaSet/[fe80::601:9bff:feab:ec01]:27017,host1:27017;otherReplicaset/1.2.3.4:27017,localhost:28017");
public void shouldHaveAttributesFromConnectionString() {
var cs = new ConnectionString("mongodb://localhost:27017/?replicaSet=rs0");
var rs = new ReplicaSet(cs);
rs = sets.all().get(0);
assertThat(rs.hasReplicaSetName()).isTrue();
assertThat(rs.isStandaloneServer()).isFalse();
assertThat(rs.replicaSetName()).isEqualTo("myReplicaSet");
assertThat(rs.shardName()).isNull();
ServerAddress expected1 = new ServerAddress("[fe80::601:9bff:feab:ec01]", 27017);
ServerAddress expected2 = new ServerAddress("host1", 27017);
assertThat(rs.addresses().size()).isEqualTo(2);
assertThat(rs.addresses()).containsOnly(expected1, expected2);
rs = sets.all().get(1);
assertThat(rs.hasReplicaSetName()).isTrue();
assertThat(rs.isStandaloneServer()).isFalse();
assertThat(rs.replicaSetName()).isEqualTo("otherReplicaset");
assertThat(rs.shardName()).isNull();
expected1 = new ServerAddress("1.2.3.4", 27017);
expected2 = new ServerAddress("localhost", 28017);
assertThat(rs.addresses().size()).isEqualTo(2);
assertThat(rs.addresses()).containsOnly(expected1, expected2);
}
@Test
public void shouldParseHostStringWithAddressesForOneShard() {
sets = ReplicaSets.parse("shard1=myReplicaSet/localhost:27017,1.2.3.4:27017,localhost:28017,[fe80::601:9bff:feab:ec01]:27017");
assertThat(sets.replicaSetCount()).isEqualTo(1);
assertThat(sets.hosts()).isEqualTo("shard1=myReplicaSet/1.2.3.4:27017,[fe80::601:9bff:feab:ec01]:27017,localhost:27017,localhost:28017");
rs = sets.all().get(0);
assertThat(rs.hasReplicaSetName()).isTrue();
assertThat(rs.isStandaloneServer()).isFalse();
assertThat(rs.replicaSetName()).isEqualTo("myReplicaSet");
assertThat(rs.shardName()).isEqualTo("shard1");
ServerAddress expected1 = new ServerAddress("1.2.3.4", 27017);
ServerAddress expected2 = new ServerAddress("[fe80::601:9bff:feab:ec01]", 27017);
ServerAddress expected3 = new ServerAddress("localhost", 27017);
ServerAddress expected4 = new ServerAddress("localhost", 28017);
assertThat(rs.addresses().size()).isEqualTo(4);
assertThat(rs.addresses()).containsOnly(expected1, expected2, expected3, expected4);
}
@Test
public void shouldParseHostStringWithAddressesForMultipleShard() {
sets = ReplicaSets.parse("shard1=myReplicaSet/host1:27017,[fe80::601:9bff:feab:ec01]:27017;shard2=otherReplicaset/1.2.3.4:27017,localhost:28017");
assertThat(sets.replicaSetCount()).isEqualTo(2);
assertThat(sets.hosts()).isEqualTo("shard1=myReplicaSet/[fe80::601:9bff:feab:ec01]:27017,host1:27017;shard2=otherReplicaset/1.2.3.4:27017,localhost:28017");
rs = sets.all().get(0);
assertThat(rs.hasReplicaSetName()).isTrue();
assertThat(rs.isStandaloneServer()).isFalse();
assertThat(rs.replicaSetName()).isEqualTo("myReplicaSet");
assertThat(rs.shardName()).isEqualTo("shard1");
ServerAddress expected1 = new ServerAddress("[fe80::601:9bff:feab:ec01]", 27017);
ServerAddress expected2 = new ServerAddress("host1", 27017);
assertThat(rs.addresses().size()).isEqualTo(2);
assertThat(rs.addresses()).containsOnly(expected1, expected2);
rs = sets.all().get(1);
assertThat(rs.hasReplicaSetName()).isTrue();
assertThat(rs.isStandaloneServer()).isFalse();
assertThat(rs.replicaSetName()).isEqualTo("otherReplicaset");
assertThat(rs.shardName()).isEqualTo("shard2");
expected1 = new ServerAddress("1.2.3.4", 27017);
expected2 = new ServerAddress("localhost", 28017);
assertThat(rs.addresses().size()).isEqualTo(2);
assertThat(rs.addresses()).containsOnly(expected1, expected2);
assertThat(rs.replicaSetName()).isEqualTo("rs0");
assertThat(rs.connectionString()).isEqualTo(cs);
}
@Test
public void shouldConsiderUnchangedSameInstance() {
sets = ReplicaSets.parse("localhost:27017");
var rs = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var sets = ReplicaSets.of(rs);
assertThat(sets.haveChangedSince(sets)).isFalse();
}
@Test
public void shouldConsiderUnchangedSimilarReplicaSets() {
ReplicaSets sets1 = ReplicaSets.parse("localhost:27017");
ReplicaSets sets2 = ReplicaSets.parse("localhost:27017");
assertThat(sets1.haveChangedSince(sets2)).isFalse();
public void shouldConsiderUnchangedSameReplicaSets() {
var rs0 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var rs1 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var sets0 = ReplicaSets.of(rs0);
var sets1 = ReplicaSets.of(rs1);
sets1 = ReplicaSets.parse("shard1=myReplicaSet/host1:27017,[fe80::601:9bff:feab:ec01]:27017;shard2=otherReplicaset/1.2.3.4:27017,localhost:28017");
sets2 = ReplicaSets.parse("shard1=myReplicaSet/host1:27017,[fe80::601:9bff:feab:ec01]:27017;shard2=otherReplicaset/1.2.3.4:27017,localhost:28017");
assertThat(sets1.haveChangedSince(sets2)).isFalse();
assertThat(sets0.haveChangedSince(sets1)).isFalse();
}
@Test
public void shouldConsiderChangedReplicaSetsWithOneReplicaSetContainingDifferentLocalServers() {
ReplicaSets sets1 = ReplicaSets.parse("localhost:27017");
ReplicaSets sets2 = ReplicaSets.parse("localhost:27017,host2:27017");
assertThat(sets1.haveChangedSince(sets2)).isTrue();
public void shouldConsiderUnchangedSameReplicaSetsWithDifferentAddresses() {
var rs0 = new ReplicaSet("mongodb://1.2.3.4:27017,localhost:28017/?replicaSet=rs0");
var rs1 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var rs2 = new ReplicaSet("mongodb://localhost:28017/?replicaSet=rs2");
var sets0 = ReplicaSets.of(rs0, rs2);
var sets1 = ReplicaSets.of(rs1, rs2);
assertThat(sets0.haveChangedSince(sets1)).isFalse();
}
@Test
public void shouldConsiderUnchangedReplicaSetsWithAdditionalServerAddressInExistingReplicaSet() {
ReplicaSets sets1 = ReplicaSets.parse("rs1/localhost:27017");
ReplicaSets sets2 = ReplicaSets.parse("rs1/localhost:27017,host2:27017");
assertThat(sets1.haveChangedSince(sets2)).isFalse();
public void shouldConsiderChangedDifferentReplicaSetsWithSameAddresses() {
var rs0 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var rs1 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs1");
var sets0 = ReplicaSets.of(rs0);
var sets1 = ReplicaSets.of(rs1);
assertThat(sets0.haveChangedSince(sets1)).isTrue();
}
@Test
public void shouldConsiderChangedReplicaSetsWithAdditionalReplicaSet() {
ReplicaSets sets1 = ReplicaSets.parse("rs1/localhost:27017;rs2/host2:17017");
ReplicaSets sets2 = ReplicaSets.parse("rs1/localhost:27017");
assertThat(sets1.haveChangedSince(sets2)).isTrue();
var rs0 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var rs1 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var sets0 = ReplicaSets.of(rs0);
var sets1 = ReplicaSets.of(rs0, rs1);
assertThat(sets1.haveChangedSince(sets0)).isTrue();
}
@Test
public void shouldConsiderChangedReplicaSetsWithRemovedReplicaSet() {
ReplicaSets sets1 = ReplicaSets.parse("rs1/localhost:27017");
ReplicaSets sets2 = ReplicaSets.parse("rs1/localhost:27017;rs2/host2:17017");
assertThat(sets1.haveChangedSince(sets2)).isTrue();
var rs0 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var rs1 = new ReplicaSet("mongodb://localhost:27017/?replicaSet=rs0");
var sets0 = ReplicaSets.of(rs0, rs1);
var sets1 = ReplicaSets.of(rs0);
assertThat(sets1.haveChangedSince(sets0)).isTrue();
}
@Test
public void shouldNotSubdivideOneReplicaSet() {
sets = ReplicaSets.parse("rs1/host1:27017,host2:27017");
var rs0 = new ReplicaSet("mongodb://host0:27017,host1:27017/?replicaSet=rs0");
var sets = ReplicaSets.of(rs0);
List<ReplicaSets> divided = new ArrayList<>();
sets.subdivide(1, divided::add);
assertThat(divided.size()).isEqualTo(1);
assertThat(divided.get(0)).isSameAs(sets);
assertThat(divided.get(0)).isEqualTo(sets);
}
@Test
public void shouldNotSubdivideMultipleReplicaSetsIntoOneGroup() {
sets = ReplicaSets.parse("rs1/host1:27017,host2:27017;rs2/host3:27017");
var rs0 = new ReplicaSet("mongodb://host0:27017,host1:27017/?replicaSet=rs0");
var rs1 = new ReplicaSet("mongodb://host2:27017/?replicaSet=rs01");
var sets = ReplicaSets.of(rs0);
List<ReplicaSets> divided = new ArrayList<>();
sets.subdivide(1, divided::add);
assertThat(divided.size()).isEqualTo(1);
assertThat(divided.get(0)).isSameAs(sets);
assertThat(divided.get(0)).isEqualTo(sets);
}
@Test
public void shouldSubdivideMultipleReplicaSetsWithIntoMultipleGroups() {
sets = ReplicaSets.parse("rs1/host1:27017,host2:27017;rs2/host3:27017");
var rs0 = new ReplicaSet("mongodb://host0:27017,host1:27017/?replicaSet=rs0");
var rs1 = new ReplicaSet("mongodb://host2:27017/?replicaSet=rs01");
var sets = ReplicaSets.of(rs0, rs1);
List<ReplicaSets> divided = new ArrayList<>();
sets.subdivide(2, divided::add);
assertThat(divided.size()).isEqualTo(2);
ReplicaSets dividedSet1 = divided.get(0);
assertThat(dividedSet1.replicaSetCount()).isEqualTo(1);
assertThat(dividedSet1.all().get(0)).isSameAs(sets.all().get(0));
ReplicaSets dividedSet2 = divided.get(1);
assertThat(dividedSet2.replicaSetCount()).isEqualTo(1);
assertThat(dividedSet2.all().get(0)).isSameAs(sets.all().get(1));
assertThat(divided.get(0).all()).containsExactly(sets.all().get(0));
assertThat(divided.get(1).all()).containsExactly(sets.all().get(1));
}
}

View File

@ -23,7 +23,6 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.ConnectionString;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
@ -56,25 +55,17 @@ private static List<Integer> getMongoVersion() {
.collect(Collectors.toList());
}
public static String hostsFor(MongoDbReplicaSet mongo) {
var connectionString = new ConnectionString(mongo.getConnectionString());
var hosts = String.join(",", connectionString.getHosts());
return mongo.getName() + "/" + hosts;
}
public static Configuration getConfiguration() {
return getConfiguration("rs0/localhost:27017");
return getConfiguration("mongodb://dummy:27017");
}
public static Configuration getConfiguration(MongoDbReplicaSet mongo) {
var hosts = hostsFor(mongo);
return getConfiguration(hosts);
return getConfiguration(mongo.getConnectionString());
}
public static Configuration getConfiguration(String hosts) {
public static Configuration getConfiguration(String connectionString) {
final Builder cfgBuilder = Configuration.fromSystemProperties("connector.").edit()
.withDefault(MongoDbConnectorConfig.HOSTS, hosts)
.withDefault(MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS, true)
.withDefault(MongoDbConnectorConfig.CONNECTION_STRING, connectionString)
.withDefault(CommonConnectorConfig.TOPIC_PREFIX, "mongo1");
return cfgBuilder.build();
}

View File

@ -13,12 +13,12 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.junit.Before;
import org.junit.Test;
import io.debezium.DebeziumException;
import io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.ArrayEncoding;
/**
@ -81,7 +81,7 @@ public void setup() throws Exception {
builder = SchemaBuilder.struct().name("array");
}
@Test(expected = ConnectException.class)
@Test(expected = DebeziumException.class)
public void shouldDetectHeterogenousArray() throws Exception {
final MongoDataConverter converter = new MongoDataConverter(ArrayEncoding.ARRAY);
final BsonDocument val = BsonDocument.parse(HETEROGENOUS_ARRAY);
@ -90,7 +90,7 @@ public void shouldDetectHeterogenousArray() throws Exception {
}
}
@Test(expected = ConnectException.class)
@Test(expected = DebeziumException.class)
public void shouldDetectHeterogenousDocumentInArray() throws Exception {
final MongoDataConverter converter = new MongoDataConverter(ArrayEncoding.ARRAY);
final BsonDocument val = BsonDocument.parse(HETEROGENOUS_DOCUMENT_IN_ARRAY);

View File

@ -90,7 +90,7 @@ public void afterEach() {
}
finally {
if (context != null) {
context.getConnectionContext().shutdown();
context.getConnectionContext().close();
}
}
outboxEventRouter.close();

View File

@ -1010,6 +1010,26 @@ default boolean hasKey(Field field) {
*/
String getString(String key);
default List<String> getList(Field field) {
return getList(field.name());
}
default List<String> getList(String key) {
return getList(key, ",", Function.identity());
}
default <T> List<T> getList(Field field, String separator, Function<String, T> converter) {
return getList(field.name(), separator, converter);
}
default <T> List<T> getList(String key, String separator, Function<String, T> converter) {
var value = getString(key);
return Arrays.stream(value.split(separator))
.map(String::trim)
.map(converter)
.collect(Collectors.toList());
}
/**
* Get the string value associated with the given key, returning the default value if there is no such key-value pair.
*

View File

@ -145,7 +145,7 @@ public String getConnectionString() {
return "mongodb://" + members.stream()
.map(MongoDbContainer::getClientAddress)
.map(Objects::toString)
.collect(joining(","));
.collect(joining(",")) + "/?replicaSet=" + name;
}
/**

View File

@ -102,7 +102,7 @@ public void testCluster(MongoDbReplicaSet.Builder replicaSet) throws Interrupted
// Create a connection string with a desired read preference
var readPreference = ReadPreference.primary();
var connectionString = new ConnectionString(cluster.getConnectionString() + "/?readPreference=" + readPreference.getName());
var connectionString = new ConnectionString(cluster.getConnectionString() + "&readPreference=" + readPreference.getName());
LOGGER.info("Connecting to cluster: {}", connectionString);
try (var client = MongoClients.create(connectionString)) {