DBZ-252 create table and drop table listeners

This commit is contained in:
rkuchar 2018-03-10 18:25:36 +01:00 committed by Gunnar Morling
parent 909668e14a
commit 30e158aa71
5 changed files with 286 additions and 23 deletions

View File

@ -346,4 +346,16 @@ protected void setTypeInfoForConstant(String constantValue, ColumnEditor column)
} catch (NumberFormatException e) {
}
}
protected void debugParsed(String statement) {
if (logger.isTraceEnabled()) {
logger.trace("PARSED: {}", statement);
}
}
protected void debugSkipped(String statement) {
if (logger.isTraceEnabled()) {
logger.trace("SKIPPED: {}", statement);
}
}
}

View File

@ -403,17 +403,11 @@ protected void signalDropIndex(String indexName, TableId id, Marker statementSta
}
protected void debugParsed(Marker statementStart) {
if (logger.isTraceEnabled()) {
String statement = statement(statementStart);
logger.trace("PARSED: {}", statement);
}
debugParsed(statement(statementStart));
}
protected void debugSkipped(Marker statementStart) {
if (logger.isTraceEnabled()) {
String statement = statement(statementStart);
logger.trace("SKIPPED: {}", statement);
}
debugSkipped(statement(statementStart));
}
protected String statement(Marker statementStart) {

View File

@ -6,8 +6,10 @@
package io.debezium.antlr;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.AbstractDdlParser;
import io.debezium.relational.ddl.DdlParserListener;
import io.debezium.text.MultipleParsingExceptions;
import io.debezium.text.ParsingException;
import org.antlr.v4.runtime.CharStream;
@ -101,4 +103,127 @@ protected String statement(ParserRuleContext ctx) {
Interval interval = new Interval(ctx.start.getStartIndex(), ctx.stop.getStopIndex());
return ctx.start.getInputStream().getText(interval);
}
/**
* Signal a create database event to all listeners.
*
* @param databaseName the database name; may not be null
* @param ctx the start of the statement; may not be null
*/
protected void signalCreateDatabase(String databaseName, ParserRuleContext ctx) {
signalCreateDatabase(databaseName, statement(ctx));
}
/**
* Signal an alter database event to all listeners.
*
* @param databaseName the database name; may not be null
* @param previousDatabaseName the previous name of the database if it was renamed, or null if it was not renamed
* @param ctx the start of the statement; may not be null
*/
protected void signalAlterDatabase(String databaseName, String previousDatabaseName, ParserRuleContext ctx) {
signalAlterDatabase(databaseName, previousDatabaseName, statement(ctx));
}
/**
* Signal a drop database event to all listeners.
*
* @param databaseName the database name; may not be null
* @param ctx the start of the statement; may not be null
*/
protected void signalDropDatabase(String databaseName, ParserRuleContext ctx) {
signalDropDatabase(databaseName, statement(ctx));
}
/**
* Signal a create table event to all listeners.
*
* @param id the table identifier; may not be null
* @param ctx the start of the statement; may not be null
*/
protected void signalCreateTable(TableId id, ParserRuleContext ctx) {
signalCreateTable(id, statement(ctx));
}
/**
* Signal an alter table event to all listeners.
*
* @param id the table identifier; may not be null
* @param previousId the previous name of the view if it was renamed, or null if it was not renamed
* @param ctx the start of the statement; may not be null
*/
protected void signalAlterTable(TableId id, TableId previousId, ParserRuleContext ctx) {
signalAlterTable(id, previousId, statement(ctx));
}
/**
* Signal a drop table event to all listeners.
*
* @param id the table identifier; may not be null
* @param ctx the start of the statement; may not be null
*/
protected void signalDropTable(TableId id, ParserRuleContext ctx) {
signalDropTable(id, statement(ctx));
}
/**
* Signal a create view event to all listeners.
*
* @param id the table identifier; may not be null
* @param ctx the start of the statement; may not be null
*/
protected void signalCreateView(TableId id, ParserRuleContext ctx) {
signalEvent(new DdlParserListener.TableCreatedEvent(id, statement(ctx), true));
}
/**
* Signal an alter view event to all listeners.
*
* @param id the table identifier; may not be null
* @param previousId the previous name of the view if it was renamed, or null if it was not renamed
* @param ctx the start of the statement; may not be null
*/
protected void signalAlterView(TableId id, TableId previousId, ParserRuleContext ctx) {
signalAlterView(id, previousId, statement(ctx));
}
/**
* Signal a drop view event to all listeners.
*
* @param id the table identifier; may not be null
* @param ctx the start of the statement; may not be null
*/
protected void signalDropView(TableId id, ParserRuleContext ctx) {
signalDropView(id, statement(ctx));
}
/**
* Signal a create index event to all listeners.
*
* @param indexName the name of the index; may not be null
* @param id the table identifier; may be null if the index does not apply to a single table
* @param ctx the start of the statement; may not be null
*/
protected void signalCreateIndex(String indexName, TableId id, ParserRuleContext ctx) {
signalCreateIndex(indexName, id, statement(ctx));
}
/**
* Signal a drop index event to all listeners.
*
* @param indexName the name of the index; may not be null
* @param id the table identifier; may not be null
* @param ctx the start of the statement; may not be null
*/
protected void signalDropIndex(String indexName, TableId id, ParserRuleContext ctx) {
signalDropIndex(indexName, id, statement(ctx));
}
protected void debugParsed(ParserRuleContext ctx) {
debugParsed(statement(ctx));
}
protected void debugSkipped(ParserRuleContext ctx) {
debugSkipped(statement(ctx));
}
}

View File

@ -12,16 +12,19 @@
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.Table;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.misc.Interval;
import org.antlr.v4.runtime.tree.ParseTreeWalker;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author Roman Kuchár <kucharrom@gmail.com>.
@ -87,6 +90,105 @@ private class MySqlDdlParserListener extends MySqlParserBaseListener {
private int parsingColumnIndex = 0;
private List<ColumnEditor> columnEditors;
/*
* START - Listening event for create table statements
*/
@Override
public void enterQueryCreateTable(MySqlParser.QueryCreateTableContext ctx) {
TableId tableId = parseQualifiedTableId(ctx.tableName());
tableEditor = databaseTables.editOrCreateTable(tableId);
super.enterQueryCreateTable(ctx);
}
@Override
public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
TableId tableId = parseQualifiedTableId(ctx.tableName());
tableEditor = databaseTables.editOrCreateTable(tableId);
super.enterColumnCreateTable(ctx);
}
@Override
public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
databaseTables.overwriteTable(tableEditor.create());
signalCreateTable(tableEditor.tableId(), ctx);
debugParsed(ctx);
super.exitColumnCreateTable(ctx);
}
@Override
public void exitQueryCreateTable(MySqlParser.QueryCreateTableContext ctx) {
databaseTables.overwriteTable(tableEditor.create());
signalCreateTable(tableEditor.tableId(), ctx);
debugParsed(ctx);
super.exitQueryCreateTable(ctx);
}
@Override
public void exitCopyCreateTable(MySqlParser.CopyCreateTableContext ctx) {
TableId tableId = parseQualifiedTableId(ctx.tableName(0));
TableId originalTableId = parseQualifiedTableId(ctx.tableName(1));
Table original = databaseTables.forTable(originalTableId);
if (original != null) {
databaseTables.overwriteTable(tableId, original.columns(), original.primaryKeyColumnNames(), original.defaultCharsetName());
signalCreateTable(tableId, ctx);
}
debugParsed(ctx);
super.exitCopyCreateTable(ctx);
}
@Override
public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
String columnName = parseColumnName(ctx.uid());
columnEditor = Column.editor().name(columnName);
super.enterColumnDeclaration(ctx);
}
@Override
public void exitColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
tableEditor.addColumn(columnEditor.create());
columnEditor = null;
super.exitColumnDeclaration(ctx);
}
@Override
public void enterPrimaryKeyTableConstraint(MySqlParser.PrimaryKeyTableConstraintContext ctx) {
MySqlParser.IndexColumnNamesContext indexColumnNamesContext = ctx.indexColumnNames();
List<String> pkColumnNames = indexColumnNamesContext.indexColumnName().stream()
.map(indexColumnNameContext -> {
// MySQL does not allow a primary key to have nullable columns, so let's make sure we model that correctly ...
String columnName = parseColumnName(indexColumnNameContext.uid());
Column column = tableEditor.columnWithName(columnName);
if (column != null && column.isOptional()) {
tableEditor.addColumn(column.edit().optional(false).create());
}
return columnName;
})
.collect(Collectors.toList());
tableEditor.setPrimaryKeyNames(pkColumnNames);
super.enterPrimaryKeyTableConstraint(ctx);
}
/*
* END - Listening event for create table statements
*/
/*
* Drop table listener
*/
@Override
public void enterDropTable(MySqlParser.DropTableContext ctx) {
Interval interval = new Interval(ctx.start.getStartIndex(), ctx.tables().start.getStartIndex() - 1);
String prefix = ctx.start.getInputStream().getText(interval);
ctx.tables().tableName().forEach(tableNameContext -> {
TableId tableId = parseQualifiedTableId(tableNameContext);
databaseTables.removeTable(tableId);
signalDropTable(tableId, prefix + tableId.table()
+ (ctx.dropType != null ? " " + ctx.dropType.getText() : ""));
});
super.enterDropTable(ctx);
}
/*
* START - Listening events for alter table statements
*/
@ -100,26 +202,18 @@ public void enterAlterTable(MySqlParser.AlterTableContext ctx) {
@Override
public void enterAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) {
String columnName = parseColumnName(ctx.uid().get(0));
String columnName = parseColumnName(ctx.uid(0));
columnEditor = Column.editor().name(columnName);
// TODO: how can i set a column position and update other existing columns position?
if (ctx.FIRST() != null) {
//TODO: this new column should have the first position in table
} else if (ctx.AFTER() != null) {
String afterColumn = parseColumnName(ctx.uid().get(1));
String afterColumn = parseColumnName(ctx.uid(1));
//TODO: this column should have position after the specified column
}
super.exitAlterByAddColumn(ctx);
}
@Override
public void exitAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) {
tableEditor.addColumn(columnEditor.create());
signalAlterTable(tableEditor.tableId(), null, statement(ctx.getParent()));
super.exitAlterByAddColumn(ctx);
}
@Override
public void enterAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) {
// multiple columns are added. Initialize a list of column editors for them
@ -131,12 +225,25 @@ public void enterAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) {
super.enterAlterByAddColumns(ctx);
}
@Override
public void exitAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) {
tableEditor.addColumn(columnEditor.create());
super.exitAlterByAddColumn(ctx);
}
@Override
public void exitAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) {
columnEditors.forEach(columnEditor -> tableEditor.addColumn(columnEditor.create()));
signalAlterTable(tableEditor.tableId(), null, statement(ctx.getParent()));
super.exitAlterByAddColumns(ctx);
}
@Override
public void exitAlterTable(MySqlParser.AlterTableContext ctx) {
databaseTables.overwriteTable(tableEditor.create());
signalAlterTable(tableEditor.tableId(), null, ctx.getParent());
debugParsed(ctx.getParent());
super.exitAlterTable(ctx);
}
/*
* END - Listening events for alter table statements
*/
@ -156,6 +263,8 @@ public void enterColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) {
@Override
public void enterPrimaryKeyColumnConstraint(MySqlParser.PrimaryKeyColumnConstraintContext ctx) {
// this rule will be parsed only if no primary key is set in a table
// otherwise the statement can't be executed due to multiple primary key error
columnEditor.optional(false);
tableEditor.setPrimaryKeyNames(columnEditor.name());
super.enterPrimaryKeyColumnConstraint(ctx);
@ -188,7 +297,6 @@ public void enterAutoIncrementColumnConstraint(MySqlParser.AutoIncrementColumnCo
*/
@Override
public void exitSqlStatement(MySqlParser.SqlStatementContext ctx) {
// TODO finish the job and send signal event
// reset global values for next statement that could be parsed with this instance
tableEditor = null;
columnEditor = null;

View File

@ -12,6 +12,8 @@
import org.junit.Before;
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
/**
* @author Roman Kuchár <kucharrom@gmail.com>.
*/
@ -29,13 +31,35 @@ public void beforeEach() {
parser.addListener(listener);
}
@Test
public void shouldParseMultipleStatements() {
String ddl = "CREATE TABLE foo ( " + System.lineSeparator()
+ " c1 INTEGER NOT NULL, " + System.lineSeparator()
+ " c2 VARCHAR(22) " + System.lineSeparator()
+ "); " + System.lineSeparator()
+ "-- This is a comment" + System.lineSeparator()
+ "DROP TABLE foo;" + System.lineSeparator();
parser.parse(ddl, tables);
assertThat(tables.size()).isEqualTo(0); // table created and dropped
listener.assertNext().createTableNamed("foo").ddlStartsWith("CREATE TABLE foo (");
listener.assertNext().dropTableNamed("foo").ddlMatches("DROP TABLE foo");
}
@Test
public void shouldParseAlterStatementsAfterCreate() {
String ddl1 = "CREATE TABLE foo ( c1 INTEGER NOT NULL, c2 VARCHAR(22) );" + System.lineSeparator();
String ddl2 = "ALTER TABLE foo ADD COLUMN c bigint;" + System.lineSeparator();
parser.parse(ddl1, tables);
parser.parse(ddl2, tables);
listener.assertNext().createTableNamed("foo").ddlStartsWith("CREATE TABLE foo (");
listener.assertNext().alterTableNamed("foo").ddlStartsWith("ALTER TABLE foo ADD COLUMN c");
}
@Test
public void shouldParseAlterStatementsWithoutCreate() {
String ddl = "ALTER TABLE foo ADD COLUMN c bigint;" + System.lineSeparator()
+ "ALTER TABLE foo2 ADD b bigint;";
String ddl = "ALTER TABLE foo ADD COLUMN c bigint;" + System.lineSeparator();
parser.parse(ddl, tables);
listener.assertNext().alterTableNamed("foo").ddlStartsWith("ALTER TABLE foo ADD COLUMN c");
listener.assertNext().alterTableNamed("foo2").ddlStartsWith("ALTER TABLE foo2 ADD b");
}
}