diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java index 2c6bc9494..442d94661 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java @@ -315,7 +315,7 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem // Note that, unlike with the DB history topic, we don't filter out non-whitelisted tables here // (which writes to the public schema change topic); if required, a second option could be added // for controlling this, too - if (!storeOnlyMonitoredTablesDdl || !changes.isEmpty()) { + if (!storeOnlyMonitoredTablesDdl || ddlChanges.anyMatch(filters.databaseFilter(), filters.tableFilter())) { if (statementConsumer != null) { // We are supposed to _also_ record the schema changes as SourceRecords, but these need to be filtered @@ -349,17 +349,13 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem // Record the DDL statement so that we can later recover them if needed. We do this _after_ writing the // schema change records so that failure recovery (which is based on of the history) won't lose // schema change records. - try { - if (!storeOnlyMonitoredTablesDdl || changes.stream().anyMatch(filters().tableFilter()::test)) { - dbHistory.record(source.partition(), source.offset(), databaseName, ddlStatements); - } else { - logger.debug("Changes for DDL '{}' were filtered and not recorded in database history", ddlStatements); - } - } catch (Throwable e) { - throw new ConnectException( - "Error recording the DDL statement(s) in the database history " + dbHistory + ": " + ddlStatements, e); + if (!storeOnlyMonitoredTablesDdl || changes.stream().anyMatch(filters().tableFilter()::test)) { + dbHistory.record(source.partition(), source.offset(), databaseName, ddlStatements); } } + else { + logger.debug("Changes for DDL '{}' were filtered and not recorded in database history", ddlStatements); + } } // Figure out what changed ... @@ -374,4 +370,12 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem }); return true; } + + + /** + * @return true if only monitored tables should be stored in database history, false if all tables should be stored + */ + public boolean isStoreOnlyMonitoredTablesDdl() { + return storeOnlyMonitoredTablesDdl; + } } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java index 613deafdc..8ffb161f4 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java @@ -302,12 +302,12 @@ protected void execute() { mysql.query(sql.get(), rs -> { while (rs.next() && isRunning()) { TableId id = new TableId(dbName, null, rs.getString(1)); - if (createTableFilters.tableFilter().test(id)) { + if ((createTableFilters == filters && shouldRecordTableSchema(schema, filters, id)) || createTableFilters.tableFilter().test(id)) { createTablesMap.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id); } - if (filters.tableFilter().test(id)) { + if (shouldRecordTableSchema(schema, filters, id)) { tableIds.add(id); - logger.info("\t including '{}'", id); + logger.info("\t including '{}' for further processing", id); } else { logger.info("\t '{}' is filtered out, discarding", id); } @@ -481,6 +481,10 @@ protected void execute() { Iterator tableIdIter = tableIds.iterator(); while (tableIdIter.hasNext()) { TableId tableId = tableIdIter.next(); + if (!filters.tableFilter().test(tableId)) { + // Table schema was recorded but the table is filtered out so will not be snapshotted + continue; + } AtomicLong rowNum = new AtomicLong(); if (!isRunning()) { break; @@ -730,6 +734,10 @@ protected void execute() { } } + private boolean shouldRecordTableSchema(final MySqlSchema schema, final Filters filters, TableId id) { + return !schema.isStoreOnlyMonitoredTablesDdl() || filters.tableFilter().test(id); + } + protected void readBinlogPosition(int step, SourceInfo source, JdbcConnection mysql, AtomicReference sql) throws SQLException { if (context.isSchemaOnlyRecoverySnapshot()) { // We are in schema only recovery mode, use the existing binlog position diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/listener/UseStatementParserListener.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/listener/UseStatementParserListener.java index 1ff5ee561..01c64d081 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/listener/UseStatementParserListener.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/listener/UseStatementParserListener.java @@ -35,6 +35,9 @@ public void enterUseStatement(MySqlParser.UseStatementContext ctx) { // right for the current database. String charsetForDb = parser.charsetNameForDatabase().get(dbName); parser.systemVariables().setVariable(MySqlSystemVariables.MySqlScope.SESSION, MySqlSystemVariables.CHARSET_NAME_DATABASE, charsetForDb); + + // Signal that the variable was set ... + parser.signalUseDatabase(ctx); super.enterUseStatement(ctx); } } diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlAntlrDdlParserTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlAntlrDdlParserTest.java index f32e1fdd1..533044579 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlAntlrDdlParserTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlAntlrDdlParserTest.java @@ -1385,7 +1385,7 @@ public void shouldParseStatementsWithQuotedIdentifiers() { parser.parse(readFile("ddl/mysql-quoted.ddl"), tables); Testing.print(tables); assertThat(tables.size()).isEqualTo(4); - assertThat(listener.total()).isEqualTo(10); + assertThat(listener.total()).isEqualTo(11); assertThat(tables.forTable("connector_test_ro", null, "products")).isNotNull(); assertThat(tables.forTable("connector_test_ro", null, "products_on_hand")).isNotNull(); assertThat(tables.forTable("connector_test_ro", null, "customers")).isNotNull(); @@ -1397,7 +1397,7 @@ public void shouldParseIntegrationTestSchema() { parser.parse(readFile("ddl/mysql-integration.ddl"), tables); Testing.print(tables); assertThat(tables.size()).isEqualTo(10); - assertThat(listener.total()).isEqualTo(17); + assertThat(listener.total()).isEqualTo(20); } @Test @@ -1690,7 +1690,7 @@ public void shouldParseAlterTableThatChangesMultipleColumns() { public void shouldParseTicketMonsterLiquibaseStatements() { parser.parse(readLines(1, "ddl/mysql-ticketmonster-liquibase.ddl"), tables); assertThat(tables.size()).isEqualTo(7); - assertThat(listener.total()).isEqualTo(16); + assertThat(listener.total()).isEqualTo(17); listener.forEach(this::printEvent); } diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index 21521696f..76bc53459 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -699,7 +699,8 @@ public void shouldUseOverriddenSelectStatementDuringSnapshotting() throws SQLExc .with(MySqlConnectorConfig.DATABASE_WHITELIST, DATABASE.getDatabaseName()) .with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.getDatabaseName() + ".products") .with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, DATABASE.getDatabaseName()+".products") - .with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE+"."+DATABASE.getDatabaseName()+".products", String.format("SELECT * from %s.products where id>=108 order by id", DATABASE.getDatabaseName())) + .with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + DATABASE.getDatabaseName()+".products", String.format("SELECT * from %s.products where id>=108 order by id", DATABASE.getDatabaseName())) + .with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true) .with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH) @@ -747,6 +748,7 @@ public void shouldUseMultipleOverriddenSelectStatementsDuringSnapshotting() thro .with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10) .with(MySqlConnectorConfig.DATABASE_WHITELIST, DATABASE.getDatabaseName()) .with(MySqlConnectorConfig.TABLE_WHITELIST, tables) + .with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true) .with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, tables) .with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE+"."+DATABASE.getDatabaseName()+".products", String.format("SELECT * from %s.products where id>=108 order by id", DATABASE.getDatabaseName())) .with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE+"."+DATABASE.getDatabaseName()+".products_on_hand", String.format("SELECT * from %s.products_on_hand where product_id>=108 order by product_id", DATABASE.getDatabaseName())) @@ -796,8 +798,8 @@ public void shouldIgnoreAlterTableForNonCapturedTablesNotStoredInHistory() throw // Consume the first records due to startup and initialization of the database ... // Testing.Print.enable(); - SourceRecords records = consumeRecordsByTopic(2); - assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(2); + SourceRecords records = consumeRecordsByTopic(1 + 5); + assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(5); try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) { try (JdbcConnection connection = db.connect()) { @@ -843,12 +845,12 @@ public void shouldProcessCreateUniqueIndex() throws SQLException, InterruptedExc } } - SourceRecords records = consumeRecordsByTopic(8); + SourceRecords records = consumeRecordsByTopic(16); final List migrationTestRecords = records.recordsForTopic(DATABASE.topicForTable("migration_test")); assertThat(migrationTestRecords.size()).isEqualTo(1); final SourceRecord record = migrationTestRecords.get(0); assertThat(((Struct) record.key()).getString("mgb_no")).isEqualTo("2"); - assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(6); + assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(14); stopConnector(); } @@ -865,13 +867,15 @@ public void shouldIgnoreAlterTableForNonCapturedTablesStoredInHistory() throws S .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .build(); + dropDatabases(); + // Start the connector ... start(MySqlConnector.class, config); // Consume the first records due to startup and initialization of the database ... // Testing.Print.enable(); - SourceRecords records = consumeRecordsByTopic(6); - assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(5); + SourceRecords records = consumeRecordsByTopic(1 + 1 + 2 + 2 * 4); + assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(1 + 2 + 2 * 4); try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) { try (JdbcConnection connection = db.connect()) { @@ -914,8 +918,8 @@ public void shouldIgnoreCreateIndexForNonCapturedTablesNotStoredInHistory() thro // Consume the first records due to startup and initialization of the database ... // Testing.Print.enable(); - SourceRecords records = consumeRecordsByTopic(2); - assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(2); + SourceRecords records = consumeRecordsByTopic(6); + assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(5); try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) { try (JdbcConnection connection = db.connect()) { @@ -930,6 +934,57 @@ public void shouldIgnoreCreateIndexForNonCapturedTablesNotStoredInHistory() thro Assertions.assertThat(record.topic()).isEqualTo(DATABASE.topicForTable("customers")); } + @Test + @FixFor("DBZ-683") + public void shouldReceiveSchemaForNonWhitelistedTablesAndDatabases() throws SQLException, InterruptedException { + Testing.Files.delete(DB_HISTORY_PATH); + + final String tables = String.format("%s.customers,%s.orders", DATABASE.getDatabaseName(), DATABASE.getDatabaseName()); + config = DATABASE.defaultConfig() + .with(MySqlConnectorConfig.TABLE_WHITELIST, tables) + .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY) + .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) + .with(MySqlConnectorConfig.DATABASE_WHITELIST, ".*") + .build(); + + dropDatabases(); + + try (MySQLConnection db = MySQLConnection.forTestDatabase("mysql");) { + try (JdbcConnection connection = db.connect()) { + connection.execute( + "CREATE DATABASE non_wh", + "USE non_wh", + "CREATE TABLE t1 (ID INT PRIMARY KEY)" + ); + } + } + + // Start the connector ... + start(MySqlConnector.class, config); + + // Consume the first records due to startup and initialization of the database ... + // Testing.Print.enable(); + // Four tables in database, only two are whitelisted + SourceRecords records = consumeRecordsByTopic(1 + 3 + 2 * 4 + 3 + 2); + assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(1 + 2 + 2 * 4); + stopConnector(); + } + + private void dropDatabases() throws SQLException { + try (MySQLConnection db = MySQLConnection.forTestDatabase("mysql");) { + try (JdbcConnection connection = db.connect()) { + connection.query("SHOW DATABASES", rs -> { + while (rs.next()) { + final String dbName = rs.getString(1); + if (!Filters.isBuiltInDatabase(dbName) && !dbName.equals(DATABASE.getDatabaseName())) { + connection.execute("DROP DATABASE IF EXISTS " + dbName); + } + } + }); + } + } + } + protected static class BinlogPosition { private String binlogFilename; private long binlogPosition; diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java index 83c2e821d..ce68501f9 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java @@ -38,6 +38,7 @@ import io.debezium.embedded.AbstractConnectorTest; import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode; +import io.debezium.relational.history.DatabaseHistory; import io.debezium.time.ZonedTimestamp; import io.debezium.util.Testing; @@ -860,6 +861,7 @@ public void shouldConsumeDecimalAsStringFromSnapshot() throws SQLException, Inte config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName("dbz_147_decimalvalues")) .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) + .with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true) .with(MySqlConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.STRING) .build(); // Start the connector ... diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlGeometryIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlGeometryIT.java index a6a649554..3533e2eea 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlGeometryIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlGeometryIT.java @@ -43,7 +43,7 @@ public class MySqlGeometryIT extends AbstractConnectorTest { @Before public void beforeEach() { stopConnector(); - databaseDifferences = databaseGeoDifferences(MySQLConnection.forTestDatabase("emptydb").getMySqlVersion()); + databaseDifferences = databaseGeoDifferences(MySQLConnection.forTestDatabase("mysql").getMySqlVersion()); DATABASE = new UniqueDatabase("geometryit", databaseDifferences.geometryDatabaseName()) .withDbHistoryPath(DB_HISTORY_PATH); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java index d437ef8e3..eb678e48d 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlDefaultValueIT.java @@ -36,6 +36,7 @@ import io.debezium.jdbc.JdbcConnection; import io.debezium.jdbc.JdbcValueConverters; import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.relational.history.DatabaseHistory; import io.debezium.time.MicroTimestamp; import io.debezium.time.Timestamp; import io.debezium.time.ZonedTimestamp; @@ -517,6 +518,7 @@ public void dateAndTimeTest() throws InterruptedException { config = DATABASE.defaultConfig() .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL) .with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName("DATE_TIME_TABLE")) + .with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true) .build(); start(MySqlConnector.class, config); @@ -582,6 +584,7 @@ public void timeTypeWithConnectMode() throws InterruptedException { .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL) .with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName("DATE_TIME_TABLE")) .with(MySqlConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.CONNECT) + .with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true) .build(); start(MySqlConnector.class, config); diff --git a/debezium-core/src/main/java/io/debezium/relational/ddl/AbstractDdlParser.java b/debezium-core/src/main/java/io/debezium/relational/ddl/AbstractDdlParser.java index bc444a964..2ea323ece 100644 --- a/debezium-core/src/main/java/io/debezium/relational/ddl/AbstractDdlParser.java +++ b/debezium-core/src/main/java/io/debezium/relational/ddl/AbstractDdlParser.java @@ -125,7 +125,11 @@ protected void signalChangeEvent(DdlParserListener.Event event) { } protected void signalSetVariable(String variableName, String variableValue, String statement) { - signalChangeEvent(new DdlParserListener.SetVariableEvent(variableName, variableValue, statement)); + signalChangeEvent(new DdlParserListener.SetVariableEvent(variableName, variableValue, currentSchema, statement)); + } + + protected void signalUseDatabase(String statement) { + signalChangeEvent(new DdlParserListener.DatabaseSwitchedEvent(currentSchema, statement)); } /** diff --git a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlChanges.java b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlChanges.java index 43e5ca230..bc9ba66b4 100644 --- a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlChanges.java +++ b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlChanges.java @@ -9,8 +9,10 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.function.Predicate; import io.debezium.annotation.NotThreadSafe; +import io.debezium.relational.TableId; /** * A {@link DdlParserListener} that accumulates changes, allowing them to be consumed in the same order by database. @@ -129,10 +131,12 @@ protected String getDatabase(Event event) { case CREATE_DATABASE: case ALTER_DATABASE: case DROP_DATABASE: + case USE_DATABASE: DatabaseEvent dbEvent = (DatabaseEvent) event; return dbEvent.databaseName(); case SET_VARIABLE: - return ""; + SetVariableEvent varEvent = (SetVariableEvent) event; + return varEvent.databaseName().orElse(""); } assert false : "Should never happen"; return null; @@ -162,4 +166,15 @@ public static interface DatabaseStatementConsumer { public static interface DatabaseStatementStringConsumer { void consume(String databaseName, String ddlStatements); } + + public boolean anyMatch(Predicate databaseFilter, Predicate tableFilter) { + return events.stream().anyMatch(event -> + (event instanceof DatabaseEvent) && databaseFilter.test(((DatabaseEvent) event).databaseName()) + || (event instanceof TableEvent) && tableFilter.test(((TableEvent) event).tableId()) + || (event instanceof SetVariableEvent) && ( + !((SetVariableEvent) event).databaseName().isPresent() + || databaseFilter.test(((SetVariableEvent) event).databaseName().get()) + ) + ); + } } diff --git a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParserListener.java b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParserListener.java index dc862bedf..597664fe1 100644 --- a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParserListener.java +++ b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParserListener.java @@ -5,6 +5,8 @@ */ package io.debezium.relational.ddl; +import java.util.Optional; + import io.debezium.annotation.Immutable; import io.debezium.relational.TableId; @@ -33,7 +35,7 @@ public interface DdlParserListener { public static enum EventType { CREATE_TABLE, ALTER_TABLE, DROP_TABLE, TRUNCATE_TABLE, CREATE_INDEX, DROP_INDEX, - CREATE_DATABASE, ALTER_DATABASE, DROP_DATABASE, + CREATE_DATABASE, ALTER_DATABASE, DROP_DATABASE, USE_DATABASE, SET_VARIABLE, } @@ -293,6 +295,16 @@ public DatabaseDroppedEvent(String databaseName, String ddlStatement) { } } + /** + * An event describing the switching of a database. + */ + @Immutable + public static class DatabaseSwitchedEvent extends DatabaseEvent { + public DatabaseSwitchedEvent(String databaseName, String ddlStatement) { + super(EventType.USE_DATABASE, databaseName, ddlStatement); + } + } + /** * An event describing the setting of a variable. */ @@ -301,11 +313,13 @@ public static class SetVariableEvent extends Event { private final String variableName; private final String value; + private final String databaseName; - public SetVariableEvent(String variableName, String value, String ddlStatement) { + public SetVariableEvent(String variableName, String value, String currentDatabaseName, String ddlStatement) { super(EventType.SET_VARIABLE, ddlStatement); this.variableName = variableName; this.value = value; + this.databaseName = currentDatabaseName; } /** @@ -323,7 +337,11 @@ public String variableName() { public String variableValue() { return value; } - + + public Optional databaseName() { + return Optional.ofNullable(databaseName); + } + @Override public String toString() { return statement(); diff --git a/debezium-ddl-parser/src/main/java/io/debezium/antlr/AntlrDdlParser.java b/debezium-ddl-parser/src/main/java/io/debezium/antlr/AntlrDdlParser.java index 6812ec05d..d1944da8a 100644 --- a/debezium-ddl-parser/src/main/java/io/debezium/antlr/AntlrDdlParser.java +++ b/debezium-ddl-parser/src/main/java/io/debezium/antlr/AntlrDdlParser.java @@ -197,6 +197,10 @@ public void signalSetVariable(String variableName, String variableValue, ParserR signalSetVariable(variableName, variableValue, getText(ctx)); } + public void signalUseDatabase(ParserRuleContext ctx) { + signalUseDatabase(getText(ctx)); + } + /** * Signal a create database event to ddl changes listener. *