diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java index 442d94661..6ff2ffe80 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java @@ -312,9 +312,6 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem } finally { changes = tables().drainChanges(); // No need to send schema events or store DDL if no table has changed - // Note that, unlike with the DB history topic, we don't filter out non-whitelisted tables here - // (which writes to the public schema change topic); if required, a second option could be added - // for controlling this, too if (!storeOnlyMonitoredTablesDdl || ddlChanges.anyMatch(filters.databaseFilter(), filters.tableFilter())) { if (statementConsumer != null) { diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java index 8ffb161f4..c66e116e0 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java @@ -302,10 +302,12 @@ protected void execute() { mysql.query(sql.get(), rs -> { while (rs.next() && isRunning()) { TableId id = new TableId(dbName, null, rs.getString(1)); - if ((createTableFilters == filters && shouldRecordTableSchema(schema, filters, id)) || createTableFilters.tableFilter().test(id)) { + final boolean shouldRecordTableSchema = shouldRecordTableSchema(schema, filters, id); + // Apply only when the whitelist table list is not dynamically reconfigured + if ((createTableFilters == filters && shouldRecordTableSchema) || createTableFilters.tableFilter().test(id)) { createTablesMap.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id); } - if (shouldRecordTableSchema(schema, filters, id)) { + if (shouldRecordTableSchema) { tableIds.add(id); logger.info("\t including '{}' for further processing", id); } else { diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index 76bc53459..8218b865a 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -964,8 +964,11 @@ public void shouldReceiveSchemaForNonWhitelistedTablesAndDatabases() throws SQLE // Consume the first records due to startup and initialization of the database ... // Testing.Print.enable(); - // Four tables in database, only two are whitelisted - SourceRecords records = consumeRecordsByTopic(1 + 3 + 2 * 4 + 3 + 2); + // Two databases + // SET + USE + DROP DB + CREATE DB + 4 tables (2 whitelisted) (DROP + CREATE) TABLE + // USE + DROP DB + CREATE DB + (DROP + CREATE) TABLE + SourceRecords records = consumeRecordsByTopic(1 + 1 + 2 + 2 * 4 + 1 + 2 + 2); + // Records for one of the databases only assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(1 + 2 + 2 * 4); stopConnector(); } diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlRestartIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlRestartIT.java index cfeee748e..aed62ff53 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlRestartIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlRestartIT.java @@ -73,7 +73,7 @@ public void shouldNotDuplicateEventsAfterRestart() throws Exception { // Testing.Print.enable(); - SourceRecords records = consumeRecordsByTopic(7); + SourceRecords records = consumeRecordsByTopic(15); assertThat(records.recordsForTopic(DATABASE.topicForTable("restart_table")).size()).isEqualTo(1); try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) { diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ZZZGtidSetIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ZZZGtidSetIT.java index af8fe3912..643ba133d 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ZZZGtidSetIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ZZZGtidSetIT.java @@ -167,10 +167,10 @@ public void shouldProcessPurgedLogsWhenDownAndSnapshotNeeded() throws SQLExcepti // Consume the first records due to startup and initialization of the database ... // Testing.Print.enable(); - SourceRecords records = consumeRecordsByTopic(1 + 5 + 4); // 5 DDL changes + SourceRecords records = consumeRecordsByTopic(1 + 3 + 2 * 4 + 4); // SET + DROP/CREATE/USE DB + DROP/CREATE 4 tables + 4 data assertThat(records.recordsForTopic(database.topicForTable("customers")).size()).isEqualTo(4); assertThat(records.topics().size()).isEqualTo(1 + 1); - assertThat(records.ddlRecordsForDatabase(database.getDatabaseName()).size()).isEqualTo(5); + assertThat(records.ddlRecordsForDatabase(database.getDatabaseName()).size()).isEqualTo(11); // Check that all records are valid, can be serialized and deserialized ... records.forEach(this::validate); @@ -196,10 +196,11 @@ public void shouldProcessPurgedLogsWhenDownAndSnapshotNeeded() throws SQLExcepti } purgeDatabaseLogs(); start(MySqlConnector.class, config); - records = consumeRecordsByTopic(1 + 5 + 1 + 8); // 5 DDL changes, DROP TABLE is sent twice + // SET + DROP/CREATE/USE DB + DROP/CREATE 4 tables + 1 additional DROP whitelisted table + 8 data + records = consumeRecordsByTopic(1 + 3 + 2 * 4 + 1 + 8); assertThat(records.recordsForTopic(database.topicForTable("customers")).size()).isEqualTo(8); assertThat(records.topics().size()).isEqualTo(1 + 1); - assertThat(records.ddlRecordsForDatabase(database.getDatabaseName()).size()).isEqualTo(6); + assertThat(records.ddlRecordsForDatabase(database.getDatabaseName()).size()).isEqualTo(12); stopConnector(); try (MySQLConnection db = MySQLConnection.forTestDatabase(database.getDatabaseName())) { @@ -216,10 +217,11 @@ public void shouldProcessPurgedLogsWhenDownAndSnapshotNeeded() throws SQLExcepti ); } start(MySqlConnector.class, config); - records = consumeRecordsByTopic(1 + 5 + 1 + 12); // 5 DDL changes, DROP TABLE is sent twice + // SET + DROP/CREATE/USE DB + DROP/CREATE 4 tables + 1 additional DROP whitelisted table + 8 data + records = consumeRecordsByTopic(1 + 3 + 2 * 4 + 1 + 12); assertThat(records.recordsForTopic(database.topicForTable("customers")).size()).isEqualTo(12); assertThat(records.topics().size()).isEqualTo(1 + 1); - assertThat(records.ddlRecordsForDatabase(database.getDatabaseName()).size()).isEqualTo(6); + assertThat(records.ddlRecordsForDatabase(database.getDatabaseName()).size()).isEqualTo(12); stopConnector(); } } diff --git a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlChanges.java b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlChanges.java index bc9ba66b4..cc00598fc 100644 --- a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlChanges.java +++ b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlChanges.java @@ -167,6 +167,14 @@ public static interface DatabaseStatementStringConsumer { void consume(String databaseName, String ddlStatements); } + /** + * @return true if any event stored is one of + *