DBZ-5043 Move driver pass-through parameters into dedicated namespace

Currently we mix database parameters like DB hostname, username or
password with other JDBC driver parameters which are passed to JDBC
driver as it is. Move these pass-through parameters into dedicated
namespace `driver.*` to separate them from common database parameters.
JDBC parameters which are defined on the connector level still stay
in `database` namespace.
This commit is contained in:
Vojtech Juranek 2022-08-18 12:21:10 +02:00 committed by Jiri Pechanec
parent 3cdcf795d8
commit a7a293c5c8
5 changed files with 55 additions and 7 deletions

View File

@ -6,6 +6,9 @@
package io.debezium.connector.mysql; package io.debezium.connector.mysql;
import static io.debezium.config.CommonConnectorConfig.DATABASE_CONFIG_PREFIX;
import static io.debezium.config.CommonConnectorConfig.DRIVER_CONFIG_PREFIX;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.time.Duration; import java.time.Duration;
@ -33,7 +36,6 @@
import io.debezium.relational.Column; import io.debezium.relational.Column;
import io.debezium.relational.Table; import io.debezium.relational.Table;
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.util.Strings; import io.debezium.util.Strings;
/** /**
@ -497,7 +499,7 @@ public MySqlConnectionConfiguration(Configuration config) {
.edit() .edit()
.withDefault(MySqlConnectorConfig.PORT, MySqlConnectorConfig.PORT.defaultValue()) .withDefault(MySqlConnectorConfig.PORT, MySqlConnectorConfig.PORT.defaultValue())
.build() .build()
.subset("database.", true); .subset(DATABASE_CONFIG_PREFIX, true).merge(config.subset(DRIVER_CONFIG_PREFIX, true));
final Builder jdbcConfigBuilder = dbConfig final Builder jdbcConfigBuilder = dbConfig
.edit() .edit()

View File

@ -5,6 +5,9 @@
*/ */
package io.debezium.connector.mysql; package io.debezium.connector.mysql;
import static io.debezium.config.CommonConnectorConfig.DATABASE_CONFIG_PREFIX;
import static io.debezium.config.CommonConnectorConfig.DRIVER_CONFIG_PREFIX;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Map; import java.util.Map;
@ -37,7 +40,8 @@ public enum MySqlVersion {
* @return the MySQLConnection instance; never null * @return the MySQLConnection instance; never null
*/ */
public static MySqlTestConnection forTestDatabase(String databaseName) { public static MySqlTestConnection forTestDatabase(String databaseName) {
return new MySqlTestConnection(JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) return new MySqlTestConnection(JdbcConfiguration.copy(
Configuration.fromSystemProperties(DATABASE_CONFIG_PREFIX).merge(Configuration.fromSystemProperties(DRIVER_CONFIG_PREFIX)))
.withDatabase(databaseName) .withDatabase(databaseName)
.with("characterEncoding", "utf8") .with("characterEncoding", "utf8")
.build()); .build());
@ -50,7 +54,8 @@ public static MySqlTestConnection forTestDatabase(String databaseName) {
* @return the MySQLConnection instance; never null * @return the MySQLConnection instance; never null
*/ */
public static MySqlTestConnection forTestDatabase(String databaseName, Map<String, Object> urlProperties) { public static MySqlTestConnection forTestDatabase(String databaseName, Map<String, Object> urlProperties) {
JdbcConfiguration.Builder builder = JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) JdbcConfiguration.Builder builder = JdbcConfiguration.copy(
Configuration.fromSystemProperties(DATABASE_CONFIG_PREFIX).merge(Configuration.fromSystemProperties(DRIVER_CONFIG_PREFIX)))
.withDatabase(databaseName) .withDatabase(databaseName)
.with("characterEncoding", "utf8"); .with("characterEncoding", "utf8");
urlProperties.forEach(builder::with); urlProperties.forEach(builder::with);
@ -66,7 +71,8 @@ public static MySqlTestConnection forTestDatabase(String databaseName, Map<Strin
* @return the MySQLConnection instance; never null * @return the MySQLConnection instance; never null
*/ */
public static MySqlTestConnection forTestDatabase(String databaseName, String username, String password) { public static MySqlTestConnection forTestDatabase(String databaseName, String username, String password) {
return new MySqlTestConnection(JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) return new MySqlTestConnection(JdbcConfiguration.copy(
Configuration.fromSystemProperties(DATABASE_CONFIG_PREFIX).merge(Configuration.fromSystemProperties(DRIVER_CONFIG_PREFIX)))
.withDatabase(databaseName) .withDatabase(databaseName)
.withUser(username) .withUser(username)
.withPassword(password) .withPassword(password)

View File

@ -309,6 +309,7 @@ public static SchemaNameAdjustmentMode parse(String value) {
public static final int DEFAULT_QUERY_FETCH_SIZE = 0; public static final int DEFAULT_QUERY_FETCH_SIZE = 0;
public static final long DEFAULT_POLL_INTERVAL_MILLIS = 500; public static final long DEFAULT_POLL_INTERVAL_MILLIS = 500;
public static final String DATABASE_CONFIG_PREFIX = "database."; public static final String DATABASE_CONFIG_PREFIX = "database.";
public static final String DRIVER_CONFIG_PREFIX = "driver.";
private static final String CONVERTER_TYPE_SUFFIX = ".type"; private static final String CONVERTER_TYPE_SUFFIX = ".type";
public static final long DEFAULT_RETRIABLE_RESTART_WAIT = 10000L; public static final long DEFAULT_RETRIABLE_RESTART_WAIT = 10000L;
public static final long DEFAULT_MAX_QUEUE_SIZE_IN_BYTES = 0; // In case we don't want to pass max.queue.size.in.bytes; public static final long DEFAULT_MAX_QUEUE_SIZE_IN_BYTES = 0; // In case we don't want to pass max.queue.size.in.bytes;

View File

@ -17,6 +17,7 @@
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
@ -1552,6 +1553,44 @@ default Configuration subset(String prefix, boolean removePrefix) {
return filter(key -> key != null && key.startsWith(prefixWithSeparator)).map(prefixRemover); return filter(key -> key != null && key.startsWith(prefixWithSeparator)).map(prefixRemover);
} }
/**
* Return a new {@link Configuration} that merges several {@link Configuration}s into one.
* <p>
* This method returns this Configuration instance if the supplied {@code configs} is null or empty.
*
* @param configs Configurations to be merged
* @return the subset of this Configuration; never null
*/
default Configuration merge(Configuration... configs) {
if (configs == null || configs.length == 0) {
return this;
}
Set<String> keys = new HashSet<>();
for (Configuration config : configs) {
keys.addAll(config.keys());
}
return new Configuration() {
@Override
public Set<String> keys() {
return Collect.unmodifiableSet(Configuration.this.keys().stream()
.filter(k -> k != null)
.collect(Collectors.toSet()));
}
@Override
public String getString(String key) {
return Configuration.this.getString(key);
}
@Override
public String toString() {
return withMaskedPasswords().asProperties().toString();
}
};
}
/** /**
* Return a new {@link Configuration} that contains only the subset of keys that satisfy the given predicate. * Return a new {@link Configuration} that contains only the subset of keys that satisfy the given predicate.
* *

View File

@ -37,7 +37,6 @@
import io.debezium.relational.Tables.ColumnNameFilter; import io.debezium.relational.Tables.ColumnNameFilter;
import io.debezium.relational.Tables.ColumnNameFilterFactory; import io.debezium.relational.Tables.ColumnNameFilterFactory;
import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.spi.topic.TopicNamingStrategy; import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.SchemaNameAdjuster; import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Strings; import io.debezium.util.Strings;
@ -523,7 +522,8 @@ protected RelationalDatabaseConnectorConfig(Configuration config, String logical
this.keyMapper = CustomKeyMapper.getInstance(config.getString(MSG_KEY_COLUMNS), tableIdMapper); this.keyMapper = CustomKeyMapper.getInstance(config.getString(MSG_KEY_COLUMNS), tableIdMapper);
this.tableIdMapper = tableIdMapper; this.tableIdMapper = tableIdMapper;
this.jdbcConfig = JdbcConfiguration.adapt(config.subset(DATABASE_CONFIG_PREFIX, true)); this.jdbcConfig = JdbcConfiguration.adapt(
config.subset(DATABASE_CONFIG_PREFIX, true).merge(config.subset(DRIVER_CONFIG_PREFIX, true)));
if (systemTablesFilter != null && tableIdMapper != null) { if (systemTablesFilter != null && tableIdMapper != null) {
this.tableFilters = new RelationalTableFilters(config, systemTablesFilter, tableIdMapper, useCatalogBeforeSchema); this.tableFilters = new RelationalTableFilters(config, systemTablesFilter, tableIdMapper, useCatalogBeforeSchema);