DBZ-38 Changed the DDL parser framework to notify listeners as statements are applied.

This commit is contained in:
Randall Hauch 2016-04-11 14:52:02 -05:00
parent 453a3730f6
commit 137b9f6d4d
5 changed files with 283 additions and 13 deletions

View File

@ -192,6 +192,7 @@ protected void parseCreateTable(Marker start) {
// Update the table definition ... // Update the table definition ...
databaseTables.overwriteTable(table.create()); databaseTables.overwriteTable(table.create());
signal(table.tableId(), Action.CREATE, start);
debugParsed(start); debugParsed(start);
} }
@ -677,6 +678,7 @@ protected void parseCreateView(Marker start) {
// Update the table definition ... // Update the table definition ...
databaseTables.overwriteTable(table.create()); databaseTables.overwriteTable(table.create());
signal(table.tableId(), Action.CREATE, start);
debugParsed(start); debugParsed(start);
} }
@ -685,7 +687,7 @@ protected void parseCreateIndex(Marker start) {
// This is a unique index, and we can mark the index's columns as the primary key iff there is not already // This is a unique index, and we can mark the index's columns as the primary key iff there is not already
// a primary key on the table. (Should a PK be created later via an alter, then it will overwrite this.) // a primary key on the table. (Should a PK be created later via an alter, then it will overwrite this.)
tokens.consume("INDEX"); tokens.consume("INDEX");
tokens.consume(); // index name String indexName = tokens.consume(); // index name
if (tokens.canConsume("USING")) { if (tokens.canConsume("USING")) {
parseIndexType(start); parseIndexType(start);
} }
@ -697,6 +699,7 @@ protected void parseCreateIndex(Marker start) {
List<String> names = parseIndexColumnNames(start); List<String> names = parseIndexColumnNames(start);
if (table.columns().stream().allMatch(Column::isRequired)) { if (table.columns().stream().allMatch(Column::isRequired)) {
databaseTables.overwriteTable(table.setPrimaryKeyNames(names).create()); databaseTables.overwriteTable(table.setPrimaryKeyNames(names).create());
signalIndexChange(indexName, table.tableId(), Action.CREATE, start);
} }
} }
} }
@ -736,6 +739,7 @@ protected void parseAlterTable(Marker start) {
parsePartitionOptions(start, table); parsePartitionOptions(start, table);
} }
databaseTables.overwriteTable(table.create()); databaseTables.overwriteTable(table.create());
signal(table.tableId(), Action.ALTER, start);
if (newTableName.get() != null) { if (newTableName.get() != null) {
// the table was renamed ... // the table was renamed ...
databaseTables.renameTable(tableId, newTableName.get()); databaseTables.renameTable(tableId, newTableName.get());
@ -882,6 +886,8 @@ protected void parseDrop(Marker marker) {
parseDropTable(marker); parseDropTable(marker);
} else if (tokens.matches("VIEW")) { } else if (tokens.matches("VIEW")) {
parseDropView(marker); parseDropView(marker);
} else if (tokens.matches("INDEX")) {
parseDropIndex(marker);
} else { } else {
parseDropUnknown(marker); parseDropUnknown(marker);
} }
@ -891,11 +897,12 @@ protected void parseDropTable(Marker start) {
tokens.canConsume("TEMPORARY"); tokens.canConsume("TEMPORARY");
tokens.consume("TABLE"); tokens.consume("TABLE");
tokens.canConsume("IF", "EXISTS"); tokens.canConsume("IF", "EXISTS");
databaseTables.removeTable(parseQualifiedTableName(start)); List<TableId> ids = parseQualifiedTableNames(start);
while (tokens.canConsume(',')) {
databaseTables.removeTable(parseQualifiedTableName(start));
}
tokens.canConsumeAnyOf("RESTRICT", "CASCADE"); tokens.canConsumeAnyOf("RESTRICT", "CASCADE");
ids.forEach(tableId->{
databaseTables.removeTable(tableId);
signal(tableId, Action.DROP, start);
});
debugParsed(start); debugParsed(start);
} }
@ -907,11 +914,22 @@ protected void parseDropView(Marker start) {
} }
tokens.consume("VIEW"); tokens.consume("VIEW");
tokens.canConsume("IF", "EXISTS"); tokens.canConsume("IF", "EXISTS");
databaseTables.removeTable(parseQualifiedTableName(start)); List<TableId> ids = parseQualifiedTableNames(start);
while (tokens.canConsume(',')) {
databaseTables.removeTable(parseQualifiedTableName(start));
}
tokens.canConsumeAnyOf("RESTRICT", "CASCADE"); tokens.canConsumeAnyOf("RESTRICT", "CASCADE");
ids.forEach(tableId->{
databaseTables.removeTable(tableId);
signal(tableId, Action.DROP, start);
});
debugParsed(start);
}
protected void parseDropIndex(Marker start) {
tokens.consume("INDEX");
String indexName = tokens.consume(); // index name
tokens.consume("ON");
TableId tableId = parseQualifiedTableName(start);
consumeRemainingStatement(start);
signalIndexChange(indexName,tableId,Action.DROP,start);
debugParsed(start); debugParsed(start);
} }

View File

@ -22,18 +22,22 @@
import io.debezium.relational.TableId; import io.debezium.relational.TableId;
import io.debezium.relational.Tables; import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser; import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.ddl.SimpleDdlParserListener;
import io.debezium.util.IoUtil; import io.debezium.util.IoUtil;
import io.debezium.util.Testing; import io.debezium.util.Testing;
public class MySqlDdlParserTest { public class MySqlDdlParserTest {
private DdlParser parser; private DdlParser parser;
private Tables tables; private Tables tables;
private SimpleDdlParserListener listener;
@Before @Before
public void beforeEach() { public void beforeEach() {
parser = new MySqlDdlParser(); parser = new MySqlDdlParser();
tables = new Tables(); tables = new Tables();
listener = new SimpleDdlParserListener();
parser.addListener(listener);
} }
@Test @Test
@ -46,6 +50,8 @@ public void shouldParseMultipleStatements() {
+ "DROP TABLE foo;" + System.lineSeparator(); + "DROP TABLE foo;" + System.lineSeparator();
parser.parse(ddl, tables); parser.parse(ddl, tables);
assertThat(tables.size()).isEqualTo(0); // table created and dropped assertThat(tables.size()).isEqualTo(0); // table created and dropped
listener.next().assertCreateTable(TableId.parse("foo")).assertDdlStatement().startsWith("CREATE TABLE foo (");
listener.next().assertDropTable(TableId.parse("foo")).assertDdlStatement().isEqualTo("DROP TABLE foo");
} }
@Test @Test
@ -118,23 +124,31 @@ public void shouldParseCreateTableStatementWithSignedTypes() {
public void shouldParseGrantStatement() { public void shouldParseGrantStatement() {
String ddl = "GRANT ALL PRIVILEGES ON `mysql`.* TO 'mysqluser'@'%'"; String ddl = "GRANT ALL PRIVILEGES ON `mysql`.* TO 'mysqluser'@'%'";
parser.parse(ddl, tables); parser.parse(ddl, tables);
assertThat(tables.size()).isEqualTo(0); // no tables
assertThat(listener.total()).isEqualTo(0);
} }
@Test @Test
public void shouldParseCreateStatements() { public void shouldParseCreateStatements() {
parser.parse(readFile("ddl/mysql-test-create.ddl"), tables); parser.parse(readFile("ddl/mysql-test-create.ddl"), tables);
Testing.print(tables); Testing.print(tables);
assertThat(tables.size()).isEqualTo(57); // no tables
assertThat(listener.total()).isEqualTo(88);
} }
@Test @Test
public void shouldParseTestStatements() { public void shouldParseTestStatements() {
parser.parse(readFile("ddl/mysql-test-statements.ddl"), tables); parser.parse(readFile("ddl/mysql-test-statements.ddl"), tables);
Testing.print(tables); Testing.print(tables);
assertThat(tables.size()).isEqualTo(6); // no tables
assertThat(listener.total()).isEqualTo(24);
} }
@Test @Test
public void shouldParseSomeLinesFromCreateStatements() { public void shouldParseSomeLinesFromCreateStatements() {
parser.parse(readLines(189,"ddl/mysql-test-create.ddl"), tables); parser.parse(readLines(189,"ddl/mysql-test-create.ddl"), tables);
assertThat(tables.size()).isEqualTo(39); // no tables
assertThat(listener.total()).isEqualTo(68);
} }
protected String readFile( String classpathResource ) { protected String readFile( String classpathResource ) {

View File

@ -11,8 +11,11 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -38,6 +41,34 @@
*/ */
@NotThreadSafe @NotThreadSafe
public class DdlParser { public class DdlParser {
public static enum Action {
CREATE,
ALTER,
DROP
}
/**
* A listener that will be called with each change operation.
*/
public static interface Listener {
/**
* Handle an event that changes a table definition.
* @param tableId the identifier of the affected table; never null
* @param action the operation's action on the table
* @param ddlStatement the DDL statement
*/
void handleTableEvent( TableId tableId, Action action, String ddlStatement );
/**
* Handle an event that changes a table definition.
* @param indexName the name of the affected index; never null
* @param tableId the identifier of the associated table; never null
* @param action the operation's action on the index
* @param ddlStatement the DDL statement
*/
void handleIndexEvent( String indexName, TableId tableId, Action action, String ddlStatement );
}
protected static interface TokenSet { protected static interface TokenSet {
void add(String token); void add(String token);
@ -58,6 +89,7 @@ default void add(String firstToken, String... additionalTokens) {
protected final DataTypeParser dataTypeParser = new DataTypeParser(); protected final DataTypeParser dataTypeParser = new DataTypeParser();
protected Tables databaseTables; protected Tables databaseTables;
protected TokenStream tokens; protected TokenStream tokens;
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
/** /**
* Create a new parser that uses the supplied {@link DataTypeParser}, but that does not include view definitions. * Create a new parser that uses the supplied {@link DataTypeParser}, but that does not include view definitions.
@ -81,6 +113,31 @@ public DdlParser(String terminator, boolean includeViews) {
initializeKeywords(keywords::add); initializeKeywords(keywords::add);
initializeStatementStarts(statementStarts::add); initializeStatementStarts(statementStarts::add);
} }
/**
* Add a listener. This method should not be called more than once with the same listener object, since the result will be
* that object will be called multiple times for each event.
* @param listener the listener; if null nothing is done
*/
public void addListener( Listener listener ) {
if ( listener != null ) listeners.add(listener);
}
/**
* Remove an existing listener.
* @param listener the listener; if null nothing is done
* @return {@code true} if the listener was removed, or {@code false} otherwise
*/
public boolean removeListener( Listener listener ) {
return listener != null ? listeners.remove(listener) : false;
}
/**
* Remove all existing listeners.
*/
public void removeListeners() {
listeners.clear();
}
protected void initializeDataTypes(DataTypeParser dataTypeParser) { protected void initializeDataTypes(DataTypeParser dataTypeParser) {
} }
@ -158,6 +215,25 @@ protected TableId parseQualifiedTableName(Marker start) {
return resolveTableId(currentSchema(), name); return resolveTableId(currentSchema(), name);
} }
/**
* Parse the next tokens for one or more comma-separated qualified table names. This method uses the schema name that appears in the
* token stream, or if none is found the {@link #currentSchema()}, and then calls {@link #resolveTableId(String, String)} with
* the values.
*
* @param start the start of the statement
* @return the resolved {@link TableId}
*/
protected List<TableId> parseQualifiedTableNames(Marker start) {
List<TableId> ids = new LinkedList<>();
TableId id = parseQualifiedTableName(start);
if ( id != null ) ids.add(id);
while (tokens.canConsume(',')) {
id = parseQualifiedTableName(start);
if ( id != null ) ids.add(id);
}
return ids;
}
/** /**
* Create a {@link TableId} from the supplied schema and table names. By default, this method uses the supplied schema name * Create a {@link TableId} from the supplied schema and table names. By default, this method uses the supplied schema name
* as the TableId's catalog, which often matches the catalog name in JDBC database metadata. * as the TableId's catalog, which often matches the catalog name in JDBC database metadata.
@ -302,20 +378,51 @@ protected void parseDrop(Marker marker) {
protected void parseUnknownStatement(Marker marker) { protected void parseUnknownStatement(Marker marker) {
consumeStatement(); consumeStatement();
} }
/**
* Signal that an action was applied to the identified table.
* @param tableId the identifier of the table that is affected; may not be null
* @param action the type of operation on the table; may not be null
* @param statementStart the start of the statement; may not be null
*/
protected void signal( TableId tableId, Action action, Marker statementStart ) {
if ( !listeners.isEmpty() ) {
String statement = statement(statementStart);
listeners.forEach(listener -> listener.handleTableEvent(tableId,action,statement));
}
}
/**
* Signal that an action was applied to the identified table.
* @param indexName the name of the affected index; may not be null
* @param tableId the identifier of the associated table; may not be null
* @param action the type of operation on the index; may not be null
* @param statementStart the start of the statement; may not be null
*/
protected void signalIndexChange( String indexName, TableId tableId, Action action, Marker statementStart ) {
if ( !listeners.isEmpty() ) {
String statement = statement(statementStart);
listeners.forEach(listener -> listener.handleIndexEvent(indexName, tableId,action,statement));
}
}
protected void debugParsed(Marker statementStart) { protected void debugParsed(Marker statementStart) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
String statement = removeLineFeeds(tokens.getContentFrom(statementStart)); String statement = statement(statementStart);
logger.trace("PARSED: {}", statement); logger.trace("PARSED: {}", statement);
} }
} }
protected void debugSkipped(Marker statementStart) { protected void debugSkipped(Marker statementStart) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
String statement = removeLineFeeds(tokens.getContentFrom(statementStart)); String statement = statement(statementStart);
logger.trace("SKIPPED: {}", statement); logger.trace("SKIPPED: {}", statement);
} }
} }
protected String statement( Marker statementStart ) {
return removeLineFeeds(tokens.getContentFrom(statementStart));
}
private String removeLineFeeds(String input) { private String removeLineFeeds(String input) {
return input.replaceAll("[\\n|\\t]", ""); return input.replaceAll("[\\n|\\t]", "");

View File

@ -173,6 +173,7 @@ protected void parseCreateTable(Marker start) {
// Update the table definition ... // Update the table definition ...
databaseTables.overwriteTable(table.create()); databaseTables.overwriteTable(table.create());
signal(tableId, Action.CREATE, start);
} }
protected void parseAsSubqueryClause(Marker start, TableEditor table) { protected void parseAsSubqueryClause(Marker start, TableEditor table) {
@ -539,6 +540,7 @@ protected void parseCreateView(Marker start) {
// Update the table definition ... // Update the table definition ...
databaseTables.overwriteTable(table.create()); databaseTables.overwriteTable(table.create());
signal(tableId, Action.CREATE, start);
} }
protected void parseCreateUnknown(Marker start) { protected void parseCreateUnknown(Marker start) {
@ -595,6 +597,7 @@ protected void parseAlterTable(Marker start) {
} }
databaseTables.overwriteTable(table.create()); databaseTables.overwriteTable(table.create());
signal(tableId, Action.ALTER, start);
} }
protected void parseDropColumn(Marker start, TableEditor table) { protected void parseDropColumn(Marker start, TableEditor table) {
@ -667,6 +670,7 @@ protected void parseDropTable(Marker start) {
databaseTables.removeTable(tableId); databaseTables.removeTable(tableId);
// ignore the rest ... // ignore the rest ...
consumeRemainingStatement(start); consumeRemainingStatement(start);
signal(tableId, Action.DROP, start);
} }
protected void parseDropView(Marker start) { protected void parseDropView(Marker start) {
@ -676,6 +680,7 @@ protected void parseDropView(Marker start) {
databaseTables.removeTable(tableId); databaseTables.removeTable(tableId);
// ignore the rest ... // ignore the rest ...
consumeRemainingStatement(start); consumeRemainingStatement(start);
signal(tableId, Action.DROP, start);
} }
protected void parseDropUnknown(Marker start) { protected void parseDropUnknown(Marker start) {

View File

@ -0,0 +1,126 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.ddl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.fest.assertions.StringAssert;
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.relational.TableId;
import io.debezium.relational.ddl.DdlParser.Action;
import io.debezium.relational.ddl.DdlParser.Listener;
/**
* @author Randall Hauch
*
*/
public class SimpleDdlParserListener implements Listener {
public static final class Event {
public final TableId tableId;
public final String indexName;
public final Action action;
public final String ddlStatement;
public Event(TableId tableId, Action action, String ddlStatement) {
this(null, tableId, action, ddlStatement);
}
public Event(String indexName, TableId tableId, Action action, String ddlStatement) {
this.indexName = indexName;
this.tableId = tableId;
this.action = action;
this.ddlStatement = ddlStatement;
}
public StringAssert assertDdlStatement() {
return assertThat(ddlStatement);
}
public Event assertTableId(TableId id) {
assertThat(tableId).isEqualTo(id);
return this;
}
public Event assertCreateTable(TableId id) {
assertThat(tableId).isEqualTo(id);
assertThat(action).isEqualTo(Action.CREATE);
return this;
}
public Event assertDropTable(TableId id) {
assertThat(tableId).isEqualTo(id);
assertThat(action).isEqualTo(Action.DROP);
return this;
}
public Event assertAlterTable(TableId id) {
assertThat(tableId).isEqualTo(id);
assertThat(action).isEqualTo(Action.ALTER);
return this;
}
public Event assertCreateIndex(String indexName, TableId id) {
assertThat(indexName).isEqualTo(indexName);
assertThat(tableId).isEqualTo(id);
assertThat(action).isEqualTo(Action.CREATE);
return this;
}
public Event assertDropIndex(String indexName, TableId id) {
assertThat(indexName).isEqualTo(indexName);
assertThat(tableId).isEqualTo(id);
assertThat(action).isEqualTo(Action.DROP);
return this;
}
}
private final AtomicLong counter = new AtomicLong();
private final List<Event> events = new ArrayList<>();
public SimpleDdlParserListener() {
}
@Override
public void handleTableEvent(TableId tableId, Action action, String ddlStatement) {
events.add(new Event(tableId, action, ddlStatement));
counter.incrementAndGet();
}
@Override
public void handleIndexEvent(String indexName, TableId tableId, Action action, String ddlStatement) {
events.add(new Event(indexName, tableId, action, ddlStatement));
counter.incrementAndGet();
}
/**
* Get the total number of events that have been handled by this listener.
* @return the total number of events
*/
public int total() {
return counter.intValue();
}
/**
* Get the number of events currently held by this listener that have yet to be {@link #next() processed}.
* @return the number of remaining events
*/
public int remaining() {
return events.size();
}
/**
* Get the next event seen by this listener.
* @return the next event, or null if there is no event
*/
public Event next() {
return events.isEmpty() ? null : events.remove(0);
}
}