DBZ-109 Captured MySQL error code and SQLSTATE code in exceptions

The binlog reader and JDBC operations might throw exceptions with this information, so in these cases the connector now captures the error code and SQLSTATE code from the exception and includes them in the message.
This commit is contained in:
Randall Hauch 2016-08-25 07:58:27 -05:00
parent 551df8f629
commit 93d0fae02b
2 changed files with 28 additions and 3 deletions

View File

@ -20,6 +20,8 @@ This release includes all of the fixes from the link:release-0-2-4[0.2.4] releas
* Corrected how the MySQL connector parses some DDL statements. [DBZ-106](https://issues.jboss.org/projects/DBZ/issues/DBZ-106)
* Corrected a potential error in the MySQL connector when performing a snapshot on a MySQL server using GTIDs where the GTID set string includes new line characters. [DBZ-107](https://issues.jboss.org/projects/DBZ/issues/DBZ-107)
* Removed unused code and test case. [DBZ-108](https://issues.jboss.org/projects/DBZ/issues/DBZ-108)
* Ensure that the MySQL error code and SQLSTATE are included in exceptions reported by the connector. [DBZ-109](https://issues.jboss.org/projects/DBZ/issues/DBZ-109)
## 0.3.0

View File

@ -5,6 +5,7 @@
*/
package io.debezium.connector.mysql;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@ -18,6 +19,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.shyiko.mysql.binlog.network.ServerException;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
@ -105,7 +108,7 @@ protected void completeSuccessfully() {
* @param error the error that resulted in the failure; should not be {@code null}
*/
protected void failed(Throwable error) {
this.failure.set(new ConnectException(error));
this.failure.set(wrap(error));
}
/**
@ -114,8 +117,28 @@ protected void failed(Throwable error) {
* @param msg the error message; may not be null
*/
protected void failed(Throwable error, String msg) {
this.logger.error("Failed due to error: {}", msg, error);
this.failure.set(new ConnectException(msg,error));
ConnectException wrapped = wrap(error);
this.logger.error("Failed due to error: {}", msg, wrapped);
this.failure.set(wrapped);
}
/**
* Wraps the specified exception in a {@link ConnectException}, ensuring that all useful state is captured inside
* the new exception's message.
* @param error the exception; may not be null
* @return the wrapped Kafka Connect exception
*/
protected ConnectException wrap(Throwable error) {
assert error != null;
String msg = error.getMessage();
if (error instanceof ServerException) {
ServerException e = (ServerException) error;
msg = msg + " Error code: " + e.getErrorCode() + "; SQLSTATE: " + e.getSqlState() + ".";
} else if (error instanceof SQLException) {
SQLException e = (SQLException) error;
msg = e.getMessage() + " Error code: " + e.getErrorCode() + "; SQLSTATE: " + e.getSQLState() + ".";
}
return new ConnectException(msg, error);
}
/**