diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
index 7f9cb9642..0ff84b0cc 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
@@ -150,10 +150,10 @@ public void start(Map<String, String> props) {
                                                 config.getString(MySqlConnectorConfig.TABLE_WHITELIST),
                                                 config.getString(MySqlConnectorConfig.TABLE_BLACKLIST));
         if (config.getBoolean(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN)) {
-            Predicate<TableId> ignoreBuiltins = (id) -> {
-                return !BUILT_IN_TABLE_NAMES.contains(id.table().toLowerCase()) && !BUILT_IN_DB_NAMES.contains(id.catalog().toLowerCase());
+            Predicate<TableId> isBuiltin = (id) -> {
+                return BUILT_IN_DB_NAMES.contains(id.catalog().toLowerCase()) || BUILT_IN_TABLE_NAMES.contains(id.table().toLowerCase());
             };
-            tableFilter = ignoreBuiltins.or(tableFilter);
+            tableFilter = tableFilter.and(isBuiltin.negate());
         }
 
         // Create the queue ...
@@ -163,6 +163,7 @@ public void start(Map<String, String> props) {
         // Set up our handlers for specific kinds of events ...
         tables = new Tables();
         tableConverters = new TableConverters(topicSelector, dbHistory, includeSchemaChanges, tables, tableFilter);
+        eventHandlers.put(EventType.ROTATE, tableConverters::rotateLogs);
         eventHandlers.put(EventType.TABLE_MAP, tableConverters::updateTableMetadata);
         eventHandlers.put(EventType.QUERY, tableConverters::updateTableCommand);
         eventHandlers.put(EventType.EXT_WRITE_ROWS, tableConverters::handleInsert);
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java
index b3a0b622e..162cada18 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/TableConverters.java
@@ -26,6 +26,7 @@
 import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
 import com.github.shyiko.mysql.binlog.event.Event;
 import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import com.github.shyiko.mysql.binlog.event.RotateEventData;
 import com.github.shyiko.mysql.binlog.event.TableMapEventData;
 import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
 import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
@@ -86,6 +87,17 @@ public void loadTables() {
         });
     }
 
+    public void rotateLogs(Event event, SourceInfo source, Consumer<SourceRecord> recorder) {
+        logger.debug("Rotating logs: {}", event);
+        RotateEventData command = event.getData();
+        if (command != null) {
+            // The logs are being rotated, which means the server was either restarted, or the binlog has transitioned to a new
+            // file. In either case, the table numbers will change, so we need to discard the cache of converters by the table IDs
+            // (e.g., the Map<Long, Converter>). Note, however, that we're NOT clearing out the Map.
+            convertersByTableId.clear();
+        }
+    }
+
     public void updateTableCommand(Event event, SourceInfo source, Consumer<SourceRecord> recorder) {
         QueryEventData command = event.getData();
         String databaseName = command.getDatabase();
@@ -142,8 +154,8 @@ public void updateTableCommand(Event event, SourceInfo source, Consumer<SourceRecord> recorder) {
         TableMapEventData metadata = event.getData();
         long tableNumber = metadata.getTableId();
+        logger.debug("Received update table metadata event: {}", event);
         if (!convertersByTableId.containsKey(tableNumber)) {
-            logger.debug("Received update table metadata event: {}", event);
             // We haven't seen this table ID, so we need to rebuild our converter functions ...
             String serverName = source.serverName();
             String databaseName = metadata.getDatabase();
@@ -153,6 +165,7 @@ public void updateTableMetadata(Event event, SourceInfo source, Consumer<SourceRecord> recorder) {
         long tableNumber = write.getTableId();
         BitSet includedColumns = write.getIncludedColumns();
         Converter converter = convertersByTableId.get(tableNumber);
-        if (tableFilter.test(converter.tableId())) {
-            logger.debug("Received insert row event: {}", event);
-            String topic = converter.topic();
-            Integer partition = converter.partition();
-            List<Serializable[]> rows = write.getRows();
-            for (int row = 0; row != rows.size(); ++row) {
-                Serializable[] values = rows.get(row);
-                Schema keySchema = converter.keySchema();
-                Object key = converter.createKey(values, includedColumns);
-                Schema valueSchema = converter.valueSchema();
-                Struct value = converter.inserted(values, includedColumns);
-                if (value != null || key != null) {
-                    SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
-                                                           keySchema, key, valueSchema, value);
-                    recorder.accept(record);
+        if (converter != null) {
+            TableId tableId = converter.tableId();
+            if (tableFilter.test(tableId)) {
+                logger.debug("Processing insert row event for {}: {}", tableId, event);
+                String topic = converter.topic();
+                Integer partition = converter.partition();
+                List<Serializable[]> rows = write.getRows();
+                for (int row = 0; row != rows.size(); ++row) {
+                    Serializable[] values = rows.get(row);
+                    Schema keySchema = converter.keySchema();
+                    Object key = converter.createKey(values, includedColumns);
+                    Schema valueSchema = converter.valueSchema();
+                    Struct value = converter.inserted(values, includedColumns);
+                    if (value != null || key != null) {
+                        SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
+                                                               keySchema, key, valueSchema, value);
+                        recorder.accept(record);
+                    }
                 }
+            } else if (logger.isDebugEnabled()) {
+                logger.debug("Skipping insert row event: {}", event);
             }
-        } else if (logger.isDebugEnabled()) {
-            logger.debug("Skipping insert row event: {}", event);
+        } else {
+            logger.warn("Unable to find converter for table #{} in {}", tableNumber, convertersByTableId);
         }
     }
 
@@ -261,27 +281,32 @@ public void handleUpdate(Event event, SourceInfo source, Consumer<SourceRecord> recorder) {
         BitSet includedColumns = update.getIncludedColumns();
         BitSet includedColumnsBefore = update.getIncludedColumnsBeforeUpdate();
         Converter converter = convertersByTableId.get(tableNumber);
-        if (tableFilter.test(converter.tableId())) {
-            logger.debug("Received update row event: {}", event);
-            String topic = converter.topic();
-            Integer partition = converter.partition();
-            List<Map.Entry<Serializable[], Serializable[]>> rows = update.getRows();
-            for (int row = 0; row != rows.size(); ++row) {
-                Map.Entry<Serializable[], Serializable[]> changes = rows.get(row);
-                Serializable[] before = changes.getKey();
-                Serializable[] after = changes.getValue();
-                Schema keySchema = converter.keySchema();
-                Object key = converter.createKey(after, includedColumns);
-                Schema valueSchema = converter.valueSchema();
-                Struct value = converter.updated(before, includedColumnsBefore, after, includedColumns);
-                if (value != null || key != null) {
-                    SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
-                                                           keySchema, key, valueSchema, value);
-                    recorder.accept(record);
+        if (converter != null) {
+            TableId tableId = converter.tableId();
+            if (tableFilter.test(tableId)) {
+                logger.debug("Processing update row event for {}: {}", tableId, event);
+                String topic = converter.topic();
+                Integer partition = converter.partition();
+                List<Map.Entry<Serializable[], Serializable[]>> rows = update.getRows();
+                for (int row = 0; row != rows.size(); ++row) {
+                    Map.Entry<Serializable[], Serializable[]> changes = rows.get(row);
+                    Serializable[] before = changes.getKey();
+                    Serializable[] after = changes.getValue();
+                    Schema keySchema = converter.keySchema();
+                    Object key = converter.createKey(after, includedColumns);
+                    Schema valueSchema = converter.valueSchema();
+                    Struct value = converter.updated(before, includedColumnsBefore, after, includedColumns);
+                    if (value != null || key != null) {
+                        SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
+                                                               keySchema, key, valueSchema, value);
+                        recorder.accept(record);
+                    }
                 }
+            } else if (logger.isDebugEnabled()) {
+                logger.debug("Skipping update row event: {}", event);
             }
-        } else if (logger.isDebugEnabled()) {
-            logger.debug("Skipping update row event: {}", event);
+        } else {
+            logger.warn("Unable to find converter for table #{} in {}", tableNumber, convertersByTableId);
         }
     }
 
@@ -290,25 +315,30 @@ public void handleDelete(Event event, SourceInfo source, Consumer<SourceRecord> recorder) {
         long tableNumber = deleted.getTableId();
         BitSet includedColumns = deleted.getIncludedColumns();
         Converter converter = convertersByTableId.get(tableNumber);
-        if (tableFilter.test(converter.tableId())) {
-            logger.debug("Received delete row event: {}", event);
-            String topic = converter.topic();
-            Integer partition = converter.partition();
-            List<Serializable[]> rows = deleted.getRows();
-            for (int row = 0; row != rows.size(); ++row) {
-                Serializable[] values = rows.get(row);
-                Schema keySchema = converter.keySchema();
-                Object key = converter.createKey(values, includedColumns);
-                Schema valueSchema = converter.valueSchema();
-                Struct value = converter.deleted(values, includedColumns);
-                if (value != null || key != null) {
-                    SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
-                                                           keySchema, key, valueSchema, value);
-                    recorder.accept(record);
+        if (converter != null) {
+            TableId tableId = converter.tableId();
+            if (tableFilter.test(tableId)) {
+                logger.debug("Processing delete row event for {}: {}", tableId, event);
+                String topic = converter.topic();
+                Integer partition = converter.partition();
+                List<Serializable[]> rows = deleted.getRows();
+                for (int row = 0; row != rows.size(); ++row) {
+                    Serializable[] values = rows.get(row);
+                    Schema keySchema = converter.keySchema();
+                    Object key = converter.createKey(values, includedColumns);
+                    Schema valueSchema = converter.valueSchema();
+                    Struct value = converter.deleted(values, includedColumns);
+                    if (value != null || key != null) {
+                        SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
+                                                               keySchema, key, valueSchema, value);
+                        recorder.accept(record);
+                    }
                 }
+            } else if (logger.isDebugEnabled()) {
+                logger.debug("Skipping delete row event: {}", event);
             }
-        } else if (logger.isDebugEnabled()) {
-            logger.debug("Skipping delete row event: {}", event);
+        } else {
+            logger.warn("Unable to find converter for table #{} in {}", tableNumber, convertersByTableId);
         }
     }
 
@@ -327,7 +357,7 @@ protected static interface Converter {
 
         Struct inserted(Serializable[] row, BitSet includedColumns);
 
-        Struct updated(Serializable[] after, BitSet includedColumns, Serializable[] before, BitSet includedColumnsBeforeUpdate);
+        Struct updated(Serializable[] before, BitSet includedColumns, Serializable[] after, BitSet includedColumnsBeforeUpdate);
 
         Struct deleted(Serializable[] deleted, BitSet includedColumns);
     }
diff --git a/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java
index 31c844a5c..7968a21e0 100644
--- a/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java
+++ b/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java
@@ -70,6 +70,9 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
     public static Collection<Field> ALL_FIELDS = Collect.arrayListOf(TOPIC, BOOTSTRAP_SERVERS,
                                                                      RECOVERY_POLL_INTERVAL_MS, RECOVERY_POLL_ATTEMPTS);
 
+    private static final String CONSUMER_PREFIX = CONFIGURATION_FIELD_PREFIX_STRING + "consumer.";
+    private static final String PRODUCER_PREFIX = CONFIGURATION_FIELD_PREFIX_STRING + "producer.";
+
     private final DocumentReader reader = DocumentReader.defaultReader();
     private final Integer partition = new Integer(0);
     private String topicName;
@@ -92,7 +95,7 @@ public void configure(Configuration config) {
         String bootstrapServers = config.getString(BOOTSTRAP_SERVERS);
         // Copy the relevant portions of the configuration and add useful defaults ...
         String clientAndGroupId = UUID.randomUUID().toString();
-        this.consumerConfig = config.subset("consumer.", true).edit()
+        this.consumerConfig = config.subset(CONSUMER_PREFIX, true).edit()
                                     .withDefault(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
                                     .withDefault(ConsumerConfig.CLIENT_ID_CONFIG, clientAndGroupId)
                                     .withDefault(ConsumerConfig.GROUP_ID_CONFIG, clientAndGroupId)
@@ -104,7 +107,7 @@ public void configure(Configuration config) {
                                     .withDefault(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                                     .withDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                                     .build();
-        this.producerConfig = config.subset("producer.", true).edit()
+        this.producerConfig = config.subset(PRODUCER_PREFIX, true).edit()
                                     .withDefault(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
                                     .withDefault(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString())
                                     .withDefault(ProducerConfig.ACKS_CONFIG, 1)
@@ -116,6 +119,8 @@ public void configure(Configuration config) {
                                     .withDefault(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                                     .withDefault(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                                     .build();
+        logger.info("KafkaDatabaseHistory Consumer config: " + consumerConfig);
+        logger.info("KafkaDatabaseHistory Producer config: " + producerConfig);
     }
 
     @Override