diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java index 2f8db798f..128e77d58 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java @@ -61,6 +61,7 @@ public class MySqlConnectorTask extends BaseSourceTask queue; private volatile AbstractConnectorConnection connection; + private volatile AbstractConnectorConnection beanRegistryJdbcConnection; private volatile ErrorHandler errorHandler; private volatile MySqlDatabaseSchema schema; @@ -126,10 +127,11 @@ public ChangeEventSourceCoordinator start(Co } // Manual Bean Registration + beanRegistryJdbcConnection = connectionFactory.newConnection(); connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, config); connectorConfig.getBeanRegistry().add(StandardBeanNames.CONNECTOR_CONFIG, connectorConfig); connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, schema); - connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, connectionFactory.newConnection()); + connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, beanRegistryJdbcConnection); connectorConfig.getBeanRegistry().add(StandardBeanNames.VALUE_CONVERTER, valueConverters); // Service providers @@ -267,6 +269,15 @@ protected void doStop() { LOGGER.error("Exception while closing JDBC connection", e); } + try { + if (beanRegistryJdbcConnection != null) { + beanRegistryJdbcConnection.close(); + } + } + catch (SQLException e) { + LOGGER.error("Exception while closing JDBC bean registry connection", e); + } + if (schema != null) { schema.close(); } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java index b09aedfd7..4637e471a 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleConnectorTask.java @@ -48,6 +48,7 @@ public class OracleConnectorTask extends BaseSourceTask queue; private volatile OracleConnection jdbcConnection; + private volatile OracleConnection beanRegistryJdbcConnection; private volatile ErrorHandler errorHandler; private volatile OracleDatabaseSchema schema; @@ -76,10 +77,11 @@ public ChangeEventSourceCoordinator start( topicNamingStrategy, tableNameCaseSensitivity); // Manual Bean Registration + beanRegistryJdbcConnection = connectionFactory.newConnection(); connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, config); connectorConfig.getBeanRegistry().add(StandardBeanNames.CONNECTOR_CONFIG, connectorConfig); connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, schema); - connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, connectionFactory.newConnection()); + connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, beanRegistryJdbcConnection); connectorConfig.getBeanRegistry().add(StandardBeanNames.VALUE_CONVERTER, valueConverters); // Service providers @@ -188,6 +190,15 @@ public void doStop() { LOGGER.error("Exception while closing JDBC connection", e); } + try { + if (beanRegistryJdbcConnection != null) { + beanRegistryJdbcConnection.close(); + } + } + catch (SQLException e) { + LOGGER.error("Exception while closing JDBC bean registry connection", e); + } + if (schema != null) { schema.close(); } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index 07aa3a683..cab980e65 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -64,6 +64,7 @@ public class PostgresConnectorTask extends BaseSourceTask queue; private volatile PostgresConnection jdbcConnection; + private volatile PostgresConnection beanRegistryJdbcConnection; private volatile ReplicationConnection replicationConnection = null; private volatile ErrorHandler errorHandler; @@ -114,10 +115,11 @@ public ChangeEventSourceCoordinator st final PostgresOffsetContext previousOffset = previousOffsets.getTheOnlyOffset(); // Manual Bean Registration + beanRegistryJdbcConnection = connectionFactory.newConnection(); connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, config); connectorConfig.getBeanRegistry().add(StandardBeanNames.CONNECTOR_CONFIG, connectorConfig); connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, schema); - connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, connectionFactory.newConnection()); + connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, beanRegistryJdbcConnection); connectorConfig.getBeanRegistry().add(StandardBeanNames.VALUE_CONVERTER, valueConverter); // Service providers @@ -318,6 +320,15 @@ protected void doStop() { LOGGER.trace("Error while closing replication connection", e); } + try { + if (beanRegistryJdbcConnection != null) { + beanRegistryJdbcConnection.close(); + } + } + catch (Exception e) { + LOGGER.trace("Error while closing JDBC bean registry connection", e); + } + if (jdbcConnection != null) { jdbcConnection.close(); }