DBZ-7275 Close connection registerd with bean registry

New connection created for bean registry is never closed. Close it
during when stopping the task.
This commit is contained in:
Vojtech Juranek 2023-12-19 00:31:00 +01:00 committed by Jiri Pechanec
parent 67260f8286
commit 37ebb8ecae
3 changed files with 36 additions and 3 deletions

View File

@ -61,6 +61,7 @@ public class MySqlConnectorTask extends BaseSourceTask<MySqlPartition, MySqlOffs
private volatile MySqlTaskContext taskContext; private volatile MySqlTaskContext taskContext;
private volatile ChangeEventQueue<DataChangeEvent> queue; private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile AbstractConnectorConnection connection; private volatile AbstractConnectorConnection connection;
private volatile AbstractConnectorConnection beanRegistryJdbcConnection;
private volatile ErrorHandler errorHandler; private volatile ErrorHandler errorHandler;
private volatile MySqlDatabaseSchema schema; private volatile MySqlDatabaseSchema schema;
@ -126,10 +127,11 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
} }
// Manual Bean Registration // Manual Bean Registration
beanRegistryJdbcConnection = connectionFactory.newConnection();
connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, config); connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, config);
connectorConfig.getBeanRegistry().add(StandardBeanNames.CONNECTOR_CONFIG, connectorConfig); connectorConfig.getBeanRegistry().add(StandardBeanNames.CONNECTOR_CONFIG, connectorConfig);
connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, schema); connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, schema);
connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, connectionFactory.newConnection()); connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, beanRegistryJdbcConnection);
connectorConfig.getBeanRegistry().add(StandardBeanNames.VALUE_CONVERTER, valueConverters); connectorConfig.getBeanRegistry().add(StandardBeanNames.VALUE_CONVERTER, valueConverters);
// Service providers // Service providers
@ -267,6 +269,15 @@ protected void doStop() {
LOGGER.error("Exception while closing JDBC connection", e); LOGGER.error("Exception while closing JDBC connection", e);
} }
try {
if (beanRegistryJdbcConnection != null) {
beanRegistryJdbcConnection.close();
}
}
catch (SQLException e) {
LOGGER.error("Exception while closing JDBC bean registry connection", e);
}
if (schema != null) { if (schema != null) {
schema.close(); schema.close();
} }

View File

@ -48,6 +48,7 @@ public class OracleConnectorTask extends BaseSourceTask<OraclePartition, OracleO
private volatile OracleTaskContext taskContext; private volatile OracleTaskContext taskContext;
private volatile ChangeEventQueue<DataChangeEvent> queue; private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile OracleConnection jdbcConnection; private volatile OracleConnection jdbcConnection;
private volatile OracleConnection beanRegistryJdbcConnection;
private volatile ErrorHandler errorHandler; private volatile ErrorHandler errorHandler;
private volatile OracleDatabaseSchema schema; private volatile OracleDatabaseSchema schema;
@ -76,10 +77,11 @@ public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(
topicNamingStrategy, tableNameCaseSensitivity); topicNamingStrategy, tableNameCaseSensitivity);
// Manual Bean Registration // Manual Bean Registration
beanRegistryJdbcConnection = connectionFactory.newConnection();
connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, config); connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, config);
connectorConfig.getBeanRegistry().add(StandardBeanNames.CONNECTOR_CONFIG, connectorConfig); connectorConfig.getBeanRegistry().add(StandardBeanNames.CONNECTOR_CONFIG, connectorConfig);
connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, schema); connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, schema);
connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, connectionFactory.newConnection()); connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, beanRegistryJdbcConnection);
connectorConfig.getBeanRegistry().add(StandardBeanNames.VALUE_CONVERTER, valueConverters); connectorConfig.getBeanRegistry().add(StandardBeanNames.VALUE_CONVERTER, valueConverters);
// Service providers // Service providers
@ -188,6 +190,15 @@ public void doStop() {
LOGGER.error("Exception while closing JDBC connection", e); LOGGER.error("Exception while closing JDBC connection", e);
} }
try {
if (beanRegistryJdbcConnection != null) {
beanRegistryJdbcConnection.close();
}
}
catch (SQLException e) {
LOGGER.error("Exception while closing JDBC bean registry connection", e);
}
if (schema != null) { if (schema != null) {
schema.close(); schema.close();
} }

View File

@ -64,6 +64,7 @@ public class PostgresConnectorTask extends BaseSourceTask<PostgresPartition, Pos
private volatile PostgresTaskContext taskContext; private volatile PostgresTaskContext taskContext;
private volatile ChangeEventQueue<DataChangeEvent> queue; private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile PostgresConnection jdbcConnection; private volatile PostgresConnection jdbcConnection;
private volatile PostgresConnection beanRegistryJdbcConnection;
private volatile ReplicationConnection replicationConnection = null; private volatile ReplicationConnection replicationConnection = null;
private volatile ErrorHandler errorHandler; private volatile ErrorHandler errorHandler;
@ -114,10 +115,11 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
final PostgresOffsetContext previousOffset = previousOffsets.getTheOnlyOffset(); final PostgresOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();
// Manual Bean Registration // Manual Bean Registration
beanRegistryJdbcConnection = connectionFactory.newConnection();
connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, config); connectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, config);
connectorConfig.getBeanRegistry().add(StandardBeanNames.CONNECTOR_CONFIG, connectorConfig); connectorConfig.getBeanRegistry().add(StandardBeanNames.CONNECTOR_CONFIG, connectorConfig);
connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, schema); connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, schema);
connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, connectionFactory.newConnection()); connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, beanRegistryJdbcConnection);
connectorConfig.getBeanRegistry().add(StandardBeanNames.VALUE_CONVERTER, valueConverter); connectorConfig.getBeanRegistry().add(StandardBeanNames.VALUE_CONVERTER, valueConverter);
// Service providers // Service providers
@ -318,6 +320,15 @@ protected void doStop() {
LOGGER.trace("Error while closing replication connection", e); LOGGER.trace("Error while closing replication connection", e);
} }
try {
if (beanRegistryJdbcConnection != null) {
beanRegistryJdbcConnection.close();
}
}
catch (Exception e) {
LOGGER.trace("Error while closing JDBC bean registry connection", e);
}
if (jdbcConnection != null) { if (jdbcConnection != null) {
jdbcConnection.close(); jdbcConnection.close();
} }