From b031770300ec260ffc6bb89a6c45b33e1162e6a2 Mon Sep 17 00:00:00 2001 From: pkgonan Date: Tue, 23 Aug 2022 14:46:01 +0900 Subject: [PATCH] DBZ-5544 Support BASE64_URL_SAFE in BinaryHandlingMode --- .../connector/mysql/MySqlBinaryModeIT.java | 40 ++++++++++++++++ .../mysql/MySqlFixedLengthBinaryColumnIT.java | 48 +++++++++++++++++++ .../connector/oracle/OracleBinaryModeIT.java | 8 ++++ .../LogicalDecodingMessageMonitor.java | 4 ++ .../postgresql/PostgresValueConverter.java | 5 ++ .../AbstractRecordsProducerTest.java | 8 ++++ .../postgresql/RecordsSnapshotProducerIT.java | 37 ++++++++++++++ .../postgresql/RecordsStreamProducerIT.java | 21 ++++++++ .../sqlserver/SqlServerBinaryModeIT.java | 9 ++++ .../config/CommonConnectorConfig.java | 5 ++ .../io/debezium/jdbc/JdbcValueConverters.java | 32 +++++++++++++ 11 files changed, 217 insertions(+) diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlBinaryModeIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlBinaryModeIT.java index c2a3706f7..cb813fe79 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlBinaryModeIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlBinaryModeIT.java @@ -175,4 +175,44 @@ public void shouldReceiveBase64Binary() throws SQLException, InterruptedExceptio // Check that all records are valid, can be serialized and deserialized ... sourceRecords.forEach(this::validate); } + + @Test + @FixFor("DBZ-5544") + public void shouldReceiveBase64UrlSafeBinary() throws SQLException, InterruptedException { + // Use the DB configuration to define the connector's configuration ... + config = DATABASE.defaultConfig() + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER) + .with(MySqlConnectorConfig.BINARY_HANDLING_MODE, BinaryHandlingMode.BASE64_URL_SAFE) + .build(); + + // Start the connector ... + start(MySqlConnector.class, config); + + // --------------------------------------------------------------------------------------------------------------- + // Consume all of the events due to startup and initialization of the database + // --------------------------------------------------------------------------------------------------------------- + // Testing.Debug.enable(); + int createDatabaseCount = 1; + int createTableCount = 1; + int insertCount = 1; + SourceRecords sourceRecords = consumeRecordsByTopic(createDatabaseCount + createTableCount + insertCount); + stopConnector(); + assertThat(sourceRecords).isNotNull(); + + List topicSourceRecords = sourceRecords.recordsForTopic(DATABASE.topicForTable("dbz_1814_binary_mode_test")); + assertThat(topicSourceRecords).hasSize(1); + + SourceRecord topicSourceRecord = topicSourceRecords.get(0); + Struct kafkaDataStructure = (Struct) ((Struct) topicSourceRecord.value()).get("after"); + String expectedValue = "AQID"; + assertEquals(expectedValue, kafkaDataStructure.get("blob_col")); + assertEquals(expectedValue, kafkaDataStructure.get("tinyblob_col")); + assertEquals(expectedValue, kafkaDataStructure.get("mediumblob_col")); + assertEquals(expectedValue, kafkaDataStructure.get("longblob_col")); + assertEquals(expectedValue, kafkaDataStructure.get("binary_col")); + assertEquals(expectedValue, kafkaDataStructure.get("varbinary_col")); + + // Check that all records are valid, can be serialized and deserialized ... + sourceRecords.forEach(this::validate); + } } diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlFixedLengthBinaryColumnIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlFixedLengthBinaryColumnIT.java index 7662e37a0..15dc345be 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlFixedLengthBinaryColumnIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlFixedLengthBinaryColumnIT.java @@ -198,6 +198,54 @@ public void base64Mode() throws SQLException, InterruptedException { records.forEach(this::validate); } + @Test + @FixFor("DBZ-5544") + public void base64UrlSafeMode() throws SQLException, InterruptedException { + // Use the DB configuration to define the connector's configuration ... + config = DATABASE.defaultConfig() + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER) + .with(MySqlConnectorConfig.BINARY_HANDLING_MODE, BinaryHandlingMode.BASE64_URL_SAFE) + .build(); + + // Start the connector ... + start(MySqlConnector.class, config); + + // --------------------------------------------------------------------------------------------------------------- + // Consume all of the events due to startup and initialization of the database + // --------------------------------------------------------------------------------------------------------------- + // Testing.Debug.enable(); + int numCreateDatabase = 1; + int numCreateTables = 1; + int numInserts = 4; + SourceRecords records = consumeRecordsByTopic(numCreateDatabase + numCreateTables + numInserts); + stopConnector(); + assertThat(records).isNotNull(); + List dmls = records.recordsForTopic(DATABASE.topicForTable("dbz_254_binary_column_test")); + assertThat(dmls).hasSize(4); + + // source value has a trailing "00" which is not distinguishable from a value that needs padding + SourceRecord insert = dmls.get(0); + Struct after = (Struct) ((Struct) insert.value()).get("after"); + assertThat(after.get("file_uuid")).isEqualTo("ZRrtCDkPSJOy8TaSPnt0AA=="); + + insert = dmls.get(1); + after = (Struct) ((Struct) insert.value()).get("after"); + assertThat(after.get("file_uuid")).isEqualTo("ZRrtCDkPSJOy8TaSPnt0qw=="); + + // the value which isn't using the full length of the BINARY column is right-padded with 0x00 (zero bytes) - converted to AA in Base64 + insert = dmls.get(2); + after = (Struct) ((Struct) insert.value()).get("after"); + assertThat(after.get("file_uuid")).isEqualTo("ZRrtCDkPSJOy8TaSPnt0AA=="); + + // the value which isn't using the full length of the BINARY column is right-padded with 0x00 (zero bytes) + insert = dmls.get(3); + after = (Struct) ((Struct) insert.value()).get("after"); + assertThat(after.get("file_uuid")).isEqualTo("AAAAAAAAAAAAAAAAAAAAAA=="); + + // Check that all records are valid, can be serialized and deserialized ... + records.forEach(this::validate); + } + private String encodeToBase64String(ByteBuffer bytes) { return Base64.getEncoder().encodeToString(bytes.array()); } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleBinaryModeIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleBinaryModeIT.java index f5737f6c3..464792911 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleBinaryModeIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleBinaryModeIT.java @@ -79,6 +79,14 @@ public void shouldReceiveBase64Binary() throws InterruptedException { assertEquals(expectedValue, data.get("BLOB_COL")); } + @Test + public void shouldReceiveBase64UrlSafeBinary() throws InterruptedException { + Struct data = consume(BinaryHandlingMode.BASE64_URL_SAFE); + + String expectedValue = "AQID"; + assertEquals(expectedValue, data.get("BLOB_COL")); + } + private Struct consume(BinaryHandlingMode binaryMode) throws InterruptedException { final Configuration config = TestHelper.defaultConfig() .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/LogicalDecodingMessageMonitor.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/LogicalDecodingMessageMonitor.java index 3e16428ec..27f5e9f8f 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/LogicalDecodingMessageMonitor.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/LogicalDecodingMessageMonitor.java @@ -50,6 +50,7 @@ public class LogicalDecodingMessageMonitor { private final String topicName; private final BinaryHandlingMode binaryMode; private final Encoder base64Encoder; + private final Encoder base64UrlSafeEncoder; /** * The key schema; a struct like this: @@ -70,6 +71,7 @@ public LogicalDecodingMessageMonitor(PostgresConnectorConfig connectorConfig, Bl this.topicName = connectorConfig.getLogicalName() + LOGICAL_DECODING_MESSAGE_TOPIC_SUFFIX; this.binaryMode = connectorConfig.binaryHandlingMode(); this.base64Encoder = Base64.getEncoder(); + this.base64UrlSafeEncoder = Base64.getUrlEncoder(); this.keySchema = SchemaBuilder.struct() .name(schemaNameAdjuster.adjust("io.debezium.connector.postgresql.MessageKey")) @@ -123,6 +125,8 @@ private Object convertContent(byte[] content) { switch (binaryMode) { case BASE64: return new String(base64Encoder.encode(content), StandardCharsets.UTF_8); + case BASE64_URL_SAFE: + return new String(base64UrlSafeEncoder.encode(content), StandardCharsets.UTF_8); case HEX: return HexConverter.convertToHexString(content); case BYTES: diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java index 4c66c2a66..599c31173 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java @@ -1100,6 +1100,11 @@ protected Object convertBinaryToBase64(Column column, Field fieldDefn, Object da return super.convertBinaryToBase64(column, fieldDefn, (data instanceof PGobject) ? ((PGobject) data).getValue() : data); } + @Override + protected Object convertBinaryToBase64UrlSafe(Column column, Field fieldDefn, Object data) { + return super.convertBinaryToBase64UrlSafe(column, fieldDefn, (data instanceof PGobject) ? ((PGobject) data).getValue() : data); + } + @Override protected Object convertBinaryToHex(Column column, Field fieldDefn, Object data) { return super.convertBinaryToHex(column, fieldDefn, (data instanceof PGobject) ? ((PGobject) data).getValue() : data); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java index e673c25d8..ce8c01881 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java @@ -420,6 +420,10 @@ protected List schemaAndValueForByteaBase64() { return Arrays.asList(new SchemaAndValueField("ba", Schema.OPTIONAL_STRING_SCHEMA, "AQID")); } + protected List schemaAndValueForByteaBase64UrlSafe() { + return Arrays.asList(new SchemaAndValueField("ba", Schema.OPTIONAL_STRING_SCHEMA, "AQID")); + } + protected List schemaAndValueForUnknownColumnBytes() { return Arrays.asList(new SchemaAndValueField("ccircle", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("<(10.0,20.0),10.0>".getBytes(StandardCharsets.UTF_8)))); } @@ -428,6 +432,10 @@ protected List schemaAndValueForUnknownColumnBase64() { return Arrays.asList(new SchemaAndValueField("ccircle", Schema.OPTIONAL_STRING_SCHEMA, "PCgxMC4wLDIwLjApLDEwLjA+")); } + protected List schemaAndValueForUnknownColumnBase64UrlSafe() { + return Arrays.asList(new SchemaAndValueField("ccircle", Schema.OPTIONAL_STRING_SCHEMA, "PCgxMC4wLDIwLjApLDEwLjA-")); + } + protected List schemaAndValueForUnknownColumnHex() { return Arrays.asList(new SchemaAndValueField("ccircle", Schema.OPTIONAL_STRING_SCHEMA, "3c2831302e302c32302e30292c31302e303e")); } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java index abf47ba61..e09abfaf0 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java @@ -973,6 +973,24 @@ public void shouldGenerateSnapshotForByteaAsBase64String() throws Exception { consumer.process(record -> assertReadRecord(record, expectedValueByTopicName)); } + @Test + @FixFor("DBZ-5544") + public void shouldGenerateSnapshotForByteaAsBase64UrlSafeString() throws Exception { + TestHelper.dropAllSchemas(); + TestHelper.executeDDL("postgres_create_tables.ddl"); + TestHelper.execute(INSERT_BYTEA_BINMODE_STMT); + + buildNoStreamProducer(TestHelper.defaultConfig() + .with(PostgresConnectorConfig.BINARY_HANDLING_MODE, PostgresConnectorConfig.BinaryHandlingMode.BASE64_URL_SAFE)); + + TestConsumer consumer = testConsumer(1, "public"); + consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); + + final Map> expectedValueByTopicName = Collect.hashMapOf("public.bytea_binmode_table", schemaAndValueForByteaBase64UrlSafe()); + + consumer.process(record -> assertReadRecord(record, expectedValueByTopicName)); + } + @Test @FixFor("DBZ-1814") public void shouldGenerateSnapshotForByteaAsHexString() throws Exception { @@ -1028,6 +1046,25 @@ public void shouldGenerateSnapshotForUnknownColumnAsBase64() throws Exception { consumer.process(record -> assertReadRecord(record, expectedValueByTopicName)); } + @Test + @FixFor("DBZ-5544") + public void shouldGenerateSnapshotForUnknownColumnAsBase64UrlSafe() throws Exception { + TestHelper.dropAllSchemas(); + TestHelper.executeDDL("postgres_create_tables.ddl"); + TestHelper.execute(INSERT_CIRCLE_STMT); + + buildNoStreamProducer(TestHelper.defaultConfig() + .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) + .with(PostgresConnectorConfig.BINARY_HANDLING_MODE, BinaryHandlingMode.BASE64_URL_SAFE)); + + TestConsumer consumer = testConsumer(1, "public"); + consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS); + + final Map> expectedValueByTopicName = Collect.hashMapOf("public.circle_table", schemaAndValueForUnknownColumnBase64UrlSafe()); + + consumer.process(record -> assertReadRecord(record, expectedValueByTopicName)); + } + @Test @FixFor("DBZ-1814") public void shouldGenerateSnapshotForUnknownColumnAsHex() throws Exception { diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java index 583f6753e..d5fee01f6 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsStreamProducerIT.java @@ -1242,6 +1242,16 @@ public void shouldReceiveByteaBase64String() throws Exception { assertInsert(INSERT_BYTEA_BINMODE_STMT, 1, schemaAndValueForByteaBase64()); } + @Test + @FixFor("DBZ-5544") + public void shouldReceiveByteaBase64UrlSafeString() throws Exception { + TestHelper.executeDDL("postgres_create_tables.ddl"); + + startConnector(config -> config.with(PostgresConnectorConfig.BINARY_HANDLING_MODE, PostgresConnectorConfig.BinaryHandlingMode.BASE64_URL_SAFE)); + + assertInsert(INSERT_BYTEA_BINMODE_STMT, 1, schemaAndValueForByteaBase64UrlSafe()); + } + @Test @FixFor("DBZ-1814") public void shouldReceiveByteaHexString() throws Exception { @@ -1273,6 +1283,17 @@ public void shouldReceiveUnknownTypeAsBase64() throws Exception { assertInsert(INSERT_CIRCLE_STMT, 1, schemaAndValueForUnknownColumnBase64()); } + @Test + @FixFor("DBZ-5544") + public void shouldReceiveUnknownTypeAsBase64UrlSafe() throws Exception { + TestHelper.executeDDL("postgres_create_tables.ddl"); + + startConnector(config -> config.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true) + .with(PostgresConnectorConfig.BINARY_HANDLING_MODE, BinaryHandlingMode.BASE64_URL_SAFE)); + + assertInsert(INSERT_CIRCLE_STMT, 1, schemaAndValueForUnknownColumnBase64UrlSafe()); + } + @Test @FixFor("DBZ-1814") public void shouldReceiveUnknownTypeAsHex() throws Exception { diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerBinaryModeIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerBinaryModeIT.java index 7c9b7df7e..6c19154cb 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerBinaryModeIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SqlServerBinaryModeIT.java @@ -78,6 +78,15 @@ public void shouldReceiveBase64Binary() throws InterruptedException { assertEquals(expectedValue, data.get("varbinary_col")); } + @Test + public void shouldReceiveBase64UrlSafeBinary() throws InterruptedException { + Struct data = consume(BinaryHandlingMode.BASE64_URL_SAFE); + + String expectedValue = "AQID"; + assertEquals(expectedValue, data.get("binary_col")); + assertEquals(expectedValue, data.get("varbinary_col")); + } + private Struct consume(BinaryHandlingMode binaryMode) throws InterruptedException { final Configuration config = TestHelper.defaultConfig() .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL) diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index 6c262f02b..9d93b2c56 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -186,6 +186,11 @@ public enum BinaryHandlingMode implements EnumeratedValue { */ BASE64("base64", SchemaBuilder::string), + /** + * Represent binary values as base64-url-safe-encoded string + */ + BASE64_URL_SAFE("base64-url-safe", SchemaBuilder::string), + /** * Represents binary values as hex-encoded (base16) string */ diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java index 8e9d3b3d9..6a4027c55 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java @@ -706,6 +706,8 @@ protected Object convertBinary(Column column, Field fieldDefn, Object data, Bina switch (mode) { case BASE64: return convertBinaryToBase64(column, fieldDefn, data); + case BASE64_URL_SAFE: + return convertBinaryToBase64UrlSafe(column, fieldDefn, data); case HEX: return convertBinaryToHex(column, fieldDefn, data); case BYTES: @@ -772,6 +774,36 @@ else if (data instanceof byte[]) { }); } + /** + * Converts a value object for an expected JDBC type of {@link Types#BLOB}, {@link Types#BINARY}, + * {@link Types#VARBINARY}, {@link Types#LONGVARBINARY}. + * + * @param column the column definition describing the {@code data} value; never null + * @param fieldDefn the field definition; never null + * @param data the data object to be converted into a {@link Date Kafka Connect date} type; never null + * @return the converted value, or null if the conversion could not be made and the column allows nulls + * @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls + */ + protected Object convertBinaryToBase64UrlSafe(Column column, Field fieldDefn, Object data) { + return convertValue(column, fieldDefn, data, "", (r) -> { + Encoder base64UrlSafeEncoder = Base64.getUrlEncoder(); + + if (data instanceof String) { + r.deliver(new String(base64UrlSafeEncoder.encode(((String) data).getBytes(StandardCharsets.UTF_8)))); + } + else if (data instanceof char[]) { + r.deliver(new String(base64UrlSafeEncoder.encode(toByteArray((char[]) data)), StandardCharsets.UTF_8)); + } + else if (data instanceof byte[]) { + r.deliver(new String(base64UrlSafeEncoder.encode(normalizeBinaryData(column, (byte[]) data)), StandardCharsets.UTF_8)); + } + else { + // An unexpected value + r.deliver(unexpectedBinary(data, fieldDefn)); + } + }); + } + /** * Converts a value object for an expected JDBC type of {@link Types#BLOB}, {@link Types#BINARY}, * {@link Types#VARBINARY}, {@link Types#LONGVARBINARY}.