DBZ-6225 DBZ-6226 DBZ-6231 Introduce CustomConverter for JDBC sink consistency

In DBZ-6225, it was identified that the schema type associated with TINYINT(1)
and BOOLEAN data types wasn't consistent. This converter implementation will
always emit BOOLEAN data types as INT16, similar to TINYINT(1).

In DBZ-6226, it was identified that the schema type associated with REAL data
types wasn't consistent, emitted as FLOAT64 during snapshot and FLOAT32 in
the streaming phase. This converter implementation will always emit such types
as FLOAT64 for consistency.

In DBZ-6231, during streaming the DDL parser will get the column type as the
literal type supplied by the user's DDL, i.e. NCHAR or NVARCHAR but when the
DDL is sourced from SHOW CREATE TABLE, MySQL returns these columns as types
of CHAR/VARCHAR but with a character set.  By passing the character_set in
the schema consistency, we can deduce nationalized vs non-nationalized.
This commit is contained in:
Chris Cranford 2023-03-20 11:30:39 -04:00 committed by Jiri Pechanec
parent 66ee2dd4eb
commit 1c5f14f7fd
8 changed files with 529 additions and 1 deletions

View File

@ -14,7 +14,6 @@
* A definition of a converted relational column.
*
* @author Randall Hauch
*/
@Incubating
public interface RelationalColumn extends ConvertedField {
@ -82,4 +81,13 @@ public interface RelationalColumn extends ConvertedField {
* @return {@code true} if the default value was provided, or {@code false} otherwise
*/
boolean hasDefaultValue();
/**
* Get the character set associated with the column.
*
* @return the character set name
*/
default String charsetName() {
return null;
}
}

View File

@ -271,6 +271,11 @@ else if (dataTypeName.equals("SERIAL")) {
if (Types.NCHAR == jdbcDataType || Types.NVARCHAR == jdbcDataType) {
// NCHAR and NVARCHAR columns always uses utf8 as charset
columnEditor.charsetName("utf8");
if (Types.NCHAR == jdbcDataType && columnEditor.length() == -1) {
// Explicitly set NCHAR column size as 1 when no length specified
columnEditor.length(1);
}
}
else {
columnEditor.charsetName(charsetName);

View File

@ -0,0 +1,194 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.converters;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.function.Predicate;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.function.Predicates;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import io.debezium.util.Strings;
/**
* MySQL handles several data types differently between streaming and snapshot and its important
* that data types be handled consistently across both phases for JDBC sink connectors to create
* the sink tables properly that adhere to the data provided in both phases.
*
* This converter specific makes the following changes:
* - {@code BOOLEAN} columns always emitted as INT16 schema types, true=1 and false=0.
* - {@code REAL} columns always emitted as FLOAT64 schema types.
* - String-based columns always emitted with "__debezium.source.column.character_set" parameter.
*
* @author Chris Cranford
*/
public class JdbcSinkDataTypesConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSinkDataTypesConverter.class);
private static final Short INT16_FALLBACK = (short) 0;
private static final Float FLOAT32_FALLBACK = 0f;
private static final Double FLOAT64_FALLBACK = 0d;
public static final String SELECTOR_BOOLEAN_PROPERTY = "selector.boolean";
public static final String SELECTOR_REAL_PROPERTY = "selector.real";
public static final String SELECTOR_STRING_PROPERTY = "selector.string";
public static final String TREAT_REAL_AS_DOUBLE = "treat.real.as.double";
private Predicate<RelationalColumn> selectorBoolean = x -> false;
private Predicate<RelationalColumn> selectorReal = x -> false;
private Predicate<RelationalColumn> selectorString = x -> false;
private boolean treatRealAsDouble = true; // MySQL default
@Override
public void configure(Properties props) {
final String booleanSelectorConfig = props.getProperty(SELECTOR_BOOLEAN_PROPERTY);
if (!Strings.isNullOrBlank(booleanSelectorConfig)) {
selectorBoolean = Predicates.includes(booleanSelectorConfig.trim(), x -> x.dataCollection() + "." + x.name());
}
final String realSelectorConfig = props.getProperty(SELECTOR_REAL_PROPERTY);
if (!Strings.isNullOrBlank(realSelectorConfig)) {
selectorReal = Predicates.includes(realSelectorConfig.trim(), x -> x.dataCollection() + "." + x.name());
}
final String stringSelectorConfig = props.getProperty(SELECTOR_STRING_PROPERTY);
if (!Strings.isNullOrBlank(stringSelectorConfig)) {
selectorString = Predicates.includes(stringSelectorConfig.trim(), x -> x.dataCollection() + "." + x.name());
}
final String realAsDouble = props.getProperty(TREAT_REAL_AS_DOUBLE);
if (!Strings.isNullOrEmpty(realAsDouble)) {
treatRealAsDouble = Boolean.parseBoolean(realAsDouble);
}
}
@Override
public void converterFor(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
if (selectorBoolean.test(field)) {
registration.register(SchemaBuilder.int16(), getBooleanConverter(field));
}
else if (selectorReal.test(field)) {
if (treatRealAsDouble) {
registration.register(SchemaBuilder.float64(), getRealConverterDouble(field));
}
else {
registration.register(SchemaBuilder.float32(), getRealConverterFloat(field));
}
}
else if (selectorString.test(field)) {
final SchemaBuilder schemaBuilder = SchemaBuilder.string();
schemaBuilder.parameter("__debezium.source.column.character_set", field.charsetName());
registration.register(schemaBuilder, getStringConverter(field));
}
}
private Converter getBooleanConverter(RelationalColumn field) {
return value -> {
if (value == null) {
if (field.isOptional()) {
return null;
}
else if (field.hasDefaultValue()) {
return toTinyInt((Boolean) field.defaultValue());
}
return INT16_FALLBACK;
}
else if (value instanceof Boolean) {
return toTinyInt((Boolean) value);
}
else if (value instanceof Number) {
return toTinyInt(((Number) value).intValue() > 0);
}
else if (value instanceof String) {
try {
return toTinyInt(Integer.parseInt((String) value) > 0);
}
catch (NumberFormatException e) {
return toTinyInt(Boolean.parseBoolean((String) value));
}
}
LOGGER.warn("Cannot convert '{}' to INT16", value.getClass());
return INT16_FALLBACK;
};
}
private Converter getRealConverterDouble(RelationalColumn field) {
return value -> {
if (value == null) {
if (field.isOptional()) {
return null;
}
else if (field.hasDefaultValue()) {
return (double) field.defaultValue();
}
return FLOAT64_FALLBACK;
}
else if (value instanceof Number) {
return ((Number) value).doubleValue();
}
else if (value instanceof String) {
return Double.parseDouble((String) value);
}
LOGGER.warn("Cannot convert '{}' to FLOAT64.", value.getClass());
return FLOAT64_FALLBACK;
};
}
private Converter getRealConverterFloat(RelationalColumn field) {
return value -> {
if (value == null) {
if (field.isOptional()) {
return null;
}
else if (field.hasDefaultValue()) {
return (float) field.defaultValue();
}
return FLOAT32_FALLBACK;
}
else if (value instanceof Number) {
return ((Number) value).floatValue();
}
else if (value instanceof String) {
return Float.parseFloat((String) value);
}
LOGGER.warn("Cannot convert '{}' to FLOAT32.", value.getClass());
return FLOAT32_FALLBACK;
};
}
private Converter getStringConverter(RelationalColumn field) {
return value -> {
if (value == null) {
if (field.isOptional()) {
return null;
}
else if (field.hasDefaultValue()) {
return (String) field.defaultValue();
}
return "";
}
else if (value instanceof byte[]) {
return new String((byte[]) value, StandardCharsets.UTF_8);
}
else if (value instanceof Number) {
return ((Number) value).toString();
}
else if (value instanceof String) {
return (String) value;
}
LOGGER.warn("Cannot convert '{}' to STRING", value.getClass());
return "";
};
}
private static short toTinyInt(Boolean value) {
return (short) (value ? 1 : 0);
}
}

View File

@ -0,0 +1,295 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import static org.assertj.core.api.Assertions.assertThat;
import java.nio.file.Path;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.converters.JdbcSinkDataTypesConverter;
import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.util.Testing;
/**
* Tests for the MySQL JDBC Sink converter.
*
* @author Chris Cranford
*/
public class MySqlJdbcSinkDataTypeConverterIT extends AbstractConnectorTest {
private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-jdbc-sink.text").toAbsolutePath();
private Configuration config;
@Before
public void beforeEach() {
stopConnector();
initializeConnectorTestFramework();
}
@After
public void afterEach() {
try {
stopConnector();
}
finally {
Testing.Files.delete(SCHEMA_HISTORY_PATH);
}
}
@Test
@FixFor("DBZ-6225")
public void testBooleanDataTypeMapping() throws Exception {
final UniqueDatabase DATABASE = new UniqueDatabase("booleanit", "boolean_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
DATABASE.createAndInitialize();
Testing.Files.delete(SCHEMA_HISTORY_PATH);
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("BOOLEAN_TEST") + "," + DATABASE.qualifiedTableName("BOOLEAN_TEST2"))
.with(MySqlConnectorConfig.PROPAGATE_COLUMN_SOURCE_TYPE, ".*")
.with(MySqlConnectorConfig.CUSTOM_CONVERTERS, "jdbc-sink")
.with("jdbc-sink.type", JdbcSinkDataTypesConverter.class.getName())
.with("jdbc-sink.selector.boolean", ".*BOOLEAN_TEST.b.*|.*BOOLEAN_TEST2.b.*")
.build();
start(MySqlConnector.class, config);
SourceRecords records = consumeRecordsByTopic(2 + 4 + 1);
assertThat(records).isNotNull();
List<SourceRecord> tableRecords = records.recordsForTopic(DATABASE.topicForTable("BOOLEAN_TEST"));
assertThat(tableRecords).hasSize(1);
SourceRecord record = tableRecords.get(0);
System.out.println(record);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
Schema afterSchema = record.valueSchema().field("after").schema();
// Assert how the BOOLEAN data type is mapped during the snapshot phase.
assertThat(afterSchema.field("b1").schema().type()).isEqualTo(Schema.Type.INT16);
assertThat(afterSchema.field("b1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("TINYINT");
assertThat(afterSchema.field("b1").schema().parameters().get("__debezium.source.column.length")).isEqualTo("1");
assertThat(after.get("b1")).isEqualTo((short) 0);
assertThat(afterSchema.field("b2").schema().type()).isEqualTo(Schema.Type.INT16);
assertThat(afterSchema.field("b2").schema().parameters().get("__debezium.source.column.type")).isEqualTo("TINYINT");
assertThat(afterSchema.field("b2").schema().parameters().get("__debezium.source.column.length")).isEqualTo("1");
assertThat(after.get("b2")).isEqualTo((short) 1);
// Create the table after-the-fact
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection conn = db.connect()) {
conn.execute("CREATE TABLE BOOLEAN_TEST2 (`id` INT NOT NULL AUTO_INCREMENT, " +
"`b1` boolean default true, `b2` boolean default false, " +
"primary key (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;");
conn.execute("INSERT INTO BOOLEAN_TEST2 (b1,b2) VALUES (true, false)");
}
}
records = consumeRecordsByTopic(2);
assertThat(records).isNotNull();
tableRecords = records.recordsForTopic(DATABASE.topicForTable("BOOLEAN_TEST2"));
assertThat(tableRecords).hasSize(1);
record = tableRecords.get(0);
System.out.println(record);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
afterSchema = record.valueSchema().field("after").schema();
// Assert how the BOOLEAN data type is mapped during the streaming phase.
// During streaming the DDL that gets parsed provides the column type as BOOLEAN and this is what gets passed
// into the Column's relational model and gets propagated. Despite being BOOLEAN, it should still be sent as
// an INT16 data type into Kafka. The sink connector should be able to deduce the type as TINYINT(1) when the
// column propagation is enabled because of type being BOOLEAN.
assertThat(afterSchema.field("b1").schema().type()).isEqualTo(Schema.Type.INT16);
assertThat(afterSchema.field("b1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("BOOLEAN");
assertThat(afterSchema.field("b1").schema().parameters().get("__debezium.source.column.length")).isNull();
assertThat(after.get("b1")).isEqualTo((short) 1);
assertThat(afterSchema.field("b2").schema().type()).isEqualTo(Schema.Type.INT16);
assertThat(afterSchema.field("b2").schema().parameters().get("__debezium.source.column.type")).isEqualTo("BOOLEAN");
assertThat(afterSchema.field("b2").schema().parameters().get("__debezium.source.column.length")).isNull();
assertThat(after.get("b2")).isEqualTo((short) 0);
stopConnector();
}
@Test
@FixFor("DBZ-6226")
public void testRealDataTypeMapping() throws Exception {
final UniqueDatabase DATABASE = new UniqueDatabase("realit", "real_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
DATABASE.createAndInitialize();
Testing.Files.delete(SCHEMA_HISTORY_PATH);
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("REAL_TEST") + "," + DATABASE.qualifiedTableName("REAL_TEST2"))
.with(MySqlConnectorConfig.PROPAGATE_COLUMN_SOURCE_TYPE, ".*")
.with(MySqlConnectorConfig.CUSTOM_CONVERTERS, "jdbc-sink")
.with("jdbc-sink.type", JdbcSinkDataTypesConverter.class.getName())
.with("jdbc-sink.selector.real", ".*REAL_TEST.r.*|.*REAL_TEST2.r.*")
.build();
start(MySqlConnector.class, config);
SourceRecords records = consumeRecordsByTopic(2 + 4 + 1);
assertThat(records).isNotNull();
List<SourceRecord> tableRecords = records.recordsForTopic(DATABASE.topicForTable("REAL_TEST"));
assertThat(tableRecords).hasSize(1);
SourceRecord record = tableRecords.get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
Schema afterSchema = record.valueSchema().field("after").schema();
// Assert how the BOOLEAN data type is mapped during the snapshot phase.
assertThat(afterSchema.field("r1").schema().type()).isEqualTo(Schema.Type.FLOAT64);
assertThat(afterSchema.field("r1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("DOUBLE");
assertThat(after.get("r1")).isEqualTo(2.36d);
// Create the table after-the-fact
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection conn = db.connect()) {
conn.execute("CREATE TABLE REAL_TEST2 (`id` INT NOT NULL AUTO_INCREMENT, " +
"`r1` real default 3.14, " +
"primary key (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;");
conn.execute("INSERT INTO REAL_TEST2 (r1) VALUES (9.78)");
}
}
records = consumeRecordsByTopic(2);
assertThat(records).isNotNull();
tableRecords = records.recordsForTopic(DATABASE.topicForTable("REAL_TEST2"));
assertThat(tableRecords).hasSize(1);
record = tableRecords.get(0);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
afterSchema = record.valueSchema().field("after").schema();
// Assert the created table after-the-fact record is identical to the snapshot
// During streaming the DDL that gets parsed provides the column type as REAL and this is what gets passed
// into the Column's relational model and gets propagated. Despite being REAL, it should still be sent as
// a FLOAT64 data type into Kafka.
assertThat(afterSchema.field("r1").schema().type()).isEqualTo(Schema.Type.FLOAT64);
assertThat(afterSchema.field("r1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("REAL");
assertThat(after.get("r1")).isEqualTo(9.78d);
stopConnector();
}
@Test
@FixFor("DBZ-6231")
public void testNationalizedCharacterDataTypeMappings() throws Exception {
final UniqueDatabase DATABASE = new UniqueDatabase("nctestit", "nationalized_character_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
DATABASE.createAndInitialize();
Testing.Files.delete(SCHEMA_HISTORY_PATH);
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("NC_TEST") + "," + DATABASE.qualifiedTableName("NC_TEST2"))
.with(MySqlConnectorConfig.PROPAGATE_COLUMN_SOURCE_TYPE, ".*")
.with(MySqlConnectorConfig.CUSTOM_CONVERTERS, "jdbc-sink")
.with("jdbc-sink.type", JdbcSinkDataTypesConverter.class.getName())
.with("jdbc-sink.selector.char", ".*NC_TEST.nc.*|.*NC_TEST2.nc.*")
.build();
start(MySqlConnector.class, config);
SourceRecords records = consumeRecordsByTopic(2 + 4 + 1);
assertThat(records).isNotNull();
List<SourceRecord> tableRecords = records.recordsForTopic(DATABASE.topicForTable("NC_TEST"));
assertThat(tableRecords).hasSize(1);
SourceRecord record = tableRecords.get(0);
Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
Schema afterSchema = record.valueSchema().field("after").schema();
System.out.println(afterSchema.field("nc1").schema().parameters());
System.out.println(afterSchema.field("nc2").schema().parameters());
System.out.println(afterSchema.field("nc3").schema().parameters());
// Assert how the BOOLEAN data type is mapped during the snapshot phase.
assertThat(afterSchema.field("nc1").schema().type()).isEqualTo(Schema.Type.STRING);
assertThat(after.get("nc1")).isEqualTo("a");
assertThat(afterSchema.field("nc1").schema().parameters().get("__debezium.source.column.character_set")).isEqualTo("utf8mb3");
assertThat(afterSchema.field("nc1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("CHAR");
assertThat(afterSchema.field("nc1").schema().parameters().get("__debezium.source.column.length")).isEqualTo("1");
assertThat(afterSchema.field("nc2").schema().type()).isEqualTo(Schema.Type.STRING);
assertThat(after.get("nc2")).isEqualTo("123");
assertThat(afterSchema.field("nc2").schema().parameters().get("__debezium.source.column.character_set")).isEqualTo("utf8mb3");
assertThat(afterSchema.field("nc2").schema().parameters().get("__debezium.source.column.type")).isEqualTo("CHAR");
assertThat(afterSchema.field("nc2").schema().parameters().get("__debezium.source.column.length")).isEqualTo("5");
assertThat(afterSchema.field("nc3").schema().type()).isEqualTo(Schema.Type.STRING);
assertThat(after.get("nc3")).isEqualTo("hello");
assertThat(afterSchema.field("nc3").schema().parameters().get("__debezium.source.column.character_set")).isEqualTo("utf8mb3");
assertThat(afterSchema.field("nc3").schema().parameters().get("__debezium.source.column.type")).isEqualTo("VARCHAR");
assertThat(afterSchema.field("nc3").schema().parameters().get("__debezium.source.column.length")).isEqualTo("25");
// Create the table after-the-fact
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection conn = db.connect()) {
conn.execute("CREATE TABLE NC_TEST2 (`id` INT NOT NULL AUTO_INCREMENT, " +
"`nc1` nchar, `nc2` nchar(5), `nc3` nvarchar(25), " +
"primary key (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;");
conn.execute("INSERT INTO NC_TEST2 (nc1,nc2,nc3) VALUES ('b', '456', 'world')");
}
}
records = consumeRecordsByTopic(2);
assertThat(records).isNotNull();
tableRecords = records.recordsForTopic(DATABASE.topicForTable("NC_TEST2"));
assertThat(tableRecords).hasSize(1);
record = tableRecords.get(0);
after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER);
afterSchema = record.valueSchema().field("after").schema();
// Assert the created table after-the-fact record is identical to the snapshot
// During streaming the DDL that gets parsed provides the column type as NCHAR or NVARCHAR as this gets pulled
// directly from the DDL and this is what gets passed into the Column's relational model and gets propagated.
// Using the converter, regardless of whether this is propagated or not, the character_set will be sent and the
// sink will be able to use this information to derived nationalized character types when sourcing from MySQL.
// Since these were always being mapped as Kafka STRING regardless, there is no issue with the schema types.
assertThat(afterSchema.field("nc1").schema().type()).isEqualTo(Schema.Type.STRING);
assertThat(after.get("nc1")).isEqualTo("b");
assertThat(afterSchema.field("nc1").schema().parameters().get("__debezium.source.column.character_set")).isEqualTo("utf8");
assertThat(afterSchema.field("nc1").schema().parameters().get("__debezium.source.column.type")).isEqualTo("NCHAR");
assertThat(afterSchema.field("nc1").schema().parameters().get("__debezium.source.column.length")).isEqualTo("1");
assertThat(afterSchema.field("nc2").schema().type()).isEqualTo(Schema.Type.STRING);
assertThat(after.get("nc2")).isEqualTo("456");
assertThat(afterSchema.field("nc2").schema().parameters().get("__debezium.source.column.character_set")).isEqualTo("utf8");
assertThat(afterSchema.field("nc2").schema().parameters().get("__debezium.source.column.type")).isEqualTo("NCHAR");
assertThat(afterSchema.field("nc2").schema().parameters().get("__debezium.source.column.length")).isEqualTo("5");
assertThat(afterSchema.field("nc3").schema().type()).isEqualTo(Schema.Type.STRING);
assertThat(after.get("nc3")).isEqualTo("world");
assertThat(afterSchema.field("nc3").schema().parameters().get("__debezium.source.column.character_set")).isEqualTo("utf8");
assertThat(afterSchema.field("nc3").schema().parameters().get("__debezium.source.column.type")).isEqualTo("NVARCHAR");
assertThat(afterSchema.field("nc3").schema().parameters().get("__debezium.source.column.length")).isEqualTo("25");
stopConnector();
}
}

View File

@ -0,0 +1,7 @@
CREATE TABLE `BOOLEAN_TEST` (
`id` INT NOT NULL AUTO_INCREMENT,
`b1` boolean default true,
`b2` boolean default false,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
INSERT INTO BOOLEAN_TEST(b1, b2) VALUE (false, true);

View File

@ -0,0 +1,8 @@
CREATE TABLE `NC_TEST` (
`id` INT NOT NULL AUTO_INCREMENT,
`nc1` nchar default null,
`nc2` nchar(5) default null,
`nc3` nvarchar(25) default null,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
INSERT INTO NC_TEST(nc1,nc2,nc3) VALUES ('a', '123', 'hello');

View File

@ -0,0 +1,6 @@
CREATE TABLE `REAL_TEST` (
`id` INT NOT NULL AUTO_INCREMENT,
`r1` real default 1.25,
PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
INSERT INTO REAL_TEST(r1) VALUE (2.36);

View File

@ -114,6 +114,11 @@ public boolean hasDefaultValue() {
public Object defaultValue() {
return defaultValue;
}
@Override
public String charsetName() {
return column.charsetName();
}
}, (fieldSchema, converter) -> definition.set(new ConverterDefinition<>(fieldSchema, converter)));
if (definition.get() != null) {