From c54e377dd23234c53125d8ab668dab9cde247276 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Wed, 3 Jun 2020 14:41:44 +0200 Subject: [PATCH] DBZ-1814 Exporting Hex and Base64 representations as string --- .../connector/mysql/MySqlValueConverters.java | 10 +- .../connector/mysql/MySqlBinaryModeIT.java | 4 +- .../postgresql/PostgresValueConverter.java | 9 +- .../AbstractRecordsProducerTest.java | 4 +- .../postgresql/RecordsSnapshotProducerIT.java | 54 +++++++- .../config/CommonConnectorConfig.java | 15 ++- .../io/debezium/jdbc/JdbcValueConverters.java | 125 ++++++++++-------- .../ROOT/pages/connectors/postgresql.adoc | 6 +- ...w-the-mysql-connector-maps-data-types.adoc | 24 +++- 9 files changed, 164 insertions(+), 87 deletions(-) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java index 7d98c35c1..e5d6e1e02 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java @@ -120,7 +120,7 @@ else if (70 <= year && year <= 99) { * @param temporalPrecisionMode temporal precision mode based on {@link io.debezium.jdbc.TemporalPrecisionMode} * @param bigIntUnsignedMode how {@code BIGINT UNSIGNED} values should be treated; may be null if * {@link io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode#PRECISE} is to be used - * @param binaryMode TODO + * @param binaryMode how binary columns should be represented */ public MySqlValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, BigIntUnsignedMode bigIntUnsignedMode, BinaryHandlingMode binaryMode) { @@ -140,7 +140,7 @@ public MySqlValueConverters(DecimalMode decimalMode, TemporalPrecisionMode tempo * @param bigIntUnsignedMode how {@code BIGINT UNSIGNED} values should be treated; may be null if * {@link io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode#PRECISE} is to be used * @param adjuster a temporal adjuster to make a database specific time modification before conversion - * @param binaryMode TODO + * @param binaryMode how binary columns should be represented */ public MySqlValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset, BigIntUnsignedMode bigIntUnsignedMode, TemporalAdjuster adjuster, BinaryHandlingMode binaryMode) { @@ -158,7 +158,7 @@ public MySqlValueConverters(DecimalMode decimalMode, TemporalPrecisionMode tempo * @param bigIntUnsignedMode how {@code BIGINT UNSIGNED} values should be treated; may be null if * {@link io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode#PRECISE} is to be used * @param adjuster a temporal adjuster to make a database specific time modification before conversion - * @param binaryMode TODO + * @param binaryMode how binary columns should be represented */ public MySqlValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, BigIntUnsignedMode bigIntUnsignedMode, TemporalAdjuster adjuster, BinaryHandlingMode binaryMode) { @@ -613,13 +613,13 @@ protected Object convertGeometry(Column column, Field fieldDefn, Object data) { } @Override - protected ByteBuffer convertByteArray(Column column, byte[] data) { + protected ByteBuffer toByteBuffer(Column column, byte[] data) { // DBZ-254 right-pad fixed-length binary column values with 0x00 (zero byte) if (column.jdbcType() == Types.BINARY && data.length < column.length()) { data = Arrays.copyOf(data, column.length()); } - return super.convertByteArray(column, data); + return super.toByteBuffer(column, data); } /** diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlBinaryModeIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlBinaryModeIT.java index da969f978..eaa80582d 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlBinaryModeIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlBinaryModeIT.java @@ -124,7 +124,7 @@ public void shouldReceiveHexBinary() throws SQLException, InterruptedException { SourceRecord topicSourceRecord = topicSourceRecords.get(0); Struct kafkaDataStructure = (Struct) ((Struct) topicSourceRecord.value()).get("after"); - ByteBuffer expectedValue = ByteBuffer.wrap("010203".getBytes()); + String expectedValue = "010203"; assertEquals(expectedValue, kafkaDataStructure.get("blob_col")); assertEquals(expectedValue, kafkaDataStructure.get("tinyblob_col")); assertEquals(expectedValue, kafkaDataStructure.get("mediumblob_col")); @@ -164,7 +164,7 @@ public void shouldReceiveBase64Binary() throws SQLException, InterruptedExceptio SourceRecord topicSourceRecord = topicSourceRecords.get(0); Struct kafkaDataStructure = (Struct) ((Struct) topicSourceRecord.value()).get("after"); - ByteBuffer expectedValue = ByteBuffer.wrap("AQID".getBytes()); + String expectedValue = "AQID"; assertEquals(expectedValue, kafkaDataStructure.get("blob_col")); assertEquals(expectedValue, kafkaDataStructure.get("tinyblob_col")); assertEquals(expectedValue, kafkaDataStructure.get("mediumblob_col")); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java index 284c2da29..f4fca473c 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java @@ -14,7 +14,6 @@ import java.nio.charset.Charset; import java.sql.SQLException; import java.sql.Timestamp; -import java.time.Duration; import java.time.LocalDate; import java.time.OffsetDateTime; import java.time.OffsetTime; @@ -32,7 +31,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.kafka.connect.data.Decimal; @@ -119,9 +117,6 @@ public class PostgresValueConverter extends JdbcValueConverters { .appendPattern("[XXX][XX][X]") .toFormatter(); - private static final Duration ONE_DAY = Duration.ofDays(1); - private static final long NANO_SECONDS_PER_DAY = TimeUnit.DAYS.toNanos(1); - /** * {@code true} if fields of data type not know should be handle as opaque binary; * {@code false} if they should be omitted @@ -200,7 +195,7 @@ public SchemaBuilder schemaBuilder(Column column) { case PgOid.NUMERIC: return numericSchema(column); case PgOid.BYTEA: - return SchemaBuilder.bytes(); + return binaryMode.getSchema(); case PgOid.INT2_ARRAY: return SchemaBuilder.array(SchemaBuilder.OPTIONAL_INT16_SCHEMA); case PgOid.INT4_ARRAY: @@ -313,7 +308,7 @@ else if (resolvedType.isArrayType() && resolvedType.getElementType().isEnumType( final SchemaBuilder jdbcSchemaBuilder = super.schemaBuilder(column); if (jdbcSchemaBuilder == null) { - return includeUnknownDatatypes ? SchemaBuilder.bytes() : null; + return includeUnknownDatatypes ? binaryMode.getSchema() : null; } else { return jdbcSchemaBuilder; diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java index 1e91d5470..084344954 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java @@ -405,11 +405,11 @@ protected List schemaAndValueForByteaBytes() { } protected List schemaAndValueForByteaHex() { - return Arrays.asList(new SchemaAndValueField("ba", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("010203".getBytes()))); + return Arrays.asList(new SchemaAndValueField("ba", Schema.OPTIONAL_STRING_SCHEMA, "010203")); } protected List schemaAndValueForByteaBase64() { - return Arrays.asList(new SchemaAndValueField("ba", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("AQID".getBytes()))); + return Arrays.asList(new SchemaAndValueField("ba", Schema.OPTIONAL_STRING_SCHEMA, "AQID")); } protected List schemasAndValuesForStringTypesWithSourceColumnTypeInfo() { diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java index bd88c0207..2b426b328 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java @@ -40,7 +40,6 @@ import io.debezium.data.Bits; import io.debezium.data.Enum; import io.debezium.data.Envelope; -import io.debezium.data.SchemaAndValueField; import io.debezium.data.VerifyRecord; import io.debezium.doc.FixFor; import io.debezium.heartbeat.Heartbeat; @@ -929,6 +928,59 @@ public void shouldSnapshotTimeArrayTypesAsKnownTypes() throws Exception { consumer.process(record -> assertReadRecord(record, Collect.hashMapOf("public.time_array_table", schemaAndValuesForTimeArrayTypes()))); } + @Test + @FixFor("DBZ-1814") + public void shouldGenerateSnapshotForByteaAsBytes() throws Exception { + TestHelper.dropAllSchemas(); + TestHelper.executeDDL("postgres_create_tables.ddl"); + TestHelper.execute(INSERT_BYTEA_BINMODE_STMT); + + buildNoStreamProducer(TestHelper.defaultConfig()); + + TestConsumer consumer = testConsumer(1, "public"); + consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); + + final Map> expectedValueByTopicName = Collect.hashMapOf("public.bytea_binmode_table", schemaAndValueForByteaBytes()); + + consumer.process(record -> assertReadRecord(record, expectedValueByTopicName)); + } + + @Test + @FixFor("DBZ-1814") + public void shouldGenerateSnapshotForByteaAsBase64String() throws Exception { + TestHelper.dropAllSchemas(); + TestHelper.executeDDL("postgres_create_tables.ddl"); + TestHelper.execute(INSERT_BYTEA_BINMODE_STMT); + + buildNoStreamProducer(TestHelper.defaultConfig() + .with(PostgresConnectorConfig.BINARY_HANDLING_MODE, PostgresConnectorConfig.BinaryHandlingMode.BASE64)); + + TestConsumer consumer = testConsumer(1, "public"); + consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); + + final Map> expectedValueByTopicName = Collect.hashMapOf("public.bytea_binmode_table", schemaAndValueForByteaBase64()); + + consumer.process(record -> assertReadRecord(record, expectedValueByTopicName)); + } + + @Test + @FixFor("DBZ-1814") + public void shouldGenerateSnapshotForByteaAsHexString() throws Exception { + TestHelper.dropAllSchemas(); + TestHelper.executeDDL("postgres_create_tables.ddl"); + TestHelper.execute(INSERT_BYTEA_BINMODE_STMT); + + buildNoStreamProducer(TestHelper.defaultConfig() + .with(PostgresConnectorConfig.BINARY_HANDLING_MODE, PostgresConnectorConfig.BinaryHandlingMode.HEX)); + + TestConsumer consumer = testConsumer(1, "public"); + consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); + + final Map> expectedValueByTopicName = Collect.hashMapOf("public.bytea_binmode_table", schemaAndValueForByteaHex()); + + consumer.process(record -> assertReadRecord(record, expectedValueByTopicName)); + } + private void buildNoStreamProducer(Configuration.Builder config) { start(PostgresConnector.class, config .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY) diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index 3112fc09e..cf4ee0840 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -12,6 +12,7 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -165,22 +166,24 @@ public enum BinaryHandlingMode implements EnumeratedValue { /** * Represent binary values as byte array */ - BYTES("bytes"), + BYTES("bytes", () -> SchemaBuilder.bytes()), /** * Represent binary values as base64-encoded string */ - BASE64("base64"), + BASE64("base64", () -> SchemaBuilder.string()), /** * Represents binary values as hex-encoded (base16) string */ - HEX("hex"); + HEX("hex", () -> SchemaBuilder.string()); private final String value; + private final Supplier schema; - BinaryHandlingMode(String value) { + BinaryHandlingMode(String value, Supplier schema) { this.value = value; + this.schema = schema; } @Override @@ -188,6 +191,10 @@ public String getValue() { return value; } + public SchemaBuilder getSchema() { + return schema.get(); + } + /** * Determine if the supplied values is one of the predefined options * diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java index ff447e869..a170db781 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java @@ -12,6 +12,8 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.CharBuffer; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.sql.SQLXML; @@ -122,7 +124,7 @@ public JdbcValueConverters() { * adjustment is necessary * @param bigIntUnsignedMode how {@code BIGINT UNSIGNED} values should be treated; may be null if * {@link BigIntUnsignedMode#PRECISE} is to be used - * @param binaryMode TODO + * @param binaryMode how binary columns should be represented */ public JdbcValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset, TemporalAdjuster adjuster, BigIntUnsignedMode bigIntUnsignedMode, BinaryHandlingMode binaryMode) { @@ -163,12 +165,12 @@ public SchemaBuilder schemaBuilder(Column column) { // Fixed-length binary values ... case Types.BLOB: case Types.BINARY: - return SchemaBuilder.bytes(); + return binaryMode.getSchema(); // Variable-length binary values ... case Types.VARBINARY: case Types.LONGVARBINARY: - return SchemaBuilder.bytes(); + return binaryMode.getSchema(); // Numeric integers case Types.TINYINT: @@ -700,36 +702,6 @@ protected Object convertDateToEpochDaysAsDate(Column column, Field fieldDefn, Ob }); } - /** - * Converts a value object for an expected JDBC type of {@link Types#BLOB}, {@link Types#BINARY}, - * {@link Types#VARBINARY}, {@link Types#LONGVARBINARY}. - * - * @param column the column definition describing the {@code data} value; never null - * @param fieldDefn the field definition; never null - * @param data the data object to be converted into a {@link Date Kafka Connect date} type; never null - * @return the converted value, or null if the conversion could not be made and the column allows nulls - * @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls - */ - protected Object convertBinaryToBytes(Column column, Field fieldDefn, Object data) { - return convertValue(column, fieldDefn, data, BYTE_ZERO, (r) -> { - Object dataMut = data; - if (dataMut instanceof char[]) { - dataMut = new String((char[]) dataMut); // convert to string - } - if (dataMut instanceof String) { - // This was encoded as a hexadecimal string, but we receive it as a normal string ... - dataMut = ((String) dataMut).getBytes(); - } - if (dataMut instanceof byte[]) { - r.deliver(convertByteArray(column, (byte[]) dataMut)); - } - else { - // An unexpected value - r.deliver(unexpectedBinary(dataMut, fieldDefn)); - } - }); - } - protected Object convertBinary(Column column, Field fieldDefn, Object data, BinaryHandlingMode mode) { switch (mode) { case BASE64: @@ -752,24 +724,50 @@ protected Object convertBinary(Column column, Field fieldDefn, Object data, Bina * @return the converted value, or null if the conversion could not be made and the column allows nulls * @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls */ - protected Object convertBinaryToBase64(Column column, Field fieldDefn, Object data) { + protected Object convertBinaryToBytes(Column column, Field fieldDefn, Object data) { return convertValue(column, fieldDefn, data, BYTE_ZERO, (r) -> { - Object dataMut = data; - Encoder base64Encoder = Base64.getEncoder(); - if (dataMut instanceof char[]) { - dataMut = base64Encoder.encode(String.valueOf((char[]) dataMut).getBytes(StandardCharsets.UTF_8)); + if (data instanceof String) { + r.deliver(toByteBuffer(((String) data))); } - if (dataMut instanceof String) { - // This was encoded as a hexadecimal string, but we receive it as a normal string ... - String normalString = (String) dataMut; - dataMut = base64Encoder.encode(normalString.getBytes(StandardCharsets.UTF_8)); + else if (data instanceof char[]) { + r.deliver(toByteBuffer((char[]) data)); } - if (dataMut instanceof byte[]) { - r.deliver(convertByteArray(column, base64Encoder.encode((byte[]) dataMut))); + else if (data instanceof byte[]) { + r.deliver(toByteBuffer(column, (byte[]) data)); } else { // An unexpected value - r.deliver(unexpectedBinary(dataMut, fieldDefn)); + r.deliver(unexpectedBinary(data, fieldDefn)); + } + }); + } + + /** + * Converts a value object for an expected JDBC type of {@link Types#BLOB}, {@link Types#BINARY}, + * {@link Types#VARBINARY}, {@link Types#LONGVARBINARY}. + * + * @param column the column definition describing the {@code data} value; never null + * @param fieldDefn the field definition; never null + * @param data the data object to be converted into a {@link Date Kafka Connect date} type; never null + * @return the converted value, or null if the conversion could not be made and the column allows nulls + * @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls + */ + protected Object convertBinaryToBase64(Column column, Field fieldDefn, Object data) { + return convertValue(column, fieldDefn, data, "", (r) -> { + Encoder base64Encoder = Base64.getEncoder(); + + if (data instanceof String) { + r.deliver(new String(base64Encoder.encode(((String) data).getBytes(StandardCharsets.UTF_8)))); + } + else if (data instanceof char[]) { + r.deliver(new String(base64Encoder.encode(toByteArray((char[]) data)), StandardCharsets.UTF_8)); + } + else if (data instanceof byte[]) { + r.deliver(new String(base64Encoder.encode((byte[]) data), StandardCharsets.UTF_8)); + } + else { + // An unexpected value + r.deliver(unexpectedBinary(data, fieldDefn)); } }); } @@ -785,22 +783,20 @@ protected Object convertBinaryToBase64(Column column, Field fieldDefn, Object da * @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls */ protected Object convertBinaryToHex(Column column, Field fieldDefn, Object data) { - return convertValue(column, fieldDefn, data, BYTE_ZERO, (r) -> { - Object dataMut = data; - if (dataMut instanceof char[]) { - dataMut = HexConverter.convertToHexString(String.valueOf((char[]) dataMut).getBytes(StandardCharsets.UTF_8)); + return convertValue(column, fieldDefn, data, "", (r) -> { + + if (data instanceof String) { + r.deliver(HexConverter.convertToHexString(((String) data).getBytes(StandardCharsets.UTF_8))); } - if (dataMut instanceof String) { - // This was encoded as a hexadecimal string, but we receive it as a normal string ... - String normalString = (String) dataMut; - dataMut = HexConverter.convertToHexString(normalString.getBytes(StandardCharsets.UTF_8)); + else if (data instanceof char[]) { + r.deliver(HexConverter.convertToHexString(toByteArray((char[]) data))); } - if (dataMut instanceof byte[]) { - r.deliver(convertByteArray(column, HexConverter.convertToHexString((byte[]) dataMut).getBytes())); + else if (data instanceof byte[]) { + r.deliver(HexConverter.convertToHexString((byte[]) data)); } else { // An unexpected value - r.deliver(unexpectedBinary(dataMut, fieldDefn)); + r.deliver(unexpectedBinary(data, fieldDefn)); } }); } @@ -810,7 +806,7 @@ protected Object convertBinaryToHex(Column column, Field fieldDefn, Object data) * can perform value adjustments based on the column definition, e.g. right-pad with 0x00 bytes in case of * fixed length BINARY in MySQL. */ - protected ByteBuffer convertByteArray(Column column, byte[] data) { + protected ByteBuffer toByteBuffer(Column column, byte[] data) { // Kafka Connect would support raw byte arrays, too, but byte buffers are recommended return ByteBuffer.wrap(data); } @@ -1321,4 +1317,19 @@ protected Object convertValue(Column column, Field fieldDefn, Object data, Objec private boolean supportsLargeTimeValues() { return adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode; } + + private byte[] toByteArray(char[] chars) { + CharBuffer charBuffer = CharBuffer.wrap(chars); + ByteBuffer byteBuffer = Charset.forName("UTF-8").encode(charBuffer); + return byteBuffer.array(); + } + + private ByteBuffer toByteBuffer(char[] chars) { + CharBuffer charBuffer = CharBuffer.wrap(chars); + return Charset.forName("UTF-8").encode(charBuffer); + } + + private ByteBuffer toByteBuffer(String string) { + return ByteBuffer.wrap(string.getBytes(StandardCharsets.UTF_8)); + } } diff --git a/documentation/modules/ROOT/pages/connectors/postgresql.adoc b/documentation/modules/ROOT/pages/connectors/postgresql.adoc index ce1f0eec5..9b71bc58c 100644 --- a/documentation/modules/ROOT/pages/connectors/postgresql.adoc +++ b/documentation/modules/ROOT/pages/connectors/postgresql.adoc @@ -1113,15 +1113,15 @@ The _semantic type_ describes how the Kafka Connect schema captures the _meaning |The approximate number of microseconds for a time interval using the `365.25 / 12.0` formula for days per month average |`INTERVAL [P]` -|`String` +|`STRING` |`io.debezium.time.Interval` + (when `interval.handling.mode` is set to `string`) |The string representation of the interval value that follows pattern `PYMDTHMS`, e.g. `P1Y2M3DT4H5M6.78S` |`BYTEA` -|`BYTES` +|`BYTES` or `STRING` |n/a -| +|Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-postgresql-connector}#connector-property-binary-handling-mode[binary handling mode] setting |`JSON`, `JSONB` |`STRING` diff --git a/documentation/modules/ROOT/partials/modules/cdc-mysql-connector/c_how-the-mysql-connector-maps-data-types.adoc b/documentation/modules/ROOT/partials/modules/cdc-mysql-connector/c_how-the-mysql-connector-maps-data-types.adoc index ceb5d8e47..520164e5f 100644 --- a/documentation/modules/ROOT/partials/modules/cdc-mysql-connector/c_how-the-mysql-connector-maps-data-types.adoc +++ b/documentation/modules/ROOT/partials/modules/cdc-mysql-connector/c_how-the-mysql-connector-maps-data-types.adoc @@ -75,41 +75,53 @@ a| _n/a_ a| _n/a_ |`BINARY(M)]` -|`BYTES` +|`BYTES` or `STRING` a| _n/a_ +Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-mysql-connector}#connector-property-binary-handling-mode[binary handling mode] setting + |`VARBINARY(M)]` -|`BYTES` +|`BYTES` or `STRING` a| _n/a_ +Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-mysql-connector}#connector-property-binary-handling-mode[binary handling mode] setting + |`TINYBLOB` -|`BYTES` +|`BYTES` or `STRING` a| _n/a_ +Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-mysql-connector}#connector-property-binary-handling-mode[binary handling mode] setting + |`TINYTEXT` |`STRING` a| _n/a_ |`BLOB` -|`BYTES` +|`BYTES` or `STRING` a| _n/a_ +Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-mysql-connector}#connector-property-binary-handling-mode[binary handling mode] setting + |`TEXT` |`STRING` a| _n/a_ |`MEDIUMBLOB` -|`BYTES` +|`BYTES` or `STRING` a| _n/a_ +Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-mysql-connector}#connector-property-binary-handling-mode[binary handling mode] setting + |`MEDIUMTEXT` |`STRING` a| _n/a_ |`LONGBLOB` -|`BYTES` +|`BYTES` or `STRING` a| _n/a_ +Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-mysql-connector}#connector-property-binary-handling-mode[binary handling mode] setting + |`LONGTEXT` |`STRING` a| _n/a_