From 1c5f14f7fdef1909bc1140f8112eaa716d53a47f Mon Sep 17 00:00:00 2001
From: Chris Cranford
Date: Mon, 20 Mar 2023 11:30:39 -0400
Subject: [PATCH] DBZ-6225 DBZ-6226 DBZ-6231 Introduce CustomConverter for JDBC sink consistency

In DBZ-6225, it was identified that the schema type associated with
TINYINT(1) and BOOLEAN data types wasn't consistent. This converter
implementation will always emit BOOLEAN data types as INT16, similar to
TINYINT(1).

In DBZ-6226, it was identified that the schema type associated with REAL
data types wasn't consistent: it was emitted as FLOAT64 during the
snapshot phase but as FLOAT32 during the streaming phase. This converter
implementation will always emit such types as FLOAT64 for consistency.

In DBZ-6231, during streaming the DDL parser gets the column type as the
literal type supplied by the user's DDL, i.e. NCHAR or NVARCHAR, but when
the DDL is sourced from SHOW CREATE TABLE, MySQL returns these columns as
CHAR/VARCHAR types with a character set. By consistently passing the
character_set as a schema parameter, a sink can deduce nationalized vs.
non-nationalized character columns.
---
 .../spi/converter/RelationalColumn.java       |  10 +-
 .../ColumnDefinitionParserListener.java       |   5 +
 .../JdbcSinkDataTypesConverter.java           | 194 ++++++++++++
 .../MySqlJdbcSinkDataTypeConverterIT.java     | 295 ++++++++++++++++++
 .../src/test/resources/ddl/boolean_test.sql   |   7 +
 .../ddl/nationalized_character_test.sql       |   8 +
 .../src/test/resources/ddl/real_test.sql      |   6 +
 .../relational/CustomConverterRegistry.java   |   5 +
 8 files changed, 529 insertions(+), 1 deletion(-)
 create mode 100644 debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/converters/JdbcSinkDataTypesConverter.java
 create mode 100644 debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlJdbcSinkDataTypeConverterIT.java
 create mode 100644 debezium-connector-mysql/src/test/resources/ddl/boolean_test.sql
 create mode 100644 debezium-connector-mysql/src/test/resources/ddl/nationalized_character_test.sql
 create mode 100644 debezium-connector-mysql/src/test/resources/ddl/real_test.sql

diff --git a/debezium-api/src/main/java/io/debezium/spi/converter/RelationalColumn.java b/debezium-api/src/main/java/io/debezium/spi/converter/RelationalColumn.java
index 992f038fc..f4efc1bc8 100644
--- a/debezium-api/src/main/java/io/debezium/spi/converter/RelationalColumn.java
+++ b/debezium-api/src/main/java/io/debezium/spi/converter/RelationalColumn.java
@@ -14,7 +14,6 @@
  * A definition of a converted relational column.
  *
  * @author Randall Hauch
-
  */
 @Incubating
 public interface RelationalColumn extends ConvertedField {
@@ -82,4 +81,13 @@ public interface RelationalColumn extends ConvertedField {
      * @return {@code true} if the default value was provided, or {@code false} otherwise
      */
     boolean hasDefaultValue();
+
+    /**
+     * Get the character set associated with the column.
+     *
+     * @return the character set name
+     */
+    default String charsetName() {
+        return null;
+    }
 }
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/listener/ColumnDefinitionParserListener.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/listener/ColumnDefinitionParserListener.java
index 6c3a667ad..00cf4d84c 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/listener/ColumnDefinitionParserListener.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/antlr/listener/ColumnDefinitionParserListener.java
@@ -271,6 +271,11 @@ else if (dataTypeName.equals("SERIAL")) {
         if (Types.NCHAR == jdbcDataType || Types.NVARCHAR == jdbcDataType) {
             // NCHAR and NVARCHAR columns always uses utf8 as charset
             columnEditor.charsetName("utf8");
+
+            if (Types.NCHAR == jdbcDataType && columnEditor.length() == -1) {
+                // Explicitly set NCHAR column size as 1 when no length specified
+                columnEditor.length(1);
+            }
         }
         else {
             columnEditor.charsetName(charsetName);
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/converters/JdbcSinkDataTypesConverter.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/converters/JdbcSinkDataTypesConverter.java
new file mode 100644
index 000000000..6cef4caf9
--- /dev/null
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/converters/JdbcSinkDataTypesConverter.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.connector.mysql.converters;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.function.Predicate;
+
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.function.Predicates;
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import io.debezium.util.Strings;
+
+/**
+ * MySQL handles several data types differently between the snapshot and streaming phases, and it's
+ * important that data types be handled consistently across both phases so that JDBC sink connectors
+ * can create sink tables that properly adhere to the data provided in both phases.
+ *
+ * Specifically, this converter makes the following changes:
+ *   - {@code BOOLEAN} columns are always emitted as INT16 schema types, true=1 and false=0.
+ *   - {@code REAL} columns are always emitted as FLOAT64 schema types.
+ *   - String-based columns are always emitted with the "__debezium.source.column.character_set" parameter.
+ *
+ * @author Chris Cranford
+ */
+public class JdbcSinkDataTypesConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSinkDataTypesConverter.class);
+
+    private static final Short INT16_FALLBACK = (short) 0;
+    private static final Float FLOAT32_FALLBACK = 0f;
+    private static final Double FLOAT64_FALLBACK = 0d;
+
+    public static final String SELECTOR_BOOLEAN_PROPERTY = "selector.boolean";
+    public static final String SELECTOR_REAL_PROPERTY = "selector.real";
+    public static final String SELECTOR_STRING_PROPERTY = "selector.string";
+    public static final String TREAT_REAL_AS_DOUBLE = "treat.real.as.double";
+
+    private Predicate<RelationalColumn> selectorBoolean = x -> false;
+    private Predicate<RelationalColumn> selectorReal = x -> false;
+    private Predicate<RelationalColumn> selectorString = x -> false;
+    private boolean treatRealAsDouble = true; // MySQL default
+
+    @Override
+    public void configure(Properties props) {
+        final String booleanSelectorConfig = props.getProperty(SELECTOR_BOOLEAN_PROPERTY);
+        if (!Strings.isNullOrBlank(booleanSelectorConfig)) {
+            selectorBoolean = Predicates.includes(booleanSelectorConfig.trim(), x -> x.dataCollection() + "." + x.name());
+        }
+        final String realSelectorConfig = props.getProperty(SELECTOR_REAL_PROPERTY);
+        if (!Strings.isNullOrBlank(realSelectorConfig)) {
+            selectorReal = Predicates.includes(realSelectorConfig.trim(), x -> x.dataCollection() + "." + x.name());
+        }
+        final String stringSelectorConfig = props.getProperty(SELECTOR_STRING_PROPERTY);
+        if (!Strings.isNullOrBlank(stringSelectorConfig)) {
+            selectorString = Predicates.includes(stringSelectorConfig.trim(), x -> x.dataCollection() + "." + x.name());
+        }
+        final String realAsDouble = props.getProperty(TREAT_REAL_AS_DOUBLE);
+        if (!Strings.isNullOrEmpty(realAsDouble)) {
+            treatRealAsDouble = Boolean.parseBoolean(realAsDouble);
+        }
+    }
+
+    @Override
+    public void converterFor(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
+        if (selectorBoolean.test(field)) {
+            registration.register(SchemaBuilder.int16(), getBooleanConverter(field));
+        }
+        else if (selectorReal.test(field)) {
+            if (treatRealAsDouble) {
+                registration.register(SchemaBuilder.float64(), getRealConverterDouble(field));
+            }
+            else {
+                registration.register(SchemaBuilder.float32(), getRealConverterFloat(field));
+            }
+        }
+        else if (selectorString.test(field)) {
+            final SchemaBuilder schemaBuilder = SchemaBuilder.string();
+            schemaBuilder.parameter("__debezium.source.column.character_set", field.charsetName());
+            registration.register(schemaBuilder, getStringConverter(field));
+        }
+    }
+
+    private Converter getBooleanConverter(RelationalColumn field) {
+        return value -> {
+            if (value == null) {
+                if (field.isOptional()) {
+                    return null;
+                }
+                else if (field.hasDefaultValue()) {
+                    return toTinyInt((Boolean) field.defaultValue());
+                }
+                return INT16_FALLBACK;
+            }
+            else if (value instanceof Boolean) {
+                return toTinyInt((Boolean) value);
+            }
+            else if (value instanceof Number) {
+                return toTinyInt(((Number) value).intValue() > 0);
+            }
+            else if (value instanceof String) {
+                try {
+                    return toTinyInt(Integer.parseInt((String) value) > 0);
+                }
+                catch (NumberFormatException e) {
+                    return toTinyInt(Boolean.parseBoolean((String) value));
+                }
+            }
+            LOGGER.warn("Cannot convert '{}' to INT16", value.getClass());
+            return INT16_FALLBACK;
+        };
+    }
+
+    private Converter getRealConverterDouble(RelationalColumn field) {
+        return value -> {
+            if (value == null) {
+                if (field.isOptional()) {
+                    return null;
+                }
+                else if (field.hasDefaultValue()) {
+                    return (double) field.defaultValue();
+                }
+                return FLOAT64_FALLBACK;
+            }
+            else if (value instanceof Number) {
+                return ((Number) value).doubleValue();
+            }
+            else if (value instanceof String) {
+                return Double.parseDouble((String) value);
+            }
+            LOGGER.warn("Cannot convert '{}' to FLOAT64.", value.getClass());
+            return FLOAT64_FALLBACK;
+        };
+    }
+
+    private Converter getRealConverterFloat(RelationalColumn field) {
+        return value -> {
+            if (value == null) {
+                if (field.isOptional()) {
+                    return null;
+                }
+                else if (field.hasDefaultValue()) {
+                    return (float) field.defaultValue();
+                }
+                return FLOAT32_FALLBACK;
+            }
+            else if (value instanceof Number) {
+                return ((Number) value).floatValue();
+            }
+            else if (value instanceof String) {
+                return Float.parseFloat((String) value);
+            }
+            LOGGER.warn("Cannot convert '{}' to FLOAT32.", value.getClass());
+            return FLOAT32_FALLBACK;
+        };
+    }
+
+    private Converter getStringConverter(RelationalColumn field) {
+        return value -> {
+            if (value == null) {
+                if (field.isOptional()) {
+                    return null;
+                }
+                else if (field.hasDefaultValue()) {
+                    return (String) field.defaultValue();
+                }
+                return "";
+            }
+            else if (value instanceof byte[]) {
+                return new String((byte[]) value, StandardCharsets.UTF_8);
+            }
+            else if (value instanceof Number) {
+                return ((Number) value).toString();
+            }
+            else if (value instanceof String) {
+                return (String) value;
+            }
+            LOGGER.warn("Cannot convert '{}' to STRING", value.getClass());
+            return "";
+        };
+    }
+
+    private static short toTinyInt(Boolean value) {
+        return (short) (value ? 1 : 0);
+    }
+
+}
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlJdbcSinkDataTypeConverterIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlJdbcSinkDataTypeConverterIT.java
new file mode 100644
index 000000000..c9b5f4285
--- /dev/null
+++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlJdbcSinkDataTypeConverterIT.java
@@ -0,0 +1,295 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.connector.mysql;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.file.Path;
+import java.util.List;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.converters.JdbcSinkDataTypesConverter;
+import io.debezium.data.Envelope;
+import io.debezium.doc.FixFor;
+import io.debezium.embedded.AbstractConnectorTest;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.util.Testing;
+
+/**
+ * Tests for the MySQL JDBC Sink converter.
+ *
+ * @author Chris Cranford
+ */
+public class MySqlJdbcSinkDataTypeConverterIT extends AbstractConnectorTest {
+
+    private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-jdbc-sink.text").toAbsolutePath();
+
+    private Configuration config;
+
+    @Before
+    public void beforeEach() {
+        stopConnector();
+        initializeConnectorTestFramework();
+    }
+
+    @After
+    public void afterEach() {
+        try {
+            stopConnector();
+        }
+        finally {
+            Testing.Files.delete(SCHEMA_HISTORY_PATH);
+        }
+    }
+
+    @Test
+    @FixFor("DBZ-6225")
+    public void testBooleanDataTypeMapping() throws Exception {
+        final UniqueDatabase DATABASE = new UniqueDatabase("booleanit", "boolean_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
+        DATABASE.createAndInitialize();
+        Testing.Files.delete(SCHEMA_HISTORY_PATH);
+
+        config = DATABASE.defaultConfig()
+                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
+                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("BOOLEAN_TEST") + "," + DATABASE.qualifiedTableName("BOOLEAN_TEST2"))
+                .with(MySqlConnectorConfig.PROPAGATE_COLUMN_SOURCE_TYPE, ".*")
+                .with(MySqlConnectorConfig.CUSTOM_CONVERTERS, "jdbc-sink")
+                .with("jdbc-sink.type", JdbcSinkDataTypesConverter.class.getName())
+                .with("jdbc-sink.selector.boolean", ".*BOOLEAN_TEST.b.*|.*BOOLEAN_TEST2.b.*")
+                .build();
+
+        start(MySqlConnector.class, config);
+
+        SourceRecords records = consumeRecordsByTopic(2 + 4 + 1);
+        assertThat(records).isNotNull();
+
+        List<SourceRecord> tableRecords = records.recordsForTopic(DATABASE.topicForTable("BOOLEAN_TEST"));
+        assertThat(tableRecords).hasSize(1);
+
+        SourceRecord record = tableRecords.get(0);
+        System.out.println(record);
+
+        Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
+        Schema afterSchema = record.valueSchema().field("after").schema();
+
+        // Assert how the BOOLEAN data type is mapped during the snapshot phase.
+        assertThat(afterSchema.field("b1").schema().type()).isEqualTo(Schema.Type.INT16);
+        assertThat(afterSchema.field("b1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("TINYINT");
+        assertThat(afterSchema.field("b1").schema().parameters().get("__debezium.source.column.length")).isEqualTo("1");
+        assertThat(after.get("b1")).isEqualTo((short) 0);
+        assertThat(afterSchema.field("b2").schema().type()).isEqualTo(Schema.Type.INT16);
+        assertThat(afterSchema.field("b2").schema().parameters().get("__debezium.source.column.type")).isEqualTo("TINYINT");
+        assertThat(afterSchema.field("b2").schema().parameters().get("__debezium.source.column.length")).isEqualTo("1");
+        assertThat(after.get("b2")).isEqualTo((short) 1);
+
+        // Create the table after-the-fact
+        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
+            try (JdbcConnection conn = db.connect()) {
+                conn.execute("CREATE TABLE BOOLEAN_TEST2 (`id` INT NOT NULL AUTO_INCREMENT, " +
+                        "`b1` boolean default true, `b2` boolean default false, " +
+                        "primary key (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;");
+                conn.execute("INSERT INTO BOOLEAN_TEST2 (b1,b2) VALUES (true, false)");
+            }
+        }
+
+        records = consumeRecordsByTopic(2);
+        assertThat(records).isNotNull();
+
+        tableRecords = records.recordsForTopic(DATABASE.topicForTable("BOOLEAN_TEST2"));
+        assertThat(tableRecords).hasSize(1);
+
+        record = tableRecords.get(0);
+        System.out.println(record);
+
+        after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
+        afterSchema = record.valueSchema().field("after").schema();
+
+        // Assert how the BOOLEAN data type is mapped during the streaming phase.
+        // During streaming, the parsed DDL provides the column type as BOOLEAN, and this is what gets passed
+        // into the Column's relational model and gets propagated. Despite being BOOLEAN, it should still be sent
+        // as an INT16 data type into Kafka. The sink connector should be able to deduce the type as TINYINT(1)
+        // when column propagation is enabled, because the propagated type is BOOLEAN.
+        assertThat(afterSchema.field("b1").schema().type()).isEqualTo(Schema.Type.INT16);
+        assertThat(afterSchema.field("b1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("BOOLEAN");
+        assertThat(afterSchema.field("b1").schema().parameters().get("__debezium.source.column.length")).isNull();
+        assertThat(after.get("b1")).isEqualTo((short) 1);
+        assertThat(afterSchema.field("b2").schema().type()).isEqualTo(Schema.Type.INT16);
+        assertThat(afterSchema.field("b2").schema().parameters().get("__debezium.source.column.type")).isEqualTo("BOOLEAN");
+        assertThat(afterSchema.field("b2").schema().parameters().get("__debezium.source.column.length")).isNull();
+        assertThat(after.get("b2")).isEqualTo((short) 0);
+
+        stopConnector();
+    }
+
+    @Test
+    @FixFor("DBZ-6226")
+    public void testRealDataTypeMapping() throws Exception {
+        final UniqueDatabase DATABASE = new UniqueDatabase("realit", "real_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
+        DATABASE.createAndInitialize();
+        Testing.Files.delete(SCHEMA_HISTORY_PATH);
+
+        config = DATABASE.defaultConfig()
+                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
+                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("REAL_TEST") + "," + DATABASE.qualifiedTableName("REAL_TEST2"))
+                .with(MySqlConnectorConfig.PROPAGATE_COLUMN_SOURCE_TYPE, ".*")
+                .with(MySqlConnectorConfig.CUSTOM_CONVERTERS, "jdbc-sink")
+                .with("jdbc-sink.type", JdbcSinkDataTypesConverter.class.getName())
+                .with("jdbc-sink.selector.real", ".*REAL_TEST.r.*|.*REAL_TEST2.r.*")
+                .build();
+
+        start(MySqlConnector.class, config);
+
+        SourceRecords records = consumeRecordsByTopic(2 + 4 + 1);
+        assertThat(records).isNotNull();
+
+        List<SourceRecord> tableRecords = records.recordsForTopic(DATABASE.topicForTable("REAL_TEST"));
+        assertThat(tableRecords).hasSize(1);
+
+        SourceRecord record = tableRecords.get(0);
+
+        Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
+        Schema afterSchema = record.valueSchema().field("after").schema();
+
+        // Assert how the REAL data type is mapped during the snapshot phase.
+        assertThat(afterSchema.field("r1").schema().type()).isEqualTo(Schema.Type.FLOAT64);
+        assertThat(afterSchema.field("r1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("DOUBLE");
+        assertThat(after.get("r1")).isEqualTo(2.36d);
+
+        // Create the table after-the-fact
+        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
+            try (JdbcConnection conn = db.connect()) {
+                conn.execute("CREATE TABLE REAL_TEST2 (`id` INT NOT NULL AUTO_INCREMENT, " +
+                        "`r1` real default 3.14, " +
+                        "primary key (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;");
+                conn.execute("INSERT INTO REAL_TEST2 (r1) VALUES (9.78)");
+            }
+        }
+
+        records = consumeRecordsByTopic(2);
+        assertThat(records).isNotNull();
+
+        tableRecords = records.recordsForTopic(DATABASE.topicForTable("REAL_TEST2"));
+        assertThat(tableRecords).hasSize(1);
+
+        record = tableRecords.get(0);
+
+        after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
+        afterSchema = record.valueSchema().field("after").schema();
+
+        // Assert that the record for the table created after-the-fact is identical to the snapshot record.
+        // During streaming, the parsed DDL provides the column type as REAL, and this is what gets passed
+        // into the Column's relational model and gets propagated. Despite being REAL, it should still be sent as
+        // a FLOAT64 data type into Kafka.
+        assertThat(afterSchema.field("r1").schema().type()).isEqualTo(Schema.Type.FLOAT64);
+        assertThat(afterSchema.field("r1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("REAL");
+        assertThat(after.get("r1")).isEqualTo(9.78d);
+
+        stopConnector();
+    }
+
+    @Test
+    @FixFor("DBZ-6231")
+    public void testNationalizedCharacterDataTypeMappings() throws Exception {
+        final UniqueDatabase DATABASE = new UniqueDatabase("nctestit", "nationalized_character_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
+        DATABASE.createAndInitialize();
+        Testing.Files.delete(SCHEMA_HISTORY_PATH);
+
+        config = DATABASE.defaultConfig()
+                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
+                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("NC_TEST") + "," + DATABASE.qualifiedTableName("NC_TEST2"))
+                .with(MySqlConnectorConfig.PROPAGATE_COLUMN_SOURCE_TYPE, ".*")
+                .with(MySqlConnectorConfig.CUSTOM_CONVERTERS, "jdbc-sink")
+                .with("jdbc-sink.type", JdbcSinkDataTypesConverter.class.getName())
+                .with("jdbc-sink.selector.string", ".*NC_TEST.nc.*|.*NC_TEST2.nc.*")
+                .build();
+
+        start(MySqlConnector.class, config);
+
+        SourceRecords records = consumeRecordsByTopic(2 + 4 + 1);
+        assertThat(records).isNotNull();
+
+        List<SourceRecord> tableRecords = records.recordsForTopic(DATABASE.topicForTable("NC_TEST"));
+        assertThat(tableRecords).hasSize(1);
+
+        SourceRecord record = tableRecords.get(0);
+
+        Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
+        Schema afterSchema = record.valueSchema().field("after").schema();
+        System.out.println(afterSchema.field("nc1").schema().parameters());
+        System.out.println(afterSchema.field("nc2").schema().parameters());
+        System.out.println(afterSchema.field("nc3").schema().parameters());
+
+        // Assert how the nationalized character data types are mapped during the snapshot phase.
+        assertThat(afterSchema.field("nc1").schema().type()).isEqualTo(Schema.Type.STRING);
+        assertThat(after.get("nc1")).isEqualTo("a");
+        assertThat(afterSchema.field("nc1").schema().parameters().get("__debezium.source.column.character_set")).isEqualTo("utf8mb3");
+        assertThat(afterSchema.field("nc1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("CHAR");
+        assertThat(afterSchema.field("nc1").schema().parameters().get("__debezium.source.column.length")).isEqualTo("1");
+        assertThat(afterSchema.field("nc2").schema().type()).isEqualTo(Schema.Type.STRING);
+        assertThat(after.get("nc2")).isEqualTo("123");
+        assertThat(afterSchema.field("nc2").schema().parameters().get("__debezium.source.column.character_set")).isEqualTo("utf8mb3");
+        assertThat(afterSchema.field("nc2").schema().parameters().get("__debezium.source.column.type")).isEqualTo("CHAR");
+        assertThat(afterSchema.field("nc2").schema().parameters().get("__debezium.source.column.length")).isEqualTo("5");
+        assertThat(afterSchema.field("nc3").schema().type()).isEqualTo(Schema.Type.STRING);
+        assertThat(after.get("nc3")).isEqualTo("hello");
+        assertThat(afterSchema.field("nc3").schema().parameters().get("__debezium.source.column.character_set")).isEqualTo("utf8mb3");
+        assertThat(afterSchema.field("nc3").schema().parameters().get("__debezium.source.column.type")).isEqualTo("VARCHAR");
+        assertThat(afterSchema.field("nc3").schema().parameters().get("__debezium.source.column.length")).isEqualTo("25");
+
+        // Create the table after-the-fact
+        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
+            try (JdbcConnection conn = db.connect()) {
+                conn.execute("CREATE TABLE NC_TEST2 (`id` INT NOT NULL AUTO_INCREMENT, " +
+                        "`nc1` nchar, `nc2` nchar(5), `nc3` nvarchar(25), " +
+                        "primary key (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;");
+                conn.execute("INSERT INTO NC_TEST2 (nc1,nc2,nc3) VALUES ('b', '456', 'world')");
+            }
+        }
+
+        records = consumeRecordsByTopic(2);
+        assertThat(records).isNotNull();
+
+        tableRecords = records.recordsForTopic(DATABASE.topicForTable("NC_TEST2"));
+        assertThat(tableRecords).hasSize(1);
+
+        record = tableRecords.get(0);
+
+        after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
+        afterSchema = record.valueSchema().field("after").schema();
+
+        // Assert that the record for the table created after-the-fact is identical to the snapshot record.
+        // During streaming, the parsed DDL provides the column type as NCHAR or NVARCHAR, since the type is pulled
+        // directly from the DDL, and this is what gets passed into the Column's relational model and gets propagated.
+        // With the converter, regardless of whether type propagation is enabled, the character_set will be sent and
+        // the sink will be able to use this information to derive nationalized character types when sourcing from
+        // MySQL. Since these columns were always mapped as Kafka STRING regardless, there is no issue with the schema types.
+        assertThat(afterSchema.field("nc1").schema().type()).isEqualTo(Schema.Type.STRING);
+        assertThat(after.get("nc1")).isEqualTo("b");
+        assertThat(afterSchema.field("nc1").schema().parameters().get("__debezium.source.column.character_set")).isEqualTo("utf8");
+        assertThat(afterSchema.field("nc1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("NCHAR");
+        assertThat(afterSchema.field("nc1").schema().parameters().get("__debezium.source.column.length")).isEqualTo("1");
+        assertThat(afterSchema.field("nc2").schema().type()).isEqualTo(Schema.Type.STRING);
+        assertThat(after.get("nc2")).isEqualTo("456");
+        assertThat(afterSchema.field("nc2").schema().parameters().get("__debezium.source.column.character_set")).isEqualTo("utf8");
+        assertThat(afterSchema.field("nc2").schema().parameters().get("__debezium.source.column.type")).isEqualTo("NCHAR");
+        assertThat(afterSchema.field("nc2").schema().parameters().get("__debezium.source.column.length")).isEqualTo("5");
+        assertThat(afterSchema.field("nc3").schema().type()).isEqualTo(Schema.Type.STRING);
+        assertThat(after.get("nc3")).isEqualTo("world");
+        assertThat(afterSchema.field("nc3").schema().parameters().get("__debezium.source.column.character_set")).isEqualTo("utf8");
+        assertThat(afterSchema.field("nc3").schema().parameters().get("__debezium.source.column.type")).isEqualTo("NVARCHAR");
+        assertThat(afterSchema.field("nc3").schema().parameters().get("__debezium.source.column.length")).isEqualTo("25");
+
+        stopConnector();
+    }
+
+}
diff --git a/debezium-connector-mysql/src/test/resources/ddl/boolean_test.sql b/debezium-connector-mysql/src/test/resources/ddl/boolean_test.sql
new file mode 100644
index 000000000..31bc72c8a
--- /dev/null
+++ b/debezium-connector-mysql/src/test/resources/ddl/boolean_test.sql
@@ -0,0 +1,7 @@
+CREATE TABLE `BOOLEAN_TEST` (
+  `id` INT NOT NULL AUTO_INCREMENT,
+  `b1` boolean default true,
+  `b2` boolean default false,
+  PRIMARY KEY (`ID`)
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
+INSERT INTO BOOLEAN_TEST(b1, b2) VALUE (false, true);
\ No newline at end of file
diff --git a/debezium-connector-mysql/src/test/resources/ddl/nationalized_character_test.sql b/debezium-connector-mysql/src/test/resources/ddl/nationalized_character_test.sql
new file mode 100644
index 000000000..48cf6071c
--- /dev/null
+++ b/debezium-connector-mysql/src/test/resources/ddl/nationalized_character_test.sql
@@ -0,0 +1,8 @@
+CREATE TABLE `NC_TEST` (
+  `id` INT NOT NULL AUTO_INCREMENT,
+  `nc1` nchar default null,
+  `nc2` nchar(5) default null,
+  `nc3` nvarchar(25) default null,
+  PRIMARY KEY (`ID`)
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
+INSERT INTO NC_TEST(nc1,nc2,nc3) VALUES ('a', '123', 'hello');
\ No newline at end of file
diff --git a/debezium-connector-mysql/src/test/resources/ddl/real_test.sql b/debezium-connector-mysql/src/test/resources/ddl/real_test.sql
new file mode 100644
index 000000000..7fcf64f78
--- /dev/null
+++ b/debezium-connector-mysql/src/test/resources/ddl/real_test.sql
@@ -0,0 +1,6 @@
+CREATE TABLE `REAL_TEST` (
+  `id` INT NOT NULL AUTO_INCREMENT,
+  `r1` real default 1.25,
+  PRIMARY KEY (`ID`)
+) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
+INSERT INTO REAL_TEST(r1) VALUE (2.36);
\ No newline at end of file
diff --git a/debezium-core/src/main/java/io/debezium/relational/CustomConverterRegistry.java b/debezium-core/src/main/java/io/debezium/relational/CustomConverterRegistry.java
index 9597b4c0a..be23be8f6 100644
--- a/debezium-core/src/main/java/io/debezium/relational/CustomConverterRegistry.java
+++ b/debezium-core/src/main/java/io/debezium/relational/CustomConverterRegistry.java
@@ -114,6 +114,11 @@ public boolean hasDefaultValue() {
             public Object defaultValue() {
                 return defaultValue;
             }
+
+            @Override
+            public String charsetName() {
+                return column.charsetName();
+            }
         }, (fieldSchema, converter) -> definition.set(new ConverterDefinition<>(fieldSchema, converter)));
 
         if (definition.get() != null) {
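
For reference, the sketch below shows how the converter introduced by this patch might be wired into a MySQL connector configuration. It mirrors the properties exercised by the integration tests above; the "jdbc-sink" converter name follows the prefix chosen in those tests, and the "inventory.*" selector patterns and property values are illustrative placeholders, not part of this change.

    // Hypothetical configuration sketch; selector patterns are placeholders.
    Configuration config = Configuration.create()
            // Propagate source column type/length metadata so a sink can recreate columns faithfully.
            .with(MySqlConnectorConfig.PROPAGATE_COLUMN_SOURCE_TYPE, ".*")
            // Register this class as a custom converter under the name "jdbc-sink".
            .with(MySqlConnectorConfig.CUSTOM_CONVERTERS, "jdbc-sink")
            .with("jdbc-sink.type", JdbcSinkDataTypesConverter.class.getName())
            // Columns matched by these regexes (database.table.column) get the consistent
            // BOOLEAN/REAL/string handling described in the commit message.
            .with("jdbc-sink.selector.boolean", ".*inventory.orders.is_.*")
            .with("jdbc-sink.selector.real", ".*inventory.orders.weight.*")
            .with("jdbc-sink.selector.string", ".*inventory.orders.note.*")
            // Optional: emit REAL as FLOAT32 instead of the default FLOAT64.
            .with("jdbc-sink.treat.real.as.double", "false")
            .build();

In a flat connector properties file, the equivalent keys would typically be converters=jdbc-sink, jdbc-sink.type=io.debezium.connector.mysql.converters.JdbcSinkDataTypesConverter, and the jdbc-sink.selector.* entries shown above.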