diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java index db6019867..624dd5129 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java @@ -79,12 +79,6 @@ public ChangeEventSourceCoordinator start(Configuration conf connection = new MySqlConnection(new MySqlConnectionConfiguration(config), connectorConfig.useCursorFetch() ? new MysqlBinaryProtocolFieldReader() : new MysqlTextProtocolFieldReader()); - try { - connection.setAutoCommit(false); - } - catch (SQLException e) { - throw new DebeziumException(e); - } validateBinlogConfiguration(connectorConfig); @@ -97,7 +91,26 @@ public ChangeEventSourceCoordinator start(Configuration conf this.schema = new MySqlDatabaseSchema(connectorConfig, valueConverters, topicSelector, schemaNameAdjuster, tableIdCaseInsensitive); + LOGGER.info("Closing connection before starting schema recovery"); + + try { + connection.close(); + } + catch (SQLException e) { + throw new DebeziumException(e); + } + validateAndLoadDatabaseHistory(connectorConfig, previousOffset, schema); + + LOGGER.info("Reconnecting after finishing schema recovery"); + + try { + connection.setAutoCommit(false); + } + catch (SQLException e) { + throw new DebeziumException(e); + } + // If the binlog position is not available it is necessary to reexecute snapshot if (validateSnapshotFeasibility(connectorConfig, previousOffset)) { previousOffset = null;