DBZ-683 After review changes

This commit is contained in:
Jiri Pechanec 2019-05-21 13:53:17 +02:00 committed by Gunnar Morling
parent 540196091c
commit c1c4d3d149
6 changed files with 26 additions and 14 deletions

View File

@ -312,9 +312,6 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
} finally {
changes = tables().drainChanges();
// No need to send schema events or store DDL if no table has changed
// Note that, unlike with the DB history topic, we don't filter out non-whitelisted tables here
// (which writes to the public schema change topic); if required, a second option could be added
// for controlling this, too
if (!storeOnlyMonitoredTablesDdl || ddlChanges.anyMatch(filters.databaseFilter(), filters.tableFilter())) {
if (statementConsumer != null) {

View File

@ -302,10 +302,12 @@ protected void execute() {
mysql.query(sql.get(), rs -> {
while (rs.next() && isRunning()) {
TableId id = new TableId(dbName, null, rs.getString(1));
if ((createTableFilters == filters && shouldRecordTableSchema(schema, filters, id)) || createTableFilters.tableFilter().test(id)) {
final boolean shouldRecordTableSchema = shouldRecordTableSchema(schema, filters, id);
// Apply only when the whitelist table list is not dynamically reconfigured
if ((createTableFilters == filters && shouldRecordTableSchema) || createTableFilters.tableFilter().test(id)) {
createTablesMap.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id);
}
if (shouldRecordTableSchema(schema, filters, id)) {
if (shouldRecordTableSchema) {
tableIds.add(id);
logger.info("\t including '{}' for further processing", id);
} else {

View File

@ -964,8 +964,11 @@ public void shouldReceiveSchemaForNonWhitelistedTablesAndDatabases() throws SQLE
// Consume the first records due to startup and initialization of the database ...
// Testing.Print.enable();
// Four tables in database, only two are whitelisted
SourceRecords records = consumeRecordsByTopic(1 + 3 + 2 * 4 + 3 + 2);
// Two databases
// SET + USE + DROP DB + CREATE DB + 4 tables (2 whitelisted) (DROP + CREATE) TABLE
// USE + DROP DB + CREATE DB + (DROP + CREATE) TABLE
SourceRecords records = consumeRecordsByTopic(1 + 1 + 2 + 2 * 4 + 1 + 2 + 2);
// Records for one of the databases only
assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(1 + 2 + 2 * 4);
stopConnector();
}

View File

@ -73,7 +73,7 @@ public void shouldNotDuplicateEventsAfterRestart() throws Exception {
// Testing.Print.enable();
SourceRecords records = consumeRecordsByTopic(7);
SourceRecords records = consumeRecordsByTopic(15);
assertThat(records.recordsForTopic(DATABASE.topicForTable("restart_table")).size()).isEqualTo(1);
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) {

View File

@ -167,10 +167,10 @@ public void shouldProcessPurgedLogsWhenDownAndSnapshotNeeded() throws SQLExcepti
// Consume the first records due to startup and initialization of the database ...
// Testing.Print.enable();
SourceRecords records = consumeRecordsByTopic(1 + 5 + 4); // 5 DDL changes
SourceRecords records = consumeRecordsByTopic(1 + 3 + 2 * 4 + 4); // SET + DROP/CREATE/USE DB + DROP/CREATE 4 tables + 4 data
assertThat(records.recordsForTopic(database.topicForTable("customers")).size()).isEqualTo(4);
assertThat(records.topics().size()).isEqualTo(1 + 1);
assertThat(records.ddlRecordsForDatabase(database.getDatabaseName()).size()).isEqualTo(5);
assertThat(records.ddlRecordsForDatabase(database.getDatabaseName()).size()).isEqualTo(11);
// Check that all records are valid, can be serialized and deserialized ...
records.forEach(this::validate);
@ -196,10 +196,11 @@ public void shouldProcessPurgedLogsWhenDownAndSnapshotNeeded() throws SQLExcepti
}
purgeDatabaseLogs();
start(MySqlConnector.class, config);
records = consumeRecordsByTopic(1 + 5 + 1 + 8); // 5 DDL changes, DROP TABLE is sent twice
// SET + DROP/CREATE/USE DB + DROP/CREATE 4 tables + 1 additional DROP whitelisted table + 8 data
records = consumeRecordsByTopic(1 + 3 + 2 * 4 + 1 + 8);
assertThat(records.recordsForTopic(database.topicForTable("customers")).size()).isEqualTo(8);
assertThat(records.topics().size()).isEqualTo(1 + 1);
assertThat(records.ddlRecordsForDatabase(database.getDatabaseName()).size()).isEqualTo(6);
assertThat(records.ddlRecordsForDatabase(database.getDatabaseName()).size()).isEqualTo(12);
stopConnector();
try (MySQLConnection db = MySQLConnection.forTestDatabase(database.getDatabaseName())) {
@ -216,10 +217,11 @@ public void shouldProcessPurgedLogsWhenDownAndSnapshotNeeded() throws SQLExcepti
);
}
start(MySqlConnector.class, config);
records = consumeRecordsByTopic(1 + 5 + 1 + 12); // 5 DDL changes, DROP TABLE is sent twice
// SET + DROP/CREATE/USE DB + DROP/CREATE 4 tables + 1 additional DROP whitelisted table + 8 data
records = consumeRecordsByTopic(1 + 3 + 2 * 4 + 1 + 12);
assertThat(records.recordsForTopic(database.topicForTable("customers")).size()).isEqualTo(12);
assertThat(records.topics().size()).isEqualTo(1 + 1);
assertThat(records.ddlRecordsForDatabase(database.getDatabaseName()).size()).isEqualTo(6);
assertThat(records.ddlRecordsForDatabase(database.getDatabaseName()).size()).isEqualTo(12);
stopConnector();
}
}

View File

@ -167,6 +167,14 @@ public static interface DatabaseStatementStringConsumer {
void consume(String databaseName, String ddlStatements);
}
/**
* @return true if any event stored is one of
* <ul>
* <li>database-wide events and affects whitelisted database</li>
* <li>table related events and the table is whitelisted</li>
* <li>events that set a variable and either affects whitelisted database or is a system-wide variable</li>
* <ul>
*/
public boolean anyMatch(Predicate<String> databaseFilter, Predicate<TableId> tableFilter) {
return events.stream().anyMatch(event ->
(event instanceof DatabaseEvent) && databaseFilter.test(((DatabaseEvent) event).databaseName())