DBZ-7260 Merge now unnecessary client factory into MongoDbConnectionContext
This commit is contained in:
parent
26e46c9f55
commit
acdb8bee8d
@ -1,60 +0,0 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.mongodb.connection;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import com.mongodb.ConnectionString;
|
||||
import com.mongodb.MongoClientSettings;
|
||||
import com.mongodb.client.MongoClient;
|
||||
|
||||
import io.debezium.annotation.ThreadSafe;
|
||||
|
||||
/**
|
||||
* A connection pool of MongoClient instances.
|
||||
*
|
||||
* @author Randall Hauch
|
||||
*/
|
||||
@ThreadSafe
|
||||
final class MongoDbClientFactory {
|
||||
|
||||
private final MongoClientSettings defaultSettings;
|
||||
|
||||
/**
|
||||
* Obtains new client factory
|
||||
*
|
||||
* @param configurator settings instance use as template for all clients
|
||||
* @return mongo client factory
|
||||
*/
|
||||
public static MongoDbClientFactory create(Consumer<MongoClientSettings.Builder> configurator) {
|
||||
var settings = MongoClientSettings.builder();
|
||||
configurator.accept(settings);
|
||||
return new MongoDbClientFactory(settings);
|
||||
}
|
||||
|
||||
private MongoDbClientFactory(MongoClientSettings.Builder settings) {
|
||||
this.defaultSettings = settings.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates fresh {@link MongoClientSettings.Builder} from {@link #defaultSettings}
|
||||
* @return connection settings builder
|
||||
*/
|
||||
private MongoClientSettings.Builder settings() {
|
||||
return MongoClientSettings.builder(defaultSettings);
|
||||
}
|
||||
|
||||
public MongoClient client(ConnectionString connectionString) {
|
||||
return client(settings -> settings.applyConnectionString(connectionString));
|
||||
}
|
||||
|
||||
private MongoClient client(Consumer<MongoClientSettings.Builder> configurator) {
|
||||
MongoClientSettings.Builder settings = settings();
|
||||
configurator.accept(settings);
|
||||
|
||||
return com.mongodb.client.MongoClients.create(settings.build());
|
||||
}
|
||||
}
|
@ -14,7 +14,9 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.mongodb.ConnectionString;
|
||||
import com.mongodb.MongoClientSettings;
|
||||
import com.mongodb.client.MongoClient;
|
||||
import com.mongodb.client.MongoClients;
|
||||
import com.mongodb.connection.ClusterDescription;
|
||||
import com.mongodb.connection.ClusterType;
|
||||
|
||||
@ -31,37 +33,36 @@ public class MongoDbConnectionContext {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbConnectionContext.class);
|
||||
|
||||
private final MongoDbConnectorConfig connectorConfig;
|
||||
private final MongoDbClientFactory clientFactory;
|
||||
private final MongoClientSettings clientSettings;
|
||||
|
||||
/**
|
||||
* @param config the configuration
|
||||
*/
|
||||
public MongoDbConnectionContext(Configuration config) {
|
||||
this.connectorConfig = new MongoDbConnectorConfig(config);
|
||||
|
||||
// Set up the client pool so that it ...
|
||||
connectorConfig.getAuthProvider().init(config);
|
||||
clientFactory = MongoDbClientFactory.create(settings -> {
|
||||
settings.applyToSocketSettings(builder -> builder
|
||||
.connectTimeout(connectorConfig.getConnectTimeoutMs(), TimeUnit.MILLISECONDS)
|
||||
.readTimeout(connectorConfig.getSocketTimeoutMs(), TimeUnit.MILLISECONDS))
|
||||
.applyToClusterSettings(
|
||||
builder -> builder.serverSelectionTimeout(connectorConfig.getServerSelectionTimeoutMs(), TimeUnit.MILLISECONDS))
|
||||
.applyToServerSettings(
|
||||
builder -> builder.heartbeatFrequency(connectorConfig.getHeartbeatFrequencyMs(), TimeUnit.MILLISECONDS));
|
||||
|
||||
connectorConfig.getAuthProvider().addAuthConfig(settings);
|
||||
var settings = MongoClientSettings.builder()
|
||||
.applyToSocketSettings(builder -> builder
|
||||
.connectTimeout(connectorConfig.getConnectTimeoutMs(), TimeUnit.MILLISECONDS)
|
||||
.readTimeout(connectorConfig.getSocketTimeoutMs(), TimeUnit.MILLISECONDS))
|
||||
.applyToClusterSettings(
|
||||
builder -> builder.serverSelectionTimeout(connectorConfig.getServerSelectionTimeoutMs(), TimeUnit.MILLISECONDS))
|
||||
.applyToServerSettings(builder -> builder
|
||||
.heartbeatFrequency(connectorConfig.getHeartbeatFrequencyMs(), TimeUnit.MILLISECONDS))
|
||||
.applyToSocketSettings(builder -> builder
|
||||
.connectTimeout(connectorConfig.getConnectTimeoutMs(), TimeUnit.MILLISECONDS)
|
||||
.readTimeout(connectorConfig.getSocketTimeoutMs(), TimeUnit.MILLISECONDS))
|
||||
.applyToClusterSettings(builder -> builder
|
||||
.serverSelectionTimeout(connectorConfig.getServerSelectionTimeoutMs(), TimeUnit.MILLISECONDS))
|
||||
.applyToSslSettings(builder -> builder
|
||||
.enabled(connectorConfig.isSslEnabled())
|
||||
.invalidHostNameAllowed(connectorConfig.isSslAllowInvalidHostnames()));
|
||||
|
||||
if (connectorConfig.isSslEnabled()) {
|
||||
settings.applyToSslSettings(
|
||||
builder -> builder.enabled(true).invalidHostNameAllowed(connectorConfig.isSslAllowInvalidHostnames()));
|
||||
}
|
||||
|
||||
settings.applyToSocketSettings(builder -> builder.connectTimeout(connectorConfig.getConnectTimeoutMs(), TimeUnit.MILLISECONDS)
|
||||
.readTimeout(connectorConfig.getSocketTimeoutMs(), TimeUnit.MILLISECONDS))
|
||||
.applyToClusterSettings(
|
||||
builder -> builder.serverSelectionTimeout(connectorConfig.getServerSelectionTimeoutMs(), TimeUnit.MILLISECONDS));
|
||||
});
|
||||
this.clientSettings = connectorConfig.getAuthProvider()
|
||||
.addAuthConfig(settings)
|
||||
.applyConnectionString(connectorConfig.getConnectionString())
|
||||
.build();
|
||||
}
|
||||
|
||||
public MongoDbConnectorConfig getConnectorConfig() {
|
||||
@ -87,7 +88,7 @@ public String getMaskedConnectionString() {
|
||||
* @return mongo client
|
||||
*/
|
||||
public MongoClient getMongoClient() {
|
||||
return clientFactory.client(getConnectionString());
|
||||
return MongoClients.create(clientSettings);
|
||||
}
|
||||
|
||||
public ClusterDescription getClusterDescription() {
|
||||
|
Loading…
Reference in New Issue
Block a user