diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextTest.java index b84e089da..517ae3847 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTaskContextTest.java @@ -119,6 +119,25 @@ public void shouldCreateTaskFromConfigurationWithWhenNeededSnapshotMode() throws assertThat(context.isSnapshotNeverAllowed()).isEqualTo(false); } + @Test + public void shouldFilterInternalDmlStatementsUsingDefaultFilter() throws Exception { + config = simpleConfig().build(); + context = new MySqlTaskContext(config, new Filters.Builder(config).build(), false, null); + + assertThat(context.ddlFilter().test("INSERT INTO mysql.rds_heartbeat2(name) values ('innodb_txn_key') ON DUPLICATE KEY UPDATE value = 'v'")).isTrue(); + assertThat(context.ddlFilter().test("INSERT INTO mysql.rds_sysinfo(name, value) values ('innodb_txn_key','Sat Jun 13 06:26:02 UTC 2020')")).isTrue(); + assertThat(context.ddlFilter().test("INSERT INTO mysql.rds_monitor(name, value) values ('innodb_txn_key','Sat Jun 13 06:26:02 UTC 2020')")).isTrue(); + assertThat(context.ddlFilter().test("INSERT INTO mysql.rds_monitor(name) values ('innodb_txn_key') ON DUPLICATE KEY UPDATE value = 'v'")).isTrue(); + assertThat(context.ddlFilter().test("DELETE FROM mysql.rds_sysinfo")).isTrue(); + assertThat(context.ddlFilter().test("DELETE FROM mysql.rds_monitor;")).isTrue(); + assertThat(context.ddlFilter().test("FLUSH RELAY LOGS;")).isTrue(); + assertThat(context.ddlFilter().test("SAVEPOINT x")).isTrue(); + // Missing 'ON DUPLICATE ...' clause + assertThat(context.ddlFilter().test("INSERT INTO mysql.rds_heartbeat2(name) values ('innodb_txn_key')")).isFalse(); + // No space after 'SAVEPOINT' + assertThat(context.ddlFilter().test("SAVEPOINT;")).isFalse(); + } + @Test public void shouldUseGtidSetIncludes() throws Exception { config = simpleConfig().with(MySqlConnectorConfig.GTID_SOURCE_INCLUDES, "a,b,c,d.*") diff --git a/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistory.java index f6607e5dc..3709288be 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/DatabaseHistory.java @@ -64,6 +64,8 @@ public interface DatabaseHistory { "INSERT INTO mysql.rds_heartbeat2\\(.*\\) values \\(.*\\) ON DUPLICATE KEY UPDATE value = .*," + "DELETE FROM mysql.rds_sysinfo.*," + "INSERT INTO mysql.rds_sysinfo\\(.*\\) values \\(.*\\)," + + "INSERT INTO mysql.rds_monitor\\(.*\\) values \\(.*\\) ON DUPLICATE KEY UPDATE value = .*," + + "INSERT INTO mysql.rds_monitor\\(.*\\) values \\(.*\\)," + "DELETE FROM mysql.rds_monitor.*," + "FLUSH RELAY LOGS.*," + "flush relay logs.*," +