DBZ-4733 Refactored MongoUtil.getPrimaryAddress to properly use mongo driver 4.x

This commit is contained in:
jcechace 2022-05-26 15:15:08 +02:00 committed by Jiri Pechanec
parent 21ed5f9b9f
commit 521156f5c2

View File

@ -28,6 +28,7 @@
import com.mongodb.client.MongoIterable;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterType;
import com.mongodb.connection.ServerDescription;
import io.debezium.DebeziumException;
@ -353,17 +354,9 @@ protected static String toString(List<ServerAddress> addresses) {
protected static ServerAddress getPrimaryAddress(MongoClient client) {
ClusterDescription clusterDescription = client.getClusterDescription();
ClusterDescription clusterDescription = clusterDescription(client);
if (clusterDescription == null || !clusterDescription.hasReadableServer(ReadPreference.primaryPreferred())) {
client.listDatabaseNames().first(); // force connection attempt and make async client wait/block
clusterDescription = client.getClusterDescription();
}
if (clusterDescription == null) {
throw new DebeziumException("Unable to read cluster description from MongoDB connection.");
}
else if (!clusterDescription.hasReadableServer(ReadPreference.primaryPreferred())) {
if (!clusterDescription.hasReadableServer(ReadPreference.primaryPreferred())) {
throw new DebeziumException("Unable to use cluster description from MongoDB connection: " + clusterDescription);
}
@ -375,13 +368,10 @@ else if (!clusterDescription.hasReadableServer(ReadPreference.primaryPreferred()
Optional<ServerDescription> primaryDescription = serverDescriptions.stream().filter(ServerDescription::isPrimary).findFirst();
if (!primaryDescription.isPresent()) {
throw new DebeziumException("Unable to find primary from MongoDB connection, got '" + serverDescriptions + "'");
}
ServerAddress primaryAddress = primaryDescription.get().getAddress();
return new ServerAddress(primaryAddress.getHost(), primaryAddress.getPort());
return primaryDescription
.map(ServerDescription::getAddress)
.map(address -> new ServerAddress(address.getHost(), address.getPort()))
.orElseThrow(() -> new DebeziumException("Unable to find primary from MongoDB connection, got '" + serverDescriptions + "'"));
}
/**