diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/listener/ColumnDefinitionParserListener.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/listener/ColumnDefinitionParserListener.java index 36458ab18..aff1e2188 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/listener/ColumnDefinitionParserListener.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/listener/ColumnDefinitionParserListener.java @@ -13,6 +13,8 @@ import java.util.stream.Collectors; import org.antlr.v4.runtime.tree.ParseTreeListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.debezium.antlr.AntlrDdlParser; import io.debezium.antlr.DataTypeResolver; @@ -32,6 +34,8 @@ */ public class ColumnDefinitionParserListener extends MySqlParserBaseListener { + private static final Logger LOGGER = LoggerFactory.getLogger(ColumnDefinitionParserListener.class); + private static final Pattern DOT = Pattern.compile("\\."); private final MySqlAntlrDdlParser parser; private final DataTypeResolver dataTypeResolver; @@ -147,7 +151,7 @@ private void resolveColumnDataType(MySqlParser.DataTypeContext dataTypeContext) MySqlParser.StringDataTypeContext stringDataTypeContext = (MySqlParser.StringDataTypeContext) dataTypeContext; if (stringDataTypeContext.lengthOneDimension() != null) { - Integer length = Integer.valueOf(stringDataTypeContext.lengthOneDimension().decimalLiteral().getText()); + Integer length = parseLength(stringDataTypeContext.lengthOneDimension().decimalLiteral().getText()); columnEditor.length(length); } @@ -163,7 +167,7 @@ else if (dataTypeContext instanceof MySqlParser.NationalStringDataTypeContext) { MySqlParser.NationalStringDataTypeContext nationalStringDataTypeContext = (MySqlParser.NationalStringDataTypeContext) dataTypeContext; if (nationalStringDataTypeContext.lengthOneDimension() != null) { - Integer length = 
Integer.valueOf(nationalStringDataTypeContext.lengthOneDimension().decimalLiteral().getText()); + Integer length = parseLength(nationalStringDataTypeContext.lengthOneDimension().decimalLiteral().getText()); columnEditor.length(length); } } @@ -171,7 +175,7 @@ else if (dataTypeContext instanceof MySqlParser.NationalVaryingStringDataTypeCon MySqlParser.NationalVaryingStringDataTypeContext nationalVaryingStringDataTypeContext = (MySqlParser.NationalVaryingStringDataTypeContext) dataTypeContext; if (nationalVaryingStringDataTypeContext.lengthOneDimension() != null) { - Integer length = Integer.valueOf(nationalVaryingStringDataTypeContext.lengthOneDimension().decimalLiteral().getText()); + Integer length = parseLength(nationalVaryingStringDataTypeContext.lengthOneDimension().decimalLiteral().getText()); columnEditor.length(length); } } @@ -181,12 +185,12 @@ else if (dataTypeContext instanceof MySqlParser.DimensionDataTypeContext) { Integer length = null; Integer scale = null; if (dimensionDataTypeContext.lengthOneDimension() != null) { - length = Integer.valueOf(dimensionDataTypeContext.lengthOneDimension().decimalLiteral().getText()); + length = parseLength(dimensionDataTypeContext.lengthOneDimension().decimalLiteral().getText()); } if (dimensionDataTypeContext.lengthTwoDimension() != null) { List decimalLiterals = dimensionDataTypeContext.lengthTwoDimension().decimalLiteral(); - length = Integer.valueOf(decimalLiterals.get(0).getText()); + length = parseLength(decimalLiterals.get(0).getText()); scale = Integer.valueOf(decimalLiterals.get(1).getText()); } @@ -199,11 +203,11 @@ else if (dataTypeContext instanceof MySqlParser.DimensionDataTypeContext) { length = 10; } else { - length = Integer.valueOf(digits[0]); + length = parseLength(digits[0]); } } else { - length = Integer.valueOf(decimalLiterals.get(0).getText()); + length = parseLength(decimalLiterals.get(0).getText()); } if (decimalLiterals.size() > 1) { @@ -273,6 +277,16 @@ else if (dataTypeName.equals("SERIAL")) 
{ } } + /** Parses {@code lengthStr} as a long and returns it as an int; when the value exceeds {@link Integer#MAX_VALUE} a warning is logged and {@link Integer#MAX_VALUE} is returned instead. A primitive long is used so no boxed Long is created. Throws {@link NumberFormatException} if the text is not a valid long. */ private Integer parseLength(String lengthStr) { + long length = Long.parseLong(lengthStr); + if (length > Integer.MAX_VALUE) { + LOGGER.warn("The length '{}' of the column `{}`.`{}` is too large to be supported, truncating it to '{}'", + length, tableEditor.tableId(), columnEditor.name(), Integer.MAX_VALUE); + length = Integer.MAX_VALUE; + } + return (int) length; + } + private void serialColumn() { if (optionalColumn.get() == null) { optionalColumn.set(Boolean.FALSE); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlAntlrDdlParserTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlAntlrDdlParserTest.java index c236fae5a..d74d88e8b 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlAntlrDdlParserTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlAntlrDdlParserTest.java @@ -81,6 +81,24 @@ public void beforeEach() { SchemaNameAdjuster.create(), new CustomConverterRegistry(null), SchemaBuilder.struct().build(), false, false); } + /** Checks that a BLOB column declared with length 4294967295 parses without walker exceptions and reports its length as Integer.MAX_VALUE. */ @Test + @FixFor("DBZ-4583") + public void shouldProcessLargeColumn() { + String ddl = "create table if not exists tbl_large_col(\n" + + "`id` bigint(20) NOT NULL AUTO_INCREMENT,\n" + + "c1 blob(4294967295) NOT NULL,\n" + + "PRIMARY KEY (`id`)\n" + + ")"; + parser.parse(ddl, tables); + assertThat(((MySqlAntlrDdlParser) parser).getParsingExceptionsFromWalker().size()).isEqualTo(0); + assertThat(tables.size()).isEqualTo(1); + + Table table = tables.forTable(null, null, "tbl_large_col"); + + assertThat(table.columnWithName("c1").typeName()).isEqualTo("BLOB"); + assertThat(table.columnWithName("c1").length()).isEqualTo(Integer.MAX_VALUE); + } + @Test @FixFor("DBZ-4497") public void shouldProcessMultipleSignedUnsignedForTable() { diff --git a/documentation/modules/ROOT/pages/connectors/mysql.adoc b/documentation/modules/ROOT/pages/connectors/mysql.adoc index a7bc86c34..d6a69af6c 100644 --- 
a/documentation/modules/ROOT/pages/connectors/mysql.adoc +++ b/documentation/modules/ROOT/pages/connectors/mysql.adoc @@ -1443,11 +1443,13 @@ a|_n/a_ |`BLOB` |`BYTES` or `STRING` a|_n/a_ + -Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-mysql-connector}#mysql-property-binary-handling-mode[`binary.handling.mode`] connector configuration property setting. +Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-mysql-connector}#mysql-property-binary-handling-mode[`binary.handling.mode`] connector configuration property setting. + +Only values with a size of up to 2GB are supported. It is recommended to externalize large column values, using the claim check pattern. |`TEXT` |`STRING` -a|_n/a_ +a|_n/a_ + +Only values with a size of up to 2GB are supported. It is recommended to externalize large column values, using the claim check pattern. |`MEDIUMBLOB` |`BYTES` or `STRING` @@ -1461,11 +1463,13 @@ a|_n/a_ |`LONGBLOB` |`BYTES` or `STRING` a|_n/a_ + -Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-mysql-connector}#mysql-property-binary-handling-mode[`binary.handling.mode`] connector configuration property setting. +Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-mysql-connector}#mysql-property-binary-handling-mode[`binary.handling.mode`] connector configuration property setting. + +Only values with a size of up to 2GB are supported. It is recommended to externalize large column values, using the claim check pattern. |`LONGTEXT` |`STRING` -a|_n/a_ +a|_n/a_ + +Only values with a size of up to 2GB are supported. It is recommended to externalize large column values, using the claim check pattern. |`JSON` |`STRING`