DBZ-469 Filter our RDS heartbeat INSERT statements
This commit is contained in:
parent
d4daf639f7
commit
86d9e109fc
@ -48,7 +48,11 @@ public interface DatabaseHistory {
|
||||
public static final Field DDL_FILTER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "ddl.filter")
|
||||
.withDisplayName("DDL filter")
|
||||
.withType(Type.STRING)
|
||||
.withDefault("DROP TEMPORARY TABLE IF EXISTS .+ /\\* generated by server \\*/")
|
||||
.withDefault(
|
||||
"DROP TEMPORARY TABLE IF EXISTS .+ /\\* generated by server \\*/," +
|
||||
// Filter out RDS heartbeat statements, see DBZ-469
|
||||
"INSERT INTO mysql.rds_heartbeat2\\(.*\\) values \\(.*\\) ON DUPLICATE KEY UPDATE value = .*"
|
||||
)
|
||||
.withWidth(Width.LONG)
|
||||
.withImportance(Importance.LOW)
|
||||
.withDescription("A regular expression to filter out a subset of incoming DDL statements "
|
||||
|
@ -26,6 +26,11 @@
|
||||
import java.util.regex.PatternSyntaxException;
|
||||
|
||||
import io.debezium.annotation.ThreadSafe;
|
||||
import io.debezium.text.ParsingException;
|
||||
import io.debezium.text.TokenStream;
|
||||
import io.debezium.text.TokenStream.CharacterStream;
|
||||
import io.debezium.text.TokenStream.Tokenizer;
|
||||
import io.debezium.text.TokenStream.Tokens;
|
||||
|
||||
/**
|
||||
* String-related utility methods.
|
||||
@ -81,19 +86,19 @@ public static <T> Set<T> listOf(String input, Function<String, T> factory) {
|
||||
* Generate the set of regular expression {@link Pattern}s that are specified in the string containing comma-separated
|
||||
* regular expressions.
|
||||
*
|
||||
* @param input the input string with comma-separated regular expressions
|
||||
* @param input the input string with comma-separated regular expressions. Comma can be escaped with backslash.
|
||||
* @return the list of regular expression {@link Pattern}s included in the list; never null
|
||||
* @throws PatternSyntaxException if the input includes an invalid regular expression
|
||||
*/
|
||||
public static Set<Pattern> listOfRegex(String input) {
|
||||
return listOf(input, ',', Pattern::compile);
|
||||
return listOf(input, RegExSplitter::split, Pattern::compile);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate the set of regular expression {@link Pattern}s that are specified in the string containing comma-separated
|
||||
* regular expressions.
|
||||
*
|
||||
* @param input the input string with comma-separated regular expressions
|
||||
* @param input the input string with comma-separated regular expressions. . Comma can be escaped with backslash.
|
||||
* @param regexFlags the flags for {@link Pattern#compile(String, int) compiling regular expressions}
|
||||
* @return the list of regular expression {@link Pattern}s included in the list; never null
|
||||
* @throws PatternSyntaxException if the input includes an invalid regular expression
|
||||
@ -101,7 +106,7 @@ public static Set<Pattern> listOfRegex(String input) {
|
||||
* match flags are set in {@code regexFlags}
|
||||
*/
|
||||
public static Set<Pattern> listOfRegex(String input, int regexFlags) {
|
||||
return listOf(input, ',', (str) -> Pattern.compile(str, regexFlags));
|
||||
return listOf(input, RegExSplitter::split, (str) -> Pattern.compile(str, regexFlags));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -854,4 +859,48 @@ private static Character deriveQuotingChar(String identifierPart) {
|
||||
|
||||
private Strings() {
|
||||
}
|
||||
|
||||
/**
|
||||
* A tokenization class used to split a comma-separated list of regular expressions.
|
||||
* If a comma is part of expression then it can be prepended with <code>'\'</code> so
|
||||
* it will not act as a separator.
|
||||
*/
|
||||
private static class RegExSplitter implements Tokenizer {
|
||||
|
||||
public static String[] split(String identifier) {
|
||||
TokenStream stream = new TokenStream(identifier, new RegExSplitter(), true);
|
||||
stream.start();
|
||||
|
||||
List<String> parts = new ArrayList<>();
|
||||
|
||||
while(stream.hasNext()) {
|
||||
final String part = stream.consume();
|
||||
if (part.length() == 0) {
|
||||
continue;
|
||||
}
|
||||
parts.add(part.replace("\\,", ","));
|
||||
}
|
||||
|
||||
return parts.toArray(new String[parts.size()]);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tokenize(CharacterStream input, Tokens tokens) throws ParsingException {
|
||||
int tokenStart = 0;
|
||||
while (input.hasNext()) {
|
||||
char c = input.next();
|
||||
// Escape sequence
|
||||
if (c == '\\') {
|
||||
if (!input.hasNext()) {
|
||||
throw new ParsingException(input.position(input.index()), "Unterminated escape sequence at the end of the string");
|
||||
}
|
||||
input.next();
|
||||
} else if (c == ',') {
|
||||
tokens.addToken(input.position(tokenStart), tokenStart, input.index());
|
||||
tokenStart = input.index() + 1;
|
||||
}
|
||||
}
|
||||
tokens.addToken(input.position(tokenStart), tokenStart, input.index() + 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -13,9 +13,13 @@
|
||||
import java.io.PrintStream;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import io.debezium.text.ParsingException;
|
||||
|
||||
/**
|
||||
* @author Randall Hauch
|
||||
* @author Horia Chiorean
|
||||
@ -259,6 +263,24 @@ public void hexStringToByteArrayShouldReturnCorrectByteArray() {
|
||||
assertThat(Strings.hexStringToByteArray("CAFEBABE")).isEqualTo(new byte[] { -54, -2, -70, -66 });
|
||||
}
|
||||
|
||||
@Test
|
||||
public void regexSplit() {
|
||||
assertRegexList("a,b", 2, "a", "b");
|
||||
assertRegexList("a\\,b", 1, "a,b");
|
||||
assertRegexList("a,b,", 2, "a", "b");
|
||||
assertRegexList("a,b\\,", 2, "a", "b,");
|
||||
assertRegexList(
|
||||
"DROP TEMPORARY TABLE IF EXISTS .+ /\\\\* generated by server \\\\*/,"
|
||||
+ "INSERT INTO mysql.rds_heartbeat2\\(.*\\,.*\\) values \\(.*\\,.*\\) ON DUPLICATE KEY UPDATE value = .*" ,
|
||||
2,
|
||||
"INSERT INTO mysql.rds_heartbeat2(id, value) values (1,1510678117058) ON DUPLICATE KEY UPDATE value = 1510678117058");
|
||||
}
|
||||
|
||||
@Test(expected = ParsingException.class)
|
||||
public void regexSplitWrongEscape() {
|
||||
Strings.listOfRegex("a,b\\,c\\");
|
||||
}
|
||||
|
||||
protected void assertReplacement(String before, Map<String, String> replacements, String after) {
|
||||
String result = Strings.replaceVariables(before, replacements::get);
|
||||
assertThat(result).isEqualTo(after);
|
||||
@ -271,4 +293,15 @@ protected Map<String, String> vars(String var1, String val1) {
|
||||
protected Map<String, String> vars(String var1, String val1, String var2, String val2) {
|
||||
return Collect.hashMapOf(var1, val1, var2, val2);
|
||||
}
|
||||
|
||||
protected void assertRegexList(String patterns, int count, String... matches) {
|
||||
final Set<Pattern> regexSet = Strings.listOfRegex(patterns);
|
||||
System.err.println(regexSet);
|
||||
assertThat(regexSet)
|
||||
.describedAs("Correct number of expressions")
|
||||
.hasSize(count);
|
||||
for (String m: matches) {
|
||||
assert regexSet.stream().anyMatch(p -> p.matcher(m).matches()) : "Test string '" + m + "' should match";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user