diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/listener/ColumnDefinitionParserListener.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/listener/ColumnDefinitionParserListener.java index 0b03780d2..6df05a1a1 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/listener/ColumnDefinitionParserListener.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/listener/ColumnDefinitionParserListener.java @@ -10,7 +10,6 @@ import java.sql.Types; import java.util.List; -import java.util.Optional; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; @@ -39,7 +38,7 @@ public class ColumnDefinitionParserListener extends MySqlParserBaseListener { private final TableEditor tableEditor; private ColumnEditor columnEditor; private boolean uniqueColumn; - private Optional optionalColumn; + private Boolean optionalColumn; private final MySqlValueConverters converters; private final MySqlDefaultValuePreConverter defaultValuePreConverter = new MySqlDefaultValuePreConverter(); @@ -66,14 +65,16 @@ public Column getColumn() { @Override public void enterColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) { uniqueColumn = false; - optionalColumn = Optional.empty(); + optionalColumn = null; resolveColumnDataType(ctx.dataType()); super.enterColumnDefinition(ctx); } @Override public void exitColumnDefinition(MySqlParser.ColumnDefinitionContext ctx) { - optionalColumn.ifPresent(optional -> columnEditor.optional(optional.booleanValue())); + if (optionalColumn != null) { + columnEditor.optional(optionalColumn.booleanValue()); + } if (uniqueColumn && !tableEditor.hasPrimaryKey()) { // take the first unique constrain if no primary key is set tableEditor.addColumn(columnEditor.create()); @@ -92,7 +93,7 @@ public void enterUniqueKeyColumnConstraint(MySqlParser.UniqueKeyColumnConstraint public void enterPrimaryKeyColumnConstraint(MySqlParser.PrimaryKeyColumnConstraintContext ctx) { // this rule will be parsed only if no primary key is set in a table // otherwise the statement can't be executed due to multiple primary key error - optionalColumn = Optional.of(Boolean.FALSE); + optionalColumn = Boolean.FALSE; tableEditor.addColumn(columnEditor.create()); tableEditor.setPrimaryKeyNames(columnEditor.name()); super.enterPrimaryKeyColumnConstraint(ctx); @@ -100,7 +101,7 @@ public void enterPrimaryKeyColumnConstraint(MySqlParser.PrimaryKeyColumnConstrai @Override public void enterNullNotnull(MySqlParser.NullNotnullContext ctx) { - optionalColumn = Optional.of(Boolean.valueOf(ctx.NOT() == null)); + optionalColumn = Boolean.valueOf(ctx.NOT() == null); super.enterNullNotnull(ctx); } @@ -266,8 +267,8 @@ else if (dataTypeName.equals("SERIAL")) { } private void serialColumn() { - if (!optionalColumn.isPresent()) { - optionalColumn = Optional.of(Boolean.FALSE); + if (optionalColumn == null) { + optionalColumn = Boolean.FALSE; } uniqueColumn = true; columnEditor.autoIncremented(true); @@ -275,7 +276,9 @@ private void serialColumn() { } private void convertDefaultValueToSchemaType(ColumnEditor columnEditor) { - optionalColumn.ifPresent(optional -> columnEditor.optional(optional.booleanValue())); + if (optionalColumn != null) { + columnEditor.optional(optionalColumn.booleanValue()); + } final Column column = columnEditor.create(); // if converters is not null and the default value is not null, we need to convert default value if (converters != null && columnEditor.defaultValue() != null) {