diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java index 986477475..4a5f15f88 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java @@ -1640,5 +1640,4 @@ public MysqlDdlParserWithSimpleTestListener(DdlChanges changesListener) { this.ddlChanges = changesListener; } } - } diff --git a/debezium-connector-mysql/src/test/resources/ddl/mysql-test-statements.ddl b/debezium-connector-mysql/src/test/resources/ddl/mysql-test-statements.ddl index 6816c686c..b95f1f3e9 100644 --- a/debezium-connector-mysql/src/test/resources/ddl/mysql-test-statements.ddl +++ b/debezium-connector-mysql/src/test/resources/ddl/mysql-test-statements.ddl @@ -329,7 +329,7 @@ CREATE SERVER server_1 -- [COMMENT [=] comment_text] -- ENGINE [=] engine_name -CREATE TABLESPACE tbl_space_1 ADD DATAFILE 'my_data_file' USER LOGFILE GROUP my_lf_group ENGINE = NDB; +CREATE TABLESPACE tbl_space_1 ADD DATAFILE 'my_data_file' USE LOGFILE GROUP my_lf_group ENGINE = NDB; -- CREATE -- [DEFINER = { user | CURRENT_USER }] diff --git a/debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/mysql/generated/MySqlParser.g4 b/debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/mysql/generated/MySqlParser.g4 index 0499072e1..531e83cca 100644 --- a/debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/mysql/generated/MySqlParser.g4 +++ b/debezium-ddl-parser/src/main/antlr4/io/debezium/ddl/parser/mysql/generated/MySqlParser.g4 @@ -37,7 +37,7 @@ root ; sqlStatements - : (sqlStatement MINUSMINUS? SEMI | emptyStatement)* + : (sqlStatement MINUSMINUS? SEMI? | emptyStatement)* (sqlStatement (MINUSMINUS? SEMI)? | emptyStatement) ; diff --git a/debezium-ddl-parser/src/main/java/io/debezium/antlr/mysql/MySqlAntlrDdlParser.java b/debezium-ddl-parser/src/main/java/io/debezium/antlr/mysql/MySqlAntlrDdlParser.java index cf105c38a..1482b71dc 100644 --- a/debezium-ddl-parser/src/main/java/io/debezium/antlr/mysql/MySqlAntlrDdlParser.java +++ b/debezium-ddl-parser/src/main/java/io/debezium/antlr/mysql/MySqlAntlrDdlParser.java @@ -30,7 +30,9 @@ import java.sql.Types; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; @@ -70,6 +72,7 @@ protected ProxyParseTreeListener assignParserListeners() { proxyParseTreeListener.add(new DropTableParserListener()); proxyParseTreeListener.add(new RenameTableParserListener()); proxyParseTreeListener.add(new TruncateTableParserListener()); + proxyParseTreeListener.add(new CreateViewParserListener()); proxyParseTreeListener.add(new CreateUniqueIndexParserListener()); proxyParseTreeListener.add(new SetStatementParserListener()); proxyParseTreeListener.add(new UseStatementParserListener()); @@ -327,7 +330,8 @@ else if (dataTypeContext instanceof MySqlParser.SpatialDataTypeContext) { if (Types.NCHAR == jdbcDataType || Types.NVARCHAR == jdbcDataType) { // NCHAR and NVARCHAR columns always uses utf8 as charset columnEditor.charsetName("utf8"); - } else { + } + else { columnEditor.charsetName(charsetName); } } @@ -337,7 +341,7 @@ private void parsePrimaryIndexColumnNames(MySqlParser.IndexColumnNamesContext in .map(indexColumnNameContext -> { // MySQL does not allow a primary key to have nullable columns, so let's make sure we model that correctly ... String columnName; - if(indexColumnNameContext.uid() != null) { + if (indexColumnNameContext.uid() != null) { columnName = parseName(indexColumnNameContext.uid()); } else { @@ -450,7 +454,7 @@ public void enterAlterSimpleDatabase(MySqlParser.AlterSimpleDatabaseContext ctx) @Override public void enterCreateDatabaseOption(MySqlParser.CreateDatabaseOptionContext ctx) { - if(ctx.charsetName() != null) { + if (ctx.charsetName() != null) { String charsetName = withoutQuotes(ctx.charsetName()); if ("DEFAULT".equalsIgnoreCase(charsetName)) { charsetName = systemVariables.getVariable(MySqlSystemVariables.CHARSET_NAME_SERVER); @@ -464,7 +468,7 @@ public void enterCreateDatabaseOption(MySqlParser.CreateDatabaseOptionContext ct /** * Parser listener fro MySQL drop database queries. */ - class DropDatabaseParserListener extends MySqlParserBaseListener { + class DropDatabaseParserListener extends MySqlParserBaseListener { @Override public void enterDropDatabase(MySqlParser.DropDatabaseContext ctx) { @@ -810,6 +814,196 @@ public void enterTruncateTable(MySqlParser.TruncateTableContext ctx) { } } + /** + * Parser listener for MySQL create view sql queries + */ + private class CreateViewParserListener extends MySqlParserBaseListener { + + private TableEditor selectTableEditor; + private Map tableByAlias = new HashMap<>(); + + @Override + public void enterCreateView(MySqlParser.CreateViewContext ctx) { + if (!skipViews) { + tableEditor = databaseTables.editOrCreateTable(parseQualifiedTableId(ctx.fullId())); + // create new columns just with specified name for now + if (ctx.uidList() != null) { + ctx.uidList().uid().forEach(uidContext -> { + tableEditor.addColumn(Column.editor().name(parseName(uidContext)).create()); + }); + } + } + super.enterCreateView(ctx); + } + + + @Override + public void exitQuerySpecification(MySqlParser.QuerySpecificationContext ctx) { + if (ctx.fromClause() != null) { + parseQuerySpecification(ctx.fromClause().tableSources(), ctx.selectElements()); + } + super.exitQuerySpecification(ctx); + } + + @Override + public void exitQuerySpecificationNointo(MySqlParser.QuerySpecificationNointoContext ctx) { + if (ctx.fromClause() != null) { + parseQuerySpecification(ctx.fromClause().tableSources(), ctx.selectElements()); + } + super.exitQuerySpecificationNointo(ctx); + } + + @Override + public void exitAtomTableItem(MySqlParser.AtomTableItemContext ctx) { + runIfTableEditorNotNull(() -> parseAtomTableItem(ctx, tableByAlias)); + super.exitAtomTableItem(ctx); + } + + @Override + public void exitSubqueryTableItem(MySqlParser.SubqueryTableItemContext ctx) { + runIfTableEditorNotNull(() -> { + // parsing subselect + String tableAlias = parseName(ctx.uid()); + TableId aliasTableId = resolveTableId(currentSchema(), tableAlias); + selectTableEditor.tableId(aliasTableId); + tableByAlias.put(aliasTableId, selectTableEditor.create()); + }); + super.exitSubqueryTableItem(ctx); + } + + private void parseQuerySpecification(MySqlParser.TableSourcesContext tableSourcesContext, MySqlParser.SelectElementsContext selectElementsContext) { + runIfTableEditorNotNull(() -> { + +// if (tableSourcesContext != null) { +// tableSourcesContext.tableSource().forEach(tableSourceContext -> { +// MySqlParser.TableSourceItemContext tableSourceItemContext = getTableSourceItemContext(tableSourceContext); +// // parsing atom table item, which is select without inner select +// parseAtomTableItem(tableSourceItemContext, tableByAlias); +// if (tableSourceItemContext instanceof MySqlParser.SubqueryTableItemContext) { +// // parsing subselect +// String tableAlias = parseName(((MySqlParser.SubqueryTableItemContext) tableSourceItemContext).uid()); +// TableId aliasTableId = resolveTableId(currentSchema(), tableAlias); +// selectTableEditor.tableId(aliasTableId); +// tableByAlias.put(aliasTableId, selectTableEditor.create()); +// } +// }); +// } + selectTableEditor = parseSelectElements(selectElementsContext); + }); + } + + private void parseAtomTableItem(MySqlParser.TableSourceItemContext ctx, Map tableByAlias) { + if (ctx instanceof MySqlParser.AtomTableItemContext) { + MySqlParser.AtomTableItemContext atomTableItemContext = (MySqlParser.AtomTableItemContext) ctx; + + TableId tableId = parseQualifiedTableId(atomTableItemContext.tableName().fullId()); + + Table table = tableByAlias.get(tableId); + if (table == null) { + table = databaseTables.forTable(tableId); + } + if (atomTableItemContext.alias != null) { + TableId aliasTableId = resolveTableId(tableId.schema(), parseName(atomTableItemContext.alias)); + tableByAlias.put(aliasTableId, table); + } + else { + tableByAlias.put(tableId, table); + } + } + } + + private TableEditor parseSelectElements(MySqlParser.SelectElementsContext ctx) { + TableEditor table = Table.editor(); + if (ctx.star != null) { + tableByAlias.keySet().forEach(tableId -> { + table.addColumns(tableByAlias.get(tableId).columns()); + }); + } + else { + ctx.selectElement().forEach(selectElementContext -> { + if (selectElementContext instanceof MySqlParser.SelectStarElementContext) { + TableId tableId = parseQualifiedTableId(((MySqlParser.SelectStarElementContext) selectElementContext).fullId()); + Table selectedTable = tableByAlias.get(tableId); + table.addColumns(selectedTable.columns()); + } + else if (selectElementContext instanceof MySqlParser.SelectColumnElementContext) { + MySqlParser.SelectColumnElementContext selectColumnElementContext = (MySqlParser.SelectColumnElementContext) selectElementContext; + MySqlParser.FullColumnNameContext fullColumnNameContext = selectColumnElementContext.fullColumnName(); + + String schemaName = currentSchema(); + String tableName = null; + String columnName; + + columnName = parseName(fullColumnNameContext.uid()); + if (fullColumnNameContext.dottedId(0) != null) { + // shift by 1 + tableName = columnName; + if (fullColumnNameContext.dottedId(1) != null) { + // shift by 2 + // final look of fullColumnName e.q. inventory.Persons.FirstName + schemaName = tableName; + tableName = withoutQuotes(fullColumnNameContext.dottedId(0).getText().substring(1)); + columnName = withoutQuotes(fullColumnNameContext.dottedId(1).getText().substring(1)); + } + else { + // final look of fullColumnName e.g. Persons.FirstName + columnName = withoutQuotes(fullColumnNameContext.dottedId(0).getText().substring(1)); + } + } + String alias = columnName; + if (selectColumnElementContext.uid() != null) { + alias = parseName(selectColumnElementContext.uid()); + } + if (tableName != null) { + Table selectedTable = tableByAlias.get(resolveTableId(schemaName, tableName)); + addColumnFromTable(table, columnName, alias, selectedTable); + } + else { + for (Table selectedTable : tableByAlias.values()) { + addColumnFromTable(table, columnName, alias, selectedTable); + } + } + } + }); + } + tableByAlias.clear(); + return table; + } + + @Override + public void exitCreateView(MySqlParser.CreateViewContext ctx) { + runIfTableEditorNotNull(() -> { + tableEditor.addColumns(selectTableEditor.columns()); + // Make sure that the table's character set has been set ... + if (!tableEditor.hasDefaultCharsetName()) { + tableEditor.setDefaultCharsetName(currentDatabaseCharset()); + } + databaseTables.overwriteTable(tableEditor.create()); + }); + signalCreateView(parseQualifiedTableId(ctx.fullId()), ctx); + super.exitCreateView(ctx); + } + + private MySqlParser.TableSourceItemContext getTableSourceItemContext(MySqlParser.TableSourceContext tableSourceContext) { + if (tableSourceContext instanceof MySqlParser.TableSourceBaseContext) { + return ((MySqlParser.TableSourceBaseContext) tableSourceContext).tableSourceItem(); + } + else if (tableSourceContext instanceof MySqlParser.TableSourceNestedContext) { + return ((MySqlParser.TableSourceNestedContext) tableSourceContext).tableSourceItem(); + } + return null; + } + + private void addColumnFromTable(TableEditor table, String columnName, String newColumnName, Table selectedTable) { + for (Column column : selectedTable.columns()) { + if (column.name().equals(columnName)) { + table.addColumn(column.edit().name(newColumnName).create()); + break; + } + } + } + } + /** * Parser listener for MySQL create unique index sql statement. */ @@ -850,7 +1044,7 @@ public void enterSetVariable(MySqlParser.SetVariableContext ctx) { continue; } String variableIdentifier = variableClauseContext.GLOBAL_ID().getText(); - if(variableIdentifier.startsWith("@@global.")) { + if (variableIdentifier.startsWith("@@global.")) { scope = MySqlScope.GLOBAL; variableName = variableIdentifier.substring("@@global.".length()); } @@ -861,7 +1055,8 @@ else if (variableIdentifier.startsWith("@@session.")) { else if (variableIdentifier.startsWith("@@local.")) { scope = MySqlScope.LOCAL; variableName = variableIdentifier.substring("@@local.".length()); - } else { + } + else { scope = MySqlScope.SESSION; variableName = variableIdentifier.substring("@@".length()); }