DBZ-4832 Don't set truststore/keystore parameters to system variables

This commit is contained in:
harveyyue 2022-03-13 15:08:58 +08:00 committed by Gunnar Morling
parent e1b54c48a8
commit 044e454e0a
6 changed files with 156 additions and 99 deletions

View File

@ -6,7 +6,6 @@
package io.debezium.connector.mysql;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
@ -52,7 +51,7 @@ public class MySqlConnection extends JdbcConnection {
private static final String SQL_SHOW_SESSION_VARIABLE_SSL_VERSION = "SHOW SESSION STATUS LIKE 'Ssl_version'";
private static final String QUOTED_CHARACTER = "`";
protected static final String URL_PATTERN = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=${useSSL}&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=${connectTimeout}";
protected static final String URL_PATTERN = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=${connectTimeout}";
private final Map<String, String> originalSystemProperties = new HashMap<>();
private final MySqlConnectionConfiguration connectionConfig;
@ -79,19 +78,6 @@ public MySqlConnection(MySqlConnectionConfiguration connectionConfig) {
this(connectionConfig, new MysqlTextProtocolFieldReader(null));
}
@Override
public synchronized Connection connection(boolean executeOnConnect) throws SQLException {
if (!isConnected() && connectionConfig.sslModeEnabled()) {
originalSystemProperties.clear();
// Set the System properties for SSL for the MySQL driver ...
setSystemProperty("javax.net.ssl.keyStore", MySqlConnectorConfig.SSL_KEYSTORE, true);
setSystemProperty("javax.net.ssl.keyStorePassword", MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD, false);
setSystemProperty("javax.net.ssl.trustStore", MySqlConnectorConfig.SSL_TRUSTSTORE, true);
setSystemProperty("javax.net.ssl.trustStorePassword", MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD, false);
}
return super.connection(executeOnConnect);
}
@Override
public void close() throws SQLException {
try {
@ -472,6 +458,10 @@ protected Map<String, DatabaseLocales> readDatabaseCollations() {
}
}
public MySqlConnectionConfiguration connectionConfig() {
return connectionConfig;
}
public String connectionString() {
return connectionString(URL_PATTERN);
}
@ -502,7 +492,22 @@ public MySqlConnectionConfiguration(Configuration config) {
final Builder jdbcConfigBuilder = dbConfig
.edit()
.with("connectTimeout", Long.toString(getConnectionTimeout().toMillis()))
.with("useSSL", Boolean.toString(useSSL));
.with("sslMode", sslMode().getValue());
if (useSSL) {
if (!Strings.isNullOrBlank(sslTrustStore())) {
jdbcConfigBuilder.with("trustCertificateKeyStoreUrl", "file:" + sslTrustStore());
}
if (sslTrustStorePassword() != null) {
jdbcConfigBuilder.with("trustCertificateKeyStorePassword", String.valueOf(sslTrustStorePassword()));
}
if (!Strings.isNullOrBlank(sslKeyStore())) {
jdbcConfigBuilder.with("clientCertificateKeyStoreUrl", "file:" + sslKeyStore());
}
if (sslKeyStorePassword() != null) {
jdbcConfigBuilder.with("clientCertificateKeyStorePassword", String.valueOf(sslKeyStorePassword()));
}
}
final String legacyDateTime = dbConfig.getString(JDBC_PROPERTY_LEGACY_DATETIME);
if (legacyDateTime == null) {
@ -575,6 +580,24 @@ public boolean sslModeEnabled() {
return sslMode() != SecureConnectionMode.DISABLED;
}
public String sslKeyStore() {
return config.getString(MySqlConnectorConfig.SSL_KEYSTORE);
}
public char[] sslKeyStorePassword() {
String password = config.getString(MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD);
return Strings.isNullOrBlank(password) ? null : password.toCharArray();
}
public String sslTrustStore() {
return config.getString(MySqlConnectorConfig.SSL_TRUSTSTORE);
}
public char[] sslTrustStorePassword() {
String password = config.getString(MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD);
return Strings.isNullOrBlank(password) ? null : password.toCharArray();
}
public Duration getConnectionTimeout() {
return Duration.ofMillis(config.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS));
}

View File

@ -7,9 +7,7 @@
import static io.debezium.util.Strings.isNullOrEmpty;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.KeyStoreException;
@ -1009,19 +1007,17 @@ private SSLSocketFactory getBinlogSslSocketFactory(MySqlConnectorConfig connecto
SSLMode sslMode = sslModeFor(connectorConfig.sslMode());
LOGGER.info("Enable ssl " + sslMode + " mode for connector " + connectorConfig.getLogicalName());
// Keystore settings can be passed via system properties too so we need to read them
final String password = System.getProperty("javax.net.ssl.keyStorePassword");
final String keyFilename = System.getProperty("javax.net.ssl.keyStore");
final String trustPassword = System.getProperty("javax.net.ssl.trustStorePassword");
final String trustFilename = System.getProperty("javax.net.ssl.trustStore");
final char[] keyPasswordArray = connection.connectionConfig().sslKeyStorePassword();
final String keyFilename = connection.connectionConfig().sslKeyStore();
final char[] trustPasswordArray = connection.connectionConfig().sslTrustStorePassword();
final String trustFilename = connection.connectionConfig().sslTrustStore();
KeyManager[] keyManagers = null;
if (keyFilename != null) {
final char[] passwordArray = (password == null) ? null : password.toCharArray();
try {
KeyStore ks = loadKeyStore(keyFilename, passwordArray);
KeyStore ks = connection.loadKeyStore(keyFilename, keyPasswordArray);
KeyManagerFactory kmf = KeyManagerFactory.getInstance("NewSunX509");
kmf.init(ks, passwordArray);
kmf.init(ks, keyPasswordArray);
keyManagers = kmf.getKeyManagers();
}
@ -1033,7 +1029,7 @@ private SSLSocketFactory getBinlogSslSocketFactory(MySqlConnectorConfig connecto
try {
KeyStore ks = null;
if (trustFilename != null) {
ks = loadKeyStore(trustFilename, (trustPassword == null) ? null : trustPassword.toCharArray());
ks = connection.loadKeyStore(trustFilename, trustPasswordArray);
}
if (ks == null && (sslMode == SSLMode.PREFERRED || sslMode == SSLMode.REQUIRED)) {
@ -1081,17 +1077,6 @@ protected void initSSLContext(SSLContext sc) throws GeneralSecurityException {
return null;
}
private KeyStore loadKeyStore(String filePath, char[] passwordArray) {
try (InputStream in = new FileInputStream(filePath)) {
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(in, passwordArray);
return ks;
}
catch (KeyStoreException | IOException | CertificateException | NoSuchAlgorithmException e) {
throw new DebeziumException("Error loading keystore", e);
}
}
private void logStreamingSourceState() {
logStreamingSourceState(Level.ERROR);
}

View File

@ -7,7 +7,6 @@
import static io.debezium.util.Strings.isNullOrEmpty;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.security.GeneralSecurityException;
@ -36,6 +35,7 @@
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import org.apache.kafka.connect.errors.ConnectException;
@ -66,6 +66,7 @@
import com.github.shyiko.mysql.binlog.network.SSLMode;
import com.github.shyiko.mysql.binlog.network.SSLSocketFactory;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig.EventProcessingFailureHandlingMode;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.EventDataDeserializationExceptionData;
@ -1217,65 +1218,73 @@ private SSLSocketFactory getBinlogSslSocketFactory(MySqlJdbcContext connectionCo
String acceptedTlsVersion = connectionContext.getSessionVariableForSslVersion();
if (!isNullOrEmpty(acceptedTlsVersion)) {
SSLMode sslMode = sslModeFor(connectionContext.sslMode());
logger.info("Enable ssl " + sslMode + " mode for connector " + context.getConnectorConfig().getLogicalName());
// Keystore settings can be passed via system properties too so we need to read them
final String password = System.getProperty("javax.net.ssl.keyStorePassword");
final String keyFilename = System.getProperty("javax.net.ssl.keyStore");
final char[] keyPasswordArray = context.getConnectionContext().sslKeyStorePassword();
final String keyFilename = context.getConnectionContext().sslKeyStore();
final char[] trustPasswordArray = context.getConnectionContext().sslTrustStorePassword();
final String trustFilename = context.getConnectionContext().sslTrustStore();
KeyManager[] keyManagers = null;
if (keyFilename != null) {
final char[] passwordArray = (password == null) ? null : password.toCharArray();
try {
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(new FileInputStream(keyFilename), passwordArray);
KeyStore ks = context.getConnectionContext().jdbc().loadKeyStore(keyFilename, keyPasswordArray);
KeyManagerFactory kmf = KeyManagerFactory.getInstance("NewSunX509");
kmf.init(ks, passwordArray);
kmf.init(ks, keyPasswordArray);
keyManagers = kmf.getKeyManagers();
}
catch (KeyStoreException | IOException | CertificateException | NoSuchAlgorithmException | UnrecoverableKeyException e) {
throw new ConnectException("Could not load keystore", e);
catch (KeyStoreException | NoSuchAlgorithmException | UnrecoverableKeyException e) {
throw new DebeziumException("Could not load keystore", e);
}
}
TrustManager[] trustManagers;
try {
KeyStore ks = null;
if (trustFilename != null) {
ks = context.getConnectionContext().jdbc().loadKeyStore(trustFilename, trustPasswordArray);
}
if (ks == null && (sslMode == SSLMode.PREFERRED || sslMode == SSLMode.REQUIRED)) {
trustManagers = new TrustManager[]{
new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s)
throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s)
throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}
};
}
else {
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(ks);
trustManagers = tmf.getTrustManagers();
}
}
catch (KeyStoreException | NoSuchAlgorithmException e) {
throw new DebeziumException("Could not load truststore", e);
}
// DBZ-1208 Resembles the logic from the upstream BinaryLogClient, only that
// the accepted TLS version is passed to the constructed factory
if (sslMode == SSLMode.PREFERRED || sslMode == SSLMode.REQUIRED) {
final KeyManager[] finalKMS = keyManagers;
return new DefaultSSLSocketFactory(acceptedTlsVersion) {
final KeyManager[] finalKMS = keyManagers;
return new DefaultSSLSocketFactory(acceptedTlsVersion) {
@Override
protected void initSSLContext(SSLContext sc)
throws GeneralSecurityException {
sc.init(finalKMS, new TrustManager[]{
new X509TrustManager() {
@Override
public void checkClientTrusted(
X509Certificate[] x509Certificates,
String s)
throws CertificateException {
}
@Override
public void checkServerTrusted(
X509Certificate[] x509Certificates,
String s)
throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}
}, null);
}
};
}
else {
return new DefaultSSLSocketFactory(acceptedTlsVersion);
}
@Override
protected void initSSLContext(SSLContext sc) throws GeneralSecurityException {
sc.init(finalKMS, trustManagers, null);
}
};
}
return null;

View File

@ -40,7 +40,7 @@
*/
public class MySqlJdbcContext implements AutoCloseable {
protected static final String MYSQL_CONNECTION_URL = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=${useSSL}&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=${connectTimeout}";
protected static final String MYSQL_CONNECTION_URL = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=${connectTimeout}";
protected static final String JDBC_PROPERTY_LEGACY_DATETIME = "useLegacyDatetimeCode";
private static final String SQL_SHOW_SYSTEM_VARIABLES = "SHOW VARIABLES";
@ -73,7 +73,22 @@ public MySqlJdbcContext(MySqlConnectorConfig config) {
Builder jdbcConfigBuilder = jdbcConfig
.edit()
.with("connectTimeout", Long.toString(config.getConnectionTimeout().toMillis()))
.with("useSSL", Boolean.toString(useSSL));
.with("sslMode", sslMode().getValue());
if (useSSL) {
if (!Strings.isNullOrBlank(sslTrustStore())) {
jdbcConfigBuilder.with("trustCertificateKeyStoreUrl", "file:" + sslTrustStore());
}
if (sslTrustStorePassword() != null) {
jdbcConfigBuilder.with("trustCertificateKeyStorePassword", String.valueOf(sslTrustStorePassword()));
}
if (!Strings.isNullOrBlank(sslKeyStore())) {
jdbcConfigBuilder.with("clientCertificateKeyStoreUrl", "file:" + sslKeyStore());
}
if (sslKeyStorePassword() != null) {
jdbcConfigBuilder.with("clientCertificateKeyStorePassword", String.valueOf(sslKeyStorePassword()));
}
}
final String legacyDateTime = jdbcConfig.getString(JDBC_PROPERTY_LEGACY_DATETIME);
if (legacyDateTime == null) {
@ -126,6 +141,24 @@ public boolean sslModeEnabled() {
return sslMode() != SecureConnectionMode.DISABLED;
}
public String sslKeyStore() {
return config.getString(MySqlConnectorConfig.SSL_KEYSTORE);
}
public char[] sslKeyStorePassword() {
String password = config.getString(MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD);
return Strings.isNullOrBlank(password) ? null : password.toCharArray();
}
public String sslTrustStore() {
return config.getString(MySqlConnectorConfig.SSL_TRUSTSTORE);
}
public char[] sslTrustStorePassword() {
String password = config.getString(MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD);
return Strings.isNullOrBlank(password) ? null : password.toCharArray();
}
public EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode() {
String mode = config.getString(CommonConnectorConfig.EVENT_PROCESSING_FAILURE_HANDLING_MODE);
if (mode == null) {
@ -139,17 +172,6 @@ public EventProcessingFailureHandlingMode inconsistentSchemaHandlingMode() {
return EventProcessingFailureHandlingMode.parse(mode);
}
public void start() {
if (sslModeEnabled()) {
originalSystemProperties.clear();
// Set the System properties for SSL for the MySQL driver ...
setSystemProperty("javax.net.ssl.keyStore", MySqlConnectorConfig.SSL_KEYSTORE, true);
setSystemProperty("javax.net.ssl.keyStorePassword", MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD, false);
setSystemProperty("javax.net.ssl.trustStore", MySqlConnectorConfig.SSL_TRUSTSTORE, true);
setSystemProperty("javax.net.ssl.trustStorePassword", MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD, false);
}
}
public void shutdown() {
try {
jdbc.close();
@ -327,7 +349,6 @@ public Map<String, String> readMySqlSystemVariables() {
private Map<String, String> querySystemVariables(String statement) {
Map<String, String> variables = new HashMap<>();
try {
start();
jdbc.connect().query(statement, rs -> {
while (rs.next()) {
String varName = rs.getString(1);
@ -356,7 +377,6 @@ private Map<String, String> querySystemVariables(String statement) {
protected Map<String, DatabaseLocales> readDatabaseCollations() {
logger.debug("Reading default database charsets");
try {
start();
return jdbc.connect().queryAndMap("SELECT schema_name, default_character_set_name, default_collation_name FROM information_schema.schemata", rs -> {
final Map<String, DatabaseLocales> charsets = new HashMap<>();
while (rs.next()) {

View File

@ -247,7 +247,6 @@ protected SnapshotMode snapshotMode() {
}
public void start() {
connectionContext.start();
// Start the MySQL database history, which simply starts up resources but does not recover the history to a specific point
dbSchema().start();
}

View File

@ -5,7 +5,14 @@
*/
package io.debezium.jdbc;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@ -1510,4 +1517,18 @@ public String quotedTableIdString(TableId tableId) {
public String quotedColumnIdString(String columnName) {
return openingQuoteCharacter + columnName + closingQuoteCharacter;
}
/**
* Read JKS type keystore/truststore file according related password.
*/
public KeyStore loadKeyStore(String filePath, char[] passwordArray) {
try (InputStream in = new FileInputStream(filePath)) {
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(in, passwordArray);
return ks;
}
catch (KeyStoreException | IOException | CertificateException | NoSuchAlgorithmException e) {
throw new DebeziumException("Error loading keystore", e);
}
}
}