DBZ-599 Emit schema change only for DDL with no errors

This commit is contained in:
Jiri Pechanec 2019-07-10 08:38:08 +02:00
parent 0383f9178c
commit c9997a5fd6

View File

@ -310,51 +310,50 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
} else { } else {
throw e; throw e;
} }
} finally { }
changes = tables().drainChanges(); changes = tables().drainChanges();
// No need to send schema events or store DDL if no table has changed // No need to send schema events or store DDL if no table has changed
if (!storeOnlyMonitoredTablesDdl || ddlChanges.anyMatch(filters.databaseFilter(), filters.tableFilter())) { if (!storeOnlyMonitoredTablesDdl || ddlChanges.anyMatch(filters.databaseFilter(), filters.tableFilter())) {
if (statementConsumer != null) { if (statementConsumer != null) {
// We are supposed to _also_ record the schema changes as SourceRecords, but these need to be filtered // We are supposed to _also_ record the schema changes as SourceRecords, but these need to be filtered
// by database. Unfortunately, the databaseName on the event might not be the same database as that // by database. Unfortunately, the databaseName on the event might not be the same database as that
// being modified by the DDL statements (since the DDL statements can have fully-qualified names). // being modified by the DDL statements (since the DDL statements can have fully-qualified names).
// Therefore, we have to look at each statement to figure out which database it applies and then // Therefore, we have to look at each statement to figure out which database it applies and then
// record the DDL statements (still in the same order) to those databases. // record the DDL statements (still in the same order) to those databases.
if (!ddlChanges.isEmpty() && ddlChanges.applyToMoreDatabasesThan(databaseName)) { if (!ddlChanges.isEmpty() && ddlChanges.applyToMoreDatabasesThan(databaseName)) {
// We understood at least some of the DDL statements and can figure out to which database they apply. // We understood at least some of the DDL statements and can figure out to which database they apply.
// They also apply to more databases than 'databaseName', so we need to apply the DDL statements in // They also apply to more databases than 'databaseName', so we need to apply the DDL statements in
// the same order they were read for each _affected_ database, grouped together if multiple apply // the same order they were read for each _affected_ database, grouped together if multiple apply
// to the same _affected_ database... // to the same _affected_ database...
ddlChanges.groupStatementStringsByDatabase((dbName, tables, ddl) -> { ddlChanges.groupStatementStringsByDatabase((dbName, tables, ddl) -> {
if (filters.databaseFilter().test(dbName) || dbName == null || "".equals(dbName)) { if (filters.databaseFilter().test(dbName) || dbName == null || "".equals(dbName)) {
if (dbName == null) { if (dbName == null) {
dbName = ""; dbName = "";
}
statementConsumer.consume(dbName, tables, ddl);
} }
}); statementConsumer.consume(dbName, tables, ddl);
} else if (filters.databaseFilter().test(databaseName) || databaseName == null || "".equals(databaseName)) {
if (databaseName == null) {
databaseName = "";
} }
statementConsumer.consume(databaseName, changes, ddlStatements); });
} else if (filters.databaseFilter().test(databaseName) || databaseName == null || "".equals(databaseName)) {
if (databaseName == null) {
databaseName = "";
} }
statementConsumer.consume(databaseName, changes, ddlStatements);
} }
}
// Record the DDL statement so that we can later recover them if needed. We do this _after_ writing the // Record the DDL statement so that we can later recover them if needed. We do this _after_ writing the
// schema change records so that failure recovery (which is based on of the history) won't lose // schema change records so that failure recovery (which is based on of the history) won't lose
// schema change records. // schema change records.
if (!storeOnlyMonitoredTablesDdl || changes.stream().anyMatch(filters().tableFilter()::test)) { if (!storeOnlyMonitoredTablesDdl || changes.stream().anyMatch(filters().tableFilter()::test)) {
dbHistory.record(source.partition(), source.offset(), databaseName, ddlStatements); dbHistory.record(source.partition(), source.offset(), databaseName, ddlStatements);
}
}
else {
logger.debug("Changes for DDL '{}' were filtered and not recorded in database history", ddlStatements);
} }
} }
else {
logger.debug("Changes for DDL '{}' were filtered and not recorded in database history", ddlStatements);
}
// Figure out what changed ... // Figure out what changed ...
changes.forEach(tableId -> { changes.forEach(tableId -> {
@ -369,7 +368,6 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
return true; return true;
} }
/** /**
* @return true if only monitored tables should be stored in database history, false if all tables should be stored * @return true if only monitored tables should be stored in database history, false if all tables should be stored
*/ */