DBZ-6967: Handle bytea target field with Postgres

This commit is contained in:
Bertrand Paquet 2023-09-26 11:43:06 +02:00 committed by Fiore Mario Vitale
parent 7a340989b8
commit b26a98b27c
3 changed files with 104 additions and 0 deletions

View File

@ -0,0 +1,60 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.jdbc.dialect.postgres;
import java.nio.ByteBuffer;
import java.sql.Types;
import org.apache.kafka.connect.data.Schema;
import org.hibernate.engine.jdbc.Size;
import org.hibernate.query.Query;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.connector.jdbc.util.ByteArrayUtils;
/**
* An implementation of {@link Type} for {@code BYTES} column types.
*
* @author Bertrand Paquet
*/
class BytesType extends AbstractType {
public static final BytesType INSTANCE = new BytesType();
@Override
public String[] getRegistrationKeys() {
return new String[]{ "BYTES" };
}
@Override
public String getDefaultValueBinding(DatabaseDialect dialect, Schema schema, Object value) {
return String.format(dialect.getByteArrayFormat(), ByteArrayUtils.getByteArrayAsHex(value));
}
@Override
public String getTypeName(DatabaseDialect dialect, Schema schema, boolean key) {
final int columnSize = Integer.parseInt(getSourceColumnSize(schema).orElse("0"));
if (columnSize > 0) {
return dialect.getTypeName(Types.VARBINARY, Size.length(columnSize));
}
else if (key) {
return dialect.getTypeName(Types.VARBINARY, Size.length(dialect.getMaxVarbinaryLength()));
}
return dialect.getTypeName(Types.VARBINARY);
}
@Override
public int bind(Query<?> query, int index, Schema schema, Object value) {
if (value instanceof ByteBuffer) {
value = ((ByteBuffer) value).array();
}
query.setParameter(index, value);
return 1;
}
}

View File

@ -154,6 +154,7 @@ protected void registerTypes() {
registerType(IntervalType.INSTANCE);
registerType(SerialType.INSTANCE);
registerType(BitType.INSTANCE);
registerType(BytesType.INSTANCE);
registerType(JsonType.INSTANCE);
registerType(UuidType.INSTANCE);
registerType(EnumType.INSTANCE);

View File

@ -5,6 +5,9 @@
*/
package io.debezium.connector.jdbc.integration.postgres;
import static org.fest.assertions.Assertions.assertThat;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
@ -96,4 +99,44 @@ private void shouldCoerceStringTypeToColumnType(SinkRecordFactory factory, Strin
getSink().assertColumn(destinationTable, "data", columnType);
}
@ParameterizedTest
@ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
@FixFor("DBZ-6967")
public void testShouldCoerceNioByteBufferTypeToByteArrayColumnType(SinkRecordFactory factory) throws Exception {
final Map<String, String> properties = getDefaultSinkConfig();
properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE.getValue());
properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
properties.put(JdbcSinkConnectorConfig.INSERT_MODE, JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
startSinkConnector(properties);
assertSinkConnectorIsRunning();
final String tableName = randomTableName();
final String topicName = topicName("server2", "schema", tableName);
ByteBuffer buffer = ByteBuffer.allocate(3);
buffer.put((byte) 1);
buffer.put((byte) 2);
buffer.put((byte) 3);
final SinkRecord createRecord = factory.createRecordWithSchemaValue(
topicName,
(byte) 1,
"data",
Schema.OPTIONAL_BYTES_SCHEMA,
buffer);
final String destinationTable = destinationTableName(createRecord);
final String sql = "CREATE TABLE %s (id int not null, data bytea, primary key(id))";
getSink().execute(String.format(sql, destinationTable));
consume(createRecord);
getSink().assertRows(destinationTable, rs -> {
assertThat(rs.getInt(1)).isEqualTo(1);
assertThat(rs.getBytes(2)).isEqualTo(new byte[]{ 1, 2, 3 });
return null;
});
}
}