DBZ-9 Added MySQL whitelist and blacklists on tables and databases.
This commit is contained in:
parent
89c17d6d16
commit
c501f8486f
@ -16,6 +16,7 @@
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
@ -36,6 +37,7 @@
|
||||
|
||||
import io.debezium.annotation.NotThreadSafe;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.relational.Tables;
|
||||
import io.debezium.relational.ddl.DdlParser;
|
||||
import io.debezium.relational.history.DatabaseHistory;
|
||||
@ -133,13 +135,19 @@ public void start(Map<String, String> props) {
|
||||
MySqlConnectorConfig.MAX_BATCH_SIZE, MySqlConnectorConfig.MAX_QUEUE_SIZE, maxBatchSize);
|
||||
}
|
||||
|
||||
// Define the filter using the whitelists and blacklists for tables and database names ...
|
||||
Predicate<TableId> tableFilter = TableId.filter(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST),
|
||||
config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST),
|
||||
config.getString(MySqlConnectorConfig.TABLE_WHITELIST),
|
||||
config.getString(MySqlConnectorConfig.TABLE_BLACKLIST));
|
||||
|
||||
// Create the queue ...
|
||||
events = new LinkedBlockingDeque<>(maxQueueSize);
|
||||
batchEvents = new ArrayList<>(maxBatchSize);
|
||||
|
||||
// Set up our handlers ...
|
||||
// Set up our handlers for specific kinds of events ...
|
||||
tables = new Tables();
|
||||
tableConverters = new TableConverters(topicSelector, dbHistory, includeSchemaChanges, tables);
|
||||
tableConverters = new TableConverters(topicSelector, dbHistory, includeSchemaChanges, tables, tableFilter);
|
||||
eventHandlers.put(EventType.TABLE_MAP, tableConverters::updateTableMetadata);
|
||||
eventHandlers.put(EventType.QUERY, tableConverters::updateTableCommand);
|
||||
eventHandlers.put(EventType.EXT_WRITE_ROWS, tableConverters::handleInsert);
|
||||
|
@ -84,10 +84,26 @@ public class MySqlConnectorConfig {
|
||||
.withDefault(false)
|
||||
.withValidation(Field::isBoolean);
|
||||
|
||||
public static final Field TABLE_WHITELIST = Field.create("table.whitelist")
|
||||
.withDescription("A comma-separated list of table identifiers to be monitored, where each identifer consists "
|
||||
+ "of the '<databaseName>.<tableName>'. A whitelist takes precedence over any blacklist.");
|
||||
|
||||
public static final Field TABLE_BLACKLIST = Field.create("table.blacklist")
|
||||
.withDescription("A comma-separated list of table identifiers to not be monitored, where each identifer consists "
|
||||
+ "of the '<databaseName>.<tableName>'. Any whitelist takes precedence over this blacklist.");
|
||||
|
||||
public static final Field DATABASE_WHITELIST = Field.create("database.whitelist")
|
||||
.withDescription("A comma-separated list of database names to be monitored. A database whitelist takes precedence over any database blacklist and supersedes a table whitelist or table blacklist.");
|
||||
|
||||
public static final Field DATABASE_BLACKLIST = Field.create("database.blacklist")
|
||||
.withDescription("A comma-separated list of database names to not be monitored. Any database whitelist takes precedence over this blacklist, and supersedes a table whitelist or table blacklist.");
|
||||
|
||||
public static Collection<Field> ALL_FIELDS = Collect.arrayListOf(USER, PASSWORD, HOSTNAME, PORT, SERVER_ID,
|
||||
SERVER_NAME, CONNECTION_TIMEOUT_MS, KEEP_ALIVE,
|
||||
MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS,
|
||||
DATABASE_HISTORY, INCLUDE_SCHEMA_CHANGES);
|
||||
DATABASE_HISTORY, INCLUDE_SCHEMA_CHANGES,
|
||||
TABLE_WHITELIST, TABLE_BLACKLIST,
|
||||
DATABASE_WHITELIST, DATABASE_BLACKLIST);
|
||||
|
||||
private static int validateMaxQueueSize(Configuration config, Field field, Consumer<String> problems) {
|
||||
int maxQueueSize = config.getInteger(field);
|
||||
|
@ -12,6 +12,7 @@
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
@ -53,9 +54,11 @@ final class TableConverters {
|
||||
private final Map<Long, Converter> convertersByTableId = new HashMap<>();
|
||||
private final Map<String, Long> tableNumbersByTableName = new HashMap<>();
|
||||
private final boolean recordSchemaChangesInSourceRecords;
|
||||
private final Predicate<TableId> tableFilter;
|
||||
|
||||
public TableConverters(TopicSelector topicSelector, DatabaseHistory dbHistory,
|
||||
boolean recordSchemaChangesInSourceRecords, Tables tables) {
|
||||
boolean recordSchemaChangesInSourceRecords, Tables tables,
|
||||
Predicate<TableId> tableFilter) {
|
||||
Objects.requireNonNull(topicSelector, "A topic selector is required");
|
||||
Objects.requireNonNull(dbHistory, "Database history storage is required");
|
||||
Objects.requireNonNull(tables, "A Tables object is required");
|
||||
@ -64,6 +67,7 @@ public TableConverters(TopicSelector topicSelector, DatabaseHistory dbHistory,
|
||||
this.tables = tables;
|
||||
this.ddlParser = new MySqlDdlParser(false); // don't include views
|
||||
this.recordSchemaChangesInSourceRecords = recordSchemaChangesInSourceRecords;
|
||||
this.tableFilter = tableFilter != null ? tableFilter : (id) -> true;
|
||||
}
|
||||
|
||||
public void updateTableCommand(Event event, SourceInfo source, Consumer<SourceRecord> recorder) {
|
||||
@ -133,6 +137,11 @@ public void updateTableMetadata(Event event, SourceInfo source, Consumer<SourceR
|
||||
|
||||
// Generate this table's insert, update, and delete converters ...
|
||||
Converter converter = new Converter() {
|
||||
@Override
|
||||
public TableId tableId() {
|
||||
return tableId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String topic() {
|
||||
return topicName;
|
||||
@ -191,17 +200,19 @@ public void handleInsert(Event event, SourceInfo source, Consumer<SourceRecord>
|
||||
long tableNumber = write.getTableId();
|
||||
BitSet includedColumns = write.getIncludedColumns();
|
||||
Converter converter = convertersByTableId.get(tableNumber);
|
||||
String topic = converter.topic();
|
||||
Integer partition = converter.partition();
|
||||
for (int row = 0; row <= source.eventRowNumber(); ++row) {
|
||||
Serializable[] values = write.getRows().get(row);
|
||||
Schema keySchema = converter.keySchema();
|
||||
Object key = converter.createKey(values, includedColumns);
|
||||
Schema valueSchema = converter.valueSchema();
|
||||
Struct value = converter.inserted(values, includedColumns);
|
||||
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
|
||||
keySchema, key, valueSchema, value);
|
||||
recorder.accept(record);
|
||||
if (tableFilter.test(converter.tableId())) {
|
||||
String topic = converter.topic();
|
||||
Integer partition = converter.partition();
|
||||
for (int row = 0; row <= source.eventRowNumber(); ++row) {
|
||||
Serializable[] values = write.getRows().get(row);
|
||||
Schema keySchema = converter.keySchema();
|
||||
Object key = converter.createKey(values, includedColumns);
|
||||
Schema valueSchema = converter.valueSchema();
|
||||
Struct value = converter.inserted(values, includedColumns);
|
||||
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
|
||||
keySchema, key, valueSchema, value);
|
||||
recorder.accept(record);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -218,19 +229,21 @@ public void handleUpdate(Event event, SourceInfo source, Consumer<SourceRecord>
|
||||
BitSet includedColumns = update.getIncludedColumns();
|
||||
BitSet includedColumnsBefore = update.getIncludedColumnsBeforeUpdate();
|
||||
Converter converter = convertersByTableId.get(tableNumber);
|
||||
String topic = converter.topic();
|
||||
Integer partition = converter.partition();
|
||||
for (int row = 0; row <= source.eventRowNumber(); ++row) {
|
||||
Map.Entry<Serializable[], Serializable[]> changes = update.getRows().get(row);
|
||||
Serializable[] before = changes.getKey();
|
||||
Serializable[] after = changes.getValue();
|
||||
Schema keySchema = converter.keySchema();
|
||||
Object key = converter.createKey(after, includedColumns);
|
||||
Schema valueSchema = converter.valueSchema();
|
||||
Struct value = converter.updated(before, includedColumnsBefore, after, includedColumns);
|
||||
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
|
||||
keySchema, key, valueSchema, value);
|
||||
recorder.accept(record);
|
||||
if (tableFilter.test(converter.tableId())) {
|
||||
String topic = converter.topic();
|
||||
Integer partition = converter.partition();
|
||||
for (int row = 0; row <= source.eventRowNumber(); ++row) {
|
||||
Map.Entry<Serializable[], Serializable[]> changes = update.getRows().get(row);
|
||||
Serializable[] before = changes.getKey();
|
||||
Serializable[] after = changes.getValue();
|
||||
Schema keySchema = converter.keySchema();
|
||||
Object key = converter.createKey(after, includedColumns);
|
||||
Schema valueSchema = converter.valueSchema();
|
||||
Struct value = converter.updated(before, includedColumnsBefore, after, includedColumns);
|
||||
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
|
||||
keySchema, key, valueSchema, value);
|
||||
recorder.accept(record);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -239,21 +252,25 @@ public void handleDelete(Event event, SourceInfo source, Consumer<SourceRecord>
|
||||
long tableNumber = deleted.getTableId();
|
||||
BitSet includedColumns = deleted.getIncludedColumns();
|
||||
Converter converter = convertersByTableId.get(tableNumber);
|
||||
String topic = converter.topic();
|
||||
Integer partition = converter.partition();
|
||||
for (int row = 0; row <= source.eventRowNumber(); ++row) {
|
||||
Serializable[] values = deleted.getRows().get(row);
|
||||
Schema keySchema = converter.keySchema();
|
||||
Object key = converter.createKey(values, includedColumns);
|
||||
Schema valueSchema = converter.valueSchema();
|
||||
Struct value = converter.inserted(values, includedColumns);
|
||||
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
|
||||
keySchema, key, valueSchema, value);
|
||||
recorder.accept(record);
|
||||
if (tableFilter.test(converter.tableId())) {
|
||||
String topic = converter.topic();
|
||||
Integer partition = converter.partition();
|
||||
for (int row = 0; row <= source.eventRowNumber(); ++row) {
|
||||
Serializable[] values = deleted.getRows().get(row);
|
||||
Schema keySchema = converter.keySchema();
|
||||
Object key = converter.createKey(values, includedColumns);
|
||||
Schema valueSchema = converter.valueSchema();
|
||||
Struct value = converter.inserted(values, includedColumns);
|
||||
SourceRecord record = new SourceRecord(source.partition(), source.offset(row), topic, partition,
|
||||
keySchema, key, valueSchema, value);
|
||||
recorder.accept(record);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected static interface Converter {
|
||||
TableId tableId();
|
||||
|
||||
String topic();
|
||||
|
||||
Integer partition();
|
||||
|
@ -5,6 +5,9 @@
|
||||
*/
|
||||
package io.debezium.function;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
/**
|
||||
@ -13,6 +16,112 @@
|
||||
*/
|
||||
public class Predicates {
|
||||
|
||||
|
||||
/**
|
||||
* Generate a whitelist filter/predicate that allows only those values that <em>are</em> included in the supplied input.
|
||||
*
|
||||
* @param input the input string
|
||||
* @param splitter the function that splits the input into multiple items; may not be null
|
||||
* @param factory the factory for creating string items into filter matches; may not be null
|
||||
* @return the predicate that returns {@code true} if and only if the argument to the predicate matches (with
|
||||
* {@link Object#equals(Object) equals(...)} one of the objects parsed from the input; never null
|
||||
*/
|
||||
public static <T> Predicate<T> whitelist(String input, Function<String, String[]> splitter, Function<String, T> factory) {
|
||||
if ( input == null ) return (str)->false;
|
||||
Set<T> matches = new HashSet<>();
|
||||
for (String item : splitter.apply(input)) {
|
||||
T obj = factory.apply(item);
|
||||
if ( obj != null ) matches.add(obj);
|
||||
}
|
||||
return matches::contains;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a whitelist filter/predicate that allows only those values that <em>are</em> included in the supplied input.
|
||||
*
|
||||
* @param input the input string
|
||||
* @param delimiter the character used to delimit the items in the input
|
||||
* @param factory the factory for creating string items into filter matches; may not be null
|
||||
* @return the predicate that returns {@code true} if and only if the argument to the predicate matches (with
|
||||
* {@link Object#equals(Object) equals(...)} one of the objects parsed from the input; never null
|
||||
*/
|
||||
public static <T> Predicate<T> whitelist(String input, char delimiter, Function<String, T> factory) {
|
||||
return whitelist(input, (str) -> str.split("[" + delimiter + "]"), factory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a whitelist filter/predicate that allows only those values that <em>are</em> included in the supplied
|
||||
* comma-separated input.
|
||||
*
|
||||
* @param input the input string
|
||||
* @param factory the factory for creating string items into filter matches; may not be null
|
||||
* @return the predicate that returns {@code true} if and only if the argument to the predicate matches (with
|
||||
* {@link Object#equals(Object) equals(...)} one of the objects parsed from the input; never null
|
||||
*/
|
||||
public static <T> Predicate<T> whitelist(String input, Function<String, T> factory) {
|
||||
return whitelist(input, (str) -> str.split("[\\,]"), factory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a whitelist filter/predicate that allows only those values that <em>are</em> included in the supplied
|
||||
* comma-separated input.
|
||||
*
|
||||
* @param input the input string
|
||||
* @return the predicate that returns {@code true} if and only if the argument to the predicate matches (with
|
||||
* {@link Object#equals(Object) equals(...)} one of the objects parsed from the input; never null
|
||||
*/
|
||||
public static Predicate<String> whitelist(String input) {
|
||||
return whitelist(input, (str) -> str);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a blacklist filter/predicate that allows only those values that are <em>not</em> included in the supplied input.
|
||||
*
|
||||
* @param input the input string
|
||||
* @param splitter the function that splits the input into multiple items; may not be null
|
||||
* @param factory the factory for creating string items into filter matches; may not be null
|
||||
* @return the predicate that returns {@code true} if and only if the argument to the predicate matches (with
|
||||
* {@link Object#equals(Object) equals(...)} one of the objects parsed from the input; never null
|
||||
*/
|
||||
public static <T> Predicate<T> blacklist(String input, Function<String, String[]> splitter, Function<String, T> factory) {
|
||||
return whitelist(input, splitter, factory).negate();
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a blacklist filter/predicate that allows only those values that are <em>not</em> included in the supplied input.
|
||||
*
|
||||
* @param input the input string
|
||||
* @param delimiter the character used to delimit the items in the input
|
||||
* @param factory the factory for creating string items into filter matches; may not be null
|
||||
* @return the predicate that returns {@code true} if and only if the argument to the predicate matches (with
|
||||
* {@link Object#equals(Object) equals(...)} one of the objects parsed from the input; never null
|
||||
*/
|
||||
public static <T> Predicate<T> blacklist(String input, char delimiter, Function<String, T> factory) {
|
||||
return whitelist(input, delimiter, factory).negate();
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a blacklist filter/predicate that allows only those values that are <em>not</em> included in the supplied comma-separated input.
|
||||
*
|
||||
* @param input the input string
|
||||
* @param factory the factory for creating string items into filter matches; may not be null
|
||||
* @return the predicate that returns {@code true} if and only if the argument to the predicate matches (with
|
||||
* {@link Object#equals(Object) equals(...)} one of the objects parsed from the input; never null
|
||||
*/
|
||||
public static <T> Predicate<T> blacklist(String input, Function<String, T> factory) {
|
||||
return whitelist(input, factory).negate();
|
||||
}
|
||||
/**
|
||||
* Generate a blacklist filter/predicate that allows only those values that are <em>not</em> included in the supplied comma-separated input.
|
||||
*
|
||||
* @param input the input string
|
||||
* @return the predicate that returns {@code true} if and only if the argument to the predicate matches (with
|
||||
* {@link Object#equals(Object) equals(...)} one of the objects parsed from the input; never null
|
||||
*/
|
||||
public static Predicate<String> blacklist(String input) {
|
||||
return whitelist(input).negate();
|
||||
}
|
||||
|
||||
public static <R> Predicate<R> not(Predicate<R> predicate) {
|
||||
return predicate.negate();
|
||||
}
|
||||
|
@ -5,89 +5,167 @@
|
||||
*/
|
||||
package io.debezium.relational;
|
||||
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import io.debezium.annotation.Immutable;
|
||||
import io.debezium.function.Predicates;
|
||||
|
||||
/**
|
||||
* Unique identifier for a database table.
|
||||
*
|
||||
* @author Randall Hauch
|
||||
*/
|
||||
@Immutable
|
||||
public final class TableId implements Comparable<TableId> {
|
||||
|
||||
/**
|
||||
* Create a predicate function that allows only those {@link TableId}s that are allowed by the database whitelist (or
|
||||
* not disallowed by the database blacklist) and allowed by the table whitelist (or not disallowed by the table blacklist).
|
||||
* Therefore, blacklists are only used if there is no corresponding whitelist.
|
||||
* <p>
|
||||
* Qualified table names are comma-separated strings that are each {@link #parse(String) parsed} into {@link TableId} objects.
|
||||
*
|
||||
* @param dbWhitelist the comma-separated string listing the names of the databases to be explicitly allowed;
|
||||
* may be null
|
||||
* @param dbBlacklist the comma-separated string listing the names of the databases to be explicitly disallowed;
|
||||
* may be null
|
||||
* @param tableWhitelist the comma-separated string listing the qualified names of the tables to be explicitly allowed;
|
||||
* may be null
|
||||
* @param tableBlacklist the comma-separated string listing the qualified names of the tables to be explicitly disallowed;
|
||||
* may be null
|
||||
* @return the predicate function; never null
|
||||
*/
|
||||
public static Predicate<TableId> filter(String dbWhitelist, String dbBlacklist, String tableWhitelist, String tableBlacklist) {
|
||||
Predicate<TableId> tableExclusions = tableBlacklist == null ? null : Predicates.blacklist(tableBlacklist, TableId::parse);
|
||||
Predicate<TableId> tableInclusions = tableWhitelist == null ? null : Predicates.whitelist(tableWhitelist, TableId::parse);
|
||||
Predicate<TableId> tableFilter = tableInclusions != null ? tableInclusions : tableExclusions;
|
||||
Predicate<String> dbExclusions = dbBlacklist == null ? null : Predicates.blacklist(dbBlacklist);
|
||||
Predicate<String> dbInclusions = dbWhitelist == null ? null : Predicates.whitelist(dbWhitelist);
|
||||
Predicate<String> dbFilter = dbInclusions != null ? dbInclusions : dbExclusions;
|
||||
if (dbFilter != null) {
|
||||
if (tableFilter != null) {
|
||||
return (id) -> dbFilter.test(id.catalog()) && tableFilter.test(id);
|
||||
}
|
||||
return (id) -> dbFilter.test(id.catalog());
|
||||
}
|
||||
if (tableFilter != null) {
|
||||
return tableFilter;
|
||||
}
|
||||
return (id) -> true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the supplied string delimited with a period ({@code .}) character, extracting up to the first 3 parts into a TableID.
|
||||
* If the input contains only two parts, then the first part will be used as the catalog name and the second as the table
|
||||
* name.
|
||||
*
|
||||
* @param str the input string
|
||||
* @return the table ID, or null if it could not be parsed
|
||||
*/
|
||||
public static TableId parse(String str) {
|
||||
return parse(str, '.', true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the supplied string, extracting up to the first 3 parts into a TableID.
|
||||
*
|
||||
* @param str the input string
|
||||
* @param delimiter the delimiter between parts
|
||||
* @param useCatalogBeforeSchema {@code true} if the parsed string contains only 2 items and the first should be used as
|
||||
* the catalog and the second as the table name, or {@code false} if the first should be used as the schema and the
|
||||
* second
|
||||
* as the table name
|
||||
* @return the table ID, or null if it could not be parsed
|
||||
*/
|
||||
public static TableId parse(String str, char delimiter, boolean useCatalogBeforeSchema) {
|
||||
String[] parts = str.split("[\\" + delimiter + "]");
|
||||
if (parts.length == 0) return null;
|
||||
if (parts.length == 1) return new TableId(null, null, parts[0]); // table only
|
||||
if (parts.length == 2) {
|
||||
if (useCatalogBeforeSchema) return new TableId(parts[0], null, parts[1]); // catalog & table only
|
||||
return new TableId(null, parts[0], parts[1]); // catalog & table only
|
||||
}
|
||||
return new TableId(parts[0], parts[1], parts[2]); // catalog & table only
|
||||
}
|
||||
|
||||
private final String catalogName;
|
||||
private final String schemaName;
|
||||
private final String tableName;
|
||||
private final String id;
|
||||
|
||||
|
||||
/**
|
||||
* Create a new table identifier.
|
||||
*
|
||||
* @param catalogName the name of the database catalog that contains the table; may be null if the JDBC driver does not
|
||||
* show a schema for this table
|
||||
* @param schemaName the name of the database schema that contains the table; may be null if the JDBC driver does not
|
||||
* show a schema for this table
|
||||
* @param tableName the name of the table; may not be null
|
||||
*/
|
||||
public TableId( String catalogName, String schemaName, String tableName ) {
|
||||
public TableId(String catalogName, String schemaName, String tableName) {
|
||||
this.catalogName = catalogName;
|
||||
this.schemaName = schemaName;
|
||||
this.tableName = tableName;
|
||||
assert this.tableName != null;
|
||||
this.id = tableId(this.catalogName, this.schemaName, this.tableName);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the name of the JDBC catalog.
|
||||
*
|
||||
* @return the catalog name, or null if the table does not belong to a catalog
|
||||
*/
|
||||
public String catalog() {
|
||||
return catalogName;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the name of the JDBC schema.
|
||||
*
|
||||
* @return the JDBC schema name, or null if the table does not belong to a JDBC schema
|
||||
*/
|
||||
public String schema() {
|
||||
return schemaName;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the name of the table.
|
||||
*
|
||||
* @return the table name; never null
|
||||
*/
|
||||
public String table() {
|
||||
return tableName;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int compareTo(TableId that) {
|
||||
if ( this == that ) return 0;
|
||||
if (this == that) return 0;
|
||||
return this.id.compareTo(that.id);
|
||||
}
|
||||
|
||||
|
||||
public int compareToIgnoreCase(TableId that) {
|
||||
if ( this == that ) return 0;
|
||||
if (this == that) return 0;
|
||||
return this.id.compareToIgnoreCase(that.id);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return id.hashCode();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if ( obj instanceof TableId ) {
|
||||
return this.compareTo((TableId)obj) == 0;
|
||||
if (obj instanceof TableId) {
|
||||
return this.compareTo((TableId) obj) == 0;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return id;
|
||||
}
|
||||
|
||||
|
||||
private static String tableId(String catalog, String schema, String table) {
|
||||
if (catalog == null || catalog.length() == 0) {
|
||||
if (schema == null || schema.length() == 0) {
|
||||
|
@ -0,0 +1,45 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.function;
|
||||
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Randall Hauch
|
||||
*/
|
||||
public class PredicatesTest {
|
||||
|
||||
@Test
|
||||
public void shouldWhitelistCommaSeparatedIntegers() {
|
||||
Predicate<Integer> p = Predicates.whitelist("1,2,3,4,5",',', Integer::parseInt);
|
||||
assertThat(p.test(1)).isTrue();
|
||||
assertThat(p.test(2)).isTrue();
|
||||
assertThat(p.test(3)).isTrue();
|
||||
assertThat(p.test(4)).isTrue();
|
||||
assertThat(p.test(5)).isTrue();
|
||||
assertThat(p.test(0)).isFalse();
|
||||
assertThat(p.test(6)).isFalse();
|
||||
assertThat(p.test(-1)).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldBlacklistCommaSeparatedIntegers() {
|
||||
Predicate<Integer> p = Predicates.blacklist("1,2,3,4,5",',', Integer::parseInt);
|
||||
assertThat(p.test(1)).isFalse();
|
||||
assertThat(p.test(2)).isFalse();
|
||||
assertThat(p.test(3)).isFalse();
|
||||
assertThat(p.test(4)).isFalse();
|
||||
assertThat(p.test(5)).isFalse();
|
||||
assertThat(p.test(0)).isTrue();
|
||||
assertThat(p.test(6)).isTrue();
|
||||
assertThat(p.test(-1)).isTrue();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,159 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.relational;
|
||||
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Randall Hauch
|
||||
*/
|
||||
public class TableIdTest {
|
||||
|
||||
private Predicate<TableId> filter;
|
||||
|
||||
@Test
|
||||
public void shouldCreateFilterWithDatabaseWhitelistAndTableWhitelist() {
|
||||
filter = TableId.filter("db1,db2", null, "db1.A,db1.B,db2.C", null);
|
||||
|
||||
assertAllowed(filter, "db1", "A");
|
||||
assertAllowed(filter, "db1", "B");
|
||||
assertNotAllowed(filter, "db1", "D");
|
||||
assertNotAllowed(filter, "db1", "E");
|
||||
assertNotAllowed(filter, "db1", "F");
|
||||
|
||||
assertAllowed(filter, "db2", "C");
|
||||
assertNotAllowed(filter, "db2", "G");
|
||||
assertNotAllowed(filter, "db2", "H");
|
||||
|
||||
assertNotAllowed(filter, "db3", "A");
|
||||
assertNotAllowed(filter, "db4", "A");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCreateFilterWithDatabaseWhitelistAndTableBlacklist() {
|
||||
filter = TableId.filter("db1,db2", null, null, "db1.A,db1.B,db2.C");
|
||||
|
||||
assertNotAllowed(filter, "db1", "A");
|
||||
assertNotAllowed(filter, "db1", "B");
|
||||
assertAllowed(filter, "db1", "D");
|
||||
assertAllowed(filter, "db1", "E");
|
||||
assertAllowed(filter, "db1", "F");
|
||||
|
||||
assertNotAllowed(filter, "db2", "C");
|
||||
assertAllowed(filter, "db2", "G");
|
||||
assertAllowed(filter, "db2", "H");
|
||||
|
||||
assertNotAllowed(filter, "db3", "A");
|
||||
assertNotAllowed(filter, "db4", "A");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCreateFilterWithDatabaseBlacklistAndTableWhitelist() {
|
||||
filter = TableId.filter(null,"db3,db4", "db1.A,db1.B,db2.C", null);
|
||||
|
||||
assertAllowed(filter, "db1", "A");
|
||||
assertAllowed(filter, "db1", "B");
|
||||
assertNotAllowed(filter, "db1", "D");
|
||||
assertNotAllowed(filter, "db1", "E");
|
||||
assertNotAllowed(filter, "db1", "F");
|
||||
|
||||
assertAllowed(filter, "db2", "C");
|
||||
assertNotAllowed(filter, "db2", "G");
|
||||
assertNotAllowed(filter, "db2", "H");
|
||||
|
||||
assertNotAllowed(filter, "db3", "A");
|
||||
assertNotAllowed(filter, "db4", "A");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCreateFilterWithDatabaseBlacklistAndTableBlacklist() {
|
||||
filter = TableId.filter(null,"db3,db4", null, "db1.A,db1.B,db2.C");
|
||||
|
||||
assertNotAllowed(filter, "db1", "A");
|
||||
assertNotAllowed(filter, "db1", "B");
|
||||
assertAllowed(filter, "db1", "D");
|
||||
assertAllowed(filter, "db1", "E");
|
||||
assertAllowed(filter, "db1", "F");
|
||||
|
||||
assertNotAllowed(filter, "db2", "C");
|
||||
assertAllowed(filter, "db2", "G");
|
||||
assertAllowed(filter, "db2", "H");
|
||||
|
||||
assertNotAllowed(filter, "db3", "A");
|
||||
assertNotAllowed(filter, "db4", "A");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCreateFilterWithNoDatabaseFilterAndTableWhitelist() {
|
||||
filter = TableId.filter(null, null, "db1.A,db1.B,db2.C", null);
|
||||
|
||||
assertAllowed(filter, "db1", "A");
|
||||
assertAllowed(filter, "db1", "B");
|
||||
assertNotAllowed(filter, "db1", "D");
|
||||
assertNotAllowed(filter, "db1", "E");
|
||||
assertNotAllowed(filter, "db1", "F");
|
||||
|
||||
assertAllowed(filter, "db2", "C");
|
||||
assertNotAllowed(filter, "db2", "G");
|
||||
assertNotAllowed(filter, "db2", "H");
|
||||
|
||||
assertNotAllowed(filter, "db3", "A");
|
||||
assertNotAllowed(filter, "db4", "A");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCreateFilterWithNoDatabaseFilterAndTableBlacklist() {
|
||||
filter = TableId.filter(null, null, null, "db1.A,db1.B,db2.C");
|
||||
|
||||
assertNotAllowed(filter, "db1", "A");
|
||||
assertNotAllowed(filter, "db1", "B");
|
||||
assertAllowed(filter, "db1", "D");
|
||||
assertAllowed(filter, "db1", "E");
|
||||
assertAllowed(filter, "db1", "F");
|
||||
|
||||
assertNotAllowed(filter, "db2", "C");
|
||||
assertAllowed(filter, "db2", "G");
|
||||
assertAllowed(filter, "db2", "H");
|
||||
|
||||
assertAllowed(filter, "db3", "A");
|
||||
assertAllowed(filter, "db4", "A");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCreateFilterWithDatabaseWhitelistAndNoTableFilter() {
|
||||
filter = TableId.filter("db1,db2", null, null, null);
|
||||
|
||||
assertAllowed(filter, "db1", "A");
|
||||
assertAllowed(filter, "db2", "A");
|
||||
assertNotAllowed(filter, "db3", "A");
|
||||
assertNotAllowed(filter, "db4", "A");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCreateFilterWithDatabaseBlacklistAndNoTableFilter() {
|
||||
filter = TableId.filter(null, "db1,db2", null, null);
|
||||
|
||||
assertNotAllowed(filter, "db1", "A");
|
||||
assertNotAllowed(filter, "db2", "A");
|
||||
assertAllowed(filter, "db3", "A");
|
||||
assertAllowed(filter, "db4", "A");
|
||||
}
|
||||
|
||||
protected void assertAllowed(Predicate<TableId> filter, String dbName, String tableName) {
|
||||
TableId id = new TableId(dbName, null, tableName);
|
||||
assertThat(filter.test(id)).isTrue();
|
||||
}
|
||||
|
||||
protected void assertNotAllowed(Predicate<TableId> filter, String dbName, String tableName) {
|
||||
TableId id = new TableId(dbName, null, tableName);
|
||||
assertThat(filter.test(id)).isFalse();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user