From 137b9f6d4d8a3ae97dfd1e1ecdede637880fce5b Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Mon, 11 Apr 2016 14:52:02 -0500 Subject: [PATCH] DBZ-38 Changed the DDL parser framework to notify listeners as statements are applied. --- .../connector/mysql/MySqlDdlParser.java | 36 +++-- .../connector/mysql/MySqlDdlParserTest.java | 16 ++- .../io/debezium/relational/ddl/DdlParser.java | 113 +++++++++++++++- .../relational/ddl/DdlParserSql2003.java | 5 + .../ddl/SimpleDdlParserListener.java | 126 ++++++++++++++++++ 5 files changed, 283 insertions(+), 13 deletions(-) create mode 100644 debezium-core/src/test/java/io/debezium/relational/ddl/SimpleDdlParserListener.java diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java index 1557390dd..a9cef5b98 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java @@ -192,6 +192,7 @@ protected void parseCreateTable(Marker start) { // Update the table definition ... databaseTables.overwriteTable(table.create()); + signal(table.tableId(), Action.CREATE, start); debugParsed(start); } @@ -677,6 +678,7 @@ protected void parseCreateView(Marker start) { // Update the table definition ... databaseTables.overwriteTable(table.create()); + signal(table.tableId(), Action.CREATE, start); debugParsed(start); } @@ -685,7 +687,7 @@ protected void parseCreateIndex(Marker start) { // This is a unique index, and we can mark the index's columns as the primary key iff there is not already // a primary key on the table. (Should a PK be created later via an alter, then it will overwrite this.) tokens.consume("INDEX"); - tokens.consume(); // index name + String indexName = tokens.consume(); // index name if (tokens.canConsume("USING")) { parseIndexType(start); } @@ -697,6 +699,7 @@ protected void parseCreateIndex(Marker start) { List names = parseIndexColumnNames(start); if (table.columns().stream().allMatch(Column::isRequired)) { databaseTables.overwriteTable(table.setPrimaryKeyNames(names).create()); + signalIndexChange(indexName, table.tableId(), Action.CREATE, start); } } } @@ -736,6 +739,7 @@ protected void parseAlterTable(Marker start) { parsePartitionOptions(start, table); } databaseTables.overwriteTable(table.create()); + signal(table.tableId(), Action.ALTER, start); if (newTableName.get() != null) { // the table was renamed ... databaseTables.renameTable(tableId, newTableName.get()); @@ -882,6 +886,8 @@ protected void parseDrop(Marker marker) { parseDropTable(marker); } else if (tokens.matches("VIEW")) { parseDropView(marker); + } else if (tokens.matches("INDEX")) { + parseDropIndex(marker); } else { parseDropUnknown(marker); } @@ -891,11 +897,12 @@ protected void parseDropTable(Marker start) { tokens.canConsume("TEMPORARY"); tokens.consume("TABLE"); tokens.canConsume("IF", "EXISTS"); - databaseTables.removeTable(parseQualifiedTableName(start)); - while (tokens.canConsume(',')) { - databaseTables.removeTable(parseQualifiedTableName(start)); - } + List ids = parseQualifiedTableNames(start); tokens.canConsumeAnyOf("RESTRICT", "CASCADE"); + ids.forEach(tableId->{ + databaseTables.removeTable(tableId); + signal(tableId, Action.DROP, start); + }); debugParsed(start); } @@ -907,11 +914,22 @@ protected void parseDropView(Marker start) { } tokens.consume("VIEW"); tokens.canConsume("IF", "EXISTS"); - databaseTables.removeTable(parseQualifiedTableName(start)); - while (tokens.canConsume(',')) { - databaseTables.removeTable(parseQualifiedTableName(start)); - } + List ids = parseQualifiedTableNames(start); tokens.canConsumeAnyOf("RESTRICT", "CASCADE"); + ids.forEach(tableId->{ + databaseTables.removeTable(tableId); + signal(tableId, Action.DROP, start); + }); + debugParsed(start); + } + + protected void parseDropIndex(Marker start) { + tokens.consume("INDEX"); + String indexName = tokens.consume(); // index name + tokens.consume("ON"); + TableId tableId = parseQualifiedTableName(start); + consumeRemainingStatement(start); + signalIndexChange(indexName,tableId,Action.DROP,start); debugParsed(start); } diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java index 857560623..40567aaf4 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java @@ -22,18 +22,22 @@ import io.debezium.relational.TableId; import io.debezium.relational.Tables; import io.debezium.relational.ddl.DdlParser; +import io.debezium.relational.ddl.SimpleDdlParserListener; import io.debezium.util.IoUtil; import io.debezium.util.Testing; public class MySqlDdlParserTest { - + private DdlParser parser; private Tables tables; + private SimpleDdlParserListener listener; @Before public void beforeEach() { parser = new MySqlDdlParser(); tables = new Tables(); + listener = new SimpleDdlParserListener(); + parser.addListener(listener); } @Test @@ -46,6 +50,8 @@ public void shouldParseMultipleStatements() { + "DROP TABLE foo;" + System.lineSeparator(); parser.parse(ddl, tables); assertThat(tables.size()).isEqualTo(0); // table created and dropped + listener.next().assertCreateTable(TableId.parse("foo")).assertDdlStatement().startsWith("CREATE TABLE foo ("); + listener.next().assertDropTable(TableId.parse("foo")).assertDdlStatement().isEqualTo("DROP TABLE foo"); } @Test @@ -118,23 +124,31 @@ public void shouldParseCreateTableStatementWithSignedTypes() { public void shouldParseGrantStatement() { String ddl = "GRANT ALL PRIVILEGES ON `mysql`.* TO 'mysqluser'@'%'"; parser.parse(ddl, tables); + assertThat(tables.size()).isEqualTo(0); // no tables + assertThat(listener.total()).isEqualTo(0); } @Test public void shouldParseCreateStatements() { parser.parse(readFile("ddl/mysql-test-create.ddl"), tables); Testing.print(tables); + assertThat(tables.size()).isEqualTo(57); // no tables + assertThat(listener.total()).isEqualTo(88); } @Test public void shouldParseTestStatements() { parser.parse(readFile("ddl/mysql-test-statements.ddl"), tables); Testing.print(tables); + assertThat(tables.size()).isEqualTo(6); // no tables + assertThat(listener.total()).isEqualTo(24); } @Test public void shouldParseSomeLinesFromCreateStatements() { parser.parse(readLines(189,"ddl/mysql-test-create.ddl"), tables); + assertThat(tables.size()).isEqualTo(39); // no tables + assertThat(listener.total()).isEqualTo(68); } protected String readFile( String classpathResource ) { diff --git a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java index d7553d74c..7ed8a2e16 100644 --- a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java +++ b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParser.java @@ -11,8 +11,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +41,34 @@ */ @NotThreadSafe public class DdlParser { + + public static enum Action { + CREATE, + ALTER, + DROP + } + + /** + * A listener that will be called with each change operation. + */ + public static interface Listener { + /** + * Handle an event that changes a table definition. + * @param tableId the identifier of the affected table; never null + * @param action the operation's action on the table + * @param ddlStatement the DDL statement + */ + void handleTableEvent( TableId tableId, Action action, String ddlStatement ); + + /** + * Handle an event that changes a table definition. + * @param indexName the name of the affected index; never null + * @param tableId the identifier of the associated table; never null + * @param action the operation's action on the index + * @param ddlStatement the DDL statement + */ + void handleIndexEvent( String indexName, TableId tableId, Action action, String ddlStatement ); + } protected static interface TokenSet { void add(String token); @@ -58,6 +89,7 @@ default void add(String firstToken, String... additionalTokens) { protected final DataTypeParser dataTypeParser = new DataTypeParser(); protected Tables databaseTables; protected TokenStream tokens; + private final List listeners = new CopyOnWriteArrayList<>(); /** * Create a new parser that uses the supplied {@link DataTypeParser}, but that does not include view definitions. @@ -81,6 +113,31 @@ public DdlParser(String terminator, boolean includeViews) { initializeKeywords(keywords::add); initializeStatementStarts(statementStarts::add); } + + /** + * Add a listener. This method should not be called more than once with the same listener object, since the result will be + * that object will be called multiple times for each event. + * @param listener the listener; if null nothing is done + */ + public void addListener( Listener listener ) { + if ( listener != null ) listeners.add(listener); + } + + /** + * Remove an existing listener. + * @param listener the listener; if null nothing is done + * @return {@code true} if the listener was removed, or {@code false} otherwise + */ + public boolean removeListener( Listener listener ) { + return listener != null ? listeners.remove(listener) : false; + } + + /** + * Remove all existing listeners. + */ + public void removeListeners() { + listeners.clear(); + } protected void initializeDataTypes(DataTypeParser dataTypeParser) { } @@ -158,6 +215,25 @@ protected TableId parseQualifiedTableName(Marker start) { return resolveTableId(currentSchema(), name); } + /** + * Parse the next tokens for one or more comma-separated qualified table names. This method uses the schema name that appears in the + * token stream, or if none is found the {@link #currentSchema()}, and then calls {@link #resolveTableId(String, String)} with + * the values. + * + * @param start the start of the statement + * @return the resolved {@link TableId} + */ + protected List parseQualifiedTableNames(Marker start) { + List ids = new LinkedList<>(); + TableId id = parseQualifiedTableName(start); + if ( id != null ) ids.add(id); + while (tokens.canConsume(',')) { + id = parseQualifiedTableName(start); + if ( id != null ) ids.add(id); + } + return ids; + } + /** * Create a {@link TableId} from the supplied schema and table names. By default, this method uses the supplied schema name * as the TableId's catalog, which often matches the catalog name in JDBC database metadata. @@ -302,20 +378,51 @@ protected void parseDrop(Marker marker) { protected void parseUnknownStatement(Marker marker) { consumeStatement(); } - + + /** + * Signal that an action was applied to the identified table. + * @param tableId the identifier of the table that is affected; may not be null + * @param action the type of operation on the table; may not be null + * @param statementStart the start of the statement; may not be null + */ + protected void signal( TableId tableId, Action action, Marker statementStart ) { + if ( !listeners.isEmpty() ) { + String statement = statement(statementStart); + listeners.forEach(listener -> listener.handleTableEvent(tableId,action,statement)); + } + } + + /** + * Signal that an action was applied to the identified table. + * @param indexName the name of the affected index; may not be null + * @param tableId the identifier of the associated table; may not be null + * @param action the type of operation on the index; may not be null + * @param statementStart the start of the statement; may not be null + */ + protected void signalIndexChange( String indexName, TableId tableId, Action action, Marker statementStart ) { + if ( !listeners.isEmpty() ) { + String statement = statement(statementStart); + listeners.forEach(listener -> listener.handleIndexEvent(indexName, tableId,action,statement)); + } + } + protected void debugParsed(Marker statementStart) { if (logger.isTraceEnabled()) { - String statement = removeLineFeeds(tokens.getContentFrom(statementStart)); + String statement = statement(statementStart); logger.trace("PARSED: {}", statement); } } protected void debugSkipped(Marker statementStart) { if (logger.isTraceEnabled()) { - String statement = removeLineFeeds(tokens.getContentFrom(statementStart)); + String statement = statement(statementStart); logger.trace("SKIPPED: {}", statement); } } + + protected String statement( Marker statementStart ) { + return removeLineFeeds(tokens.getContentFrom(statementStart)); + } private String removeLineFeeds(String input) { return input.replaceAll("[\\n|\\t]", ""); diff --git a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParserSql2003.java b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParserSql2003.java index 8d9f3f9db..6f77e3f0a 100644 --- a/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParserSql2003.java +++ b/debezium-core/src/main/java/io/debezium/relational/ddl/DdlParserSql2003.java @@ -173,6 +173,7 @@ protected void parseCreateTable(Marker start) { // Update the table definition ... databaseTables.overwriteTable(table.create()); + signal(tableId, Action.CREATE, start); } protected void parseAsSubqueryClause(Marker start, TableEditor table) { @@ -539,6 +540,7 @@ protected void parseCreateView(Marker start) { // Update the table definition ... databaseTables.overwriteTable(table.create()); + signal(tableId, Action.CREATE, start); } protected void parseCreateUnknown(Marker start) { @@ -595,6 +597,7 @@ protected void parseAlterTable(Marker start) { } databaseTables.overwriteTable(table.create()); + signal(tableId, Action.ALTER, start); } protected void parseDropColumn(Marker start, TableEditor table) { @@ -667,6 +670,7 @@ protected void parseDropTable(Marker start) { databaseTables.removeTable(tableId); // ignore the rest ... consumeRemainingStatement(start); + signal(tableId, Action.DROP, start); } protected void parseDropView(Marker start) { @@ -676,6 +680,7 @@ protected void parseDropView(Marker start) { databaseTables.removeTable(tableId); // ignore the rest ... consumeRemainingStatement(start); + signal(tableId, Action.DROP, start); } protected void parseDropUnknown(Marker start) { diff --git a/debezium-core/src/test/java/io/debezium/relational/ddl/SimpleDdlParserListener.java b/debezium-core/src/test/java/io/debezium/relational/ddl/SimpleDdlParserListener.java new file mode 100644 index 000000000..85444e67e --- /dev/null +++ b/debezium-core/src/test/java/io/debezium/relational/ddl/SimpleDdlParserListener.java @@ -0,0 +1,126 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.relational.ddl; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.fest.assertions.StringAssert; + +import static org.fest.assertions.Assertions.assertThat; + +import io.debezium.relational.TableId; +import io.debezium.relational.ddl.DdlParser.Action; +import io.debezium.relational.ddl.DdlParser.Listener; + +/** + * @author Randall Hauch + * + */ +public class SimpleDdlParserListener implements Listener { + + public static final class Event { + public final TableId tableId; + public final String indexName; + public final Action action; + public final String ddlStatement; + + public Event(TableId tableId, Action action, String ddlStatement) { + this(null, tableId, action, ddlStatement); + } + + public Event(String indexName, TableId tableId, Action action, String ddlStatement) { + this.indexName = indexName; + this.tableId = tableId; + this.action = action; + this.ddlStatement = ddlStatement; + } + + public StringAssert assertDdlStatement() { + return assertThat(ddlStatement); + } + + public Event assertTableId(TableId id) { + assertThat(tableId).isEqualTo(id); + return this; + } + + public Event assertCreateTable(TableId id) { + assertThat(tableId).isEqualTo(id); + assertThat(action).isEqualTo(Action.CREATE); + return this; + } + + public Event assertDropTable(TableId id) { + assertThat(tableId).isEqualTo(id); + assertThat(action).isEqualTo(Action.DROP); + return this; + } + + public Event assertAlterTable(TableId id) { + assertThat(tableId).isEqualTo(id); + assertThat(action).isEqualTo(Action.ALTER); + return this; + } + + public Event assertCreateIndex(String indexName, TableId id) { + assertThat(indexName).isEqualTo(indexName); + assertThat(tableId).isEqualTo(id); + assertThat(action).isEqualTo(Action.CREATE); + return this; + } + + public Event assertDropIndex(String indexName, TableId id) { + assertThat(indexName).isEqualTo(indexName); + assertThat(tableId).isEqualTo(id); + assertThat(action).isEqualTo(Action.DROP); + return this; + } + } + + private final AtomicLong counter = new AtomicLong(); + private final List events = new ArrayList<>(); + + public SimpleDdlParserListener() { + } + + @Override + public void handleTableEvent(TableId tableId, Action action, String ddlStatement) { + events.add(new Event(tableId, action, ddlStatement)); + counter.incrementAndGet(); + } + + @Override + public void handleIndexEvent(String indexName, TableId tableId, Action action, String ddlStatement) { + events.add(new Event(indexName, tableId, action, ddlStatement)); + counter.incrementAndGet(); + } + + /** + * Get the total number of events that have been handled by this listener. + * @return the total number of events + */ + public int total() { + return counter.intValue(); + } + + /** + * Get the number of events currently held by this listener that have yet to be {@link #next() processed}. + * @return the number of remaining events + */ + public int remaining() { + return events.size(); + } + + /** + * Get the next event seen by this listener. + * @return the next event, or null if there is no event + */ + public Event next() { + return events.isEmpty() ? null : events.remove(0); + } +}