diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index eddc9d5fe..055b260bd 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -410,6 +410,14 @@ public static SecureConnectionMode parse(String value, String defaultValue) { .withDependents(DATABASE_WHITELIST_NAME) .withDescription("Flag specifying whether built-in tables should be ignored."); + public static final Field JDBC_DRIVER = Field.create("database.jdbc.driver") + .withDisplayName("Jdbc Driver Class Name") + .withType(Type.CLASS) + .withWidth(Width.MEDIUM) + .withDefault(com.mysql.jdbc.Driver.class.getName()) + .withImportance(Importance.LOW) + .withValidation(Field::isClassName) + .withDescription("JDBC Driver class name used to connect to the MySQL database server."); /** * A comma-separated list of regular expressions that match database names to be monitored. * May not be used with {@link #DATABASE_BLACKLIST}. @@ -679,7 +687,7 @@ public static final Field MASK_COLUMN(int length) { GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD, - SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD); + SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD, JDBC_DRIVER); /** * The set of {@link Field}s that are included in the {@link #configDef() configuration definition}. This includes @@ -695,7 +703,7 @@ public static final Field MASK_COLUMN(int length) { protected static ConfigDef configDef() { ConfigDef config = new ConfigDef(); Field.group(config, "MySQL", HOSTNAME, PORT, USER, PASSWORD, SERVER_NAME, SERVER_ID, - SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD, SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD); + SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD, SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD, JDBC_DRIVER); Field.group(config, "History Storage", KafkaDatabaseHistory.BOOTSTRAP_SERVERS, KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS, KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DATABASE_HISTORY); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java index 4f8dfece2..9d27f8743 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java @@ -50,7 +50,9 @@ public MySqlJdbcContext(Configuration config) { .edit() .with("useSSL", Boolean.toString(useSSL)) .build(); - this.jdbc = new JdbcConnection(jdbcConfig, FACTORY); + String driverClassName = jdbcConfig.getString(MySqlConnectorConfig.JDBC_DRIVER); + this.jdbc = new JdbcConnection(jdbcConfig, + JdbcConnection.patternBasedFactory(MYSQL_CONNECTION_URL, driverClassName, getClass().getClassLoader())); } public Configuration config() { diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java index 90640f2aa..8241387fa 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorTest.java @@ -34,7 +34,7 @@ protected static void assertConfigDefIsValid(Connector connector, io.debezium.co assertThat(key.importance).isEqualTo(expected.importance()); assertThat(key.documentation).isEqualTo(expected.description()); assertThat(key.type).isEqualTo(expected.type()); - if (expected.equals(MySqlConnectorConfig.DATABASE_HISTORY)) { + if (expected.equals(MySqlConnectorConfig.DATABASE_HISTORY) || expected.equals(MySqlConnectorConfig.JDBC_DRIVER)) { assertThat(((Class) key.defaultValue).getName()).isEqualTo((String) expected.defaultValue()); } else if (!expected.equals(MySqlConnectorConfig.SERVER_ID)) { assertThat(key.defaultValue).isEqualTo(expected.defaultValue()); diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java index 200e17f67..649436425 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java @@ -116,7 +116,56 @@ public static ConnectionFactory patternBasedFactory(String urlPattern, Field... return conn; }; } - + + /** + * Create a {@link ConnectionFactory} that uses the specific JDBC driver class loaded with the given class loader, and obtains the connection URL by replacing the following variables in the URL pattern: + * + *

+ * This method attempts to instantiate the JDBC driver class and use that instance to connect to the database. + * @param urlPattern the URL pattern string; may not be null + * @param driverClassName the name of the JDBC driver class; may not be null + * @param classloader the ClassLoader that should be used to load the JDBC driver class given by `driverClassName`; may be null if this class' class loader should be used + * @param variables any custom or overridden configuration variables + * @return the connection factory + */ + @SuppressWarnings("unchecked") + public static ConnectionFactory patternBasedFactory(String urlPattern, String driverClassName, + ClassLoader classloader, Field... variables) { + return (config) -> { + LOGGER.trace("Config: {}", config.asProperties()); + Properties props = config.asProperties(); + Field[] varsWithDefaults = combineVariables(variables, + JdbcConfiguration.HOSTNAME, + JdbcConfiguration.PORT, + JdbcConfiguration.USER, + JdbcConfiguration.PASSWORD, + JdbcConfiguration.DATABASE); + String url = findAndReplace(urlPattern, props, varsWithDefaults); + LOGGER.trace("Props: {}", props); + LOGGER.trace("URL: {}", url); + Connection conn = null; + try { + ClassLoader driverClassLoader = classloader; + if (driverClassLoader == null) { + driverClassLoader = JdbcConnection.class.getClassLoader(); + } + Class driverClazz = (Class) Class.forName(driverClassName, true, driverClassLoader); + java.sql.Driver driver = driverClazz.newInstance(); + conn = driver.connect(url, props); + } catch (ClassNotFoundException|IllegalAccessException|InstantiationException e) { + throw new SQLException(e); + } + LOGGER.debug("Connected to {} with {}", url, props); + return conn; + }; + } + private static Field[] combineVariables(Field[] overriddenVariables, Field... defaultVariables) { Map fields = new HashMap<>();