DBZ-1010 Handle error deserialization also in deserializers
This commit is contained in:
parent
0e91c662cb
commit
2c3d2ee083
@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
import org.apache.kafka.connect.errors.ConnectException;
|
import org.apache.kafka.connect.errors.ConnectException;
|
||||||
import org.apache.kafka.connect.source.SourceRecord;
|
import org.apache.kafka.connect.source.SourceRecord;
|
||||||
|
import org.slf4j.event.Level;
|
||||||
|
|
||||||
import com.github.shyiko.mysql.binlog.BinaryLogClient;
|
import com.github.shyiko.mysql.binlog.BinaryLogClient;
|
||||||
import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener;
|
import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener;
|
||||||
@ -964,17 +965,39 @@ public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
|
public void onEventDeserializationFailure(BinaryLogClient client, Exception ex) {
|
||||||
logger.debug("A deserialization failure event arrived", ex);
|
if(eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
|
||||||
logReaderState();
|
logger.debug("A deserialization failure event arrived", ex);
|
||||||
BinlogReader.this.failed(ex);
|
logReaderState();
|
||||||
|
BinlogReader.this.failed(ex);
|
||||||
|
}
|
||||||
|
else if(eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.WARN) {
|
||||||
|
logger.warn("A deserialization failure event arrived", ex);
|
||||||
|
logReaderState(Level.WARN);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
logger.debug("A deserialization failure event arrived", ex);
|
||||||
|
logReaderState(Level.DEBUG);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void logReaderState() {
|
private void logReaderState() {
|
||||||
logger.error("Error during binlog processing. Last offset stored = {}, binlog reader near position = {}",
|
logReaderState(Level.ERROR);
|
||||||
lastOffset,
|
}
|
||||||
client == null ? "N/A" : client.getBinlogFilename() + "/" + client.getBinlogPosition()
|
|
||||||
);
|
private void logReaderState(Level severity) {
|
||||||
|
final Object position = client == null ? "N/A" : client.getBinlogFilename() + "/" + client.getBinlogPosition();
|
||||||
|
final String message = "Error during binlog processing. Last offset stored = {}, binlog reader near position = {}";
|
||||||
|
switch (severity) {
|
||||||
|
case WARN:
|
||||||
|
logger.warn(message, lastOffset, position);
|
||||||
|
break;
|
||||||
|
case DEBUG:
|
||||||
|
logger.debug(message, lastOffset, position);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
logger.error(message, lastOffset, position);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected BinlogReaderMetrics getMetrics() {
|
protected BinlogReaderMetrics getMetrics() {
|
||||||
|
Loading…
Reference in New Issue
Block a user