DBZ-541 Only whitelisted tables are recorded in history
This commit is contained in:
parent
ea4b366b7f
commit
01577b40c3
@ -844,6 +844,7 @@ public static final Field MASK_COLUMN(int length) {
|
||||
KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
|
||||
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS,
|
||||
DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS,
|
||||
DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL,
|
||||
DatabaseHistory.DDL_FILTER);
|
||||
|
||||
protected static ConfigDef configDef() {
|
||||
@ -853,7 +854,8 @@ protected static ConfigDef configDef() {
|
||||
Field.group(config, "History Storage", KafkaDatabaseHistory.BOOTSTRAP_SERVERS,
|
||||
KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
|
||||
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DATABASE_HISTORY,
|
||||
DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, DatabaseHistory.DDL_FILTER);
|
||||
DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, DatabaseHistory.DDL_FILTER,
|
||||
DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL);
|
||||
Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST,
|
||||
COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST,
|
||||
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, GTID_SOURCE_FILTER_DML_EVENTS, BUFFER_SIZE_FOR_BINLOG_READER,
|
||||
|
@ -74,6 +74,8 @@ public class MySqlSchema {
|
||||
private Tables tables;
|
||||
private final boolean skipUnparseableDDL;
|
||||
private final boolean tableIdCaseInsensitive;
|
||||
private final boolean storeOnlyMonitoredTablesDdl;
|
||||
|
||||
/**
|
||||
* Create a schema component given the supplied {@link MySqlConnectorConfig MySQL connector configuration}.
|
||||
*
|
||||
@ -136,8 +138,8 @@ protected boolean isPositionAtOrBefore(Document recorded, Document desired) {
|
||||
this.dbHistory.configure(dbHistoryConfig, historyComparator); // validates
|
||||
|
||||
this.skipUnparseableDDL = dbHistoryConfig.getBoolean(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS);
|
||||
|
||||
tableSchemaByTableId = new SchemasByTableId(tableIdCaseInsensitive);
|
||||
this.storeOnlyMonitoredTablesDdl = dbHistoryConfig.getBoolean(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL);
|
||||
}
|
||||
|
||||
protected HistoryRecordComparator historyComparator() {
|
||||
@ -293,6 +295,7 @@ protected void refreshSchemas() {
|
||||
*/
|
||||
public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatements,
|
||||
DatabaseStatementStringConsumer statementConsumer) {
|
||||
Set<TableId> changes;
|
||||
if (ignoredQueryStatements.contains(ddlStatements)) return false;
|
||||
try {
|
||||
this.ddlChanges.reset();
|
||||
@ -305,6 +308,12 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
|
||||
throw e;
|
||||
}
|
||||
} finally {
|
||||
changes = tables.drainChanges();
|
||||
// No need to send schema events or store DDL if no table has changed
|
||||
// Note that, unlike with the DB history topic, we don't filter out non-whitelisted tables here
|
||||
// (which writes to the public schema change topic); if required, a second option could be added
|
||||
// for controlling this, too
|
||||
if (!storeOnlyMonitoredTablesDdl || !changes.isEmpty()) {
|
||||
if (statementConsumer != null) {
|
||||
|
||||
// We are supposed to _also_ record the schema changes as SourceRecords, but these need to be filtered
|
||||
@ -335,15 +344,19 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
|
||||
// schema change records so that failure recovery (which is based on of the history) won't lose
|
||||
// schema change records.
|
||||
try {
|
||||
if (!storeOnlyMonitoredTablesDdl || changes.stream().anyMatch(filters().tableFilter()::test)) {
|
||||
dbHistory.record(source.partition(), source.offset(), databaseName, tables, ddlStatements);
|
||||
} else {
|
||||
logger.debug("Changes for DDL '{}' were filtered and not recorded in database history", ddlStatements);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
throw new ConnectException(
|
||||
"Error recording the DDL statement(s) in the database history " + dbHistory + ": " + ddlStatements, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Figure out what changed ...
|
||||
Set<TableId> changes = tables.drainChanges();
|
||||
changes.forEach(tableId -> {
|
||||
Table table = tables.forTable(tableId);
|
||||
if (table == null) { // removed
|
||||
|
@ -45,6 +45,16 @@ public interface DatabaseHistory {
|
||||
+ "which it cannot parse. If skipping is enabled then Debezium can miss metadata changes.")
|
||||
.withDefault(false);
|
||||
|
||||
public static final Field STORE_ONLY_MONITORED_TABLES_DDL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "store.only.monitored.tables.ddl")
|
||||
.withDisplayName("Store only DDL that modifies whitelisted/not-blacklisted tables")
|
||||
.withType(Type.BOOLEAN)
|
||||
.withWidth(Width.SHORT)
|
||||
.withImportance(Importance.LOW)
|
||||
.withDescription("Controls what DDL will Debezium store in database history."
|
||||
+ "By default (false) Debezium will store all incoming DDL statements. If set to true"
|
||||
+ "then only DDL that manipulates a monitored table will be stored.")
|
||||
.withDefault(false);
|
||||
|
||||
public static final Field DDL_FILTER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ddl.filter")
|
||||
.withDisplayName("DDL filter")
|
||||
.withType(Type.STRING)
|
||||
|
Loading…
Reference in New Issue
Block a user