From acdb8bee8d2b8ea471cfc29875869b4618c7a011 Mon Sep 17 00:00:00 2001 From: Jakub Cechacek Date: Thu, 28 Dec 2023 23:33:19 +0100 Subject: [PATCH] DBZ-7260 Merge now unnecessary client factory into MongoDbConnectionContext --- .../connection/MongoDbClientFactory.java | 60 ------------------- .../connection/MongoDbConnectionContext.java | 47 ++++++++------- 2 files changed, 24 insertions(+), 83 deletions(-) delete mode 100644 debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbClientFactory.java diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbClientFactory.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbClientFactory.java deleted file mode 100644 index a4093dd78..000000000 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbClientFactory.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.mongodb.connection; - -import java.util.function.Consumer; - -import com.mongodb.ConnectionString; -import com.mongodb.MongoClientSettings; -import com.mongodb.client.MongoClient; - -import io.debezium.annotation.ThreadSafe; - -/** - * A connection pool of MongoClient instances. - * - * @author Randall Hauch - */ -@ThreadSafe -final class MongoDbClientFactory { - - private final MongoClientSettings defaultSettings; - - /** - * Obtains new client factory - * - * @param configurator settings instance use as template for all clients - * @return mongo client factory - */ - public static MongoDbClientFactory create(Consumer configurator) { - var settings = MongoClientSettings.builder(); - configurator.accept(settings); - return new MongoDbClientFactory(settings); - } - - private MongoDbClientFactory(MongoClientSettings.Builder settings) { - this.defaultSettings = settings.build(); - } - - /** - * Creates fresh {@link MongoClientSettings.Builder} from {@link #defaultSettings} - * @return connection settings builder - */ - private MongoClientSettings.Builder settings() { - return MongoClientSettings.builder(defaultSettings); - } - - public MongoClient client(ConnectionString connectionString) { - return client(settings -> settings.applyConnectionString(connectionString)); - } - - private MongoClient client(Consumer configurator) { - MongoClientSettings.Builder settings = settings(); - configurator.accept(settings); - - return com.mongodb.client.MongoClients.create(settings.build()); - } -} diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbConnectionContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbConnectionContext.java index db11e862d..8b541f19c 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbConnectionContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/connection/MongoDbConnectionContext.java @@ -14,7 +14,9 @@ import org.slf4j.LoggerFactory; import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; import com.mongodb.connection.ClusterDescription; import com.mongodb.connection.ClusterType; @@ -31,37 +33,36 @@ public class MongoDbConnectionContext { private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbConnectionContext.class); private final MongoDbConnectorConfig connectorConfig; - private final MongoDbClientFactory clientFactory; + private final MongoClientSettings clientSettings; /** * @param config the configuration */ public MongoDbConnectionContext(Configuration config) { this.connectorConfig = new MongoDbConnectorConfig(config); - - // Set up the client pool so that it ... connectorConfig.getAuthProvider().init(config); - clientFactory = MongoDbClientFactory.create(settings -> { - settings.applyToSocketSettings(builder -> builder - .connectTimeout(connectorConfig.getConnectTimeoutMs(), TimeUnit.MILLISECONDS) - .readTimeout(connectorConfig.getSocketTimeoutMs(), TimeUnit.MILLISECONDS)) - .applyToClusterSettings( - builder -> builder.serverSelectionTimeout(connectorConfig.getServerSelectionTimeoutMs(), TimeUnit.MILLISECONDS)) - .applyToServerSettings( - builder -> builder.heartbeatFrequency(connectorConfig.getHeartbeatFrequencyMs(), TimeUnit.MILLISECONDS)); - connectorConfig.getAuthProvider().addAuthConfig(settings); + var settings = MongoClientSettings.builder() + .applyToSocketSettings(builder -> builder + .connectTimeout(connectorConfig.getConnectTimeoutMs(), TimeUnit.MILLISECONDS) + .readTimeout(connectorConfig.getSocketTimeoutMs(), TimeUnit.MILLISECONDS)) + .applyToClusterSettings( + builder -> builder.serverSelectionTimeout(connectorConfig.getServerSelectionTimeoutMs(), TimeUnit.MILLISECONDS)) + .applyToServerSettings(builder -> builder + .heartbeatFrequency(connectorConfig.getHeartbeatFrequencyMs(), TimeUnit.MILLISECONDS)) + .applyToSocketSettings(builder -> builder + .connectTimeout(connectorConfig.getConnectTimeoutMs(), TimeUnit.MILLISECONDS) + .readTimeout(connectorConfig.getSocketTimeoutMs(), TimeUnit.MILLISECONDS)) + .applyToClusterSettings(builder -> builder + .serverSelectionTimeout(connectorConfig.getServerSelectionTimeoutMs(), TimeUnit.MILLISECONDS)) + .applyToSslSettings(builder -> builder + .enabled(connectorConfig.isSslEnabled()) + .invalidHostNameAllowed(connectorConfig.isSslAllowInvalidHostnames())); - if (connectorConfig.isSslEnabled()) { - settings.applyToSslSettings( - builder -> builder.enabled(true).invalidHostNameAllowed(connectorConfig.isSslAllowInvalidHostnames())); - } - - settings.applyToSocketSettings(builder -> builder.connectTimeout(connectorConfig.getConnectTimeoutMs(), TimeUnit.MILLISECONDS) - .readTimeout(connectorConfig.getSocketTimeoutMs(), TimeUnit.MILLISECONDS)) - .applyToClusterSettings( - builder -> builder.serverSelectionTimeout(connectorConfig.getServerSelectionTimeoutMs(), TimeUnit.MILLISECONDS)); - }); + this.clientSettings = connectorConfig.getAuthProvider() + .addAuthConfig(settings) + .applyConnectionString(connectorConfig.getConnectionString()) + .build(); } public MongoDbConnectorConfig getConnectorConfig() { @@ -87,7 +88,7 @@ public String getMaskedConnectionString() { * @return mongo client */ public MongoClient getMongoClient() { - return clientFactory.client(getConnectionString()); + return MongoClients.create(clientSettings); } public ClusterDescription getClusterDescription() {