DBZ-1208 Misc. clean-up;

* Adding reference to source of logic for setting up SSL socket factory
* Making method side-effect free
* Adding Jordon to COPYRIGHT.txt
This commit is contained in:
Gunnar Morling 2019-04-10 11:39:10 +02:00
parent 9ae2ebe329
commit f04147d67d
2 changed files with 51 additions and 43 deletions

View File

@ -39,6 +39,7 @@ Ivan Kovbas
Ivan Vucina
Jiri Pechanec
Jon Casstevens
Jordan Bragg
Josh Stanfield
Joy Gao
Jure Kajzer

View File

@ -5,8 +5,8 @@
*/
package io.debezium.connector.mysql;
import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory;
import io.debezium.util.Strings;
import static io.debezium.util.Strings.isNullOrEmpty;
import java.io.IOException;
import java.io.Serializable;
import java.security.GeneralSecurityException;
@ -26,6 +26,7 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.event.Level;
@ -50,7 +51,9 @@
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import com.github.shyiko.mysql.binlog.network.DefaultSSLSocketFactory;
import com.github.shyiko.mysql.binlog.network.SSLMode;
import com.github.shyiko.mysql.binlog.network.SSLSocketFactory;
import io.debezium.connector.mysql.MySqlConnectorConfig.EventProcessingFailureHandlingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
@ -60,10 +63,9 @@
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import static io.debezium.util.Strings.isNullOrEmpty;
/**
* A component that reads the binlog of a MySQL server, and records any schema changes in {@link MySqlSchema}.
*
@ -193,7 +195,10 @@ public BinlogReader(String name, MySqlTaskContext context, HaltingPredicate acce
client.setServerId(serverId);
client.setSSLMode(sslModeFor(connectionContext.sslMode()));
if (connectionContext.sslModeEnabled()) {
setBinlogSSLSocketFactory();
SSLSocketFactory sslSocketFactory = getBinlogSslSocketFactory(connectionContext);
if (sslSocketFactory != null) {
client.setSslSocketFactory(sslSocketFactory);
}
}
client.setKeepAlive(context.config().getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
client.setKeepAliveInterval(context.config().getLong(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS));
@ -959,7 +964,7 @@ protected void prepareTransaction(Event event) throws InterruptedException {
// do nothing
}
protected SSLMode sslModeFor(SecureConnectionMode mode) {
protected static SSLMode sslModeFor(SecureConnectionMode mode) {
switch (mode) {
case DISABLED:
return SSLMode.DISABLED;
@ -1062,48 +1067,50 @@ public BinlogPosition getCurrentBinlogPosition() {
return new BinlogPosition(client.getBinlogFilename(), client.getBinlogPosition());
}
protected void setBinlogSSLSocketFactory() {
if (connectionContext.jdbc() != null) {
String acceptedTLSVersion = connectionContext.getSessionVariableForSslVersion();
if (!isNullOrEmpty(acceptedTLSVersion)) {
SSLMode sslMode = sslModeFor(connectionContext.sslMode());
private static SSLSocketFactory getBinlogSslSocketFactory(MySqlJdbcContext connectionContext) {
String acceptedTlsVersion = connectionContext.getSessionVariableForSslVersion();
if (!isNullOrEmpty(acceptedTlsVersion)) {
SSLMode sslMode = sslModeFor(connectionContext.sslMode());
if (sslMode == SSLMode.PREFERRED || sslMode == SSLMode.REQUIRED) {
client.setSslSocketFactory(new DefaultSSLSocketFactory(acceptedTLSVersion) {
// DBZ-1208 Resembles the logic from the upstream BinaryLogClient, only that
// the accepted TLS version is passed to the constructed factory
if (sslMode == SSLMode.PREFERRED || sslMode == SSLMode.REQUIRED) {
return new DefaultSSLSocketFactory(acceptedTlsVersion) {
@Override
protected void initSSLContext(SSLContext sc)
throws GeneralSecurityException {
sc.init(null, new TrustManager[]{
new X509TrustManager() {
@Override
protected void initSSLContext(SSLContext sc)
throws GeneralSecurityException {
sc.init(null, new TrustManager[]{
new X509TrustManager() {
@Override
public void checkClientTrusted(
X509Certificate[] x509Certificates,
String s)
throws CertificateException {
}
@Override
public void checkServerTrusted(
X509Certificate[] x509Certificates,
String s)
throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
@Override
public void checkClientTrusted(
X509Certificate[] x509Certificates,
String s)
throws CertificateException {
}
}, null);
}
});
}
else {
client.setSslSocketFactory(new DefaultSSLSocketFactory(acceptedTLSVersion));
}
@Override
public void checkServerTrusted(
X509Certificate[] x509Certificates,
String s)
throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}
}, null);
}
};
}
else {
return new DefaultSSLSocketFactory(acceptedTlsVersion);
}
}
return null;
}
}