DBZ-252 Misc. improvements;
* Dedicated getter for DDL mode * Using Objects#equal() * Typo fixes
This commit is contained in:
parent
6a6a8c9f99
commit
54ca30624d
@ -1047,11 +1047,16 @@ public static final Field MASK_COLUMN(int length) {
|
||||
DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL,
|
||||
DatabaseHistory.DDL_FILTER);
|
||||
|
||||
private final Configuration config;
|
||||
|
||||
private final SnapshotLockingMode snapshotLockingMode;
|
||||
private final DdlParsingMode ddlParsingMode;
|
||||
|
||||
public MySqlConnectorConfig(Configuration config) {
|
||||
super(config);
|
||||
|
||||
this.config = config;
|
||||
|
||||
// If deprecated snapshot.minimal.locking property is explicitly configured
|
||||
if (config.hasKey(MySqlConnectorConfig.SNAPSHOT_MINIMAL_LOCKING.name())) {
|
||||
// Coerce it into its replacement appropriate snapshot.locking.mode value
|
||||
@ -1064,12 +1069,27 @@ public MySqlConnectorConfig(Configuration config) {
|
||||
// Otherwise use configured snapshot.locking.mode configuration.
|
||||
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
|
||||
}
|
||||
|
||||
String ddlParsingModeStr = config.getString(MySqlConnectorConfig.DDL_PARSER_MODE);
|
||||
this.ddlParsingMode = DdlParsingMode.parse(ddlParsingModeStr, MySqlConnectorConfig.DDL_PARSER_MODE.defaultValueAsString());
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
/**
|
||||
* @deprecated Typed accessors should be used instead
|
||||
*/
|
||||
public Configuration getConfig() {
|
||||
return config;
|
||||
}
|
||||
|
||||
public SnapshotLockingMode getSnapshotLockingMode() {
|
||||
return this.snapshotLockingMode;
|
||||
}
|
||||
|
||||
public DdlParsingMode getDdlParsingMode() {
|
||||
return ddlParsingMode;
|
||||
}
|
||||
|
||||
protected static ConfigDef configDef() {
|
||||
ConfigDef config = new ConfigDef();
|
||||
Field.group(config, "MySQL", HOSTNAME, PORT, USER, PASSWORD, ON_CONNECT_STATEMENTS, SERVER_NAME, SERVER_ID,
|
||||
|
@ -20,7 +20,6 @@
|
||||
import io.debezium.annotation.NotThreadSafe;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.mysql.MySqlConnectorConfig.BigIntUnsignedHandlingMode;
|
||||
import io.debezium.connector.mysql.MySqlConnectorConfig.DdlParsingMode;
|
||||
import io.debezium.connector.mysql.MySqlConnectorConfig.DecimalHandlingMode;
|
||||
import io.debezium.connector.mysql.MySqlSystemVariables.MySqlScope;
|
||||
import io.debezium.document.Document;
|
||||
@ -92,7 +91,9 @@ public class MySqlSchema {
|
||||
* may be null if not needed
|
||||
* @param tableIdCaseInsensitive true if table lookup ignores letter case
|
||||
*/
|
||||
public MySqlSchema(Configuration config, String serverName, Predicate<String> gtidFilter, boolean tableIdCaseInsensitive, TopicSelector topicSelector) {
|
||||
public MySqlSchema(MySqlConnectorConfig configuration, String serverName, Predicate<String> gtidFilter, boolean tableIdCaseInsensitive, TopicSelector topicSelector) {
|
||||
Configuration config = configuration.getConfig();
|
||||
|
||||
this.filters = new Filters(config);
|
||||
this.tables = new Tables(tableIdCaseInsensitive);
|
||||
this.topicSelector = topicSelector;
|
||||
@ -110,10 +111,7 @@ public MySqlSchema(Configuration config, String serverName, Predicate<String> gt
|
||||
MySqlValueConverters valueConverters = new MySqlValueConverters(decimalMode, timePrecisionMode, bigIntUnsignedMode);
|
||||
this.schemaBuilder = new TableSchemaBuilder(valueConverters, schemaNameAdjuster, SourceInfo.SCHEMA);
|
||||
|
||||
String ddlParsingModeStr = config.getString(MySqlConnectorConfig.DDL_PARSER_MODE);
|
||||
DdlParsingMode parsingMode = DdlParsingMode.parse(ddlParsingModeStr, MySqlConnectorConfig.DDL_PARSER_MODE.defaultValueAsString());
|
||||
|
||||
this.ddlParser = parsingMode.getNewParserInstance(valueConverters);
|
||||
this.ddlParser = configuration.getDdlParsingMode().getNewParserInstance(valueConverters);
|
||||
this.ddlChanges = this.ddlParser.getDdlChanges();
|
||||
|
||||
// Set up the server name and schema prefix ...
|
||||
|
@ -81,7 +81,7 @@ public MySqlTaskContext(Configuration config, Boolean tableIdCaseInsensitive) {
|
||||
}
|
||||
|
||||
// Set up the MySQL schema ...
|
||||
this.dbSchema = new MySqlSchema(config, serverName(), this.gtidSourceFilter, this.tableIdCaseInsensitive, topicSelector);
|
||||
this.dbSchema = new MySqlSchema(connectorConfig, serverName(), this.gtidSourceFilter, this.tableIdCaseInsensitive, topicSelector);
|
||||
|
||||
// Set up the record processor ...
|
||||
this.recordProcessor = new RecordMakers(dbSchema, source, topicSelector, config.getBoolean(CommonConnectorConfig.TOMBSTONES_ON_DELETE));
|
||||
|
@ -87,7 +87,7 @@ public MySqlSchema createSchemas() {
|
||||
Configuration config = configBuilder.build();
|
||||
String serverName = config.getString(MySqlConnectorConfig.SERVER_NAME);
|
||||
|
||||
return new MySqlSchema(config, serverName, null, false,
|
||||
return new MySqlSchema(new MySqlConnectorConfig(config), serverName, null, false,
|
||||
TopicSelector.defaultSelector(serverName, "__debezium-heartbeat"));
|
||||
}
|
||||
}
|
||||
|
@ -152,7 +152,7 @@ public void shouldParseCreateTableStatementWithCollate() {
|
||||
Column column = table.columnWithName("v1");
|
||||
assertThat(column.typeUsesCharset()).isTrue();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-646")
|
||||
public void shouldParseTokuDBTable() {
|
||||
@ -705,7 +705,7 @@ public void shouldParseSetOfSessionVariable() {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldParseButNotSetUserVariableWithHyphenDelimiter() {
|
||||
public void shouldParseButNotSetUserVariableWithUnderscoreDelimiter() {
|
||||
String ddl = "SET @a_b_c_d:=1";
|
||||
parser.parse(ddl, tables);
|
||||
assertLocalVariable("a_b_c_d", null);
|
||||
@ -714,7 +714,7 @@ public void shouldParseButNotSetUserVariableWithHyphenDelimiter() {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldParseVariableWithHyphenDelimiter() {
|
||||
public void shouldParseVariableWithUnderscoreDelimiter() {
|
||||
String ddl = "SET a_b_c_d=1";
|
||||
parser.parse(ddl, tables);
|
||||
assertSessionVariable("a_b_c_d", "1");
|
||||
|
@ -8,6 +8,7 @@
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
@ -177,10 +178,10 @@ public void removeTablesForDatabase(String catalogName, String schemaName) {
|
||||
lock.write(() -> {
|
||||
tablesByTableId.entrySet().removeIf(tableIdTableEntry -> {
|
||||
TableId tableId = tableIdTableEntry.getKey();
|
||||
boolean equalSchema = schemaName == null && tableId.schema() == null
|
||||
|| schemaName != null && schemaName.equals(tableId.schema());
|
||||
boolean equalCatalog = catalogName == null && tableId.catalog() == null
|
||||
|| catalogName != null && catalogName.equals(tableId.schema());
|
||||
|
||||
boolean equalCatalog = Objects.equals(catalogName, tableId.catalog());
|
||||
boolean equalSchema = Objects.equals(schemaName, tableId.schema());
|
||||
|
||||
return equalSchema && equalCatalog;
|
||||
});
|
||||
});
|
||||
|
@ -6,19 +6,20 @@
|
||||
|
||||
package io.debezium.relational.ddl;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Types;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.relational.ColumnEditor;
|
||||
import io.debezium.relational.SystemVariables;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.text.MultipleParsingExceptions;
|
||||
import io.debezium.text.ParsingException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.Types;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* @author Roman Kuchár <kucharrom@gmail.com>.
|
||||
@ -77,6 +78,7 @@ public DdlChanges getDdlChanges() {
|
||||
return ddlChanges;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SystemVariables systemVariables() {
|
||||
return systemVariables;
|
||||
}
|
||||
@ -410,7 +412,9 @@ protected void debugSkipped(String statement) {
|
||||
}
|
||||
}
|
||||
|
||||
protected void commnetParsed(String comment) {
|
||||
logger.trace("COMMENT: {}", comment);
|
||||
protected void commentParsed(String comment) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("COMMENT: {}", comment);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -256,7 +256,7 @@ protected void parseNextStatement(Marker marker) {
|
||||
*/
|
||||
protected void parseComment(Marker marker) {
|
||||
String comment = tokens.consume();
|
||||
commnetParsed(comment);
|
||||
commentParsed(comment);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -6,21 +6,22 @@
|
||||
|
||||
package io.debezium.antlr;
|
||||
|
||||
import io.debezium.relational.ddl.DataType;
|
||||
import io.debezium.relational.ddl.DataTypeBuilder;
|
||||
import io.debezium.text.ParsingException;
|
||||
import org.antlr.v4.runtime.ParserRuleContext;
|
||||
import org.antlr.v4.runtime.tree.TerminalNode;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.antlr.v4.runtime.ParserRuleContext;
|
||||
import org.antlr.v4.runtime.tree.TerminalNode;
|
||||
|
||||
import io.debezium.relational.ddl.DataType;
|
||||
import io.debezium.relational.ddl.DataTypeBuilder;
|
||||
import io.debezium.text.ParsingException;
|
||||
|
||||
/**
|
||||
* A resolver for DBMS data types.
|
||||
*
|
||||
* It's main purpose is to match corresponding JDBC data type, resolve a name of parsed data type,
|
||||
* Its main purpose is to match corresponding JDBC data type, resolve a name of parsed data type,
|
||||
* and optionally predefine default values for length and scale for DBMS data type.
|
||||
*
|
||||
* @author Roman Kuchár <kucharrom@gmail.com>.
|
||||
@ -170,7 +171,7 @@ public DataTypeEntry setSuffixTokens(Integer... suffixTokens) {
|
||||
* @param defaultLength default length for data type.
|
||||
* @return instance of this class, so the calls may be chained.
|
||||
*/
|
||||
public DataTypeEntry setDefualtLengthDimmension(int defaultLength) {
|
||||
public DataTypeEntry setDefaultLengthDimension(int defaultLength) {
|
||||
this.defaultLength = defaultLength;
|
||||
return this;
|
||||
}
|
||||
@ -187,6 +188,5 @@ public DataTypeEntry setDefaultLengthScaleDimension(int defaultLength, int defau
|
||||
this.defaultScale = defaultScale;
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user