DBZ-8157 Support for MySQL VECTOR datatype
This commit is contained in:
parent
c026ddd172
commit
4406c78a2b
@ -49,6 +49,7 @@
|
|||||||
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
|
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
|
||||||
import io.debezium.data.Json;
|
import io.debezium.data.Json;
|
||||||
import io.debezium.data.SpecialValueDecimal;
|
import io.debezium.data.SpecialValueDecimal;
|
||||||
|
import io.debezium.data.vector.FloatVector;
|
||||||
import io.debezium.jdbc.JdbcValueConverters;
|
import io.debezium.jdbc.JdbcValueConverters;
|
||||||
import io.debezium.jdbc.TemporalPrecisionMode;
|
import io.debezium.jdbc.TemporalPrecisionMode;
|
||||||
import io.debezium.relational.Column;
|
import io.debezium.relational.Column;
|
||||||
@ -183,6 +184,9 @@ public SchemaBuilder schemaBuilder(Column column) {
|
|||||||
&& column.scale().isEmpty() && column.length() <= 24) {
|
&& column.scale().isEmpty() && column.length() <= 24) {
|
||||||
return SchemaBuilder.float32();
|
return SchemaBuilder.float32();
|
||||||
}
|
}
|
||||||
|
if (matches(typeName, "VECTOR")) {
|
||||||
|
return FloatVector.builder();
|
||||||
|
}
|
||||||
// Otherwise, let the base class handle it ...
|
// Otherwise, let the base class handle it ...
|
||||||
return super.schemaBuilder(column);
|
return super.schemaBuilder(column);
|
||||||
}
|
}
|
||||||
@ -251,6 +255,9 @@ public ValueConverter converter(Column column, Field fieldDefn) {
|
|||||||
return (data) -> convertUnsignedBigint(column, fieldDefn, data);
|
return (data) -> convertUnsignedBigint(column, fieldDefn, data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (matches(typeName, "VECTOR")) {
|
||||||
|
return (data) -> convertVector(column, fieldDefn, data);
|
||||||
|
}
|
||||||
|
|
||||||
// We have to convert bytes encoded in the column's character set ...
|
// We have to convert bytes encoded in the column's character set ...
|
||||||
switch (column.jdbcType()) {
|
switch (column.jdbcType()) {
|
||||||
@ -836,6 +843,26 @@ protected Object convertTimestampToLocalDateTime(Column column, Field fieldDefn,
|
|||||||
return ((Timestamp) data).toLocalDateTime();
|
return ((Timestamp) data).toLocalDateTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert a value representing a Vector {@code float[]} value to a FloatVector value used in a {@link SourceRecord}.
|
||||||
|
*
|
||||||
|
* @param column the column in which the value appears
|
||||||
|
* @param fieldDefn the field definition for the {@link SourceRecord}'s {@link Schema}; never null
|
||||||
|
* @param rawData the data; may be null
|
||||||
|
* @return the converted value, or null if the conversion could not be made and the column allows nulls
|
||||||
|
* @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls
|
||||||
|
*/
|
||||||
|
protected Object convertVector(Column column, Field fieldDefn, Object rawData) {
|
||||||
|
return convertValue(column, fieldDefn, rawData, new float[0], (r) -> {
|
||||||
|
if (rawData instanceof float[] data) {
|
||||||
|
r.deliver(FloatVector.fromLogical(fieldDefn.schema(), data));
|
||||||
|
}
|
||||||
|
else if (rawData instanceof byte[] data) {
|
||||||
|
r.deliver(FloatVector.fromLogical(fieldDefn, data));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
protected abstract List<String> extractEnumAndSetOptions(Column column);
|
protected abstract List<String> extractEnumAndSetOptions(Column column);
|
||||||
|
|
||||||
protected String getJavaEncodingForCharSet(String charSetName) {
|
protected String getJavaEncodingForCharSet(String charSetName) {
|
||||||
|
@ -0,0 +1,180 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
package io.debezium.connector.binlog;
|
||||||
|
|
||||||
|
import static io.debezium.junit.EqualityCheck.LESS_THAN;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
|
||||||
|
import org.apache.kafka.connect.data.Struct;
|
||||||
|
import org.apache.kafka.connect.source.SourceConnector;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import io.debezium.config.Configuration;
|
||||||
|
import io.debezium.connector.binlog.util.BinlogTestConnection;
|
||||||
|
import io.debezium.connector.binlog.util.TestHelper;
|
||||||
|
import io.debezium.connector.binlog.util.UniqueDatabase;
|
||||||
|
import io.debezium.data.vector.FloatVector;
|
||||||
|
import io.debezium.jdbc.JdbcConnection;
|
||||||
|
import io.debezium.junit.SkipWhenDatabaseVersion;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Jiri Pechanec
|
||||||
|
*/
|
||||||
|
@SkipWhenDatabaseVersion(check = LESS_THAN, major = 9, minor = 0, reason = "VECTOR datatype not added until MySQL 9.0")
|
||||||
|
public abstract class BinlogVectorIT<C extends SourceConnector> extends AbstractBinlogConnectorIT<C> {
|
||||||
|
|
||||||
|
private static final Path SCHEMA_HISTORY_PATH = Files.createTestingPath("file-schema-history-json.txt")
|
||||||
|
.toAbsolutePath();
|
||||||
|
private UniqueDatabase DATABASE;
|
||||||
|
|
||||||
|
private Configuration config;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void beforeEach() {
|
||||||
|
stopConnector();
|
||||||
|
|
||||||
|
DATABASE = TestHelper.getUniqueDatabase("vectorit", "vector_test")
|
||||||
|
.withDbHistoryPath(SCHEMA_HISTORY_PATH);
|
||||||
|
DATABASE.createAndInitialize();
|
||||||
|
|
||||||
|
initializeConnectorTestFramework();
|
||||||
|
Files.delete(SCHEMA_HISTORY_PATH);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void afterEach() {
|
||||||
|
try {
|
||||||
|
stopConnector();
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
Files.delete(SCHEMA_HISTORY_PATH);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* @Test
|
||||||
|
* public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws SQLException, InterruptedException {
|
||||||
|
* // Use the DB configuration to define the connector's configuration ...
|
||||||
|
* config = DATABASE.defaultConfig()
|
||||||
|
* .with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NEVER)
|
||||||
|
* .build();
|
||||||
|
*
|
||||||
|
* // Start the connector ...
|
||||||
|
* start(getConnectorClass(), config);
|
||||||
|
*
|
||||||
|
* // ---------------------------------------------------------------------------------------------------------------
|
||||||
|
* // Consume all of the events due to startup and initialization of the database
|
||||||
|
* // ---------------------------------------------------------------------------------------------------------------
|
||||||
|
* // Testing.Debug.enable();
|
||||||
|
* int numCreateDatabase = 1;
|
||||||
|
* int numCreateTables = 2;
|
||||||
|
* int numDataRecords = databaseDifferences.geometryPointTableRecords() + 2;
|
||||||
|
* SourceRecords records = consumeRecordsByTopic(numCreateDatabase + numCreateTables + numDataRecords);
|
||||||
|
* stopConnector();
|
||||||
|
* assertThat(records).isNotNull();
|
||||||
|
* assertThat(records.recordsForTopic(DATABASE.getServerName()).size()).isEqualTo(numCreateDatabase + numCreateTables);
|
||||||
|
* assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_222_point")).size()).isEqualTo(databaseDifferences.geometryPointTableRecords());
|
||||||
|
* assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_507_geometry")).size()).isEqualTo(2);
|
||||||
|
* assertThat(records.topics().size()).isEqualTo(1 + numCreateTables);
|
||||||
|
* assertThat(records.databaseNames().size()).isEqualTo(1);
|
||||||
|
* assertThat(records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).size()).isEqualTo(
|
||||||
|
* numCreateDatabase + numCreateTables);
|
||||||
|
* assertThat(records.ddlRecordsForDatabase("regression_test")).isNull();
|
||||||
|
* assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
|
||||||
|
* assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
|
||||||
|
* assertThat(records.ddlRecordsForDatabase("json_test")).isNull();
|
||||||
|
* records.ddlRecordsForDatabase(DATABASE.getDatabaseName()).forEach(this::print);
|
||||||
|
*
|
||||||
|
* // Check that all records are valid, can be serialized and deserialized ...
|
||||||
|
* records.forEach(this::validate);
|
||||||
|
* records.forEach(record -> {
|
||||||
|
* Struct value = (Struct) record.value();
|
||||||
|
* if (record.topic().endsWith("dbz_222_point")) {
|
||||||
|
* assertPoint(value);
|
||||||
|
* }
|
||||||
|
* else if (record.topic().endsWith("dbz_507_geometry")) {
|
||||||
|
* assertGeomRecord(value);
|
||||||
|
* }
|
||||||
|
* });
|
||||||
|
* }
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLException, InterruptedException {
|
||||||
|
// Use the DB configuration to define the connector's configuration ...
|
||||||
|
config = DATABASE.defaultConfig().build();
|
||||||
|
|
||||||
|
// Start the connector ...
|
||||||
|
start(getConnectorClass(), config);
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------------------------------------------
|
||||||
|
// Consume all of the events due to startup and initialization of the database
|
||||||
|
// ---------------------------------------------------------------------------------------------------------------
|
||||||
|
// Testing.Debug.enable();
|
||||||
|
int numTables = 1;
|
||||||
|
int numDataRecords = 1;
|
||||||
|
int numDdlRecords = numTables * 2 + 3; // for each table (1 drop + 1 create) + for each db (1 create + 1 drop + 1 use)
|
||||||
|
int numSetVariables = 1;
|
||||||
|
var records = consumeRecordsByTopic(numDdlRecords + numSetVariables + numDataRecords);
|
||||||
|
|
||||||
|
assertThat(records).isNotNull();
|
||||||
|
final var dataRecords = records.recordsForTopic(DATABASE.topicForTable("dbz_8157"));
|
||||||
|
assertThat(dataRecords).hasSize(1);
|
||||||
|
var record = dataRecords.get(0);
|
||||||
|
var after = ((Struct) record.value()).getStruct("after");
|
||||||
|
assertThat(after.schema().field("f_vector_null").schema().name()).isEqualTo(FloatVector.LOGICAL_NAME);
|
||||||
|
assertThat(after.getArray("f_vector_null")).containsExactly(1.1f, 2.2f);
|
||||||
|
assertThat(after.getArray("f_vector_default")).containsExactly(11.5f, 22.6f);
|
||||||
|
assertThat(after.getArray("f_vector_cons")).containsExactly(31f, 32f);
|
||||||
|
|
||||||
|
stopConnector();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldConsumeAllEventsFromDatabaseUsingStreaming() throws SQLException, InterruptedException {
|
||||||
|
// Use the DB configuration to define the connector's configuration ...
|
||||||
|
config = DATABASE.defaultConfig()
|
||||||
|
.with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NO_DATA)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// Start the connector ...
|
||||||
|
start(getConnectorClass(), config);
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------------------------------------------
|
||||||
|
// Consume all of the events due to startup and initialization of the database
|
||||||
|
// ---------------------------------------------------------------------------------------------------------------
|
||||||
|
// Testing.Debug.enable();
|
||||||
|
int numTables = 1;
|
||||||
|
int numDdlRecords = numTables * 2 + 3; // for each table (1 drop + 1 create) + for each db (1 create + 1 drop + 1 use)
|
||||||
|
int numSetVariables = 1;
|
||||||
|
var records = consumeRecordsByTopic(numDdlRecords + numSetVariables);
|
||||||
|
|
||||||
|
try (BinlogTestConnection db = getTestDatabaseConnection(DATABASE.getDatabaseName());) {
|
||||||
|
try (JdbcConnection connection = db.connect()) {
|
||||||
|
connection.execute(
|
||||||
|
"INSERT INTO dbz_8157 VALUES (default, string_to_vector('[10.1,10.2]'),string_to_vector('[20.1,20.2]'),string_to_vector('[30.1,30.2]'));");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
records = consumeRecordsByTopic(1);
|
||||||
|
|
||||||
|
assertThat(records).isNotNull();
|
||||||
|
final var dataRecords = records.recordsForTopic(DATABASE.topicForTable("dbz_8157"));
|
||||||
|
assertThat(dataRecords).hasSize(1);
|
||||||
|
var record = dataRecords.get(0);
|
||||||
|
var after = ((Struct) record.value()).getStruct("after");
|
||||||
|
assertThat(after.schema().field("f_vector_null").schema().name()).isEqualTo(FloatVector.LOGICAL_NAME);
|
||||||
|
assertThat(after.getArray("f_vector_null")).containsExactly(10.1f, 10.2f);
|
||||||
|
assertThat(after.getArray("f_vector_default")).containsExactly(20.1f, 20.2f);
|
||||||
|
assertThat(after.getArray("f_vector_cons")).containsExactly(30.1f, 30.2f);
|
||||||
|
|
||||||
|
stopConnector();
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,17 @@
|
|||||||
|
-- ----------------------------------------------------------------------------------------------------------------
|
||||||
|
-- DATABASE: vector_test
|
||||||
|
-- ----------------------------------------------------------------------------------------------------------------
|
||||||
|
-- The integration test for this database expects to scan all of the binlog events associated with this database
|
||||||
|
-- without error or problems. The integration test does not modify any records in this database, so this script
|
||||||
|
-- must contain all operations to these tables.
|
||||||
|
--
|
||||||
|
-- This relies upon MySQL 9.0's vector datatypes.
|
||||||
|
|
||||||
|
CREATE TABLE dbz_8157 (
|
||||||
|
id INT AUTO_INCREMENT NOT NULL,
|
||||||
|
f_vector_null VECTOR DEFAULT NULL,
|
||||||
|
f_vector_default VECTOR DEFAULT NULL,
|
||||||
|
f_vector_cons VECTOR(128) DEFAULT NULL,
|
||||||
|
PRIMARY KEY (id)
|
||||||
|
) DEFAULT CHARSET=utf8;
|
||||||
|
INSERT INTO dbz_8157 VALUES (default, string_to_vector('[1.1,2.2]'),string_to_vector('[11.5,22.6]'),string_to_vector('[31,32]'));
|
@ -190,6 +190,8 @@ protected DataTypeResolver initializeDataTypeResolver() {
|
|||||||
.setDefaultLengthScaleDimension(10, 0),
|
.setDefaultLengthScaleDimension(10, 0),
|
||||||
new DataTypeEntry(Types.BIT, MySqlParser.BIT)
|
new DataTypeEntry(Types.BIT, MySqlParser.BIT)
|
||||||
.setDefaultLengthDimension(1),
|
.setDefaultLengthDimension(1),
|
||||||
|
new DataTypeEntry(Types.OTHER, MySqlParser.VECTOR)
|
||||||
|
.setDefaultLengthDimension(2048),
|
||||||
new DataTypeEntry(Types.TIME, MySqlParser.TIME),
|
new DataTypeEntry(Types.TIME, MySqlParser.TIME),
|
||||||
new DataTypeEntry(Types.TIMESTAMP_WITH_TIMEZONE, MySqlParser.TIMESTAMP),
|
new DataTypeEntry(Types.TIMESTAMP_WITH_TIMEZONE, MySqlParser.TIMESTAMP),
|
||||||
new DataTypeEntry(Types.TIMESTAMP, MySqlParser.DATETIME),
|
new DataTypeEntry(Types.TIMESTAMP, MySqlParser.DATETIME),
|
||||||
|
@ -0,0 +1,15 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
package io.debezium.connector.mysql;
|
||||||
|
|
||||||
|
import io.debezium.connector.binlog.BinlogVectorIT;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Jiri Pechanec
|
||||||
|
*/
|
||||||
|
public class MySqlVectorIT extends BinlogVectorIT<MySqlConnector> implements MySqlCommon {
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,57 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
package io.debezium.data.vector;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.kafka.connect.data.Schema;
|
||||||
|
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||||
|
|
||||||
|
import io.debezium.schema.SchemaFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A semantic type for a vector type with 8-byte elements.
|
||||||
|
*
|
||||||
|
* @author Jiri Pechanec
|
||||||
|
*/
|
||||||
|
public class DoubleVector {
|
||||||
|
|
||||||
|
public static final String LOGICAL_NAME = "io.debezium.data.DoubleVector";
|
||||||
|
public static int SCHEMA_VERSION = 1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a {@link SchemaBuilder} for a 8-byte vector field. You can use the resulting SchemaBuilder
|
||||||
|
* to set additional schema settings such as required/optional, default value, and documentation.
|
||||||
|
*
|
||||||
|
* @return the schema builder
|
||||||
|
*/
|
||||||
|
public static SchemaBuilder builder() {
|
||||||
|
return SchemaFactory.get().datatypeDoubleVectorSchema();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a {@link SchemaBuilder} for a 8-byte ector field, with all other default Schema settings.
|
||||||
|
*
|
||||||
|
* @return the schema
|
||||||
|
* @see #builder()
|
||||||
|
*/
|
||||||
|
public static Schema schema() {
|
||||||
|
return builder().build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts a value from its logical format - {@link String} of {@code [x,y,z,...]}
|
||||||
|
* to its encoded format - a Connect array represented by list of numbers.
|
||||||
|
*
|
||||||
|
* @param schema of the encoded value
|
||||||
|
* @param value the value of the vector
|
||||||
|
*
|
||||||
|
* @return the encoded value
|
||||||
|
*/
|
||||||
|
public static List<Double> fromLogical(Schema schema, String value) {
|
||||||
|
return Vectors.fromVectorString(schema, value, Double::parseDouble);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,107 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
package io.debezium.data.vector;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.kafka.connect.data.Field;
|
||||||
|
import org.apache.kafka.connect.data.Schema;
|
||||||
|
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import io.debezium.schema.SchemaFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A semantic type for a vector type with 4-byte elemnents.
|
||||||
|
*
|
||||||
|
* @author Jiri Pechanec
|
||||||
|
*/
|
||||||
|
public class FloatVector {
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(FloatVector.class);
|
||||||
|
|
||||||
|
public static final String LOGICAL_NAME = "io.debezium.data.FloatVector";
|
||||||
|
public static int SCHEMA_VERSION = 1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a {@link SchemaBuilder} for a 4-byte vector field. You can use the resulting SchemaBuilder
|
||||||
|
* to set additional schema settings such as required/optional, default value, and documentation.
|
||||||
|
*
|
||||||
|
* @return the schema builder
|
||||||
|
*/
|
||||||
|
public static SchemaBuilder builder() {
|
||||||
|
return SchemaFactory.get().datatypeFloatVectorSchema();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a {@link SchemaBuilder} for a 4-byte vector field, with all other default Schema settings.
|
||||||
|
*
|
||||||
|
* @return the schema
|
||||||
|
* @see #builder()
|
||||||
|
*/
|
||||||
|
public static Schema schema() {
|
||||||
|
return builder().build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts a value from its logical format - {@link String} of {@code [x,y,z,...]}
|
||||||
|
* to its encoded format - a Connect array represented by list of numbers.
|
||||||
|
*
|
||||||
|
* @param schema of the encoded value
|
||||||
|
* @param value the value of the vector
|
||||||
|
*
|
||||||
|
* @return the encoded value
|
||||||
|
*/
|
||||||
|
public static List<Float> fromLogical(Schema schema, String value) {
|
||||||
|
return Vectors.fromVectorString(schema, value, Float::parseFloat);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts a value from its raw array of floats format
|
||||||
|
* to its encoded format - a Connect array represented by list of numbers.
|
||||||
|
*
|
||||||
|
* @param schema of the encoded value
|
||||||
|
* @param value the value of the vector
|
||||||
|
*
|
||||||
|
* @return the encoded value
|
||||||
|
*/
|
||||||
|
public static List<Float> fromLogical(Schema schema, float[] value) {
|
||||||
|
final List<Float> ret = new ArrayList<>(value.length);
|
||||||
|
for (float v : value) {
|
||||||
|
ret.add(v);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts a value from its octet stream of 4-byte values
|
||||||
|
* to its encoded format - a Connect array represented by list of numbers.
|
||||||
|
*
|
||||||
|
* @param schema of the encoded value
|
||||||
|
* @param value the value of the vector
|
||||||
|
*
|
||||||
|
* @return the encoded value
|
||||||
|
*/
|
||||||
|
public static List<Float> fromLogical(Field fieldDfn, byte[] value) {
|
||||||
|
if (value.length % Float.BYTES != 0) {
|
||||||
|
LOGGER.warn("Cannot convert field '{}', the octet stream is not multiply of {}", fieldDfn.name(), Float.BYTES);
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
final List<Float> ret = new ArrayList<>(value.length);
|
||||||
|
|
||||||
|
for (int i = 0; i < value.length;) {
|
||||||
|
final var intValue = (value[i++] & 0xff)
|
||||||
|
| ((value[i++] & 0xff) << 8)
|
||||||
|
| ((value[i++] & 0xff) << 16)
|
||||||
|
| ((value[i++] & 0xff) << 24);
|
||||||
|
ret.add(Float.intBitsToFloat(intValue));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,37 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
package io.debezium.data.vector;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
import org.apache.kafka.connect.data.Schema;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public final class Vectors {
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(Vectors.class);
|
||||||
|
|
||||||
|
static <T> List<T> fromVectorString(Schema schema, String value, Function<String, T> elementMapper) {
|
||||||
|
Objects.requireNonNull(value, "value may not be null");
|
||||||
|
|
||||||
|
value = value.trim();
|
||||||
|
if (!value.startsWith("[") || !value.endsWith("]")) {
|
||||||
|
LOGGER.warn("Cannot convert vector {}, expected format is [x,y,z,...]", value);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
value = value.substring(1, value.length() - 1);
|
||||||
|
final var strValues = value.split(",");
|
||||||
|
final List<T> result = new ArrayList<>(strValues.length);
|
||||||
|
for (String element : strValues) {
|
||||||
|
result.add(elementMapper.apply(element.trim()));
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
@ -21,6 +21,8 @@
|
|||||||
import io.debezium.data.Uuid;
|
import io.debezium.data.Uuid;
|
||||||
import io.debezium.data.VariableScaleDecimal;
|
import io.debezium.data.VariableScaleDecimal;
|
||||||
import io.debezium.data.Xml;
|
import io.debezium.data.Xml;
|
||||||
|
import io.debezium.data.vector.DoubleVector;
|
||||||
|
import io.debezium.data.vector.FloatVector;
|
||||||
import io.debezium.heartbeat.HeartbeatImpl;
|
import io.debezium.heartbeat.HeartbeatImpl;
|
||||||
import io.debezium.pipeline.notification.Notification;
|
import io.debezium.pipeline.notification.Notification;
|
||||||
import io.debezium.pipeline.txmetadata.TransactionStructMaker;
|
import io.debezium.pipeline.txmetadata.TransactionStructMaker;
|
||||||
@ -334,6 +336,18 @@ public SchemaBuilder datatypeXmlSchema() {
|
|||||||
.version(Xml.SCHEMA_VERSION);
|
.version(Xml.SCHEMA_VERSION);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SchemaBuilder datatypeDoubleVectorSchema() {
|
||||||
|
return SchemaBuilder.array(Schema.FLOAT64_SCHEMA)
|
||||||
|
.name(DoubleVector.LOGICAL_NAME)
|
||||||
|
.version(DoubleVector.SCHEMA_VERSION);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SchemaBuilder datatypeFloatVectorSchema() {
|
||||||
|
return SchemaBuilder.array(Schema.FLOAT32_SCHEMA)
|
||||||
|
.name(FloatVector.LOGICAL_NAME)
|
||||||
|
.version(FloatVector.SCHEMA_VERSION);
|
||||||
|
}
|
||||||
|
|
||||||
public Envelope.Builder datatypeEnvelopeSchema() {
|
public Envelope.Builder datatypeEnvelopeSchema() {
|
||||||
return new Envelope.Builder() {
|
return new Envelope.Builder() {
|
||||||
private final SchemaBuilder builder = SchemaBuilder.struct()
|
private final SchemaBuilder builder = SchemaBuilder.struct()
|
||||||
|
@ -0,0 +1,61 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.debezium.data.vector;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class VectorDatatypeTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldParseVector() {
|
||||||
|
final var expectedVector = List.of(10.0, 20.0, 30.0);
|
||||||
|
Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "[10,20,30]")).isEqualTo(expectedVector);
|
||||||
|
Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "[ 10,20,30] ")).isEqualTo(expectedVector);
|
||||||
|
Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), " [ 10,20,30 ]")).isEqualTo(expectedVector);
|
||||||
|
Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "[10 ,20 ,30]")).isEqualTo(expectedVector);
|
||||||
|
Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "[10.2 , 20, 30]")).isEqualTo(List.of(10.2, 20.0, 30.0));
|
||||||
|
Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "[10.2e-1 , 20, 30]")).isEqualTo(List.of(1.02, 20.0, 30.0));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldIgnoreErrorInVectorFormat() {
|
||||||
|
Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "10,20,30]")).isNull();
|
||||||
|
Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "[10,20,30")).isNull();
|
||||||
|
Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "{10,20,30}")).isNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = NumberFormatException.class)
|
||||||
|
public void shouldFailOnNumberInVectorFormat() {
|
||||||
|
DoubleVector.fromLogical(DoubleVector.schema(), "[a10,20,30]");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldParseHalfVector() {
|
||||||
|
final var expectedVector = List.of(10.0f, 20.0f, 30.0f);
|
||||||
|
Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "[10,20,30]")).isEqualTo(expectedVector);
|
||||||
|
Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "[ 10,20,30] ")).isEqualTo(expectedVector);
|
||||||
|
Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), " [ 10,20,30 ]")).isEqualTo(expectedVector);
|
||||||
|
Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "[10 ,20 ,30]")).isEqualTo(expectedVector);
|
||||||
|
Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "[10.2 , 20, 30]")).isEqualTo(List.of(10.2f, 20.0f, 30.0f));
|
||||||
|
Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "[10.2e-1 , 20, 30]")).isEqualTo(List.of(1.02f, 20.0f, 30.0f));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldIgnoreErrorInHalfVectorFormat() {
|
||||||
|
Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "10,20,30]")).isNull();
|
||||||
|
Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "[10,20,30")).isNull();
|
||||||
|
Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "{10,20,30}")).isNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = NumberFormatException.class)
|
||||||
|
public void shouldFailOnNumberInHalfVectorFormat() {
|
||||||
|
FloatVector.fromLogical(FloatVector.schema(), "[a10,20,30]");
|
||||||
|
}
|
||||||
|
}
|
4
pom.xml
4
pom.xml
@ -137,8 +137,8 @@
|
|||||||
|
|
||||||
<!-- Database drivers, should align with databases -->
|
<!-- Database drivers, should align with databases -->
|
||||||
<version.postgresql.driver>42.6.1</version.postgresql.driver>
|
<version.postgresql.driver>42.6.1</version.postgresql.driver>
|
||||||
<version.mysql.driver>8.3.0</version.mysql.driver>
|
<version.mysql.driver>9.0.0</version.mysql.driver>
|
||||||
<version.mysql.binlog>0.30.0</version.mysql.binlog>
|
<version.mysql.binlog>0.31.0</version.mysql.binlog>
|
||||||
<version.mongo.driver>4.11.0</version.mongo.driver>
|
<version.mongo.driver>4.11.0</version.mongo.driver>
|
||||||
<version.sqlserver.driver>12.4.2.jre8</version.sqlserver.driver>
|
<version.sqlserver.driver>12.4.2.jre8</version.sqlserver.driver>
|
||||||
<version.db2.driver>11.5.0.0</version.db2.driver>
|
<version.db2.driver>11.5.0.0</version.db2.driver>
|
||||||
|
Loading…
Reference in New Issue
Block a user