DBZ-823 Restrict the type with generics

This commit is contained in:
Jiri Pechanec 2023-03-03 10:33:24 +01:00
parent 5e666f00ee
commit 3da2434c6c
8 changed files with 15 additions and 10 deletions

View File

@ -75,8 +75,9 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
.withDefault("database.useCursorFetch", connectorConfig.useCursorFetch())
.build();
MainConnectionProvidingConnectionFactory<MySqlConnection> connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>(() -> new MySqlConnection(new MySqlConnectionConfiguration(config),
connectorConfig.useCursorFetch() ? new MySqlBinaryProtocolFieldReader(connectorConfig) : new MySqlTextProtocolFieldReader(connectorConfig)));
MainConnectionProvidingConnectionFactory<MySqlConnection> connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>(
() -> new MySqlConnection(new MySqlConnectionConfiguration(config),
connectorConfig.useCursorFetch() ? new MySqlBinaryProtocolFieldReader(connectorConfig) : new MySqlTextProtocolFieldReader(connectorConfig)));
connection = connectionFactory.mainConnection();

View File

@ -57,7 +57,8 @@ public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(
SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster();
JdbcConfiguration jdbcConfig = connectorConfig.getJdbcConfig();
MainConnectionProvidingConnectionFactory<OracleConnection> connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>(() -> new OracleConnection(jdbcConfig));
MainConnectionProvidingConnectionFactory<OracleConnection> connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>(
() -> new OracleConnection(jdbcConfig));
jdbcConnection = connectionFactory.mainConnection();
validateRedoLogConfiguration();

View File

@ -39,7 +39,8 @@ public class PostgresChangeEventSourceFactory implements ChangeEventSourceFactor
private final SlotCreationResult slotCreatedInfo;
private final SlotState startingSlotInfo;
public PostgresChangeEventSourceFactory(PostgresConnectorConfig configuration, Snapshotter snapshotter, MainConnectionProvidingConnectionFactory<PostgresConnection> connectionFactory,
public PostgresChangeEventSourceFactory(PostgresConnectorConfig configuration, Snapshotter snapshotter,
MainConnectionProvidingConnectionFactory<PostgresConnection> connectionFactory,
ErrorHandler errorHandler, PostgresEventDispatcher<TableId> dispatcher, Clock clock, PostgresSchema schema,
PostgresTaskContext taskContext, ReplicationConnection replicationConnection, SlotCreationResult slotCreatedInfo,
SlotState startingSlotInfo) {

View File

@ -71,8 +71,9 @@ public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext>
final SqlServerValueConverters valueConverters = new SqlServerValueConverters(connectorConfig.getDecimalMode(),
connectorConfig.getTemporalPrecisionMode(), connectorConfig.binaryHandlingMode());
MainConnectionProvidingConnectionFactory<SqlServerConnection> connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>(() -> new SqlServerConnection(connectorConfig.getJdbcConfig(),
valueConverters, connectorConfig.getSkippedOperations(), connectorConfig.useSingleDatabase(), connectorConfig.getOptionRecompile()));
MainConnectionProvidingConnectionFactory<SqlServerConnection> connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>(
() -> new SqlServerConnection(connectorConfig.getJdbcConfig(),
valueConverters, connectorConfig.getSkippedOperations(), connectorConfig.useSingleDatabase(), connectorConfig.getOptionRecompile()));
dataConnection = connectionFactory.mainConnection();
metadataConnection = new SqlServerConnection(connectorConfig.getJdbcConfig(), valueConverters,
connectorConfig.getSkippedOperations(), connectorConfig.useSingleDatabase());

View File

@ -11,7 +11,7 @@
* @param <T>
*/
@FunctionalInterface
public interface ConnectionFactory<T> {
public interface ConnectionFactory<T extends JdbcConnection> {
T newConnection();
}

View File

@ -5,7 +5,7 @@
*/
package io.debezium.jdbc;
public class DefaultMainConnectionProvidingConnectionFactory<T> implements MainConnectionProvidingConnectionFactory<T> {
public class DefaultMainConnectionProvidingConnectionFactory<T extends JdbcConnection> implements MainConnectionProvidingConnectionFactory<T> {
private ConnectionFactory<T> delegate;

View File

@ -12,7 +12,7 @@
*
* @param <T>
*/
public interface MainConnectionProvidingConnectionFactory<T> extends ConnectionFactory<T> {
public interface MainConnectionProvidingConnectionFactory<T extends JdbcConnection> extends ConnectionFactory<T> {
T mainConnection();
}

View File

@ -81,7 +81,8 @@ public abstract class RelationalSnapshotChangeEventSource<P extends Partition, O
protected final Clock clock;
private final SnapshotProgressListener<P> snapshotProgressListener;
public RelationalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig connectorConfig, MainConnectionProvidingConnectionFactory<? extends JdbcConnection> jdbcConnectionFactory,
public RelationalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig connectorConfig,
MainConnectionProvidingConnectionFactory<? extends JdbcConnection> jdbcConnectionFactory,
RelationalDatabaseSchema schema, EventDispatcher<P, TableId> dispatcher, Clock clock,
SnapshotProgressListener<P> snapshotProgressListener) {
super(connectorConfig, snapshotProgressListener);