diff --git a/debezium-connector-mysql/pom.xml b/debezium-connector-mysql/pom.xml index cce811e16..3a3915a76 100644 --- a/debezium-connector-mysql/pom.xml +++ b/debezium-connector-mysql/pom.xml @@ -113,8 +113,6 @@ debezium/mysql-server-test-database false ln -s /usr/share/zoneinfo/US/Samoa /etc/localtime - - antlr @@ -327,7 +325,6 @@ ${docker.host.address} ${mysql.replica.port} ${skipLongRunningTests} - ${ddl.parser.mode} alphabetical @@ -444,7 +441,6 @@ ${mysql.port} ${mysql.port} false - ${ddl.parser.mode} alphabetical @@ -549,14 +545,5 @@ - - parser-legacy - - false - - - legacy - - diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java index d696d48ca..53a558817 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java @@ -19,14 +19,10 @@ import io.debezium.config.EnumeratedValue; import io.debezium.config.Field; import io.debezium.config.Field.ValidationOutput; -import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; import io.debezium.heartbeat.Heartbeat; -import io.debezium.jdbc.JdbcValueConverters; import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode; import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.relational.RelationalDatabaseConnectorConfig; -import io.debezium.relational.Tables.TableFilter; -import io.debezium.relational.ddl.DdlParser; import io.debezium.relational.history.DatabaseHistory; import io.debezium.relational.history.KafkaDatabaseHistory; @@ -526,69 +522,6 @@ public static EventProcessingFailureHandlingMode parse(String value) { } } - public static enum DdlParsingMode implements EnumeratedValue { - - LEGACY("legacy") { - @Override - public DdlParser getNewParserInstance(JdbcValueConverters valueConverters, TableFilter tableFilter) { - return new MySqlDdlParser(false, (MySqlValueConverters) valueConverters); - } - }, - ANTLR("antlr") { - @Override - public DdlParser getNewParserInstance(JdbcValueConverters valueConverters, TableFilter tableFilter) { - return new MySqlAntlrDdlParser((MySqlValueConverters) valueConverters, tableFilter); - } - }; - - private final String value; - - private DdlParsingMode(String value) { - this.value = value; - } - - @Override - public String getValue() { - return value; - } - - public abstract DdlParser getNewParserInstance(JdbcValueConverters valueConverters, TableFilter tableFilter); - - /** - * Determine if the supplied value is one of the predefined options. - * - * @param value the configuration property value; may not be null - * @return the matching option, or null if no match is found - */ - public static DdlParsingMode parse(String value) { - if (value == null) { - return null; - } - value = value.trim(); - for (DdlParsingMode option : DdlParsingMode.values()) { - if (option.getValue().equalsIgnoreCase(value)) { - return option; - } - } - return null; - } - - /** - * Determine if the supplied value is one of the predefined options. - * - * @param value the configuration property value; may not be null - * @param defaultValue the default value; may be null - * @return the matching option, or null if no match is found and the non-null default is invalid - */ - public static DdlParsingMode parse(String value, String defaultValue) { - DdlParsingMode mode = parse(value); - if (mode == null && defaultValue != null) { - mode = parse(defaultValue); - } - return mode; - } - } - /** * {@link Integer#MIN_VALUE Minimum value} used for fetch size hint. * See DBZ-94 for details. @@ -1042,17 +975,6 @@ public static DdlParsingMode parse(String value, String defaultValue) { "The value of those properties is the select statement to use when retrieving data from the specific table during snapshotting. " + "A possible use case for large append-only tables is setting a specific point where to start (resume) snapshotting, in case a previous snapshotting was interrupted."); - public static final Field DDL_PARSER_MODE = Field.create("ddl.parser.mode") - .withDisplayName("DDL parser mode") - .withEnum(DdlParsingMode.class, DdlParsingMode.ANTLR) - .withWidth(Width.SHORT) - .withImportance(Importance.MEDIUM) - .withDescription("MySQL DDL statements can be parsed in different ways:" + - "'legacy' parsing is creating a TokenStream and comparing token by token with an expected values." + - "The decisions are made by matched token values." + - "'antlr' (the default) uses generated parser from MySQL grammar using ANTLR v4 tool which use ALL(*) algorithm for parsing." + - "This parser creates a parsing tree for DDL statement, then walks trough it and apply changes by node types in parsed tree."); - /** * Method that generates a Field for specifying that string columns whose names match a set of regular expressions should * have their values truncated to be no longer than the specified number of characters. @@ -1124,7 +1046,6 @@ public static final Field MASK_COLUMN(int length) { INCONSISTENT_SCHEMA_HANDLING_MODE, CommonConnectorConfig.SNAPSHOT_DELAY_MS, CommonConnectorConfig.SNAPSHOT_FETCH_SIZE, - DDL_PARSER_MODE, CommonConnectorConfig.TOMBSTONES_ON_DELETE, ENABLE_TIME_ADJUSTER); /** @@ -1142,7 +1063,6 @@ public static final Field MASK_COLUMN(int length) { DatabaseHistory.DDL_FILTER); private final SnapshotLockingMode snapshotLockingMode; - private final DdlParsingMode ddlParsingMode; private final GtidNewChannelPosition gitIdNewChannelPosition; private final SnapshotNewTables snapshotNewTables; @@ -1167,9 +1087,6 @@ public MySqlConnectorConfig(Configuration config) { this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString()); } - String ddlParsingModeStr = config.getString(MySqlConnectorConfig.DDL_PARSER_MODE); - this.ddlParsingMode = DdlParsingMode.parse(ddlParsingModeStr, MySqlConnectorConfig.DDL_PARSER_MODE.defaultValueAsString()); - String gitIdNewChannelPosition = config.getString(MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION); this.gitIdNewChannelPosition = GtidNewChannelPosition.parse(gitIdNewChannelPosition, MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION.defaultValueAsString()); @@ -1181,10 +1098,6 @@ public SnapshotLockingMode getSnapshotLockingMode() { return this.snapshotLockingMode; } - public DdlParsingMode getDdlParsingMode() { - return ddlParsingMode; - } - public GtidNewChannelPosition gtidNewChannelPosition() { return gitIdNewChannelPosition; } @@ -1215,7 +1128,7 @@ protected static ConfigDef configDef() { Field.group(config, "Connector", CONNECTION_TIMEOUT_MS, KEEP_ALIVE, KEEP_ALIVE_INTERVAL_MS, CommonConnectorConfig.MAX_QUEUE_SIZE, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.POLL_INTERVAL_MS, SNAPSHOT_MODE, SNAPSHOT_LOCKING_MODE, SNAPSHOT_NEW_TABLES, SNAPSHOT_MINIMAL_LOCKING, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, - BIGINT_UNSIGNED_HANDLING_MODE, SNAPSHOT_DELAY_MS, SNAPSHOT_FETCH_SIZE, DDL_PARSER_MODE, ENABLE_TIME_ADJUSTER); + BIGINT_UNSIGNED_HANDLING_MODE, SNAPSHOT_DELAY_MS, SNAPSHOT_FETCH_SIZE, ENABLE_TIME_ADJUSTER); return config; } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java deleted file mode 100644 index 806935919..000000000 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java +++ /dev/null @@ -1,1737 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.mysql; - -import java.sql.Types; -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; - -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; - -import io.debezium.annotation.NotThreadSafe; -import io.debezium.connector.mysql.MySqlSystemVariables.MySqlScope; -import io.debezium.relational.Column; -import io.debezium.relational.ColumnEditor; -import io.debezium.relational.SystemVariables; -import io.debezium.relational.Table; -import io.debezium.relational.TableEditor; -import io.debezium.relational.TableId; -import io.debezium.relational.ValueConverter; -import io.debezium.relational.ddl.DataType; -import io.debezium.relational.ddl.DataTypeParser; -import io.debezium.relational.ddl.DdlParserListener.SetVariableEvent; -import io.debezium.relational.ddl.DdlTokenizer; -import io.debezium.relational.ddl.LegacyDdlParser; -import io.debezium.text.MultipleParsingExceptions; -import io.debezium.text.ParsingException; -import io.debezium.text.TokenStream; -import io.debezium.text.TokenStream.Marker; - -/** - * A parser for DDL statements. - *

- * See the MySQL SQL Syntax documentation for - * the grammar supported by this parser. - * - * @author Randall Hauch - */ -@NotThreadSafe -public class MySqlDdlParser extends LegacyDdlParser { - - /** - * The system variable name for the name of the character set that the server uses by default. - * See http://dev.mysql.com/doc/refman/5.7/en/server-options.html#option_mysqld_character-set-server - */ - private static final String SERVER_CHARSET_NAME = MySqlSystemVariables.CHARSET_NAME_SERVER; - - private final ConcurrentMap charsetNameForDatabase = new ConcurrentHashMap<>(); - private MySqlValueConverters converters = null; - private final MySqlDefaultValuePreConverter defaultValuePreConverter = new MySqlDefaultValuePreConverter(); - - /** - * Create a new DDL parser for MySQL that does not include view definitions. - */ - public MySqlDdlParser() { - super(";"); - } - - /** - * Create a new DDL parser for MySQL. - * - * @param includeViews {@code true} if view definitions should be included, or {@code false} if they should be skipped - */ - public MySqlDdlParser(boolean includeViews) { - super(";", includeViews); - } - - @Override - protected SystemVariables createNewSystemVariablesInstance() { - return new MySqlSystemVariables(); - } - - protected MySqlDdlParser(boolean includeViews, MySqlValueConverters converters) { - super(";", includeViews); - this.converters = converters; - systemVariables = new MySqlSystemVariables(); - } - - @Override - protected void initializeDataTypes(DataTypeParser dataTypes) { - dataTypes.register(Types.BIT, "BIT[(L)]"); - // MySQL unsigned TINYINTs can be mapped to JDBC TINYINT, but signed values don't map. Therefore, per JDBC spec - // the best mapping for all TINYINT values is JDBC's SMALLINT, which maps to a short. - dataTypes.register(Types.SMALLINT, "TINYINT[(L)] [UNSIGNED|SIGNED] [ZEROFILL]"); - dataTypes.register(Types.SMALLINT, "SMALLINT[(L)] [UNSIGNED|SIGNED] [ZEROFILL]"); - dataTypes.register(Types.INTEGER, "MEDIUMINT[(L)] [UNSIGNED|SIGNED] [ZEROFILL]"); - dataTypes.register(Types.INTEGER, "INT[(L)] [UNSIGNED|SIGNED] [ZEROFILL]"); - dataTypes.register(Types.INTEGER, "INTEGER[(L)] [UNSIGNED|SIGNED] [ZEROFILL]"); - dataTypes.register(Types.BIGINT, "BIGINT[(L)] [UNSIGNED|SIGNED] [ZEROFILL]"); - dataTypes.register(Types.REAL, "REAL[(M[,D])] [UNSIGNED] [ZEROFILL]"); - dataTypes.register(Types.DOUBLE, "DOUBLE[(M[,D])] [UNSIGNED|SIGNED] [ZEROFILL]"); - dataTypes.register(Types.DOUBLE, "DOUBLE PRECISION[(M[,D])] [UNSIGNED|SIGNED] [ZEROFILL]"); - dataTypes.register(Types.FLOAT, "FLOAT[(M[,D])] [UNSIGNED|SIGNED] [ZEROFILL]"); - dataTypes.register(Types.DECIMAL, "DECIMAL[(M[,D])] [UNSIGNED|SIGNED] [ZEROFILL]"); - dataTypes.register(Types.DECIMAL, "FIXED[(M[,D])] [UNSIGNED|SIGNED] [ZEROFILL]"); - dataTypes.register(Types.DECIMAL, "DEC[(M[,D])] [UNSIGNED|SIGNED] [ZEROFILL]"); - dataTypes.register(Types.NUMERIC, "NUMERIC[(M[,D])] [UNSIGNED|SIGNED] [ZEROFILL]"); - dataTypes.register(Types.BOOLEAN, "BOOLEAN"); - dataTypes.register(Types.BOOLEAN, "BOOL"); - dataTypes.register(Types.DATE, "DATE"); - dataTypes.register(Types.TIME, "TIME[(L)]"); - dataTypes.register(Types.TIMESTAMP_WITH_TIMEZONE, "TIMESTAMP[(L)]"); // includes timezone information - dataTypes.register(Types.TIMESTAMP, "DATETIME[(L)]"); - dataTypes.register(Types.INTEGER, "YEAR[(2|4)]"); - dataTypes.register(Types.BINARY, "CHAR[(L)] BINARY"); - dataTypes.register(Types.VARBINARY, "VARCHAR(L) BINARY"); - dataTypes.register(Types.BINARY, "BINARY[(L)]"); - dataTypes.register(Types.VARCHAR, "VARCHAR(L)"); - dataTypes.register(Types.NVARCHAR, "NVARCHAR(L)"); - dataTypes.register(Types.NVARCHAR, "NATIONAL VARCHAR(L)"); - dataTypes.register(Types.NVARCHAR, "NCHAR VARCHAR(L)"); - dataTypes.register(Types.NVARCHAR, "NATIONAL CHARACTER VARYING(L)"); - dataTypes.register(Types.NVARCHAR, "NATIONAL CHAR VARYING(L)"); - dataTypes.register(Types.CHAR, "CHAR[(L)]"); - dataTypes.register(Types.NCHAR, "NCHAR[(L)]"); - dataTypes.register(Types.NCHAR, "NATIONAL CHARACTER(L)"); - dataTypes.register(Types.VARBINARY, "VARBINARY(L)"); - dataTypes.register(Types.BLOB, "TINYBLOB"); - dataTypes.register(Types.BLOB, "BLOB"); - dataTypes.register(Types.BLOB, "MEDIUMBLOB"); - dataTypes.register(Types.BLOB, "LONGBLOB"); - dataTypes.register(Types.BLOB, "TINYTEXT BINARY"); - dataTypes.register(Types.BLOB, "TEXT BINARY"); - dataTypes.register(Types.BLOB, "MEDIUMTEXT BINARY"); - dataTypes.register(Types.BLOB, "LONGTEXT BINARY"); - dataTypes.register(Types.VARCHAR, "TINYTEXT"); - dataTypes.register(Types.VARCHAR, "TEXT[(L)]"); - dataTypes.register(Types.VARCHAR, "MEDIUMTEXT"); - dataTypes.register(Types.VARCHAR, "LONGTEXT"); - dataTypes.register(Types.CHAR, "ENUM(...)"); - dataTypes.register(Types.CHAR, "SET(...)"); - dataTypes.register(Types.OTHER, "JSON"); - dataTypes.register(Types.OTHER, "GEOMETRY"); - dataTypes.register(Types.OTHER, "POINT"); - dataTypes.register(Types.OTHER, "LINESTRING"); - dataTypes.register(Types.OTHER, "POLYGON"); - dataTypes.register(Types.OTHER, "MULTIPOINT"); - dataTypes.register(Types.OTHER, "MULTILINESTRING"); - dataTypes.register(Types.OTHER, "MULTIPOLYGON"); - dataTypes.register(Types.OTHER, "GEOMETRYCOLLECTION"); - } - - @Override - protected void initializeKeywords(TokenSet keywords) { - } - - @Override - protected void initializeStatementStarts(TokenSet statementStartTokens) { - statementStartTokens.add("CREATE", "ALTER", "DROP", "GRANT", "REVOKE", "FLUSH", "TRUNCATE", "COMMIT", "USE", "SAVEPOINT", "ROLLBACK", - // table maintenance statements: https://dev.mysql.com/doc/refman/5.7/en/table-maintenance-sql.html - "ANALYZE", "OPTIMIZE", "REPAIR", - // DML-related statements - "DELETE", "INSERT" - ); - } - - @Override - protected void parseNextStatement(Marker marker) { - if (tokens.matches(DdlTokenizer.COMMENT)) { - parseComment(marker); - } else if (tokens.matches("CREATE")) { - parseCreate(marker); - } else if (tokens.matches("ALTER")) { - parseAlter(marker); - } else if (tokens.matches("DROP")) { - parseDrop(marker); - } else if (tokens.matches("RENAME")) { - parseRename(marker); - } else if (tokens.matches("USE")) { - parseUse(marker); - } else if (tokens.matches("SET")) { - parseSet(marker); - } else if (tokens.matches("INSERT")) { - consumeStatement(); - } else if (tokens.matches("DELETE")) { - consumeStatement(); - } else { - parseUnknownStatement(marker); - } - } - - protected void parseSet(Marker start) { - tokens.consume("SET"); - AtomicReference scope = new AtomicReference<>(); - parseSetVariable(start, scope); - while (tokens.canConsume(',')) { - parseSetVariable(start, scope); - } - consumeRemainingStatement(start); - debugParsed(start); - } - - protected void parseSetVariable(Marker start, AtomicReference scope) { - // First, use the modifier to set the scope ... - if (tokens.canConsume("GLOBAL") || tokens.canConsume("@@GLOBAL", ".")) { - scope.set(MySqlScope.GLOBAL); - } else if (tokens.canConsume("SESSION") || tokens.canConsume("@@SESSION", ".")) { - scope.set(MySqlScope.SESSION); - } else if (tokens.canConsume("LOCAL") || tokens.canConsume("@@LOCAL", ".")) { - scope.set(MySqlScope.LOCAL); - } - - // Now handle the remainder of the variable assignment ... - if (tokens.canConsume("PASSWORD")) { - // ignore - } else if (tokens.canConsume("TRANSACTION", "ISOLATION", "LEVEL")) { - // ignore - } else if (tokens.canConsume("CHARACTER", "SET") || tokens.canConsume("CHARSET")) { - // Sets two variables plus the current character set for the current database - // See https://dev.mysql.com/doc/refman/5.7/en/set-statement.html - String charsetName = tokens.consume(); - if ("DEFAULT".equalsIgnoreCase(charsetName)) { - charsetName = currentDatabaseCharset(); - } - systemVariables.setVariable(scope.get(), "character_set_client", charsetName); - systemVariables.setVariable(scope.get(), "character_set_results", charsetName); - systemVariables.setVariable(MySqlScope.SESSION, MySqlSystemVariables.CHARSET_NAME_CONNECTION, - systemVariables.getVariable(MySqlSystemVariables.CHARSET_NAME_DATABASE)); - // systemVariables.setVariable(scope.get(), "collation_connection", ...); - } else if (tokens.canConsume("NAMES")) { - // https://dev.mysql.com/doc/refman/5.7/en/set-statement.html - String charsetName = tokens.consume(); - if ("DEFAULT".equalsIgnoreCase(charsetName)) { - charsetName = currentDatabaseCharset(); - } - systemVariables.setVariable(scope.get(), "character_set_client", charsetName); - systemVariables.setVariable(scope.get(), "character_set_results", charsetName); - systemVariables.setVariable(scope.get(), "character_set_connection", charsetName); - // systemVariables.setVariable(scope.get(), "collation_connection", ...); - - if (tokens.canConsume("COLLATION")) { - tokens.consume(); // consume the collation name but do nothing with it - } - } else { - // This is a global, session, or local system variable, or a user variable. - String variableName = parseVariableName(); - tokens.canConsume(":"); // := is for user variables - tokens.consume("="); - String value = parseVariableValue(); - - if (variableName.startsWith("@")) { - // This is a user variable, so do nothing with it ... - } else { - systemVariables.setVariable(scope.get(), variableName, value); - - // If this is setting 'character_set_database', then we need to record the character set for - // the given database ... - if ("character_set_database".equalsIgnoreCase(variableName)) { - String currentDatabaseName = currentSchema(); - if (currentDatabaseName != null) { - charsetNameForDatabase.put(currentDatabaseName, value); - } - } - - // Signal that the variable was set ... - signalChangeEvent(new SetVariableEvent(variableName, value, statement(start))); - } - } - } - - protected String parseVariableName() { - String variableName = tokens.consume(); - while (tokens.canConsume("-")) { - variableName = variableName + "-" + tokens.consume(); - } - return variableName; - } - - protected String parseVariableValue() { - if (tokens.canConsumeAnyOf(",", ";")) { - // The variable is blank ... - return ""; - } - Marker start = tokens.mark(); - tokens.consumeUntilEndOrOneOf(",", ";"); - String value = tokens.getContentFrom(start); - if (value.startsWith("'") && value.endsWith("'")) { - // Remove the single quotes around the value ... - if (value.length() <= 2) { - value = ""; - } else { - value = value.substring(1, value.length() - 2); - } - } - return value; - } - - @SuppressWarnings("unchecked") - @Override - protected void parseCreate(Marker marker) { - tokens.consume("CREATE"); - if (tokens.matches("TABLE") || tokens.matches("TEMPORARY", "TABLE")) { - parseCreateTable(marker); - } else if (tokens.matches("VIEW")) { - parseCreateView(marker); - } else if (tokens.matchesAnyOf("DATABASE", "SCHEMA")) { - parseCreateDatabase(marker); - } else if (tokens.matchesAnyOf("EVENT")) { - parseCreateEvent(marker); - } else if (tokens.matchesAnyOf("FUNCTION", "PROCEDURE")) { - parseCreateProcedure(marker); - } else if (tokens.matchesAnyOf("UNIQUE", "FULLTEXT", "SPATIAL", "INDEX")) { - parseCreateIndex(marker); - } else if (tokens.matchesAnyOf("SERVER")) { - parseCreateUnknown(marker); - } else if (tokens.matchesAnyOf("TABLESPACE")) { - parseCreateUnknown(marker); - } else if (tokens.matchesAnyOf("TRIGGER")) { - parseCreateTrigger(marker); - } else { - // It could be several possible things (including more elaborate forms of those matches tried above), - sequentially(this::parseCreateView, - this::parseCreateProcedure, - this::parseCreateTrigger, - this::parseCreateEvent, - this::parseCreateUnknown); - } - } - - protected void parseCreateDatabase(Marker start) { - tokens.consumeAnyOf("DATABASE", "SCHEMA"); - tokens.canConsume("IF", "NOT", "EXISTS"); - String dbName = tokens.consume(); - parseDatabaseOptions(start, dbName); - consumeRemainingStatement(start); - signalCreateDatabase(dbName, start); - debugParsed(start); - } - - protected void parseAlterDatabase(Marker start) { - tokens.consumeAnyOf("DATABASE", "SCHEMA"); - String dbName = tokens.consume(); - parseDatabaseOptions(start, dbName); - consumeRemainingStatement(start); - signalAlterDatabase(dbName, null, start); - debugParsed(start); - } - - protected void parseDropDatabase(Marker start) { - tokens.consumeAnyOf("DATABASE", "SCHEMA"); - tokens.canConsume("IF", "EXISTS"); - String dbName = tokens.consume(); - signalDropDatabase(dbName, start); - debugParsed(start); - } - - protected void parseDatabaseOptions(Marker start, String dbName) { - // Handle the default character set and collation ... - tokens.canConsume("DEFAULT"); - if (tokens.canConsume("CHARACTER", "SET") || tokens.canConsume("CHARSET")) { - tokens.canConsume("="); - String charsetName = tokens.consume(); - if ("DEFAULT".equalsIgnoreCase(charsetName)) { - charsetName = systemVariables.getVariable(SERVER_CHARSET_NAME); - } - charsetNameForDatabase.put(dbName, charsetName); - } - if (tokens.canConsume("COLLATE")) { - tokens.canConsume("="); - tokens.consume(); // collation name - } - } - - protected void parseCreateTable(Marker start) { - tokens.canConsume("TEMPORARY"); - tokens.consume("TABLE"); - boolean onlyIfNotExists = tokens.canConsume("IF", "NOT", "EXISTS"); - TableId tableId = parseQualifiedTableName(start); - if (tokens.canConsume("LIKE")) { - TableId originalId = parseQualifiedTableName(start); - Table original = databaseTables.forTable(originalId); - if (original != null) { - databaseTables.overwriteTable(tableId, original.columns(), original.primaryKeyColumnNames(), original.defaultCharsetName()); - } - consumeRemainingStatement(start); - signalCreateTable(tableId, start); - debugParsed(start); - return; - } - if (onlyIfNotExists && databaseTables.forTable(tableId) != null) { - // The table does exist, so we should do nothing ... - consumeRemainingStatement(start); - signalCreateTable(tableId, start); - debugParsed(start); - return; - } - TableEditor table = databaseTables.editOrCreateTable(tableId); - - // create_definition ... - if (tokens.matches('(')) { - parseCreateDefinitionList(start, table); - } - // table_options ... - parseTableOptions(start, table); - // partition_options ... - if (tokens.matches("PARTITION")) { - parsePartitionOptions(start, table); - } - // select_statement - if (tokens.canConsume("AS") || tokens.canConsume("IGNORE", "AS") || tokens.canConsume("REPLACE", "AS")) { - parseAsSelectStatement(start, table); - } - - // Make sure that the table's character set has been set ... - if (!table.hasDefaultCharsetName()) { - table.setDefaultCharsetName(currentDatabaseCharset()); - } - - // Update the table definition ... - databaseTables.overwriteTable(table.create()); - signalCreateTable(tableId, start); - debugParsed(start); - } - - protected void parseTableOptions(Marker start, TableEditor table) { - while (parseTableOption(start, table)) {} - } - - protected boolean parseTableOption(Marker start, TableEditor table) { - if (tokens.canConsume("AUTO_INCREMENT")) { - // Sets the auto-incremented value for the next incremented value ... - tokens.canConsume('='); - tokens.consume(); - return true; - } else if (tokens.canConsumeAnyOf("CHECKSUM", "ENGINE", "AVG_ROW_LENGTH", "MAX_ROWS", "MIN_ROWS", "ROW_FORMAT", - "DELAY_KEY_WRITE", "INSERT_METHOD", "KEY_BLOCK_SIZE", "PACK_KEYS", - "STATS_AUTO_RECALC", "STATS_PERSISTENT", "STATS_SAMPLE_PAGES" , "PAGE_CHECKSUM", - "COMPRESSION")) { - // One option token followed by '=' by a single value - tokens.canConsume('='); - tokens.consume(); - return true; - } else if (tokens.canConsume("DEFAULT", "CHARACTER", "SET") || tokens.canConsume("CHARACTER", "SET") || - tokens.canConsume("DEFAULT", "CHARSET") || tokens.canConsume("CHARSET")) { - tokens.canConsume('='); - String charsetName = tokens.consume(); - if ("DEFAULT".equalsIgnoreCase(charsetName)) { - // The table's default character set is set to the character set of the current database ... - charsetName = currentDatabaseCharset(); - } - table.setDefaultCharsetName(charsetName); - return true; - } else if (tokens.canConsume("DEFAULT", "COLLATE") || tokens.canConsume("COLLATE")) { - tokens.canConsume('='); - tokens.consume(); - return true; - } else if (tokens.canConsumeAnyOf("COMMENT", "CONNECTION", "ENCRYPTION", "PASSWORD")) { - tokens.canConsume('='); - consumeQuotedString(); - return true; - } else if (tokens.canConsume("DATA", "DIRECTORY") || tokens.canConsume("INDEX", "DIRECTORY")) { - tokens.canConsume('='); - consumeQuotedString(); - return true; - } else if (tokens.canConsume("TABLESPACE")) { - tokens.consume(); - return true; - } else if (tokens.canConsumeAnyOf("STORAGE", "ENGINE")) { - tokens.consume(); // storage engine name - return true; - } else if (tokens.canConsume("UNION")) { - tokens.canConsume('='); - tokens.consume('('); - tokens.consume(); - while (tokens.canConsume(',')) { - tokens.consume(); - } - tokens.consume(')'); - return true; - } - return false; - } - - protected void parsePartitionOptions(Marker start, TableEditor table) { - tokens.consume("PARTITION", "BY"); - if (tokens.canConsume("LINEAR", "HASH") || tokens.canConsume("HASH")) { - consumeExpression(start); - } else if (tokens.canConsume("LINEAR", "KEY") || tokens.canConsume("KEY")) { - if (tokens.canConsume("ALGORITHM")) { - tokens.consume("="); - tokens.consumeAnyOf("1", "2"); - } - parseColumnNameList(start); - } else if (tokens.canConsumeAnyOf("RANGE", "LIST")) { - if (tokens.canConsume("COLUMNS")) { - parseColumnNameList(start); - } else { - consumeExpression(start); - } - } - - if (tokens.canConsume("PARTITIONS")) { - tokens.consume(); - } - if (tokens.canConsume("SUBPARTITION", "BY")) { - if (tokens.canConsume("LINEAR", "HASH") || tokens.canConsume("HASH")) { - consumeExpression(start); - } else if (tokens.canConsume("LINEAR", "KEY") || tokens.canConsume("KEY")) { - if (tokens.canConsume("ALGORITHM")) { - tokens.consume("="); - tokens.consumeAnyOf("1", "2"); - } - parseColumnNameList(start); - } - if (tokens.canConsume("SUBPARTITIONS")) { - tokens.consume(); - } - } - if (tokens.canConsume('(')) { - do { - parsePartitionDefinition(start, table); - if(tokens.canConsume("ENGINE")) { - tokens.canConsume('='); - tokens.consume(); - } - } while (tokens.canConsume(',')); - tokens.consume(')'); - } - } - - protected void parsePartitionDefinition(Marker start, TableEditor table) { - tokens.consume("PARTITION"); - tokens.consume(); // name - if (tokens.canConsume("VALUES")) { - if (tokens.canConsume("LESS", "THAN")) { - if (!tokens.canConsume("MAXVALUE")) { - consumeExpression(start); - } - } else { - tokens.consume("IN"); - consumeValueList(start); - } - } else if (tokens.canConsume("STORAGE", "ENGINE") || tokens.canConsume("ENGINE")) { - tokens.canConsume('='); - tokens.consume(); - } else if (tokens.canConsumeAnyOf("COMMENT")) { - tokens.canConsume('='); - consumeQuotedString(); - } else if (tokens.canConsumeAnyOf("DATA", "INDEX") && tokens.canConsume("DIRECTORY")) { - tokens.canConsume('='); - consumeQuotedString(); - } else if (tokens.canConsumeAnyOf("MAX_ROWS", "MIN_ROWS", "TABLESPACE")) { - tokens.canConsume('='); - tokens.consume(); - } else if (tokens.canConsume('(')) { - do { - parseSubpartitionDefinition(start, table); - } while (tokens.canConsume(',')); - tokens.consume(')'); - } - } - - protected void parseSubpartitionDefinition(Marker start, TableEditor table) { - tokens.consume("SUBPARTITION"); - tokens.consume(); // name - if (tokens.canConsume("STORAGE", "ENGINE") || tokens.canConsume("ENGINE")) { - tokens.canConsume('='); - tokens.consume(); - } else if (tokens.canConsumeAnyOf("COMMENT")) { - tokens.canConsume('='); - consumeQuotedString(); - } else if (tokens.canConsumeAnyOf("DATA", "INDEX") && tokens.canConsume("DIRECTORY")) { - tokens.canConsume('='); - consumeQuotedString(); - } else if (tokens.canConsumeAnyOf("MAX_ROWS", "MIN_ROWS", "TABLESPACE")) { - tokens.canConsume('='); - tokens.consume(); - } - } - - protected void parseAsSelectStatement(Marker start, TableEditor table) { - tokens.consume("SELECT"); - consumeRemainingStatement(start); - } - - protected void parseCreateDefinitionList(Marker start, TableEditor table) { - tokens.consume('('); - parseCreateDefinition(start, table, false); - while (tokens.canConsume(',')) { - parseCreateDefinition(start, table, false); - } - tokens.consume(')'); - } - - /** - * @param isAlterStatement whether this is an ALTER TABLE statement or not (i.e. CREATE TABLE) - */ - protected void parseCreateDefinition(Marker start, TableEditor table, boolean isAlterStatement) { - // If the first token is a quoted identifier, then we know it is a column name ... - Collection errors = null; - boolean quoted = isNextTokenQuotedIdentifier(); - Marker defnStart = tokens.mark(); - if (!quoted) { - // The first token is not quoted so let's check for other expressions ... - if (tokens.canConsume("CHECK")) { - // Try to parse the constraints first ... - consumeExpression(start); - return; - } - if (tokens.canConsume("CONSTRAINT", TokenStream.ANY_VALUE, "PRIMARY", "KEY") - || tokens.canConsume("CONSTRAINT", "PRIMARY", "KEY") - || tokens.canConsume("PRIMARY", "KEY") - ) { - try { - if (tokens.canConsume("USING")) { - parseIndexType(start); - } - if (!tokens.matches('(')) { - tokens.consume(); // index name - } - List pkColumnNames = parseIndexColumnNames(start); - table.setPrimaryKeyNames(pkColumnNames); - parseIndexOptions(start); - // MySQL does not allow a primary key to have nullable columns, so let's make sure we model that correctly ... - pkColumnNames.forEach(name -> { - Column c = table.columnWithName(name); - if (c != null && c.isOptional()) { - final ColumnEditor ce = c.edit().optional(false); - if (ce.hasDefaultValue() && ce.defaultValue() == null) { - ce.unsetDefaultValue(); - } - table.addColumn(ce.create()); - } - }); - return; - } catch (ParsingException e) { - // Invalid names, so rewind and continue - errors = accumulateParsingFailure(e, errors); - tokens.rewind(defnStart); - } catch (MultipleParsingExceptions e) { - // Invalid names, so rewind and continue - errors = accumulateParsingFailure(e, errors); - tokens.rewind(defnStart); - } - } - if (tokens.canConsume("CONSTRAINT", TokenStream.ANY_VALUE, "UNIQUE") - || tokens.canConsume("CONSTRAINT", "UNIQUE") - || tokens.canConsume("UNIQUE")) { - tokens.canConsumeAnyOf("KEY", "INDEX"); - try { - if (!tokens.matches('(')) { - if (!tokens.matches("USING")) { - tokens.consume(); // name of unique index ... - } - if (tokens.matches("USING")) { - parseIndexType(start); - } - } - List uniqueKeyColumnNames = parseIndexColumnNames(start); - if (table.primaryKeyColumnNames().isEmpty()) { - table.setPrimaryKeyNames(uniqueKeyColumnNames); // this may eventually get overwritten by a real PK - } - parseIndexOptions(start); - return; - } catch (ParsingException e) { - // Invalid names, so rewind and continue - errors = accumulateParsingFailure(e, errors); - tokens.rewind(defnStart); - } catch (MultipleParsingExceptions e) { - // Invalid names, so rewind and continue - errors = accumulateParsingFailure(e, errors); - tokens.rewind(defnStart); - } - } - if (tokens.canConsume("CONSTRAINT", TokenStream.ANY_VALUE, "FOREIGN", "KEY") || tokens.canConsume("FOREIGN", "KEY")) { - try { - if (!tokens.matches('(')) { - tokens.consume(); // name of foreign key - } - parseIndexColumnNames(start); - if (tokens.matches("REFERENCES")) { - parseReferenceDefinition(start); - } - return; - } catch (ParsingException e) { - // Invalid names, so rewind and continue - errors = accumulateParsingFailure(e, errors); - tokens.rewind(defnStart); - } catch (MultipleParsingExceptions e) { - // Invalid names, so rewind and continue - errors = accumulateParsingFailure(e, errors); - tokens.rewind(defnStart); - } - } - if (tokens.canConsumeAnyOf("INDEX", "KEY")) { - try { - if (!tokens.matches('(')) { - if (!tokens.matches("USING")) { - tokens.consume(); // name of unique index ... - } - if (tokens.matches("USING")) { - parseIndexType(start); - } - } - parseIndexColumnNames(start); - parseIndexOptions(start); - return; - } catch (ParsingException e) { - // Invalid names, so rewind and continue - errors = accumulateParsingFailure(e, errors); - tokens.rewind(defnStart); - } catch (MultipleParsingExceptions e) { - // Invalid names, so rewind and continue - errors = accumulateParsingFailure(e, errors); - tokens.rewind(defnStart); - } - } - if (tokens.canConsumeAnyOf("FULLTEXT", "SPATIAL")) { - try { - tokens.canConsumeAnyOf("INDEX", "KEY"); - if (!tokens.matches('(')) { - tokens.consume(); // name of unique index ... - } - parseIndexColumnNames(start); - parseIndexOptions(start); - return; - } catch (ParsingException e) { - // Invalid names, so rewind and continue - errors = accumulateParsingFailure(e, errors); - tokens.rewind(defnStart); - } catch (MultipleParsingExceptions e) { - // Invalid names, so rewind and continue - errors = accumulateParsingFailure(e, errors); - tokens.rewind(defnStart); - } - } - } - - try { - // It's either quoted (meaning it's a column definition) - if (isAlterStatement && !quoted) { - tokens.canConsume("COLUMN"); // optional for ALTER TABLE - } - - String columnName = parseColumnName(); - parseCreateColumn(start, table, columnName, null); - } catch (ParsingException e) { - if (errors != null) { - errors = accumulateParsingFailure(e, errors); - throw new MultipleParsingExceptions(errors); - } - throw e; - } catch (MultipleParsingExceptions e) { - if (errors != null) { - errors = accumulateParsingFailure(e, errors); - throw new MultipleParsingExceptions(errors); - } - throw e; - } - } - - protected Column parseCreateColumn(Marker start, TableEditor table, String columnName, String newColumnName) { - // Obtain the column editor ... - Column existingColumn = table.columnWithName(columnName); - ColumnEditor column = existingColumn != null ? existingColumn.edit() : Column.editor().name(columnName); - AtomicBoolean isPrimaryKey = new AtomicBoolean(false); - - parseColumnDefinition(start, columnName, tokens, table, column, isPrimaryKey); - - convertDefaultValueToSchemaType(column); - - // Update the table ... - Column newColumnDefn = column.create(); - table.addColumns(newColumnDefn); - if (isPrimaryKey.get()) { - table.setPrimaryKeyNames(newColumnDefn.name()); - } - - if (newColumnName != null && !newColumnName.equalsIgnoreCase(columnName)) { - table.renameColumn(columnName, newColumnName); - columnName = newColumnName; - } - - // ALTER TABLE allows reordering the columns after the definition ... - if (tokens.canConsume("FIRST")) { - table.reorderColumn(columnName, null); - } else if (tokens.canConsume("AFTER")) { - table.reorderColumn(columnName, tokens.consume()); - } - return table.columnWithName(newColumnDefn.name()); - } - - private void convertDefaultValueToSchemaType(ColumnEditor columnEditor) { - final Column column = columnEditor.create(); - // if converters is not null and the default value is not null, we need to convert default value - if (converters != null && columnEditor.defaultValue() != null) { - Object defaultValue = columnEditor.defaultValue(); - final SchemaBuilder schemaBuilder = converters.schemaBuilder(column); - if (schemaBuilder == null) { - return; - } - final Schema schema = schemaBuilder.build(); - //In order to get the valueConverter for this column, we have to create a field; - //The index value -1 in the field will never used when converting default value; - //So we can set any number here; - final Field field = new Field(column.name(), -1, schema); - final ValueConverter valueConverter = converters.converter(column, field); - if (defaultValue instanceof String) { - defaultValue = defaultValuePreConverter.convert(column, (String) defaultValue); - } - defaultValue = valueConverter.convert(defaultValue); - columnEditor.defaultValue(defaultValue); - } - } - - /** - * Parse the {@code ENUM} or {@code SET} data type expression to extract the character options, where the index(es) appearing - * in the {@code ENUM} or {@code SET} values can be used to identify the acceptable characters. - * - * @param typeExpression the data type expression - * @return the string containing the character options allowed by the {@code ENUM} or {@code SET}; never null - */ - public static List parseSetAndEnumOptions(String typeExpression) { - List options = new ArrayList<>(); - TokenStream tokens = new TokenStream(typeExpression, TokenStream.basicTokenizer(false), false); - tokens.start(); - if (tokens.canConsumeAnyOf("ENUM", "SET") && tokens.canConsume('(')) { - // The literals should be quoted values ... - if (tokens.matchesAnyOf(DdlTokenizer.DOUBLE_QUOTED_STRING, DdlTokenizer.SINGLE_QUOTED_STRING)) { - options.add(withoutQuotes(tokens.consume())); - while (tokens.canConsume(',')) { - if (tokens.matchesAnyOf(DdlTokenizer.DOUBLE_QUOTED_STRING, DdlTokenizer.SINGLE_QUOTED_STRING)) { - options.add(withoutQuotes(tokens.consume())); - } - } - } - tokens.consume(')'); - } - return options; - } - - protected void parseColumnDefinition(Marker start, String columnName, TokenStream tokens, TableEditor table, ColumnEditor column, - AtomicBoolean isPrimaryKey) { - // Parse the data type, which must be at this location ... - List errors = new ArrayList<>(); - Marker dataTypeStart = tokens.mark(); - DataType dataType = dataTypeParser.parse(tokens, errors::addAll); - if (dataType == null) { - String dataTypeName = parseDomainName(start); - if (dataTypeName != null) { - dataType = DataType.userDefinedType(dataTypeName); - } - } - if (dataType == null) { - // No data type was found - parsingFailed(dataTypeStart.position(), errors, "Unable to read the data type"); - return; - } - - // DBZ-771 unset previously set default value, as it's not kept by MySQL; for any column modifications a new - // default value (which could be the same) has to be provided by the column_definition which we'll parse later - // on; only in 8.0 (not supported by this parser) columns can be renamed without repeating the full column - // definition - column.unsetDefaultValue(); - - column.jdbcType(dataType.jdbcType()); - column.type(dataType.name(), dataType.expression()); - if ("ENUM".equals(dataType.name())) { - column.length(1); - } else if ("SET".equals(dataType.name())) { - List options = parseSetAndEnumOptions(dataType.expression()); - // After DBZ-132, it will always be comma seperated - column.length(Math.max(0, options.size() * 2 - 1)); // number of options + number of commas - } else { - if (dataType.length() > -1) { - column.length((int) dataType.length()); - } - if (dataType.scale() > -1) { - column.scale(dataType.scale()); - } - } - - if (Types.NCHAR == dataType.jdbcType() || Types.NVARCHAR == dataType.jdbcType()) { - // NCHAR and NVARCHAR columns always uses utf8 as charset - column.charsetName("utf8"); - } - - if (Types.DECIMAL == dataType.jdbcType() || Types.NUMERIC == dataType.jdbcType()) { - if (dataType.length() == -1) { - column.length(10); - } - if (dataType.scale() == -1) { - column.scale(0); - } - } - - if (tokens.canConsume("CHARSET") || tokens.canConsume("CHARACTER", "SET")) { - String charsetName = tokens.consume(); - if (!"DEFAULT".equalsIgnoreCase(charsetName)) { - // Only record it if not inheriting the character set from the table - column.charsetName(charsetName); - } - } - if (tokens.canConsume("COLLATE")) { - tokens.consume(); // name of collation - } - - if (tokens.canConsume("AS") || tokens.canConsume("GENERATED", "ALWAYS", "AS")) { - consumeExpression(start); - tokens.canConsumeAnyOf("VIRTUAL", "STORED"); - if (tokens.canConsume("UNIQUE")) { - tokens.canConsume("KEY"); - } - if (tokens.canConsume("COMMENT")) { - consumeQuotedString(); - } - tokens.canConsume("NOT", "NULL"); - tokens.canConsume("NULL"); - tokens.canConsume("PRIMARY", "KEY"); - tokens.canConsume("KEY"); - } else { - while (tokens.matchesAnyOf("NOT", "NULL", "DEFAULT", "AUTO_INCREMENT", "UNIQUE", "PRIMARY", "KEY", "COMMENT", - "REFERENCES", "COLUMN_FORMAT", "ON", "COLLATE")) { - // Nullability ... - if (tokens.canConsume("NOT", "NULL")) { - column.optional(false); - } else if (tokens.canConsume("NULL")) { - column.optional(true); - } - // Default value ... - if (tokens.matches("DEFAULT")) { - parseDefaultClause(start, column); - } - if (tokens.matches("ON", "UPDATE") || tokens.matches("ON", "DELETE")) { - parseOnUpdateOrDelete(tokens.mark()); - column.autoIncremented(true); - } - // Other options ... - if (tokens.canConsume("AUTO_INCREMENT")) { - column.autoIncremented(true); - column.generated(true); - } - if (tokens.canConsume("UNIQUE", "KEY") || tokens.canConsume("UNIQUE")) { - if (table.primaryKeyColumnNames().isEmpty() && !column.isOptional()) { - // The table has no primary key (yet) but this is a non-null column and therefore will have all unique - // values (MySQL allows UNIQUE indexes with some nullable columns, but in that case allows duplicate - // rows), - // so go ahead and set it to this column as it's a unique key - isPrimaryKey.set(true); - } - } - if (tokens.canConsume("PRIMARY", "KEY") || tokens.canConsume("KEY")) { - // Always set this column as the primary key - column.optional(false); // MySQL primary key columns may not be null - isPrimaryKey.set(true); - } - if (tokens.canConsume("COMMENT")) { - consumeQuotedString(); - } - if (tokens.canConsume("COLUMN_FORMAT")) { - tokens.consumeAnyOf("FIXED", "DYNAMIC", "DEFAULT"); - } - if (tokens.matches("REFERENCES")) { - parseReferenceDefinition(start); - } - if (tokens.canConsume("COLLATE")) { - tokens.consume(); // name of collation - } - } - } - } - - protected String parseDomainName(Marker start) { - return parseSchemaQualifiedName(start); - } - - protected List parseIndexColumnNames(Marker start) { - List names = new ArrayList<>(); - tokens.consume('('); - parseIndexColumnName(names::add); - while (tokens.canConsume(',')) { - parseIndexColumnName(names::add); - } - tokens.consume(')'); - return names; - } - - private void parseIndexColumnName(Consumer name) { - name.accept(parseColumnName()); - if (tokens.canConsume('(')) { - tokens.consume(); // length - tokens.consume(')'); - } - tokens.canConsumeAnyOf("ASC", "DESC"); - } - - protected void parseIndexType(Marker start) { - tokens.consume("USING"); - tokens.consumeAnyOf("BTREE", "HASH"); - } - - protected void parseIndexOptions(Marker start) { - while (true) { - if (tokens.matches("USING")) { - parseIndexType(start); - } else if (tokens.canConsume("COMMENT")) { - consumeQuotedString(); - } else if (tokens.canConsume("KEY_BLOCK_SIZE")) { - tokens.consume("="); - tokens.consume(); - } else if (tokens.canConsume("WITH", "PARSER")) { - tokens.consume(); - } else { - break; - } - } - } - - protected void parseReferenceDefinition(Marker start) { - tokens.consume("REFERENCES"); - parseSchemaQualifiedName(start); // table name - if (tokens.matches('(')) { - parseColumnNameList(start); - } - if (tokens.canConsume("MATCH")) { - tokens.consumeAnyOf("FULL", "PARTIAL", "SIMPLE"); - } - if (tokens.canConsume("ON", "DELETE")) { - parseReferenceOption(start); - } - if (tokens.canConsume("ON", "UPDATE")) { - parseReferenceOption(start); - } - if (tokens.canConsume("ON", "DELETE")) { // in case ON UPDATE is first - parseReferenceOption(start); - } - } - - protected void parseReferenceOption(Marker start) { - if (tokens.canConsume("RESTRICT")) {} else if (tokens.canConsume("CASCADE")) {} else if (tokens.canConsume("SET", "NULL")) {} else { - tokens.canConsume("NO", "ACTION"); - } - } - - protected void parseCreateView(Marker start) { - tokens.canConsume("OR", "REPLACE"); - if (tokens.canConsume("ALGORITHM")) { - tokens.consume('='); - tokens.consumeAnyOf("UNDEFINED", "MERGE", "TEMPTABLE"); - } - parseDefiner(tokens.mark()); - if (tokens.canConsume("SQL", "SECURITY")) { - tokens.consumeAnyOf("DEFINER", "INVOKER"); - } - tokens.consume("VIEW"); - TableId tableId = parseQualifiedTableName(start); - if (skipViews) { - // We don't care about the rest ... - consumeRemainingStatement(start); - signalCreateView(tableId, start); - debugSkipped(start); - return; - } - - TableEditor table = databaseTables.editOrCreateTable(tableId); - if (tokens.matches('(')) { - List columnNames = parseColumnNameList(start); - // We know nothing other than the names ... - columnNames.forEach(name -> { - table.addColumn(Column.editor().name(name).create()); - }); - } - tokens.canConsume("AS"); - - // We should try to discover the types of the columns by looking at this select - if (tokens.canConsume("SELECT")) { - // If the SELECT clause is selecting qualified column names or columns names from a single table, then - // we can look up the columns and use those to set the type and nullability of the view's columns ... - Map selectedColumnsByAlias = parseColumnsInSelectClause(start); - if (table.columns().isEmpty()) { - selectedColumnsByAlias.forEach((columnName, fromTableColumn) -> { - if (fromTableColumn != null && columnName != null) { - table.addColumn(fromTableColumn.edit().name(columnName).create()); - } - }); - } else { - List changedColumns = new ArrayList<>(); - table.columns().forEach(column -> { - // Find the column from the SELECT statement defining the view ... - Column selectedColumn = selectedColumnsByAlias.get(column.name()); - if (selectedColumn != null) { - changedColumns.add(column.edit() - .jdbcType(selectedColumn.jdbcType()) - .type(selectedColumn.typeName(), selectedColumn.typeExpression()) - .length(selectedColumn.length()) - .scale(selectedColumn.scale().orElse(null)) - .autoIncremented(selectedColumn.isAutoIncremented()) - .generated(selectedColumn.isGenerated()) - .optional(selectedColumn.isOptional()).create()); - } - }); - changedColumns.forEach(table::addColumn); - } - - // Parse the FROM clause to see if the view is only referencing a single table, and if so then update the view - // with an equivalent primary key ... - Map fromTables = parseSelectFromClause(start); - if (fromTables.size() == 1) { - Table fromTable = fromTables.values().stream().findFirst().get(); - List fromTablePkColumnNames = fromTable.retrieveColumnNames(); - List viewPkColumnNames = new ArrayList<>(); - selectedColumnsByAlias.forEach((viewColumnName, fromTableColumn) -> { - if (fromTablePkColumnNames.contains(fromTableColumn.name())) { - viewPkColumnNames.add(viewColumnName); - } - }); - if (viewPkColumnNames.size() == fromTablePkColumnNames.size()) { - table.setPrimaryKeyNames(viewPkColumnNames); - } - } - } - - // We don't care about the rest ... - consumeRemainingStatement(start); - - // Update the table definition ... - databaseTables.overwriteTable(table.create()); - - signalCreateView(tableId, start); - debugParsed(start); - } - - protected void parseCreateIndex(Marker start) { - boolean unique = tokens.canConsume("UNIQUE"); - tokens.canConsumeAnyOf("FULLTEXT", "SPATIAL"); - tokens.consume("INDEX"); - String indexName = tokens.consume(); // index name - if (tokens.matches("USING")) { - parseIndexType(start); - } - TableId tableId = null; - if (tokens.canConsume("ON")) { - // Usually this is required, but in some cases ON is not required - tableId = parseQualifiedTableName(start); - } - - if (unique && tableId != null) { - // This is a unique index, and we can mark the index's columns as the primary key iff there is not already - // a primary key on the table. (Should a PK be created later via an alter, then it will overwrite this.) - TableEditor table = databaseTables.editTable(tableId); - if (table != null && !table.hasPrimaryKey()) { - List names = parseIndexColumnNames(start); - if (table.columns().stream().allMatch(Column::isRequired)) { - databaseTables.overwriteTable(table.setPrimaryKeyNames(names).create()); - } - } - } - - // We don't care about any other statements or the rest of this statement ... - consumeRemainingStatement(start); - // TODO fix: signal should be send only when some changes on table are made - signalCreateIndex(indexName, tableId, start); - debugParsed(start); - } - - protected void parseDefiner(Marker start) { - if (tokens.canConsume("DEFINER")) { - tokens.consume('='); - tokens.consume(); // user or CURRENT_USER - if (tokens.canConsume("@")) { - tokens.consume(); // host - } else { - String next = tokens.peek(); - if (next.startsWith("@")) { // e.g., @`localhost` - tokens.consume(); - } - } - } - } - - protected void parseCreateProcedure(Marker start) { - parseDefiner(tokens.mark()); - tokens.consumeAnyOf("FUNCTION", "PROCEDURE"); - tokens.consume(); // name - consumeRemainingStatement(start); - } - - protected void parseCreateTrigger(Marker start) { - parseDefiner(tokens.mark()); - tokens.consume("TRIGGER"); - tokens.consume(); // name - consumeRemainingStatement(start); - } - - protected void parseCreateEvent(Marker start) { - parseDefiner(tokens.mark()); - tokens.consume("EVENT"); - tokens.consume(); // name - consumeRemainingStatement(start); - } - - protected void parseCreateUnknown(Marker start) { - consumeRemainingStatement(start); - } - - @Override - protected void parseAlter(Marker marker) { - tokens.consume("ALTER"); - if (tokens.matches("TABLE") || tokens.matches("IGNORE", "TABLE")) { - parseAlterTable(marker); - debugParsed(marker); - } else if (tokens.matchesAnyOf("DATABASE", "SCHEMA")) { - parseAlterDatabase(marker); - } else { - parseAlterUnknown(marker); - } - } - - protected void parseAlterTable(Marker start) { - tokens.canConsume("IGNORE"); - tokens.consume("TABLE"); - TableId tableId = parseQualifiedTableName(start); - TableEditor table = databaseTables.editTable(tableId); - TableId oldTableId = null; - if (table != null) { - AtomicReference newTableName = new AtomicReference<>(null); - if (!tokens.matches(terminator()) && !tokens.matches("PARTITION")) { - parseAlterSpecificationList(start, table, newTableName::set); - } - if (tokens.matches("PARTITION")) { - parsePartitionOptions(start, table); - } - databaseTables.overwriteTable(table.create()); - if (newTableName.get() != null) { - // the table was renamed ... - Table renamed = databaseTables.renameTable(tableId, newTableName.get()); - if (renamed != null) { - oldTableId = tableId; - tableId = renamed.id(); - } - } - } else { - Marker marker = tokens.mark(); - try { - // We don't know about this table but we still have to parse the statement ... - table = TableEditor.noOp(tableId); - if (!tokens.matches(terminator()) && !tokens.matches("PARTITION")) { - parseAlterSpecificationList(start, table, str -> { - }); - } - if (tokens.matches("PARTITION")) { - parsePartitionOptions(start, table); - } - parseTableOptions(start, table); - // do nothing with this - } catch (ParsingException e) { - tokens.rewind(marker); - consumeRemainingStatement(start); - } - } - signalAlterTable(tableId, oldTableId, start); - } - - protected void parseAlterSpecificationList(Marker start, TableEditor table, Consumer newTableName) { - parseAlterSpecification(start, table, newTableName); - while (tokens.canConsume(',')) { - parseAlterSpecification(start, table, newTableName); - } - } - - protected void parseAlterSpecification(Marker start, TableEditor table, Consumer newTableName) { - parseTableOptions(start, table); - if (tokens.canConsume("ADD")) { - if (tokens.matches("COLUMN", "(") || tokens.matches('(')) { - tokens.canConsume("COLUMN"); - parseCreateDefinitionList(start, table); - } else if (tokens.canConsume("PARTITION", "(")) { - do { - parsePartitionDefinition(start, table); - } while (tokens.canConsume(',')); - tokens.consume(')'); - } else { - parseCreateDefinition(start, table, true); - } - } else if (tokens.canConsume("DROP")) { - if (tokens.canConsume("PRIMARY", "KEY")) { - table.setPrimaryKeyNames(); - } else if (tokens.canConsume("FOREIGN", "KEY")) { - tokens.consume(); // foreign key symbol - } else if (tokens.canConsumeAnyOf("INDEX", "KEY")) { - tokens.consume(); // index name - } else if (tokens.canConsume("PARTITION")) { - parsePartitionNames(start); - } else { - if(!isNextTokenQuotedIdentifier()) { - tokens.canConsume("COLUMN"); - } - String columnName = parseColumnName(); - table.removeColumn(columnName); - tokens.canConsume("RESTRICT"); - } - } else if (tokens.canConsume("ALTER")) { - if (!isNextTokenQuotedIdentifier()) { - tokens.canConsume("COLUMN"); - } - String columnName = tokens.consume(); // column name - if (!tokens.canConsume("DROP", "DEFAULT")) { - tokens.consume("SET"); - ColumnEditor columnEditor = table.columnWithName(columnName).edit(); - parseDefaultClause(start, columnEditor); - } - } else if (tokens.canConsume("CHANGE")) { - if (!isNextTokenQuotedIdentifier()) { - tokens.canConsume("COLUMN"); - } - String oldName = parseColumnName(); - String newName = parseColumnName(); - parseCreateColumn(start, table, oldName, newName); - } else if (tokens.canConsume("MODIFY")) { - if (!isNextTokenQuotedIdentifier()) { - tokens.canConsume("COLUMN"); - } - String columnName = parseColumnName(); - parseCreateColumn(start, table, columnName, null); - } else if (tokens.canConsumeAnyOf("ALGORITHM", "LOCK")) { - tokens.canConsume('='); - tokens.consume(); - } else if (tokens.canConsume("DISABLE", "KEYS") - || tokens.canConsume("ENABLE", "KEYS")) {} else if (tokens.canConsume("RENAME", "INDEX") - || tokens.canConsume("RENAME", "KEY")) { - tokens.consume(); // old - tokens.consume("TO"); - tokens.consume(); // new - } else if (tokens.canConsume("RENAME")) { - tokens.canConsumeAnyOf("AS", "TO"); - TableId newTableId = parseQualifiedTableName(start); - newTableName.accept(newTableId); - } else if (tokens.canConsume("ORDER", "BY")) { - consumeCommaSeparatedValueList(start); // this should not affect the order of the columns in the table - } else if (tokens.canConsume("CONVERT", "TO", "CHARACTER", "SET") - || tokens.canConsume("CONVERT", "TO", "CHARSET")) { - tokens.consume(); // charset name - if (tokens.canConsume("COLLATE")) { - tokens.consume(); // collation name - } - } else if (tokens.canConsume("CHARACTER", "SET") - || tokens.canConsume("CHARSET") - || tokens.canConsume("DEFAULT", "CHARACTER", "SET") - || tokens.canConsume("DEFAULT", "CHARSET")) { - tokens.canConsume('='); - String charsetName = tokens.consume(); // charset name - table.setDefaultCharsetName(charsetName); - if (tokens.canConsume("COLLATE")) { - tokens.canConsume('='); - tokens.consume(); // collation name (ignored) - } - } else if (tokens.canConsume("DISCARD", "TABLESPACE") || tokens.canConsume("IMPORT", "TABLESPACE")) { - // nothing - } else if (tokens.canConsume("FORCE")) { - // nothing - } else if (tokens.canConsume("WITH", "VALIDATION") || tokens.canConsume("WITHOUT", "VALIDATION")) { - // nothing - } else if (tokens.canConsume("DISCARD", "PARTITION") || tokens.canConsume("IMPORT", "PARTITION")) { - if (!tokens.canConsume("ALL")) { - tokens.consume(); // partition name - } - tokens.consume("TABLESPACE"); - } else if (tokens.canConsume("COALLESCE", "PARTITION")) { - tokens.consume(); // number - } else if (tokens.canConsume("REORGANIZE", "PARTITION")) { - parsePartitionNames(start); - tokens.consume("INTO", "("); - do { - parsePartitionDefinition(start, table); - } while (tokens.canConsume(',')); - tokens.consume(')'); - } else if (tokens.canConsume("EXCHANGE", "PARTITION")) { - tokens.consume(); // partition name - tokens.consume("WITH", "TABLE"); - parseSchemaQualifiedName(start); // table name - if (tokens.canConsumeAnyOf("WITH", "WITHOUT")) { - tokens.consume("VALIDATION"); - } - } else if (tokens.matches(TokenStream.ANY_VALUE, "PARTITION")) { - tokens.consumeAnyOf("TRUNCATE", "CHECK", "ANALYZE", "OPTIMIZE", "REBUILD", "REPAIR"); - tokens.consume("PARTITION"); - if (!tokens.canConsume("ALL")) { - parsePartitionNames(start); - } - } else if (tokens.canConsume("REMOVE", "PARTITIONING")) { - // nothing - } else if (tokens.canConsume("UPGRADE", "PARTITIONING")) { - // nothing - } - } - - protected void parseAlterUnknown(Marker start) { - consumeRemainingStatement(start); - debugSkipped(start); - } - - @Override - protected void parseDrop(Marker marker) { - tokens.consume("DROP"); - if (tokens.matches("TABLE") || tokens.matches("TEMPORARY", "TABLE")) { - parseDropTable(marker); - } else if (tokens.matches("VIEW")) { - parseDropView(marker); - } else if (tokens.matches("INDEX")) { - parseDropIndex(marker); - } else if (tokens.matchesAnyOf("DATABASE", "SCHEMA")) { - parseDropDatabase(marker); - } else { - parseDropUnknown(marker); - } - } - - protected void parseDropTable(Marker start) { - tokens.canConsume("TEMPORARY"); - tokens.consume("TABLE"); - tokens.canConsume("IF", "EXISTS"); - String statementPrefix = statement(start); - List ids = parseQualifiedTableNames(start); - boolean restrict = tokens.canConsume("RESTRICT"); - boolean cascade = tokens.canConsume("CASCADE"); - ids.forEach(tableId -> { - databaseTables.removeTable(tableId); - signalDropTable(tableId, statementPrefix + tableId + (restrict ? " RESTRICT" : cascade ? " CASCADE" : "")); - }); - debugParsed(start); - } - - protected void parseDropView(Marker start) { - if (skipViews) { - consumeRemainingStatement(start); - debugSkipped(start); - return; - } - tokens.consume("VIEW"); - tokens.canConsume("IF", "EXISTS"); - String statementPrefix = statement(start); - List ids = parseQualifiedTableNames(start); - boolean restrict = tokens.canConsume("RESTRICT"); - boolean cascade = tokens.canConsume("CASCADE"); - ids.forEach(tableId -> { - databaseTables.removeTable(tableId); - signalDropView(tableId, statementPrefix + tableId + (restrict ? " RESTRICT" : cascade ? " CASCADE" : "")); - }); - debugParsed(start); - } - - protected void parseDropIndex(Marker start) { - tokens.consume("INDEX"); - String indexName = tokens.consume(); // index name - tokens.consume("ON"); - TableId tableId = parseQualifiedTableName(start); - consumeRemainingStatement(start); - signalDropIndex(indexName, tableId, start); - debugParsed(start); - } - - protected void parseDropUnknown(Marker start) { - consumeRemainingStatement(start); - debugSkipped(start); - } - - protected void parseRename(Marker start) { - tokens.consume("RENAME"); - if (tokens.canConsume("TABLE")) { - parseRenameTable(start); - while (tokens.canConsume(',')) { - parseRenameTable(start); - } - } else if (tokens.canConsumeAnyOf("DATABASE", "SCHEMA", "USER")) { - // See https://dev.mysql.com/doc/refman/5.1/en/rename-database.html - consumeRemainingStatement(start); - } - } - - protected void parseRenameTable(Marker start) { - TableId from = parseQualifiedTableName(start); - tokens.consume("TO"); - TableId to = parseQualifiedTableName(start); - databaseTables.renameTable(from, to); - // Signal a separate statement for this table rename action, even though multiple renames might be - // performed by a single DDL statement on the token stream ... - signalAlterTable(from, to, "RENAME TABLE " + from + " TO " + to); - } - - protected void parseUse(Marker marker) { - tokens.consume("USE"); - String dbName = tokens.consume(); - setCurrentSchema(dbName); - - // Every time MySQL switches to a different database, it sets the "character_set_database" and "collation_database" - // system variables. We replicate that behavior here (or the variable we care about) so that these variables are always - // right for the current database. - String charsetForDb = charsetNameForDatabase.get(dbName); - systemVariables.setVariable(MySqlScope.GLOBAL, "character_set_database", charsetForDb); - } - - /** - * Get the name of the character set for the current database, via the "character_set_database" system property. - * - * @return the name of the character set for the current database, or null if not known ... - */ - protected String currentDatabaseCharset() { - String charsetName = systemVariables.getVariable("character_set_database"); - if (charsetName == null || "DEFAULT".equalsIgnoreCase(charsetName)) { - charsetName = systemVariables.getVariable(SERVER_CHARSET_NAME); - } - return charsetName; - } - - protected List parseColumnNameList(Marker start) { - List names = new ArrayList<>(); - tokens.consume('('); - names.add(parseColumnName()); - while (tokens.canConsume(',')) { - names.add(parseColumnName()); - } - tokens.consume(')'); - return names; - } - - protected String parseColumnName() { - boolean quoted = isNextTokenQuotedIdentifier(); - String name = tokens.consume(); - if (!quoted) { - // Unquoted names may not consist entirely of digits - if (name.matches("[0-9]+")) { - parsingFailed(tokens.previousPosition(), "Unquoted column names may not contain only digits"); - return null; - } - } - return name; - } - - protected void parsePartitionNames(Marker start) { - consumeCommaSeparatedValueList(start); - } - - protected void consumeCommaSeparatedValueList(Marker start) { - tokens.consume(); - while (tokens.canConsume(',')) { - tokens.consume(); - } - } - - protected void consumeValueList(Marker start) { - tokens.consume('('); - consumeCommaSeparatedValueList(start); - tokens.consume(')'); - } - - /** - * Consume an expression surrounded by parentheses. - * - * @param start the start of the statement - */ - protected void consumeExpression(Marker start) { - tokens.consume("("); - tokens.consumeThrough(')', '('); - } - - /** - * Consume the entire {@code BEGIN...END} block that appears next in the token stream. This handles nested - * BEGIN...END blocks, - * labeled statements, - * and control blocks. - * - * @param start the marker at which the statement was begun - */ - @Override - protected void consumeBeginStatement(Marker start) { - tokens.consume("BEGIN"); - // Look for a label that preceded the BEGIN ... - LinkedList labels = new LinkedList<>(); - labels.addFirst(getPrecedingBlockLabel()); - - int expectedPlainEnds = 0; - - // Now look for the "END", ignoring intermediate control blocks that also use "END" ... - while (tokens.hasNext()) { - if (tokens.matchesWord("BEGIN")) { - consumeBeginStatement(tokens.mark()); - } - if (tokens.canConsumeWords("IF", "EXISTS")) { - // Ignore any IF EXISTS phrases ... - } else if (tokens.canConsumeWords("CASE", "WHEN")) { - // This block can end with END without suffix - expectedPlainEnds++; - } else if (tokens.matchesAnyWordOf("REPEAT", "LOOP", "WHILE")) { - // This block can contain label - String label = getPrecedingBlockLabel(); - tokens.consume(); - labels.addFirst(label); // may be null - } else if (tokens.canConsumeWord("END")) { - if (tokens.matchesAnyOf("REPEAT", "LOOP", "WHILE")) { - // Read block label if set - tokens.consume(); - String label = labels.remove(); - if (label != null) { - tokens.canConsume(label); - } - } else if (tokens.matchesAnyWordOf("IF", "CASE")) { - tokens.consume(); - } else if (expectedPlainEnds > 0) { - // There was a statement that will be ended with plain END - expectedPlainEnds--; - } else { - break; - } - } else { - tokens.consume(); - } - } - - // We've consumed the corresponding END of the BEGIN, but consume the label if one was used ... - assert labels.size() == 1; - String label = labels.remove(); - if (label != null) { - tokens.canConsume(label); - } - } - - /** - * Get the label that appears with a colon character just prior to the current position. Some MySQL DDL statements can be - * labeled, and this label can then appear at the - * end of a block. - * - * @return the label for the block starting at the current position; null if there is no such label - * @throws NoSuchElementException if there is no previous token - */ - protected String getPrecedingBlockLabel() { - if (tokens.previousToken(1).matches(':')) { - // A label preceded the beginning of the control block ... - return tokens.previousToken(2).value(); - } - return null; - } - - /** - * Try calling the supplied functions in sequence, stopping as soon as one of them succeeds. - * - * @param functions the functions - */ - @SuppressWarnings("unchecked") - protected void sequentially(Consumer... functions) { - if (functions == null || functions.length == 0) { - return; - } - Collection errors = new ArrayList<>(); - Marker marker = tokens.mark(); - for (Consumer function : functions) { - try { - function.accept(marker); - return; - } catch (ParsingException e) { - errors.add(e); - tokens.rewind(marker); - } - } - parsingFailed(marker.position(), errors, "One or more errors trying to parse statement"); - } - - /** - * Parse and consume the {@code DEFAULT} clause. Currently, this method does not capture the default in any way, - * since that will likely require parsing the default clause into a useful value (e.g., dealing with hexadecimals, - * bit-set literals, date-time literals, etc.). - * - * @param start the marker at the beginning of the clause - */ - protected void parseDefaultClause(Marker start) { - tokens.consume("DEFAULT"); - if (isNextTokenQuotedIdentifier()) { - // We know that it is a quoted literal ... - parseLiteral(start); - } else { - if (tokens.matchesAnyOf("CURRENT_TIMESTAMP", "NOW")) { - parseCurrentTimestampOrNow(); - parseOnUpdateOrDelete(tokens.mark()); - } else if (tokens.canConsume("NULL")) { - // do nothing ... - } else { - parseLiteral(start); - } - } - } - - /** - * Parse and consume the {@code DEFAULT} clause. Add default value in column; - * - * @param start the marker at the beginning of the clause - * @param column column editor - */ - protected void parseDefaultClause(Marker start, ColumnEditor column) { - tokens.consume("DEFAULT"); - if (isNextTokenQuotedIdentifier()) { - // We know that it is a quoted literal ... - Object defaultValue = parseLiteral(start); - column.defaultValue(defaultValue); - } else { - if (tokens.matchesAnyOf("CURRENT_TIMESTAMP", "NOW")) { - parseCurrentTimestampOrNow(); - parseOnUpdateOrDelete(tokens.mark()); - //CURRENT_TIMESTAMP and NOW default value will be replaced with epoch timestamp - column.defaultValue("1970-01-01 00:00:00"); - } else if (tokens.canConsume("NULL")) { - // If the default value of column is Null, we will set default value null; - column.defaultValue(null); - } else { - Object defaultValue = parseLiteral(start); - column.defaultValue(defaultValue); - } - } - } - - protected void parseOnUpdateOrDelete(Marker start) { - if (tokens.canConsume("ON") && tokens.canConsumeAnyOf("UPDATE", "DELETE")) { - parseCurrentTimestampOrNow(); - } - } - - private void parseCurrentTimestampOrNow() { - tokens.consumeAnyOf("CURRENT_TIMESTAMP", "NOW"); - if (tokens.canConsume('(')) { - if (!tokens.canConsume(')')) { - tokens.consumeInteger(); - tokens.consume(')'); - } - } - } -} diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java index f1b25a51e..2c6bc9494 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java @@ -19,6 +19,7 @@ import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnectorConfig.BigIntUnsignedHandlingMode; import io.debezium.connector.mysql.MySqlSystemVariables.MySqlScope; +import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; import io.debezium.document.Document; import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode; import io.debezium.jdbc.JdbcValueConverters.DecimalMode; @@ -111,10 +112,7 @@ public MySqlSchema(MySqlConnectorConfig configuration, this.skipUnparseableDDL = dbHistoryConfig.getBoolean(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS); this.storeOnlyMonitoredTablesDdl = dbHistoryConfig.getBoolean(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL); - this.ddlParser = configuration.getDdlParsingMode().getNewParserInstance( - getValueConverters(configuration), - getTableFilter() - ); + this.ddlParser = new MySqlAntlrDdlParser(getValueConverters(configuration), getTableFilter()); this.ddlChanges = this.ddlParser.getDdlChanges(); // Create and configure the database history ... diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/UniqueDatabase.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/UniqueDatabase.java index 3752a23cb..0dc070a09 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/UniqueDatabase.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/UniqueDatabase.java @@ -24,7 +24,6 @@ import io.debezium.config.Configuration; import io.debezium.relational.history.FileDatabaseHistory; -import io.debezium.util.Testing; /** * Create and populate a unique instance of a MySQL database for each run of JUnit test. A user of class @@ -179,12 +178,6 @@ public Configuration.Builder defaultJdbcConfigBuilder() { * @return Configuration builder initialized with JDBC connection parameters and most frequently used parameters */ public Configuration.Builder defaultConfig() { - String ddlParserMode = System.getProperty( - MySqlConnectorConfig.DDL_PARSER_MODE.name(), - MySqlConnectorConfig.DDL_PARSER_MODE.defaultValueAsString() - ); - Testing.print("DDL parser mode: " + ddlParserMode); - final Configuration.Builder builder = defaultJdbcConfigBuilder() .with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.DISABLED) .with(MySqlConnectorConfig.SERVER_ID, 18765) @@ -192,8 +185,7 @@ public Configuration.Builder defaultConfig() { .with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10) .with(MySqlConnectorConfig.DATABASE_WHITELIST, getDatabaseName()) .with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class) - .with(MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER, 10_000) - .with(MySqlConnectorConfig.DDL_PARSER_MODE, ddlParserMode); + .with(MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER, 10_000); if (dbHistoryPath != null) { builder.with(FileDatabaseHistory.FILE_PATH, dbHistoryPath); diff --git a/debezium-microbenchmark/src/main/java/io/debezium/performance/connector/mysql/parser/MySqlDdlParserPerf.java b/debezium-microbenchmark/src/main/java/io/debezium/performance/connector/mysql/parser/MySqlDdlParserPerf.java index 705a42e70..82d98b89e 100644 --- a/debezium-microbenchmark/src/main/java/io/debezium/performance/connector/mysql/parser/MySqlDdlParserPerf.java +++ b/debezium-microbenchmark/src/main/java/io/debezium/performance/connector/mysql/parser/MySqlDdlParserPerf.java @@ -20,7 +20,6 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; -import io.debezium.connector.mysql.MySqlDdlParser; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; import io.debezium.relational.Tables; import io.debezium.relational.ddl.AbstractDdlParser; @@ -37,7 +36,6 @@ public class MySqlDdlParserPerf { @State(Scope.Thread) public static class ParserState { - public AbstractDdlParser legacyParser; public AbstractDdlParser antlrParser; public Tables tables; public String ddl; @@ -48,7 +46,6 @@ public static class ParserState { @Setup(Level.Trial) public void doSetup() { - legacyParser = new MySqlDdlParser(); antlrParser = new MySqlAntlrDdlParser(); tables = new Tables(); ddl = testStatement(); @@ -64,17 +61,6 @@ private String testStatement() { } } - - @Benchmark - @BenchmarkMode(Mode.AverageTime) - @OutputTimeUnit(TimeUnit.MICROSECONDS) - @Fork(value = 1) - @Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS) - @Measurement(iterations = 3, time = 2, timeUnit = TimeUnit.SECONDS) - public void legacy(ParserState state) { - state.legacyParser.parse(state.ddl, state.tables); - } - @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MICROSECONDS)