DBZ-8076 Support capturing BLOB during snapshots

Chris Cranford authored 2024-07-18 12:21:49 -04:00, committed by Chris Cranford
parent 60478be66f
commit 7486707787
2 changed files with 55 additions and 113 deletions


@@ -13,6 +13,8 @@
 import java.nio.charset.Charset;
 import java.nio.charset.IllegalCharsetNameException;
 import java.nio.charset.StandardCharsets;
+import java.sql.Blob;
+import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.time.Duration;
@@ -334,6 +336,21 @@ protected Object convertFloat(Column column, Field fieldDefn, Object data) {
         return super.convertFloat(column, fieldDefn, data);
     }
 
+    @Override
+    protected Object convertBinary(Column column, Field fieldDefn, Object data, BinaryHandlingMode mode) {
+        // During snapshots, the JDBC ResultSet returns Blob instances
+        if (data instanceof Blob) {
+            try {
+                final Blob blob = (Blob) data;
+                data = blob.getBytes(1, (int) blob.length());
+            }
+            catch (SQLException e) {
+                throw new DebeziumException("Failed to parse and read BLOB data for column " + column.name(), e);
+            }
+        }
+        return super.convertBinary(column, fieldDefn, data, mode);
+    }
+
     @Override
     protected byte[] normalizeBinaryData(Column column, byte[] data) {
         // DBZ-254 right-pad fixed-length binary column values with 0x00 (zero byte)
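The hunk above is the core of the fix: streaming reads already hand the converter a byte[], but snapshot reads go through a JDBC ResultSet, which may return java.sql.Blob instances. As a minimal standalone sketch (not part of the commit), the same pattern can be exercised in isolation; it uses the JDK's javax.sql.rowset.serial.SerialBlob as a stand-in for a driver-supplied Blob, and mirrors the 1-based getBytes offset and the int cast, which caps a readable BLOB at Integer.MAX_VALUE bytes:

import java.sql.Blob;
import java.sql.SQLException;
import java.util.Arrays;
import javax.sql.rowset.serial.SerialBlob;

public class BlobMaterializeSketch {
    // Same pattern as the convertBinary override above: materialize the
    // whole BLOB into a byte[] so the usual binary conversion can proceed.
    static byte[] materialize(Blob blob) throws SQLException {
        return blob.getBytes(1, (int) blob.length()); // JDBC offsets are 1-based
    }

    public static void main(String[] args) throws SQLException {
        Blob blob = new SerialBlob(new byte[]{ 1, 2, 3 }); // stand-in for a snapshot ResultSet value
        System.out.println(Arrays.toString(materialize(blob))); // prints [1, 2, 3]
    }
}

The remaining hunks update the binlog binary-mode integration test: the per-mode streaming tests are collapsed into a shared consume(...) helper, and a DBZ-8076 snapshot variant is added for each binary handling mode.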


@@ -10,7 +10,6 @@
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
-import java.sql.SQLException;
 import java.util.List;
 
 import org.apache.kafka.connect.data.Struct;
@@ -22,6 +21,7 @@
 import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
 import io.debezium.config.Configuration;
+import io.debezium.connector.binlog.BinlogConnectorConfig.SnapshotMode;
 import io.debezium.connector.binlog.util.TestHelper;
 import io.debezium.connector.binlog.util.UniqueDatabase;
 import io.debezium.doc.FixFor;
@@ -58,132 +58,59 @@ public void afterEach() {
     @Test
     @FixFor("DBZ-1814")
-    public void shouldReceiveRawBinary() throws SQLException, InterruptedException {
-        // Use the DB configuration to define the connector's configuration ...
-        config = DATABASE.defaultConfig()
-                .with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NEVER)
-                .with(BinlogConnectorConfig.BINARY_HANDLING_MODE, BinaryHandlingMode.BYTES)
-                .build();
-
-        // Start the connector ...
-        start(getConnectorClass(), config);
-
-        // ---------------------------------------------------------------------------------------------------------------
-        // Consume all of the events due to startup and initialization of the database
-        // ---------------------------------------------------------------------------------------------------------------
-        // Testing.Debug.enable();
-        int createDatabaseCount = 1;
-        int createTableCount = 1;
-        int insertCount = 1;
-        SourceRecords sourceRecords = consumeRecordsByTopic(createDatabaseCount + createTableCount + insertCount);
-        stopConnector();
-
-        assertThat(sourceRecords).isNotNull();
-        List<SourceRecord> topicSourceRecords = sourceRecords.recordsForTopic(DATABASE.topicForTable("dbz_1814_binary_mode_test"));
-        assertThat(topicSourceRecords).hasSize(1);
-        SourceRecord topicSourceRecord = topicSourceRecords.get(0);
-        Struct kafkaDataStructure = (Struct) ((Struct) topicSourceRecord.value()).get("after");
-        ByteBuffer expectedValue = ByteBuffer.wrap(new byte[]{ 1, 2, 3 });
-        assertEquals(expectedValue, kafkaDataStructure.get("blob_col"));
-        assertEquals(expectedValue, kafkaDataStructure.get("tinyblob_col"));
-        assertEquals(expectedValue, kafkaDataStructure.get("mediumblob_col"));
-        assertEquals(expectedValue, kafkaDataStructure.get("longblob_col"));
-        assertEquals(expectedValue, kafkaDataStructure.get("binary_col"));
-        assertEquals(expectedValue, kafkaDataStructure.get("varbinary_col"));
-
-        // Check that all records are valid, can be serialized and deserialized ...
-        sourceRecords.forEach(this::validate);
+    public void shouldReceiveRawBinaryStreaming() throws InterruptedException {
+        consume(SnapshotMode.NEVER, BinaryHandlingMode.BYTES, 1, ByteBuffer.wrap(new byte[]{ 1, 2, 3 }));
+    }
+
+    @Test
+    @FixFor("DBZ-8076")
+    public void shouldReceiveRawBinarySnapshot() throws InterruptedException {
+        // SET CHARSET, DROP TABLE, DROP DATABASE, CREATE DATABASE, USE DATABASE
+        consume(SnapshotMode.INITIAL, BinaryHandlingMode.BYTES, 5, ByteBuffer.wrap(new byte[]{ 1, 2, 3 }));
     }
 
     @Test
     @FixFor("DBZ-1814")
-    public void shouldReceiveHexBinary() throws SQLException, InterruptedException {
-        // Use the DB configuration to define the connector's configuration ...
-        config = DATABASE.defaultConfig()
-                .with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NEVER)
-                .with(BinlogConnectorConfig.BINARY_HANDLING_MODE, BinaryHandlingMode.HEX)
-                .build();
-
-        // Start the connector ...
-        start(getConnectorClass(), config);
-
-        // ---------------------------------------------------------------------------------------------------------------
-        // Consume all of the events due to startup and initialization of the database
-        // ---------------------------------------------------------------------------------------------------------------
-        // Testing.Debug.enable();
-        int createDatabaseCount = 1;
-        int createTableCount = 1;
-        int insertCount = 1;
-        SourceRecords sourceRecords = consumeRecordsByTopic(createDatabaseCount + createTableCount + insertCount);
-        stopConnector();
-
-        assertThat(sourceRecords).isNotNull();
-        List<SourceRecord> topicSourceRecords = sourceRecords.recordsForTopic(DATABASE.topicForTable("dbz_1814_binary_mode_test"));
-        assertThat(topicSourceRecords).hasSize(1);
-        SourceRecord topicSourceRecord = topicSourceRecords.get(0);
-        Struct kafkaDataStructure = (Struct) ((Struct) topicSourceRecord.value()).get("after");
-        String expectedValue = "010203";
-        assertEquals(expectedValue, kafkaDataStructure.get("blob_col"));
-        assertEquals(expectedValue, kafkaDataStructure.get("tinyblob_col"));
-        assertEquals(expectedValue, kafkaDataStructure.get("mediumblob_col"));
-        assertEquals(expectedValue, kafkaDataStructure.get("longblob_col"));
-        assertEquals(expectedValue, kafkaDataStructure.get("binary_col"));
-        assertEquals(expectedValue, kafkaDataStructure.get("varbinary_col"));
-
-        // Check that all records are valid, can be serialized and deserialized ...
-        sourceRecords.forEach(this::validate);
+    public void shouldReceiveHexBinaryStreaming() throws InterruptedException {
+        consume(SnapshotMode.NEVER, BinaryHandlingMode.HEX, 1, "010203");
+    }
+
+    @Test
+    @FixFor("DBZ-8076")
+    public void shouldReceiveHexBinarySnapshot() throws InterruptedException {
+        // SET CHARSET, DROP TABLE, DROP DATABASE, CREATE DATABASE, USE DATABASE
+        consume(SnapshotMode.INITIAL, BinaryHandlingMode.HEX, 5, "010203");
     }
 
     @Test
     @FixFor("DBZ-1814")
-    public void shouldReceiveBase64Binary() throws SQLException, InterruptedException {
-        // Use the DB configuration to define the connector's configuration ...
-        config = DATABASE.defaultConfig()
-                .with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NEVER)
-                .with(BinlogConnectorConfig.BINARY_HANDLING_MODE, BinaryHandlingMode.BASE64)
-                .build();
-
-        // Start the connector ...
-        start(getConnectorClass(), config);
-
-        // ---------------------------------------------------------------------------------------------------------------
-        // Consume all of the events due to startup and initialization of the database
-        // ---------------------------------------------------------------------------------------------------------------
-        // Testing.Debug.enable();
-        int createDatabaseCount = 1;
-        int createTableCount = 1;
-        int insertCount = 1;
-        SourceRecords sourceRecords = consumeRecordsByTopic(createDatabaseCount + createTableCount + insertCount);
-        stopConnector();
-
-        assertThat(sourceRecords).isNotNull();
-        List<SourceRecord> topicSourceRecords = sourceRecords.recordsForTopic(DATABASE.topicForTable("dbz_1814_binary_mode_test"));
-        assertThat(topicSourceRecords).hasSize(1);
-        SourceRecord topicSourceRecord = topicSourceRecords.get(0);
-        Struct kafkaDataStructure = (Struct) ((Struct) topicSourceRecord.value()).get("after");
-        String expectedValue = "AQID";
-        assertEquals(expectedValue, kafkaDataStructure.get("blob_col"));
-        assertEquals(expectedValue, kafkaDataStructure.get("tinyblob_col"));
-        assertEquals(expectedValue, kafkaDataStructure.get("mediumblob_col"));
-        assertEquals(expectedValue, kafkaDataStructure.get("longblob_col"));
-        assertEquals(expectedValue, kafkaDataStructure.get("binary_col"));
-        assertEquals(expectedValue, kafkaDataStructure.get("varbinary_col"));
-
-        // Check that all records are valid, can be serialized and deserialized ...
-        sourceRecords.forEach(this::validate);
+    public void shouldReceiveBase64BinaryStream() throws InterruptedException {
+        consume(SnapshotMode.NEVER, BinaryHandlingMode.BASE64, 1, "AQID");
+    }
+
+    @Test
+    @FixFor("DBZ-8076")
+    public void shouldReceiveBase64BinarySnapshot() throws InterruptedException {
+        consume(SnapshotMode.INITIAL, BinaryHandlingMode.BASE64, 5, "AQID");
     }
 
     @Test
     @FixFor("DBZ-5544")
-    public void shouldReceiveBase64UrlSafeBinary() throws SQLException, InterruptedException {
+    public void shouldReceiveBase64UrlSafeBinaryStream() throws InterruptedException {
+        consume(SnapshotMode.NEVER, BinaryHandlingMode.BASE64_URL_SAFE, 1, "AQID");
+    }
+
+    @Test
+    @FixFor("DBZ-8076")
+    public void shouldReceiveBase64UrlSafeBinarySnapshot() throws InterruptedException {
+        consume(SnapshotMode.INITIAL, BinaryHandlingMode.BASE64_URL_SAFE, 5, "AQID");
+    }
+
+    private void consume(SnapshotMode snapshotMode, BinaryHandlingMode binaryHandlingMode, int metadataEventCount, Object expectedValue) throws InterruptedException {
         // Use the DB configuration to define the connector's configuration ...
         config = DATABASE.defaultConfig()
-                .with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NEVER)
-                .with(BinlogConnectorConfig.BINARY_HANDLING_MODE, BinaryHandlingMode.BASE64_URL_SAFE)
+                .with(BinlogConnectorConfig.SNAPSHOT_MODE, snapshotMode)
+                .with(BinlogConnectorConfig.BINARY_HANDLING_MODE, binaryHandlingMode)
                 .build();
 
         // Start the connector ...
@@ -193,10 +120,9 @@ public void shouldReceiveBase64UrlSafeBinary() throws SQLException, InterruptedE
         // Consume all of the events due to startup and initialization of the database
         // ---------------------------------------------------------------------------------------------------------------
         // Testing.Debug.enable();
-        int createDatabaseCount = 1;
         int createTableCount = 1;
         int insertCount = 1;
-        SourceRecords sourceRecords = consumeRecordsByTopic(createDatabaseCount + createTableCount + insertCount);
+        SourceRecords sourceRecords = consumeRecordsByTopic(metadataEventCount + createTableCount + insertCount);
         stopConnector();
 
         assertThat(sourceRecords).isNotNull();
@@ -205,7 +131,6 @@ public void shouldReceiveBase64UrlSafeBinary() throws SQLException, InterruptedE
         SourceRecord topicSourceRecord = topicSourceRecords.get(0);
         Struct kafkaDataStructure = (Struct) ((Struct) topicSourceRecord.value()).get("after");
-        String expectedValue = "AQID";
         assertEquals(expectedValue, kafkaDataStructure.get("blob_col"));
         assertEquals(expectedValue, kafkaDataStructure.get("tinyblob_col"));
         assertEquals(expectedValue, kafkaDataStructure.get("mediumblob_col"));
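As a sanity check on the literals asserted above, the expected HEX and Base64 renderings of the inserted bytes { 1, 2, 3 } can be derived directly with the JDK. This is a minimal standalone sketch, not part of the commit; HexFormat requires Java 17 or newer:

import java.util.Base64;
import java.util.HexFormat;

public class ExpectedEncodings {
    public static void main(String[] args) {
        byte[] raw = { 1, 2, 3 };
        System.out.println(HexFormat.of().formatHex(raw));              // 010203 (HEX mode)
        System.out.println(Base64.getEncoder().encodeToString(raw));    // AQID (BASE64 mode)
        System.out.println(Base64.getUrlEncoder().encodeToString(raw)); // AQID (BASE64_URL_SAFE; identical here, since the output contains no '+' or '/')
    }
}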