DBZ-38 Changed the listening framework of the DDL parser

Refactored the mechanism by which components can listen to the activities of a DDL parser. The new approach
should be significantly more flexible for additional types of DDL events while making it easier to maintain
backward compatibility. It also will enable passing event-specific information on each DDL event.
This commit is contained in:
Randall Hauch 2016-04-12 09:03:12 -05:00
parent 75955945ee
commit 5b30568650
6 changed files with 501 additions and 179 deletions

View File

@ -166,12 +166,14 @@ protected void parseCreateTable(Marker start) {
databaseTables.overwriteTable(tableId, original.columns(), original.primaryKeyColumnNames());
}
consumeRemainingStatement(start);
signalCreateTable(tableId, start);
debugParsed(start);
return;
}
if (onlyIfNotExists && databaseTables.forTable(tableId) != null) {
// The table does exist, so we should do nothing ...
consumeRemainingStatement(start);
signalCreateTable(tableId, start);
debugParsed(start);
return;
}
@ -192,7 +194,7 @@ protected void parseCreateTable(Marker start) {
// Update the table definition ...
databaseTables.overwriteTable(table.create());
signal(table.tableId(), Action.CREATE, start);
signalCreateTable(tableId, start);
debugParsed(start);
}
@ -596,12 +598,6 @@ protected void parseReferenceOption(Marker start) {
}
protected void parseCreateView(Marker start) {
if (skipViews) {
// We don't care about the rest ...
consumeRemainingStatement(start);
debugSkipped(start);
return;
}
tokens.canConsume("OR", "REPLACE");
if (tokens.canConsume("ALGORITHM")) {
tokens.consume('=');
@ -616,6 +612,14 @@ protected void parseCreateView(Marker start) {
}
tokens.consume("VIEW");
TableId tableId = parseQualifiedTableName(start);
if (skipViews) {
// We don't care about the rest ...
consumeRemainingStatement(start);
signalCreateView(tableId, start);
debugSkipped(start);
return;
}
TableEditor table = databaseTables.editOrCreateTable(tableId);
if (tokens.matches('(')) {
List<String> columnNames = parseColumnNameList(start);
@ -678,35 +682,39 @@ protected void parseCreateView(Marker start) {
// Update the table definition ...
databaseTables.overwriteTable(table.create());
signal(table.tableId(), Action.CREATE, start);
signalCreateView(tableId, start);
debugParsed(start);
}
protected void parseCreateIndex(Marker start) {
if (tokens.canConsume("UNIQUE")) {
// This is a unique index, and we can mark the index's columns as the primary key iff there is not already
// a primary key on the table. (Should a PK be created later via an alter, then it will overwrite this.)
boolean unique = tokens.canConsume("UNIQUE");
tokens.canConsumeAnyOf("FULLTEXT","SPATIAL");
tokens.consume("INDEX");
String indexName = tokens.consume(); // index name
if (tokens.canConsume("USING")) {
if (tokens.matches("USING")) {
parseIndexType(start);
}
TableId tableId = null;
if (tokens.canConsume("ON")) {
// Usually this is required, but in some cases ON is not required
TableId tableName = parseQualifiedTableName(start);
TableEditor table = databaseTables.editTable(tableName);
tableId = parseQualifiedTableName(start);
}
if ( unique && tableId != null ) {
// This is a unique index, and we can mark the index's columns as the primary key iff there is not already
// a primary key on the table. (Should a PK be created later via an alter, then it will overwrite this.)
TableEditor table = databaseTables.editTable(tableId);
if (table != null && !table.hasPrimaryKey()) {
List<String> names = parseIndexColumnNames(start);
if (table.columns().stream().allMatch(Column::isRequired)) {
databaseTables.overwriteTable(table.setPrimaryKeyNames(names).create());
signalIndexChange(indexName, table.tableId(), Action.CREATE, start);
}
}
}
}
// We don't care about any other statements or the rest of this statement ...
consumeRemainingStatement(start);
signalCreateIndex(indexName, tableId, start);
debugParsed(start);
}
@ -730,6 +738,7 @@ protected void parseAlterTable(Marker start) {
tokens.consume("TABLE");
TableId tableId = parseQualifiedTableName(start);
TableEditor table = databaseTables.editTable(tableId);
TableId oldTableId = null;
if (table != null) {
AtomicReference<TableId> newTableName = new AtomicReference<>(null);
if (!tokens.matches(terminator()) && !tokens.matches("PARTITION")) {
@ -739,15 +748,19 @@ protected void parseAlterTable(Marker start) {
parsePartitionOptions(start, table);
}
databaseTables.overwriteTable(table.create());
signal(table.tableId(), Action.ALTER, start);
if (newTableName.get() != null) {
// the table was renamed ...
databaseTables.renameTable(tableId, newTableName.get());
Table renamed = databaseTables.renameTable(tableId, newTableName.get());
if ( renamed != null ) {
oldTableId = tableId;
tableId = renamed.id();
}
}
} else {
// We don't know about this table ...
consumeRemainingStatement(start);
}
signalAlterTable(tableId, oldTableId, start);
}
protected void parseAlterSpecificationList(Marker start, TableEditor table, Consumer<TableId> newTableName) {
@ -901,7 +914,7 @@ protected void parseDropTable(Marker start) {
tokens.canConsumeAnyOf("RESTRICT", "CASCADE");
ids.forEach(tableId->{
databaseTables.removeTable(tableId);
signal(tableId, Action.DROP, start);
signalDropTable(tableId, start);
});
debugParsed(start);
}
@ -918,7 +931,7 @@ protected void parseDropView(Marker start) {
tokens.canConsumeAnyOf("RESTRICT", "CASCADE");
ids.forEach(tableId->{
databaseTables.removeTable(tableId);
signal(tableId, Action.DROP, start);
signalDropView(tableId, start);
});
debugParsed(start);
}
@ -929,7 +942,7 @@ protected void parseDropIndex(Marker start) {
tokens.consume("ON");
TableId tableId = parseQualifiedTableName(start);
consumeRemainingStatement(start);
signalIndexChange(indexName,tableId,Action.DROP,start);
signalDropIndex(indexName, tableId, start);
debugParsed(start);
}

View File

@ -50,8 +50,25 @@ public void shouldParseMultipleStatements() {
+ "DROP TABLE foo;" + System.lineSeparator();
parser.parse(ddl, tables);
assertThat(tables.size()).isEqualTo(0); // table created and dropped
listener.next().assertCreateTable(TableId.parse("foo")).assertDdlStatement().startsWith("CREATE TABLE foo (");
listener.next().assertDropTable(TableId.parse("foo")).assertDdlStatement().isEqualTo("DROP TABLE foo");
listener.assertNext().createTableNamed("foo").ddlStartsWith("CREATE TABLE foo (");
listener.assertNext().dropTableNamed("foo").ddlMatches("DROP TABLE foo");
}
@Test
public void shouldParseAlterStatementsAfterCreate() {
String ddl1 = "CREATE TABLE foo ( c1 INTEGER NOT NULL, c2 VARCHAR(22) );" + System.lineSeparator();
String ddl2 = "ALTER TABLE foo ADD COLUMN c bigint;" + System.lineSeparator();
parser.parse(ddl1, tables);
parser.parse(ddl2, tables);
listener.assertNext().createTableNamed("foo").ddlStartsWith("CREATE TABLE foo (");
listener.assertNext().alterTableNamed("foo").ddlStartsWith("ALTER TABLE foo ADD COLUMN c");
}
@Test
public void shouldParseAlterStatementsWithoutCreate() {
String ddl = "ALTER TABLE foo ADD COLUMN c bigint;" + System.lineSeparator();
parser.parse(ddl, tables);
listener.assertNext().alterTableNamed("foo").ddlStartsWith("ALTER TABLE foo ADD COLUMN c");
}
@Test
@ -133,7 +150,7 @@ public void shouldParseCreateStatements() {
parser.parse(readFile("ddl/mysql-test-create.ddl"), tables);
Testing.print(tables);
assertThat(tables.size()).isEqualTo(57); // no tables
assertThat(listener.total()).isEqualTo(88);
assertThat(listener.total()).isEqualTo(144);
}
@Test
@ -141,14 +158,14 @@ public void shouldParseTestStatements() {
parser.parse(readFile("ddl/mysql-test-statements.ddl"), tables);
Testing.print(tables);
assertThat(tables.size()).isEqualTo(6); // no tables
assertThat(listener.total()).isEqualTo(24);
assertThat(listener.total()).isEqualTo(46);
}
@Test
public void shouldParseSomeLinesFromCreateStatements() {
parser.parse(readLines(189,"ddl/mysql-test-create.ddl"), tables);
assertThat(tables.size()).isEqualTo(39); // no tables
assertThat(listener.total()).isEqualTo(68);
assertThat(listener.total()).isEqualTo(120);
}
protected String readFile( String classpathResource ) {

View File

@ -26,6 +26,11 @@
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParserListener.TableAlteredEvent;
import io.debezium.relational.ddl.DdlParserListener.TableCreatedEvent;
import io.debezium.relational.ddl.DdlParserListener.TableDroppedEvent;
import io.debezium.relational.ddl.DdlParserListener.TableIndexCreatedEvent;
import io.debezium.relational.ddl.DdlParserListener.TableIndexDroppedEvent;
import io.debezium.text.MultipleParsingExceptions;
import io.debezium.text.ParsingException;
import io.debezium.text.Position;
@ -42,34 +47,6 @@
@NotThreadSafe
public class DdlParser {
public static enum Action {
CREATE,
ALTER,
DROP
}
/**
* A listener that will be called with each change operation.
*/
public static interface Listener {
/**
* Handle an event that changes a table definition.
* @param tableId the identifier of the affected table; never null
* @param action the operation's action on the table
* @param ddlStatement the DDL statement
*/
void handleTableEvent( TableId tableId, Action action, String ddlStatement );
/**
* Handle an event that changes a table definition.
* @param indexName the name of the affected index; never null
* @param tableId the identifier of the associated table; never null
* @param action the operation's action on the index
* @param ddlStatement the DDL statement
*/
void handleIndexEvent( String indexName, TableId tableId, Action action, String ddlStatement );
}
protected static interface TokenSet {
void add(String token);
@ -89,7 +66,7 @@ default void add(String firstToken, String... additionalTokens) {
protected final DataTypeParser dataTypeParser = new DataTypeParser();
protected Tables databaseTables;
protected TokenStream tokens;
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final List<DdlParserListener> listeners = new CopyOnWriteArrayList<>();
/**
* Create a new parser that uses the supplied {@link DataTypeParser}, but that does not include view definitions.
@ -117,18 +94,20 @@ public DdlParser(String terminator, boolean includeViews) {
/**
* Add a listener. This method should not be called more than once with the same listener object, since the result will be
* that object will be called multiple times for each event.
*
* @param listener the listener; if null nothing is done
*/
public void addListener( Listener listener ) {
public void addListener(DdlParserListener listener) {
if (listener != null) listeners.add(listener);
}
/**
* Remove an existing listener.
*
* @param listener the listener; if null nothing is done
* @return {@code true} if the listener was removed, or {@code false} otherwise
*/
public boolean removeListener( Listener listener ) {
public boolean removeListener(DdlParserListener listener) {
return listener != null ? listeners.remove(listener) : false;
}
@ -216,7 +195,8 @@ protected TableId parseQualifiedTableName(Marker start) {
}
/**
* Parse the next tokens for one or more comma-separated qualified table names. This method uses the schema name that appears in the
* Parse the next tokens for one or more comma-separated qualified table names. This method uses the schema name that appears
* in the
* token stream, or if none is found the {@link #currentSchema()}, and then calls {@link #resolveTableId(String, String)} with
* the values.
*
@ -380,30 +360,98 @@ protected void parseUnknownStatement(Marker marker) {
}
/**
* Signal that an action was applied to the identified table.
* @param tableId the identifier of the table that is affected; may not be null
* @param action the type of operation on the table; may not be null
* @param statementStart the start of the statement; may not be null
* Signal an event to all listeners.
*
* @param event the event; may not be null
*/
protected void signal( TableId tableId, Action action, Marker statementStart ) {
if ( !listeners.isEmpty() ) {
String statement = statement(statementStart);
listeners.forEach(listener -> listener.handleTableEvent(tableId,action,statement));
protected void signalEvent(DdlParserListener.Event event) {
if (event != null && !listeners.isEmpty()) {
listeners.forEach(listener -> listener.handle(event));
}
}
/**
* Signal that an action was applied to the identified table.
* @param indexName the name of the affected index; may not be null
* @param tableId the identifier of the associated table; may not be null
* @param action the type of operation on the index; may not be null
* Signal a create table event to all listeners.
*
* @param id the table identifier; may not be null
* @param statementStart the start of the statement; may not be null
*/
protected void signalIndexChange( String indexName, TableId tableId, Action action, Marker statementStart ) {
if ( !listeners.isEmpty() ) {
String statement = statement(statementStart);
listeners.forEach(listener -> listener.handleIndexEvent(indexName, tableId,action,statement));
protected void signalCreateTable(TableId id, Marker statementStart) {
signalEvent(new TableCreatedEvent(id, statement(statementStart), false));
}
/**
* Signal an alter table event to all listeners.
*
* @param id the table identifier; may not be null
* @param previousId the previous name of the view if it was renamed, or null if it was not renamed
* @param statementStart the start of the statement; may not be null
*/
protected void signalAlterTable(TableId id, TableId previousId, Marker statementStart) {
signalEvent(new TableAlteredEvent(id, previousId, statement(statementStart), false));
}
/**
* Signal a drop table event to all listeners.
*
* @param id the table identifier; may not be null
* @param statementStart the start of the statement; may not be null
*/
protected void signalDropTable(TableId id, Marker statementStart) {
signalEvent(new TableDroppedEvent(id, statement(statementStart), false));
}
/**
* Signal a create view event to all listeners.
*
* @param id the table identifier; may not be null
* @param statementStart the start of the statement; may not be null
*/
protected void signalCreateView(TableId id, Marker statementStart) {
signalEvent(new TableCreatedEvent(id, statement(statementStart), true));
}
/**
* Signal an alter view event to all listeners.
*
* @param id the table identifier; may not be null
* @param previousId the previous name of the view if it was renamed, or null if it was not renamed
* @param statementStart the start of the statement; may not be null
*/
protected void signalAlterView(TableId id, TableId previousId, Marker statementStart) {
signalEvent(new TableAlteredEvent(id, previousId, statement(statementStart), true));
}
/**
* Signal a drop view event to all listeners.
*
* @param id the table identifier; may not be null
* @param statementStart the start of the statement; may not be null
*/
protected void signalDropView(TableId id, Marker statementStart) {
signalEvent(new TableDroppedEvent(id, statement(statementStart), true));
}
/**
* Signal a create index event to all listeners.
*
* @param indexName the name of the index; may not be null
* @param id the table identifier; may be null if the index does not apply to a single table
* @param statementStart the start of the statement; may not be null
*/
protected void signalCreateIndex(String indexName, TableId id, Marker statementStart) {
signalEvent(new TableIndexCreatedEvent(indexName,id, statement(statementStart)));
}
/**
* Signal a drop index event to all listeners.
*
* @param indexName the name of the index; may not be null
* @param id the table identifier; may not be null
* @param statementStart the start of the statement; may not be null
*/
protected void signalDropIndex(String indexName, TableId id, Marker statementStart) {
signalEvent(new TableIndexDroppedEvent(indexName,id, statement(statementStart)));
}
protected void debugParsed(Marker statementStart) {
@ -704,20 +752,17 @@ protected void setTypeInfoForConstant(String constantValue, ColumnEditor column)
Integer.parseInt(constantValue);
column.typeName("INTEGER");
column.jdbcType(Types.INTEGER);
} catch (NumberFormatException e) {
}
} catch (NumberFormatException e) {}
try {
Long.parseLong(constantValue);
column.typeName("BIGINT");
column.jdbcType(Types.BIGINT);
} catch (NumberFormatException e) {
}
} catch (NumberFormatException e) {}
try {
Float.parseFloat(constantValue);
column.typeName("FLOAT");
column.jdbcType(Types.FLOAT);
} catch (NumberFormatException e) {
}
} catch (NumberFormatException e) {}
try {
Double.parseDouble(constantValue);
column.typeName("DOUBLE");
@ -732,24 +777,24 @@ protected void setTypeInfoForConstant(String constantValue, ColumnEditor column)
} else if (c == '.') {
foundDecimalPoint = true;
} else if (Character.isDigit(c)) {
if ( foundDecimalPoint ) ++scale;
else ++precision;
if (foundDecimalPoint)
++scale;
else
++precision;
} else {
break;
}
}
column.length(precision);
column.scale(scale);
} catch (NumberFormatException e) {
}
} catch (NumberFormatException e) {}
try {
BigDecimal decimal = new BigDecimal(constantValue);
column.typeName("DECIMAL");
column.jdbcType(Types.DECIMAL);
column.length(decimal.precision());
column.scale(decimal.precision());
} catch (NumberFormatException e) {
}
} catch (NumberFormatException e) {}
}
protected String determineTypeNameForConstant(long value) {
@ -837,8 +882,7 @@ protected boolean canConsumeJoinCondition(Marker start) {
if (tokens.canConsume("ON")) {
try {
parseSchemaQualifiedName(start);
while (tokens.canConsume(DdlTokenizer.SYMBOL)) {
}
while (tokens.canConsume(DdlTokenizer.SYMBOL)) {}
parseSchemaQualifiedName(start);
return true;
} catch (ParsingException e) {

View File

@ -0,0 +1,202 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.ddl;
import io.debezium.relational.TableId;
/**
* An interface that can listen to various actions of a {@link DdlParser}. Every kind of {@link Event} has a {@link EventType
* type} that makes it easier to implement a {@link DdlParserListener} using a {@code switch} statement. However, each kind of
* {@link Event} also may have additional data associated with it.
* <p>
* Clearly not all DDL statements processed by a {@link DdlParser parser} will result in an {@link Event event}.
*
* @author Randall Hauch
*/
@FunctionalInterface
public interface DdlParserListener {
/**
* Handle a DDL event.
*
* @param event the DDL event; never null
*/
void handle(Event event);
/**
* The type of concrete {@link Event}s.
*/
public static enum EventType {
CREATE_TABLE, ALTER_TABLE, DROP_TABLE,
CREATE_INDEX, DROP_INDEX,
}
/**
* The base class for all concrete events.
*/
public static abstract class Event {
private final String statement;
private final EventType type;
public Event(EventType type, String ddlStatement) {
this.type = type;
this.statement = ddlStatement;
}
/**
* Get the {@link EventType type} of event. This is useful when switching on the kind of event.
* @return the type of event; never null
*/
public EventType type() {
return type;
}
/**
* Get the DDL statement associated with this event.
* @return the DDL statement; never null
*/
public String statement() {
return statement;
}
}
/**
* The base class for all table-related events.
*/
public static abstract class TableEvent extends Event {
private final TableId tableId;
private final boolean isView;
public TableEvent(EventType type, TableId tableId, String ddlStatement, boolean isView) {
super(type, ddlStatement);
this.tableId = tableId;
this.isView = isView;
}
/**
* Get the identifier of the primary table affected by this event.
* @return the table identifier; never null
*/
public TableId tableId() {
return tableId;
}
/**
* Determine whether the target of the event is a view rather than a table.
* @return {@code true} if the target is a view, or {@code false} if the target is a table
*/
public boolean isView() {
return isView;
}
@Override
public String toString() {
return tableId() + " => " + statement();
}
}
/**
* An event describing the creation (or replacement) of a table.
*/
public static class TableCreatedEvent extends TableEvent {
public TableCreatedEvent(TableId tableId, String ddlStatement, boolean isView) {
super(EventType.CREATE_TABLE, tableId, ddlStatement, isView);
}
}
/**
* An event describing the altering of a table.
*/
public static class TableAlteredEvent extends TableEvent {
private final TableId previousTableId;
public TableAlteredEvent(TableId tableId, TableId previousTableId, String ddlStatement, boolean isView) {
super(EventType.ALTER_TABLE, tableId, ddlStatement, isView);
this.previousTableId = previousTableId;
}
/**
* If the table was renamed, then get the old identifier of the table before it was renamed.
* @return the table's previous identifier; may be null if the alter did not affect the table's identifier
*/
public TableId previousTableId() {
return previousTableId;
}
@Override
public String toString() {
if ( previousTableId != null ) {
return tableId() + " (was " + previousTableId() + ") => " + statement();
}
return tableId() + " => " + statement();
}
}
/**
* An event describing the dropping of a table.
*/
public static class TableDroppedEvent extends TableEvent {
public TableDroppedEvent(TableId tableId, String ddlStatement, boolean isView) {
super(EventType.DROP_TABLE, tableId, ddlStatement, isView);
}
}
/**
* The abstract base class for all index-related events.
*/
public static abstract class TableIndexEvent extends Event {
private final TableId tableId;
private final String indexName;
public TableIndexEvent(EventType type, String indexName, TableId tableId, String ddlStatement) {
super(type, ddlStatement);
this.tableId = tableId;
this.indexName = indexName;
}
/**
* Get the identifier of the table to which the index applies.
* @return the table identifier; may be null if the index is not scoped to a single table
*/
public TableId tableId() {
return tableId;
}
/**
* Get the name of the index affected by this event.
* @return the index name; never null
*/
public String indexName() {
return indexName;
}
@Override
public String toString() {
if ( tableId == null ) {
return indexName() + " => " + statement();
}
return indexName() + " on " + tableId() + " => " + statement();
}
}
/**
* An event describing the creation of an index on a table.
*/
public static class TableIndexCreatedEvent extends TableIndexEvent {
public TableIndexCreatedEvent(String indexName, TableId tableId, String ddlStatement) {
super(EventType.CREATE_INDEX, indexName, tableId, ddlStatement);
}
}
/**
* An event describing the dropping of an index on a table.
*/
public static class TableIndexDroppedEvent extends TableIndexEvent {
public TableIndexDroppedEvent(String indexName, TableId tableId, String ddlStatement) {
super(EventType.DROP_INDEX, indexName, tableId, ddlStatement);
}
}
}

View File

@ -173,7 +173,7 @@ protected void parseCreateTable(Marker start) {
// Update the table definition ...
databaseTables.overwriteTable(table.create());
signal(tableId, Action.CREATE, start);
signalCreateTable(tableId, start);
}
protected void parseAsSubqueryClause(Marker start, TableEditor table) {
@ -502,15 +502,17 @@ protected void parseReferencesScopeCheck(Marker start, String columnName, TokenS
}
protected void parseCreateView(Marker start) {
if ( skipViews ) {
// We don't care about the rest ...
consumeRemainingStatement(start);
debugSkipped(start);
return;
}
tokens.canConsume("RECURSIVE");
tokens.consume("VIEW");
TableId tableId = parseQualifiedTableName(start);
if ( skipViews ) {
// We don't care about the rest ...
consumeRemainingStatement(start);
signalCreateTable(tableId, start);
debugSkipped(start);
return;
}
TableEditor table = databaseTables.editOrCreateTable(tableId);
List<String> columnNames = null;
@ -540,7 +542,7 @@ protected void parseCreateView(Marker start) {
// Update the table definition ...
databaseTables.overwriteTable(table.create());
signal(tableId, Action.CREATE, start);
signalCreateView(tableId, start);
}
protected void parseCreateUnknown(Marker start) {
@ -597,7 +599,7 @@ protected void parseAlterTable(Marker start) {
}
databaseTables.overwriteTable(table.create());
signal(tableId, Action.ALTER, start);
signalAlterTable(tableId, null, start); // rename is not supported
}
protected void parseDropColumn(Marker start, TableEditor table) {
@ -670,7 +672,7 @@ protected void parseDropTable(Marker start) {
databaseTables.removeTable(tableId);
// ignore the rest ...
consumeRemainingStatement(start);
signal(tableId, Action.DROP, start);
signalDropTable(tableId, start);
}
protected void parseDropView(Marker start) {
@ -680,7 +682,7 @@ protected void parseDropView(Marker start) {
databaseTables.removeTable(tableId);
// ignore the rest ...
consumeRemainingStatement(start);
signal(tableId, Action.DROP, start);
signalDropView(tableId, start);
}
protected void parseDropUnknown(Marker start) {

View File

@ -9,75 +9,117 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.fest.assertions.StringAssert;
import static org.fest.assertions.Assertions.assertThat;
import io.debezium.relational.TableId;
import io.debezium.relational.ddl.DdlParser.Action;
import io.debezium.relational.ddl.DdlParser.Listener;
/**
* @author Randall Hauch
*
*/
public class SimpleDdlParserListener implements Listener {
public class SimpleDdlParserListener implements DdlParserListener {
public static final class Event {
public final TableId tableId;
public final String indexName;
public final Action action;
public final String ddlStatement;
public static final class EventAssert {
public Event(TableId tableId, Action action, String ddlStatement) {
this(null, tableId, action, ddlStatement);
private final Event actual;
public EventAssert(Event actual) {
this.actual = actual;
}
public Event(String indexName, TableId tableId, Action action, String ddlStatement) {
this.indexName = indexName;
this.tableId = tableId;
this.action = action;
this.ddlStatement = ddlStatement;
}
public StringAssert assertDdlStatement() {
return assertThat(ddlStatement);
}
public Event assertTableId(TableId id) {
assertThat(tableId).isEqualTo(id);
public EventAssert ddlMatches( String expected) {
assertThat(actual.statement()).isEqualTo(expected);
return this;
}
public Event assertCreateTable(TableId id) {
assertThat(tableId).isEqualTo(id);
assertThat(action).isEqualTo(Action.CREATE);
public EventAssert ddlStartsWith( String expected) {
assertThat(actual.statement()).startsWith(expected);
return this;
}
public Event assertDropTable(TableId id) {
assertThat(tableId).isEqualTo(id);
assertThat(action).isEqualTo(Action.DROP);
public EventAssert ddlContains( String expected) {
assertThat(actual.statement()).contains(expected);
return this;
}
public Event assertAlterTable(TableId id) {
assertThat(tableId).isEqualTo(id);
assertThat(action).isEqualTo(Action.ALTER);
protected TableEvent tableEvent() {
assertThat(actual).isInstanceOf(TableEvent.class);
return (TableEvent)actual;
}
protected TableAlteredEvent alterTableEvent() {
assertThat(actual).isInstanceOf(TableAlteredEvent.class);
return (TableAlteredEvent)actual;
}
public EventAssert tableNameIs( String expected) {
assertThat(tableEvent().tableId().table()).isEqualTo(expected);
return this;
}
public Event assertCreateIndex(String indexName, TableId id) {
assertThat(indexName).isEqualTo(indexName);
assertThat(tableId).isEqualTo(id);
assertThat(action).isEqualTo(Action.CREATE);
public EventAssert tableIs(TableId expected) {
assertThat(tableEvent().tableId()).isEqualTo(expected);
return this;
}
public Event assertDropIndex(String indexName, TableId id) {
assertThat(indexName).isEqualTo(indexName);
assertThat(tableId).isEqualTo(id);
assertThat(action).isEqualTo(Action.DROP);
public EventAssert ofType( EventType expected) {
assertThat(actual.type()).isEqualTo(expected);
return this;
}
public EventAssert createTableNamed( String tableName) {
return createTable().tableNameIs(tableName).isNotView();
}
public EventAssert alterTableNamed( String tableName) {
return alterTable().tableNameIs(tableName).isNotView();
}
public EventAssert renamedFrom( String oldName ) {
TableId previousTableId = alterTableEvent().previousTableId();
if ( oldName == null ) {
assertThat(previousTableId).isNull();
} else {
assertThat(previousTableId.table()).isEqualTo(oldName);
}
return this;
}
public EventAssert dropTableNamed( String tableName) {
return dropTable().tableNameIs(tableName).isNotView();
}
public EventAssert createViewNamed( String viewName) {
return createTable().tableNameIs(viewName).isView();
}
public EventAssert alterViewNamed( String viewName) {
return alterTable().tableNameIs(viewName).isView();
}
public EventAssert dropViewNamed( String viewName) {
return dropTable().tableNameIs(viewName).isView();
}
public EventAssert isView() {
assertThat(tableEvent().isView()).isTrue();
return this;
}
public EventAssert isNotView() {
assertThat(tableEvent().isView()).isFalse();
return this;
}
public EventAssert createTable() {
ofType(EventType.CREATE_TABLE);
return this;
}
public EventAssert alterTable() {
ofType(EventType.ALTER_TABLE);
return this;
}
public EventAssert dropTable() {
ofType(EventType.DROP_TABLE);
return this;
}
public EventAssert createIndex() {
ofType(EventType.CREATE_INDEX);
return this;
}
public EventAssert dropIndex() {
ofType(EventType.DROP_INDEX);
return this;
}
}
@ -89,14 +131,8 @@ public SimpleDdlParserListener() {
}
@Override
public void handleTableEvent(TableId tableId, Action action, String ddlStatement) {
events.add(new Event(tableId, action, ddlStatement));
counter.incrementAndGet();
}
@Override
public void handleIndexEvent(String indexName, TableId tableId, Action action, String ddlStatement) {
events.add(new Event(indexName, tableId, action, ddlStatement));
public void handle(Event event) {
events.add(event);
counter.incrementAndGet();
}
@ -109,7 +145,7 @@ public int total() {
}
/**
* Get the number of events currently held by this listener that have yet to be {@link #next() processed}.
* Get the number of events currently held by this listener that have yet to be {@link #assertNext() checked}.
* @return the number of remaining events
*/
public int remaining() {
@ -117,10 +153,18 @@ public int remaining() {
}
/**
* Get the next event seen by this listener.
* Assert that there is no next event.
*/
public void assertNoMoreEvents() {
assertThat( events.isEmpty()).isTrue();
}
/**
* Perform assertions on the next event seen by this listener.
* @return the next event, or null if there is no event
*/
public Event next() {
return events.isEmpty() ? null : events.remove(0);
public EventAssert assertNext() {
assertThat( events.isEmpty()).isFalse();
return new EventAssert(events.remove(0));
}
}