Merge pull request #96 from rhauch/dbz-109

DBZ-109 Captured MySQL error code and SQLSTATE code in exceptions
This commit is contained in:
Randall Hauch 2016-08-25 08:12:26 -05:00 committed by GitHub
commit efd430bbee
2 changed files with 28 additions and 3 deletions

View File

@ -20,6 +20,8 @@ This release includes all of the fixes from the link:release-0-2-4[0.2.4] releas
* Corrected how the MySQL connector parses some DDL statements. [DBZ-106](https://issues.jboss.org/projects/DBZ/issues/DBZ-106) * Corrected how the MySQL connector parses some DDL statements. [DBZ-106](https://issues.jboss.org/projects/DBZ/issues/DBZ-106)
* Corrected a potential error in the MySQL connector when performing a snapshot on a MySQL server using GTIDs where the GTID set string includes new line characters. [DBZ-107](https://issues.jboss.org/projects/DBZ/issues/DBZ-107) * Corrected a potential error in the MySQL connector when performing a snapshot on a MySQL server using GTIDs where the GTID set string includes new line characters. [DBZ-107](https://issues.jboss.org/projects/DBZ/issues/DBZ-107)
* Removed unused code and test case. [DBZ-108](https://issues.jboss.org/projects/DBZ/issues/DBZ-108)
* Ensure that the MySQL error code and SQLSTATE are included in exceptions reported by the connector. [DBZ-109](https://issues.jboss.org/projects/DBZ/issues/DBZ-109)
## 0.3.0 ## 0.3.0

View File

@ -5,6 +5,7 @@
*/ */
package io.debezium.connector.mysql; package io.debezium.connector.mysql;
import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
@ -18,6 +19,8 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.github.shyiko.mysql.binlog.network.ServerException;
import io.debezium.util.Clock; import io.debezium.util.Clock;
import io.debezium.util.Metronome; import io.debezium.util.Metronome;
@ -105,7 +108,7 @@ protected void completeSuccessfully() {
* @param error the error that resulted in the failure; should not be {@code null} * @param error the error that resulted in the failure; should not be {@code null}
*/ */
protected void failed(Throwable error) { protected void failed(Throwable error) {
this.failure.set(new ConnectException(error)); this.failure.set(wrap(error));
} }
/** /**
@ -114,8 +117,28 @@ protected void failed(Throwable error) {
* @param msg the error message; may not be null * @param msg the error message; may not be null
*/ */
protected void failed(Throwable error, String msg) { protected void failed(Throwable error, String msg) {
this.logger.error("Failed due to error: {}", msg, error); ConnectException wrapped = wrap(error);
this.failure.set(new ConnectException(msg,error)); this.logger.error("Failed due to error: {}", msg, wrapped);
this.failure.set(wrapped);
}
/**
* Wraps the specified exception in a {@link ConnectException}, ensuring that all useful state is captured inside
* the new exception's message.
* @param error the exception; may not be null
* @return the wrapped Kafka Connect exception
*/
protected ConnectException wrap(Throwable error) {
assert error != null;
String msg = error.getMessage();
if (error instanceof ServerException) {
ServerException e = (ServerException) error;
msg = msg + " Error code: " + e.getErrorCode() + "; SQLSTATE: " + e.getSqlState() + ".";
} else if (error instanceof SQLException) {
SQLException e = (SQLException) error;
msg = e.getMessage() + " Error code: " + e.getErrorCode() + "; SQLSTATE: " + e.getSQLState() + ".";
}
return new ConnectException(msg, error);
} }
/** /**