DBZ-252 parse database queries (create, alter, delete), parse name tokens without quotes, add new Mysql connector configuration field for choosing ddl parsing mode

This commit is contained in:
rkuchar 2018-04-13 11:46:44 +02:00 committed by Gunnar Morling
parent 5d911d511f
commit b4480c55b7
9 changed files with 227 additions and 52 deletions

View File

@ -10,6 +10,8 @@
import java.util.Objects;
import java.util.Random;
import io.debezium.antlr.mysql.MySqlAntlrDdlParser;
import io.debezium.relational.ddl.DdlParser;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
@ -452,6 +454,60 @@ public static EventProcessingFailureHandlingMode parse(String value) {
}
}
public static enum DdlParsingMode implements EnumeratedValue {
LEGACY("legacy", MySqlDdlParser.class),
ANTLR("antlr", MySqlAntlrDdlParser.class);
private final String value;
private final Class<? extends DdlParser> parserClass;
private DdlParsingMode(String value, Class<? extends DdlParser> parserClass) {
this.value = value;
this.parserClass = parserClass;
}
@Override
public String getValue() {
return value;
}
public Class<? extends DdlParser> getParserClass() {
return parserClass;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static DdlParsingMode parse(String value) {
if (value == null) return null;
value = value.trim();
for (DdlParsingMode option : DdlParsingMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) return option;
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static DdlParsingMode parse(String value, String defaultValue) {
DdlParsingMode mode = parse(value);
if (mode == null && defaultValue != null) {
mode = parse(defaultValue);
}
return mode;
}
}
private static final String DATABASE_WHITELIST_NAME = "database.whitelist";
private static final String TABLE_WHITELIST_NAME = "table.whitelist";
private static final String TABLE_IGNORE_BUILTIN_NAME = "table.ignore.builtin";
@ -903,6 +959,18 @@ public static EventProcessingFailureHandlingMode parse(String value) {
.withDefault(0L)
.withValidation(Field::isNonNegativeLong);
public static final Field DDL_PARSER_MODE = Field.create("ddl.parser.mode")
.withDisplayName("Ddl parser mode")
.withEnum(DdlParsingMode.class, DdlParsingMode.LEGACY)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
// TODO rkuchar: change description
.withDescription("Time, date, and timestamps can be represented with different kinds of precisions, including:"
+ "'adaptive_time_microseconds' (the default) like 'adaptive' mode, but TIME fields always use microseconds precision;"
+ "'adaptive' (deprecated) bases the precision of time, date, and timestamp values on the database column's precision; "
+ "'connect' always represents time, date, and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp, "
+ "which uses millisecond precision regardless of the database columns' precision.");
/**
* Method that generates a Field for specifying that string columns whose names match a set of regular expressions should
* have their values truncated to be no longer than the specified number of characters.
@ -957,6 +1025,7 @@ public static final Field MASK_COLUMN(int length) {
EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE,
INCONSISTENT_SCHEMA_HANDLING_MODE,
SNAPSHOT_DELAY_MS,
DDL_PARSER_MODE,
CommonConnectorConfig.TOMBSTONES_ON_DELETE);
/**
@ -1013,7 +1082,7 @@ protected static ConfigDef configDef() {
Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, KEEP_ALIVE_INTERVAL_MS, CommonConnectorConfig.MAX_QUEUE_SIZE,
CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.POLL_INTERVAL_MS,
SNAPSHOT_MODE, SNAPSHOT_LOCKING_MODE, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE,
BIGINT_UNSIGNED_HANDLING_MODE, SNAPSHOT_DELAY_MS);
BIGINT_UNSIGNED_HANDLING_MODE, SNAPSHOT_DELAY_MS, DDL_PARSER_MODE);
return config;
}

View File

@ -835,20 +835,6 @@ public static List<String> parseSetAndEnumOptions(String typeExpression) {
return options;
}
protected static String withoutQuotes(String possiblyQuoted) {
if (possiblyQuoted.length() < 2) {
// Too short to be quoted ...
return possiblyQuoted;
}
if (possiblyQuoted.startsWith("'") && possiblyQuoted.endsWith("'")) {
return possiblyQuoted.substring(1, possiblyQuoted.length() - 1);
}
if (possiblyQuoted.startsWith("\"") && possiblyQuoted.endsWith("\"")) {
return possiblyQuoted.substring(1, possiblyQuoted.length() - 1);
}
return possiblyQuoted;
}
protected void parseColumnDefinition(Marker start, String columnName, TokenStream tokens, TableEditor table, ColumnEditor column,
AtomicBoolean isPrimaryKey) {
// Parse the data type, which must be at this location ...
@ -1153,6 +1139,7 @@ protected void parseCreateIndex(Marker start) {
// We don't care about any other statements or the rest of this statement ...
consumeRemainingStatement(start);
// TODO fix: signal should be send only when some changes on table are made
signalCreateIndex(indexName, tableId, start);
debugParsed(start);
}

View File

@ -18,11 +18,11 @@
import org.slf4j.LoggerFactory;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.antlr.mysql.MySqlAntlrDdlParser;
import io.debezium.antlr.mysql.MySqlSystemVariables.MySqlScope;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.BigIntUnsignedHandlingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.DecimalHandlingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.DdlParsingMode;
import io.debezium.document.Document;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
@ -98,14 +98,17 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt
this.topicSelector = topicSelector;
this.tableIdCaseInsensitive = tableIdCaseInsensitive;
// TODO rkuchar: implement connector configuration using enum to define which parser should be used
if(true) {
this.ddlParser = new MySqlDdlParser();
} else {
this.ddlParser = new MySqlAntlrDdlParser();
}
String ddlParsingModeStr = config.getString(MySqlConnectorConfig.DDL_PARSER_MODE);
DdlParsingMode parsingMode = DdlParsingMode.parse(ddlParsingModeStr, MySqlConnectorConfig.DDL_PARSER_MODE.defaultValueAsString());
this.ddlChanges = this.ddlParser.getDdlChanges();
try {
this.ddlParser = parsingMode.getParserClass().newInstance();
this.ddlChanges = this.ddlParser.getDdlChanges();
}
catch (InstantiationException | IllegalAccessException e) {
// ddl parser constructor are not throwing any exceptions, so this should never happen
throw new IllegalArgumentException("Unable to create new instance for ddl parser class " + parsingMode.getParserClass().getCanonicalName());
}
// Use MySQL-specific converters and schemas for values ...
String timePrecisionModeStr = config.getString(MySqlConnectorConfig.TIME_PRECISION_MODE);
@ -128,7 +131,8 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt
this.serverName = serverName;
if (this.serverName == null || serverName.isEmpty()) {
this.schemaPrefix = "";
} else {
}
else {
this.schemaPrefix = serverName.endsWith(".") ? serverName : serverName + ".";
}

View File

@ -27,6 +27,7 @@
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlChanges;
import io.debezium.relational.ddl.DdlParserListener.Event;
import io.debezium.relational.ddl.SimpleDdlParserListener;
import io.debezium.util.IoUtil;
@ -42,7 +43,7 @@ public class MySqlDdlParserTest {
@Before
public void beforeEach() {
listener = new SimpleDdlParserListener();
parser = new MysqlDdlParserWithTest();
parser = new MysqlDdlParserWithSimpleTestListener(listener);
tables = new Tables();
}
@ -1629,10 +1630,10 @@ protected void assertColumn(Table table, String name, String typeName, int jdbcT
assertThat(column.isAutoIncremented()).isEqualTo(autoIncremented);
}
class MysqlDdlParserWithTest extends MySqlDdlParser {
public MysqlDdlParserWithTest() {
class MysqlDdlParserWithSimpleTestListener extends MySqlDdlParser {
public MysqlDdlParserWithSimpleTestListener(DdlChanges changesListener) {
super();
this.ddlChanges = listener;
this.ddlChanges = changesListener;
}
}

View File

@ -7,6 +7,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -168,6 +169,20 @@ public Table overwriteTable(Table table) {
});
}
public void removeTablesForDatabase(String schemaName) {
removeTablesForDatabase(schemaName, null);
}
public void removeTablesForDatabase(String catalogName, String schemaName) {
lock.write(() -> {
tablesByTableId.entrySet().removeIf(tableIdTableEntry -> {
TableId tableId = tableIdTableEntry.getKey();
return (schemaName == null || tableId.schema() == null || schemaName.equals(tableId.schema()))
& (catalogName == null || tableId.catalog() == null || catalogName.equals(tableId.catalog()));
});
});
}
/**
* Rename an existing table.
*
@ -381,6 +396,10 @@ void forEach(BiConsumer<? super TableId, ? super TableImpl> action) {
values.forEach(action);
}
Set<Map.Entry<TableId, TableImpl>> entrySet() {
return values.entrySet();
}
private TableId toLowerCaseIfNeeded(TableId tableId) {
return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
}

View File

@ -248,10 +248,39 @@ protected void signalDropIndex(String indexName, TableId id, String statement) {
signalChangeEvent(new DdlParserListener.TableIndexDroppedEvent(indexName, id, statement));
}
/**
* Removes line feeds from input string.
*
* @param input input with possible line feeds
* @return input string without line feeds
*/
protected String removeLineFeeds(String input) {
return input.replaceAll("[\\n|\\t]", "");
}
/**
* Cut out the string surrounded with single, double and reversed quotes.
*
* @param possiblyQuoted string with possible quotes
* @return string without quotes
*/
protected static String withoutQuotes(String possiblyQuoted) {
if (possiblyQuoted.length() < 2) {
// Too short to be quoted ...
return possiblyQuoted;
}
if (possiblyQuoted.startsWith("`") && possiblyQuoted.endsWith("`")) {
return possiblyQuoted.substring(1, possiblyQuoted.length() - 1);
}
if (possiblyQuoted.startsWith("'") && possiblyQuoted.endsWith("'")) {
return possiblyQuoted.substring(1, possiblyQuoted.length() - 1);
}
if (possiblyQuoted.startsWith("\"") && possiblyQuoted.endsWith("\"")) {
return possiblyQuoted.substring(1, possiblyQuoted.length() - 1);
}
return possiblyQuoted;
}
/**
* Utility method to accumulate a parsing exception.
*

View File

@ -250,4 +250,8 @@ protected void debugParsed(ParserRuleContext ctx) {
protected void debugSkipped(ParserRuleContext ctx) {
debugSkipped(getText(ctx));
}
protected String withoutQuotes(ParserRuleContext ctx) {
return withoutQuotes(ctx.getText());
}
}

View File

@ -29,6 +29,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
@ -39,6 +41,8 @@ public class MySqlAntlrDdlParser extends AntlrDdlParser<MySqlLexer, MySqlParser>
private TableEditor tableEditor;
private ColumnEditor columnEditor;
private final ConcurrentMap<String, String> charsetNameForDatabase = new ConcurrentHashMap<>();
public MySqlAntlrDdlParser() {
super();
systemVariables = new MySqlSystemVariables();
@ -51,10 +55,12 @@ protected ParseTree parseTree(MySqlParser parser) {
@Override
protected void assignParserListeners(ProxyParseTreeListener proxyParseTreeListener) {
proxyParseTreeListener.add(new DatabaseOptionsListener());
proxyParseTreeListener.add(new DropDatabaseParserListener());
proxyParseTreeListener.add(new ColumnDefinitionParserListener());
proxyParseTreeListener.add(new CreateTableParserListener());
proxyParseTreeListener.add(new DropTableParserListener());
proxyParseTreeListener.add(new AlterTableParserListener());
proxyParseTreeListener.add(new DropTableParserListener());
proxyParseTreeListener.add(new RenameTableParserListener());
proxyParseTreeListener.add(new TruncateTableParserListener());
proxyParseTreeListener.add(new CreateUniqueIndexParserListener());
@ -153,16 +159,16 @@ private TableId parseQualifiedTableId(MySqlParser.TableNameContext tableNameCont
String fullTableName = tableNameContext.fullId().getText();
int dotIndex;
if ((dotIndex = fullTableName.indexOf(".")) > 0) {
return resolveTableId(fullTableName.substring(0, dotIndex),
fullTableName.substring(dotIndex + 1, fullTableName.length()));
return resolveTableId(withoutQuotes(fullTableName.substring(0, dotIndex)),
withoutQuotes(fullTableName.substring(dotIndex + 1, fullTableName.length())));
}
else {
return resolveTableId(currentSchema(), fullTableName);
return resolveTableId(currentSchema(), withoutQuotes(fullTableName));
}
}
private String parseColumnName(MySqlParser.UidContext uidContext) {
return uidContext.getText();
private String parseName(MySqlParser.UidContext uidContext) {
return withoutQuotes(uidContext);
}
private String getFullTableName(TableId tableId) {
@ -295,7 +301,7 @@ private void parsePrimaryIndexColumnNames(MySqlParser.IndexColumnNamesContext in
List<String> pkColumnNames = indexColumnNamesContext.indexColumnName().stream()
.map(indexColumnNameContext -> {
// MySQL does not allow a primary key to have nullable columns, so let's make sure we model that correctly ...
String columnName = parseColumnName(indexColumnNameContext.uid());
String columnName = parseName(indexColumnNameContext.uid());
Column column = tableEditor.columnWithName(columnName);
if (column != null && column.isOptional()) {
tableEditor.addColumn(column.edit().optional(false).create());
@ -307,6 +313,59 @@ private void parsePrimaryIndexColumnNames(MySqlParser.IndexColumnNamesContext in
tableEditor.setPrimaryKeyNames(pkColumnNames);
}
/**
* Parser listener for MySQL create database query to get database charsetName.
*/
private class DatabaseOptionsListener extends MySqlParserBaseListener {
private String databaseName;
@Override
public void enterCreateDatabase(MySqlParser.CreateDatabaseContext ctx) {
databaseName = parseName(ctx.uid());
super.enterCreateDatabase(ctx);
}
@Override
public void exitCreateDatabase(MySqlParser.CreateDatabaseContext ctx) {
signalCreateDatabase(databaseName, ctx);
super.exitCreateDatabase(ctx);
}
@Override
public void enterAlterSimpleDatabase(MySqlParser.AlterSimpleDatabaseContext ctx) {
databaseName = ctx.uid() == null ? currentSchema() : parseName(ctx.uid());
super.enterAlterSimpleDatabase(ctx);
}
@Override
public void enterCreateDatabaseOption(MySqlParser.CreateDatabaseOptionContext ctx) {
if(ctx.charsetName() != null) {
String charsetName = withoutQuotes(ctx.charsetName());
if ("DEFAULT".equalsIgnoreCase(charsetName)) {
charsetName = systemVariables.getVariable(MySqlSystemVariables.CHARSET_NAME_SERVER);
}
charsetNameForDatabase.put(databaseName, charsetName);
}
super.enterCreateDatabaseOption(ctx);
}
}
/**
* Parser listener fro MySQL drop database queries.
*/
class DropDatabaseParserListener extends MySqlParserBaseListener {
@Override
public void enterDropDatabase(MySqlParser.DropDatabaseContext ctx) {
String databaseName = parseName(ctx.uid());
databaseTables.removeTablesForDatabase(databaseName);
charsetNameForDatabase.remove(databaseName);
signalDropDatabase(databaseName, ctx);
super.enterDropDatabase(ctx);
}
}
/**
* Parser listener for MySQL create table queries.
*/
@ -340,7 +399,7 @@ public void exitCopyCreateTable(MySqlParser.CopyCreateTableContext ctx) {
@Override
public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
String columnName = parseColumnName(ctx.uid());
String columnName = parseName(ctx.uid());
columnEditor = Column.editor().name(columnName);
super.enterColumnDeclaration(ctx);
}
@ -419,7 +478,7 @@ public void exitAlterTable(MySqlParser.AlterTableContext ctx) {
@Override
public void enterAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) {
runIfTableEditorNotNull(() -> {
String columnName = parseColumnName(ctx.uid(0));
String columnName = parseName(ctx.uid(0));
columnEditor = Column.editor().name(columnName);
});
super.exitAlterByAddColumn(ctx);
@ -435,7 +494,7 @@ public void exitAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) {
tableEditor.reorderColumn(columnName, null);
}
else if (ctx.AFTER() != null) {
String afterColumn = parseColumnName(ctx.uid(1));
String afterColumn = parseName(ctx.uid(1));
tableEditor.reorderColumn(columnName, afterColumn);
}
});
@ -448,7 +507,7 @@ public void enterAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) {
runIfTableEditorNotNull(() -> {
columnEditors = new ArrayList<>(ctx.uid().size());
for (MySqlParser.UidContext uidContext : ctx.uid()) {
String columnName = parseColumnName(uidContext);
String columnName = parseName(uidContext);
columnEditors.add(Column.editor().name(columnName));
}
columnEditor = columnEditors.get(0);
@ -485,7 +544,7 @@ public void exitAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) {
@Override
public void enterAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) {
runIfTableEditorNotNull(() -> {
String oldColumnName = parseColumnName(ctx.oldColumn);
String oldColumnName = parseName(ctx.oldColumn);
Column existingColumn = tableEditor.columnWithName(oldColumnName);
if (existingColumn != null) {
columnEditor = existingColumn.edit();
@ -502,14 +561,14 @@ public void enterAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) {
runIfAllEditorsNotNull(() -> {
tableEditor.addColumn(columnEditor.create());
String newColumnName = parseColumnName(ctx.newColumn);
String newColumnName = parseName(ctx.newColumn);
tableEditor.renameColumn(columnEditor.name(), newColumnName);
if (ctx.FIRST() != null) {
tableEditor.reorderColumn(newColumnName, null);
}
else if (ctx.afterColumn != null) {
tableEditor.reorderColumn(newColumnName, parseColumnName(ctx.afterColumn));
tableEditor.reorderColumn(newColumnName, parseName(ctx.afterColumn));
}
});
super.exitAlterByChangeColumn(ctx);
@ -518,7 +577,7 @@ else if (ctx.afterColumn != null) {
@Override
public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) {
runIfTableEditorNotNull(() -> {
String columnName = parseColumnName(ctx.uid(0));
String columnName = parseName(ctx.uid(0));
Column column = tableEditor.columnWithName(columnName);
if (column != null) {
columnEditor = column.edit();
@ -540,7 +599,7 @@ public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx)
tableEditor.reorderColumn(columnEditor.name(), null);
}
else if (ctx.AFTER() != null) {
String afterColumn = parseColumnName(ctx.uid(1));
String afterColumn = parseName(ctx.uid(1));
tableEditor.reorderColumn(columnEditor.name(), afterColumn);
}
});
@ -549,14 +608,15 @@ else if (ctx.AFTER() != null) {
@Override
public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext ctx) {
runIfTableEditorNotNull(() -> tableEditor.removeColumn(parseColumnName(ctx.uid())));
runIfTableEditorNotNull(() -> tableEditor.removeColumn(parseName(ctx.uid())));
super.enterAlterByDropColumn(ctx);
}
@Override
public void enterAlterByRename(MySqlParser.AlterByRenameContext ctx) {
runIfTableEditorNotNull(() -> {
TableId newTableId = resolveTableId(currentSchema(), ctx.uid().getText());
// TODO rkuchar: test uid
TableId newTableId = resolveTableId(currentSchema(), parseName(ctx.uid()));
databaseTables.renameTable(tableEditor.tableId(), newTableId);
});
super.enterAlterByRename(ctx);
@ -565,7 +625,7 @@ public void enterAlterByRename(MySqlParser.AlterByRenameContext ctx) {
@Override
public void enterAlterByChangeDefault(MySqlParser.AlterByChangeDefaultContext ctx) {
runIfTableEditorNotNull(() -> {
String columnName = parseColumnName(ctx.uid());
String columnName = parseName(ctx.uid());
Column column = tableEditor.columnWithName(columnName);
if (column != null) {
ColumnEditor columnEditor = column.edit();
@ -698,13 +758,14 @@ public void enterCreateIndex(MySqlParser.CreateIndexContext ctx) {
if (tableEditor != null) {
if (!tableEditor.hasPrimaryKey()) {
parsePrimaryIndexColumnNames(ctx.indexColumnNames());
signalAlterTable(tableId, null, ctx);
}
}
else {
throw new ParsingException(null, "Trying to create index on non existing table " + getFullTableName(tableId));
}
}
// TODO fixed together with MySql legacy parser bug.
signalAlterTable(null, null, ctx);
super.enterCreateIndex(ctx);
}
}

View File

@ -7,6 +7,7 @@
package io.debezium.antlr.mysql;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlChanges;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.ddl.SimpleDdlParserListener;
import io.debezium.text.MultipleParsingExceptions;
@ -26,9 +27,9 @@ public class MySqlAntlrDdlParserTest {
@Before
public void beforeEach() {
parser = new MySqlAntlrDdlParserWithSimpleTestListener();
listener = new SimpleDdlParserListener();
parser = new MySqlAntlrDdlParserWithSimpleTestListener(listener);
tables = new Tables();
listener = (SimpleDdlParserListener) parser.getDdlChanges();
}
@Test
@ -63,9 +64,9 @@ public void shouldParseAlterStatementsWithoutCreate() {
class MySqlAntlrDdlParserWithSimpleTestListener extends MySqlAntlrDdlParser {
public MySqlAntlrDdlParserWithSimpleTestListener() {
public MySqlAntlrDdlParserWithSimpleTestListener(DdlChanges changesListener) {
super();
ddlChanges = new SimpleDdlParserListener();
this.ddlChanges = changesListener;
}
}