DBZ-177: Providing an alternative way to create JDBC connection based on the configured JDBC driver class name and supplied classloader. The loading/creating the JDBC connections is not reliable when driver libraries in a different classloader than the DriverManager.

This commit is contained in:
Ramesh Reddy 2017-01-13 10:51:27 -06:00
parent 49e6231b69
commit a9aace3480
4 changed files with 64 additions and 5 deletions

View File

@ -410,6 +410,14 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
.withDependents(DATABASE_WHITELIST_NAME)
.withDescription("Flag specifying whether built-in tables should be ignored.");
public static final Field JDBC_DRIVER = Field.create("database.jdbc.driver")
.withDisplayName("Jdbc Driver Class Name")
.withType(Type.CLASS)
.withWidth(Width.MEDIUM)
.withDefault(com.mysql.jdbc.Driver.class.getName())
.withImportance(Importance.LOW)
.withValidation(Field::isClassName)
.withDescription("JDBC Driver class name used to connect to the MySQL database server.");
/**
* A comma-separated list of regular expressions that match database names to be monitored.
* May not be used with {@link #DATABASE_BLACKLIST}.
@ -679,7 +687,7 @@ public static final Field MASK_COLUMN(int length) {
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES,
TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD,
SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD);
SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD, JDBC_DRIVER);
/**
* The set of {@link Field}s that are included in the {@link #configDef() configuration definition}. This includes
@ -695,7 +703,7 @@ public static final Field MASK_COLUMN(int length) {
protected static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "MySQL", HOSTNAME, PORT, USER, PASSWORD, SERVER_NAME, SERVER_ID,
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD, SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD);
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD, SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD, JDBC_DRIVER);
Field.group(config, "History Storage", KafkaDatabaseHistory.BOOTSTRAP_SERVERS,
KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DATABASE_HISTORY);

View File

@ -50,7 +50,9 @@ public MySqlJdbcContext(Configuration config) {
.edit()
.with("useSSL", Boolean.toString(useSSL))
.build();
this.jdbc = new JdbcConnection(jdbcConfig, FACTORY);
String driverClassName = jdbcConfig.getString(MySqlConnectorConfig.JDBC_DRIVER);
this.jdbc = new JdbcConnection(jdbcConfig,
JdbcConnection.patternBasedFactory(MYSQL_CONNECTION_URL, driverClassName, getClass().getClassLoader()));
}
public Configuration config() {

View File

@ -34,7 +34,7 @@ protected static void assertConfigDefIsValid(Connector connector, io.debezium.co
assertThat(key.importance).isEqualTo(expected.importance());
assertThat(key.documentation).isEqualTo(expected.description());
assertThat(key.type).isEqualTo(expected.type());
if (expected.equals(MySqlConnectorConfig.DATABASE_HISTORY)) {
if (expected.equals(MySqlConnectorConfig.DATABASE_HISTORY) || expected.equals(MySqlConnectorConfig.JDBC_DRIVER)) {
assertThat(((Class<?>) key.defaultValue).getName()).isEqualTo((String) expected.defaultValue());
} else if (!expected.equals(MySqlConnectorConfig.SERVER_ID)) {
assertThat(key.defaultValue).isEqualTo(expected.defaultValue());

View File

@ -115,7 +115,56 @@ public static ConnectionFactory patternBasedFactory(String urlPattern, Field...
return conn;
};
}
/**
* Create a {@link ConnectionFactory} that uses the specific JDBC driver class loaded with the given class loader, and obtains the connection URL by replacing the following variables in the URL pattern:
* <ul>
* <li><code>${hostname}</code></li>
* <li><code>${port}</code></li>
* <li><code>${dbname}</code></li>
* <li><code>${username}</code></li>
* <li><code>${password}</code></li>
* </ul>
* <p>
* This method attempts to instantiate the JDBC driver class and use that instance to connect to the database.
* @param urlPattern the URL pattern string; may not be null
* @param driverClassName the name of the JDBC driver class; may not be null
* @param classloader the ClassLoader that should be used to load the JDBC driver class given by `driverClassName`; may be null if this class' class loader should be used
* @param variables any custom or overridden configuration variables
* @return the connection factory
*/
@SuppressWarnings("unchecked")
public static ConnectionFactory patternBasedFactory(String urlPattern, String driverClassName,
ClassLoader classloader, Field... variables) {
return (config) -> {
LOGGER.trace("Config: {}", config.asProperties());
Properties props = config.asProperties();
Field[] varsWithDefaults = combineVariables(variables,
JdbcConfiguration.HOSTNAME,
JdbcConfiguration.PORT,
JdbcConfiguration.USER,
JdbcConfiguration.PASSWORD,
JdbcConfiguration.DATABASE);
String url = findAndReplace(urlPattern, props, varsWithDefaults);
LOGGER.trace("Props: {}", props);
LOGGER.trace("URL: {}", url);
Connection conn = null;
try {
ClassLoader driverClassLoader = classloader;
if (driverClassLoader == null) {
driverClassLoader = JdbcConnection.class.getClassLoader();
}
Class<java.sql.Driver> driverClazz = (Class<java.sql.Driver>) Class.forName(driverClassName, true, driverClassLoader);
java.sql.Driver driver = driverClazz.newInstance();
conn = driver.connect(url, props);
} catch (ClassNotFoundException|IllegalAccessException|InstantiationException e) {
throw new SQLException(e);
}
LOGGER.debug("Connected to {} with {}", url, props);
return conn;
};
}
private static Field[] combineVariables(Field[] overriddenVariables,
Field... defaultVariables) {
Map<String, Field> fields = new HashMap<>();