diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java index b41e22521..ff067acce 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDdlParser.java @@ -98,7 +98,14 @@ protected void initializeDataTypes(DataTypeParser dataTypes) { dataTypes.register(Types.BLOB, "VARCHAR(L) BINARY"); dataTypes.register(Types.BLOB, "BINARY[(L)]"); dataTypes.register(Types.VARCHAR, "VARCHAR(L)"); + dataTypes.register(Types.NVARCHAR, "NVARCHAR(L)"); + dataTypes.register(Types.NVARCHAR, "NATIONAL VARCHAR(L)"); + dataTypes.register(Types.NVARCHAR, "NCHAR VARCHAR(L)"); + dataTypes.register(Types.NVARCHAR, "NATIONAL CHARACTER VARYING(L)"); + dataTypes.register(Types.NVARCHAR, "NATIONAL CHAR VARYING(L)"); dataTypes.register(Types.CHAR, "CHAR[(L)]"); + dataTypes.register(Types.NCHAR, "NCHAR[(L)]"); + dataTypes.register(Types.NCHAR, "NATIONAL CHARACTER(L)"); dataTypes.register(Types.VARBINARY, "VARBINARY(L)"); dataTypes.register(Types.BLOB, "TINYBLOB"); dataTypes.register(Types.BLOB, "BLOB"); @@ -705,6 +712,11 @@ protected void parseColumnDefinition(Marker start, String columnName, TokenStrea if (dataType.scale() > -1) column.scale(dataType.scale()); } + if(Types.NCHAR == dataType.jdbcType() || Types.NVARCHAR == dataType.jdbcType()) { + // NCHAR and NVARCHAR columns always uses utf8 as charset + column.charsetName("utf8"); + } + if (tokens.canConsume("CHARSET") || tokens.canConsume("CHARACTER", "SET")) { String charsetName = tokens.consume(); if (!"DEFAULT".equalsIgnoreCase(charsetName)) { diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java index 9b5986e05..03b4ab80e 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDdlParserTest.java @@ -750,6 +750,25 @@ public void shouldParseCreateTableWithBitDefault() { assertColumn(t, "c2", "BIT", Types.BIT, 2, -1, false, false, false); } + @Test + public void shouldParseStatementForDbz142() { + parser.parse(readFile("ddl/mysql-dbz-142.ddl"), tables); + Testing.print(tables); + assertThat(tables.size()).isEqualTo(2); + assertThat(listener.total()).isEqualTo(2); + + Table t = tables.forTable(new TableId(null, null, "nvarchars")); + assertColumn(t, "c1", "NVARCHAR", Types.NVARCHAR, 255, "utf8", true); + assertColumn(t, "c2", "NATIONAL VARCHAR", Types.NVARCHAR, 255, "utf8", true); + assertColumn(t, "c3", "NCHAR VARCHAR", Types.NVARCHAR, 255, "utf8", true); + assertColumn(t, "c4", "NATIONAL CHARACTER VARYING", Types.NVARCHAR, 255, "utf8", true); + assertColumn(t, "c5", "NATIONAL CHAR VARYING", Types.NVARCHAR, 255, "utf8", true); + + Table t2 = tables.forTable(new TableId(null, null, "nchars")); + assertColumn(t2, "c1", "NATIONAL CHARACTER", Types.NCHAR, 10, "utf8", true); + assertColumn(t2, "c2", "NCHAR", Types.NCHAR, 10, "utf8", true); + } + protected void assertParseEnumAndSetOptions(String typeExpression, String optionString) { List options = MySqlDdlParser.parseSetAndEnumOptions(typeExpression); String commaSeperatedOptions = Strings.join(",", options); diff --git a/debezium-connector-mysql/src/test/resources/ddl/mysql-dbz-142.ddl b/debezium-connector-mysql/src/test/resources/ddl/mysql-dbz-142.ddl new file mode 100644 index 000000000..b40bdebed --- /dev/null +++ b/debezium-connector-mysql/src/test/resources/ddl/mysql-dbz-142.ddl @@ -0,0 +1,2 @@ +CREATE TABLE nvarchars ( c1 NVARCHAR(255), c2 NATIONAL VARCHAR(255), c3 NCHAR VARCHAR(255), c4 NATIONAL CHARACTER VARYING(255), c5 NATIONAL CHAR VARYING(255) ); +CREATE TABLE nchars ( c1 NATIONAL CHARACTER(10), c2 NCHAR(10) );