DBZ-666 Supporting ordered snapshot using tables.whitelist config

This commit is contained in:
Rao 2018-05-16 23:15:33 +05:30 committed by Jiri Pechanec
parent 6e1e5f40ce
commit 15a90548fb
7 changed files with 142 additions and 18 deletions

View File

@ -13,15 +13,16 @@
import java.sql.Statement; import java.sql.Statement;
import java.sql.Types; import java.sql.Types;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.List;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.Collections;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -337,6 +338,20 @@ protected void execute() {
logger.warn("\t skipping database '{}' due to error reading tables: {}", dbName, e.getMessage()); logger.warn("\t skipping database '{}' due to error reading tables: {}", dbName, e.getMessage());
} }
} }
/* To achieve an ordered snapshot, we would first get a list of Regex tables.whitelist regex patterns
+ and then sort the tableIds list based on the above list
+ */
List<Pattern> tableWhitelistPattern = Strings.listOfRegex(context.config().getString(MySqlConnectorConfig.TABLE_WHITELIST),Pattern.CASE_INSENSITIVE);
List<TableId> tableIdsSorted = new ArrayList<>();
tableWhitelistPattern.forEach(pattern -> {
List<TableId> tablesMatchedByPattern = tableIds.stream().filter(t -> pattern.asPredicate().test(t.toString()))
.collect(Collectors.toList());
tablesMatchedByPattern.forEach(t -> {
if (!tableIdsSorted.contains(t))
tableIdsSorted.add(t);
});
});
tableIds.sort(Comparator.comparing(tableIdsSorted::indexOf));
final Set<String> includedDatabaseNames = readableDatabaseNames.stream().filter(filters.databaseFilter()).collect(Collectors.toSet()); final Set<String> includedDatabaseNames = readableDatabaseNames.stream().filter(filters.databaseFilter()).collect(Collectors.toSet());
logger.info("\tsnapshot continuing with database(s): {}", includedDatabaseNames); logger.info("\tsnapshot continuing with database(s): {}", includedDatabaseNames);
@ -387,7 +402,7 @@ protected void execute() {
schema.applyDdl(source, null, setSystemVariablesStatement, this::enqueueSchemaChanges); schema.applyDdl(source, null, setSystemVariablesStatement, this::enqueueSchemaChanges);
// Add DROP TABLE statements for all tables that we knew about AND those tables found in the databases ... // Add DROP TABLE statements for all tables that we knew about AND those tables found in the databases ...
Set<TableId> allTableIds = new HashSet<>(schema.tables().tableIds()); List<TableId> allTableIds = new ArrayList<>(schema.tables().tableIds());
allTableIds.addAll(tableIds); allTableIds.addAll(tableIds);
allTableIds.stream() allTableIds.stream()
.filter(id -> isRunning()) // ignore all subsequent tables if this reader is stopped .filter(id -> isRunning()) // ignore all subsequent tables if this reader is stopped

View File

@ -6,14 +6,17 @@
package io.debezium.connector.mysql; package io.debezium.connector.mysql;
import static org.fest.assertions.Assertions.assertThat; import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.ConnectException;
@ -446,6 +449,65 @@ public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception {
fail("failed to complete the snapshot within 10 seconds"); fail("failed to complete the snapshot within 10 seconds");
} }
} }
@Test
public void shouldSnapshotTablesInOrderSpecifiedInTablesWhitelist() throws Exception{
config = simpleConfig()
.with(MySqlConnectorConfig.TABLE_WHITELIST, "connector_test_ro_(.*).orders,connector_test_ro_(.*).Products,connector_test_ro_(.*).products_on_hand,connector_test_ro_(.*).dbz_342_timetest")
.build();
context = new MySqlTaskContext(config);
context.start();
reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown);
reader.generateInsertEvents();
// Start the snapshot ...
reader.start();
// Poll for records ...
List<SourceRecord> records;
LinkedHashSet<String> tablesInOrder = new LinkedHashSet<>();
LinkedHashSet<String> tablesInOrderExpected = getTableNamesInSpecifiedOrder("orders", "Products", "products_on_hand", "dbz_342_timetest");
while ((records = reader.poll()) != null) {
records.forEach(record -> {
VerifyRecord.isValid(record);
if (record.value() != null)
tablesInOrder.add(getTableNameFromSourceRecord.apply(record));
});
}
assertArrayEquals(tablesInOrder.toArray(), tablesInOrderExpected.toArray());
}
@Test
public void shouldSnapshotTablesInLexicographicalOrder() throws Exception{
config = simpleConfig()
.build();
context = new MySqlTaskContext(config);
context.start();
reader = new SnapshotReader("snapshot", context);
reader.uponCompletion(completed::countDown);
reader.generateInsertEvents();
// Start the snapshot ...
reader.start();
// Poll for records ...
// Testing.Print.enable();
List<SourceRecord> records;
LinkedHashSet<String> tablesInOrder = new LinkedHashSet<>();
LinkedHashSet<String> tablesInOrderExpected = getTableNamesInSpecifiedOrder("Products", "customers", "dbz_342_timetest", "orders", "products_on_hand");
while ((records = reader.poll()) != null) {
records.forEach(record -> {
VerifyRecord.isValid(record);
if (record.value() != null)
tablesInOrder.add(getTableNameFromSourceRecord.apply(record));
});
}
assertArrayEquals(tablesInOrder.toArray(), tablesInOrderExpected.toArray());
}
private Function<SourceRecord, String> getTableNameFromSourceRecord = sourceRecord -> ((Struct) sourceRecord.value()).getStruct("source").getString("table");
private LinkedHashSet<String> getTableNamesInSpecifiedOrder(String ... tables){
LinkedHashSet<String> tablesInOrderExpected = new LinkedHashSet<>();
for (String table : tables)
tablesInOrderExpected.add(table);
return tablesInOrderExpected;
}
public void shouldCreateSnapshotSchemaOnly() throws Exception { public void shouldCreateSnapshotSchemaOnly() throws Exception {
config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY).build(); config = simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY).build();

View File

@ -1049,7 +1049,7 @@ public static int isListOfRegex(Configuration config, Field field, ValidationOut
int errors = 0; int errors = 0;
if (value != null) { if (value != null) {
try { try {
Strings.listOfRegex(value, Pattern.CASE_INSENSITIVE); Strings.setOfRegex(value, Pattern.CASE_INSENSITIVE);
} catch (PatternSyntaxException e) { } catch (PatternSyntaxException e) {
problems.accept(field, value, "A comma-separated list of valid regular expressions is expected, but " + e.getMessage()); problems.accept(field, value, "A comma-separated list of valid regular expressions is expected, but " + e.getMessage());
++errors; ++errors;

View File

@ -194,7 +194,7 @@ public static Predicate<String> excludes(String regexPatterns) {
* @throws PatternSyntaxException if the string includes an invalid regular expression * @throws PatternSyntaxException if the string includes an invalid regular expression
*/ */
public static <T> Predicate<T> includes(String regexPatterns, Function<T, String> conversion) { public static <T> Predicate<T> includes(String regexPatterns, Function<T, String> conversion) {
Set<Pattern> patterns = Strings.listOfRegex(regexPatterns, Pattern.CASE_INSENSITIVE); Set<Pattern> patterns = Strings.setOfRegex(regexPatterns, Pattern.CASE_INSENSITIVE);
return includedInPatterns(patterns, conversion); return includedInPatterns(patterns, conversion);
} }
@ -212,7 +212,7 @@ protected static <T> Predicate<T> includedInPatterns(Collection<Pattern> pattern
* @throws PatternSyntaxException if the string includes an invalid regular expression * @throws PatternSyntaxException if the string includes an invalid regular expression
*/ */
public static Function<String, Optional<Pattern>> matchedBy(String regexPatterns) { public static Function<String, Optional<Pattern>> matchedBy(String regexPatterns) {
return matchedByPattern(Strings.listOfRegex(regexPatterns, Pattern.CASE_INSENSITIVE), Function.identity()); return matchedByPattern(Strings.setOfRegex(regexPatterns, Pattern.CASE_INSENSITIVE), Function.identity());
} }
protected static <T> Function<T, Optional<Pattern>> matchedByPattern(Collection<Pattern> patterns, Function<T, String> conversion) { protected static <T> Function<T, Optional<Pattern>> matchedByPattern(Collection<Pattern> patterns, Function<T, String> conversion) {

View File

@ -35,7 +35,7 @@ public final class ColumnId implements Comparable<ColumnId> {
* @return the predicate function; never null * @return the predicate function; never null
*/ */
public static Map<TableId,Predicate<Column>> filter(String columnBlacklist) { public static Map<TableId,Predicate<Column>> filter(String columnBlacklist) {
Set<ColumnId> columnExclusions = columnBlacklist == null ? null : Strings.listOf(columnBlacklist, ColumnId::parse); Set<ColumnId> columnExclusions = columnBlacklist == null ? null : Strings.setOf(columnBlacklist, ColumnId::parse);
Map<TableId,Set<String>> excludedColumnNamesByTable = new HashMap<>(); Map<TableId,Set<String>> excludedColumnNamesByTable = new HashMap<>();
columnExclusions.forEach(columnId->{ columnExclusions.forEach(columnId->{
excludedColumnNamesByTable.compute(columnId.tableId(), (tableId,columns)->{ excludedColumnNamesByTable.compute(columnId.tableId(), (tableId,columns)->{

View File

@ -47,9 +47,9 @@ public final class Strings {
* @param input the input string * @param input the input string
* @param splitter the function that splits the input into multiple items; may not be null * @param splitter the function that splits the input into multiple items; may not be null
* @param factory the factory for creating string items into filter matches; may not be null * @param factory the factory for creating string items into filter matches; may not be null
* @return the list of objects included in the list; never null * @return the set of objects included in the list; never null
*/ */
public static <T> Set<T> listOf(String input, Function<String, String[]> splitter, Function<String, T> factory) { public static <T> Set<T> setOf(String input, Function<String, String[]> splitter, Function<String, T> factory) {
if (input == null) return Collections.emptySet(); if (input == null) return Collections.emptySet();
Set<T> matches = new HashSet<>(); Set<T> matches = new HashSet<>();
for (String item : splitter.apply(input)) { for (String item : splitter.apply(input)) {
@ -59,16 +59,34 @@ public static <T> Set<T> listOf(String input, Function<String, String[]> splitte
return matches; return matches;
} }
/**
* Generate the list of values that are included in the list.
*
* @param input the input string
* @param splitter the function that splits the input into multiple items; may not be null
* @param factory the factory for creating string items into filter matches; may not be null
* @return the list of objects included in the list; never null
*/
public static <T> List<T> listOf(String input, Function<String, String[]> splitter, Function<String, T> factory) {
if (input == null) return Collections.emptyList();
List<T> matches = new ArrayList<T>();
for (String item : splitter.apply(input)) {
T obj = factory.apply(item);
if (obj != null) matches.add(obj);
}
return matches;
}
/** /**
* Generate the set of values that are included in the list delimited by the given delimiter. * Generate the set of values that are included in the list delimited by the given delimiter.
* *
* @param input the input string * @param input the input string
* @param delimiter the character used to delimit the items in the input * @param delimiter the character used to delimit the items in the input
* @param factory the factory for creating string items into filter matches; may not be null * @param factory the factory for creating string items into filter matches; may not be null
* @return the list of objects included in the list; never null * @return the set of objects included in the list; never null
*/ */
public static <T> Set<T> listOf(String input, char delimiter, Function<String, T> factory) { public static <T> Set<T> setOf(String input, char delimiter, Function<String, T> factory) {
return listOf(input, (str) -> str.split("[" + delimiter + "]"), factory); return setOf(input, (str) -> str.split("[" + delimiter + "]"), factory);
} }
/** /**
@ -76,10 +94,22 @@ public static <T> Set<T> listOf(String input, char delimiter, Function<String, T
* *
* @param input the input string * @param input the input string
* @param factory the factory for creating string items into filter matches; may not be null * @param factory the factory for creating string items into filter matches; may not be null
* @return the list of objects included in the list; never null * @return the set of objects included in the list; never null
*/ */
public static <T> Set<T> listOf(String input, Function<String, T> factory) { public static <T> Set<T> setOf(String input, Function<String, T> factory) {
return listOf(input, ',', factory); return setOf(input, ',', factory);
}
/**
* Generate the set of regular expression {@link Pattern}s that are specified in the string containing comma-separated
* regular expressions.
*
* @param input the input string with comma-separated regular expressions. Comma can be escaped with backslash.
* @return the set of regular expression {@link Pattern}s included within the given string; never null
* @throws PatternSyntaxException if the input includes an invalid regular expression
*/
public static Set<Pattern> setOfRegex(String input, int regexFlags) {
return setOf(input, RegExSplitter::split, (str) -> Pattern.compile(str, regexFlags));
} }
/** /**
@ -91,7 +121,7 @@ public static <T> Set<T> listOf(String input, Function<String, T> factory) {
* @throws PatternSyntaxException if the input includes an invalid regular expression * @throws PatternSyntaxException if the input includes an invalid regular expression
*/ */
public static Set<Pattern> setOfRegex(String input) { public static Set<Pattern> setOfRegex(String input) {
return listOf(input, RegExSplitter::split, Pattern::compile); return setOf(input, RegExSplitter::split, Pattern::compile);
} }
/** /**
@ -105,7 +135,7 @@ public static Set<Pattern> setOfRegex(String input) {
* @throws IllegalArgumentException if bit values other than those corresponding to the defined * @throws IllegalArgumentException if bit values other than those corresponding to the defined
* match flags are set in {@code regexFlags} * match flags are set in {@code regexFlags}
*/ */
public static Set<Pattern> listOfRegex(String input, int regexFlags) { public static List<Pattern> listOfRegex(String input, int regexFlags) {
return listOf(input, RegExSplitter::split, (str) -> Pattern.compile(str, regexFlags)); return listOf(input, RegExSplitter::split, (str) -> Pattern.compile(str, regexFlags));
} }

View File

@ -11,6 +11,7 @@
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.PrintStream; import java.io.PrintStream;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -276,6 +277,15 @@ public void regexSplit() {
+ "INSERT INTO mysql.rds_heartbeat2\\(.*\\,.*\\) values \\(.*\\,.*\\) ON DUPLICATE KEY UPDATE value = .*", + "INSERT INTO mysql.rds_heartbeat2\\(.*\\,.*\\) values \\(.*\\,.*\\) ON DUPLICATE KEY UPDATE value = .*",
"DROP TEMPORARY TABLE IF EXISTS .+ /\\\\* generated by server \\\\*/", "DROP TEMPORARY TABLE IF EXISTS .+ /\\\\* generated by server \\\\*/",
"INSERT INTO mysql.rds_heartbeat2\\(.*,.*\\) values \\(.*,.*\\) ON DUPLICATE KEY UPDATE value = .*"); "INSERT INTO mysql.rds_heartbeat2\\(.*,.*\\) values \\(.*,.*\\) ON DUPLICATE KEY UPDATE value = .*");
assertRegexList("a,b", "a", "b");
assertRegexList("a\\,b", "a,b");
assertRegexList("a,b,", "a", "b");
assertRegexList("a,b\\,", "a", "b,");
assertRegexList("a\\\\\\,b", "a\\\\,b");
assertRegexList( "DROP TEMPORARY TABLE IF EXISTS .+ /\\\\* generated by server \\\\*/,"
+ "INSERT INTO mysql.rds_heartbeat2\\(.*\\,.*\\) values \\(.*\\,.*\\) ON DUPLICATE KEY UPDATE value = .*",
"DROP TEMPORARY TABLE IF EXISTS .+ /\\\\* generated by server \\\\*/",
"INSERT INTO mysql.rds_heartbeat2\\(.*,.*\\) values \\(.*,.*\\) ON DUPLICATE KEY UPDATE value = .*");
} }
@Test(expected = ParsingException.class) @Test(expected = ParsingException.class)
@ -303,4 +313,11 @@ protected void assertRegexSet(String patterns, String... matches) {
.map(Pattern::pattern) .map(Pattern::pattern)
.collect(Collectors.toSet())).containsOnly((Object[])matches); .collect(Collectors.toSet())).containsOnly((Object[])matches);
} }
protected void assertRegexList(String patterns, String... matches) {
List<Pattern> regexList = Strings.listOfRegex(patterns, Pattern.CASE_INSENSITIVE);
assertThat(regexList.stream()
.map(Pattern::pattern)
.collect(Collectors.toList())).isEqualTo(Arrays.asList((Object[])matches));
}
} }