DBZ-7379 Cleaned up MongoClient creating and added ability to use connector specific keystore/truststore

This commit is contained in:
Jakub Cechacek 2024-01-29 11:24:45 +01:00 committed by Jiri Pechanec
parent 2bfa92e6af
commit 31dac84135
5 changed files with 320 additions and 61 deletions

View File

@ -5,6 +5,9 @@
*/ */
package io.debezium.connector.mongodb; package io.debezium.connector.mongodb;
import static java.util.function.Predicate.not;
import java.nio.file.Path;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -30,7 +33,6 @@
import io.debezium.config.Field.ValidationOutput; import io.debezium.config.Field.ValidationOutput;
import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker; import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.mongodb.connection.ConnectionStrings;
import io.debezium.connector.mongodb.connection.DefaultMongoDbAuthProvider; import io.debezium.connector.mongodb.connection.DefaultMongoDbAuthProvider;
import io.debezium.connector.mongodb.connection.MongoDbAuthProvider; import io.debezium.connector.mongodb.connection.MongoDbAuthProvider;
import io.debezium.data.Envelope; import io.debezium.data.Envelope;
@ -634,6 +636,61 @@ public static OversizeHandlingMode parse(String value, String defaultValue) {
.withValidation(Field::isBoolean) .withValidation(Field::isBoolean)
.withDescription("Whether invalid host names are allowed when using SSL. If true the connection will not prevent man-in-the-middle attacks"); .withDescription("Whether invalid host names are allowed when using SSL. If true the connection will not prevent man-in-the-middle attacks");
public static final Field SSL_KEYSTORE = Field.create("mongodb.ssl.keystore")
.withDisplayName("SSL Keystore")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 1))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription("The location of the key store file. "
+ "This is optional and can be used for two-way authentication between the client and the MySQL Server.");
public static final Field SSL_KEYSTORE_PASSWORD = Field.create("mongodb.ssl.keystore.password")
.withDisplayName("SSL Keystore Password")
.withType(Type.PASSWORD)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 2))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("The password for the key store file. "
+ "This is optional and only needed if 'mongodb.ssl.keystore' is configured.");
public static final Field SSL_KEYSTORE_TYPE = Field.create("mongodb.ssl.keystore.type")
.withDisplayName("SSL Keystore Type")
.withType(Type.STRING)
.withDefault("PKCS12")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 3))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("The type of key store file. "
+ "This is optional and only needed if 'mongodb.ssl.keystore' is configured.");
public static final Field SSL_TRUSTSTORE = Field.create("mongodb.ssl.truststore")
.withDisplayName("SSL Truststore")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 4))
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription("The location of the trust store file for the server certificate verification.");
public static final Field SSL_TRUSTSTORE_PASSWORD = Field.create("mongodb.ssl.truststore.password")
.withDisplayName("SSL Truststore Password")
.withType(Type.PASSWORD)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 5))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("The password for the trust store file. "
+ "Used to check the integrity of the truststore, and unlock the truststore.");
public static final Field SSL_TRUSTSTORE_TYPE = Field.create("mongodb.ssl.truststore.type")
.withDisplayName("SSL Keystore Type")
.withType(Type.STRING)
.withDefault("PKCS12")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION_ADVANCED_SSL, 6))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("The type of key store file. "
+ "This is optional and only needed if 'mongodb.ssl.truststore' is configured.");
public static final Field CONNECT_TIMEOUT_MS = Field.create("mongodb.connect.timeout.ms") public static final Field CONNECT_TIMEOUT_MS = Field.create("mongodb.connect.timeout.ms")
.withDisplayName("Connect Timeout MS") .withDisplayName("Connect Timeout MS")
.withType(Type.INT) .withType(Type.INT)
@ -974,9 +1031,18 @@ public static ConfigDef configDef() {
private final int snapshotMaxThreads; private final int snapshotMaxThreads;
private final int cursorMaxAwaitTimeMs; private final int cursorMaxAwaitTimeMs;
private final ConnectionString connectionString; private final ConnectionString connectionString;
private final String user;
private final String password;
private final String authSource;
private final MongoDbAuthProvider authProvider; private final MongoDbAuthProvider authProvider;
private final boolean sslEnabled; private final boolean sslEnabled;
private final boolean sslAllowInvalidHostnames; private final boolean sslAllowInvalidHostnames;
private final String sslKeyStore;
private final String sslKeyStorePassword;
private final String sslKeyStoreType;
private final String sslTrustStore;
private final String sslTrustStorePassword;
private final String sslTrustStoreType;
private final int connectTimeoutMs; private final int connectTimeoutMs;
private final int heartbeatFrequencyMs; private final int heartbeatFrequencyMs;
private final int socketTimeoutMs; private final int socketTimeoutMs;
@ -993,11 +1059,21 @@ public MongoDbConnectorConfig(Configuration config) {
this.authProvider = config.getInstance(MongoDbConnectorConfig.AUTH_PROVIDER_CLASS, MongoDbAuthProvider.class); this.authProvider = config.getInstance(MongoDbConnectorConfig.AUTH_PROVIDER_CLASS, MongoDbAuthProvider.class);
this.sslEnabled = config.getBoolean(MongoDbConnectorConfig.SSL_ENABLED); this.sslEnabled = config.getBoolean(MongoDbConnectorConfig.SSL_ENABLED);
this.sslAllowInvalidHostnames = config.getBoolean(MongoDbConnectorConfig.SSL_ALLOW_INVALID_HOSTNAMES); this.sslAllowInvalidHostnames = config.getBoolean(MongoDbConnectorConfig.SSL_ALLOW_INVALID_HOSTNAMES);
this.sslKeyStore = config.getString(MongoDbConnectorConfig.SSL_KEYSTORE);
this.sslKeyStorePassword = config.getString(MongoDbConnectorConfig.SSL_KEYSTORE_PASSWORD);
this.sslKeyStoreType = config.getString(MongoDbConnectorConfig.SSL_KEYSTORE_TYPE);
this.sslTrustStore = config.getString(MongoDbConnectorConfig.SSL_TRUSTSTORE);
this.sslTrustStorePassword = config.getString(MongoDbConnectorConfig.SSL_TRUSTSTORE_PASSWORD);
this.sslTrustStoreType = config.getString(MongoDbConnectorConfig.SSL_TRUSTSTORE_TYPE);
this.connectTimeoutMs = config.getInteger(MongoDbConnectorConfig.CONNECT_TIMEOUT_MS); this.connectTimeoutMs = config.getInteger(MongoDbConnectorConfig.CONNECT_TIMEOUT_MS);
this.heartbeatFrequencyMs = config.getInteger(MongoDbConnectorConfig.HEARTBEAT_FREQUENCY_MS); this.heartbeatFrequencyMs = config.getInteger(MongoDbConnectorConfig.HEARTBEAT_FREQUENCY_MS);
this.socketTimeoutMs = config.getInteger(MongoDbConnectorConfig.SOCKET_TIMEOUT_MS); this.socketTimeoutMs = config.getInteger(MongoDbConnectorConfig.SOCKET_TIMEOUT_MS);
this.serverSelectionTimeoutMs = config.getInteger(MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS); this.serverSelectionTimeoutMs = config.getInteger(MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS);
this.connectionString = resolveConnectionString(config); this.connectionString = resolveConnectionString(config);
this.user = config.getString(MongoDbConnectorConfig.USER);
this.password = config.getString(MongoDbConnectorConfig.PASSWORD);
this.authSource = config.getString(MongoDbConnectorConfig.AUTH_SOURCE);
// Other configuration // Other configuration
String snapshotModeValue = config.getString(MongoDbConnectorConfig.SNAPSHOT_MODE); String snapshotModeValue = config.getString(MongoDbConnectorConfig.SNAPSHOT_MODE);
@ -1028,29 +1104,6 @@ public MongoDbConnectorConfig(Configuration config) {
this.cursorMaxAwaitTimeMs = config.getInteger(MongoDbConnectorConfig.CURSOR_MAX_AWAIT_TIME_MS, 0); this.cursorMaxAwaitTimeMs = config.getInteger(MongoDbConnectorConfig.CURSOR_MAX_AWAIT_TIME_MS, 0);
} }
private static int validateHosts(Configuration config, Field field, ValidationOutput problems) {
String hosts = config.getString(field);
String connectionString = config.getString(CONNECTION_STRING);
if (hosts == null) {
return 0;
}
LOGGER.warn("Config property '{}' will be removed in the future, use '{}' instead", field.name(), CONNECTION_STRING.name());
if (connectionString != null) {
LOGGER.warn("Config property '{}' is ignored, property '{}' takes precedence", field.name(), CONNECTION_STRING.name());
return 0;
}
if (ConnectionStrings.parseFromHosts(hosts).isEmpty()) {
problems.accept(field, null, "Invalid host specification");
return 1;
}
return 0;
}
private static int validateChangeStreamPipeline(Configuration config, Field field, ValidationOutput problems) { private static int validateChangeStreamPipeline(Configuration config, Field field, ValidationOutput problems) {
String value = config.getString(field); String value = config.getString(field);
@ -1214,6 +1267,18 @@ public ConnectionString getConnectionString() {
return connectionString; return connectionString;
} }
public String getUser() {
return user;
}
public String getPassword() {
return password;
}
public String getAuthSource() {
return authSource;
}
public int getCursorMaxAwaitTimeMs() { public int getCursorMaxAwaitTimeMs() {
return cursorMaxAwaitTimeMs; return cursorMaxAwaitTimeMs;
} }
@ -1230,6 +1295,34 @@ public boolean isSslAllowInvalidHostnames() {
return sslAllowInvalidHostnames; return sslAllowInvalidHostnames;
} }
public Optional<Path> getSslKeyStore() {
return Optional.ofNullable(sslKeyStore)
.filter(not(Strings::isNullOrBlank))
.map(Path::of);
}
public char[] getSslKeyStorePassword() {
return sslKeyStorePassword != null ? sslKeyStorePassword.toCharArray() : null;
}
public String getSslKeyStoreType() {
return sslKeyStoreType;
}
public Optional<Path> getSslTrustStore() {
return Optional.ofNullable(sslTrustStore)
.filter(not(Strings::isNullOrBlank))
.map(Path::of);
}
public char[] getSslTrustStorePassword() {
return sslTrustStorePassword != null ? sslTrustStorePassword.toCharArray() : null;
}
public String getSslTrustStoreType() {
return sslTrustStoreType;
}
public int getConnectTimeoutMs() { public int getConnectTimeoutMs() {
return connectTimeoutMs; return connectTimeoutMs;
} }

View File

@ -12,24 +12,24 @@
import io.debezium.connector.mongodb.MongoDbConnectorConfig; import io.debezium.connector.mongodb.MongoDbConnectorConfig;
public class DefaultMongoDbAuthProvider implements MongoDbAuthProvider { public class DefaultMongoDbAuthProvider implements MongoDbAuthProvider {
private String username;
private String password; private MongoDbConnectorConfig connectorConfig;
private String adminDbName;
@Override @Override
public void init(Configuration config) { public void init(Configuration config) {
username = config.getString(MongoDbConnectorConfig.USER); this.connectorConfig = new MongoDbConnectorConfig(config);
password = config.getString(MongoDbConnectorConfig.PASSWORD);
adminDbName = config.getString(MongoDbConnectorConfig.AUTH_SOURCE);
} }
@Override @Override
public Builder addAuthConfig(Builder settings) { public Builder addAuthConfig(Builder settings) {
// Use credential if provided as properties // Use credential if provided as properties
if (username != null || password != null) { var user = connectorConfig.getUser();
settings.credential(MongoCredential.createCredential(username, adminDbName, password.toCharArray())); var password = connectorConfig.getPassword();
} var authSource = connectorConfig.getAuthSource();
if (user != null || password != null) {
settings.credential(MongoCredential.createCredential(user, authSource, password.toCharArray()));
}
return settings; return settings;
} }
} }

View File

@ -8,21 +8,20 @@
import java.util.HashSet; import java.util.HashSet;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.mongodb.ConnectionString; import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.connection.ClusterDescription; import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterType; import com.mongodb.connection.ClusterType;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.MongoDbConnectorConfig; import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoUtils; import io.debezium.connector.mongodb.MongoUtils;
import io.debezium.connector.mongodb.connection.client.DefaultMongoDbClientFactory;
import io.debezium.connector.mongodb.connection.client.MongoDbClientFactory;
/** /**
* @author Randall Hauch * @author Randall Hauch
@ -33,36 +32,14 @@ public class MongoDbConnectionContext {
private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbConnectionContext.class); private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbConnectionContext.class);
private final MongoDbConnectorConfig connectorConfig; private final MongoDbConnectorConfig connectorConfig;
private final MongoClientSettings clientSettings; private final MongoDbClientFactory clientFactory;
/** /**
* @param config the configuration * @param config the configuration
*/ */
public MongoDbConnectionContext(Configuration config) { public MongoDbConnectionContext(Configuration config) {
this.connectorConfig = new MongoDbConnectorConfig(config); this.connectorConfig = new MongoDbConnectorConfig(config);
connectorConfig.getAuthProvider().init(config); this.clientFactory = new DefaultMongoDbClientFactory(config);
var settings = MongoClientSettings.builder()
.applyToSocketSettings(builder -> builder
.connectTimeout(connectorConfig.getConnectTimeoutMs(), TimeUnit.MILLISECONDS)
.readTimeout(connectorConfig.getSocketTimeoutMs(), TimeUnit.MILLISECONDS))
.applyToClusterSettings(
builder -> builder.serverSelectionTimeout(connectorConfig.getServerSelectionTimeoutMs(), TimeUnit.MILLISECONDS))
.applyToServerSettings(builder -> builder
.heartbeatFrequency(connectorConfig.getHeartbeatFrequencyMs(), TimeUnit.MILLISECONDS))
.applyToSocketSettings(builder -> builder
.connectTimeout(connectorConfig.getConnectTimeoutMs(), TimeUnit.MILLISECONDS)
.readTimeout(connectorConfig.getSocketTimeoutMs(), TimeUnit.MILLISECONDS))
.applyToClusterSettings(builder -> builder
.serverSelectionTimeout(connectorConfig.getServerSelectionTimeoutMs(), TimeUnit.MILLISECONDS))
.applyToSslSettings(builder -> builder
.enabled(connectorConfig.isSslEnabled())
.invalidHostNameAllowed(connectorConfig.isSslAllowInvalidHostnames()));
this.clientSettings = connectorConfig.getAuthProvider()
.addAuthConfig(settings)
.applyConnectionString(connectorConfig.getConnectionString())
.build();
} }
public MongoDbConnectorConfig getConnectorConfig() { public MongoDbConnectorConfig getConnectorConfig() {
@ -88,7 +65,7 @@ public String getMaskedConnectionString() {
* @return mongo client * @return mongo client
*/ */
public MongoClient getMongoClient() { public MongoClient getMongoClient() {
return MongoClients.create(clientSettings); return clientFactory.getMongoClient();
} }
public ClusterDescription getClusterDescription() { public ClusterDescription getClusterDescription() {

View File

@ -0,0 +1,65 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.connection.client;
import java.util.concurrent.TimeUnit;
import com.mongodb.MongoClientSettings;
import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.connection.MongoDbAuthProvider;
public class DefaultMongoDbClientFactory implements MongoDbClientFactory {
private final MongoDbConnectorConfig connectorConfig;
private final MongoClientSettings clientSettings;
private final MongoDbAuthProvider authProvider;
public DefaultMongoDbClientFactory(Configuration config) {
this.connectorConfig = new MongoDbConnectorConfig(config);
this.authProvider = connectorConfig.getAuthProvider();
this.authProvider.init(config);
this.clientSettings = createMongoClientSettings();
}
@Override
public MongoClientSettings getMongoClientSettings() {
return clientSettings;
}
protected MongoClientSettings createMongoClientSettings() {
var sslContext = MongoDbClientFactory.createSSLContext(connectorConfig);
// 1. apply property configuration
var settings = MongoClientSettings.builder()
.applyToSocketSettings(builder -> builder
.connectTimeout(connectorConfig.getConnectTimeoutMs(), TimeUnit.MILLISECONDS)
.readTimeout(connectorConfig.getSocketTimeoutMs(), TimeUnit.MILLISECONDS))
.applyToClusterSettings(
builder -> builder.serverSelectionTimeout(connectorConfig.getServerSelectionTimeoutMs(), TimeUnit.MILLISECONDS))
.applyToServerSettings(builder -> builder
.heartbeatFrequency(connectorConfig.getHeartbeatFrequencyMs(), TimeUnit.MILLISECONDS))
.applyToSocketSettings(builder -> builder
.connectTimeout(connectorConfig.getConnectTimeoutMs(), TimeUnit.MILLISECONDS)
.readTimeout(connectorConfig.getSocketTimeoutMs(), TimeUnit.MILLISECONDS))
.applyToClusterSettings(builder -> builder
.serverSelectionTimeout(connectorConfig.getServerSelectionTimeoutMs(), TimeUnit.MILLISECONDS))
.applyToSslSettings(builder -> builder
.enabled(connectorConfig.isSslEnabled())
.invalidHostNameAllowed(connectorConfig.isSslAllowInvalidHostnames())
.context(sslContext));
// 2. apply auth provider configuration
authProvider.addAuthConfig(settings);
// 3. apply connection string configuration
settings.applyConnectionString(connectorConfig.getConnectionString());
// build
return settings.build();
}
}

View File

@ -0,0 +1,124 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.connection.client;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import io.debezium.DebeziumException;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
public interface MongoDbClientFactory {
Logger LOGGER = LoggerFactory.getLogger(MongoDbClientFactory.class);
/**
* Creates {@link MongoClientSettings} used to obtain {@link MongoClient} instances
*
* @return client settings
*/
MongoClientSettings getMongoClientSettings();
/**
* Creates native {@link MongoClient} instance
*
* @return mongo client
*/
default MongoClient getMongoClient() {
var clientSettings = getMongoClientSettings();
return MongoClients.create(clientSettings);
}
/**
* Creates keystore
*
* @param type keyfile type
* @param path keyfile path
* @param password keyfile password
* @return keystore with loaded keys
*/
static KeyStore loadKeyStore(String type, Path path, char[] password) {
try (var keys = Files.newInputStream(path)) {
var ks = KeyStore.getInstance(type);
ks.load(keys, password);
return ks;
}
catch (IOException | KeyStoreException | NoSuchAlgorithmException | CertificateException e) {
LOGGER.error("Unable to read key file from '{}'", path);
throw new DebeziumException(e);
}
}
/**
* Creates SSL context initialized with custom
*
* @param connectorConfig connector configuration
* @return ssl context
*/
static SSLContext createSSLContext(MongoDbConnectorConfig connectorConfig) {
try {
var ksPath = connectorConfig.getSslKeyStore();
var ksPass = connectorConfig.getSslKeyStorePassword();
var ksType = connectorConfig.getSslKeyStoreType();
KeyManager[] keyManagers = null;
// Create keystore when configured
if (ksPath.isPresent()) {
var ks = loadKeyStore(ksType, ksPath.get(), ksPass);
var kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, ksPass);
keyManagers = kmf.getKeyManagers();
}
// Create truststore when configured
var tsPath = connectorConfig.getSslTrustStore();
var tsPass = connectorConfig.getSslTrustStorePassword();
var tsType = connectorConfig.getSslTrustStoreType();
TrustManager[] trustManagers = null;
if (tsPath.isPresent()) {
var ts = loadKeyStore(tsType, tsPath.get(), tsPass);
var tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(ts);
trustManagers = tmf.getTrustManagers();
}
// Create and initialize SSL context
var context = SSLContext.getInstance("TLS");
context.init(keyManagers, trustManagers, null);
return context;
}
catch (NoSuchAlgorithmException | KeyStoreException | UnrecoverableKeyException e) {
LOGGER.error("Unable to crate KeyStore/TrustStore manager factory");
throw new DebeziumException(e);
}
catch (KeyManagementException e) {
LOGGER.error("Unable to initialize SSL context");
throw new DebeziumException(e);
}
}
}