DBZ-5544 Support BASE64_URL_SAFE in BinaryHandlingMode

This commit is contained in:
pkgonan 2022-08-23 14:46:01 +09:00 committed by Jiri Pechanec
parent 2e6b877bd8
commit b031770300
11 changed files with 217 additions and 0 deletions

View File

@ -175,4 +175,44 @@ public void shouldReceiveBase64Binary() throws SQLException, InterruptedExceptio
// Check that all records are valid, can be serialized and deserialized ...
sourceRecords.forEach(this::validate);
}
@Test
@FixFor("DBZ-5544")
public void shouldReceiveBase64UrlSafeBinary() throws SQLException, InterruptedException {
// Use the DB configuration to define the connector's configuration ...
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
.with(MySqlConnectorConfig.BINARY_HANDLING_MODE, BinaryHandlingMode.BASE64_URL_SAFE)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
// Testing.Debug.enable();
int createDatabaseCount = 1;
int createTableCount = 1;
int insertCount = 1;
SourceRecords sourceRecords = consumeRecordsByTopic(createDatabaseCount + createTableCount + insertCount);
stopConnector();
assertThat(sourceRecords).isNotNull();
List<SourceRecord> topicSourceRecords = sourceRecords.recordsForTopic(DATABASE.topicForTable("dbz_1814_binary_mode_test"));
assertThat(topicSourceRecords).hasSize(1);
SourceRecord topicSourceRecord = topicSourceRecords.get(0);
Struct kafkaDataStructure = (Struct) ((Struct) topicSourceRecord.value()).get("after");
String expectedValue = "AQID";
assertEquals(expectedValue, kafkaDataStructure.get("blob_col"));
assertEquals(expectedValue, kafkaDataStructure.get("tinyblob_col"));
assertEquals(expectedValue, kafkaDataStructure.get("mediumblob_col"));
assertEquals(expectedValue, kafkaDataStructure.get("longblob_col"));
assertEquals(expectedValue, kafkaDataStructure.get("binary_col"));
assertEquals(expectedValue, kafkaDataStructure.get("varbinary_col"));
// Check that all records are valid, can be serialized and deserialized ...
sourceRecords.forEach(this::validate);
}
}

View File

@ -198,6 +198,54 @@ public void base64Mode() throws SQLException, InterruptedException {
records.forEach(this::validate);
}
@Test
@FixFor("DBZ-5544")
public void base64UrlSafeMode() throws SQLException, InterruptedException {
// Use the DB configuration to define the connector's configuration ...
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
.with(MySqlConnectorConfig.BINARY_HANDLING_MODE, BinaryHandlingMode.BASE64_URL_SAFE)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
// Testing.Debug.enable();
int numCreateDatabase = 1;
int numCreateTables = 1;
int numInserts = 4;
SourceRecords records = consumeRecordsByTopic(numCreateDatabase + numCreateTables + numInserts);
stopConnector();
assertThat(records).isNotNull();
List<SourceRecord> dmls = records.recordsForTopic(DATABASE.topicForTable("dbz_254_binary_column_test"));
assertThat(dmls).hasSize(4);
// source value has a trailing "00" which is not distinguishable from a value that needs padding
SourceRecord insert = dmls.get(0);
Struct after = (Struct) ((Struct) insert.value()).get("after");
assertThat(after.get("file_uuid")).isEqualTo("ZRrtCDkPSJOy8TaSPnt0AA==");
insert = dmls.get(1);
after = (Struct) ((Struct) insert.value()).get("after");
assertThat(after.get("file_uuid")).isEqualTo("ZRrtCDkPSJOy8TaSPnt0qw==");
// the value which isn't using the full length of the BINARY column is right-padded with 0x00 (zero bytes) - converted to AA in Base64
insert = dmls.get(2);
after = (Struct) ((Struct) insert.value()).get("after");
assertThat(after.get("file_uuid")).isEqualTo("ZRrtCDkPSJOy8TaSPnt0AA==");
// the value which isn't using the full length of the BINARY column is right-padded with 0x00 (zero bytes)
insert = dmls.get(3);
after = (Struct) ((Struct) insert.value()).get("after");
assertThat(after.get("file_uuid")).isEqualTo("AAAAAAAAAAAAAAAAAAAAAA==");
// Check that all records are valid, can be serialized and deserialized ...
records.forEach(this::validate);
}
private String encodeToBase64String(ByteBuffer bytes) {
return Base64.getEncoder().encodeToString(bytes.array());
}

View File

@ -79,6 +79,14 @@ public void shouldReceiveBase64Binary() throws InterruptedException {
assertEquals(expectedValue, data.get("BLOB_COL"));
}
@Test
public void shouldReceiveBase64UrlSafeBinary() throws InterruptedException {
Struct data = consume(BinaryHandlingMode.BASE64_URL_SAFE);
String expectedValue = "AQID";
assertEquals(expectedValue, data.get("BLOB_COL"));
}
private Struct consume(BinaryHandlingMode binaryMode) throws InterruptedException {
final Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)

View File

@ -50,6 +50,7 @@ public class LogicalDecodingMessageMonitor {
private final String topicName;
private final BinaryHandlingMode binaryMode;
private final Encoder base64Encoder;
private final Encoder base64UrlSafeEncoder;
/**
* The key schema; a struct like this:
@ -70,6 +71,7 @@ public LogicalDecodingMessageMonitor(PostgresConnectorConfig connectorConfig, Bl
this.topicName = connectorConfig.getLogicalName() + LOGICAL_DECODING_MESSAGE_TOPIC_SUFFIX;
this.binaryMode = connectorConfig.binaryHandlingMode();
this.base64Encoder = Base64.getEncoder();
this.base64UrlSafeEncoder = Base64.getUrlEncoder();
this.keySchema = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust("io.debezium.connector.postgresql.MessageKey"))
@ -123,6 +125,8 @@ private Object convertContent(byte[] content) {
switch (binaryMode) {
case BASE64:
return new String(base64Encoder.encode(content), StandardCharsets.UTF_8);
case BASE64_URL_SAFE:
return new String(base64UrlSafeEncoder.encode(content), StandardCharsets.UTF_8);
case HEX:
return HexConverter.convertToHexString(content);
case BYTES:

View File

@ -1100,6 +1100,11 @@ protected Object convertBinaryToBase64(Column column, Field fieldDefn, Object da
return super.convertBinaryToBase64(column, fieldDefn, (data instanceof PGobject) ? ((PGobject) data).getValue() : data);
}
@Override
protected Object convertBinaryToBase64UrlSafe(Column column, Field fieldDefn, Object data) {
return super.convertBinaryToBase64UrlSafe(column, fieldDefn, (data instanceof PGobject) ? ((PGobject) data).getValue() : data);
}
@Override
protected Object convertBinaryToHex(Column column, Field fieldDefn, Object data) {
return super.convertBinaryToHex(column, fieldDefn, (data instanceof PGobject) ? ((PGobject) data).getValue() : data);

View File

@ -420,6 +420,10 @@ protected List<SchemaAndValueField> schemaAndValueForByteaBase64() {
return Arrays.asList(new SchemaAndValueField("ba", Schema.OPTIONAL_STRING_SCHEMA, "AQID"));
}
protected List<SchemaAndValueField> schemaAndValueForByteaBase64UrlSafe() {
return Arrays.asList(new SchemaAndValueField("ba", Schema.OPTIONAL_STRING_SCHEMA, "AQID"));
}
protected List<SchemaAndValueField> schemaAndValueForUnknownColumnBytes() {
return Arrays.asList(new SchemaAndValueField("ccircle", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("<(10.0,20.0),10.0>".getBytes(StandardCharsets.UTF_8))));
}
@ -428,6 +432,10 @@ protected List<SchemaAndValueField> schemaAndValueForUnknownColumnBase64() {
return Arrays.asList(new SchemaAndValueField("ccircle", Schema.OPTIONAL_STRING_SCHEMA, "PCgxMC4wLDIwLjApLDEwLjA+"));
}
protected List<SchemaAndValueField> schemaAndValueForUnknownColumnBase64UrlSafe() {
return Arrays.asList(new SchemaAndValueField("ccircle", Schema.OPTIONAL_STRING_SCHEMA, "PCgxMC4wLDIwLjApLDEwLjA-"));
}
protected List<SchemaAndValueField> schemaAndValueForUnknownColumnHex() {
return Arrays.asList(new SchemaAndValueField("ccircle", Schema.OPTIONAL_STRING_SCHEMA, "3c2831302e302c32302e30292c31302e303e"));
}

View File

@ -973,6 +973,24 @@ public void shouldGenerateSnapshotForByteaAsBase64String() throws Exception {
consumer.process(record -> assertReadRecord(record, expectedValueByTopicName));
}
@Test
@FixFor("DBZ-5544")
public void shouldGenerateSnapshotForByteaAsBase64UrlSafeString() throws Exception {
TestHelper.dropAllSchemas();
TestHelper.executeDDL("postgres_create_tables.ddl");
TestHelper.execute(INSERT_BYTEA_BINMODE_STMT);
buildNoStreamProducer(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.BINARY_HANDLING_MODE, PostgresConnectorConfig.BinaryHandlingMode.BASE64_URL_SAFE));
TestConsumer consumer = testConsumer(1, "public");
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
final Map<String, List<SchemaAndValueField>> expectedValueByTopicName = Collect.hashMapOf("public.bytea_binmode_table", schemaAndValueForByteaBase64UrlSafe());
consumer.process(record -> assertReadRecord(record, expectedValueByTopicName));
}
@Test
@FixFor("DBZ-1814")
public void shouldGenerateSnapshotForByteaAsHexString() throws Exception {
@ -1028,6 +1046,25 @@ public void shouldGenerateSnapshotForUnknownColumnAsBase64() throws Exception {
consumer.process(record -> assertReadRecord(record, expectedValueByTopicName));
}
@Test
@FixFor("DBZ-5544")
public void shouldGenerateSnapshotForUnknownColumnAsBase64UrlSafe() throws Exception {
TestHelper.dropAllSchemas();
TestHelper.executeDDL("postgres_create_tables.ddl");
TestHelper.execute(INSERT_CIRCLE_STMT);
buildNoStreamProducer(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
.with(PostgresConnectorConfig.BINARY_HANDLING_MODE, BinaryHandlingMode.BASE64_URL_SAFE));
TestConsumer consumer = testConsumer(1, "public");
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
final Map<String, List<SchemaAndValueField>> expectedValueByTopicName = Collect.hashMapOf("public.circle_table", schemaAndValueForUnknownColumnBase64UrlSafe());
consumer.process(record -> assertReadRecord(record, expectedValueByTopicName));
}
@Test
@FixFor("DBZ-1814")
public void shouldGenerateSnapshotForUnknownColumnAsHex() throws Exception {

View File

@ -1242,6 +1242,16 @@ public void shouldReceiveByteaBase64String() throws Exception {
assertInsert(INSERT_BYTEA_BINMODE_STMT, 1, schemaAndValueForByteaBase64());
}
@Test
@FixFor("DBZ-5544")
public void shouldReceiveByteaBase64UrlSafeString() throws Exception {
TestHelper.executeDDL("postgres_create_tables.ddl");
startConnector(config -> config.with(PostgresConnectorConfig.BINARY_HANDLING_MODE, PostgresConnectorConfig.BinaryHandlingMode.BASE64_URL_SAFE));
assertInsert(INSERT_BYTEA_BINMODE_STMT, 1, schemaAndValueForByteaBase64UrlSafe());
}
@Test
@FixFor("DBZ-1814")
public void shouldReceiveByteaHexString() throws Exception {
@ -1273,6 +1283,17 @@ public void shouldReceiveUnknownTypeAsBase64() throws Exception {
assertInsert(INSERT_CIRCLE_STMT, 1, schemaAndValueForUnknownColumnBase64());
}
@Test
@FixFor("DBZ-5544")
public void shouldReceiveUnknownTypeAsBase64UrlSafe() throws Exception {
TestHelper.executeDDL("postgres_create_tables.ddl");
startConnector(config -> config.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
.with(PostgresConnectorConfig.BINARY_HANDLING_MODE, BinaryHandlingMode.BASE64_URL_SAFE));
assertInsert(INSERT_CIRCLE_STMT, 1, schemaAndValueForUnknownColumnBase64UrlSafe());
}
@Test
@FixFor("DBZ-1814")
public void shouldReceiveUnknownTypeAsHex() throws Exception {

View File

@ -78,6 +78,15 @@ public void shouldReceiveBase64Binary() throws InterruptedException {
assertEquals(expectedValue, data.get("varbinary_col"));
}
@Test
public void shouldReceiveBase64UrlSafeBinary() throws InterruptedException {
Struct data = consume(BinaryHandlingMode.BASE64_URL_SAFE);
String expectedValue = "AQID";
assertEquals(expectedValue, data.get("binary_col"));
assertEquals(expectedValue, data.get("varbinary_col"));
}
private Struct consume(BinaryHandlingMode binaryMode) throws InterruptedException {
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)

View File

@ -186,6 +186,11 @@ public enum BinaryHandlingMode implements EnumeratedValue {
*/
BASE64("base64", SchemaBuilder::string),
/**
* Represent binary values as base64-url-safe-encoded string
*/
BASE64_URL_SAFE("base64-url-safe", SchemaBuilder::string),
/**
* Represents binary values as hex-encoded (base16) string
*/

View File

@ -706,6 +706,8 @@ protected Object convertBinary(Column column, Field fieldDefn, Object data, Bina
switch (mode) {
case BASE64:
return convertBinaryToBase64(column, fieldDefn, data);
case BASE64_URL_SAFE:
return convertBinaryToBase64UrlSafe(column, fieldDefn, data);
case HEX:
return convertBinaryToHex(column, fieldDefn, data);
case BYTES:
@ -772,6 +774,36 @@ else if (data instanceof byte[]) {
});
}
/**
* Converts a value object for an expected JDBC type of {@link Types#BLOB}, {@link Types#BINARY},
* {@link Types#VARBINARY}, {@link Types#LONGVARBINARY}.
*
* @param column the column definition describing the {@code data} value; never null
* @param fieldDefn the field definition; never null
* @param data the data object to be converted into a {@link Date Kafka Connect date} type; never null
* @return the converted value, or null if the conversion could not be made and the column allows nulls
* @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls
*/
protected Object convertBinaryToBase64UrlSafe(Column column, Field fieldDefn, Object data) {
return convertValue(column, fieldDefn, data, "", (r) -> {
Encoder base64UrlSafeEncoder = Base64.getUrlEncoder();
if (data instanceof String) {
r.deliver(new String(base64UrlSafeEncoder.encode(((String) data).getBytes(StandardCharsets.UTF_8))));
}
else if (data instanceof char[]) {
r.deliver(new String(base64UrlSafeEncoder.encode(toByteArray((char[]) data)), StandardCharsets.UTF_8));
}
else if (data instanceof byte[]) {
r.deliver(new String(base64UrlSafeEncoder.encode(normalizeBinaryData(column, (byte[]) data)), StandardCharsets.UTF_8));
}
else {
// An unexpected value
r.deliver(unexpectedBinary(data, fieldDefn));
}
});
}
/**
* Converts a value object for an expected JDBC type of {@link Types#BLOB}, {@link Types#BINARY},
* {@link Types#VARBINARY}, {@link Types#LONGVARBINARY}.