DBZ-4801 Using JdbcConfiguration in JdbcConnection

This commit is contained in:
Gunnar Morling 2022-03-01 10:29:16 +01:00
parent c1c322ecf0
commit d20a6f00e7
24 changed files with 83 additions and 78 deletions

View File

@ -28,6 +28,7 @@
import io.debezium.config.Field;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.legacy.MySqlJdbcContext.DatabaseLocales;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
@ -64,7 +65,7 @@ public class MySqlConnection extends JdbcConnection {
* @param fieldReader binary or text protocol based readers
*/
public MySqlConnection(MySqlConnectionConfiguration connectionConfig, MysqlFieldReader fieldReader) {
super(connectionConfig.config(), connectionConfig.factory(), QUOTED_CHARACTER, QUOTED_CHARACTER);
super(connectionConfig.jdbcConfig, connectionConfig.factory(), QUOTED_CHARACTER, QUOTED_CHARACTER);
this.connectionConfig = connectionConfig;
this.mysqlFieldReader = fieldReader;
}
@ -481,7 +482,7 @@ public static class MySqlConnectionConfiguration {
protected static final String JDBC_PROPERTY_CONNECTION_TIME_ZONE = "connectionTimeZone";
protected static final String JDBC_PROPERTY_LEGACY_SERVER_TIME_ZONE = "serverTimezone";
private final Configuration jdbcConfig;
private final JdbcConfiguration jdbcConfig;
private final ConnectionFactory factory;
private final Configuration config;
@ -513,7 +514,7 @@ else if ("true".equals(legacyDateTime)) {
jdbcConfigBuilder.with(JDBC_PROPERTY_CONNECTION_TIME_ZONE, determineConnectionTimeZone(dbConfig));
this.jdbcConfig = jdbcConfigBuilder.build();
this.jdbcConfig = JdbcConfiguration.adapt(jdbcConfigBuilder.build());
String driverClassName = this.jdbcConfig.getString(MySqlConnectorConfig.JDBC_DRIVER);
factory = JdbcConnection.patternBasedFactory(MySqlConnection.URL_PATTERN, driverClassName, getClass().getClassLoader());
}
@ -537,7 +538,7 @@ private static String determineConnectionTimeZone(final Configuration dbConfig)
return connectionTimeZone != null ? connectionTimeZone : "SERVER";
}
public Configuration config() {
public JdbcConfiguration config() {
return jdbcConfig;
}

View File

@ -85,7 +85,7 @@ else if ("true".equals(legacyDateTime)) {
jdbcConfig = jdbcConfigBuilder.build();
String driverClassName = jdbcConfig.getString(MySqlConnectorConfig.JDBC_DRIVER);
this.jdbc = new JdbcConnection(jdbcConfig,
this.jdbc = new JdbcConnection(JdbcConfiguration.adapt(jdbcConfig),
JdbcConnection.patternBasedFactory(MYSQL_CONNECTION_URL, driverClassName, getClass().getClassLoader()), "`", "`");
}

View File

@ -102,13 +102,13 @@ public static boolean isPerconaServer() {
return comment.startsWith("Percona");
}
private static Configuration addDefaultSettings(Configuration configuration) {
return configuration.edit()
private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) {
return JdbcConfiguration.adapt(configuration.edit()
.withDefault(JdbcConfiguration.HOSTNAME, "localhost")
.withDefault(JdbcConfiguration.PORT, 3306)
.withDefault(JdbcConfiguration.USER, "mysqluser")
.withDefault(JdbcConfiguration.PASSWORD, "mysqlpw")
.build();
.build());
}
@ -119,7 +119,7 @@ private static Configuration addDefaultSettings(Configuration configuration) {
*
* @param config the configuration; may not be null
*/
public MySqlTestConnection(Configuration config) {
public MySqlTestConnection(JdbcConfiguration config) {
super(addDefaultSettings(config), FACTORY, null, null, "`", "`");
}

View File

@ -29,9 +29,9 @@
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.oracle.OracleConnectorConfig.ConnectorAdapter;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
@ -70,7 +70,7 @@ public class OracleConnection extends JdbcConnection {
private static final String QUOTED_CHARACTER = "\"";
public OracleConnection(Configuration config, Supplier<ClassLoader> classLoaderSupplier) {
public OracleConnection(JdbcConfiguration config, Supplier<ClassLoader> classLoaderSupplier) {
super(config, resolveConnectionFactory(config), classLoaderSupplier, QUOTED_CHARACTER, QUOTED_CHARACTER);
this.databaseVersion = resolveOracleDatabaseVersion();
@ -474,12 +474,12 @@ public String buildSelectWithRowLimits(TableId tableId,
return sql.toString();
}
public static String connectionString(Configuration config) {
public static String connectionString(JdbcConfiguration config) {
return config.getString(URL) != null ? config.getString(URL)
: ConnectorAdapter.parse(config.getString("connection.adapter")).getConnectionUrl();
}
private static ConnectionFactory resolveConnectionFactory(Configuration config) {
private static ConnectionFactory resolveConnectionFactory(JdbcConfiguration config) {
return JdbcConnection.patternBasedFactory(connectionString(config));
}

View File

@ -72,7 +72,8 @@ protected void validateConnection(Map<String, ConfigValue> configValues, Configu
OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);
try (OracleConnection connection = new OracleConnection(connectorConfig.getJdbcConfig(), () -> getClass().getClassLoader())) {
LOGGER.debug("Successfully tested connection for {} with user '{}'", OracleConnection.connectionString(config), connection.username());
LOGGER.debug("Successfully tested connection for {} with user '{}'", OracleConnection.connectionString(connectorConfig.getJdbcConfig()),
connection.username());
}
catch (SQLException | RuntimeException e) {
LOGGER.error("Failed testing connection for {} with user '{}'", config.withMaskedPasswords(), userValue, e);

View File

@ -19,6 +19,7 @@
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.oracle.StreamingAdapter.TableNameCaseSensitivity;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
@ -51,7 +52,7 @@ public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(
TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
Configuration jdbcConfig = connectorConfig.getJdbcConfig();
JdbcConfiguration jdbcConfig = connectorConfig.getJdbcConfig();
jdbcConnection = new OracleConnection(jdbcConfig, () -> getClass().getClassLoader());
validateRedoLogConfiguration();

View File

@ -172,7 +172,7 @@ public static Configuration.Builder defaultConfig() {
public static OracleConnection defaultConnection() {
Configuration config = defaultConfig().build();
Configuration jdbcConfig = config.subset(DATABASE_PREFIX, true);
return createConnection(config, jdbcConfig, true);
return createConnection(config, JdbcConfiguration.adapt(jdbcConfig), true);
}
/**
@ -257,7 +257,7 @@ private static Configuration getDatabaseConfig(String prefix) {
public static OracleConnection testConnection() {
Configuration config = testConfig().build();
Configuration jdbcConfig = config.subset(DATABASE_PREFIX, true);
return createConnection(config, jdbcConfig, false);
return createConnection(config, JdbcConfiguration.adapt(jdbcConfig), false);
}
/**
@ -267,7 +267,7 @@ public static OracleConnection testConnection() {
public static OracleConnection adminConnection() {
Configuration config = adminConfig().build();
Configuration jdbcConfig = config.subset(DATABASE_PREFIX, true);
return createConnection(config, jdbcConfig, false);
return createConnection(config, JdbcConfiguration.adapt(jdbcConfig), false);
}
/**
@ -278,7 +278,7 @@ public static OracleConnection adminConnection() {
* @param autoCommit whether the connection should enforce auto-commit
* @return the connection
*/
private static OracleConnection createConnection(Configuration config, Configuration jdbcConfig, boolean autoCommit) {
private static OracleConnection createConnection(Configuration config, JdbcConfiguration jdbcConfig, boolean autoCommit) {
OracleConnection connection = new OracleConnection(jdbcConfig, TestHelper.class::getClassLoader);
try {
connection.setAutoCommit(autoCommit);
@ -299,7 +299,7 @@ public static void forceLogfileSwitch() {
Configuration config = adminConfig().build();
Configuration jdbcConfig = config.subset(DATABASE_PREFIX, true);
try (OracleConnection jdbcConnection = new OracleConnection(jdbcConfig, TestHelper.class::getClassLoader)) {
try (OracleConnection jdbcConnection = new OracleConnection(JdbcConfiguration.adapt(jdbcConfig), TestHelper.class::getClassLoader)) {
if ((new OracleConnectorConfig(defaultConfig().build())).getPdbName() != null) {
jdbcConnection.resetSessionToCdb();
}
@ -314,7 +314,7 @@ public static int getNumberOfOnlineLogGroups() {
Configuration config = adminConfig().build();
Configuration jdbcConfig = config.subset(DATABASE_PREFIX, true);
try (OracleConnection jdbcConnection = new OracleConnection(jdbcConfig, TestHelper.class::getClassLoader)) {
try (OracleConnection jdbcConnection = new OracleConnection(JdbcConfiguration.adapt(jdbcConfig), TestHelper.class::getClassLoader)) {
if ((new OracleConnectorConfig(defaultConfig().build())).getPdbName() != null) {
jdbcConnection.resetSessionToCdb();
}

View File

@ -83,7 +83,7 @@ public class PostgresConnection extends JdbcConnection {
* @param config {@link Configuration} instance, may not be null.
* @param valueConverterBuilder supplies a configured {@link PostgresValueConverter} for a given {@link TypeRegistry}
*/
public PostgresConnection(Configuration config, PostgresValueConverterBuilder valueConverterBuilder) {
public PostgresConnection(JdbcConfiguration config, PostgresValueConverterBuilder valueConverterBuilder) {
super(addDefaultSettings(config), FACTORY, PostgresConnection::validateServerVersion, null, "\"", "\"");
if (Objects.isNull(valueConverterBuilder)) {
@ -122,15 +122,15 @@ public PostgresConnection(PostgresConnectorConfig config, TypeRegistry typeRegis
*
* @param config {@link Configuration} instance, may not be null.
*/
public PostgresConnection(Configuration config) {
public PostgresConnection(JdbcConfiguration config) {
this(config, null);
}
static Configuration addDefaultSettings(Configuration configuration) {
static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) {
// we require Postgres 9.4 as the minimum server version since that's where logical replication was first introduced
return configuration.edit()
return JdbcConfiguration.adapt(configuration.edit()
.with("assumeMinServerVersion", "9.4")
.build();
.build());
}
/**

View File

@ -37,11 +37,11 @@
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcConnectionException;
import io.debezium.relational.RelationalTableFilters;
@ -122,14 +122,14 @@ private PostgresReplicationConnection(PostgresConnectorConfig config,
this.hasInitedSlot = false;
}
private static Configuration addDefaultSettings(Configuration configuration) {
private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) {
// first copy the parent's default settings...
// then set some additional replication specific settings
return PostgresConnection.addDefaultSettings(configuration)
return JdbcConfiguration.adapt(PostgresConnection.addDefaultSettings(configuration)
.edit()
.with("replication", "database")
.with("preferQueryMode", "simple") // replication protocol only supports simple query mode
.build();
.build());
}
private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, InterruptedException {

View File

@ -150,7 +150,7 @@ public static PostgresConnection createWithTypeRegistry() {
* @return the PostgresConnection instance; never null
*/
public static PostgresConnection create(String appName) {
return new PostgresConnection(defaultJdbcConfig().edit().with("ApplicationName", appName).build());
return new PostgresConnection(JdbcConfiguration.adapt(defaultJdbcConfig().edit().with("ApplicationName", appName).build()));
}
/**

View File

@ -26,6 +26,7 @@
import io.debezium.connector.postgresql.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.util.Testing;
@ -232,7 +233,7 @@ public void shouldSupportPG95RestartLsn() throws Exception {
// "fake" a pg95 response by not returning confirmed_flushed_lsn
private PostgresConnection buildPG95PGConn(String name) {
return new PostgresConnection(TestHelper.defaultJdbcConfig().edit().with("ApplicationName", name).build()) {
return new PostgresConnection(JdbcConfiguration.adapt(TestHelper.defaultJdbcConfig().edit().with("ApplicationName", name).build())) {
@Override
protected ServerInfo.ReplicationSlot queryForSlot(String slotName, String database, String pluginName,
ResultSetMapper<ServerInfo.ReplicationSlot> map)

View File

@ -124,7 +124,7 @@ public class SqlServerConnection extends JdbcConnection {
* @param classLoaderSupplier class loader supplier
* @param skippedOperations a set of {@link Envelope.Operation} to skip in streaming
*/
public SqlServerConnection(Configuration config, SourceTimestampMode sourceTimestampMode,
public SqlServerConnection(JdbcConfiguration config, SourceTimestampMode sourceTimestampMode,
SqlServerValueConverters valueConverters, Supplier<ClassLoader> classLoaderSupplier,
Set<Envelope.Operation> skippedOperations, boolean multiPartitionMode) {
super(config, createConnectionFactory(multiPartitionMode), classLoaderSupplier, OPENING_QUOTING_CHARACTER, CLOSING_QUOTING_CHARACTER);
@ -182,7 +182,7 @@ public SqlServerConnection(Configuration config, SourceTimestampMode sourceTimes
* @param skippedOperations a set of {@link Envelope.Operation} to skip in streaming
* @param optionRecompile Includes query option RECOMPILE on incremental snapshots
*/
public SqlServerConnection(Configuration config, SourceTimestampMode sourceTimestampMode,
public SqlServerConnection(JdbcConfiguration config, SourceTimestampMode sourceTimestampMode,
SqlServerValueConverters valueConverters, Supplier<ClassLoader> classLoaderSupplier,
Set<Envelope.Operation> skippedOperations, boolean multiPartitionMode, boolean optionRecompile) {
this(config, sourceTimestampMode, valueConverters, classLoaderSupplier, skippedOperations, multiPartitionMode);

View File

@ -144,7 +144,7 @@ protected Map<String, ConfigValue> validateAllFields(Configuration config) {
}
private SqlServerConnection connect(SqlServerConnectorConfig sqlServerConfig) {
return new SqlServerConnection(sqlServerConfig.jdbcConfig(),
return new SqlServerConnection(sqlServerConfig.getJdbcConfig(),
sqlServerConfig.getSourceTimestampMode(), null,
() -> getClass().getClassLoader(),
Collections.emptySet(),

View File

@ -428,10 +428,6 @@ else if (databaseNames != null) {
this.optionRecompile = config.getBoolean(INCREMENTAL_SNAPSHOT_OPTION_RECOMPILE);
}
public Configuration jdbcConfig() {
return getConfig().subset(DATABASE_CONFIG_PREFIX, true);
}
public List<String> getDatabaseNames() {
return databaseNames;
}

View File

@ -25,9 +25,7 @@
import io.debezium.pipeline.metrics.DefaultChangeEventSourceMetricsFactory;
import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
import io.debezium.util.SchemaNameAdjuster;
@ -59,11 +57,6 @@ public String version() {
@Override
public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext> start(Configuration config) {
final Clock clock = Clock.system();
final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(config);
final TopicSelector<TableId> topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig);
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
final SqlServerValueConverters valueConverters = new SqlServerValueConverters(connectorConfig.getDecimalMode(),
connectorConfig.getTemporalPrecisionMode(), connectorConfig.binaryHandlingMode());
// By default do not load whole result sets into memory
config = config.edit()
@ -71,12 +64,17 @@ public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext>
.withDefault("database.fetchSize", 10_000)
.build();
final Configuration jdbcConfig = config.filter(
x -> !(x.startsWith(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING) || x.equals(HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name())))
.subset("database.", true);
dataConnection = new SqlServerConnection(jdbcConfig, connectorConfig.getSourceTimestampMode(), valueConverters, () -> getClass().getClassLoader(),
final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(config);
final TopicSelector<TableId> topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig);
final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
final SqlServerValueConverters valueConverters = new SqlServerValueConverters(connectorConfig.getDecimalMode(),
connectorConfig.getTemporalPrecisionMode(), connectorConfig.binaryHandlingMode());
dataConnection = new SqlServerConnection(connectorConfig.getJdbcConfig(), connectorConfig.getSourceTimestampMode(), valueConverters,
() -> getClass().getClassLoader(),
connectorConfig.getSkippedOperations(), connectorConfig.isMultiPartitionModeEnabled(), connectorConfig.getOptionRecompile());
metadataConnection = new SqlServerConnection(jdbcConfig, connectorConfig.getSourceTimestampMode(), valueConverters, () -> getClass().getClassLoader(),
metadataConnection = new SqlServerConnection(connectorConfig.getJdbcConfig(), connectorConfig.getSourceTimestampMode(), valueConverters,
() -> getClass().getClassLoader(),
connectorConfig.getSkippedOperations(), connectorConfig.isMultiPartitionModeEnabled());
this.schema = new SqlServerDatabaseSchema(connectorConfig, metadataConnection.getDefaultValueConverter(), valueConverters, topicSelector, schemaNameAdjuster);

View File

@ -250,25 +250,25 @@ public static SqlServerConnection multiPartitionTestConnection() {
}
public static SqlServerConnection testConnection(String databaseName) {
Configuration config = defaultJdbcConfig()
JdbcConfiguration config = JdbcConfiguration.adapt(defaultJdbcConfig()
.edit()
.with(JdbcConfiguration.ON_CONNECT_STATEMENTS, "USE [" + databaseName + "]")
.build();
.build());
return testConnection(config);
}
private static SqlServerConnection testConnection(Configuration config) {
private static SqlServerConnection testConnection(JdbcConfiguration config) {
return new SqlServerConnection(config, SourceTimestampMode.getDefaultMode(),
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null), () -> TestHelper.class.getClassLoader(),
Collections.emptySet(), true);
}
public static SqlServerConnection testConnectionWithOptionRecompile() {
Configuration config = defaultJdbcConfig()
JdbcConfiguration config = JdbcConfiguration.adapt(defaultJdbcConfig()
.edit()
.with(JdbcConfiguration.ON_CONNECT_STATEMENTS, "USE [" + TEST_DATABASE + "]")
.build();
.build());
return new SqlServerConnection(config, SourceTimestampMode.getDefaultMode(),
new SqlServerValueConverters(JdbcValueConverters.DecimalMode.PRECISE, TemporalPrecisionMode.ADAPTIVE, null), () -> TestHelper.class.getClassLoader(),

View File

@ -109,6 +109,10 @@ public String toString() {
};
}
public static JdbcConfiguration empty() {
return JdbcConfiguration.adapt(Configuration.empty());
}
/**
* The JDBC-specific builder used to construct and/or alter JDBC configuration instances.
*

View File

@ -46,7 +46,6 @@
import io.debezium.annotation.NotThreadSafe;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
@ -307,7 +306,7 @@ private static String findAndReplace(String url, String name, Properties props,
return url;
}
private final Configuration config;
private final JdbcConfiguration config;
private final ConnectionFactory factory;
private final Operations initialOps;
private final String openingQuoteCharacter;
@ -320,7 +319,7 @@ private static String findAndReplace(String url, String name, Properties props,
* @param config the configuration; may not be null
* @param connectionFactory the connection factory; may not be null
*/
public JdbcConnection(Configuration config, ConnectionFactory connectionFactory, String openingQuoteCharacter, String closingQuoteCharacter) {
public JdbcConnection(JdbcConfiguration config, ConnectionFactory connectionFactory, String openingQuoteCharacter, String closingQuoteCharacter) {
this(config, connectionFactory, null, null, openingQuoteCharacter, closingQuoteCharacter);
}
@ -330,7 +329,7 @@ public JdbcConnection(Configuration config, ConnectionFactory connectionFactory,
* @param config the configuration; may not be null
* @param connectionFactory the connection factory; may not be null
*/
public JdbcConnection(Configuration config, ConnectionFactory connectionFactory, Supplier<ClassLoader> classLoaderSupplier, String openingQuoteCharacter,
public JdbcConnection(JdbcConfiguration config, ConnectionFactory connectionFactory, Supplier<ClassLoader> classLoaderSupplier, String openingQuoteCharacter,
String closingQuoteCharacter) {
this(config, connectionFactory, null, classLoaderSupplier, openingQuoteCharacter, closingQuoteCharacter);
}
@ -346,7 +345,7 @@ public JdbcConnection(Configuration config, ConnectionFactory connectionFactory,
* @param openingQuotingChar the opening quoting character
* @param closingQuotingChar the closing quoting character
*/
protected JdbcConnection(Configuration config, ConnectionFactory connectionFactory, Operations initialOperations,
protected JdbcConnection(JdbcConfiguration config, ConnectionFactory connectionFactory, Operations initialOperations,
Supplier<ClassLoader> classLoaderSupplier, String openingQuotingChar, String closingQuotingChar) {
this.config = config;
this.factory = classLoaderSupplier == null ? connectionFactory : new ConnectionFactoryDecorator(connectionFactory, classLoaderSupplier);
@ -362,7 +361,7 @@ protected JdbcConnection(Configuration config, ConnectionFactory connectionFacto
* @return the JDBC configuration; never null
*/
public JdbcConfiguration config() {
return JdbcConfiguration.adapt(config);
return config;
}
public JdbcConnection setAutoCommit(boolean autoCommit) throws SQLException {

View File

@ -34,6 +34,7 @@
import io.debezium.relational.Tables.ColumnNameFilter;
import io.debezium.relational.Tables.ColumnNameFilterFactory;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.DatabaseHistory;
/**
* Configuration options shared across the relational CDC connectors.
@ -619,7 +620,7 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
private final TemporalPrecisionMode temporalPrecisionMode;
private final KeyMapper keyMapper;
private final TableIdToStringMapper tableIdMapper;
private final Configuration jdbcConfig;
private final JdbcConfiguration jdbcConfig;
private final String heartbeatActionQuery;
protected RelationalDatabaseConnectorConfig(Configuration config, String logicalName, TableFilter systemTablesFilter,
@ -630,7 +631,11 @@ protected RelationalDatabaseConnectorConfig(Configuration config, String logical
this.temporalPrecisionMode = TemporalPrecisionMode.parse(config.getString(TIME_PRECISION_MODE));
this.keyMapper = CustomKeyMapper.getInstance(config.getString(MSG_KEY_COLUMNS), tableIdMapper);
this.tableIdMapper = tableIdMapper;
this.jdbcConfig = config.subset(DATABASE_CONFIG_PREFIX, true);
this.jdbcConfig = JdbcConfiguration.adapt(
config.filter(x -> !x.startsWith(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING)
|| x.equals(HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name()))
.subset(DATABASE_CONFIG_PREFIX, true));
if (systemTablesFilter != null && tableIdMapper != null) {
this.tableFilters = new RelationalTableFilters(config, systemTablesFilter, tableIdMapper);
@ -684,7 +689,7 @@ public KeyMapper getKeyMapper() {
* settings, without the "database." prefix. Typically used for passing through
* driver settings.
*/
public Configuration getJdbcConfig() {
public JdbcConfiguration getJdbcConfig() {
return jdbcConfig;
}

View File

@ -27,7 +27,6 @@
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConnection.ConnectionFactory;
public class JdbcConnectionTest {
@ -35,7 +34,7 @@ public class JdbcConnectionTest {
@Test
public void testNormalClose() throws SQLException {
ConnectionFactory connFactory = (config) -> new NormalConnection();
JdbcConnection conn = new JdbcConnection(Configuration.empty(), connFactory, "\"", "\"");
JdbcConnection conn = new JdbcConnection(JdbcConfiguration.empty(), connFactory, "\"", "\"");
conn.connect();
conn.close();
}
@ -43,7 +42,7 @@ public void testNormalClose() throws SQLException {
@Test
public void testForceClose() throws SQLException {
ConnectionFactory connFactory = (config) -> new TimingOutConnection();
JdbcConnection conn = new JdbcConnection(Configuration.empty(), connFactory, "\"", "\"");
JdbcConnection conn = new JdbcConnection(JdbcConfiguration.empty(), connFactory, "\"", "\"");
conn.connect();
conn.close();
}
@ -51,7 +50,7 @@ public void testForceClose() throws SQLException {
@Test(expected = SQLException.class)
public void testRogueConnection() throws SQLException {
ConnectionFactory connFactory = (config) -> new RogueConnection();
JdbcConnection conn = new JdbcConnection(Configuration.empty(), connFactory, "\"", "\"");
JdbcConnection conn = new JdbcConnection(JdbcConfiguration.empty(), connFactory, "\"", "\"");
conn.connect();
conn.close();
}

View File

@ -46,7 +46,7 @@ public String getConnectorName() {
@Test
public void testBuildQueryOnePkColumn() {
final SignalBasedIncrementalSnapshotChangeEventSource<? extends Partition, TableId> source = new SignalBasedIncrementalSnapshotChangeEventSource<>(
config(), new JdbcConnection(config().getConfig(), config -> null, "\"", "\""), null, null, null, SnapshotProgressListener.NO_OP(),
config(), new JdbcConnection(config().getJdbcConfig(), config -> null, "\"", "\""), null, null, null, SnapshotProgressListener.NO_OP(),
DataChangeEventListener.NO_OP());
final IncrementalSnapshotContext<TableId> context = new SignalBasedIncrementalSnapshotContext<>();
source.setContext(context);
@ -68,7 +68,7 @@ public void testBuildQueryOnePkColumn() {
@Test
public void testBuildQueryThreePkColumns() {
final SignalBasedIncrementalSnapshotChangeEventSource<? extends Partition, TableId> source = new SignalBasedIncrementalSnapshotChangeEventSource<>(
config(), new JdbcConnection(config().getConfig(), config -> null, "\"", "\""), null, null, null, SnapshotProgressListener.NO_OP(),
config(), new JdbcConnection(config().getJdbcConfig(), config -> null, "\"", "\""), null, null, null, SnapshotProgressListener.NO_OP(),
DataChangeEventListener.NO_OP());
final IncrementalSnapshotContext<TableId> context = new SignalBasedIncrementalSnapshotContext<>();
source.setContext(context);
@ -94,7 +94,7 @@ public void testBuildQueryThreePkColumns() {
@Test
public void testMaxQuery() {
final SignalBasedIncrementalSnapshotChangeEventSource<? extends Partition, TableId> source = new SignalBasedIncrementalSnapshotChangeEventSource<>(
config(), new JdbcConnection(config().getConfig(), config -> null, "\"", "\""), null, null, null, SnapshotProgressListener.NO_OP(),
config(), new JdbcConnection(config().getJdbcConfig(), config -> null, "\"", "\""), null, null, null, SnapshotProgressListener.NO_OP(),
DataChangeEventListener.NO_OP());
final Column pk1 = Column.editor().name("pk1").create();
final Column pk2 = Column.editor().name("pk2").create();

View File

@ -21,8 +21,8 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.server.TestConfigSource;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
@ -93,7 +93,7 @@ public void testPulsar() throws Exception {
records.add(consumer.receive());
return records.size() >= MESSAGE_COUNT;
});
final Configuration config = Configuration.create()
final JdbcConfiguration config = JdbcConfiguration.create()
.with("hostname", dbHostname)
.with("port", dbPort)
.with("user", dbUser)

View File

@ -14,9 +14,9 @@
import org.fest.assertions.Assertions;
import org.junit.jupiter.api.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.server.TestConfigSource;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
@ -46,7 +46,7 @@ public class RedisOffsetIT {
protected static Jedis jedis;
private PostgresConnection getPostgresConnection() {
return new PostgresConnection(Configuration.create()
return new PostgresConnection(JdbcConfiguration.create()
.with("user", PostgresTestResourceLifecycleManager.POSTGRES_USER)
.with("password", PostgresTestResourceLifecycleManager.POSTGRES_PASSWORD)
.with("dbname", PostgresTestResourceLifecycleManager.POSTGRES_DBNAME)

View File

@ -12,9 +12,9 @@
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.server.TestConfigSource;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
@ -38,7 +38,7 @@
public class RedisStreamIT {
private PostgresConnection getPostgresConnection() {
return new PostgresConnection(Configuration.create()
return new PostgresConnection(JdbcConfiguration.create()
.with("user", PostgresTestResourceLifecycleManager.POSTGRES_USER)
.with("password", PostgresTestResourceLifecycleManager.POSTGRES_PASSWORD)
.with("dbname", PostgresTestResourceLifecycleManager.POSTGRES_DBNAME)