DBZ-793 Using TableFilter consistently;

Only the MySQL connector's Filters class still uses Predicate for the time being, so as to avoid too much merging trouble with DBZ-175.
This commit is contained in:
Gunnar Morling 2018-07-10 13:14:58 +02:00 committed by Jiri Pechanec
parent f9b8d830a8
commit 1523f230ca
10 changed files with 53 additions and 78 deletions

View File

@ -16,8 +16,6 @@
import io.debezium.relational.ColumnId;
import io.debezium.relational.Selectors;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.Tables.TableNameFilter;
import io.debezium.util.Collect;
/**
@ -108,10 +106,6 @@ public Predicate<TableId> tableFilter() {
return tableFilter;
}
public TableNameFilter tableNameFilter() {
return Tables.filterFor(tableFilter);
}
public Predicate<TableId> builtInTableFilter() {
return isBuiltInTable;
}

View File

@ -30,6 +30,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.ddl.DdlChanges;
import io.debezium.relational.ddl.DdlChanges.DatabaseStatementStringConsumer;
import io.debezium.relational.ddl.DdlParser;
@ -85,7 +86,7 @@ public MySqlSchema(MySqlConnectorConfig configuration, Predicate<String> gtidFil
super(
configuration,
topicSelector,
new Filters(configuration.getConfig()).tableFilter(),
TableFilter.fromPredicate(new Filters(configuration.getConfig()).tableFilter()),
new Filters(configuration.getConfig()).columnFilter(),
new TableSchemaBuilder(
getValueConverters(configuration.getConfig()), SchemaNameAdjuster.create(logger), SourceInfo.SCHEMA)

View File

@ -15,29 +15,29 @@
import io.debezium.relational.ColumnId;
import io.debezium.relational.Selectors;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.Tables.TableFilter;
/**
* A utility that contains various filters for acceptable {@link TableId}s and columns.
*
*
* @author Horia Chiorean
*/
@Immutable
public class Filters {
protected static final List<String> SYSTEM_SCHEMAS = Arrays.asList("pg_catalog", "information_schema");
protected static final String SYSTEM_SCHEMA_BLACKLIST = SYSTEM_SCHEMAS.stream().collect(Collectors.joining(","));
protected static final Predicate<String> IS_SYSTEM_SCHEMA = SYSTEM_SCHEMAS::contains;
protected static final String TEMP_TABLE_BLACKLIST = ".*\\.pg_temp.*";
private final Predicate<TableId> tableFilter;
protected static final String TEMP_TABLE_BLACKLIST = ".*\\.pg_temp.*";
private final TableFilter tableFilter;
private final Predicate<ColumnId> columnFilter;
/**
* @param config the configuration; may not be null
*/
public Filters(PostgresConnectorConfig config) {
// we always want to exclude PG system schemas as they are never part of logical decoding events
String schemaBlacklist = config.schemaBlacklist();
if (schemaBlacklist != null) {
@ -45,36 +45,32 @@ public Filters(PostgresConnectorConfig config) {
} else {
schemaBlacklist = SYSTEM_SCHEMA_BLACKLIST;
}
String tableBlacklist = config.tableBlacklist();
if (tableBlacklist != null) {
tableBlacklist = tableBlacklist + "," + TEMP_TABLE_BLACKLIST;
tableBlacklist = tableBlacklist + "," + TEMP_TABLE_BLACKLIST;
} else {
tableBlacklist = TEMP_TABLE_BLACKLIST;
}
// Define the filter using the whitelists and blacklists for table names ...
this.tableFilter = Selectors.tableSelector()
this.tableFilter = TableFilter.fromPredicate(Selectors.tableSelector()
.includeTables(config.tableWhitelist())
.excludeTables(tableBlacklist)
.includeSchemas(config.schemaWhitelist())
.excludeSchemas(schemaBlacklist)
.build();
.build());
// Define the filter that excludes blacklisted columns, truncated columns, and masked columns ...
this.columnFilter = Selectors.excludeColumns(config.columnBlacklist());
}
protected Predicate<TableId> tableFilter() {
protected TableFilter tableFilter() {
return tableFilter;
}
protected Predicate<ColumnId> columnFilter() {
return columnFilter;
}
protected Tables.TableNameFilter tableNameFilter() {
return Tables.filterFor(tableFilter);
}
}
}

View File

@ -9,7 +9,6 @@
import java.sql.SQLException;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.function.Predicate;
import org.apache.kafka.connect.data.Schema;
import org.slf4j.Logger;
@ -80,7 +79,7 @@ protected PostgresSchema refresh(PostgresConnection connection, boolean printRep
}
// read all the information from the DB
connection.readSchema(tables(), null, null, filters.tableNameFilter(), null, true);
connection.readSchema(tables(), null, null, filters.tableFilter(), null, true);
if (printReplicaIdentityInfo) {
// print out all the replica identity info
tableIds().forEach(tableId -> printReplicaIdentityInfo(connection, tableId));
@ -108,8 +107,7 @@ private void printReplicaIdentityInfo(PostgresConnection connection, TableId tab
*/
protected void refresh(PostgresConnection connection, TableId tableId) throws SQLException {
Tables temp = new Tables();
Tables.TableNameFilter tableNameFilter = Tables.filterFor(Predicate.isEqual(tableId));
connection.readSchema(temp, null, null, tableNameFilter, null, true);
connection.readSchema(temp, null, null, tableId::equals, null, true);
// we expect the refreshed table to be there
assert temp.size() == 1;
@ -132,7 +130,7 @@ protected void refresh(Table table) {
}
protected boolean isFilteredOut(TableId id) {
return !filters.tableFilter().test(id);
return !filters.tableFilter().isIncluded(id);
}
/**

View File

@ -42,7 +42,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.Tables.ColumnNameFilter;
import io.debezium.relational.Tables.TableNameFilter;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.util.Collect;
import io.debezium.util.Strings;
@ -803,7 +803,7 @@ protected int resolveNativeType(String typeName) {
* @throws SQLException if an error occurs while accessing the database metadata
*/
public void readSchema(Tables tables, String databaseCatalog, String schemaNamePattern,
TableNameFilter tableFilter, ColumnNameFilter columnFilter, boolean removeTablesNotFoundInJdbc)
TableFilter tableFilter, ColumnNameFilter columnFilter, boolean removeTablesNotFoundInJdbc)
throws SQLException {
// Before we make any changes, get the copy of the set of table IDs ...
Set<TableId> tableIdsBefore = new HashSet<>(tables.tableIds());
@ -832,7 +832,7 @@ public void readSchema(Tables tables, String databaseCatalog, String schemaNameP
if (viewIds.contains(tableId)) {
continue;
}
if (tableFilter == null || tableFilter.matches(catalogName, schemaName, tableName)) {
if (tableFilter == null || tableFilter.isIncluded(tableId)) {
List<Column> cols = columnsByTable.computeIfAbsent(tableId, name -> new ArrayList<>());
String columnName = rs.getString(4);
if (columnFilter == null || columnFilter.matches(catalogName, schemaName, tableName, columnName)) {

View File

@ -8,6 +8,7 @@
import java.util.function.Predicate;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.schema.TopicSelector;
@ -15,7 +16,7 @@ public abstract class HistorizedRelationalDatabaseSchema extends RelationalDatab
implements HistorizedDatabaseSchema<TableId> {
protected HistorizedRelationalDatabaseSchema(CommonConnectorConfig config, TopicSelector<TableId> topicSelector,
Predicate<TableId> tableFilter, Predicate<ColumnId> columnFilter, TableSchemaBuilder schemaBuilder,
TableFilter tableFilter, Predicate<ColumnId> columnFilter, TableSchemaBuilder schemaBuilder,
boolean tableIdCaseInsensitive) {
super(config, topicSelector, tableFilter, columnFilter, schemaBuilder, tableIdCaseInsensitive);
}

View File

@ -5,8 +5,6 @@
*/
package io.debezium.relational;
import java.util.function.Predicate;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
@ -15,6 +13,7 @@
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.config.Field.ValidationOutput;
import io.debezium.relational.Tables.TableFilter;
/**
* Configuration options shared across the relational CDC connectors.
@ -61,7 +60,7 @@ public class RelationalDatabaseConnectorConfig extends CommonConnectorConfig {
private final RelationalTableFilters tableFilters;
protected RelationalDatabaseConnectorConfig(Configuration config, String logicalName, Predicate<TableId> systemTablesFilter) {
protected RelationalDatabaseConnectorConfig(Configuration config, String logicalName, TableFilter systemTablesFilter) {
super(config, logicalName);
this.tableFilters = new RelationalTableFilters(config, systemTablesFilter);

View File

@ -13,6 +13,7 @@
import org.apache.kafka.connect.data.Schema;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.mapping.ColumnMappers;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.TopicSelector;
@ -26,7 +27,7 @@ public abstract class RelationalDatabaseSchema implements DatabaseSchema<TableId
private final TopicSelector<TableId> topicSelector;
private final TableSchemaBuilder schemaBuilder;
private final Predicate<TableId> tableFilter;
private final TableFilter tableFilter;
private final Predicate<ColumnId> columnFilter;
private final ColumnMappers columnMappers;
@ -35,7 +36,7 @@ public abstract class RelationalDatabaseSchema implements DatabaseSchema<TableId
private final Tables tables;
protected RelationalDatabaseSchema(CommonConnectorConfig config, TopicSelector<TableId> topicSelector,
Predicate<TableId> tableFilter, Predicate<ColumnId> columnFilter, TableSchemaBuilder schemaBuilder,
TableFilter tableFilter, Predicate<ColumnId> columnFilter, TableSchemaBuilder schemaBuilder,
boolean tableIdCaseInsensitive) {
this.topicSelector = topicSelector;
@ -96,7 +97,7 @@ public TableSchema schemaFor(TableId id) {
* or if the table has been excluded by the filters
*/
public Table tableFor(TableId id) {
return tableFilter.test(id) ? tables.forTable(id) : null;
return tableFilter.isIncluded(id) ? tables.forTable(id) : null;
}
protected Tables tables() {
@ -108,7 +109,7 @@ protected void clearSchemas() {
}
protected void buildAndRegisterSchema(Table table) {
if (tableFilter.test(table.id())) {
if (tableFilter.isIncluded(table.id())) {
TableSchema schema = schemaBuilder.create(schemaPrefix, getEnvelopeSchemaName(table), table, columnFilter, columnMappers);
schemasByTableId.put(table.id(), schema);
}

View File

@ -8,13 +8,14 @@
import java.util.function.Predicate;
import io.debezium.config.Configuration;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.schema.DataCollectionFilters;
public class RelationalTableFilters implements DataCollectionFilters {
private final TableFilter tableFilter;
public RelationalTableFilters(Configuration config, Predicate<TableId> systemTablesFilter) {
public RelationalTableFilters(Configuration config, TableFilter systemTablesFilter) {
// Define the filter using the whitelists and blacklists for tables and database names ...
Predicate<TableId> predicate = Selectors.tableSelector()
// .includeDatabases(config.getString(RelationalDatabaseConnectorConfig.DATABASE_WHITELIST))
@ -26,7 +27,7 @@ public RelationalTableFilters(Configuration config, Predicate<TableId> systemTab
Predicate<TableId> finalPredicate = config.getBoolean(RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN)
? predicate.and(systemTablesFilter.negate())
? predicate.and(systemTablesFilter::isIncluded)
: predicate;
this.tableFilter = t -> finalPredicate.test(t);
@ -36,11 +37,4 @@ public RelationalTableFilters(Configuration config, Predicate<TableId> systemTab
public TableFilter dataCollectionFilter() {
return tableFilter;
}
@FunctionalInterface
public interface TableFilter extends DataCollectionFilter<TableId>{
@Override
boolean isIncluded(TableId tableId);
}
}

View File

@ -19,6 +19,8 @@
import org.apache.kafka.connect.data.Schema;
import io.debezium.annotation.ThreadSafe;
import io.debezium.schema.DataCollectionFilters.DataCollectionFilter;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Collect;
import io.debezium.util.FunctionalReadWriteLock;
@ -30,35 +32,24 @@
@ThreadSafe
public final class Tables {
/**
* Create a {@link TableNameFilter} for the given {@link Predicate Predicate<TableId>}.
* @param predicate the {@link TableId} predicate filter; may be null
* @return the TableNameFilter; never null
*/
public static TableNameFilter filterFor( Predicate<TableId> predicate) {
if ( predicate == null ) return (catalogName, schemaName, tableName)->true;
return (catalogName, schemaName, tableName)->{
TableId id = new TableId(catalogName, schemaName, tableName);
return predicate.test(id);
};
}
/**
* A filter for tables.
*/
@FunctionalInterface
public static interface TableNameFilter {
public interface TableFilter extends DataCollectionFilter<TableId> {
/**
* Determine whether the named table should be included.
*
* @param catalogName the name of the database catalog that contains the table; may be null if the JDBC driver does not
* show a schema for this table
* @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not
* show a schema for this table
* @param tableName the name of the table
* @return {@code true} if the table should be included, or {@code false} if the table should be excluded
* Determines whether the given table should be included in the current {@link DatabaseSchema}.
*/
boolean matches(String catalogName, String schemaName, String tableName);
@Override
boolean isIncluded(TableId tableId);
/**
* Creates a {@link TableFilter} from the given predicate.
*/
public static TableFilter fromPredicate(Predicate<TableId> predicate) {
return t -> predicate.test(t);
}
}
/**
@ -325,12 +316,12 @@ public boolean equals(Object obj) {
return false;
}
public Tables subset(Predicate<TableId> filter) {
public Tables subset(TableFilter filter) {
if (filter == null) return this;
return lock.read(() -> {
Tables result = new Tables(tableIdCaseInsensitive);
tablesByTableId.forEach((tableId, table) -> {
if (filter.test(tableId)) {
if (filter.isIncluded(tableId)) {
result.overwriteTable(table);
}
});