DBZ-200 Corrected MySQL DDL parser to better handle column definitions

Apparently not all reserved words must be quoted when using them as colum names, so refactored MySQL’s DDL parser to better handle a variety of unquoted colum names that are reserved words.
This commit is contained in:
Randall Hauch 2017-03-08 12:12:27 -06:00
parent e0a631431b
commit b48ccce4b5
4 changed files with 280 additions and 102 deletions

View File

@ -27,6 +27,7 @@
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.ddl.DdlParserListener.SetVariableEvent;
import io.debezium.relational.ddl.DdlTokenizer;
import io.debezium.text.MultipleParsingExceptions;
import io.debezium.text.ParsingException;
import io.debezium.text.TokenStream;
import io.debezium.text.TokenStream.Marker;
@ -548,88 +549,157 @@ protected void parseCreateDefinitionList(Marker start, TableEditor table) {
protected void parseCreateDefinition(Marker start, TableEditor table) {
// If the first token is a quoted identifier, then we know it is a column name ...
Collection<ParsingException> errors = null;
boolean quoted = isNextTokenQuotedIdentifier();
Marker defnStart = tokens.mark();
if (!quoted) {
// The first token is not quoted so let's check for other expressions ...
if (tokens.canConsume("CHECK")) {
// Try to parse the constraints first ...
consumeExpression(start);
return;
}
if (tokens.canConsume("CONSTRAINT", TokenStream.ANY_VALUE, "PRIMARY", "KEY") || tokens.canConsume("PRIMARY", "KEY")) {
try {
if (tokens.canConsume("USING")) {
parseIndexType(start);
}
if (!tokens.matches('(')) {
tokens.consume(); // index name
}
List<String> pkColumnNames = parseIndexColumnNames(start);
table.setPrimaryKeyNames(pkColumnNames);
parseIndexOptions(start);
// MySQL does not allow a primary key to have nullable columns, so let's make sure we model that correctly ...
pkColumnNames.forEach(name -> {
Column c = table.columnWithName(name);
if (c.isOptional()) {
table.addColumn(c.edit().optional(false).create());
}
});
return;
} catch (ParsingException e) {
// Invalid names, so rewind and continue
errors = accumulateParsingFailure(e, errors);
tokens.rewind(defnStart);
} catch (MultipleParsingExceptions e) {
// Invalid names, so rewind and continue
errors = accumulateParsingFailure(e, errors);
tokens.rewind(defnStart);
}
}
if (tokens.canConsume("CONSTRAINT", TokenStream.ANY_VALUE, "UNIQUE") || tokens.canConsume("UNIQUE")) {
tokens.canConsumeAnyOf("KEY", "INDEX");
try {
if (!tokens.matches('(')) {
if (!tokens.matches("USING")) {
tokens.consume(); // name of unique index ...
}
if (tokens.matches("USING")) {
parseIndexType(start);
}
}
List<String> uniqueKeyColumnNames = parseIndexColumnNames(start);
if (table.primaryKeyColumnNames().isEmpty()) {
table.setPrimaryKeyNames(uniqueKeyColumnNames); // this may eventually get overwritten by a real PK
}
parseIndexOptions(start);
return;
} catch (ParsingException e) {
// Invalid names, so rewind and continue
errors = accumulateParsingFailure(e, errors);
tokens.rewind(defnStart);
} catch (MultipleParsingExceptions e) {
// Invalid names, so rewind and continue
errors = accumulateParsingFailure(e, errors);
tokens.rewind(defnStart);
}
}
if (tokens.canConsume("CONSTRAINT", TokenStream.ANY_VALUE, "FOREIGN", "KEY") || tokens.canConsume("FOREIGN", "KEY")) {
try {
if (!tokens.matches('(')) {
tokens.consume(); // name of foreign key
}
parseIndexColumnNames(start);
if (tokens.matches("REFERENCES")) {
parseReferenceDefinition(start);
}
return;
} catch (ParsingException e) {
// Invalid names, so rewind and continue
errors = accumulateParsingFailure(e, errors);
tokens.rewind(defnStart);
} catch (MultipleParsingExceptions e) {
// Invalid names, so rewind and continue
errors = accumulateParsingFailure(e, errors);
tokens.rewind(defnStart);
}
}
if (tokens.canConsumeAnyOf("INDEX", "KEY")) {
try {
if (!tokens.matches('(')) {
if (!tokens.matches("USING")) {
tokens.consume(); // name of unique index ...
}
if (tokens.matches("USING")) {
parseIndexType(start);
}
}
parseIndexColumnNames(start);
parseIndexOptions(start);
return;
} catch (ParsingException e) {
// Invalid names, so rewind and continue
errors = accumulateParsingFailure(e, errors);
tokens.rewind(defnStart);
} catch (MultipleParsingExceptions e) {
// Invalid names, so rewind and continue
errors = accumulateParsingFailure(e, errors);
tokens.rewind(defnStart);
}
}
if (tokens.canConsumeAnyOf("FULLTEXT", "SPATIAL")) {
try {
tokens.canConsumeAnyOf("INDEX", "KEY");
if (!tokens.matches('(')) {
tokens.consume(); // name of unique index ...
}
parseIndexColumnNames(start);
parseIndexOptions(start);
return;
} catch (ParsingException e) {
// Invalid names, so rewind and continue
errors = accumulateParsingFailure(e, errors);
tokens.rewind(defnStart);
} catch (MultipleParsingExceptions e) {
// Invalid names, so rewind and continue
errors = accumulateParsingFailure(e, errors);
tokens.rewind(defnStart);
}
}
}
// Try to parse the constraints first ...
if (!quoted && tokens.canConsume("CHECK")) {
consumeExpression(start);
} else if (!quoted && tokens.canConsume("CONSTRAINT", TokenStream.ANY_VALUE, "PRIMARY", "KEY")
|| tokens.canConsume("PRIMARY", "KEY")) {
if (tokens.canConsume("USING")) {
parseIndexType(start);
}
if (!tokens.matches('(')) {
tokens.consume(); // index name
}
List<String> pkColumnNames = parseIndexColumnNames(start);
table.setPrimaryKeyNames(pkColumnNames);
parseIndexOptions(start);
// MySQL does not allow a primary key to have nullable columns, so let's make sure we model that correctly ...
pkColumnNames.forEach(name -> {
Column c = table.columnWithName(name);
if (c.isOptional()) {
table.addColumn(c.edit().optional(false).create());
}
});
} else if (!quoted && tokens.canConsume("CONSTRAINT", TokenStream.ANY_VALUE, "UNIQUE") || tokens.canConsume("UNIQUE")) {
tokens.canConsumeAnyOf("KEY", "INDEX");
if (!tokens.matches('(')) {
if (!tokens.matches("USING")) {
tokens.consume(); // name of unique index ...
}
if (tokens.matches("USING")) {
parseIndexType(start);
}
}
List<String> uniqueKeyColumnNames = parseIndexColumnNames(start);
if (table.primaryKeyColumnNames().isEmpty()) {
table.setPrimaryKeyNames(uniqueKeyColumnNames); // this may eventually get overwritten by a real PK
}
parseIndexOptions(start);
} else if (!quoted && tokens.canConsume("CONSTRAINT", TokenStream.ANY_VALUE, "FOREIGN", "KEY")
|| tokens.canConsume("FOREIGN", "KEY")) {
if (!tokens.matches('(')) {
tokens.consume(); // name of foreign key
}
parseIndexColumnNames(start);
if (tokens.matches("REFERENCES")) {
parseReferenceDefinition(start);
}
} else if (!quoted && tokens.canConsumeAnyOf("INDEX", "KEY")) {
if (!tokens.matches('(')) {
if (!tokens.matches("USING")) {
tokens.consume(); // name of unique index ...
}
if (tokens.matches("USING")) {
parseIndexType(start);
}
}
parseIndexColumnNames(start);
parseIndexOptions(start);
} else if (!quoted && tokens.canConsumeAnyOf("FULLTEXT", "SPATIAL")) {
tokens.canConsumeAnyOf("INDEX", "KEY");
if (!tokens.matches('(')) {
tokens.consume(); // name of unique index ...
}
parseIndexColumnNames(start);
parseIndexOptions(start);
} else {
try {
// It's either quoted (meaning it's a column definition)
tokens.canConsume("COLUMN"); // optional
// Obtain the column editor ...
String columnName = tokens.consume();
parseCreateColumn(start, table, columnName);
// ALTER TABLE allows reordering the columns after the definition ...
if (tokens.canConsume("FIRST")) {
table.reorderColumn(columnName, null);
} else if (tokens.canConsume("AFTER")) {
table.reorderColumn(columnName, tokens.consume());
String columnName = parseColumnName();
parseCreateColumn(start, table, columnName, null);
} catch (ParsingException e) {
if (errors != null) {
errors = accumulateParsingFailure(e, errors);
throw new MultipleParsingExceptions(errors);
}
throw e;
} catch (MultipleParsingExceptions e) {
if (errors != null) {
errors = accumulateParsingFailure(e, errors);
throw new MultipleParsingExceptions(errors);
}
throw e;
}
}
protected Column parseCreateColumn(Marker start, TableEditor table, String columnName) {
protected Column parseCreateColumn(Marker start, TableEditor table, String columnName, String newColumnName) {
// Obtain the column editor ...
Column existingColumn = table.columnWithName(columnName);
ColumnEditor column = existingColumn != null ? existingColumn.edit() : Column.editor().name(columnName);
@ -643,6 +713,18 @@ protected Column parseCreateColumn(Marker start, TableEditor table, String colum
if (isPrimaryKey.get()) {
table.setPrimaryKeyNames(newColumnDefn.name());
}
if (newColumnName != null && !newColumnName.equalsIgnoreCase(columnName)) {
table.renameColumn(columnName, newColumnName);
columnName = newColumnName;
}
// ALTER TABLE allows reordering the columns after the definition ...
if (tokens.canConsume("FIRST")) {
table.reorderColumn(columnName, null);
} else if (tokens.canConsume("AFTER")) {
table.reorderColumn(columnName, tokens.consume());
}
return table.columnWithName(newColumnDefn.name());
}
@ -808,7 +890,7 @@ protected List<String> parseIndexColumnNames(Marker start) {
}
private void parseIndexColumnName(Consumer<String> name) {
name.accept(tokens.consume());
name.accept(parseColumnName());
if (tokens.canConsume('(')) {
tokens.consume(); // length
tokens.consume(')');
@ -1095,7 +1177,7 @@ protected void parseAlterSpecification(Marker start, TableEditor table, Consumer
parsePartitionNames(start);
} else {
tokens.canConsume("COLUMN");
String columnName = tokens.consume();
String columnName = parseColumnName();
table.removeColumn(columnName);
}
} else if (tokens.canConsume("ALTER")) {
@ -1107,24 +1189,13 @@ protected void parseAlterSpecification(Marker start, TableEditor table, Consumer
}
} else if (tokens.canConsume("CHANGE")) {
tokens.canConsume("COLUMN");
String oldName = tokens.consume();
String newName = tokens.consume();
parseCreateColumn(start, table, oldName); // replaces the old definition but keeps old name
table.renameColumn(oldName, newName);
if (tokens.canConsume("FIRST")) {
table.reorderColumn(newName, null);
} else if (tokens.canConsume("AFTER")) {
table.reorderColumn(newName, tokens.consume());
}
String oldName = parseColumnName();
String newName = parseColumnName();
parseCreateColumn(start, table, oldName, newName);
} else if (tokens.canConsume("MODIFY")) {
tokens.canConsume("COLUMN");
String columnName = tokens.consume();
parseCreateColumn(start, table, columnName);
if (tokens.canConsume("FIRST")) {
table.reorderColumn(columnName, null);
} else if (tokens.canConsume("AFTER")) {
table.reorderColumn(columnName, tokens.consume());
}
String columnName = parseColumnName();
parseCreateColumn(start, table, columnName, null);
} else if (tokens.canConsumeAnyOf("ALGORITHM", "LOCK")) {
tokens.canConsume('=');
tokens.consume();
@ -1316,14 +1387,27 @@ protected String currentDatabaseCharset() {
protected List<String> parseColumnNameList(Marker start) {
List<String> names = new ArrayList<>();
tokens.consume('(');
names.add(tokens.consume());
names.add(parseColumnName());
while (tokens.canConsume(',')) {
names.add(tokens.consume());
names.add(parseColumnName());
}
tokens.consume(')');
return names;
}
protected String parseColumnName() {
boolean quoted = isNextTokenQuotedIdentifier();
String name = tokens.consume();
if (!quoted) {
// Unquoted names may not consist entirely of digits
if (name.matches("[0-9]+")) {
parsingFailed(tokens.previousPosition(), "Unquoted column names may not contain only digits");
return null;
}
}
return name;
}
protected void parsePartitionNames(Marker start) {
consumeCommaSeparatedValueList(start);
}

View File

@ -494,7 +494,7 @@ public void shouldParseDefiner() {
assertThat(tables.size()).isEqualTo(0); // no tables
assertThat(listener.total()).isEqualTo(0);
}
@Test
@FixFor("DBZ-169")
public void shouldParseTimeWithNowDefault() {
@ -519,7 +519,7 @@ public void shouldParseTimeWithNowDefault() {
assertThat(t.columnWithName("c3").position()).isEqualTo(3);
assertThat(t.columnWithName("c4").position()).isEqualTo(4);
}
@Test
@FixFor("DBZ-169")
public void shouldParseCreateAndAlterWithOnUpdate() {
@ -539,7 +539,7 @@ public void shouldParseCreateAndAlterWithOnUpdate() {
parser.parse(ddl, tables);
assertThat(tables.size()).isEqualTo(2);
assertThat(listener.total()).isEqualTo(3);
Table t = tables.forTable(new TableId(null, null, "customers"));
assertThat(t).isNotNull();
assertThat(t.columnNames()).containsExactly("id", "name");
@ -762,7 +762,7 @@ public void shouldParseAlterTableWithNewlineFeeds() {
assertThat(t.primaryKeyColumnNames()).containsExactly("collection_id");
assertColumn(t, "collection_id", "INT UNSIGNED", Types.INTEGER, 11, -1, false, true, true);
}
@FixFor("DBZ-176")
@Test
public void shouldParseButIgnoreCreateTriggerWithDefiner() {
@ -772,7 +772,7 @@ public void shouldParseButIgnoreCreateTriggerWithDefiner() {
assertThat(listener.total()).isEqualTo(0);
listener.forEach(this::printEvent);
}
@FixFor("DBZ-193")
@Test
public void shouldParseFulltextKeyInCreateTable() {
@ -799,7 +799,50 @@ public void shouldParseFulltextKeyInCreateTable() {
assertThat(t.columnWithName("client_id").position()).isEqualTo(5);
assertThat(t.columnWithName("scope_action_ids").position()).isEqualTo(6);
}
@Test
public void shouldParseStatementForDbz200() {
parser.parse(readFile("ddl/mysql-dbz-200.ddl"), tables);
Testing.print(tables);
assertThat(tables.size()).isEqualTo(1);
assertThat(listener.total()).isEqualTo(1);
Table t = tables.forTable(new TableId(null, null, "customfield"));
assertThat(t).isNotNull();
assertThat(t.columnNames()).containsExactly("ENCODEDKEY", "ID", "CREATIONDATE", "LASTMODIFIEDDATE", "DATATYPE",
"ISDEFAULT", "ISREQUIRED", "NAME", "VALUES", "AMOUNTS", "DESCRIPTION",
"TYPE", "VALUELENGTH", "INDEXINLIST", "CUSTOMFIELDSET_ENCODEDKEY_OID",
"STATE", "VALIDATIONPATTERN", "VIEWUSAGERIGHTSKEY", "EDITUSAGERIGHTSKEY",
"BUILTINCUSTOMFIELDID", "UNIQUE");
assertColumn(t, "ENCODEDKEY", "VARCHAR", Types.VARCHAR, 32, -1, false, false, false);
assertColumn(t, "ID", "VARCHAR", Types.VARCHAR, 32, -1, true, false, false);
assertColumn(t, "CREATIONDATE", "DATETIME", Types.TIMESTAMP, -1, -1, true, false, false);
assertColumn(t, "LASTMODIFIEDDATE", "DATETIME", Types.TIMESTAMP, -1, -1, true, false, false);
assertColumn(t, "DATATYPE", "VARCHAR", Types.VARCHAR, 256, -1, true, false, false);
assertColumn(t, "ISDEFAULT", "BIT", Types.BIT, 1, -1, true, false, false);
assertColumn(t, "ISREQUIRED", "BIT", Types.BIT, 1, -1, true, false, false);
assertColumn(t, "NAME", "VARCHAR", Types.VARCHAR, 256, -1, true, false, false);
assertColumn(t, "VALUES", "MEDIUMBLOB", Types.BLOB, -1, -1, true, false, false);
assertColumn(t, "AMOUNTS", "MEDIUMBLOB", Types.BLOB, -1, -1, true, false, false);
assertColumn(t, "DESCRIPTION", "VARCHAR", Types.VARCHAR, 256, -1, true, false, false);
assertColumn(t, "TYPE", "VARCHAR", Types.VARCHAR, 256, -1, true, false, false);
assertColumn(t, "VALUELENGTH", "VARCHAR", Types.VARCHAR, 256, -1, false, false, false);
assertColumn(t, "INDEXINLIST", "INT", Types.INTEGER, 11, -1, true, false, false);
assertColumn(t, "CUSTOMFIELDSET_ENCODEDKEY_OID", "VARCHAR", Types.VARCHAR, 32, -1, false, false, false);
assertColumn(t, "STATE", "VARCHAR", Types.VARCHAR, 256, -1, false, false, false);
assertColumn(t, "VALIDATIONPATTERN", "VARCHAR", Types.VARCHAR, 256, -1, true, false, false);
assertColumn(t, "VIEWUSAGERIGHTSKEY", "VARCHAR", Types.VARCHAR, 32, -1, true, false, false);
assertColumn(t, "EDITUSAGERIGHTSKEY", "VARCHAR", Types.VARCHAR, 32, -1, true, false, false);
assertColumn(t, "BUILTINCUSTOMFIELDID", "VARCHAR", Types.VARCHAR, 255, -1, true, false, false);
assertColumn(t, "UNIQUE", "VARCHAR", Types.VARCHAR, 32, -1, false, false, false);
assertThat(t.columnWithName("ENCODEDKEY").position()).isEqualTo(1);
assertThat(t.columnWithName("id").position()).isEqualTo(2);
assertThat(t.columnWithName("CREATIONDATE").position()).isEqualTo(3);
assertThat(t.columnWithName("DATATYPE").position()).isEqualTo(5);
assertThat(t.columnWithName("UNIQUE").position()).isEqualTo(21);
}
@Test
public void shouldParseTicketMonsterLiquibaseStatements() {
parser.parse(readLines(1, "ddl/mysql-ticketmonster-liquibase.ddl"), tables);

View File

@ -0,0 +1,24 @@
CREATE TABLE `customfield` (
`ENCODEDKEY` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
`ID` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
`CREATIONDATE` datetime DEFAULT NULL,
`LASTMODIFIEDDATE` datetime DEFAULT NULL,
`DATATYPE` varchar(256) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
`ISDEFAULT` bit(1) DEFAULT NULL,
`ISREQUIRED` bit(1) DEFAULT NULL,
`NAME` varchar(256) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
`VALUES` mediumblob,
`AMOUNTS` mediumblob,
`DESCRIPTION` varchar(256) DEFAULT NULL,
`TYPE` varchar(256) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
`VALUELENGTH` varchar(256) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT 'SHORT',
`INDEXINLIST` int(11) DEFAULT '-1',
`CUSTOMFIELDSET_ENCODEDKEY_OID` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
`STATE` varchar(256) NOT NULL DEFAULT 'NORMAL',
`VALIDATIONPATTERN` varchar(256) DEFAULT NULL,
`VIEWUSAGERIGHTSKEY` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
`EDITUSAGERIGHTSKEY` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
`BUILTINCUSTOMFIELDID` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL,
UNIQUE varchar(32) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
KEY `index1` (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

View File

@ -7,6 +7,7 @@
import java.math.BigDecimal;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@ -633,7 +634,7 @@ protected String consumeQuotedString() {
* @param msg the leading portion of the message; may not be null
*/
protected void parsingFailed(Position position, String msg) {
parsingFailed(position, msg);
parsingFailed(position, msg, null);
}
/**
@ -662,6 +663,32 @@ protected void parsingFailed(Position position, Collection<ParsingException> err
}
throw new MultipleParsingExceptions(msg + " at line " + position.line() + ", column " + position.column(), errors);
}
/**
* Utility method to accumulate a parsing exception.
* @param e the parsing exception
* @param list the list of previous parsing exceptions; may be null
* @return the list of previous and current parsing exceptions; if {@code e} is null then always {@code list}, but otherwise non-null list
*/
protected Collection<ParsingException> accumulateParsingFailure(ParsingException e, Collection<ParsingException> list) {
if (e == null) return list;
if (list == null) list = new ArrayList<ParsingException>();
list.add(e);
return list;
}
/**
* Utility method to accumulate a parsing exception.
* @param e the multiple parsing exceptions
* @param list the list of previous parsing exceptions; may be null
* @return the list of previous and current parsing exceptions; if {@code e} is null then always {@code list}, but otherwise non-null list
*/
protected Collection<ParsingException> accumulateParsingFailure(MultipleParsingExceptions e, Collection<ParsingException> list) {
if (e == null) return list;
if (list == null) list = new ArrayList<ParsingException>();
list.addAll(e.getErrors());
return list;
}
protected Object parseLiteral(Marker start) {
if (tokens.canConsume('_')) { // introducer