diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnection.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnection.java index 366452ac8..9d9297cdb 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnection.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnection.java @@ -6,6 +6,9 @@ package io.debezium.connector.mysql; +import static io.debezium.config.CommonConnectorConfig.DATABASE_CONFIG_PREFIX; +import static io.debezium.config.CommonConnectorConfig.DRIVER_CONFIG_PREFIX; + import java.sql.ResultSet; import java.sql.SQLException; import java.time.Duration; @@ -33,7 +36,6 @@ import io.debezium.relational.Column; import io.debezium.relational.Table; import io.debezium.relational.TableId; -import io.debezium.relational.history.DatabaseHistory; import io.debezium.util.Strings; /** @@ -497,7 +499,7 @@ public MySqlConnectionConfiguration(Configuration config) { .edit() .withDefault(MySqlConnectorConfig.PORT, MySqlConnectorConfig.PORT.defaultValue()) .build() - .subset("database.", true); + .subset(DATABASE_CONFIG_PREFIX, true).merge(config.subset(DRIVER_CONFIG_PREFIX, true)); final Builder jdbcConfigBuilder = dbConfig .edit() diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTestConnection.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTestConnection.java index 5146e3a9a..689a83a85 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTestConnection.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTestConnection.java @@ -5,6 +5,9 @@ */ package io.debezium.connector.mysql; +import static io.debezium.config.CommonConnectorConfig.DATABASE_CONFIG_PREFIX; +import static io.debezium.config.CommonConnectorConfig.DRIVER_CONFIG_PREFIX; + import java.sql.SQLException; import java.util.Map; @@ -37,7 +40,8 @@ public enum MySqlVersion { * @return the MySQLConnection instance; never null */ public static MySqlTestConnection forTestDatabase(String databaseName) { - return new MySqlTestConnection(JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) + return new MySqlTestConnection(JdbcConfiguration.copy( + Configuration.fromSystemProperties(DATABASE_CONFIG_PREFIX).merge(Configuration.fromSystemProperties(DRIVER_CONFIG_PREFIX))) .withDatabase(databaseName) .with("characterEncoding", "utf8") .build()); @@ -50,7 +54,8 @@ public static MySqlTestConnection forTestDatabase(String databaseName) { * @return the MySQLConnection instance; never null */ public static MySqlTestConnection forTestDatabase(String databaseName, Map urlProperties) { - JdbcConfiguration.Builder builder = JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) + JdbcConfiguration.Builder builder = JdbcConfiguration.copy( + Configuration.fromSystemProperties(DATABASE_CONFIG_PREFIX).merge(Configuration.fromSystemProperties(DRIVER_CONFIG_PREFIX))) .withDatabase(databaseName) .with("characterEncoding", "utf8"); urlProperties.forEach(builder::with); @@ -66,7 +71,8 @@ public static MySqlTestConnection forTestDatabase(String databaseName, Map key != null && key.startsWith(prefixWithSeparator)).map(prefixRemover); } + /** + * Return a new {@link Configuration} that merges several {@link Configuration}s into one. + *

+ * This method returns this Configuration instance if the supplied {@code configs} is null or empty. + * + * @param configs Configurations to be merged + * @return the subset of this Configuration; never null + */ + default Configuration merge(Configuration... configs) { + if (configs == null || configs.length == 0) { + return this; + } + + Set keys = new HashSet<>(); + for (Configuration config : configs) { + keys.addAll(config.keys()); + } + + return new Configuration() { + @Override + public Set keys() { + return Collect.unmodifiableSet(Configuration.this.keys().stream() + .filter(k -> k != null) + .collect(Collectors.toSet())); + } + + @Override + public String getString(String key) { + return Configuration.this.getString(key); + } + + @Override + public String toString() { + return withMaskedPasswords().asProperties().toString(); + } + }; + } + /** * Return a new {@link Configuration} that contains only the subset of keys that satisfy the given predicate. * diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java b/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java index edb64df66..19ae63cd7 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java @@ -37,7 +37,6 @@ import io.debezium.relational.Tables.ColumnNameFilter; import io.debezium.relational.Tables.ColumnNameFilterFactory; import io.debezium.relational.Tables.TableFilter; -import io.debezium.relational.history.DatabaseHistory; import io.debezium.spi.topic.TopicNamingStrategy; import io.debezium.util.SchemaNameAdjuster; import io.debezium.util.Strings; @@ -523,7 +522,8 @@ protected RelationalDatabaseConnectorConfig(Configuration config, String logical this.keyMapper = CustomKeyMapper.getInstance(config.getString(MSG_KEY_COLUMNS), tableIdMapper); this.tableIdMapper = tableIdMapper; - this.jdbcConfig = JdbcConfiguration.adapt(config.subset(DATABASE_CONFIG_PREFIX, true)); + this.jdbcConfig = JdbcConfiguration.adapt( + config.subset(DATABASE_CONFIG_PREFIX, true).merge(config.subset(DRIVER_CONFIG_PREFIX, true))); if (systemTablesFilter != null && tableIdMapper != null) { this.tableFilters = new RelationalTableFilters(config, systemTablesFilter, tableIdMapper, useCatalogBeforeSchema);