diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index 58a25abc2..d5e737a0a 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -844,6 +844,7 @@ public static final Field MASK_COLUMN(int length) { KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS, KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, + DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, DatabaseHistory.DDL_FILTER); protected static ConfigDef configDef() { @@ -853,7 +854,8 @@ protected static ConfigDef configDef() { Field.group(config, "History Storage", KafkaDatabaseHistory.BOOTSTRAP_SERVERS, KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS, KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DATABASE_HISTORY, - DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, DatabaseHistory.DDL_FILTER); + DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, DatabaseHistory.DDL_FILTER, + DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL); Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST, COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST, GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, GTID_SOURCE_FILTER_DML_EVENTS, BUFFER_SIZE_FOR_BINLOG_READER, diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java index 778e2df3b..a37b00998 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java @@ -74,6 +74,8 @@ public class MySqlSchema { private Tables tables; 
private final boolean skipUnparseableDDL; private final boolean tableIdCaseInsensitive; + private final boolean storeOnlyMonitoredTablesDdl; + /** * Create a schema component given the supplied {@link MySqlConnectorConfig MySQL connector configuration}. * @@ -136,8 +138,8 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) { this.dbHistory.configure(dbHistoryConfig, historyComparator); // validates this.skipUnparseableDDL = dbHistoryConfig.getBoolean(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS); - tableSchemaByTableId = new SchemasByTableId(tableIdCaseInsensitive); + this.storeOnlyMonitoredTablesDdl = dbHistoryConfig.getBoolean(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL); } protected HistoryRecordComparator historyComparator() { @@ -254,13 +256,13 @@ public void loadHistory(SourceInfo startingPoint) { dbHistory.recover(startingPoint.partition(), startingPoint.offset(), tables, ddlParser); refreshSchemas(); } - + /** * Return true if the database history entity exists */ public boolean historyExists() { return dbHistory.exists(); - } + } /** * Discard any currently-cached schemas and rebuild them using the filters. 
@@ -293,6 +295,7 @@ protected void refreshSchemas() { */ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatements, DatabaseStatementStringConsumer statementConsumer) { + Set<TableId> changes; if (ignoredQueryStatements.contains(ddlStatements)) return false; try { this.ddlChanges.reset(); @@ -305,45 +308,55 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem throw e; } } finally { - if (statementConsumer != null) { + changes = tables.drainChanges(); + // No need to send schema events or store DDL if no table has changed + // Note that, unlike with the DB history topic, we don't filter out non-whitelisted tables here + // (which writes to the public schema change topic); if required, a second option could be added + // for controlling this, too + if (!storeOnlyMonitoredTablesDdl || !changes.isEmpty()) { + if (statementConsumer != null) { - // We are supposed to _also_ record the schema changes as SourceRecords, but these need to be filtered - // by database. Unfortunately, the databaseName on the event might not be the same database as that - // being modified by the DDL statements (since the DDL statements can have fully-qualified names). - // Therefore, we have to look at each statement to figure out which database it applies and then - // record the DDL statements (still in the same order) to those databases. + // We are supposed to _also_ record the schema changes as SourceRecords, but these need to be filtered + // by database. Unfortunately, the databaseName on the event might not be the same database as that + // being modified by the DDL statements (since the DDL statements can have fully-qualified names). + // Therefore, we have to look at each statement to figure out which database it applies and then + // record the DDL statements (still in the same order) to those databases. 
- if (!ddlChanges.isEmpty() && ddlChanges.applyToMoreDatabasesThan(databaseName)) { + if (!ddlChanges.isEmpty() && ddlChanges.applyToMoreDatabasesThan(databaseName)) { - // We understood at least some of the DDL statements and can figure out to which database they apply. - // They also apply to more databases than 'databaseName', so we need to apply the DDL statements in - // the same order they were read for each _affected_ database, grouped together if multiple apply - // to the same _affected_ database... - ddlChanges.groupStatementStringsByDatabase((dbName, ddl) -> { - if (filters.databaseFilter().test(dbName) || dbName == null || "".equals(dbName)) { - if (dbName == null) dbName = ""; - statementConsumer.consume(dbName, ddlStatements); - } - }); - } else if (filters.databaseFilter().test(databaseName) || databaseName == null || "".equals(databaseName)) { - if (databaseName == null) databaseName = ""; - statementConsumer.consume(databaseName, ddlStatements); + // We understood at least some of the DDL statements and can figure out to which database they apply. + // They also apply to more databases than 'databaseName', so we need to apply the DDL statements in + // the same order they were read for each _affected_ database, grouped together if multiple apply + // to the same _affected_ database... + ddlChanges.groupStatementStringsByDatabase((dbName, ddl) -> { + if (filters.databaseFilter().test(dbName) || dbName == null || "".equals(dbName)) { + if (dbName == null) dbName = ""; + statementConsumer.consume(dbName, ddlStatements); + } + }); + } else if (filters.databaseFilter().test(databaseName) || databaseName == null || "".equals(databaseName)) { + if (databaseName == null) databaseName = ""; + statementConsumer.consume(databaseName, ddlStatements); + } } - } - // Record the DDL statement so that we can later recover them if needed. 
We do this _after_ writing the - // schema change records so that failure recovery (which is based on of the history) won't lose - // schema change records. - try { - dbHistory.record(source.partition(), source.offset(), databaseName, tables, ddlStatements); - } catch (Throwable e) { - throw new ConnectException( - "Error recording the DDL statement(s) in the database history " + dbHistory + ": " + ddlStatements, e); + // Record the DDL statement so that we can later recover them if needed. We do this _after_ writing the + // schema change records so that failure recovery (which is based on the history) won't lose + // schema change records. + try { + if (!storeOnlyMonitoredTablesDdl || changes.stream().anyMatch(filters().tableFilter()::test)) { + dbHistory.record(source.partition(), source.offset(), databaseName, tables, ddlStatements); + } else { + logger.debug("Changes for DDL '{}' were filtered and not recorded in database history", ddlStatements); + } + } catch (Throwable e) { + throw new ConnectException( + "Error recording the DDL statement(s) in the database history " + dbHistory + ": " + ddlStatements, e); + } } } // Figure out what changed ... - Set<TableId> changes = tables.drainChanges(); changes.forEach(tableId -> { Table table = tables.forTable(tableId); if (table == null) { // removed diff --git a/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistory.java index 4ce79a0c2..5ba16f530 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistory.java @@ -45,6 +45,16 @@ public interface DatabaseHistory { + "which it cannot parse. 
If skipping is enabled then Debezium can miss metadata changes.") .withDefault(false); + public static final Field STORE_ONLY_MONITORED_TABLES_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "store.only.monitored.tables.ddl") + .withDisplayName("Store only DDL that modifies whitelisted/not-blacklisted tables") + .withType(Type.BOOLEAN) + .withWidth(Width.SHORT) + .withImportance(Importance.LOW) + .withDescription("Controls what DDL will Debezium store in database history. " + "By default (false) Debezium will store all incoming DDL statements. If set to true " + "then only DDL that manipulates a monitored table will be stored.") + .withDefault(false); + public static final Field DDL_FILTER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ddl.filter") .withDisplayName("DDL filter") .withType(Type.STRING)