DBZ-15 Cached converters are now dropped upon log rotation.

Randall Hauch 2016-03-17 11:02:08 -05:00
parent 91d200df51
commit 5a002dbf62
3 changed files with 97 additions and 61 deletions
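
A ROTATE event means the server was restarted or the binlog moved on to a new file; in either case, as the comment in the diff below notes, the numeric table numbers used by subsequent TABLE_MAP and row events change. The sketch below condenses the idea of the fix using simplified stand-in types and field names (not the real TableConverters members shown in the diff): the converter cache keyed by table number is discarded on rotation, while the schema cache keyed by the logical table identity is kept.

    import java.util.HashMap;
    import java.util.Map;

    // Minimal stand-in for the two caches involved; not the real Debezium types.
    public class RotationSketch {
        // Keyed by the binlog's numeric table ID; stale after a ROTATE event.
        private final Map<Long, Object> convertersByTableNumber = new HashMap<>();
        // Keyed by the logical table identity (database + table name); survives a rotation.
        private final Map<String, Object> schemasByTableId = new HashMap<>();

        public void rotateLogs() {
            // Server restart or new binlog file: table numbers will be reassigned,
            // so every converter cached under an old number must be dropped.
            convertersByTableNumber.clear();
            // schemasByTableId is deliberately left untouched.
        }
    }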

@@ -150,10 +150,10 @@ public void start(Map<String, String> props) {
config.getString(MySqlConnectorConfig.TABLE_WHITELIST),
config.getString(MySqlConnectorConfig.TABLE_BLACKLIST));
if (config.getBoolean(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN)) {
Predicate<TableId> ignoreBuiltins = (id) -> {
return !BUILT_IN_TABLE_NAMES.contains(id.table().toLowerCase()) && !BUILT_IN_DB_NAMES.contains(id.catalog().toLowerCase());
Predicate<TableId> isBuiltin = (id) -> {
return BUILT_IN_DB_NAMES.contains(id.catalog().toLowerCase()) || BUILT_IN_TABLE_NAMES.contains(id.table().toLowerCase());
};
tableFilter = ignoreBuiltins.or(tableFilter);
tableFilter = tableFilter.and(isBuiltin.negate());
}
// Create the queue ...
@@ -163,6 +163,7 @@ public void start(Map<String, String> props) {
// Set up our handlers for specific kinds of events ...
tables = new Tables();
tableConverters = new TableConverters(topicSelector, dbHistory, includeSchemaChanges, tables, tableFilter);
eventHandlers.put(EventType.ROTATE, tableConverters::rotateLogs);
eventHandlers.put(EventType.TABLE_MAP, tableConverters::updateTableMetadata);
eventHandlers.put(EventType.QUERY, tableConverters::updateTableCommand);
eventHandlers.put(EventType.EXT_WRITE_ROWS, tableConverters::handleInsert);
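
Two things change in the connector's start(...) method above: the built-in-table check is composed with and(...) instead of or(...), and a handler for EventType.ROTATE is registered so that TableConverters can react to log rotation. The composition matters: with the old ignoreBuiltins.or(tableFilter), any table that merely was not built-in passed, even when the configured whitelist/blacklist filter rejected it; with tableFilter.and(isBuiltin.negate()), a table must satisfy the configured filter and also not be built-in. The standalone sketch below shows that difference with hypothetical String names standing in for TableId:

    import java.util.function.Predicate;

    public class FilterCompositionSketch {
        public static void main(String[] args) {
            // Hypothetical filters; the real predicates test TableId objects.
            Predicate<String> tableFilter = name -> name.startsWith("inventory."); // configured whitelist
            Predicate<String> isBuiltin   = name -> name.startsWith("mysql.");     // built-in system tables

            Predicate<String> oldFilter = isBuiltin.negate().or(tableFilter);      // previous composition
            Predicate<String> newFilter = tableFilter.and(isBuiltin.negate());     // composition after this commit

            System.out.println(oldFilter.test("other.table"));      // true  -- slips past the whitelist
            System.out.println(newFilter.test("other.table"));      // false -- rejected by the whitelist
            System.out.println(newFilter.test("inventory.orders")); // true
            System.out.println(newFilter.test("mysql.user"));       // false -- built-in and not whitelisted
        }
    }

The hunks that follow are from TableConverters itself, whose handlers are registered above.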

@@ -26,6 +26,7 @@
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
@@ -86,6 +87,17 @@ public void loadTables() {
});
}
public void rotateLogs(Event event, SourceInfo source, Consumer<SourceRecord> recorder) {
logger.debug("Rotating logs: {}", event);
RotateEventData command = event.getData();
if (command != null) {
// The logs are being rotated, which means the server was either restarted, or the binlog has transitioned to a new
// file. In either case, the table numbers will change, so we need to discard the cache of converters by the table IDs
// (e.g., the Map<Long,Converter>). Note, however, that we're NOT clearing out the Map<TableId,TableSchema>.
convertersByTableId.clear();
}
}
public void updateTableCommand(Event event, SourceInfo source, Consumer<SourceRecord> recorder) {
QueryEventData command = event.getData();
String databaseName = command.getDatabase();
@@ -142,8 +154,8 @@ public void updateTableCommand(Event event, SourceInfo source, Consumer<SourceRe
public void updateTableMetadata(Event event, SourceInfo source, Consumer<SourceRecord> recorder) {
TableMapEventData metadata = event.getData();
long tableNumber = metadata.getTableId();
logger.debug("Received update table metadata event: {}", event);
if (!convertersByTableId.containsKey(tableNumber)) {
logger.debug("Received update table metadata event: {}", event);
// We haven't seen this table ID, so we need to rebuild our converter functions ...
String serverName = source.serverName();
String databaseName = metadata.getDatabase();
@@ -153,6 +165,7 @@ public void updateTableMetadata(Event event, SourceInfo source, Consumer<SourceR
// Just get the current schema, which should be up-to-date ...
TableId tableId = new TableId(databaseName, null, tableName);
TableSchema tableSchema = tableSchemaByTableId.get(tableId);
logger.debug("Registering metadata for table {} with table #{}", tableId, tableNumber);
if (tableSchema == null) {
// We are seeing an event for a row that's in a table we don't know about, meaning the table
// was created before the binlog was enabled (or before the point we started reading it).
@@ -201,7 +214,7 @@ public Struct inserted(Serializable[] row, BitSet includedColumns) {
}
@Override
public Struct updated(Serializable[] after, BitSet includedColumns, Serializable[] before,
public Struct updated(Serializable[] before, BitSet includedColumns, Serializable[] after,
BitSet includedColumnsBeforeUpdate) {
// assume all columns in the table are included, and we'll write out only the after state ...
return tableSchema.valueFromColumnData(after);
@@ -218,6 +231,8 @@ public Struct deleted(Serializable[] deleted, BitSet includedColumns) {
if (previousTableNumber != null) {
convertersByTableId.remove(previousTableNumber);
}
} else if (logger.isDebugEnabled()) {
logger.debug("Skipping update table metadata event: {}", event);
}
}
@@ -226,25 +241,30 @@ public void handleInsert(Event event, SourceInfo source, Consumer<SourceRecord>
long tableNumber = write.getTableId();
BitSet includedColumns = write.getIncludedColumns();
Converter converter = convertersByTableId.get(tableNumber);
if (tableFilter.test(converter.tableId())) {
logger.debug("Received insert row event: {}", event);
String topic = converter.topic();
Integer partition = converter.partition();
List<Serializable[]> rows = write.getRows();
for (int row = 0; row != rows.size(); ++row) {
Serializable[] values = rows.get(row);
Schema keySchema = converter.keySchema();
Object key = converter.createKey(values, includedColumns);
Schema valueSchema = converter.valueSchema();
Struct value = converter.inserted(values, includedColumns);
if (value != null || key != null) {
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
if (converter != null) {
TableId tableId = converter.tableId();
if (tableFilter.test(tableId)) {
logger.debug("Processing insert row event for {}: {}", tableId, event);
String topic = converter.topic();
Integer partition = converter.partition();
List<Serializable[]> rows = write.getRows();
for (int row = 0; row != rows.size(); ++row) {
Serializable[] values = rows.get(row);
Schema keySchema = converter.keySchema();
Object key = converter.createKey(values, includedColumns);
Schema valueSchema = converter.valueSchema();
Struct value = converter.inserted(values, includedColumns);
if (value != null || key != null) {
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
}
}
} else if (logger.isDebugEnabled()) {
logger.debug("Skipping insert row event: {}", event);
}
} else if (logger.isDebugEnabled()) {
logger.debug("Skipping insert row event: {}", event);
} else {
logger.warn("Unable to find converter for table #{} in {}", tableNumber, convertersByTableId);
}
}
@@ -261,27 +281,32 @@ public void handleUpdate(Event event, SourceInfo source, Consumer<SourceRecord>
BitSet includedColumns = update.getIncludedColumns();
BitSet includedColumnsBefore = update.getIncludedColumnsBeforeUpdate();
Converter converter = convertersByTableId.get(tableNumber);
if (tableFilter.test(converter.tableId())) {
logger.debug("Received update row event: {}", event);
String topic = converter.topic();
Integer partition = converter.partition();
List<Entry<Serializable[], Serializable[]>> rows = update.getRows();
for (int row = 0; row != rows.size(); ++row) {
Map.Entry<Serializable[], Serializable[]> changes = rows.get(row);
Serializable[] before = changes.getKey();
Serializable[] after = changes.getValue();
Schema keySchema = converter.keySchema();
Object key = converter.createKey(after, includedColumns);
Schema valueSchema = converter.valueSchema();
Struct value = converter.updated(before, includedColumnsBefore, after, includedColumns);
if (value != null || key != null) {
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
if (converter != null) {
TableId tableId = converter.tableId();
if (tableFilter.test(tableId)) {
logger.debug("Processing update row event for {}: {}", tableId, event);
String topic = converter.topic();
Integer partition = converter.partition();
List<Entry<Serializable[], Serializable[]>> rows = update.getRows();
for (int row = 0; row != rows.size(); ++row) {
Map.Entry<Serializable[], Serializable[]> changes = rows.get(row);
Serializable[] before = changes.getKey();
Serializable[] after = changes.getValue();
Schema keySchema = converter.keySchema();
Object key = converter.createKey(after, includedColumns);
Schema valueSchema = converter.valueSchema();
Struct value = converter.updated(before, includedColumnsBefore, after, includedColumns);
if (value != null || key != null) {
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
}
}
} else if (logger.isDebugEnabled()) {
logger.debug("Skipping update row event: {}", event);
}
} else if (logger.isDebugEnabled()) {
logger.debug("Skipping update row event: {}", event);
} else {
logger.warn("Unable to find converter for table #{} in {}", tableNumber, convertersByTableId);
}
}
@@ -290,25 +315,30 @@ public void handleDelete(Event event, SourceInfo source, Consumer<SourceRecord>
long tableNumber = deleted.getTableId();
BitSet includedColumns = deleted.getIncludedColumns();
Converter converter = convertersByTableId.get(tableNumber);
if (tableFilter.test(converter.tableId())) {
logger.debug("Received delete row event: {}", event);
String topic = converter.topic();
Integer partition = converter.partition();
List<Serializable[]> rows = deleted.getRows();
for (int row = 0; row != rows.size(); ++row) {
Serializable[] values = rows.get(row);
Schema keySchema = converter.keySchema();
Object key = converter.createKey(values, includedColumns);
Schema valueSchema = converter.valueSchema();
Struct value = converter.deleted(values, includedColumns);
if (value != null || key != null) {
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
if (converter != null) {
TableId tableId = converter.tableId();
if (tableFilter.test(tableId)) {
logger.debug("Processing delete row event for {}: {}", tableId, event);
String topic = converter.topic();
Integer partition = converter.partition();
List<Serializable[]> rows = deleted.getRows();
for (int row = 0; row != rows.size(); ++row) {
Serializable[] values = rows.get(row);
Schema keySchema = converter.keySchema();
Object key = converter.createKey(values, includedColumns);
Schema valueSchema = converter.valueSchema();
Struct value = converter.deleted(values, includedColumns);
if (value != null || key != null) {
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
}
}
} else if (logger.isDebugEnabled()) {
logger.debug("Skipping delete row event: {}", event);
}
} else if (logger.isDebugEnabled()) {
logger.debug("Skipping delete row event: {}", event);
} else {
logger.warn("Unable to find converter for table #{} in {}", tableNumber, convertersByTableId);
}
}
@@ -327,7 +357,7 @@ protected static interface Converter {
Struct inserted(Serializable[] row, BitSet includedColumns);
Struct updated(Serializable[] after, BitSet includedColumns, Serializable[] before, BitSet includedColumnsBeforeUpdate);
Struct updated(Serializable[] before, BitSet includedColumns, Serializable[] after, BitSet includedColumnsBeforeUpdate);
Struct deleted(Serializable[] deleted, BitSet includedColumns);
}
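
In the row handlers above (handleInsert, handleUpdate, handleDelete), the looked-up converter was previously dereferenced unconditionally via converter.tableId(). The lookup can now return null, for example after a rotation clears the cache and before a TABLE_MAP event re-registers the table, so each handler checks for null and logs a warning instead of failing with a NullPointerException. A minimal sketch of the guard, with placeholder types and names:

    import java.util.HashMap;
    import java.util.Map;

    // Placeholder for the lookup-and-guard shape now used by each row handler.
    final class ConverterGuardSketch {
        private final Map<Long, Object> convertersByTableId = new HashMap<>();

        void handleRowEvent(long tableNumber) {
            Object converter = convertersByTableId.get(tableNumber);
            if (converter != null) {
                // apply the table filter, then convert and emit each row ...
            } else {
                // Nothing is registered under this table number (e.g. the cache was just
                // cleared by a rotation), so skip the event rather than throw.
                System.err.printf("Unable to find converter for table #%d%n", tableNumber);
            }
        }
    }

The remaining hunks, in KafkaDatabaseHistory, replace the bare "consumer." and "producer." prefixes with constants built from CONFIGURATION_FIELD_PREFIX_STRING, and log the resulting consumer and producer configurations when the history is configured.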

@@ -70,6 +70,9 @@ public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
public static Collection<Field> ALL_FIELDS = Collect.arrayListOf(TOPIC, BOOTSTRAP_SERVERS,
RECOVERY_POLL_INTERVAL_MS, RECOVERY_POLL_ATTEMPTS);
private static final String CONSUMER_PREFIX = CONFIGURATION_FIELD_PREFIX_STRING + "consumer.";
private static final String PRODUCER_PREFIX = CONFIGURATION_FIELD_PREFIX_STRING + "producer.";
private final DocumentReader reader = DocumentReader.defaultReader();
private final Integer partition = new Integer(0);
private String topicName;
@@ -92,7 +95,7 @@ public void configure(Configuration config) {
String bootstrapServers = config.getString(BOOTSTRAP_SERVERS);
// Copy the relevant portions of the configuration and add useful defaults ...
String clientAndGroupId = UUID.randomUUID().toString();
this.consumerConfig = config.subset("consumer.", true).edit()
this.consumerConfig = config.subset(CONSUMER_PREFIX, true).edit()
.withDefault(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
.withDefault(ConsumerConfig.CLIENT_ID_CONFIG, clientAndGroupId)
.withDefault(ConsumerConfig.GROUP_ID_CONFIG, clientAndGroupId)
@@ -104,7 +107,7 @@ public void configure(Configuration config) {
.withDefault(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.withDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.build();
this.producerConfig = config.subset("producer.", true).edit()
this.producerConfig = config.subset(PRODUCER_PREFIX, true).edit()
.withDefault(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
.withDefault(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString())
.withDefault(ProducerConfig.ACKS_CONFIG, 1)
@@ -116,6 +119,8 @@ public void configure(Configuration config) {
.withDefault(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.withDefault(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.build();
logger.info("KafkaDatabaseHistory Consumer config: " + consumerConfig);
logger.info("KafkaDatabaseHistory Producer config: " + producerConfig);
}
@Override
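
The consumer and producer pass-through configurations above are now selected by CONSUMER_PREFIX and PRODUCER_PREFIX, each built from CONFIGURATION_FIELD_PREFIX_STRING, rather than by the bare literals "consumer." and "producer.". The sketch below imitates that subset-then-default pattern with plain java.util.Properties; the prefix value, property names, and helper are assumptions for illustration, not the Debezium Configuration API itself.

    import java.util.Properties;

    // Imitation of "take the keys under a prefix, strip the prefix, then fill in defaults".
    public class PrefixedConfigSketch {
        static Properties subset(Properties source, String prefix) {
            Properties result = new Properties();
            for (String key : source.stringPropertyNames()) {
                if (key.startsWith(prefix)) {
                    result.setProperty(key.substring(prefix.length()), source.getProperty(key));
                }
            }
            return result;
        }

        public static void main(String[] args) {
            String consumerPrefix = "database.history.consumer.";    // assumed value of CONSUMER_PREFIX
            Properties props = new Properties();
            props.setProperty(consumerPrefix + "fetch.min.bytes", "1024");
            props.setProperty("database.hostname", "localhost");      // unrelated key, not copied

            Properties consumerConfig = subset(props, consumerPrefix);
            consumerConfig.putIfAbsent("bootstrap.servers", "localhost:9092"); // stands in for withDefault(...)
            System.out.println(consumerConfig); // contains fetch.min.bytes=1024 and bootstrap.servers=localhost:9092
        }
    }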