DBZ-217 Handling exceptions ocurring during binlog event deserialization;

* Catching the exception and wrapping it into a pseudo INCIDENT event
* Logging the exception and the current SourceInfo state
This commit is contained in:
Gunnar Morling 2017-05-31 16:25:31 +02:00 committed by Jiri Pechanec
parent 55a252521f
commit f705ae4ac6
3 changed files with 116 additions and 9 deletions

View File

@ -35,6 +35,7 @@
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
@ -42,6 +43,7 @@
import com.github.shyiko.mysql.binlog.network.SSLMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode.EventDeserializationFailureHandlingMode;
import io.debezium.connector.mysql.RecordMakers.RecordsForTable;
import io.debezium.function.BlockingConsumer;
import io.debezium.relational.TableId;
@ -112,14 +114,33 @@ public BinlogReader(String name, MySqlTaskContext context) {
EventDeserializer eventDeserializer = new EventDeserializer() {
@Override
public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
// Delegate to the superclass ...
Event event = super.nextEvent(inputStream);
// We have to record the most recent TableMapEventData for each table number for our custom deserializers ...
if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
TableMapEventData tableMapEvent = event.getData();
tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
try {
// Delegate to the superclass ...
Event event = super.nextEvent(inputStream);
// We have to record the most recent TableMapEventData for each table number for our custom deserializers ...
if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
TableMapEventData tableMapEvent = event.getData();
tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
}
return event;
}
// DBZ-217 In case an event couldn't be read we create a pseudo-event for the sake of logging
catch(EventDataDeserializationException edde) {
EventHeaderV4 header = new EventHeaderV4();
header.setEventType(EventType.INCIDENT);
header.setTimestamp(edde.getEventHeader().getTimestamp());
header.setServerId(edde.getEventHeader().getServerId());
if(edde.getEventHeader() instanceof EventHeaderV4) {
header.setEventLength(((EventHeaderV4)edde.getEventHeader()).getEventLength());
header.setNextPosition(((EventHeaderV4)edde.getEventHeader()).getNextPosition());
header.setFlags(((EventHeaderV4)edde.getEventHeader()).getFlags());
}
EventData data = new EventDataDeserializationExceptionData(edde);
return new Event(header, data);
}
return event;
}
};
// Add our custom deserializers ...
@ -383,7 +404,43 @@ protected void handleServerHeartbeat(Event event) {
* @param event the server stopped event to be processed; may not be null
*/
protected void handleServerIncident(Event event) {
logger.trace("Server incident: {}", event);
if (event.getData() instanceof EventDataDeserializationExceptionData) {
EventDataDeserializationExceptionData data = event.getData();
// TODO make it an option
EventDeserializationFailureHandlingMode deserializationFailureHandling = EventDeserializationFailureHandlingMode.FAIL;
EventHeaderV4 eventHeader = (EventHeaderV4) data.getCause().getEventHeader(); // safe cast, instantiated that ourselves
if(deserializationFailureHandling == EventDeserializationFailureHandlingMode.FAIL) {
// logging some additional context but not the exception itself, this will happen in handleEvent()
logger.error(
"Error while deserializing binlog event at offset {}." + System.lineSeparator() +
"Use the mysqlbinlog tool to view the problematic event: `mysqlbinlog --start-position={} --stop-position={} --verbose {}",
source.offset(),
eventHeader.getPosition(),
eventHeader.getNextPosition(),
source.binlogFilename()
);
throw new RuntimeException(data.getCause());
}
else if(deserializationFailureHandling == EventDeserializationFailureHandlingMode.WARN) {
logger.warn(
"Error while deserializing binlog event at offset {}." + System.lineSeparator() +
"This exception will be ignored and the event be skipped." + System.lineSeparator() +
"Use the mysqlbinlog tool to view the problematic event: `mysqlbinlog --start-position={} --stop-position={} --verbose {}",
source.offset(),
eventHeader.getPosition(),
eventHeader.getNextPosition(),
source.binlogFilename(),
data.getCause()
);
}
}
else {
logger.error("Server incident: {}", event);
}
}
/**

View File

@ -0,0 +1,35 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
/**
* Event data for an event of type {@link EventType#INCIDENT} representing a failure to deserialize a binlog event.
*
* @author Gunnar Morling
*/
public class EventDataDeserializationExceptionData implements EventData {
private static final long serialVersionUID = 1L;
private final EventDataDeserializationException cause;
public EventDataDeserializationExceptionData(EventDataDeserializationException cause) {
this.cause = cause;
}
public EventDataDeserializationException getCause() {
return cause;
}
@Override
public String toString() {
return "EventDataDeserializationExceptionData [cause=" + cause + "]";
}
}

View File

@ -19,8 +19,8 @@
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.config.Field.ValidationOutput;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.KafkaDatabaseHistory;
@ -357,6 +357,21 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
if (mode == null && defaultValue != null) mode = parse(defaultValue);
return mode;
}
public static enum EventDeserializationFailureHandlingMode implements EnumeratedValue {
IGNORE,
WARN,
FAIL;
@Override
public String getValue() {
return null;
}
}
}
private static final String DATABASE_WHITELIST_NAME = "database.whitelist";