From 044e454e0ad36aa6dd1374229f929cc7cc990111 Mon Sep 17 00:00:00 2001 From: harveyyue Date: Sun, 13 Mar 2022 15:08:58 +0800 Subject: [PATCH] DBZ-4832 Don't set truststore/keystore parameters to system variables --- .../connector/mysql/MySqlConnection.java | 55 ++++++++--- .../MySqlStreamingChangeEventSource.java | 29 ++---- .../connector/mysql/legacy/BinlogReader.java | 99 ++++++++++--------- .../mysql/legacy/MySqlJdbcContext.java | 50 +++++++--- .../mysql/legacy/MySqlTaskContext.java | 1 - .../java/io/debezium/jdbc/JdbcConnection.java | 21 ++++ 6 files changed, 156 insertions(+), 99 deletions(-) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnection.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnection.java index 22dc4fe91..5521c936e 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnection.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnection.java @@ -6,7 +6,6 @@ package io.debezium.connector.mysql; -import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.time.Duration; @@ -52,7 +51,7 @@ public class MySqlConnection extends JdbcConnection { private static final String SQL_SHOW_SESSION_VARIABLE_SSL_VERSION = "SHOW SESSION STATUS LIKE 'Ssl_version'"; private static final String QUOTED_CHARACTER = "`"; - protected static final String URL_PATTERN = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=${useSSL}&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=${connectTimeout}"; + protected static final String URL_PATTERN = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=${connectTimeout}"; private final Map originalSystemProperties = new HashMap<>(); private final MySqlConnectionConfiguration connectionConfig; @@ -79,19 +78,6 @@ public MySqlConnection(MySqlConnectionConfiguration connectionConfig) { this(connectionConfig, new MysqlTextProtocolFieldReader(null)); } - @Override - public synchronized Connection connection(boolean executeOnConnect) throws SQLException { - if (!isConnected() && connectionConfig.sslModeEnabled()) { - originalSystemProperties.clear(); - // Set the System properties for SSL for the MySQL driver ... - setSystemProperty("javax.net.ssl.keyStore", MySqlConnectorConfig.SSL_KEYSTORE, true); - setSystemProperty("javax.net.ssl.keyStorePassword", MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD, false); - setSystemProperty("javax.net.ssl.trustStore", MySqlConnectorConfig.SSL_TRUSTSTORE, true); - setSystemProperty("javax.net.ssl.trustStorePassword", MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD, false); - } - return super.connection(executeOnConnect); - } - @Override public void close() throws SQLException { try { @@ -472,6 +458,10 @@ protected Map readDatabaseCollations() { } } + public MySqlConnectionConfiguration connectionConfig() { + return connectionConfig; + } + public String connectionString() { return connectionString(URL_PATTERN); } @@ -502,7 +492,22 @@ public MySqlConnectionConfiguration(Configuration config) { final Builder jdbcConfigBuilder = dbConfig .edit() .with("connectTimeout", Long.toString(getConnectionTimeout().toMillis())) - .with("useSSL", Boolean.toString(useSSL)); + .with("sslMode", sslMode().getValue()); + + if (useSSL) { + if (!Strings.isNullOrBlank(sslTrustStore())) { + jdbcConfigBuilder.with("trustCertificateKeyStoreUrl", "file:" + sslTrustStore()); + } + if (sslTrustStorePassword() != null) { + jdbcConfigBuilder.with("trustCertificateKeyStorePassword", String.valueOf(sslTrustStorePassword())); + } + if (!Strings.isNullOrBlank(sslKeyStore())) { + jdbcConfigBuilder.with("clientCertificateKeyStoreUrl", "file:" + sslKeyStore()); + } + if (sslKeyStorePassword() != null) { + jdbcConfigBuilder.with("clientCertificateKeyStorePassword", String.valueOf(sslKeyStorePassword())); + } + } final String legacyDateTime = dbConfig.getString(JDBC_PROPERTY_LEGACY_DATETIME); if (legacyDateTime == null) { @@ -575,6 +580,24 @@ public boolean sslModeEnabled() { return sslMode() != SecureConnectionMode.DISABLED; } + public String sslKeyStore() { + return config.getString(MySqlConnectorConfig.SSL_KEYSTORE); + } + + public char[] sslKeyStorePassword() { + String password = config.getString(MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD); + return Strings.isNullOrBlank(password) ? null : password.toCharArray(); + } + + public String sslTrustStore() { + return config.getString(MySqlConnectorConfig.SSL_TRUSTSTORE); + } + + public char[] sslTrustStorePassword() { + String password = config.getString(MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD); + return Strings.isNullOrBlank(password) ? null : password.toCharArray(); + } + public Duration getConnectionTimeout() { return Duration.ofMillis(config.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS)); } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index d549e9659..4d0b23513 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -7,9 +7,7 @@ import static io.debezium.util.Strings.isNullOrEmpty; -import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStream; import java.security.GeneralSecurityException; import java.security.KeyStore; import java.security.KeyStoreException; @@ -1009,19 +1007,17 @@ private SSLSocketFactory getBinlogSslSocketFactory(MySqlConnectorConfig connecto SSLMode sslMode = sslModeFor(connectorConfig.sslMode()); LOGGER.info("Enable ssl " + sslMode + " mode for connector " + connectorConfig.getLogicalName()); - // Keystore settings can be passed via system properties too so we need to read them - final String password = System.getProperty("javax.net.ssl.keyStorePassword"); - final String keyFilename = System.getProperty("javax.net.ssl.keyStore"); - final String trustPassword = System.getProperty("javax.net.ssl.trustStorePassword"); - final String trustFilename = System.getProperty("javax.net.ssl.trustStore"); + final char[] keyPasswordArray = connection.connectionConfig().sslKeyStorePassword(); + final String keyFilename = connection.connectionConfig().sslKeyStore(); + final char[] trustPasswordArray = connection.connectionConfig().sslTrustStorePassword(); + final String trustFilename = connection.connectionConfig().sslTrustStore(); KeyManager[] keyManagers = null; if (keyFilename != null) { - final char[] passwordArray = (password == null) ? null : password.toCharArray(); try { - KeyStore ks = loadKeyStore(keyFilename, passwordArray); + KeyStore ks = connection.loadKeyStore(keyFilename, keyPasswordArray); KeyManagerFactory kmf = KeyManagerFactory.getInstance("NewSunX509"); - kmf.init(ks, passwordArray); + kmf.init(ks, keyPasswordArray); keyManagers = kmf.getKeyManagers(); } @@ -1033,7 +1029,7 @@ private SSLSocketFactory getBinlogSslSocketFactory(MySqlConnectorConfig connecto try { KeyStore ks = null; if (trustFilename != null) { - ks = loadKeyStore(trustFilename, (trustPassword == null) ? null : trustPassword.toCharArray()); + ks = connection.loadKeyStore(trustFilename, trustPasswordArray); } if (ks == null && (sslMode == SSLMode.PREFERRED || sslMode == SSLMode.REQUIRED)) { @@ -1081,17 +1077,6 @@ protected void initSSLContext(SSLContext sc) throws GeneralSecurityException { return null; } - private KeyStore loadKeyStore(String filePath, char[] passwordArray) { - try (InputStream in = new FileInputStream(filePath)) { - KeyStore ks = KeyStore.getInstance("JKS"); - ks.load(in, passwordArray); - return ks; - } - catch (KeyStoreException | IOException | CertificateException | NoSuchAlgorithmException e) { - throw new DebeziumException("Error loading keystore", e); - } - } - private void logStreamingSourceState() { logStreamingSourceState(Level.ERROR); } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/BinlogReader.java index e7a6ebe9c..a526539f0 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/BinlogReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/BinlogReader.java @@ -7,7 +7,6 @@ import static io.debezium.util.Strings.isNullOrEmpty; -import java.io.FileInputStream; import java.io.IOException; import java.io.Serializable; import java.security.GeneralSecurityException; @@ -36,6 +35,7 @@ import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; import org.apache.kafka.connect.errors.ConnectException; @@ -66,6 +66,7 @@ import com.github.shyiko.mysql.binlog.network.SSLMode; import com.github.shyiko.mysql.binlog.network.SSLSocketFactory; +import io.debezium.DebeziumException; import io.debezium.config.CommonConnectorConfig.EventProcessingFailureHandlingMode; import io.debezium.config.Configuration; import io.debezium.connector.mysql.EventDataDeserializationExceptionData; @@ -1217,65 +1218,73 @@ private SSLSocketFactory getBinlogSslSocketFactory(MySqlJdbcContext connectionCo String acceptedTlsVersion = connectionContext.getSessionVariableForSslVersion(); if (!isNullOrEmpty(acceptedTlsVersion)) { SSLMode sslMode = sslModeFor(connectionContext.sslMode()); + logger.info("Enable ssl " + sslMode + " mode for connector " + context.getConnectorConfig().getLogicalName()); - // Keystore settings can be passed via system properties too so we need to read them - final String password = System.getProperty("javax.net.ssl.keyStorePassword"); - final String keyFilename = System.getProperty("javax.net.ssl.keyStore"); + final char[] keyPasswordArray = context.getConnectionContext().sslKeyStorePassword(); + final String keyFilename = context.getConnectionContext().sslKeyStore(); + final char[] trustPasswordArray = context.getConnectionContext().sslTrustStorePassword(); + final String trustFilename = context.getConnectionContext().sslTrustStore(); KeyManager[] keyManagers = null; if (keyFilename != null) { - final char[] passwordArray = (password == null) ? null : password.toCharArray(); try { - KeyStore ks = KeyStore.getInstance("JKS"); - ks.load(new FileInputStream(keyFilename), passwordArray); + KeyStore ks = context.getConnectionContext().jdbc().loadKeyStore(keyFilename, keyPasswordArray); KeyManagerFactory kmf = KeyManagerFactory.getInstance("NewSunX509"); - kmf.init(ks, passwordArray); + kmf.init(ks, keyPasswordArray); keyManagers = kmf.getKeyManagers(); } - catch (KeyStoreException | IOException | CertificateException | NoSuchAlgorithmException | UnrecoverableKeyException e) { - throw new ConnectException("Could not load keystore", e); + catch (KeyStoreException | NoSuchAlgorithmException | UnrecoverableKeyException e) { + throw new DebeziumException("Could not load keystore", e); } } + TrustManager[] trustManagers; + try { + KeyStore ks = null; + if (trustFilename != null) { + ks = context.getConnectionContext().jdbc().loadKeyStore(trustFilename, trustPasswordArray); + } + if (ks == null && (sslMode == SSLMode.PREFERRED || sslMode == SSLMode.REQUIRED)) { + trustManagers = new TrustManager[]{ + new X509TrustManager() { + + @Override + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) + throws CertificateException { + } + + @Override + public void checkServerTrusted(X509Certificate[] x509Certificates, String s) + throws CertificateException { + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + } + }; + } + else { + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(ks); + trustManagers = tmf.getTrustManagers(); + } + } + catch (KeyStoreException | NoSuchAlgorithmException e) { + throw new DebeziumException("Could not load truststore", e); + } // DBZ-1208 Resembles the logic from the upstream BinaryLogClient, only that // the accepted TLS version is passed to the constructed factory - if (sslMode == SSLMode.PREFERRED || sslMode == SSLMode.REQUIRED) { - final KeyManager[] finalKMS = keyManagers; - return new DefaultSSLSocketFactory(acceptedTlsVersion) { + final KeyManager[] finalKMS = keyManagers; + return new DefaultSSLSocketFactory(acceptedTlsVersion) { - @Override - protected void initSSLContext(SSLContext sc) - throws GeneralSecurityException { - sc.init(finalKMS, new TrustManager[]{ - new X509TrustManager() { - - @Override - public void checkClientTrusted( - X509Certificate[] x509Certificates, - String s) - throws CertificateException { - } - - @Override - public void checkServerTrusted( - X509Certificate[] x509Certificates, - String s) - throws CertificateException { - } - - @Override - public X509Certificate[] getAcceptedIssuers() { - return new X509Certificate[0]; - } - } - }, null); - } - }; - } - else { - return new DefaultSSLSocketFactory(acceptedTlsVersion); - } + @Override + protected void initSSLContext(SSLContext sc) throws GeneralSecurityException { + sc.init(finalKMS, trustManagers, null); + } + }; } return null; diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/MySqlJdbcContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/MySqlJdbcContext.java index e0ade29b1..2f2e8c699 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/MySqlJdbcContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/MySqlJdbcContext.java @@ -40,7 +40,7 @@ */ public class MySqlJdbcContext implements AutoCloseable { - protected static final String MYSQL_CONNECTION_URL = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=${useSSL}&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=${connectTimeout}"; + protected static final String MYSQL_CONNECTION_URL = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=${connectTimeout}"; protected static final String JDBC_PROPERTY_LEGACY_DATETIME = "useLegacyDatetimeCode"; private static final String SQL_SHOW_SYSTEM_VARIABLES = "SHOW VARIABLES"; @@ -73,7 +73,22 @@ public MySqlJdbcContext(MySqlConnectorConfig config) { Builder jdbcConfigBuilder = jdbcConfig .edit() .with("connectTimeout", Long.toString(config.getConnectionTimeout().toMillis())) - .with("useSSL", Boolean.toString(useSSL)); + .with("sslMode", sslMode().getValue()); + + if (useSSL) { + if (!Strings.isNullOrBlank(sslTrustStore())) { + jdbcConfigBuilder.with("trustCertificateKeyStoreUrl", "file:" + sslTrustStore()); + } + if (sslTrustStorePassword() != null) { + jdbcConfigBuilder.with("trustCertificateKeyStorePassword", String.valueOf(sslTrustStorePassword())); + } + if (!Strings.isNullOrBlank(sslKeyStore())) { + jdbcConfigBuilder.with("clientCertificateKeyStoreUrl", "file:" + sslKeyStore()); + } + if (sslKeyStorePassword() != null) { + jdbcConfigBuilder.with("clientCertificateKeyStorePassword", String.valueOf(sslKeyStorePassword())); + } + } final String legacyDateTime = jdbcConfig.getString(JDBC_PROPERTY_LEGACY_DATETIME); if (legacyDateTime == null) { @@ -126,6 +141,24 @@ public boolean sslModeEnabled() { return sslMode() != SecureConnectionMode.DISABLED; } + public String sslKeyStore() { + return config.getString(MySqlConnectorConfig.SSL_KEYSTORE); + } + + public char[] sslKeyStorePassword() { + String password = config.getString(MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD); + return Strings.isNullOrBlank(password) ? null : password.toCharArray(); + } + + public String sslTrustStore() { + return config.getString(MySqlConnectorConfig.SSL_TRUSTSTORE); + } + + public char[] sslTrustStorePassword() { + String password = config.getString(MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD); + return Strings.isNullOrBlank(password) ? null : password.toCharArray(); + } + public EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode() { String mode = config.getString(CommonConnectorConfig.EVENT_PROCESSING_FAILURE_HANDLING_MODE); if (mode == null) { @@ -139,17 +172,6 @@ public EventProcessingFailureHandlingMode inconsistentSchemaHandlingMode() { return EventProcessingFailureHandlingMode.parse(mode); } - public void start() { - if (sslModeEnabled()) { - originalSystemProperties.clear(); - // Set the System properties for SSL for the MySQL driver ... - setSystemProperty("javax.net.ssl.keyStore", MySqlConnectorConfig.SSL_KEYSTORE, true); - setSystemProperty("javax.net.ssl.keyStorePassword", MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD, false); - setSystemProperty("javax.net.ssl.trustStore", MySqlConnectorConfig.SSL_TRUSTSTORE, true); - setSystemProperty("javax.net.ssl.trustStorePassword", MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD, false); - } - } - public void shutdown() { try { jdbc.close(); @@ -327,7 +349,6 @@ public Map readMySqlSystemVariables() { private Map querySystemVariables(String statement) { Map variables = new HashMap<>(); try { - start(); jdbc.connect().query(statement, rs -> { while (rs.next()) { String varName = rs.getString(1); @@ -356,7 +377,6 @@ private Map querySystemVariables(String statement) { protected Map readDatabaseCollations() { logger.debug("Reading default database charsets"); try { - start(); return jdbc.connect().queryAndMap("SELECT schema_name, default_character_set_name, default_collation_name FROM information_schema.schemata", rs -> { final Map charsets = new HashMap<>(); while (rs.next()) { diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/MySqlTaskContext.java index e840de9a4..f4962b231 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/MySqlTaskContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/MySqlTaskContext.java @@ -247,7 +247,6 @@ protected SnapshotMode snapshotMode() { } public void start() { - connectionContext.start(); // Start the MySQL database history, which simply starts up resources but does not recover the history to a specific point dbSchema().start(); } diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java index 8378252f7..120c5fcee 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java @@ -5,7 +5,14 @@ */ package io.debezium.jdbc; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; import java.lang.reflect.InvocationTargetException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; import java.sql.CallableStatement; import java.sql.Connection; import java.sql.DatabaseMetaData; @@ -1510,4 +1517,18 @@ public String quotedTableIdString(TableId tableId) { public String quotedColumnIdString(String columnName) { return openingQuoteCharacter + columnName + closingQuoteCharacter; } + + /** + * Read JKS type keystore/truststore file according related password. + */ + public KeyStore loadKeyStore(String filePath, char[] passwordArray) { + try (InputStream in = new FileInputStream(filePath)) { + KeyStore ks = KeyStore.getInstance("JKS"); + ks.load(in, passwordArray); + return ks; + } + catch (KeyStoreException | IOException | CertificateException | NoSuchAlgorithmException e) { + throw new DebeziumException("Error loading keystore", e); + } + } }