DBZ-1814 Exporting Hex and Base64 representations as string

Gunnar Morling 2020-06-03 14:41:44 +02:00 committed by Jiri Pechanec
parent 023064eedb
commit c54e377dd2
9 changed files with 164 additions and 87 deletions

View File

@@ -120,7 +120,7 @@ else if (70 <= year && year <= 99) {
* @param temporalPrecisionMode temporal precision mode based on {@link io.debezium.jdbc.TemporalPrecisionMode}
* @param bigIntUnsignedMode how {@code BIGINT UNSIGNED} values should be treated; may be null if
* {@link io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode#PRECISE} is to be used
* @param binaryMode TODO
* @param binaryMode how binary columns should be represented
*/
public MySqlValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, BigIntUnsignedMode bigIntUnsignedMode,
BinaryHandlingMode binaryMode) {
@@ -140,7 +140,7 @@ public MySqlValueConverters(DecimalMode decimalMode, TemporalPrecisionMode tempo
* @param bigIntUnsignedMode how {@code BIGINT UNSIGNED} values should be treated; may be null if
* {@link io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode#PRECISE} is to be used
* @param adjuster a temporal adjuster to make a database specific time modification before conversion
* @param binaryMode TODO
* @param binaryMode how binary columns should be represented
*/
public MySqlValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset, BigIntUnsignedMode bigIntUnsignedMode,
TemporalAdjuster adjuster, BinaryHandlingMode binaryMode) {
@@ -158,7 +158,7 @@ public MySqlValueConverters(DecimalMode decimalMode, TemporalPrecisionMode tempo
* @param bigIntUnsignedMode how {@code BIGINT UNSIGNED} values should be treated; may be null if
* {@link io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode#PRECISE} is to be used
* @param adjuster a temporal adjuster to make a database specific time modification before conversion
* @param binaryMode TODO
* @param binaryMode how binary columns should be represented
*/
public MySqlValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, BigIntUnsignedMode bigIntUnsignedMode, TemporalAdjuster adjuster,
BinaryHandlingMode binaryMode) {
@@ -613,13 +613,13 @@ protected Object convertGeometry(Column column, Field fieldDefn, Object data) {
}
@Override
protected ByteBuffer convertByteArray(Column column, byte[] data) {
protected ByteBuffer toByteBuffer(Column column, byte[] data) {
// DBZ-254 right-pad fixed-length binary column values with 0x00 (zero byte)
if (column.jdbcType() == Types.BINARY && data.length < column.length()) {
data = Arrays.copyOf(data, column.length());
}
return super.convertByteArray(column, data);
return super.toByteBuffer(column, data);
}
/**

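The right padding relies on `Arrays.copyOf`, which zero-fills any slots it appends, producing exactly the 0x00 padding that DBZ-254 calls for. A standalone sketch of the effect, assuming a hypothetical BINARY(5) column:

import java.util.Arrays;

public class PadDemo {
    public static void main(String[] args) {
        byte[] data = { 1, 2, 3 };
        int columnLength = 5; // stands in for column.length() of a BINARY(5) column
        if (data.length < columnLength) {
            // Arrays.copyOf zero-fills the appended slots with 0x00
            data = Arrays.copyOf(data, columnLength);
        }
        System.out.println(Arrays.toString(data)); // prints [1, 2, 3, 0, 0]
    }
}
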
View File

@@ -124,7 +124,7 @@ public void shouldReceiveHexBinary() throws SQLException, InterruptedException {
SourceRecord topicSourceRecord = topicSourceRecords.get(0);
Struct kafkaDataStructure = (Struct) ((Struct) topicSourceRecord.value()).get("after");
ByteBuffer expectedValue = ByteBuffer.wrap("010203".getBytes());
String expectedValue = "010203";
assertEquals(expectedValue, kafkaDataStructure.get("blob_col"));
assertEquals(expectedValue, kafkaDataStructure.get("tinyblob_col"));
assertEquals(expectedValue, kafkaDataStructure.get("mediumblob_col"));
@@ -164,7 +164,7 @@ public void shouldReceiveBase64Binary() throws SQLException, InterruptedExceptio
SourceRecord topicSourceRecord = topicSourceRecords.get(0);
Struct kafkaDataStructure = (Struct) ((Struct) topicSourceRecord.value()).get("after");
ByteBuffer expectedValue = ByteBuffer.wrap("AQID".getBytes());
String expectedValue = "AQID";
assertEquals(expectedValue, kafkaDataStructure.get("blob_col"));
assertEquals(expectedValue, kafkaDataStructure.get("tinyblob_col"));
assertEquals(expectedValue, kafkaDataStructure.get("mediumblob_col"));

View File

@@ -14,7 +14,6 @@
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
@@ -32,7 +31,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Decimal;
@@ -119,9 +117,6 @@ public class PostgresValueConverter extends JdbcValueConverters {
.appendPattern("[XXX][XX][X]")
.toFormatter();
private static final Duration ONE_DAY = Duration.ofDays(1);
private static final long NANO_SECONDS_PER_DAY = TimeUnit.DAYS.toNanos(1);
/**
* {@code true} if fields of an unknown data type should be handled as opaque binary;
* {@code false} if they should be omitted
@@ -200,7 +195,7 @@ public SchemaBuilder schemaBuilder(Column column) {
case PgOid.NUMERIC:
return numericSchema(column);
case PgOid.BYTEA:
return SchemaBuilder.bytes();
return binaryMode.getSchema();
case PgOid.INT2_ARRAY:
return SchemaBuilder.array(SchemaBuilder.OPTIONAL_INT16_SCHEMA);
case PgOid.INT4_ARRAY:
@@ -313,7 +308,7 @@ else if (resolvedType.isArrayType() && resolvedType.getElementType().isEnumType(
final SchemaBuilder jdbcSchemaBuilder = super.schemaBuilder(column);
if (jdbcSchemaBuilder == null) {
return includeUnknownDatatypes ? SchemaBuilder.bytes() : null;
return includeUnknownDatatypes ? binaryMode.getSchema() : null;
}
else {
return jdbcSchemaBuilder;

View File

@@ -405,11 +405,11 @@ protected List<SchemaAndValueField> schemaAndValueForByteaBytes() {
}
protected List<SchemaAndValueField> schemaAndValueForByteaHex() {
return Arrays.asList(new SchemaAndValueField("ba", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("010203".getBytes())));
return Arrays.asList(new SchemaAndValueField("ba", Schema.OPTIONAL_STRING_SCHEMA, "010203"));
}
protected List<SchemaAndValueField> schemaAndValueForByteaBase64() {
return Arrays.asList(new SchemaAndValueField("ba", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("AQID".getBytes())));
return Arrays.asList(new SchemaAndValueField("ba", Schema.OPTIONAL_STRING_SCHEMA, "AQID"));
}
protected List<SchemaAndValueField> schemasAndValuesForStringTypesWithSourceColumnTypeInfo() {

View File

@@ -40,7 +40,6 @@
import io.debezium.data.Bits;
import io.debezium.data.Enum;
import io.debezium.data.Envelope;
import io.debezium.data.SchemaAndValueField;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.heartbeat.Heartbeat;
@@ -929,6 +928,59 @@ public void shouldSnapshotTimeArrayTypesAsKnownTypes() throws Exception {
consumer.process(record -> assertReadRecord(record, Collect.hashMapOf("public.time_array_table", schemaAndValuesForTimeArrayTypes())));
}
@Test
@FixFor("DBZ-1814")
public void shouldGenerateSnapshotForByteaAsBytes() throws Exception {
TestHelper.dropAllSchemas();
TestHelper.executeDDL("postgres_create_tables.ddl");
TestHelper.execute(INSERT_BYTEA_BINMODE_STMT);
buildNoStreamProducer(TestHelper.defaultConfig());
TestConsumer consumer = testConsumer(1, "public");
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
final Map<String, List<SchemaAndValueField>> expectedValueByTopicName = Collect.hashMapOf("public.bytea_binmode_table", schemaAndValueForByteaBytes());
consumer.process(record -> assertReadRecord(record, expectedValueByTopicName));
}
@Test
@FixFor("DBZ-1814")
public void shouldGenerateSnapshotForByteaAsBase64String() throws Exception {
TestHelper.dropAllSchemas();
TestHelper.executeDDL("postgres_create_tables.ddl");
TestHelper.execute(INSERT_BYTEA_BINMODE_STMT);
buildNoStreamProducer(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.BINARY_HANDLING_MODE, PostgresConnectorConfig.BinaryHandlingMode.BASE64));
TestConsumer consumer = testConsumer(1, "public");
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
final Map<String, List<SchemaAndValueField>> expectedValueByTopicName = Collect.hashMapOf("public.bytea_binmode_table", schemaAndValueForByteaBase64());
consumer.process(record -> assertReadRecord(record, expectedValueByTopicName));
}
@Test
@FixFor("DBZ-1814")
public void shouldGenerateSnapshotForByteaAsHexString() throws Exception {
TestHelper.dropAllSchemas();
TestHelper.executeDDL("postgres_create_tables.ddl");
TestHelper.execute(INSERT_BYTEA_BINMODE_STMT);
buildNoStreamProducer(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.BINARY_HANDLING_MODE, PostgresConnectorConfig.BinaryHandlingMode.HEX));
TestConsumer consumer = testConsumer(1, "public");
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
final Map<String, List<SchemaAndValueField>> expectedValueByTopicName = Collect.hashMapOf("public.bytea_binmode_table", schemaAndValueForByteaHex());
consumer.process(record -> assertReadRecord(record, expectedValueByTopicName));
}
private void buildNoStreamProducer(Configuration.Builder config) {
start(PostgresConnector.class, config
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY)

View File

@@ -12,6 +12,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -165,22 +166,24 @@ public enum BinaryHandlingMode implements EnumeratedValue {
/**
* Represents binary values as a byte array
*/
BYTES("bytes"),
BYTES("bytes", () -> SchemaBuilder.bytes()),
/**
* Represents binary values as a base64-encoded string
*/
BASE64("base64"),
BASE64("base64", () -> SchemaBuilder.string()),
/**
* Represents binary values as a hex-encoded (base16) string
*/
HEX("hex");
HEX("hex", () -> SchemaBuilder.string());
private final String value;
private final Supplier<SchemaBuilder> schema;
BinaryHandlingMode(String value) {
BinaryHandlingMode(String value, Supplier<SchemaBuilder> schema) {
this.value = value;
this.schema = schema;
}
@Override
@@ -188,6 +191,10 @@ public String getValue() {
return value;
}
public SchemaBuilder getSchema() {
return schema.get();
}
/**
* Determine if the supplied value is one of the predefined options
*

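A `Supplier<SchemaBuilder>` is stored instead of a single `SchemaBuilder` because builders are mutable and every field needs a fresh instance. A hedged usage sketch (the `schemaFor` helper is illustrative, not part of this commit):

// Illustrative helper: derive a column's Connect schema from the configured mode.
// BYTES yields a bytes schema; BASE64 and HEX yield string schemas.
static Schema schemaFor(BinaryHandlingMode mode, boolean optional) {
    SchemaBuilder builder = mode.getSchema();
    return optional ? builder.optional().build() : builder.build();
}
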
View File

@@ -12,6 +12,8 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.sql.SQLXML;
@@ -122,7 +124,7 @@ public JdbcValueConverters() {
* adjustment is necessary
* @param bigIntUnsignedMode how {@code BIGINT UNSIGNED} values should be treated; may be null if
* {@link BigIntUnsignedMode#PRECISE} is to be used
* @param binaryMode TODO
* @param binaryMode how binary columns should be represented
*/
public JdbcValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset,
TemporalAdjuster adjuster, BigIntUnsignedMode bigIntUnsignedMode, BinaryHandlingMode binaryMode) {
@@ -163,12 +165,12 @@ public SchemaBuilder schemaBuilder(Column column) {
// Fixed-length binary values ...
case Types.BLOB:
case Types.BINARY:
return SchemaBuilder.bytes();
return binaryMode.getSchema();
// Variable-length binary values ...
case Types.VARBINARY:
case Types.LONGVARBINARY:
return SchemaBuilder.bytes();
return binaryMode.getSchema();
// Numeric integers
case Types.TINYINT:
@@ -700,36 +702,6 @@ protected Object convertDateToEpochDaysAsDate(Column column, Field fieldDefn, Ob
});
}
/**
* Converts a value object for an expected JDBC type of {@link Types#BLOB}, {@link Types#BINARY},
* {@link Types#VARBINARY}, {@link Types#LONGVARBINARY}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into a binary representation; never null
* @return the converted value, or null if the conversion could not be made and the column allows nulls
* @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls
*/
protected Object convertBinaryToBytes(Column column, Field fieldDefn, Object data) {
return convertValue(column, fieldDefn, data, BYTE_ZERO, (r) -> {
Object dataMut = data;
if (dataMut instanceof char[]) {
dataMut = new String((char[]) dataMut); // convert to string
}
if (dataMut instanceof String) {
// This was encoded as a hexadecimal string, but we receive it as a normal string ...
dataMut = ((String) dataMut).getBytes();
}
if (dataMut instanceof byte[]) {
r.deliver(convertByteArray(column, (byte[]) dataMut));
}
else {
// An unexpected value
r.deliver(unexpectedBinary(dataMut, fieldDefn));
}
});
}
protected Object convertBinary(Column column, Field fieldDefn, Object data, BinaryHandlingMode mode) {
switch (mode) {
case BASE64:
@@ -752,24 +724,50 @@ protected Object convertBinary(Column column, Field fieldDefn, Object data, Bina
* @return the converted value, or null if the conversion could not be made and the column allows nulls
* @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls
*/
protected Object convertBinaryToBase64(Column column, Field fieldDefn, Object data) {
protected Object convertBinaryToBytes(Column column, Field fieldDefn, Object data) {
return convertValue(column, fieldDefn, data, BYTE_ZERO, (r) -> {
Object dataMut = data;
Encoder base64Encoder = Base64.getEncoder();
if (dataMut instanceof char[]) {
dataMut = base64Encoder.encode(String.valueOf((char[]) dataMut).getBytes(StandardCharsets.UTF_8));
if (data instanceof String) {
r.deliver(toByteBuffer(((String) data)));
}
if (dataMut instanceof String) {
// This was encoded as a hexadecimal string, but we receive it as a normal string ...
String normalString = (String) dataMut;
dataMut = base64Encoder.encode(normalString.getBytes(StandardCharsets.UTF_8));
else if (data instanceof char[]) {
r.deliver(toByteBuffer((char[]) data));
}
if (dataMut instanceof byte[]) {
r.deliver(convertByteArray(column, base64Encoder.encode((byte[]) dataMut)));
else if (data instanceof byte[]) {
r.deliver(toByteBuffer(column, (byte[]) data));
}
else {
// An unexpected value
r.deliver(unexpectedBinary(dataMut, fieldDefn));
r.deliver(unexpectedBinary(data, fieldDefn));
}
});
}
/**
* Converts a value object for an expected JDBC type of {@link Types#BLOB}, {@link Types#BINARY},
* {@link Types#VARBINARY}, {@link Types#LONGVARBINARY}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into a binary representation; never null
* @return the converted value, or null if the conversion could not be made and the column allows nulls
* @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls
*/
protected Object convertBinaryToBase64(Column column, Field fieldDefn, Object data) {
return convertValue(column, fieldDefn, data, "", (r) -> {
Encoder base64Encoder = Base64.getEncoder();
if (data instanceof String) {
r.deliver(new String(base64Encoder.encode(((String) data).getBytes(StandardCharsets.UTF_8))));
}
else if (data instanceof char[]) {
r.deliver(new String(base64Encoder.encode(toByteArray((char[]) data)), StandardCharsets.UTF_8));
}
else if (data instanceof byte[]) {
r.deliver(new String(base64Encoder.encode((byte[]) data), StandardCharsets.UTF_8));
}
else {
// An unexpected value
r.deliver(unexpectedBinary(data, fieldDefn));
}
});
}
@@ -785,22 +783,20 @@ protected Object convertBinaryToBase64(Column column, Field fieldDefn, Object da
* @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls
*/
protected Object convertBinaryToHex(Column column, Field fieldDefn, Object data) {
return convertValue(column, fieldDefn, data, BYTE_ZERO, (r) -> {
Object dataMut = data;
if (dataMut instanceof char[]) {
dataMut = HexConverter.convertToHexString(String.valueOf((char[]) dataMut).getBytes(StandardCharsets.UTF_8));
return convertValue(column, fieldDefn, data, "", (r) -> {
if (data instanceof String) {
r.deliver(HexConverter.convertToHexString(((String) data).getBytes(StandardCharsets.UTF_8)));
}
if (dataMut instanceof String) {
// This was encoded as a hexadecimal string, but we receive it as a normal string ...
String normalString = (String) dataMut;
dataMut = HexConverter.convertToHexString(normalString.getBytes(StandardCharsets.UTF_8));
else if (data instanceof char[]) {
r.deliver(HexConverter.convertToHexString(toByteArray((char[]) data)));
}
if (dataMut instanceof byte[]) {
r.deliver(convertByteArray(column, HexConverter.convertToHexString((byte[]) dataMut).getBytes()));
else if (data instanceof byte[]) {
r.deliver(HexConverter.convertToHexString((byte[]) data));
}
else {
// An unexpected value
r.deliver(unexpectedBinary(dataMut, fieldDefn));
r.deliver(unexpectedBinary(data, fieldDefn));
}
});
}
@@ -810,7 +806,7 @@ protected Object convertBinaryToHex(Column column, Field fieldDefn, Object data)
* can perform value adjustments based on the column definition, e.g. right-pad with 0x00 bytes in case of
* fixed length BINARY in MySQL.
*/
protected ByteBuffer convertByteArray(Column column, byte[] data) {
protected ByteBuffer toByteBuffer(Column column, byte[] data) {
// Kafka Connect would support raw byte arrays, too, but byte buffers are recommended
return ByteBuffer.wrap(data);
}
@@ -1321,4 +1317,19 @@ protected Object convertValue(Column column, Field fieldDefn, Object data, Objec
private boolean supportsLargeTimeValues() {
return adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode;
}
private byte[] toByteArray(char[] chars) {
ByteBuffer byteBuffer = toByteBuffer(chars);
// Charset.encode() may return a buffer whose backing array is larger than the
// encoded content, so copy out only the remaining bytes instead of using array()
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return bytes;
}
private ByteBuffer toByteBuffer(char[] chars) {
CharBuffer charBuffer = CharBuffer.wrap(chars);
return Charset.forName("UTF-8").encode(charBuffer);
}
private ByteBuffer toByteBuffer(String string) {
return ByteBuffer.wrap(string.getBytes(StandardCharsets.UTF_8));
}
}

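The copy in toByteArray matters because `Charset.encode` may hand back a buffer whose backing array is longer than the encoded content, so reading `array()` directly can pick up trailing zero bytes. A standalone illustration:

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;

public class EncodeArrayDemo {
    public static void main(String[] args) {
        // '\u00e9' ("é") encodes to two UTF-8 bytes
        ByteBuffer buffer = StandardCharsets.UTF_8.encode(CharBuffer.wrap(new char[]{ '\u00e9' }));
        // remaining() is the true encoded length; the backing array may be larger
        System.out.println(buffer.remaining() + " encoded bytes, backing array length " + buffer.array().length);
    }
}
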
View File

@@ -1113,15 +1113,15 @@ The _semantic type_ describes how the Kafka Connect schema captures the _meaning
|The approximate number of microseconds for a time interval, using the `365.25 / 12.0` formula for the average number of days per month
|`INTERVAL [P]`
|`String`
|`STRING`
|`io.debezium.time.Interval` +
(when `interval.handling.mode` is set to `string`)
|The string representation of the interval value that follows the pattern `P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S`, e.g. `P1Y2M3DT4H5M6.78S`
|`BYTEA`
|`BYTES`
|`BYTES` or `STRING`
|n/a
|
|Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-postgresql-connector}#connector-property-binary-handling-mode[binary handling mode] setting
|`JSON`, `JSONB`
|`STRING`

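Selecting one of the string representations is a configuration concern; a sketch mirroring the test setup from this commit (`TestHelper.defaultConfig()` is the connector test harness used in the snapshot tests above):

Configuration.Builder config = TestHelper.defaultConfig()
        .with(PostgresConnectorConfig.BINARY_HANDLING_MODE,
                PostgresConnectorConfig.BinaryHandlingMode.BASE64); // or HEX; BYTES is the default
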
View File

@@ -75,41 +75,53 @@ a| _n/a_
a| _n/a_
|`BINARY(M)`
|`BYTES`
|`BYTES` or `STRING`
a| _n/a_
Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-mysql-connector}#connector-property-binary-handling-mode[binary handling mode] setting
|`VARBINARY(M)`
|`BYTES`
|`BYTES` or `STRING`
a| _n/a_
Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-mysql-connector}#connector-property-binary-handling-mode[binary handling mode] setting
|`TINYBLOB`
|`BYTES`
|`BYTES` or `STRING`
a| _n/a_
Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-mysql-connector}#connector-property-binary-handling-mode[binary handling mode] setting
|`TINYTEXT`
|`STRING`
a| _n/a_
|`BLOB`
|`BYTES`
|`BYTES` or `STRING`
a| _n/a_
Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-mysql-connector}#connector-property-binary-handling-mode[binary handling mode] setting
|`TEXT`
|`STRING`
a| _n/a_
|`MEDIUMBLOB`
|`BYTES`
|`BYTES` or `STRING`
a| _n/a_
Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-mysql-connector}#connector-property-binary-handling-mode[binary handling mode] setting
|`MEDIUMTEXT`
|`STRING`
a| _n/a_
|`LONGBLOB`
|`BYTES`
|`BYTES` or `STRING`
a| _n/a_
Either the raw bytes (the default), a base64-encoded String, or a hex-encoded String, based on the {link-prefix}:{link-mysql-connector}#connector-property-binary-handling-mode[binary handling mode] setting
|`LONGTEXT`
|`STRING`
a| _n/a_
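Concretely, for a binary column holding the bytes `0x010203`, the integration tests in this commit expect the following representations per mode, reproduced here as a standalone check:

import java.nio.ByteBuffer;
import java.util.Base64;

public class BinaryModeValuesDemo {
    public static void main(String[] args) {
        byte[] value = { 1, 2, 3 };
        // bytes (default): a Connect BYTES value, delivered as a ByteBuffer
        ByteBuffer bytesMode = ByteBuffer.wrap(value);
        // base64: a Connect STRING value, "AQID"
        String base64Mode = Base64.getEncoder().encodeToString(value);
        // hex: a Connect STRING value, "010203"
        StringBuilder hexMode = new StringBuilder();
        for (byte b : value) {
            hexMode.append(String.format("%02x", b));
        }
        System.out.println(base64Mode + " / " + hexMode + " / " + bytesMode.remaining() + " raw bytes");
    }
}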