DBZ-4339 Generalising MongoPrimary into MongoPreferredNode

This commit is contained in:
jcechace 2022-11-19 00:57:37 +01:00 committed by Jiri Pechanec
parent 189f3c023e
commit 434322256c
3 changed files with 113 additions and 72 deletions

View File

@ -24,6 +24,7 @@
import com.mongodb.ConnectionString; import com.mongodb.ConnectionString;
import com.mongodb.MongoCredential; import com.mongodb.MongoCredential;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress; import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClient;
import com.mongodb.connection.ClusterDescription; import com.mongodb.connection.ClusterDescription;
@ -52,7 +53,7 @@ public class ConnectionContext implements AutoCloseable {
protected final Configuration config; protected final Configuration config;
protected final MongoClients pool; protected final MongoClients pool;
protected final DelayStrategy primaryBackoffStrategy; protected final DelayStrategy backoffStrategy;
protected final boolean useHostsAsSeeds; protected final boolean useHostsAsSeeds;
/** /**
@ -109,7 +110,7 @@ public ConnectionContext(Configuration config) {
final int initialDelayInMs = config.getInteger(MongoDbConnectorConfig.CONNECT_BACKOFF_INITIAL_DELAY_MS); final int initialDelayInMs = config.getInteger(MongoDbConnectorConfig.CONNECT_BACKOFF_INITIAL_DELAY_MS);
final long maxDelayInMs = config.getLong(MongoDbConnectorConfig.CONNECT_BACKOFF_MAX_DELAY_MS); final long maxDelayInMs = config.getLong(MongoDbConnectorConfig.CONNECT_BACKOFF_MAX_DELAY_MS);
this.primaryBackoffStrategy = DelayStrategy.exponential(Duration.ofMillis(initialDelayInMs), Duration.ofMillis(maxDelayInMs)); this.backoffStrategy = DelayStrategy.exponential(Duration.ofMillis(initialDelayInMs), Duration.ofMillis(maxDelayInMs));
} }
public void shutdown() { public void shutdown() {
@ -208,12 +209,8 @@ public Duration pollInterval() {
return Duration.ofMillis(config.getLong(MongoDbConnectorConfig.MONGODB_POLL_INTERVAL_MS)); return Duration.ofMillis(config.getLong(MongoDbConnectorConfig.MONGODB_POLL_INTERVAL_MS));
} }
public int maxConnectionAttemptsForPrimary() {
return config.getInteger(MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS);
}
/** /**
* Obtain a client that will repeated try to obtain a client to the primary node of the replica set, waiting (and using * Obtain a client that will repeatedly try to obtain a client to the primary node of the replica set, waiting (and using
* this context's back-off strategy) if required until the primary becomes available. * this context's back-off strategy) if required until the primary becomes available.
* *
* @param replicaSet the replica set information; may not be null * @param replicaSet the replica set information; may not be null
@ -227,20 +224,36 @@ public ConnectionContext.MongoPrimary primaryFor(ReplicaSet replicaSet, Filters
} }
/** /**
* Obtain a client that will repeated try to obtain a client to the primary node of the replica set, waiting (and using * Obtain a client that will repeatedly try to obtain a client to a node of preferred type of the replica set, waiting (and using
* this context's back-off strategy) if required until the primary becomes available. * this context's back-off strategy) if required until the node becomes available.
* *
* @param replicaSet the replica set information; may not be null * @param replicaSet the replica set information; may not be null
* @param filters the filter configuration
* @param errorHandler the function to be called whenever the node is unable to
* {@link MongoPreferredNode#execute(String, Consumer) execute} an operation to completion; may be null
* @return the client, or {@code null} if no primary could be found for the replica set * @return the client, or {@code null} if no primary could be found for the replica set
*/ */
protected Supplier<MongoClient> primaryClientFor(ReplicaSet replicaSet) { public ConnectionContext.MongoPreferredNode preferredFor(ReplicaSet replicaSet, ReadPreference preference, Filters filters,
return primaryClientFor(replicaSet, (attempts, remaining, error) -> { BiConsumer<String, Throwable> errorHandler) {
return new ConnectionContext.MongoPreferredNode(this, replicaSet, preference, filters, errorHandler);
}
/**
* Obtain a client that will repeatedly try to obtain a client to a node of preferred type of the replica set, waiting (and using
* this context's back-off strategy) if required until the node becomes available.
*
* @param replicaSet the replica set information; may not be null
* @return the client, or {@code null} if no node of preferred type could be found for the replica set
*/
protected Supplier<MongoClient> preferredClientFor(ReplicaSet replicaSet, ReadPreference preference) {
return preferredClientFor(replicaSet, preference, (attempts, remaining, error) -> {
if (error == null) { if (error == null) {
logger().info("Unable to connect to primary node of '{}' after attempt #{} ({} remaining)", replicaSet, attempts, remaining); logger().info("Unable to connect to {} node of '{}' after attempt #{} ({} remaining)", preference.getName(),
replicaSet, attempts, remaining);
} }
else { else {
logger().error("Error while attempting to connect to primary node of '{}' after attempt #{} ({} remaining): {}", replicaSet, logger().error("Error while attempting to connect to {} node of '{}' after attempt #{} ({} remaining): {}", preference.getName(),
attempts, remaining, error.getMessage(), error); replicaSet, attempts, remaining, error.getMessage(), error);
} }
}); });
} }
@ -253,18 +266,18 @@ protected Supplier<MongoClient> primaryClientFor(ReplicaSet replicaSet) {
* @param handler the function that will be called when the primary could not be obtained; may not be null * @param handler the function that will be called when the primary could not be obtained; may not be null
* @return the client, or {@code null} if no primary could be found for the replica set * @return the client, or {@code null} if no primary could be found for the replica set
*/ */
protected Supplier<MongoClient> primaryClientFor(ReplicaSet replicaSet, PrimaryConnectFailed handler) { protected Supplier<MongoClient> preferredClientFor(ReplicaSet replicaSet, ReadPreference preference, PreferredConnectFailed handler) {
Supplier<MongoClient> factory = () -> clientForPrimary(replicaSet); Supplier<MongoClient> factory = () -> clientForPreferred(replicaSet, preference);
int maxAttempts = maxConnectionAttemptsForPrimary(); int maxAttempts = config.getInteger(MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS);
return () -> { return () -> {
int attempts = 0; int attempts = 0;
MongoClient primary = null; MongoClient client = null;
while (primary == null) { while (client == null) {
++attempts; ++attempts;
try { try {
// Try to get the primary // Try to get the primary
primary = factory.get(); client = factory.get();
if (primary != null) { if (client != null) {
break; break;
} }
} }
@ -272,35 +285,50 @@ protected Supplier<MongoClient> primaryClientFor(ReplicaSet replicaSet, PrimaryC
handler.failed(attempts, maxAttempts - attempts, t); handler.failed(attempts, maxAttempts - attempts, t);
} }
if (attempts > maxAttempts) { if (attempts > maxAttempts) {
throw new ConnectException("Unable to connect to primary node of '" + replicaSet + "' after " + throw new ConnectException("Unable to connect to " + preference.getName() + " node of '" + replicaSet +
attempts + " failed attempts"); "' after " + attempts + " failed attempts");
} }
handler.failed(attempts, maxAttempts - attempts, null); handler.failed(attempts, maxAttempts - attempts, null);
primaryBackoffStrategy.sleepWhen(true); backoffStrategy.sleepWhen(true);
continue;
} }
return primary; return client;
}; };
} }
@FunctionalInterface @FunctionalInterface
public static interface PrimaryConnectFailed { public static interface PreferredConnectFailed {
void failed(int attemptNumber, int attemptsRemaining, Throwable error); void failed(int attemptNumber, int attemptsRemaining, Throwable error);
} }
/** /**
* A supplier of a client that connects only to the primary of a replica set. Operations on the primary will continue * A supplier of a client that connects only to the primary of a replica set. Operations on the primary will continue
*/ */
public static class MongoPrimary { public static class MongoPrimary extends MongoPreferredNode {
protected MongoPrimary(ConnectionContext context, ReplicaSet replicaSet, Filters filters, BiConsumer<String, Throwable> errorHandler) {
super(context, replicaSet, ReadPreference.primary(), filters, errorHandler);
}
}
/**
* A supplier of a client that connects only to node of preferred type from a replica set. Operations on this node will continue
*/
public static class MongoPreferredNode {
private final ReplicaSet replicaSet; private final ReplicaSet replicaSet;
private final Supplier<MongoClient> primaryConnectionSupplier; private final Supplier<MongoClient> connectionSupplier;
private final Filters filters; private final Filters filters;
private final BiConsumer<String, Throwable> errorHandler; private final BiConsumer<String, Throwable> errorHandler;
private final AtomicBoolean running = new AtomicBoolean(true); private final AtomicBoolean running = new AtomicBoolean(true);
private final ReadPreference preference;
protected MongoPrimary(ConnectionContext context, ReplicaSet replicaSet, Filters filters, BiConsumer<String, Throwable> errorHandler) { protected MongoPreferredNode(
ConnectionContext context,
ReplicaSet replicaSet,
ReadPreference preference,
Filters filters,
BiConsumer<String, Throwable> errorHandler) {
this.replicaSet = replicaSet; this.replicaSet = replicaSet;
this.primaryConnectionSupplier = context.primaryClientFor(replicaSet); this.preference = preference;
this.connectionSupplier = context.preferredClientFor(replicaSet, preference);
this.filters = filters; this.filters = filters;
this.errorHandler = errorHandler; this.errorHandler = errorHandler;
} }
@ -315,35 +343,44 @@ public ReplicaSet replicaSet() {
} }
/** /**
* Get the address of the primary node, if there is one. * Get read preference of
* *
* @return the address of the replica set's primary node, or {@code null} if there is currently no primary * @return the read preference
*/
public ReadPreference getPreference() {
return preference;
}
/**
* Get the address of the node with preferred type, if there is one.
*
* @return the address of the replica set's preferred node, or {@code null} if there is currently no node of preferred type
*/ */
public ServerAddress address() { public ServerAddress address() {
return execute("get replica set primary", primary -> { return execute("get replica set " + preference.getName(), client -> {
return MongoUtil.getPrimaryAddress(primary); return MongoUtil.getPreferredAddress(client, preference);
}); });
} }
/** /**
* Execute the supplied operation using the primary, blocking until a primary is available. Whenever the operation stops * Execute the supplied operation using the preferred node, blocking until it is available. Whenever the operation stops
* (e.g., if the primary is no longer primary), then restart the operation using the current primary. * (e.g., if the node is no longer of preferred type), then restart the operation using a current node of that type.
* *
* @param desc the description of the operation, for logging purposes * @param desc the description of the operation, for logging purposes
* @param operation the operation to be performed on the primary. * @param operation the operation to be performed on a node of preferred type.
*/ */
public void execute(String desc, Consumer<MongoClient> operation) { public void execute(String desc, Consumer<MongoClient> operation) {
final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM); final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM);
while (true) { while (true) {
MongoClient primary = primaryConnectionSupplier.get(); MongoClient client = connectionSupplier.get();
try { try {
operation.accept(primary); operation.accept(client);
return; return;
} }
catch (Throwable t) { catch (Throwable t) {
errorHandler.accept(desc, t); errorHandler.accept(desc, t);
if (!isRunning()) { if (!isRunning()) {
throw new ConnectException("Operation failed and MongoDB primary termination requested", t); throw new ConnectException("Operation failed and MongoDB " + preference.getName() + " termination requested", t);
} }
try { try {
errorMetronome.pause(); errorMetronome.pause();
@ -356,24 +393,24 @@ public void execute(String desc, Consumer<MongoClient> operation) {
} }
/** /**
* Execute the supplied operation using the primary, blocking until a primary is available. Whenever the operation stops * Execute the supplied operation using the preferred node, blocking until it is available. Whenever the operation stops
* (e.g., if the primary is no longer primary), then restart the operation using the current primary. * (e.g., if the node is no longer of preferred type), then restart the operation using a current node of that type.
* *
* @param desc the description of the operation, for logging purposes * @param desc the description of the operation, for logging purposes
* @param operation the operation to be performed on the primary * @param operation the operation to be performed on a node of preferred type
* @return return value of the executed operation * @return return value of the executed operation
*/ */
public <T> T execute(String desc, Function<MongoClient, T> operation) { public <T> T execute(String desc, Function<MongoClient, T> operation) {
final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM); final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM);
while (true) { while (true) {
MongoClient primary = primaryConnectionSupplier.get(); MongoClient client = connectionSupplier.get();
try { try {
return operation.apply(primary); return operation.apply(client);
} }
catch (Throwable t) { catch (Throwable t) {
errorHandler.accept(desc, t); errorHandler.accept(desc, t);
if (!isRunning()) { if (!isRunning()) {
throw new ConnectException("Operation failed and MongoDB primary termination requested", t); throw new ConnectException("Operation failed and MongoDB " + preference.getName() + " termination requested", t);
} }
try { try {
errorMetronome.pause(); errorMetronome.pause();
@ -386,19 +423,19 @@ public <T> T execute(String desc, Function<MongoClient, T> operation) {
} }
/** /**
* Execute the supplied operation using the primary, blocking until a primary is available. Whenever the operation stops * Execute the supplied operation using the preferred node, blocking until it is available. Whenever the operation stops
* (e.g., if the primary is no longer primary), then restart the operation using the current primary. * (e.g., if the node is no longer of preferred type), then restart the operation using a current node of that type.
* *
* @param desc the description of the operation, for logging purposes * @param desc the description of the operation, for logging purposes
* @param operation the operation to be performed on the primary. * @param operation the operation to be performed on a node of preferred type.
* @throws InterruptedException if the operation was interrupted * @throws InterruptedException if the operation was interrupted
*/ */
public void executeBlocking(String desc, BlockingConsumer<MongoClient> operation) throws InterruptedException { public void executeBlocking(String desc, BlockingConsumer<MongoClient> operation) throws InterruptedException {
final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM); final Metronome errorMetronome = Metronome.sleeper(PAUSE_AFTER_ERROR, Clock.SYSTEM);
while (true) { while (true) {
MongoClient primary = primaryConnectionSupplier.get(); MongoClient client = connectionSupplier.get();
try { try {
operation.accept(primary); operation.accept(client);
return; return;
} }
catch (InterruptedException e) { catch (InterruptedException e) {
@ -407,7 +444,7 @@ public void executeBlocking(String desc, BlockingConsumer<MongoClient> operation
catch (Throwable t) { catch (Throwable t) {
errorHandler.accept(desc, t); errorHandler.accept(desc, t);
if (!isRunning()) { if (!isRunning()) {
throw new ConnectException("Operation failed and MongoDB primary termination requested", t); throw new ConnectException("Operation failed and MongoDB " + preference.getName() + " termination requested", t);
} }
errorMetronome.pause(); errorMetronome.pause();
} }
@ -415,18 +452,18 @@ public void executeBlocking(String desc, BlockingConsumer<MongoClient> operation
} }
/** /**
* Use the primary to get the names of all the databases in the replica set, applying the current database * Use a node of preferred type to get the names of all the databases in the replica set, applying the current database
* filter configuration. This method will block until a primary can be obtained to get the names of all * filter configuration. This method will block until a node of preferred type can be obtained to get the names of all
* databases in the replica set. * databases in the replica set.
* *
* @return the database names; never null but possibly empty * @return the database names; never null but possibly empty
*/ */
public Set<String> databaseNames() { public Set<String> databaseNames() {
return execute("get database names", primary -> { return execute("get database names", client -> {
Set<String> databaseNames = new HashSet<>(); Set<String> databaseNames = new HashSet<>();
MongoUtil.forEachDatabaseName( MongoUtil.forEachDatabaseName(
primary, client,
dbName -> { dbName -> {
if (filters.databaseFilter().test(dbName)) { if (filters.databaseFilter().test(dbName)) {
databaseNames.add(dbName); databaseNames.add(dbName);
@ -438,7 +475,7 @@ public Set<String> databaseNames() {
} }
/** /**
* Use the primary to get the identifiers of all the collections in the replica set, applying the current * Use a node of preferred type to get the identifiers of all the collections in the replica set, applying the current
* collection filter configuration. This method will block until a primary can be obtained to get the * collection filter configuration. This method will block until a primary can be obtained to get the
* identifiers of all collections in the replica set. * identifiers of all collections in the replica set.
* *
@ -448,12 +485,12 @@ public List<CollectionId> collections() {
String replicaSetName = replicaSet.replicaSetName(); String replicaSetName = replicaSet.replicaSetName();
// For each database, get the list of collections ... // For each database, get the list of collections ...
return execute("get collections in databases", primary -> { return execute("get collections in databases", client -> {
List<CollectionId> collections = new ArrayList<>(); List<CollectionId> collections = new ArrayList<>();
Set<String> databaseNames = databaseNames(); Set<String> databaseNames = databaseNames();
for (String dbName : databaseNames) { for (String dbName : databaseNames) {
MongoUtil.forEachCollectionNameInDatabase(primary, dbName, collectionName -> { MongoUtil.forEachCollectionNameInDatabase(client, dbName, collectionName -> {
CollectionId collectionId = new CollectionId(replicaSetName, dbName, collectionName); CollectionId collectionId = new CollectionId(replicaSetName, dbName, collectionName);
if (filters.collectionFilter().test(collectionId)) { if (filters.collectionFilter().test(collectionId)) {
@ -484,7 +521,7 @@ public void stop() {
* @param replicaSet the replica set information; may not be null * @param replicaSet the replica set information; may not be null
* @return the client, or {@code null} if no primary could be found for the replica set * @return the client, or {@code null} if no primary could be found for the replica set
*/ */
protected MongoClient clientForPrimary(ReplicaSet replicaSet) { protected MongoClient clientForPreferred(ReplicaSet replicaSet, ReadPreference preference) {
MongoClient replicaSetClient = clientFor(replicaSet); MongoClient replicaSetClient = clientFor(replicaSet);
final ClusterDescription clusterDescription = MongoUtil.clusterDescription(replicaSetClient); final ClusterDescription clusterDescription = MongoUtil.clusterDescription(replicaSetClient);
if (clusterDescription.getType() == ClusterType.UNKNOWN) { if (clusterDescription.getType() == ClusterType.UNKNOWN) {
@ -497,9 +534,9 @@ protected MongoClient clientForPrimary(ReplicaSet replicaSet) {
"' is not a valid replica set and cannot be used"); "' is not a valid replica set and cannot be used");
} }
// It is a replica set ... // It is a replica set ...
ServerAddress primaryAddress = MongoUtil.getPrimaryAddress(replicaSetClient); ServerAddress preferredAddress = MongoUtil.getPreferredAddress(replicaSetClient, preference);
if (primaryAddress != null) { if (preferredAddress != null) {
return pool.clientFor(primaryAddress); return pool.clientFor(preferredAddress);
} }
return null; return null;
} }

View File

@ -369,28 +369,31 @@ protected static String toString(List<ServerAddress> addresses) {
return Strings.join(ADDRESS_DELIMITER, addresses); return Strings.join(ADDRESS_DELIMITER, addresses);
} }
protected static ServerAddress getPrimaryAddress(MongoClient client) { protected static ServerAddress getPreferredAddress(MongoClient client, ReadPreference preference) {
ClusterDescription clusterDescription = clusterDescription(client); ClusterDescription clusterDescription = clusterDescription(client);
if (!clusterDescription.hasReadableServer(ReadPreference.primaryPreferred())) { if (!clusterDescription.hasReadableServer(preference)) {
throw new DebeziumException("Unable to use cluster description from MongoDB connection: " + clusterDescription); throw new DebeziumException("Unable to use cluster description from MongoDB connection: " + clusterDescription);
} }
List<ServerDescription> serverDescriptions = clusterDescription.getServerDescriptions(); List<ServerDescription> serverDescriptions = preference.choose(clusterDescription);
if (serverDescriptions == null || serverDescriptions.size() == 0) { if (serverDescriptions.size() == 0) {
throw new DebeziumException("Unable to read server descriptions from MongoDB connection (Null or empty list)."); throw new DebeziumException("Unable to read server descriptions from MongoDB connection (Null or empty list).");
} }
Optional<ServerDescription> primaryDescription = serverDescriptions.stream().filter(ServerDescription::isPrimary).findFirst(); Optional<ServerDescription> preferredDescription = serverDescriptions.stream().findFirst();
return primaryDescription return preferredDescription
.map(ServerDescription::getAddress) .map(ServerDescription::getAddress)
.map(address -> new ServerAddress(address.getHost(), address.getPort())) .map(address -> new ServerAddress(address.getHost(), address.getPort()))
.orElseThrow(() -> new DebeziumException("Unable to find primary from MongoDB connection, got '" + serverDescriptions + "'")); .orElseThrow(() -> new DebeziumException("Unable to find primary from MongoDB connection, got '" + serverDescriptions + "'"));
} }
protected static ServerAddress getPrimaryAddress(MongoClient client) {
return getPreferredAddress(client, ReadPreference.primary());
}
/** /**
* Retrieves cluster description, forcing a connection if not yet available * Retrieves cluster description, forcing a connection if not yet available
* *

View File

@ -11,6 +11,7 @@
import org.junit.Test; import org.junit.Test;
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress; import com.mongodb.ServerAddress;
import com.mongodb.connection.ServerDescription; import com.mongodb.connection.ServerDescription;
@ -41,7 +42,7 @@ public void testGetPrimaryAddress() {
assertThat(expectedPrimaryAddress).isPresent(); assertThat(expectedPrimaryAddress).isPresent();
primary.execute("shouldConnect", mongo -> { primary.execute("shouldConnect", mongo -> {
ServerAddress primaryAddress = MongoUtil.getPrimaryAddress(mongo); ServerAddress primaryAddress = MongoUtil.getPreferredAddress(mongo, ReadPreference.primary());
assertThat(primaryAddress.getHost()).isEqualTo(expectedPrimaryAddress.map(ServerAddress::getHost).get()); assertThat(primaryAddress.getHost()).isEqualTo(expectedPrimaryAddress.map(ServerAddress::getHost).get());
assertThat(primaryAddress.getPort()).isEqualTo(expectedPrimaryAddress.map(ServerAddress::getPort).get()); assertThat(primaryAddress.getPort()).isEqualTo(expectedPrimaryAddress.map(ServerAddress::getPort).get());
}); });