Merge pull request #29 from rhauch/dbz-28

DBZ-28 Corrected MySQL connector's behavior for representing deletes
This commit is contained in:
Randall Hauch 2016-03-04 10:37:36 -06:00
commit 5c8e68d6d2

View File

@ -59,7 +59,7 @@ final class TableConverters {
private final Map<String, Long> tableNumbersByTableName = new HashMap<>();
private final boolean recordSchemaChangesInSourceRecords;
private final Predicate<TableId> tableFilter;
private final Set<String> ignoredQueryStatements = Collect.unmodifiableSet("BEGIN","END","FLUSH PRIVILEGES");
private final Set<String> ignoredQueryStatements = Collect.unmodifiableSet("BEGIN", "END", "FLUSH PRIVILEGES");
private final Set<TableId> unknownTableIds = new HashSet<>();
public TableConverters(TopicSelector topicSelector, DatabaseHistory dbHistory,
@ -76,10 +76,10 @@ public TableConverters(TopicSelector topicSelector, DatabaseHistory dbHistory,
Predicate<TableId> knownTables = (id) -> !unknownTableIds.contains(id); // known if not unknown
this.tableFilter = tableFilter != null ? tableFilter.and(knownTables) : knownTables;
}
public void loadTables() {
// Create TableSchema instances for any existing table ...
this.tables.tableIds().forEach(id->{
this.tables.tableIds().forEach(id -> {
Table table = this.tables.forTable(id);
TableSchema schema = schemaBuilder.create(table, false);
tableSchemaByTableId.put(id, schema);
@ -90,7 +90,7 @@ public void updateTableCommand(Event event, SourceInfo source, Consumer<SourceRe
QueryEventData command = event.getData();
String databaseName = command.getDatabase();
String ddlStatements = command.getSql();
if ( ignoredQueryStatements.contains(ddlStatements) ) return;
if (ignoredQueryStatements.contains(ddlStatements)) return;
logger.debug("Received update table command: {}", event);
try {
this.ddlParser.setCurrentSchema(databaseName);
@ -237,9 +237,11 @@ public void handleInsert(Event event, SourceInfo source, Consumer<SourceRecord>
Object key = converter.createKey(values, includedColumns);
Schema valueSchema = converter.valueSchema();
Struct value = converter.inserted(values, includedColumns);
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
if (value != null || key != null) {
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
}
}
} else if (logger.isDebugEnabled()) {
logger.debug("Skipping insert row event: {}", event);
@ -263,7 +265,7 @@ public void handleUpdate(Event event, SourceInfo source, Consumer<SourceRecord>
logger.debug("Received update row event: {}", event);
String topic = converter.topic();
Integer partition = converter.partition();
List<Entry<Serializable[],Serializable[]>> rows = update.getRows();
List<Entry<Serializable[], Serializable[]>> rows = update.getRows();
for (int row = 0; row != rows.size(); ++row) {
Map.Entry<Serializable[], Serializable[]> changes = rows.get(row);
Serializable[] before = changes.getKey();
@ -272,9 +274,11 @@ public void handleUpdate(Event event, SourceInfo source, Consumer<SourceRecord>
Object key = converter.createKey(after, includedColumns);
Schema valueSchema = converter.valueSchema();
Struct value = converter.updated(before, includedColumnsBefore, after, includedColumns);
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
if (value != null || key != null) {
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
}
}
} else if (logger.isDebugEnabled()) {
logger.debug("Skipping update row event: {}", event);
@ -296,10 +300,12 @@ public void handleDelete(Event event, SourceInfo source, Consumer<SourceRecord>
Schema keySchema = converter.keySchema();
Object key = converter.createKey(values, includedColumns);
Schema valueSchema = converter.valueSchema();
Struct value = converter.inserted(values, includedColumns);
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
Struct value = converter.deleted(values, includedColumns);
if (value != null || key != null) {
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
keySchema, key, valueSchema, value);
recorder.accept(record);
}
}
} else if (logger.isDebugEnabled()) {
logger.debug("Skipping delete row event: {}", event);