diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java index 6393d92c2..abe03e5c9 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java @@ -75,8 +75,9 @@ public ChangeEventSourceCoordinator start(Co .withDefault("database.useCursorFetch", connectorConfig.useCursorFetch()) .build(); - MainConnectionProvidingConnectionFactory connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>(() -> new MySqlConnection(new MySqlConnectionConfiguration(config), - connectorConfig.useCursorFetch() ? new MySqlBinaryProtocolFieldReader(connectorConfig) : new MySqlTextProtocolFieldReader(connectorConfig))); + MainConnectionProvidingConnectionFactory connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>( + () -> new MySqlConnection(new MySqlConnectionConfiguration(config), + connectorConfig.useCursorFetch() ? new MySqlBinaryProtocolFieldReader(connectorConfig) : new MySqlTextProtocolFieldReader(connectorConfig))); connection = connectionFactory.mainConnection(); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java index 364db66fc..0eb40394c 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java @@ -57,7 +57,8 @@ public ChangeEventSourceCoordinator start( SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster(); JdbcConfiguration jdbcConfig = connectorConfig.getJdbcConfig(); - MainConnectionProvidingConnectionFactory connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>(() -> new OracleConnection(jdbcConfig)); + MainConnectionProvidingConnectionFactory connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>( + () -> new OracleConnection(jdbcConfig)); jdbcConnection = connectionFactory.mainConnection(); validateRedoLogConfiguration(); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceFactory.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceFactory.java index 7aece02b7..7cb19aded 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceFactory.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceFactory.java @@ -39,7 +39,8 @@ public class PostgresChangeEventSourceFactory implements ChangeEventSourceFactor private final SlotCreationResult slotCreatedInfo; private final SlotState startingSlotInfo; - public PostgresChangeEventSourceFactory(PostgresConnectorConfig configuration, Snapshotter snapshotter, MainConnectionProvidingConnectionFactory connectionFactory, + public PostgresChangeEventSourceFactory(PostgresConnectorConfig configuration, Snapshotter snapshotter, + MainConnectionProvidingConnectionFactory connectionFactory, ErrorHandler errorHandler, PostgresEventDispatcher dispatcher, Clock clock, PostgresSchema schema, PostgresTaskContext taskContext, ReplicationConnection replicationConnection, SlotCreationResult slotCreatedInfo, SlotState startingSlotInfo) { diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java index 486133984..151bdba79 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorTask.java @@ -71,8 +71,9 @@ public ChangeEventSourceCoordinator final SqlServerValueConverters valueConverters = new SqlServerValueConverters(connectorConfig.getDecimalMode(), connectorConfig.getTemporalPrecisionMode(), connectorConfig.binaryHandlingMode()); - MainConnectionProvidingConnectionFactory connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>(() -> new SqlServerConnection(connectorConfig.getJdbcConfig(), - valueConverters, connectorConfig.getSkippedOperations(), connectorConfig.useSingleDatabase(), connectorConfig.getOptionRecompile())); + MainConnectionProvidingConnectionFactory connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>( + () -> new SqlServerConnection(connectorConfig.getJdbcConfig(), + valueConverters, connectorConfig.getSkippedOperations(), connectorConfig.useSingleDatabase(), connectorConfig.getOptionRecompile())); dataConnection = connectionFactory.mainConnection(); metadataConnection = new SqlServerConnection(connectorConfig.getJdbcConfig(), valueConverters, connectorConfig.getSkippedOperations(), connectorConfig.useSingleDatabase()); diff --git a/debezium-core/src/main/java/io/debezium/jdbc/ConnectionFactory.java b/debezium-core/src/main/java/io/debezium/jdbc/ConnectionFactory.java index 6b57dea61..7ffd1d4b3 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/ConnectionFactory.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/ConnectionFactory.java @@ -11,7 +11,7 @@ * @param */ @FunctionalInterface -public interface ConnectionFactory { +public interface ConnectionFactory { T newConnection(); } diff --git a/debezium-core/src/main/java/io/debezium/jdbc/DefaultMainConnectionProvidingConnectionFactory.java b/debezium-core/src/main/java/io/debezium/jdbc/DefaultMainConnectionProvidingConnectionFactory.java index 96834e5de..849bb7f07 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/DefaultMainConnectionProvidingConnectionFactory.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/DefaultMainConnectionProvidingConnectionFactory.java @@ -5,7 +5,7 @@ */ package io.debezium.jdbc; -public class DefaultMainConnectionProvidingConnectionFactory implements MainConnectionProvidingConnectionFactory { +public class DefaultMainConnectionProvidingConnectionFactory implements MainConnectionProvidingConnectionFactory { private ConnectionFactory delegate; diff --git a/debezium-core/src/main/java/io/debezium/jdbc/MainConnectionProvidingConnectionFactory.java b/debezium-core/src/main/java/io/debezium/jdbc/MainConnectionProvidingConnectionFactory.java index ff4b389a7..b80ead53a 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/MainConnectionProvidingConnectionFactory.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/MainConnectionProvidingConnectionFactory.java @@ -12,7 +12,7 @@ * * @param */ -public interface MainConnectionProvidingConnectionFactory extends ConnectionFactory { +public interface MainConnectionProvidingConnectionFactory extends ConnectionFactory { T mainConnection(); } diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java index 7a87e14a5..0347d3786 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java @@ -81,7 +81,8 @@ public abstract class RelationalSnapshotChangeEventSource

snapshotProgressListener; - public RelationalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig connectorConfig, MainConnectionProvidingConnectionFactory jdbcConnectionFactory, + public RelationalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig connectorConfig, + MainConnectionProvidingConnectionFactory jdbcConnectionFactory, RelationalDatabaseSchema schema, EventDispatcher dispatcher, Clock clock, SnapshotProgressListener

snapshotProgressListener) { super(connectorConfig, snapshotProgressListener);