diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Filters.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Filters.java index ea9f1b247..e4f0cdea8 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Filters.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Filters.java @@ -16,8 +16,6 @@ import io.debezium.relational.ColumnId; import io.debezium.relational.Selectors; import io.debezium.relational.TableId; -import io.debezium.relational.Tables; -import io.debezium.relational.Tables.TableNameFilter; import io.debezium.util.Collect; /** @@ -108,10 +106,6 @@ public Predicate tableFilter() { return tableFilter; } - public TableNameFilter tableNameFilter() { - return Tables.filterFor(tableFilter); - } - public Predicate builtInTableFilter() { return isBuiltInTable; } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java index ff4b5505a..af0893d49 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java @@ -30,6 +30,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.TableSchemaBuilder; import io.debezium.relational.Tables; +import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.ddl.DdlChanges; import io.debezium.relational.ddl.DdlChanges.DatabaseStatementStringConsumer; import io.debezium.relational.ddl.DdlParser; @@ -85,7 +86,7 @@ public MySqlSchema(MySqlConnectorConfig configuration, Predicate gtidFil super( configuration, topicSelector, - new Filters(configuration.getConfig()).tableFilter(), + TableFilter.fromPredicate(new Filters(configuration.getConfig()).tableFilter()), new Filters(configuration.getConfig()).columnFilter(), new TableSchemaBuilder( getValueConverters(configuration.getConfig()), SchemaNameAdjuster.create(logger), SourceInfo.SCHEMA) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/Filters.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/Filters.java index a1815090d..6745e5728 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/Filters.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/Filters.java @@ -15,29 +15,29 @@ import io.debezium.relational.ColumnId; import io.debezium.relational.Selectors; import io.debezium.relational.TableId; -import io.debezium.relational.Tables; +import io.debezium.relational.Tables.TableFilter; /** * A utility that is contains various filters for acceptable {@link TableId}s and columns. - * + * * @author Horia Chiorean */ @Immutable public class Filters { - + protected static final List SYSTEM_SCHEMAS = Arrays.asList("pg_catalog", "information_schema"); protected static final String SYSTEM_SCHEMA_BLACKLIST = SYSTEM_SCHEMAS.stream().collect(Collectors.joining(",")); protected static final Predicate IS_SYSTEM_SCHEMA = SYSTEM_SCHEMAS::contains; - protected static final String TEMP_TABLE_BLACKLIST = ".*\\.pg_temp.*"; - - private final Predicate tableFilter; + protected static final String TEMP_TABLE_BLACKLIST = ".*\\.pg_temp.*"; + + private final TableFilter tableFilter; private final Predicate columnFilter; /** * @param config the configuration; may not be null */ public Filters(PostgresConnectorConfig config) { - + // we always want to exclude PG system schemas as they are never part of logical decoding events String schemaBlacklist = config.schemaBlacklist(); if (schemaBlacklist != null) { @@ -45,36 +45,32 @@ public Filters(PostgresConnectorConfig config) { } else { schemaBlacklist = SYSTEM_SCHEMA_BLACKLIST; } - + String tableBlacklist = config.tableBlacklist(); if (tableBlacklist != null) { - tableBlacklist = tableBlacklist + "," + TEMP_TABLE_BLACKLIST; + tableBlacklist = tableBlacklist + "," + TEMP_TABLE_BLACKLIST; } else { tableBlacklist = TEMP_TABLE_BLACKLIST; } - + // Define the filter using the whitelists and blacklists for table names ... - this.tableFilter = Selectors.tableSelector() + this.tableFilter = TableFilter.fromPredicate(Selectors.tableSelector() .includeTables(config.tableWhitelist()) .excludeTables(tableBlacklist) .includeSchemas(config.schemaWhitelist()) .excludeSchemas(schemaBlacklist) - .build(); + .build()); + - // Define the filter that excludes blacklisted columns, truncated columns, and masked columns ... this.columnFilter = Selectors.excludeColumns(config.columnBlacklist()); } - - protected Predicate tableFilter() { + + protected TableFilter tableFilter() { return tableFilter; } - + protected Predicate columnFilter() { return columnFilter; - } - - protected Tables.TableNameFilter tableNameFilter() { - return Tables.filterFor(tableFilter); - } + } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java index 1c35164fe..617d7d640 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchema.java @@ -9,7 +9,6 @@ import java.sql.SQLException; import java.time.ZoneOffset; import java.util.Map; -import java.util.function.Predicate; import org.apache.kafka.connect.data.Schema; import org.slf4j.Logger; @@ -80,7 +79,7 @@ protected PostgresSchema refresh(PostgresConnection connection, boolean printRep } // read all the information from the DB - connection.readSchema(tables(), null, null, filters.tableNameFilter(), null, true); + connection.readSchema(tables(), null, null, filters.tableFilter(), null, true); if (printReplicaIdentityInfo) { // print out all the replica identity info tableIds().forEach(tableId -> printReplicaIdentityInfo(connection, tableId)); @@ -108,8 +107,7 @@ private void printReplicaIdentityInfo(PostgresConnection connection, TableId tab */ protected void refresh(PostgresConnection connection, TableId tableId) throws SQLException { Tables temp = new Tables(); - Tables.TableNameFilter tableNameFilter = Tables.filterFor(Predicate.isEqual(tableId)); - connection.readSchema(temp, null, null, tableNameFilter, null, true); + connection.readSchema(temp, null, null, tableId::equals, null, true); // we expect the refreshed table to be there assert temp.size() == 1; @@ -132,7 +130,7 @@ protected void refresh(Table table) { } protected boolean isFilteredOut(TableId id) { - return !filters.tableFilter().test(id); + return !filters.tableFilter().isIncluded(id); } /** diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java index fcec1f246..6cfcf1d7a 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java @@ -42,7 +42,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.Tables; import io.debezium.relational.Tables.ColumnNameFilter; -import io.debezium.relational.Tables.TableNameFilter; +import io.debezium.relational.Tables.TableFilter; import io.debezium.util.Collect; import io.debezium.util.Strings; @@ -803,7 +803,7 @@ protected int resolveNativeType(String typeName) { * @throws SQLException if an error occurs while accessing the database metadata */ public void readSchema(Tables tables, String databaseCatalog, String schemaNamePattern, - TableNameFilter tableFilter, ColumnNameFilter columnFilter, boolean removeTablesNotFoundInJdbc) + TableFilter tableFilter, ColumnNameFilter columnFilter, boolean removeTablesNotFoundInJdbc) throws SQLException { // Before we make any changes, get the copy of the set of table IDs ... Set tableIdsBefore = new HashSet<>(tables.tableIds()); @@ -832,7 +832,7 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP if (viewIds.contains(tableId)) { continue; } - if (tableFilter == null || tableFilter.matches(catalogName, schemaName, tableName)) { + if (tableFilter == null || tableFilter.isIncluded(tableId)) { List cols = columnsByTable.computeIfAbsent(tableId, name -> new ArrayList<>()); String columnName = rs.getString(4); if (columnFilter == null || columnFilter.matches(catalogName, schemaName, tableName, columnName)) { diff --git a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseSchema.java b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseSchema.java index 4c7ad74af..be53ab00a 100644 --- a/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseSchema.java +++ b/debezium-core/src/main/java/io/debezium/relational/HistorizedRelationalDatabaseSchema.java @@ -8,6 +8,7 @@ import java.util.function.Predicate; import io.debezium.config.CommonConnectorConfig; +import io.debezium.relational.Tables.TableFilter; import io.debezium.schema.HistorizedDatabaseSchema; import io.debezium.schema.TopicSelector; @@ -15,7 +16,7 @@ public abstract class HistorizedRelationalDatabaseSchema extends RelationalDatab implements HistorizedDatabaseSchema { protected HistorizedRelationalDatabaseSchema(CommonConnectorConfig config, TopicSelector topicSelector, - Predicate tableFilter, Predicate columnFilter, TableSchemaBuilder schemaBuilder, + TableFilter tableFilter, Predicate columnFilter, TableSchemaBuilder schemaBuilder, boolean tableIdCaseInsensitive) { super(config, topicSelector, tableFilter, columnFilter, schemaBuilder, tableIdCaseInsensitive); } diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java b/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java index a6c382d0d..368759854 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java @@ -5,8 +5,6 @@ */ package io.debezium.relational; -import java.util.function.Predicate; - import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; @@ -15,6 +13,7 @@ import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.config.Field.ValidationOutput; +import io.debezium.relational.Tables.TableFilter; /** * Configuration options shared across the relational CDC connectors. @@ -61,7 +60,7 @@ public class RelationalDatabaseConnectorConfig extends CommonConnectorConfig { private final RelationalTableFilters tableFilters; - protected RelationalDatabaseConnectorConfig(Configuration config, String logicalName, Predicate systemTablesFilter) { + protected RelationalDatabaseConnectorConfig(Configuration config, String logicalName, TableFilter systemTablesFilter) { super(config, logicalName); this.tableFilters = new RelationalTableFilters(config, systemTablesFilter); diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseSchema.java b/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseSchema.java index b10c70155..0926073a1 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseSchema.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseSchema.java @@ -13,6 +13,7 @@ import org.apache.kafka.connect.data.Schema; import io.debezium.config.CommonConnectorConfig; +import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.mapping.ColumnMappers; import io.debezium.schema.DatabaseSchema; import io.debezium.schema.TopicSelector; @@ -26,7 +27,7 @@ public abstract class RelationalDatabaseSchema implements DatabaseSchema topicSelector; private final TableSchemaBuilder schemaBuilder; - private final Predicate tableFilter; + private final TableFilter tableFilter; private final Predicate columnFilter; private final ColumnMappers columnMappers; @@ -35,7 +36,7 @@ public abstract class RelationalDatabaseSchema implements DatabaseSchema topicSelector, - Predicate tableFilter, Predicate columnFilter, TableSchemaBuilder schemaBuilder, + TableFilter tableFilter, Predicate columnFilter, TableSchemaBuilder schemaBuilder, boolean tableIdCaseInsensitive) { this.topicSelector = topicSelector; @@ -96,7 +97,7 @@ public TableSchema schemaFor(TableId id) { * or if the table has been excluded by the filters */ public Table tableFor(TableId id) { - return tableFilter.test(id) ? tables.forTable(id) : null; + return tableFilter.isIncluded(id) ? tables.forTable(id) : null; } protected Tables tables() { @@ -108,7 +109,7 @@ protected void clearSchemas() { } protected void buildAndRegisterSchema(Table table) { - if (tableFilter.test(table.id())) { + if (tableFilter.isIncluded(table.id())) { TableSchema schema = schemaBuilder.create(schemaPrefix, getEnvelopeSchemaName(table), table, columnFilter, columnMappers); schemasByTableId.put(table.id(), schema); } diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalTableFilters.java b/debezium-core/src/main/java/io/debezium/relational/RelationalTableFilters.java index 324ec6d5b..ce7272720 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalTableFilters.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalTableFilters.java @@ -8,13 +8,14 @@ import java.util.function.Predicate; import io.debezium.config.Configuration; +import io.debezium.relational.Tables.TableFilter; import io.debezium.schema.DataCollectionFilters; public class RelationalTableFilters implements DataCollectionFilters { private final TableFilter tableFilter; - public RelationalTableFilters(Configuration config, Predicate systemTablesFilter) { + public RelationalTableFilters(Configuration config, TableFilter systemTablesFilter) { // Define the filter using the whitelists and blacklists for tables and database names ... Predicate predicate = Selectors.tableSelector() // .includeDatabases(config.getString(RelationalDatabaseConnectorConfig.DATABASE_WHITELIST)) @@ -26,7 +27,7 @@ public RelationalTableFilters(Configuration config, Predicate systemTab Predicate finalPredicate = config.getBoolean(RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN) - ? predicate.and(systemTablesFilter.negate()) + ? predicate.and(systemTablesFilter::isIncluded) : predicate; this.tableFilter = t -> finalPredicate.test(t); @@ -36,11 +37,4 @@ public RelationalTableFilters(Configuration config, Predicate systemTab public TableFilter dataCollectionFilter() { return tableFilter; } - - @FunctionalInterface - public interface TableFilter extends DataCollectionFilter{ - - @Override - boolean isIncluded(TableId tableId); - } } diff --git a/debezium-core/src/main/java/io/debezium/relational/Tables.java b/debezium-core/src/main/java/io/debezium/relational/Tables.java index f35655595..ff797fdbd 100644 --- a/debezium-core/src/main/java/io/debezium/relational/Tables.java +++ b/debezium-core/src/main/java/io/debezium/relational/Tables.java @@ -19,6 +19,8 @@ import org.apache.kafka.connect.data.Schema; import io.debezium.annotation.ThreadSafe; +import io.debezium.schema.DataCollectionFilters.DataCollectionFilter; +import io.debezium.schema.DatabaseSchema; import io.debezium.util.Collect; import io.debezium.util.FunctionalReadWriteLock; @@ -30,35 +32,24 @@ @ThreadSafe public final class Tables { - /** - * Create a {@link TableNameFilter} for the given {@link Predicate Predicate}. - * @param predicate the {@link TableId} predicate filter; may be null - * @return the TableNameFilter; never null - */ - public static TableNameFilter filterFor( Predicate predicate) { - if ( predicate == null ) return (catalogName, schemaName, tableName)->true; - return (catalogName, schemaName, tableName)->{ - TableId id = new TableId(catalogName, schemaName, tableName); - return predicate.test(id); - }; - } - /** * A filter for tables. */ @FunctionalInterface - public static interface TableNameFilter { + public interface TableFilter extends DataCollectionFilter { + /** - * Determine whether the named table should be included. - * - * @param catalogName the name of the database catalog that contains the table; may be null if the JDBC driver does not - * show a schema for this table - * @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not - * show a schema for this table - * @param tableName the name of the table - * @return {@code true} if the table should be included, or {@code false} if the table should be excluded + * Determines whether the given table should be included in the current {@link DatabaseSchema}. */ - boolean matches(String catalogName, String schemaName, String tableName); + @Override + boolean isIncluded(TableId tableId); + + /** + * Creates a {@link TableFilter} from the given predicate. + */ + public static TableFilter fromPredicate(Predicate predicate) { + return t -> predicate.test(t); + } } /** @@ -325,12 +316,12 @@ public boolean equals(Object obj) { return false; } - public Tables subset(Predicate filter) { + public Tables subset(TableFilter filter) { if (filter == null) return this; return lock.read(() -> { Tables result = new Tables(tableIdCaseInsensitive); tablesByTableId.forEach((tableId, table) -> { - if (filter.test(tableId)) { + if (filter.isIncluded(tableId)) { result.overwriteTable(table); } });