DBZ-3271 Unifying filter handling across relational connectors

This commit is contained in:
Gunnar Morling 2021-03-12 21:21:40 +01:00 committed by Jiri Pechanec
parent 30f0337112
commit 911b5e9a3b
12 changed files with 74 additions and 137 deletions

View File

@ -29,6 +29,7 @@
import io.debezium.function.Predicates;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.ColumnId;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
@ -981,7 +982,6 @@ protected static ConfigDef configDef() {
private final TemporalPrecisionMode temporalPrecisionMode;
private final Duration connectionTimeout;
private final Predicate<String> gtidSourceFilter;
private final ColumnNameFilter columnFilter;
private final EventProcessingFailureHandlingMode inconsistentSchemaFailureHandlingMode;
private final Predicate<String> ddlFilter;
private final boolean legacy;
@ -993,7 +993,8 @@ public MySqlConnectorConfig(Configuration config) {
config,
config.getString(SERVER_NAME),
TableFilter.fromPredicate(MySqlConnectorConfig::isNotBuiltInTable),
true);
true,
ColumnFilterMode.CATALOG);
this.config = config;
this.legacy = MySqlConnector.isLegacy(config.getString(io.debezium.connector.mysql.MySqlConnector.IMPLEMENTATION_PROP));
@ -1018,13 +1019,6 @@ public MySqlConnectorConfig(Configuration config) {
this.gtidSourceFilter = gtidSetIncludes != null ? Predicates.includesUuids(gtidSetIncludes)
: (gtidSetExcludes != null ? Predicates.excludesUuids(gtidSetExcludes) : null);
if (columnIncludeList() != null) {
this.columnFilter = getColumnIncludeNameFilter(columnIncludeList());
}
else {
this.columnFilter = getColumnExcludeNameFilter(columnExcludeList());
}
// Set up the DDL filter
final String ddlFilter = config.getString(DatabaseHistory.DDL_FILTER);
this.ddlFilter = (ddlFilter != null) ? Predicates.includes(ddlFilter) : (x -> false);
@ -1266,10 +1260,6 @@ public boolean matches(String catalogName, String schemaName, String tableName,
};
}
public ColumnNameFilter getColumnFilter() {
return columnFilter;
}
public static boolean isBuiltInDatabase(String databaseName) {
if (databaseName == null) {
return false;

View File

@ -12,6 +12,7 @@
import io.debezium.annotation.Immutable;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.Selectors;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.ColumnNameFilter;
@ -119,13 +120,12 @@ public Builder(Configuration config) {
String includeColumnsFilter = config.getString(MySqlConnectorConfig.COLUMN_INCLUDE_LIST);
if (includeColumnsFilter != null) {
this.columnFilter = ColumnNameFilterFactory.createIncludeListFilter(includeColumnsFilter);
this.columnFilter = ColumnNameFilterFactory.createIncludeListFilter(includeColumnsFilter, ColumnFilterMode.CATALOG);
}
else {
this.columnFilter = ColumnNameFilterFactory
.createExcludeListFilter(config.getFallbackStringProperty(MySqlConnectorConfig.COLUMN_EXCLUDE_LIST, MySqlConnectorConfig.COLUMN_BLACKLIST));
String excludeColumnsFilter = config.getFallbackStringProperty(MySqlConnectorConfig.COLUMN_EXCLUDE_LIST, MySqlConnectorConfig.COLUMN_BLACKLIST);
this.columnFilter = ColumnNameFilterFactory.createExcludeListFilter(excludeColumnsFilter, ColumnFilterMode.CATALOG);
}
}
/**

View File

@ -11,7 +11,6 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@ -34,13 +33,11 @@
import io.debezium.connector.oracle.xstream.LcrPosition;
import io.debezium.connector.oracle.xstream.OracleVersion;
import io.debezium.document.Document;
import io.debezium.function.Predicates;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.ColumnId;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.ColumnNameFilter;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.util.Strings;
@ -345,7 +342,6 @@ public static ConfigDef configDef() {
private final String pdbName;
private final String xoutServerName;
private final SnapshotMode snapshotMode;
private final ColumnNameFilter columnFilter;
private final Boolean tablenameCaseInsensitive;
private final OracleVersion oracleVersion;
@ -372,7 +368,7 @@ public static ConfigDef configDef() {
private final LogMiningDmlParser dmlParser;
public OracleConnectorConfig(Configuration config) {
super(OracleConnector.class, config, config.getString(SERVER_NAME), new SystemTablesPredicate(config), x -> x.schema() + "." + x.table(), true);
super(OracleConnector.class, config, config.getString(SERVER_NAME), new SystemTablesPredicate(config), x -> x.schema() + "." + x.table(), true, ColumnFilterMode.SCHEMA);
this.databaseName = toUpperCase(config.getString(DATABASE_NAME));
this.pdbName = toUpperCase(config.getString(PDB_NAME));
@ -384,13 +380,6 @@ public OracleConnectorConfig(Configuration config) {
this.jdbcConfig = config.subset(DATABASE_CONFIG_PREFIX, true);
this.snapshotEnhancementToken = config.getString(SNAPSHOT_ENHANCEMENT_TOKEN);
if (columnIncludeList() != null) {
this.columnFilter = getColumnIncludeNameFilter(columnIncludeList());
}
else {
this.columnFilter = getColumnExcludeNameFilter(columnExcludeList());
}
// LogMiner
this.connectorAdapter = ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER));
this.logMiningStrategy = LogMiningStrategy.parse(config.getString(LOG_MINING_STRATEGY));
@ -421,32 +410,6 @@ private static HistoryRecorder resolveLogMiningHistoryRecorder(Configuration con
return config.getInstance(LOG_MINING_HISTORY_RECORDER_CLASS, HistoryRecorder.class);
}
private static ColumnNameFilter getColumnExcludeNameFilter(String excludedColumnPatterns) {
return new ColumnNameFilter() {
Predicate<ColumnId> delegate = Predicates.excludes(excludedColumnPatterns, ColumnId::toString);
@Override
public boolean matches(String catalogName, String schemaName, String tableName, String columnName) {
// ignore database name as it's not relevant here
return delegate.test(new ColumnId(new TableId(null, schemaName, tableName), columnName));
}
};
}
private static ColumnNameFilter getColumnIncludeNameFilter(String includedColumnPatterns) {
return new ColumnNameFilter() {
Predicate<ColumnId> delegate = Predicates.includes(includedColumnPatterns, ColumnId::toString);
@Override
public boolean matches(String catalogName, String schemaName, String tableName, String columnName) {
// ignore database name as it's not relevant here
return delegate.test(new ColumnId(new TableId(null, schemaName, tableName), columnName));
}
};
}
public String getDatabaseName() {
return databaseName;
}
@ -482,10 +445,6 @@ public OracleVersion getOracleVersion() {
return oracleVersion;
}
public ColumnNameFilter getColumnFilter() {
return columnFilter;
}
@Override
protected HistoryRecordComparator getHistoryRecordComparator() {
return new HistoryRecordComparator() {

View File

@ -13,8 +13,6 @@
import io.debezium.annotation.Immutable;
import io.debezium.relational.Selectors;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.ColumnNameFilter;
import io.debezium.relational.Tables.ColumnNameFilterFactory;
import io.debezium.relational.Tables.TableFilter;
/**
@ -31,7 +29,6 @@ public class Filters {
protected static final String TEMP_TABLE_EXCLUDE_LIST = ".*\\.pg_temp.*";
private final TableFilter tableFilter;
private final ColumnNameFilter columnFilter;
/**
* @param config the configuration; may not be null
@ -62,23 +59,9 @@ public Filters(PostgresConnectorConfig config) {
.includeSchemas(config.schemaIncludeList())
.excludeSchemas(schemaExcludeList)
.build());
String columnIncludeList = config.columnIncludeList();
if (columnIncludeList != null) {
this.columnFilter = ColumnNameFilterFactory.createIncludeListFilter(config.columnIncludeList());
}
else {
// Define the filter that excludes columns on the exclude list, truncated columns, and masked columns ...
this.columnFilter = ColumnNameFilterFactory.createExcludeListFilter(config.columnExcludeList());
}
}
protected TableFilter tableFilter() {
return tableFilter;
}
protected ColumnNameFilter columnFilter() {
return columnFilter;
}
}

View File

@ -39,6 +39,7 @@
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.heartbeat.DatabaseHeartbeatImpl;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
@ -970,7 +971,8 @@ public PostgresConnectorConfig(Configuration config) {
config.getString(RelationalDatabaseConnectorConfig.SERVER_NAME),
new SystemTablesPredicate(),
x -> x.schema() + "." + x.table(),
DEFAULT_SNAPSHOT_FETCH_SIZE);
DEFAULT_SNAPSHOT_FETCH_SIZE,
ColumnFilterMode.SCHEMA);
this.truncateHandlingMode = TruncateHandlingMode.parse(config.getString(PostgresConnectorConfig.TRUNCATE_HANDLING_MODE));
String hstoreHandlingModeStr = config.getString(PostgresConnectorConfig.HSTORE_HANDLING_MODE);

View File

@ -60,7 +60,7 @@ public class PostgresSchema extends RelationalDatabaseSchema {
protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegistry, Charset databaseCharset,
TopicSelector<TableId> topicSelector) {
super(config, topicSelector, new Filters(config).tableFilter(),
new Filters(config).columnFilter(), getTableSchemaBuilder(config, typeRegistry, databaseCharset), false,
config.getColumnFilter(), getTableSchemaBuilder(config, typeRegistry, databaseCharset), false,
config.getKeyMapper());
this.typeRegistry = typeRegistry;

View File

@ -7,7 +7,6 @@
import java.time.DateTimeException;
import java.time.ZoneId;
import java.util.function.Predicate;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@ -24,12 +23,10 @@
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.document.Document;
import io.debezium.function.Predicates;
import io.debezium.relational.ColumnId;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.ColumnNameFilter;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
@ -330,23 +327,17 @@ public static ConfigDef configDef() {
private final SnapshotMode snapshotMode;
private final SnapshotIsolationMode snapshotIsolationMode;
private final SourceTimestampMode sourceTimestampMode;
private final ColumnNameFilter columnFilter;
private final boolean readOnlyDatabaseConnection;
private final boolean skipLowActivityLsnsEnabled;
public SqlServerConnectorConfig(Configuration config) {
super(SqlServerConnector.class, config, config.getString(SERVER_NAME), new SystemTablesPredicate(), x -> x.schema() + "." + x.table(), true);
super(SqlServerConnector.class, config, config.getString(SERVER_NAME), new SystemTablesPredicate(), x -> x.schema() + "." + x.table(), true,
ColumnFilterMode.SCHEMA);
this.databaseName = config.getString(DATABASE_NAME);
this.instanceName = config.getString(INSTANCE);
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString());
if (columnIncludeList() != null) {
this.columnFilter = getColumnIncludeNameFilter(columnIncludeList());
}
else {
this.columnFilter = getColumnExcludeNameFilter(columnExcludeList());
}
this.readOnlyDatabaseConnection = READ_ONLY_INTENT.equals(config.getString(APPLICATION_INTENT_KEY));
if (readOnlyDatabaseConnection) {
this.snapshotIsolationMode = SnapshotIsolationMode.SNAPSHOT;
@ -360,32 +351,6 @@ public SqlServerConnectorConfig(Configuration config) {
this.skipLowActivityLsnsEnabled = config.getBoolean(MAX_LSN_OPTIMIZATION);
}
private static ColumnNameFilter getColumnExcludeNameFilter(String excludedColumnPatterns) {
return new ColumnNameFilter() {
Predicate<ColumnId> delegate = Predicates.excludes(excludedColumnPatterns, ColumnId::toString);
@Override
public boolean matches(String catalogName, String schemaName, String tableName, String columnName) {
// ignore database name as it's not relevant here
return delegate.test(new ColumnId(new TableId(null, schemaName, tableName), columnName));
}
};
}
private static ColumnNameFilter getColumnIncludeNameFilter(String excludedColumnPatterns) {
return new ColumnNameFilter() {
Predicate<ColumnId> delegate = Predicates.includes(excludedColumnPatterns, ColumnId::toString);
@Override
public boolean matches(String catalogName, String schemaName, String tableName, String columnName) {
// ignore database name as it's not relevant here
return delegate.test(new ColumnId(new TableId(null, schemaName, tableName), columnName));
}
};
}
public Configuration jdbcConfig() {
return getConfig().subset(DATABASE_CONFIG_PREFIX, true);
}
@ -410,10 +375,6 @@ public SourceTimestampMode getSourceTimestampMode() {
return sourceTimestampMode;
}
public ColumnNameFilter getColumnFilter() {
return columnFilter;
}
public boolean isReadOnlyDatabaseConnection() {
return readOnlyDatabaseConnection;
}

View File

@ -0,0 +1,29 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational;
/**
* Modes for column name filters, either including a catalog (database) or schema name.
*
* @author Gunnar Morling
*/
public enum ColumnFilterMode {
CATALOG {
@Override
public TableId getTableIdForFilter(String catalog, String schema, String table) {
return new TableId(catalog, null, table);
}
},
SCHEMA {
@Override
public TableId getTableIdForFilter(String catalog, String schema, String table) {
return new TableId(null, schema, table);
}
};
public abstract TableId getTableIdForFilter(String catalog, String schema, String table);
}

View File

@ -59,8 +59,8 @@ public abstract class HistorizedRelationalDatabaseConnectorConfig extends Relati
.create();
protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass, Configuration config, String logicalName,
TableFilter systemTablesFilter, boolean useCatalogBeforeSchema) {
super(config, logicalName, systemTablesFilter, TableId::toString, DEFAULT_SNAPSHOT_FETCH_SIZE);
TableFilter systemTablesFilter, boolean useCatalogBeforeSchema, ColumnFilterMode columnFilterMode) {
super(config, logicalName, systemTablesFilter, TableId::toString, DEFAULT_SNAPSHOT_FETCH_SIZE, columnFilterMode);
this.useCatalogBeforeSchema = useCatalogBeforeSchema;
this.logicalName = logicalName;
this.connectorClass = connectorClass;
@ -68,8 +68,8 @@ protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConn
protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass, Configuration config, String logicalName,
TableFilter systemTablesFilter, TableIdToStringMapper tableIdMapper,
boolean useCatalogBeforeSchema) {
super(config, logicalName, systemTablesFilter, tableIdMapper, DEFAULT_SNAPSHOT_FETCH_SIZE);
boolean useCatalogBeforeSchema, ColumnFilterMode columnFilterMode) {
super(config, logicalName, systemTablesFilter, tableIdMapper, DEFAULT_SNAPSHOT_FETCH_SIZE, columnFilterMode);
this.useCatalogBeforeSchema = useCatalogBeforeSchema;
this.logicalName = logicalName;
this.connectorClass = connectorClass;

View File

@ -29,6 +29,8 @@
import io.debezium.relational.Key.CustomKeyMapper;
import io.debezium.relational.Key.KeyMapper;
import io.debezium.relational.Selectors.TableIdToStringMapper;
import io.debezium.relational.Tables.ColumnNameFilter;
import io.debezium.relational.Tables.ColumnNameFilterFactory;
import io.debezium.relational.Tables.TableFilter;
/**
@ -549,12 +551,14 @@ public static DecimalHandlingMode parse(String value, String defaultValue) {
.create();
private final RelationalTableFilters tableFilters;
private final ColumnNameFilter columnFilter;
private final TemporalPrecisionMode temporalPrecisionMode;
private final KeyMapper keyMapper;
private final TableIdToStringMapper tableIdMapper;
protected RelationalDatabaseConnectorConfig(Configuration config, String logicalName, TableFilter systemTablesFilter,
TableIdToStringMapper tableIdMapper, int defaultSnapshotFetchSize) {
TableIdToStringMapper tableIdMapper, int defaultSnapshotFetchSize,
ColumnFilterMode columnFilterMode) {
super(config, logicalName, defaultSnapshotFetchSize);
this.temporalPrecisionMode = TemporalPrecisionMode.parse(config.getString(TIME_PRECISION_MODE));
@ -568,6 +572,16 @@ protected RelationalDatabaseConnectorConfig(Configuration config, String logical
else {
this.tableFilters = null;
}
String columnExcludeList = config.getFallbackStringProperty(COLUMN_EXCLUDE_LIST, COLUMN_BLACKLIST);
String columnIncludeList = config.getFallbackStringProperty(COLUMN_INCLUDE_LIST, COLUMN_WHITELIST);
if (columnIncludeList != null) {
this.columnFilter = ColumnNameFilterFactory.createIncludeListFilter(columnIncludeList, columnFilterMode);
}
else {
this.columnFilter = ColumnNameFilterFactory.createExcludeListFilter(columnExcludeList, columnFilterMode);
}
}
public RelationalTableFilters getTableFilters() {
@ -624,12 +638,8 @@ public String tableIncludeList() {
return getConfig().getFallbackStringProperty(TABLE_INCLUDE_LIST, TABLE_WHITELIST);
}
public String columnExcludeList() {
return getConfig().getFallbackStringProperty(COLUMN_EXCLUDE_LIST, COLUMN_BLACKLIST);
}
public String columnIncludeList() {
return getConfig().getFallbackStringProperty(COLUMN_INCLUDE_LIST, COLUMN_WHITELIST);
public ColumnNameFilter getColumnFilter() {
return columnFilter;
}
public Boolean isFullColummnScanRequired() {

View File

@ -72,9 +72,10 @@ public static class ColumnNameFilterFactory {
* @param fullyQualifiedColumnNames the comma-separated list of fully-qualified column names to exclude; may be null or
* @return a column name filter; never null
*/
public static ColumnNameFilter createExcludeListFilter(String fullyQualifiedColumnNames) {
public static ColumnNameFilter createExcludeListFilter(String fullyQualifiedColumnNames, ColumnFilterMode columnFilterMode) {
Predicate<ColumnId> delegate = Predicates.excludes(fullyQualifiedColumnNames, ColumnId::toString);
return (catalogName, schemaName, tableName, columnName) -> delegate.test(new ColumnId(new TableId(catalogName, schemaName, tableName), columnName));
return (catalogName, schemaName, tableName, columnName) -> delegate
.test(new ColumnId(columnFilterMode.getTableIdForFilter(catalogName, schemaName, tableName), columnName));
}
/**
@ -87,9 +88,10 @@ public static ColumnNameFilter createExcludeListFilter(String fullyQualifiedColu
* @param fullyQualifiedColumnNames the comma-separated list of fully-qualified column names to include; may be null or
* @return a column name filter; never null
*/
public static ColumnNameFilter createIncludeListFilter(String fullyQualifiedColumnNames) {
public static ColumnNameFilter createIncludeListFilter(String fullyQualifiedColumnNames, ColumnFilterMode columnFilterMode) {
Predicate<ColumnId> delegate = Predicates.includes(fullyQualifiedColumnNames, ColumnId::toString);
return (catalogName, schemaName, tableName, columnName) -> delegate.test(new ColumnId(new TableId(catalogName, schemaName, tableName), columnName));
return (catalogName, schemaName, tableName, columnName) -> delegate
.test(new ColumnId(columnFilterMode.getTableIdForFilter(catalogName, schemaName, tableName), columnName));
}
}

View File

@ -7,6 +7,7 @@
import io.debezium.config.Configuration;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Selectors;
import io.debezium.relational.Tables;
@ -15,7 +16,7 @@ public class TestRelationalDatabaseConfig extends RelationalDatabaseConnectorCon
public TestRelationalDatabaseConfig(Configuration config, String logicalName, Tables.TableFilter systemTablesFilter,
Selectors.TableIdToStringMapper tableIdMapper, int defaultSnapshotFetchSize) {
super(config, logicalName, systemTablesFilter, tableIdMapper, defaultSnapshotFetchSize);
super(config, logicalName, systemTablesFilter, tableIdMapper, defaultSnapshotFetchSize, ColumnFilterMode.SCHEMA);
}
@Override