From b4480c55b701bf2e18780c8692916ab9f846708c Mon Sep 17 00:00:00 2001 From: rkuchar Date: Fri, 13 Apr 2018 11:46:44 +0200 Subject: [PATCH] DBZ-252 parse database queries (create, alter, delete), parse name tokens without quotes, add new Mysql connector configuration field for choosing ddl parsing mode --- .../connector/mysql/MySqlConnectorConfig.java | 71 +++++++++++- .../connector/mysql/MySqlDdlParser.java | 15 +-- .../debezium/connector/mysql/MySqlSchema.java | 22 ++-- .../connector/mysql/MySqlDdlParserTest.java | 9 +- .../java/io/debezium/relational/Tables.java | 19 ++++ .../relational/ddl/AbstractDdlParser.java | 29 +++++ .../io/debezium/antlr/AntlrDdlParser.java | 4 + .../antlr/mysql/MySqlAntlrDdlParser.java | 101 ++++++++++++++---- .../antlr/mysql/MySqlAntlrDdlParserTest.java | 9 +- 9 files changed, 227 insertions(+), 52 deletions(-) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index 0573ec7ea..95b7e765a 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -10,6 +10,8 @@ import java.util.Objects; import java.util.Random; +import io.debezium.antlr.mysql.MySqlAntlrDdlParser; +import io.debezium.relational.ddl.DdlParser; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; @@ -452,6 +454,60 @@ public static EventProcessingFailureHandlingMode parse(String value) { } } + public static enum DdlParsingMode implements EnumeratedValue { + + LEGACY("legacy", MySqlDdlParser.class), + ANTLR("antlr", MySqlAntlrDdlParser.class); + + private final String value; + private final Class parserClass; + + private DdlParsingMode(String value, Class parserClass) { + this.value = value; + this.parserClass = parserClass; + } + + @Override + public String getValue() { + return value; + } + + public Class getParserClass() { + return parserClass; + } + + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value the configuration property value; may not be null + * @return the matching option, or null if no match is found + */ + public static DdlParsingMode parse(String value) { + if (value == null) return null; + value = value.trim(); + for (DdlParsingMode option : DdlParsingMode.values()) { + if (option.getValue().equalsIgnoreCase(value)) return option; + } + return null; + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value the configuration property value; may not be null + * @param defaultValue the default value; may be null + * @return the matching option, or null if no match is found and the non-null default is invalid + */ + public static DdlParsingMode parse(String value, String defaultValue) { + DdlParsingMode mode = parse(value); + if (mode == null && defaultValue != null) { + mode = parse(defaultValue); + } + return mode; + } + } + private static final String DATABASE_WHITELIST_NAME = "database.whitelist"; private static final String TABLE_WHITELIST_NAME = "table.whitelist"; private static final String TABLE_IGNORE_BUILTIN_NAME = "table.ignore.builtin"; @@ -903,6 +959,18 @@ public static EventProcessingFailureHandlingMode parse(String value) { .withDefault(0L) .withValidation(Field::isNonNegativeLong); + public static final Field DDL_PARSER_MODE = Field.create("ddl.parser.mode") + .withDisplayName("Ddl parser mode") + .withEnum(DdlParsingMode.class, DdlParsingMode.LEGACY) + .withWidth(Width.SHORT) + .withImportance(Importance.MEDIUM) + // TODO rkuchar: change description + .withDescription("Time, date, and timestamps can be represented with different kinds of precisions, including:" + + "'adaptive_time_microseconds' (the default) like 'adaptive' mode, but TIME fields always use microseconds precision;" + + "'adaptive' (deprecated) bases the precision of time, date, and timestamp values on the database column's precision; " + + "'connect' always represents time, date, and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp, " + + "which uses millisecond precision regardless of the database columns' precision."); + /** * Method that generates a Field for specifying that string columns whose names match a set of regular expressions should * have their values truncated to be no longer than the specified number of characters. @@ -957,6 +1025,7 @@ public static final Field MASK_COLUMN(int length) { EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE, INCONSISTENT_SCHEMA_HANDLING_MODE, SNAPSHOT_DELAY_MS, + DDL_PARSER_MODE, CommonConnectorConfig.TOMBSTONES_ON_DELETE); /** @@ -1013,7 +1082,7 @@ protected static ConfigDef configDef() { Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, KEEP_ALIVE_INTERVAL_MS, CommonConnectorConfig.MAX_QUEUE_SIZE, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.POLL_INTERVAL_MS, SNAPSHOT_MODE, SNAPSHOT_LOCKING_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, - BIGINT_UNSIGNED_HANDLING_MODE, SNAPSHOT_DELAY_MS); + BIGINT_UNSIGNED_HANDLING_MODE, SNAPSHOT_DELAY_MS, DDL_PARSER_MODE); return config; } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java index 97a76646d..3551b43d0 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java @@ -835,20 +835,6 @@ public static List parseSetAndEnumOptions(String typeExpression) { return options; } - protected static String withoutQuotes(String possiblyQuoted) { - if (possiblyQuoted.length() < 2) { - // Too short to be quoted ... - return possiblyQuoted; - } - if (possiblyQuoted.startsWith("'") && possiblyQuoted.endsWith("'")) { - return possiblyQuoted.substring(1, possiblyQuoted.length() - 1); - } - if (possiblyQuoted.startsWith("\"") && possiblyQuoted.endsWith("\"")) { - return possiblyQuoted.substring(1, possiblyQuoted.length() - 1); - } - return possiblyQuoted; - } - protected void parseColumnDefinition(Marker start, String columnName, TokenStream tokens, TableEditor table, ColumnEditor column, AtomicBoolean isPrimaryKey) { // Parse the data type, which must be at this location ... @@ -1153,6 +1139,7 @@ protected void parseCreateIndex(Marker start) { // We don't care about any other statements or the rest of this statement ... consumeRemainingStatement(start); + // TODO fix: signal should be send only when some changes on table are made signalCreateIndex(indexName, tableId, start); debugParsed(start); } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java index e2bfe57df..5572281fe 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java @@ -18,11 +18,11 @@ import org.slf4j.LoggerFactory; import io.debezium.annotation.NotThreadSafe; -import io.debezium.antlr.mysql.MySqlAntlrDdlParser; import io.debezium.antlr.mysql.MySqlSystemVariables.MySqlScope; import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnectorConfig.BigIntUnsignedHandlingMode; import io.debezium.connector.mysql.MySqlConnectorConfig.DecimalHandlingMode; +import io.debezium.connector.mysql.MySqlConnectorConfig.DdlParsingMode; import io.debezium.document.Document; import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode; import io.debezium.jdbc.JdbcValueConverters.DecimalMode; @@ -98,14 +98,17 @@ public MySqlSchema(Configuration config, String serverName, Predicate gt this.topicSelector = topicSelector; this.tableIdCaseInsensitive = tableIdCaseInsensitive; - // TODO rkuchar: implement connector configuration using enum to define which parser should be used - if(true) { - this.ddlParser = new MySqlDdlParser(); - } else { - this.ddlParser = new MySqlAntlrDdlParser(); - } + String ddlParsingModeStr = config.getString(MySqlConnectorConfig.DDL_PARSER_MODE); + DdlParsingMode parsingMode = DdlParsingMode.parse(ddlParsingModeStr, MySqlConnectorConfig.DDL_PARSER_MODE.defaultValueAsString()); - this.ddlChanges = this.ddlParser.getDdlChanges(); + try { + this.ddlParser = parsingMode.getParserClass().newInstance(); + this.ddlChanges = this.ddlParser.getDdlChanges(); + } + catch (InstantiationException | IllegalAccessException e) { + // ddl parser constructor are not throwing any exceptions, so this should never happen + throw new IllegalArgumentException("Unable to create new instance for ddl parser class " + parsingMode.getParserClass().getCanonicalName()); + } // Use MySQL-specific converters and schemas for values ... String timePrecisionModeStr = config.getString(MySqlConnectorConfig.TIME_PRECISION_MODE); @@ -128,7 +131,8 @@ public MySqlSchema(Configuration config, String serverName, Predicate gt this.serverName = serverName; if (this.serverName == null || serverName.isEmpty()) { this.schemaPrefix = ""; - } else { + } + else { this.schemaPrefix = serverName.endsWith(".") ? serverName : serverName + "."; } diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java index 89ab90f1e..42f8eee84 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java @@ -27,6 +27,7 @@ import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.relational.Tables; +import io.debezium.relational.ddl.DdlChanges; import io.debezium.relational.ddl.DdlParserListener.Event; import io.debezium.relational.ddl.SimpleDdlParserListener; import io.debezium.util.IoUtil; @@ -42,7 +43,7 @@ public class MySqlDdlParserTest { @Before public void beforeEach() { listener = new SimpleDdlParserListener(); - parser = new MysqlDdlParserWithTest(); + parser = new MysqlDdlParserWithSimpleTestListener(listener); tables = new Tables(); } @@ -1629,10 +1630,10 @@ protected void assertColumn(Table table, String name, String typeName, int jdbcT assertThat(column.isAutoIncremented()).isEqualTo(autoIncremented); } - class MysqlDdlParserWithTest extends MySqlDdlParser { - public MysqlDdlParserWithTest() { + class MysqlDdlParserWithSimpleTestListener extends MySqlDdlParser { + public MysqlDdlParserWithSimpleTestListener(DdlChanges changesListener) { super(); - this.ddlChanges = listener; + this.ddlChanges = changesListener; } } diff --git a/debezium-core/src/main/java/io/debezium/relational/Tables.java b/debezium-core/src/main/java/io/debezium/relational/Tables.java index 4e8b5dacf..89de8f61b 100644 --- a/debezium-core/src/main/java/io/debezium/relational/Tables.java +++ b/debezium-core/src/main/java/io/debezium/relational/Tables.java @@ -7,6 +7,7 @@ import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -168,6 +169,20 @@ public Table overwriteTable(Table table) { }); } + public void removeTablesForDatabase(String schemaName) { + removeTablesForDatabase(schemaName, null); + } + + public void removeTablesForDatabase(String catalogName, String schemaName) { + lock.write(() -> { + tablesByTableId.entrySet().removeIf(tableIdTableEntry -> { + TableId tableId = tableIdTableEntry.getKey(); + return (schemaName == null || tableId.schema() == null || schemaName.equals(tableId.schema())) + & (catalogName == null || tableId.catalog() == null || catalogName.equals(tableId.catalog())); + }); + }); + } + /** * Rename an existing table. * @@ -381,6 +396,10 @@ void forEach(BiConsumer action) { values.forEach(action); } + Set> entrySet() { + return values.entrySet(); + } + private TableId toLowerCaseIfNeeded(TableId tableId) { return tableIdCaseInsensitive ? tableId.toLowercase() : tableId; } diff --git a/debezium-core/src/main/java/io/debezium/relational/ddl/AbstractDdlParser.java b/debezium-core/src/main/java/io/debezium/relational/ddl/AbstractDdlParser.java index ccf1a1522..e54c2e6ea 100644 --- a/debezium-core/src/main/java/io/debezium/relational/ddl/AbstractDdlParser.java +++ b/debezium-core/src/main/java/io/debezium/relational/ddl/AbstractDdlParser.java @@ -248,10 +248,39 @@ protected void signalDropIndex(String indexName, TableId id, String statement) { signalChangeEvent(new DdlParserListener.TableIndexDroppedEvent(indexName, id, statement)); } + /** + * Removes line feeds from input string. + * + * @param input input with possible line feeds + * @return input string without line feeds + */ protected String removeLineFeeds(String input) { return input.replaceAll("[\\n|\\t]", ""); } + /** + * Cut out the string surrounded with single, double and reversed quotes. + * + * @param possiblyQuoted string with possible quotes + * @return string without quotes + */ + protected static String withoutQuotes(String possiblyQuoted) { + if (possiblyQuoted.length() < 2) { + // Too short to be quoted ... + return possiblyQuoted; + } + if (possiblyQuoted.startsWith("`") && possiblyQuoted.endsWith("`")) { + return possiblyQuoted.substring(1, possiblyQuoted.length() - 1); + } + if (possiblyQuoted.startsWith("'") && possiblyQuoted.endsWith("'")) { + return possiblyQuoted.substring(1, possiblyQuoted.length() - 1); + } + if (possiblyQuoted.startsWith("\"") && possiblyQuoted.endsWith("\"")) { + return possiblyQuoted.substring(1, possiblyQuoted.length() - 1); + } + return possiblyQuoted; + } + /** * Utility method to accumulate a parsing exception. * diff --git a/debezium-ddl-parser/src/main/java/io/debezium/antlr/AntlrDdlParser.java b/debezium-ddl-parser/src/main/java/io/debezium/antlr/AntlrDdlParser.java index e6de24185..62fdc6694 100644 --- a/debezium-ddl-parser/src/main/java/io/debezium/antlr/AntlrDdlParser.java +++ b/debezium-ddl-parser/src/main/java/io/debezium/antlr/AntlrDdlParser.java @@ -250,4 +250,8 @@ protected void debugParsed(ParserRuleContext ctx) { protected void debugSkipped(ParserRuleContext ctx) { debugSkipped(getText(ctx)); } + + protected String withoutQuotes(ParserRuleContext ctx) { + return withoutQuotes(ctx.getText()); + } } diff --git a/debezium-ddl-parser/src/main/java/io/debezium/antlr/mysql/MySqlAntlrDdlParser.java b/debezium-ddl-parser/src/main/java/io/debezium/antlr/mysql/MySqlAntlrDdlParser.java index 57d169b1b..9d7c1e5e1 100644 --- a/debezium-ddl-parser/src/main/java/io/debezium/antlr/mysql/MySqlAntlrDdlParser.java +++ b/debezium-ddl-parser/src/main/java/io/debezium/antlr/mysql/MySqlAntlrDdlParser.java @@ -29,6 +29,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; /** @@ -39,6 +41,8 @@ public class MySqlAntlrDdlParser extends AntlrDdlParser private TableEditor tableEditor; private ColumnEditor columnEditor; + private final ConcurrentMap charsetNameForDatabase = new ConcurrentHashMap<>(); + public MySqlAntlrDdlParser() { super(); systemVariables = new MySqlSystemVariables(); @@ -51,10 +55,12 @@ protected ParseTree parseTree(MySqlParser parser) { @Override protected void assignParserListeners(ProxyParseTreeListener proxyParseTreeListener) { + proxyParseTreeListener.add(new DatabaseOptionsListener()); + proxyParseTreeListener.add(new DropDatabaseParserListener()); proxyParseTreeListener.add(new ColumnDefinitionParserListener()); proxyParseTreeListener.add(new CreateTableParserListener()); - proxyParseTreeListener.add(new DropTableParserListener()); proxyParseTreeListener.add(new AlterTableParserListener()); + proxyParseTreeListener.add(new DropTableParserListener()); proxyParseTreeListener.add(new RenameTableParserListener()); proxyParseTreeListener.add(new TruncateTableParserListener()); proxyParseTreeListener.add(new CreateUniqueIndexParserListener()); @@ -153,16 +159,16 @@ private TableId parseQualifiedTableId(MySqlParser.TableNameContext tableNameCont String fullTableName = tableNameContext.fullId().getText(); int dotIndex; if ((dotIndex = fullTableName.indexOf(".")) > 0) { - return resolveTableId(fullTableName.substring(0, dotIndex), - fullTableName.substring(dotIndex + 1, fullTableName.length())); + return resolveTableId(withoutQuotes(fullTableName.substring(0, dotIndex)), + withoutQuotes(fullTableName.substring(dotIndex + 1, fullTableName.length()))); } else { - return resolveTableId(currentSchema(), fullTableName); + return resolveTableId(currentSchema(), withoutQuotes(fullTableName)); } } - private String parseColumnName(MySqlParser.UidContext uidContext) { - return uidContext.getText(); + private String parseName(MySqlParser.UidContext uidContext) { + return withoutQuotes(uidContext); } private String getFullTableName(TableId tableId) { @@ -295,7 +301,7 @@ private void parsePrimaryIndexColumnNames(MySqlParser.IndexColumnNamesContext in List pkColumnNames = indexColumnNamesContext.indexColumnName().stream() .map(indexColumnNameContext -> { // MySQL does not allow a primary key to have nullable columns, so let's make sure we model that correctly ... - String columnName = parseColumnName(indexColumnNameContext.uid()); + String columnName = parseName(indexColumnNameContext.uid()); Column column = tableEditor.columnWithName(columnName); if (column != null && column.isOptional()) { tableEditor.addColumn(column.edit().optional(false).create()); @@ -307,6 +313,59 @@ private void parsePrimaryIndexColumnNames(MySqlParser.IndexColumnNamesContext in tableEditor.setPrimaryKeyNames(pkColumnNames); } + /** + * Parser listener for MySQL create database query to get database charsetName. + */ + private class DatabaseOptionsListener extends MySqlParserBaseListener { + + private String databaseName; + + @Override + public void enterCreateDatabase(MySqlParser.CreateDatabaseContext ctx) { + databaseName = parseName(ctx.uid()); + super.enterCreateDatabase(ctx); + } + + @Override + public void exitCreateDatabase(MySqlParser.CreateDatabaseContext ctx) { + signalCreateDatabase(databaseName, ctx); + super.exitCreateDatabase(ctx); + } + + @Override + public void enterAlterSimpleDatabase(MySqlParser.AlterSimpleDatabaseContext ctx) { + databaseName = ctx.uid() == null ? currentSchema() : parseName(ctx.uid()); + super.enterAlterSimpleDatabase(ctx); + } + + @Override + public void enterCreateDatabaseOption(MySqlParser.CreateDatabaseOptionContext ctx) { + if(ctx.charsetName() != null) { + String charsetName = withoutQuotes(ctx.charsetName()); + if ("DEFAULT".equalsIgnoreCase(charsetName)) { + charsetName = systemVariables.getVariable(MySqlSystemVariables.CHARSET_NAME_SERVER); + } + charsetNameForDatabase.put(databaseName, charsetName); + } + super.enterCreateDatabaseOption(ctx); + } + } + + /** + * Parser listener fro MySQL drop database queries. + */ + class DropDatabaseParserListener extends MySqlParserBaseListener { + + @Override + public void enterDropDatabase(MySqlParser.DropDatabaseContext ctx) { + String databaseName = parseName(ctx.uid()); + databaseTables.removeTablesForDatabase(databaseName); + charsetNameForDatabase.remove(databaseName); + signalDropDatabase(databaseName, ctx); + super.enterDropDatabase(ctx); + } + } + /** * Parser listener for MySQL create table queries. */ @@ -340,7 +399,7 @@ public void exitCopyCreateTable(MySqlParser.CopyCreateTableContext ctx) { @Override public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) { - String columnName = parseColumnName(ctx.uid()); + String columnName = parseName(ctx.uid()); columnEditor = Column.editor().name(columnName); super.enterColumnDeclaration(ctx); } @@ -419,7 +478,7 @@ public void exitAlterTable(MySqlParser.AlterTableContext ctx) { @Override public void enterAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) { runIfTableEditorNotNull(() -> { - String columnName = parseColumnName(ctx.uid(0)); + String columnName = parseName(ctx.uid(0)); columnEditor = Column.editor().name(columnName); }); super.exitAlterByAddColumn(ctx); @@ -435,7 +494,7 @@ public void exitAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) { tableEditor.reorderColumn(columnName, null); } else if (ctx.AFTER() != null) { - String afterColumn = parseColumnName(ctx.uid(1)); + String afterColumn = parseName(ctx.uid(1)); tableEditor.reorderColumn(columnName, afterColumn); } }); @@ -448,7 +507,7 @@ public void enterAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) { runIfTableEditorNotNull(() -> { columnEditors = new ArrayList<>(ctx.uid().size()); for (MySqlParser.UidContext uidContext : ctx.uid()) { - String columnName = parseColumnName(uidContext); + String columnName = parseName(uidContext); columnEditors.add(Column.editor().name(columnName)); } columnEditor = columnEditors.get(0); @@ -485,7 +544,7 @@ public void exitAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) { @Override public void enterAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) { runIfTableEditorNotNull(() -> { - String oldColumnName = parseColumnName(ctx.oldColumn); + String oldColumnName = parseName(ctx.oldColumn); Column existingColumn = tableEditor.columnWithName(oldColumnName); if (existingColumn != null) { columnEditor = existingColumn.edit(); @@ -502,14 +561,14 @@ public void enterAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) { runIfAllEditorsNotNull(() -> { tableEditor.addColumn(columnEditor.create()); - String newColumnName = parseColumnName(ctx.newColumn); + String newColumnName = parseName(ctx.newColumn); tableEditor.renameColumn(columnEditor.name(), newColumnName); if (ctx.FIRST() != null) { tableEditor.reorderColumn(newColumnName, null); } else if (ctx.afterColumn != null) { - tableEditor.reorderColumn(newColumnName, parseColumnName(ctx.afterColumn)); + tableEditor.reorderColumn(newColumnName, parseName(ctx.afterColumn)); } }); super.exitAlterByChangeColumn(ctx); @@ -518,7 +577,7 @@ else if (ctx.afterColumn != null) { @Override public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) { runIfTableEditorNotNull(() -> { - String columnName = parseColumnName(ctx.uid(0)); + String columnName = parseName(ctx.uid(0)); Column column = tableEditor.columnWithName(columnName); if (column != null) { columnEditor = column.edit(); @@ -540,7 +599,7 @@ public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) tableEditor.reorderColumn(columnEditor.name(), null); } else if (ctx.AFTER() != null) { - String afterColumn = parseColumnName(ctx.uid(1)); + String afterColumn = parseName(ctx.uid(1)); tableEditor.reorderColumn(columnEditor.name(), afterColumn); } }); @@ -549,14 +608,15 @@ else if (ctx.AFTER() != null) { @Override public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext ctx) { - runIfTableEditorNotNull(() -> tableEditor.removeColumn(parseColumnName(ctx.uid()))); + runIfTableEditorNotNull(() -> tableEditor.removeColumn(parseName(ctx.uid()))); super.enterAlterByDropColumn(ctx); } @Override public void enterAlterByRename(MySqlParser.AlterByRenameContext ctx) { runIfTableEditorNotNull(() -> { - TableId newTableId = resolveTableId(currentSchema(), ctx.uid().getText()); +// TODO rkuchar: test uid + TableId newTableId = resolveTableId(currentSchema(), parseName(ctx.uid())); databaseTables.renameTable(tableEditor.tableId(), newTableId); }); super.enterAlterByRename(ctx); @@ -565,7 +625,7 @@ public void enterAlterByRename(MySqlParser.AlterByRenameContext ctx) { @Override public void enterAlterByChangeDefault(MySqlParser.AlterByChangeDefaultContext ctx) { runIfTableEditorNotNull(() -> { - String columnName = parseColumnName(ctx.uid()); + String columnName = parseName(ctx.uid()); Column column = tableEditor.columnWithName(columnName); if (column != null) { ColumnEditor columnEditor = column.edit(); @@ -698,13 +758,14 @@ public void enterCreateIndex(MySqlParser.CreateIndexContext ctx) { if (tableEditor != null) { if (!tableEditor.hasPrimaryKey()) { parsePrimaryIndexColumnNames(ctx.indexColumnNames()); - signalAlterTable(tableId, null, ctx); } } else { throw new ParsingException(null, "Trying to create index on non existing table " + getFullTableName(tableId)); } } + // TODO fixed together with MySql legacy parser bug. + signalAlterTable(null, null, ctx); super.enterCreateIndex(ctx); } } diff --git a/debezium-ddl-parser/src/test/java/io/debezium/antlr/mysql/MySqlAntlrDdlParserTest.java b/debezium-ddl-parser/src/test/java/io/debezium/antlr/mysql/MySqlAntlrDdlParserTest.java index a8a9ae33e..d47daedaa 100644 --- a/debezium-ddl-parser/src/test/java/io/debezium/antlr/mysql/MySqlAntlrDdlParserTest.java +++ b/debezium-ddl-parser/src/test/java/io/debezium/antlr/mysql/MySqlAntlrDdlParserTest.java @@ -7,6 +7,7 @@ package io.debezium.antlr.mysql; import io.debezium.relational.Tables; +import io.debezium.relational.ddl.DdlChanges; import io.debezium.relational.ddl.DdlParser; import io.debezium.relational.ddl.SimpleDdlParserListener; import io.debezium.text.MultipleParsingExceptions; @@ -26,9 +27,9 @@ public class MySqlAntlrDdlParserTest { @Before public void beforeEach() { - parser = new MySqlAntlrDdlParserWithSimpleTestListener(); + listener = new SimpleDdlParserListener(); + parser = new MySqlAntlrDdlParserWithSimpleTestListener(listener); tables = new Tables(); - listener = (SimpleDdlParserListener) parser.getDdlChanges(); } @Test @@ -63,9 +64,9 @@ public void shouldParseAlterStatementsWithoutCreate() { class MySqlAntlrDdlParserWithSimpleTestListener extends MySqlAntlrDdlParser { - public MySqlAntlrDdlParserWithSimpleTestListener() { + public MySqlAntlrDdlParserWithSimpleTestListener(DdlChanges changesListener) { super(); - ddlChanges = new SimpleDdlParserListener(); + this.ddlChanges = changesListener; } }