DBZ-99 Added support for MySQL connector to connect securely to MySQL

Changed the MySQL connector to have several new configuration properties for setting up the SSL key store and trust store (which can be used in place of System or JDK properties) used for MySQL secure connections, and another property to specify what kind of SSL connection be used.

Modified several integration tests to ensure all MySQL connections are made with `useSSL=false`.
This commit is contained in:
Randall Hauch 2016-08-24 12:35:05 -05:00
parent 40318f87a3
commit ce2b2db80c
15 changed files with 350 additions and 48 deletions

View File

@ -36,7 +36,9 @@
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import com.github.shyiko.mysql.binlog.network.SSLMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.RecordMakers.RecordsForTable;
import io.debezium.function.BlockingConsumer;
import io.debezium.relational.TableId;
@ -70,6 +72,7 @@ public BinlogReader(MySqlTaskContext context) {
// Set up the log reader ...
client = new BinaryLogClient(context.hostname(), context.port(), context.username(), context.password());
client.setServerId(context.serverId());
client.setSSLMode(sslModeFor(context.sslMode()));
client.setKeepAlive(context.config().getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
client.registerEventListener(this::handleEvent);
client.registerLifecycleListener(new ReaderThreadLifecycleListener());
@ -408,6 +411,22 @@ protected void handleDelete(Event event) throws InterruptedException {
}
}
protected SSLMode sslModeFor( SecureConnectionMode mode ) {
switch(mode) {
case DISABLED:
return SSLMode.DISABLED;
case PREFERRED:
return SSLMode.PREFERRED;
case REQUIRED:
return SSLMode.REQUIRED;
case VERIFY_CA:
return SSLMode.VERIFY_CA;
case VERIFY_IDENTITY:
return SSLMode.VERIFY_IDENTITY;
}
return null;
}
protected final class ReaderThreadLifecycleListener implements LifecycleListener {
@Override
public void onDisconnect(BinaryLogClient client) {

View File

@ -17,6 +17,8 @@
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConnection;
@ -33,6 +35,7 @@
*/
public class MySqlConnector extends SourceConnector {
private Logger logger = LoggerFactory.getLogger(getClass());
private Map<String, String> props;
public MySqlConnector() {
@ -88,10 +91,17 @@ public Config validate(Map<String, String> connectorConfigs) {
&& passwordValue.errorMessages().isEmpty()) {
// Try to connect to the database ...
try (MySqlJdbcContext jdbcContext = new MySqlJdbcContext(config)) {
jdbcContext.start();
JdbcConnection mysql = jdbcContext.jdbc();
try {
mysql.execute("SELECT version()");
logger.info("Successfully tested connection for {} with user '{}'", jdbcContext.connectionString(), mysql.username());
} catch (SQLException e) {
logger.info("Failed testing connection for {} with user '{}'", jdbcContext.connectionString(), mysql.username());
hostnameValue.addErrorMessage("Unable to connect: " + e.getMessage());
} finally {
jdbcContext.shutdown();
}
}
}
return new Config(new ArrayList<>(results.values()));

View File

@ -135,7 +135,75 @@ public static SnapshotMode parse(String value, String defaultValue) {
}
}
private static final String DATABASE_LIST_NAME = "database.list";
/**
* The set of predefined SecureConnectionMode options or aliases.
*/
public static enum SecureConnectionMode {
/**
* Establish an unencrypted connection.
*/
DISABLED("disabled"),
/**
* Establish a secure (encrypted) connection if the server supports secure connections.
* Fall back to an unencrypted connection otherwise.
*/
PREFERRED("preferred"),
/**
* Establish a secure connection if the server supports secure connections.
* The connection attempt fails if a secure connection cannot be established.
*/
REQUIRED("required"),
/**
* Like REQUIRED, but additionally verify the server TLS certificate against the configured Certificate Authority
* (CA) certificates. The connection attempt fails if no valid matching CA certificates are found.
*/
VERIFY_CA("verify_ca"),
/**
* Like VERIFY_CA, but additionally verify that the server certificate matches the host to which the connection is
* attempted.
*/
VERIFY_IDENTITY("verify_identity");
private final String value;
private SecureConnectionMode(String value) {
this.value = value;
}
public String getValue() {
return value;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static SecureConnectionMode parse(String value) {
if (value == null) return null;
value = value.trim();
for (SecureConnectionMode option : SecureConnectionMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) return option;
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static SecureConnectionMode parse(String value, String defaultValue) {
SecureConnectionMode mode = parse(value);
if (mode == null && defaultValue != null) mode = parse(defaultValue);
return mode;
}
}
private static final String DATABASE_WHITELIST_NAME = "database.whitelist";
private static final String TABLE_WHITELIST_NAME = "table.whitelist";
private static final String TABLE_IGNORE_BUILTIN_NAME = "table.ignore.builtin";
@ -147,7 +215,6 @@ public static SnapshotMode parse(String value, String defaultValue) {
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(Importance.HIGH)
.withDependents(DATABASE_LIST_NAME)
.withValidation(Field::isRequired)
.withDescription("Resolvable hostname or IP address of the MySQL database server.");
@ -157,7 +224,6 @@ public static SnapshotMode parse(String value, String defaultValue) {
.withWidth(Width.SHORT)
.withDefault(3306)
.withImportance(Importance.HIGH)
.withDependents(DATABASE_LIST_NAME)
.withValidation(Field::isInteger)
.withDescription("Port of the MySQL database server.");
@ -166,7 +232,6 @@ public static SnapshotMode parse(String value, String defaultValue) {
.withType(Type.STRING)
.withWidth(Width.SHORT)
.withImportance(Importance.HIGH)
.withDependents(DATABASE_LIST_NAME)
.withValidation(Field::isRequired)
.withDescription("Name of the MySQL database user to be used when connecting to the database.");
@ -175,7 +240,6 @@ public static SnapshotMode parse(String value, String defaultValue) {
.withType(Type.PASSWORD)
.withWidth(Width.SHORT)
.withImportance(Importance.HIGH)
.withDependents(DATABASE_LIST_NAME)
.withValidation(Field::isRequired)
.withDescription("Password of the MySQL database user to be used when connecting to the database.");
@ -202,6 +266,49 @@ public static SnapshotMode parse(String value, String defaultValue) {
+ "MySQL database cluster as another server (with this unique ID) so it can read "
+ "the binlog. By default, a random number is generated between 5400 and 6400.");
public static final Field SSL_MODE = Field.create("database.ssl.mode")
.withDisplayName("SSL mode")
.withEnum(SecureConnectionMode.class, SecureConnectionMode.DISABLED)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("Whether to use an encrypted connection to MySQL. Options include"
+ "'disabled' (the default) to use an unencrypted connection; "
+ "'preferred' to establish a secure (encrypted) connection if the server supports secure connections, "
+ "but fall back to an unencrypted connection otherwise; "
+ "'required' to use a secure (encrypted) connection, and fail if one cannot be established; "
+ "'verify_ca' like 'required' but additionally verify the server TLS certificate against the configured Certificate Authority "
+ "(CA) certificates, or fail if no valid matching CA certificates are found; or"
+ "'verify_identity' like 'verify_ca' but additionally verify that the server certificate matches the host to which the connection is attempted.");
public static final Field SSL_KEYSTORE = Field.create("database.ssl.keystore")
.withDisplayName("SSL Keystore")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription("Location of the Java keystore file containing an application process's own certificate and private key.");
public static final Field SSL_KEYSTORE_PASSWORD = Field.create("database.ssl.keystore.password")
.withDisplayName("SSL Keystore Password")
.withType(Type.PASSWORD)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("Password to access the private key from the keystore file specified by 'ssl.keystore' configuration property or the 'javax.net.ssl.keyStore' system or JVM property. "
+ "This password is used to unlock the keystore file (store password), and to decrypt the private key stored in the keystore (key password).");
public static final Field SSL_TRUSTSTORE = Field.create("database.ssl.truststore")
.withDisplayName("SSL Truststore")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.MEDIUM)
.withDescription("Location of the Java truststore file containing the collection of CA certificates trusted by this application process (trust store).");
public static final Field SSL_TRUSTSTORE_PASSWORD = Field.create("database.ssl.truststore.password")
.withDisplayName("SSL Truststore Password")
.withType(Type.PASSWORD)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDescription("Password to unlock the keystore file (store password) specified by 'ssl.trustore' configuration property or the 'javax.net.ssl.trustStore' system or JVM property.");
public static final Field TABLES_IGNORE_BUILTIN = Field.create(TABLE_IGNORE_BUILTIN_NAME)
.withDisplayName("Ignore system databases")
.withType(Type.BOOLEAN)
@ -361,7 +468,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode")
.withDisplayName("Snapshot mode")
.withEnum(SnapshotMode.class)
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("The criteria for running a snapshot upon startup of the connector. "
@ -369,8 +476,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
+ "'when_needed' to specify that the connector run a snapshot upon startup whenever it deems it necessary; "
+ "'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; and "
+ "'never' to specify the connector should never run a snapshot and that upon first startup the connector should read from the beginning of the binlog. "
+ "The 'never' mode should be used with care, and only when the binlog is known to contain all history.")
.withDefault(SnapshotMode.INITIAL.getValue());
+ "The 'never' mode should be used with care, and only when the binlog is known to contain all history.");
public static final Field SNAPSHOT_MINIMAL_LOCKING = Field.create("snapshot.minimal.locks")
.withDisplayName("Use shortest database locking for snapshots")
@ -387,14 +493,13 @@ public static SnapshotMode parse(String value, String defaultValue) {
public static final Field TIME_PRECISION_MODE = Field.create("time.precision.mode")
.withDisplayName("Time Precision")
.withEnum(TemporalPrecisionMode.class)
.withEnum(TemporalPrecisionMode.class, TemporalPrecisionMode.ADAPTIVE)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription("Time, date, and timestamps can be represented with different kinds of precisions, including:"
+ "'adaptive' (the default) bases the precision of time, date, and timestamp values on the database column's precision; "
+ "'connect' always represents time, date, and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp, "
+ "which uses millisecond precision regardless of the database columns' precision .")
.withDefault(TemporalPrecisionMode.ADAPTIVE.getValue());
+ "which uses millisecond precision regardless of the database columns' precision .");
/**
* Method that generates a Field for specifying that string columns whose names match a set of regular expressions should
@ -438,7 +543,9 @@ public static final Field MASK_COLUMN(int length) {
TABLE_WHITELIST, TABLE_BLACKLIST, TABLES_IGNORE_BUILTIN,
DATABASE_WHITELIST, DATABASE_BLACKLIST,
COLUMN_BLACKLIST, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING,
TIME_PRECISION_MODE);
TIME_PRECISION_MODE,
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD,
SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD);
/**
* The set of {@link Field}s that are included in the {@link #configDef() configuration definition}. This includes
@ -453,10 +560,11 @@ public static final Field MASK_COLUMN(int length) {
protected static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "MySQL", HOSTNAME, PORT, USER, PASSWORD, SERVER_NAME, SERVER_ID);
Field.group(config, "History Storage", KafkaDatabaseHistory.BOOTSTRAP_SERVERS, KafkaDatabaseHistory.TOPIC,
KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS, KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS,
DATABASE_HISTORY);
Field.group(config, "MySQL", HOSTNAME, PORT, USER, PASSWORD, SERVER_NAME, SERVER_ID,
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD, SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD);
Field.group(config, "History Storage", KafkaDatabaseHistory.BOOTSTRAP_SERVERS,
KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DATABASE_HISTORY);
Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST,
COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST);
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS,

View File

@ -6,9 +6,16 @@
package io.debezium.connector.mysql;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcConnection.ConnectionFactory;
@ -19,22 +26,25 @@
*/
public class MySqlJdbcContext implements AutoCloseable {
protected static final String MYSQL_CONNECTION_URL = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false";
protected static final String MYSQL_CONNECTION_URL = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=${useSSL}";
protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory(MYSQL_CONNECTION_URL);
protected final Logger logger = LoggerFactory.getLogger(getClass());
protected final Configuration config;
protected final JdbcConnection jdbc;
private final Map<String,String> originalSystemProperties = new HashMap<>();
public MySqlJdbcContext(Configuration config) {
this.config = config; // must be set before most methods are used
// Set up the JDBC connection without actually connecting, with extra MySQL-specific properties
// to give us better JDBC database metadata behavior ...
boolean useSSL = sslModeEnabled();
Configuration jdbcConfig = config.subset("database.", true)
.edit()
.with("useInformationSchema", "true")
.with("nullCatalogMeansCurrent", "false")
.with("useSSL", Boolean.toString(useSSL))
.build();
this.jdbc = new JdbcConnection(jdbcConfig, FACTORY);
}
@ -67,7 +77,24 @@ public int port() {
return config.getInteger(MySqlConnectorConfig.PORT);
}
public SecureConnectionMode sslMode() {
String mode = config.getString(MySqlConnectorConfig.SSL_MODE);
return SecureConnectionMode.parse(mode);
}
public boolean sslModeEnabled() {
return sslMode() != SecureConnectionMode.DISABLED;
}
public void start() {
if (sslModeEnabled()) {
originalSystemProperties.clear();
// Set the System properties for SSL for the MySQL driver ...
setSystemProperty("javax.net.ssl.keyStore", MySqlConnectorConfig.SSL_KEYSTORE, true);
setSystemProperty("javax.net.ssl.keyStorePassword", MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD, false);
setSystemProperty("javax.net.ssl.trustStore", MySqlConnectorConfig.SSL_TRUSTSTORE, true);
setSystemProperty("javax.net.ssl.trustStorePassword", MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD, false);
}
}
public void shutdown() {
@ -75,6 +102,15 @@ public void shutdown() {
jdbc.close();
} catch (SQLException e) {
logger.error("Unexpected error shutting down the database connection", e);
} finally {
// Reset the system properties to their original value ...
originalSystemProperties.forEach((name,value)->{
if ( value != null ) {
System.setProperty(name,value);
} else {
System.clearProperty(name);
}
});
}
}
@ -86,4 +122,23 @@ public void close() {
protected String connectionString() {
return jdbc.connectionString(MYSQL_CONNECTION_URL);
}
protected void setSystemProperty(String property, Field field, boolean showValueInError) {
String value = config.getString(field);
if (value != null) {
String existingValue = System.getProperty(property);
if (existingValue != null && existingValue.equalsIgnoreCase(value)) {
String msg = "System or JVM property '" + property + "' is already defined, but the configuration property '" + field.name()
+ "' defines a different value";
if (showValueInError) {
msg = "System or JVM property '" + property + "' is already defined as " + existingValue
+ ", but the configuration property '" + field.name() + "' defines a different value '" + value + "'";
}
throw new ConnectException(msg);
} else {
String existing = System.setProperty(property, value);
originalSystemProperties.put(property, existing); // the existing value may be null
}
}
}
}

View File

@ -18,6 +18,7 @@
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.data.KeyValueStore;
import io.debezium.data.KeyValueStore.Collection;
import io.debezium.data.SchemaChangeHistory;
@ -99,14 +100,14 @@ protected Configuration.Builder simpleConfig() {
.with(MySqlConnectorConfig.PORT, port)
.with(MySqlConnectorConfig.USER, "replicator")
.with(MySqlConnectorConfig.PASSWORD, "replpass")
.with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED.name().toLowerCase())
.with(MySqlConnectorConfig.SERVER_ID, 18911)
.with(MySqlConnectorConfig.SERVER_NAME, LOGICAL_NAME)
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(MySqlConnectorConfig.DATABASE_WHITELIST, DB_NAME)
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with("database.useSSL", false); // eliminates MySQL driver warning about SSL connections
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH);
}
@Test

View File

@ -26,6 +26,7 @@ public class MySQLConnection extends JdbcConnection {
public static MySQLConnection forTestDatabase(String databaseName) {
return new MySQLConnection(JdbcConfiguration.copy(Configuration.fromSystemProperties("database."))
.withDatabase(databaseName)
.with("useSSL", false)
.build());
}
@ -42,6 +43,7 @@ public static MySQLConnection forTestDatabase(String databaseName, String userna
.withDatabase(databaseName)
.withUser(username)
.withPassword(password)
.with("useSSL", false)
.build());
}

View File

@ -24,6 +24,7 @@
import io.debezium.config.Configuration;
import io.debezium.config.Field.Recommender;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.data.Envelope;
import io.debezium.embedded.AbstractConnectorTest;
@ -106,12 +107,72 @@ public void shouldFailToValidateInvalidConfiguration() {
assertNoConfigurationErrors(result, MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_MINIMAL_LOCKING);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_TRUSTSTORE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD);
assertConfigurationErrors(result, KafkaDatabaseHistory.BOOTSTRAP_SERVERS);
assertConfigurationErrors(result, KafkaDatabaseHistory.TOPIC);
assertNoConfigurationErrors(result, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS);
assertNoConfigurationErrors(result, KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS);
}
@Test
public void shouldValidateValidConfigurationWithSSL() {
Configuration config = Configuration.create()
.with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.hostname"))
.with(MySqlConnectorConfig.PORT, System.getProperty("database.port"))
.with(MySqlConnectorConfig.USER, "snapper")
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.REQUIRED.name().toLowerCase())
.with(MySqlConnectorConfig.SSL_KEYSTORE, "/some/path/to/keystore")
.with(MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD, "keystore1234")
.with(MySqlConnectorConfig.SSL_TRUSTSTORE, "/some/path/to/truststore")
.with(MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD, "truststore1234")
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.SERVER_NAME, "myServer")
.with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "some.host.com")
.with(KafkaDatabaseHistory.TOPIC, "my.db.history.topic")
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();
MySqlConnector connector = new MySqlConnector();
Config result = connector.validate(config.asMap());
// Can't connect to MySQL using SSL on a container using the 'mysql/mysql-server' image maintained by MySQL team,
// but can actually connect to MySQL using SSL on a container using the 'mysql' image maintained by Docker, Inc.
assertConfigurationErrors(result, MySqlConnectorConfig.HOSTNAME, 0, 1);
assertNoConfigurationErrors(result, MySqlConnectorConfig.PORT);
assertNoConfigurationErrors(result, MySqlConnectorConfig.USER);
assertNoConfigurationErrors(result, MySqlConnectorConfig.PASSWORD);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SERVER_NAME);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SERVER_ID);
assertNoConfigurationErrors(result, MySqlConnectorConfig.TABLES_IGNORE_BUILTIN);
assertNoConfigurationErrors(result, MySqlConnectorConfig.DATABASE_WHITELIST);
assertNoConfigurationErrors(result, MySqlConnectorConfig.DATABASE_BLACKLIST);
assertNoConfigurationErrors(result, MySqlConnectorConfig.TABLE_WHITELIST);
assertNoConfigurationErrors(result, MySqlConnectorConfig.TABLE_BLACKLIST);
assertNoConfigurationErrors(result, MySqlConnectorConfig.COLUMN_BLACKLIST);
assertNoConfigurationErrors(result, MySqlConnectorConfig.CONNECTION_TIMEOUT_MS);
assertNoConfigurationErrors(result, MySqlConnectorConfig.KEEP_ALIVE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.MAX_QUEUE_SIZE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.MAX_BATCH_SIZE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.POLL_INTERVAL_MS);
assertNoConfigurationErrors(result, MySqlConnectorConfig.DATABASE_HISTORY);
assertNoConfigurationErrors(result, MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_MINIMAL_LOCKING);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_TRUSTSTORE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD);
assertNoConfigurationErrors(result, KafkaDatabaseHistory.BOOTSTRAP_SERVERS);
assertNoConfigurationErrors(result, KafkaDatabaseHistory.TOPIC);
assertNoConfigurationErrors(result, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS);
assertNoConfigurationErrors(result, KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS);
}
@Test
public void shouldValidateAcceptableConfiguration() {
Configuration config = Configuration.create()
@ -119,13 +180,12 @@ public void shouldValidateAcceptableConfiguration() {
.with(MySqlConnectorConfig.PORT, System.getProperty("database.port"))
.with(MySqlConnectorConfig.USER, "snapper")
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED.name().toLowerCase())
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.SERVER_NAME, "myServer")
.with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "some.host.com")
.with(KafkaDatabaseHistory.TOPIC, "my.db.history.topic")
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with("database.useSSL", false) // eliminates MySQL driver warning about SSL
// connections
.build();
MySqlConnector connector = new MySqlConnector();
Config result = connector.validate(config.asMap());
@ -151,6 +211,11 @@ public void shouldValidateAcceptableConfiguration() {
assertNoConfigurationErrors(result, MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_MINIMAL_LOCKING);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_MODE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_TRUSTSTORE);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD);
assertNoConfigurationErrors(result, KafkaDatabaseHistory.BOOTSTRAP_SERVERS);
assertNoConfigurationErrors(result, KafkaDatabaseHistory.TOPIC);
assertNoConfigurationErrors(result, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS);
@ -206,12 +271,12 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.SERVER_NAME, "myServer")
.with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED.name().toLowerCase())
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.DATABASE_WHITELIST, "connector_test")
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with("database.useSSL", false) // eliminates MySQL driver warning about SSL connections
.build();
// Start the connector ...
start(MySqlConnector.class, config);
@ -430,6 +495,7 @@ public void shouldConsumeEventsWithNoSnapshot() throws SQLException, Interrupted
.with(MySqlConnectorConfig.PORT, System.getProperty("database.port"))
.with(MySqlConnectorConfig.USER, "snapper")
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED.name().toLowerCase())
.with(MySqlConnectorConfig.SERVER_ID, 18780)
.with(MySqlConnectorConfig.SERVER_NAME, "myServer1")
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
@ -438,7 +504,6 @@ public void shouldConsumeEventsWithNoSnapshot() throws SQLException, Interrupted
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.name().toLowerCase())
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with("database.useSSL", false) // eliminates MySQL driver warning about SSL connections
.build();
// Start the connector ...
@ -479,6 +544,7 @@ public void shouldConsumeEventsWithMaskedAndBlacklistedColumns() throws SQLExcep
.with(MySqlConnectorConfig.PORT, System.getProperty("database.port"))
.with(MySqlConnectorConfig.USER, "snapper")
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED.name().toLowerCase())
.with(MySqlConnectorConfig.SERVER_ID, 18780)
.with(MySqlConnectorConfig.SERVER_NAME, "myServer2")
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
@ -488,7 +554,6 @@ public void shouldConsumeEventsWithMaskedAndBlacklistedColumns() throws SQLExcep
.with(MySqlConnectorConfig.MASK_COLUMN(12), "connector_test_ro.customers.email")
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with("database.useSSL", false) // eliminates MySQL driver warning about SSL connections
.build();
// Start the connector ...

View File

@ -25,6 +25,7 @@
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.TemporalPrecisionMode;
import io.debezium.data.Envelope;
@ -68,6 +69,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
.with(MySqlConnectorConfig.PORT, System.getProperty("database.port"))
.with(MySqlConnectorConfig.USER, "snapper")
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED.name().toLowerCase())
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.SERVER_NAME, "regression")
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
@ -76,7 +78,6 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.toString())
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with("database.useSSL", false) // eliminates MySQL driver warning about SSL connections
.build();
// Start the connector ...
start(MySqlConnector.class, config);
@ -188,6 +189,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshotAndConnect
.with(MySqlConnectorConfig.PORT, System.getProperty("database.port"))
.with(MySqlConnectorConfig.USER, "snapper")
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED.name().toLowerCase())
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.SERVER_NAME, "regression")
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
@ -197,7 +199,6 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshotAndConnect
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.toString())
.with(MySqlConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.CONNECT.toString())
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with("database.useSSL", false) // eliminates MySQL driver warning about SSL connections
.build();
// Start the connector ...
start(MySqlConnector.class, config);
@ -307,6 +308,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
.with(MySqlConnectorConfig.PORT, System.getProperty("database.port"))
.with(MySqlConnectorConfig.USER, "snapper")
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED.name().toLowerCase())
.with(MySqlConnectorConfig.SERVER_ID, 18765)
.with(MySqlConnectorConfig.SERVER_NAME, "regression")
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
@ -315,7 +317,6 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.toString())
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with("database.useSSL", false) // eliminates MySQL driver warning about SSL connections
.build();
// Start the connector ...
start(MySqlConnector.class, config);

View File

@ -16,6 +16,7 @@
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.Testing;
@ -69,12 +70,12 @@ protected Configuration.Builder simpleConfig() {
.with(MySqlConnectorConfig.PORT, port)
.with(MySqlConnectorConfig.USER, username)
.with(MySqlConnectorConfig.PASSWORD, password)
.with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED.name().toLowerCase())
.with(MySqlConnectorConfig.SERVER_ID, serverId)
.with(MySqlConnectorConfig.SERVER_NAME, serverName)
.with(MySqlConnectorConfig.DATABASE_WHITELIST, databaseName)
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with("database.useSSL",false); // eliminates MySQL driver warning about SSL connections
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH);
}
@Test

View File

@ -44,6 +44,7 @@
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.XidEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.network.SSLMode;
import com.github.shyiko.mysql.binlog.network.ServerException;
import static org.fest.assertions.Assertions.assertThat;
@ -105,6 +106,7 @@ protected void startClient(Consumer<BinaryLogClient> preConnect) throws IOExcept
client = new BinaryLogClient(config.getHostname(), config.getPort(), "replicator", "replpass");
client.setServerId(client.getServerId() - 1); // avoid clashes between BinaryLogClient instances
client.setKeepAlive(false);
client.setSSLMode(SSLMode.DISABLED);
client.registerEventListener(counters);
client.registerEventListener(this::recordEvent);
client.registerLifecycleListener(new TraceLifecycleListener());

View File

@ -20,6 +20,7 @@
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.data.KeyValueStore;
import io.debezium.data.KeyValueStore.Collection;
import io.debezium.data.SchemaChangeHistory;
@ -70,14 +71,14 @@ protected Configuration.Builder simpleConfig() {
.with(MySqlConnectorConfig.PORT, port)
.with(MySqlConnectorConfig.USER, "snapper")
.with(MySqlConnectorConfig.PASSWORD, "snapperpass")
.with(MySqlConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED.toString().toLowerCase())
.with(MySqlConnectorConfig.SERVER_ID, 18911)
.with(MySqlConnectorConfig.SERVER_NAME, LOGICAL_NAME)
.with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(MySqlConnectorConfig.DATABASE_WHITELIST, DB_NAME)
.with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
.with("database.useSSL",false); // eliminates MySQL driver warning about SSL connections
.with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH);
}
@Test

View File

@ -639,8 +639,26 @@ public Field withType(Type type) {
* @return the new field; never null
*/
public <T extends Enum<T>> Field withEnum(Class<T> enumType) {
return withEnum(enumType,null);
}
/**
* Create and return a new Field instance that is a copy of this field but has a {@link #withType(Type) type} of
* {@link org.apache.kafka.connect.data.Schema.Type#STRING}, a {@link #withRecommender(Recommender) recommender}
* that returns a list of {@link Enum#name() Enum names} as valid values, and a validator that verifies values are valid
* enumeration names.
*
* @param enumType the enumeration type for the field
* @param defaultOption the default enumeration value; may be null
* @return the new field; never null
*/
public <T extends Enum<T>> Field withEnum(Class<T> enumType, T defaultOption) {
EnumRecommender<T> recommendator = new EnumRecommender<>(enumType);
return withType(Type.STRING).withRecommender(recommendator).withValidation(recommendator);
Field result = withType(Type.STRING).withRecommender(recommendator).withValidation(recommendator);
if ( defaultOption != null ) {
result = result.withDefault(defaultOption.name().toLowerCase());
}
return result;
}
/**

View File

@ -134,15 +134,22 @@ private static Field[] combineVariables(Field[] overriddenVariables,
private static String findAndReplace(String url, Properties props, Field... variables) {
for (Field field : variables) {
String variable = field.name();
if (variable != null && url.contains("${" + variable + "}")) {
// Otherwise, we have to remove it from the properties ...
String value = props.getProperty(variable);
if (value != null) {
props.remove(variable);
// And replace the variable ...
url = url.replaceAll("\\$\\{" + variable + "\\}", value);
if ( field != null ) url = findAndReplace(url, field.name(), props);
}
for (Object key : new HashSet<>(props.keySet())) {
if (key != null ) url = findAndReplace(url, key.toString(), props);
}
return url;
}
private static String findAndReplace(String url, String name, Properties props) {
if (name != null && url.contains("${" + name + "}")) {
// Otherwise, we have to remove it from the properties ...
String value = props.getProperty(name);
if (value != null) {
props.remove(name);
// And replace the variable ...
url = url.replaceAll("\\$\\{" + name + "\\}", value);
}
}
return url;

View File

@ -5,6 +5,8 @@
*/
package io.debezium.embedded;
import static org.junit.Assert.fail;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
@ -510,6 +512,12 @@ protected void assertConfigurationErrors(Config config, io.debezium.config.Field
assertThat(value.errorMessages().size()).isEqualTo(numErrors);
}
protected void assertConfigurationErrors(Config config, io.debezium.config.Field field, int minErrorsInclusive, int maxErrorsInclusive) {
ConfigValue value = configValue(config, field.name());
assertThat(value.errorMessages().size()).isGreaterThanOrEqualTo(minErrorsInclusive);
assertThat(value.errorMessages().size()).isLessThanOrEqualTo(maxErrorsInclusive);
}
protected void assertConfigurationErrors(Config config, io.debezium.config.Field field) {
ConfigValue value = configValue(config, field.name());
assertThat(value.errorMessages().size()).isGreaterThan(0);
@ -518,7 +526,11 @@ protected void assertConfigurationErrors(Config config, io.debezium.config.Field
protected void assertNoConfigurationErrors(Config config, io.debezium.config.Field... fields) {
for (io.debezium.config.Field field : fields) {
ConfigValue value = configValue(config, field.name());
assertThat(value.errorMessages().size()).isEqualTo(0);
if ( value != null ) {
if ( !value.errorMessages().isEmpty() ) {
fail("Error messages on field '" + field.name() + "': " + value.errorMessages());
}
}
}
}

View File

@ -64,7 +64,7 @@
<version.postgresql.server>9.4</version.postgresql.server>
<version.mysql.server>5.7</version.mysql.server>
<version.mysql.driver>5.1.39</version.mysql.driver>
<version.mysql.binlog>0.3.3</version.mysql.binlog>
<version.mysql.binlog>0.4.0</version.mysql.binlog>
<version.mongo.server>3.2.6</version.mongo.server>
<version.mongo.driver>3.2.2</version.mongo.driver>