DBZ-3615: Keep database connection closed during schema recovery
This commit is contained in:
parent
331648dce0
commit
b36e8247a3
@ -79,12 +79,6 @@ public ChangeEventSourceCoordinator<MySqlOffsetContext> start(Configuration conf
|
|||||||
connection = new MySqlConnection(new MySqlConnectionConfiguration(config),
|
connection = new MySqlConnection(new MySqlConnectionConfiguration(config),
|
||||||
connectorConfig.useCursorFetch() ? new MysqlBinaryProtocolFieldReader()
|
connectorConfig.useCursorFetch() ? new MysqlBinaryProtocolFieldReader()
|
||||||
: new MysqlTextProtocolFieldReader());
|
: new MysqlTextProtocolFieldReader());
|
||||||
try {
|
|
||||||
connection.setAutoCommit(false);
|
|
||||||
}
|
|
||||||
catch (SQLException e) {
|
|
||||||
throw new DebeziumException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
validateBinlogConfiguration(connectorConfig);
|
validateBinlogConfiguration(connectorConfig);
|
||||||
|
|
||||||
@ -97,7 +91,26 @@ public ChangeEventSourceCoordinator<MySqlOffsetContext> start(Configuration conf
|
|||||||
|
|
||||||
this.schema = new MySqlDatabaseSchema(connectorConfig, valueConverters, topicSelector, schemaNameAdjuster, tableIdCaseInsensitive);
|
this.schema = new MySqlDatabaseSchema(connectorConfig, valueConverters, topicSelector, schemaNameAdjuster, tableIdCaseInsensitive);
|
||||||
|
|
||||||
|
LOGGER.info("Closing connection before starting schema recovery");
|
||||||
|
|
||||||
|
try {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
catch (SQLException e) {
|
||||||
|
throw new DebeziumException(e);
|
||||||
|
}
|
||||||
|
|
||||||
validateAndLoadDatabaseHistory(connectorConfig, previousOffset, schema);
|
validateAndLoadDatabaseHistory(connectorConfig, previousOffset, schema);
|
||||||
|
|
||||||
|
LOGGER.info("Reconnecting after finishing schema recovery");
|
||||||
|
|
||||||
|
try {
|
||||||
|
connection.setAutoCommit(false);
|
||||||
|
}
|
||||||
|
catch (SQLException e) {
|
||||||
|
throw new DebeziumException(e);
|
||||||
|
}
|
||||||
|
|
||||||
// If the binlog position is not available it is necessary to reexecute snapshot
|
// If the binlog position is not available it is necessary to reexecute snapshot
|
||||||
if (validateSnapshotFeasibility(connectorConfig, previousOffset)) {
|
if (validateSnapshotFeasibility(connectorConfig, previousOffset)) {
|
||||||
previousOffset = null;
|
previousOffset = null;
|
||||||
|
Loading…
Reference in New Issue
Block a user