DBZ-588 Fail on inconsistent schema

This commit is contained in:
Jiri Pechanec 2018-02-01 14:30:38 +01:00 committed by Gunnar Morling
parent b3b379a3a3
commit 16f4726a31
5 changed files with 151 additions and 19 deletions

View File

@ -42,7 +42,7 @@
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import com.github.shyiko.mysql.binlog.network.SSLMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.EventDeserializationFailureHandlingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.EventProcessingFailureHandlingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.connector.mysql.RecordMakers.RecordsForTable;
import io.debezium.function.BlockingConsumer;
@ -71,7 +71,8 @@ public class BinlogReader extends AbstractReader {
private final BinlogReaderMetrics metrics;
private final Clock clock;
private final ElapsedTimeStrategy pollOutputDelay;
private final EventDeserializationFailureHandlingMode eventDeserializationFailureHandlingMode;
private final EventProcessingFailureHandlingMode eventDeserializationFailureHandlingMode;
private final EventProcessingFailureHandlingMode inconsistentSchemaHandlingMode;
private int startingRowNumber = 0;
private long recordCounter = 0L;
@ -147,6 +148,7 @@ public BinlogReader(String name, MySqlTaskContext context) {
recordSchemaChangesInSourceRecords = context.includeSchemaChangeRecords();
clock = context.clock();
eventDeserializationFailureHandlingMode = context.eventDeserializationFailureHandlingMode();
inconsistentSchemaHandlingMode = context.inconsistentSchemaHandlingMode();
// Use exponential delay to log the progress frequently at first, but the quickly tapering off to once an hour...
pollOutputDelay = ElapsedTimeStrategy.exponential(clock, INITIAL_POLL_PERIOD_IN_MILLIS, MAX_POLL_PERIOD_IN_MILLIS);
@ -485,7 +487,7 @@ protected void handleServerIncident(Event event) {
EventHeaderV4 eventHeader = (EventHeaderV4) data.getCause().getEventHeader(); // safe cast, instantiated that ourselves
// logging some additional context but not the exception itself, this will happen in handleEvent()
if(eventDeserializationFailureHandlingMode == EventDeserializationFailureHandlingMode.FAIL) {
if(eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
logger.error(
"Error while deserializing binlog event at offset {}.{}" +
"Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
@ -498,7 +500,7 @@ protected void handleServerIncident(Event event) {
throw new RuntimeException(data.getCause());
}
else if(eventDeserializationFailureHandlingMode == EventDeserializationFailureHandlingMode.WARN) {
else if(eventDeserializationFailureHandlingMode == EventProcessingFailureHandlingMode.WARN) {
logger.warn(
"Error while deserializing binlog event at offset {}.{}" +
"This exception will be ignored and the event be skipped.{}" +
@ -638,7 +640,48 @@ protected void handleUpdateTableMetadata(Event event) {
if (recordMakers.assign(tableNumber, tableId)) {
logger.debug("Received update table metadata event: {}", event);
} else {
logger.debug("Skipping update table metadata event: {}", event);
if (context.dbSchema().isTableMonitored(tableId)) {
EventHeaderV4 eventHeader = event.getHeader();
if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.FAIL) {
logger.error(
"Schema inconsistency detected while processing binlog event at offset {}.{}" +
"Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
source.offset(),
System.lineSeparator(),
eventHeader.getPosition(),
eventHeader.getNextPosition(),
source.binlogFilename()
);
throw new ConnectException("Inconsistency in internal schema detected");
} else if (inconsistentSchemaHandlingMode == EventProcessingFailureHandlingMode.WARN) {
logger.warn(
"Schema inconsistency detected while processing binlog event at offset {}.{}" +
"This exception will be ignored and the event be skipped.{}" +
"Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
source.offset(),
System.lineSeparator(),
System.lineSeparator(),
eventHeader.getPosition(),
eventHeader.getNextPosition(),
source.binlogFilename()
);
} else {
logger.debug(
"Schema inconsistency detected while processing binlog event at offset {}.{}" +
"This exception will be ignored and the event be skipped.{}" +
"Use the mysqlbinlog tool to view the problematic event: mysqlbinlog --start-position={} --stop-position={} --verbose {}",
source.offset(),
System.lineSeparator(),
System.lineSeparator(),
eventHeader.getPosition(),
eventHeader.getNextPosition(),
source.binlogFilename()
);
}
} else {
logger.debug("Skipping update table metadata event: {} for non-monitored table {}", event, tableId);
}
}
}

View File

@ -323,9 +323,9 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
}
/**
* The set of predefined modes for dealing with failures during binlog event serialization.
* The set of predefined modes for dealing with failures during binlog event processing.
*/
public static enum EventDeserializationFailureHandlingMode implements EnumeratedValue {
public static enum EventProcessingFailureHandlingMode implements EnumeratedValue {
/**
* Problematic events will be skipped.
@ -344,7 +344,7 @@ public static enum EventDeserializationFailureHandlingMode implements Enumerated
private final String value;
private EventDeserializationFailureHandlingMode(String value) {
private EventProcessingFailureHandlingMode(String value) {
this.value = value;
}
@ -359,14 +359,14 @@ public String getValue() {
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static EventDeserializationFailureHandlingMode parse(String value) {
public static EventProcessingFailureHandlingMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (EventDeserializationFailureHandlingMode option : EventDeserializationFailureHandlingMode.values()) {
for (EventProcessingFailureHandlingMode option : EventProcessingFailureHandlingMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) return option;
}
@ -765,7 +765,7 @@ public static EventDeserializationFailureHandlingMode parse(String value) {
public static final Field EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE = Field.create("event.deserialization.failure.handling.mode")
.withDisplayName("Event deserialization failure handling")
.withEnum(EventDeserializationFailureHandlingMode.class, EventDeserializationFailureHandlingMode.FAIL)
.withEnum(EventProcessingFailureHandlingMode.class, EventProcessingFailureHandlingMode.FAIL)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription("Specify how failures during deserialization of binlog events (i.e. when encountering a corrupted event) should be handled, including:"
@ -773,6 +773,16 @@ public static EventDeserializationFailureHandlingMode parse(String value) {
+ "'warn' the problematic event and its binlog position will be logged and the event will be skipped;"
+ "'ignore' the problematic event will be skipped.");
public static final Field INCONSISTENT_SCHEMA_HANDLING_MODE = Field.create("inconsistent.schema.handling.mode")
.withDisplayName("Inconsistent schema failure handling")
.withEnum(EventProcessingFailureHandlingMode.class, EventProcessingFailureHandlingMode.FAIL)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription("Specify how binlog events that belong to a table missing from internal schema representation (i.e. internal representation is not consistent with database) should be handled, including:"
+ "'fail' (the default) an exception indicating the problematic event and its binlog position is raised, causing the connector to be stopped; "
+ "'warn' the problematic event and its binlog position will be logged and the event will be skipped;"
+ "'ignore' the problematic event will be skipped.");
public static final Field SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE = Field.create("snapshot.select.statement.overrides")
.withDisplayName("List of tables where the default select statement used during snapshotting should be overridden.")
.withType(Type.STRING)
@ -831,7 +841,8 @@ public static final Field MASK_COLUMN(int length) {
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD,
SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD, JDBC_DRIVER,
BIGINT_UNSIGNED_HANDLING_MODE,
EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE);
EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE,
INCONSISTENT_SCHEMA_HANDLING_MODE);
/**
* The set of {@link Field}s that are included in the {@link #configDef() configuration definition}. This includes
@ -859,7 +870,7 @@ protected static ConfigDef configDef() {
Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST,
COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, GTID_SOURCE_FILTER_DML_EVENTS, BUFFER_SIZE_FOR_BINLOG_READER,
EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE);
EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE, INCONSISTENT_SCHEMA_HANDLING_MODE);
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS,
SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
BIGINT_UNSIGNED_HANDLING_MODE);

View File

@ -20,7 +20,7 @@
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.mysql.MySqlConnectorConfig.EventDeserializationFailureHandlingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.EventProcessingFailureHandlingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcConnection.ConnectionFactory;
@ -96,9 +96,14 @@ public boolean sslModeEnabled() {
return sslMode() != SecureConnectionMode.DISABLED;
}
public EventDeserializationFailureHandlingMode eventDeserializationFailureHandlingMode() {
public EventProcessingFailureHandlingMode eventDeserializationFailureHandlingMode() {
String mode = config.getString(MySqlConnectorConfig.EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE);
return EventDeserializationFailureHandlingMode.parse(mode);
return EventProcessingFailureHandlingMode.parse(mode);
}
public EventProcessingFailureHandlingMode inconsistentSchemaHandlingMode() {
String mode = config.getString(MySqlConnectorConfig.INCONSISTENT_SCHEMA_HANDLING_MODE);
return EventProcessingFailureHandlingMode.parse(mode);
}
public void start() {

View File

@ -189,7 +189,7 @@ public Tables tables() {
* or if the table has been excluded by the filters
*/
public Table tableFor(TableId id) {
return filters.tableFilter().test(id) ? tables.forTable(id) : null;
return isTableMonitored(id) ? tables.forTable(id) : null;
}
/**
@ -204,7 +204,17 @@ public Table tableFor(TableId id) {
* or if the table has been excluded by the filters
*/
public TableSchema schemaFor(TableId id) {
return filters.tableFilter().test(id) ? tableSchemaByTableId.get(id) : null;
return isTableMonitored(id) ? tableSchemaByTableId.get(id) : null;
}
/**
* Decide whether events should be captured for a given table
*
* @param id the fully-qualified table identifier; may be null
* @return true if events from the table are captured
*/
public boolean isTableMonitored(TableId id) {
return filters.tableFilter().test(id);
}
/**

View File

@ -10,27 +10,33 @@
import static org.junit.Assert.assertTrue;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.EventProcessingFailureHandlingMode;
import io.debezium.data.Envelope;
import io.debezium.data.KeyValueStore;
import io.debezium.data.KeyValueStore.Collection;
import io.debezium.data.SchemaChangeHistory;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.Testing;
@ -351,6 +357,63 @@ public void shouldHandleMySQLTimeCorrectly() throws Exception {
assertThat(c5Time).isEqualTo(Duration.ofHours(-838).minusMinutes(59).minusSeconds(58).minusNanos(999999000));
}
@Test(expected = ConnectException.class)
public void shouldFailOnSchemaInconsistency() throws Exception {
inconsistentSchema(null);
consumeAtLeast(2);
}
@Test
public void shouldWarnOnSchemaInconsistency() throws Exception {
inconsistentSchema(EventProcessingFailureHandlingMode.WARN);
int consumed = consumeAtLeast(2, 2, TimeUnit.SECONDS);
assertThat(consumed).isZero();
}
@Test
public void shouldIgnoreOnSchemaInconsistency() throws Exception {
inconsistentSchema(EventProcessingFailureHandlingMode.IGNORE);
int consumed = consumeAtLeast(2, 2, TimeUnit.SECONDS);
assertThat(consumed).isZero();
}
private void inconsistentSchema(EventProcessingFailureHandlingMode mode) throws InterruptedException, SQLException {
if (mode == null) {
config = simpleConfig().build();
} else {
config = simpleConfig()
.with(MySqlConnectorConfig.INCONSISTENT_SCHEMA_HANDLING_MODE, mode)
.build();
}
context = new MySqlTaskContext(config);
context.start();
context.source().setBinlogStartPoint("",0L); // start from beginning
context.initializeHistory();
reader = new BinlogReader("binlog", context);
// Start reading the binlog ...
reader.start();
// Poll for records ...
// Testing.Print.enable();
int expected = 9 + 9 + 4 + 5; // only the inserts for our 4 tables in this database
int consumed = consumeAtLeast(expected);
assertThat(consumed).isGreaterThanOrEqualTo(expected);
reader.stop();
reader.start();
reader.context.dbSchema().applyDdl(context.source(), DATABASE.getDatabaseName(), "DROP TABLE customers", null);
try (
final MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());
final JdbcConnection connection = db.connect();
final Connection jdbc = connection.connection();
final Statement statement = jdbc.createStatement()) {
statement.executeUpdate("INSERT INTO customers VALUES (default,'John','Lazy','john.lazy@acme.com')");
}
}
private Duration toDuration(String duration) {
return Duration.parse(duration);
}