DBZ-8183 Use unified vector dataype names

This commit is contained in:
Jiri Pechanec 2024-08-29 10:40:54 +02:00
parent e1ae60e756
commit be9c70d631
8 changed files with 23 additions and 200 deletions

View File

@ -10,9 +10,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.postgresql.data.Ltree;
import io.debezium.connector.postgresql.data.vector.HalfVector;
import io.debezium.connector.postgresql.data.vector.SparseVector;
import io.debezium.connector.postgresql.data.vector.Vector;
import io.debezium.data.Envelope;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
@ -76,18 +74,6 @@ public SchemaBuilder datatypeLtreeSchema() {
.version(Ltree.SCHEMA_VERSION);
}
public SchemaBuilder datatypeVectorSchema() {
return SchemaBuilder.array(Schema.FLOAT64_SCHEMA)
.name(Vector.LOGICAL_NAME)
.version(Vector.SCHEMA_VERSION);
}
public SchemaBuilder datatypeHalfVectorSchema() {
return SchemaBuilder.array(Schema.FLOAT32_SCHEMA)
.name(HalfVector.LOGICAL_NAME)
.version(HalfVector.SCHEMA_VERSION);
}
public SchemaBuilder datatypeSparseVectorSchema() {
return SchemaBuilder.struct()
.name(SparseVector.LOGICAL_NAME)

View File

@ -55,9 +55,7 @@
import io.debezium.connector.postgresql.PostgresConnectorConfig.HStoreHandlingMode;
import io.debezium.connector.postgresql.PostgresConnectorConfig.IntervalHandlingMode;
import io.debezium.connector.postgresql.data.Ltree;
import io.debezium.connector.postgresql.data.vector.HalfVector;
import io.debezium.connector.postgresql.data.vector.SparseVector;
import io.debezium.connector.postgresql.data.vector.Vector;
import io.debezium.connector.postgresql.proto.PgProto;
import io.debezium.data.Bits;
import io.debezium.data.Json;
@ -67,6 +65,8 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
import io.debezium.data.vector.DoubleVector;
import io.debezium.data.vector.FloatVector;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
@ -325,10 +325,10 @@ else if (oidValue == typeRegistry.ltreeOid()) {
return Ltree.builder();
}
else if (oidValue == typeRegistry.vectorOid()) {
return Vector.builder();
return DoubleVector.builder();
}
else if (oidValue == typeRegistry.halfVectorOid()) {
return HalfVector.builder();
return FloatVector.builder();
}
else if (oidValue == typeRegistry.sparseVectorOid()) {
return SparseVector.builder();
@ -683,13 +683,13 @@ else if (data instanceof PGobject) {
private Object convertPgVector(Column column, Field fieldDefn, Object data) {
return convertValue(column, fieldDefn, data, Collections.emptyList(), r -> {
if (data instanceof byte[] typedData) {
r.deliver(Vector.fromLogical(fieldDefn.schema(), new String(typedData, databaseCharset)));
r.deliver(DoubleVector.fromLogical(fieldDefn.schema(), new String(typedData, databaseCharset)));
}
if (data instanceof String typedData) {
r.deliver(Vector.fromLogical(fieldDefn.schema(), typedData));
r.deliver(DoubleVector.fromLogical(fieldDefn.schema(), typedData));
}
else if (data instanceof PGobject typedData) {
r.deliver(Vector.fromLogical(fieldDefn.schema(), typedData.getValue()));
r.deliver(DoubleVector.fromLogical(fieldDefn.schema(), typedData.getValue()));
}
});
}
@ -697,13 +697,13 @@ else if (data instanceof PGobject typedData) {
private Object convertPgHalfVector(Column column, Field fieldDefn, Object data) {
return convertValue(column, fieldDefn, data, Collections.emptyList(), r -> {
if (data instanceof byte[] typedData) {
r.deliver(HalfVector.fromLogical(fieldDefn.schema(), new String(typedData, databaseCharset)));
r.deliver(FloatVector.fromLogical(fieldDefn.schema(), new String(typedData, databaseCharset)));
}
if (data instanceof String typedData) {
r.deliver(HalfVector.fromLogical(fieldDefn.schema(), typedData));
r.deliver(FloatVector.fromLogical(fieldDefn.schema(), typedData));
}
else if (data instanceof PGobject typedData) {
r.deliver(HalfVector.fromLogical(fieldDefn.schema(), typedData.getValue()));
r.deliver(FloatVector.fromLogical(fieldDefn.schema(), typedData.getValue()));
}
});
}

View File

@ -1,57 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.data.vector;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.connector.postgresql.PostgresSchemaFactory;
/**
* A semantic type for a PgVector halfvec type.
*
* @author Jiri Pechanec
*/
public class HalfVector {
public static final String LOGICAL_NAME = "io.debezium.data.HalfVector";
public static int SCHEMA_VERSION = 1;
/**
* Returns a {@link SchemaBuilder} for a halfvec field. You can use the resulting SchemaBuilder
* to set additional schema settings such as required/optional, default value, and documentation.
*
* @return the schema builder
*/
public static SchemaBuilder builder() {
return PostgresSchemaFactory.get().datatypeHalfVectorSchema();
}
/**
* Returns a {@link SchemaBuilder} for a halfvec field, with all other default Schema settings.
*
* @return the schema
* @see #builder()
*/
public static Schema schema() {
return builder().build();
}
/**
* Converts a value from its logical format - {@link String} of {@code [x,y,z,...]}
* to its encoded format - a Connect array represented by list of numbers.
*
* @param schema of the encoded value
* @param value the value of the vector
*
* @return the encoded value
*/
public static List<Float> fromLogical(Schema schema, String value) {
return Vectors.fromVectorString(schema, value, Float::parseFloat);
}
}

View File

@ -1,57 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.data.vector;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import io.debezium.connector.postgresql.PostgresSchemaFactory;
/**
* A semantic type for a PgVector vector type.
*
* @author Jiri Pechanec
*/
public class Vector {
public static final String LOGICAL_NAME = "io.debezium.data.Vector";
public static int SCHEMA_VERSION = 1;
/**
* Returns a {@link SchemaBuilder} for a vector field. You can use the resulting SchemaBuilder
* to set additional schema settings such as required/optional, default value, and documentation.
*
* @return the schema builder
*/
public static SchemaBuilder builder() {
return PostgresSchemaFactory.get().datatypeVectorSchema();
}
/**
* Returns a {@link SchemaBuilder} for a vector field, with all other default Schema settings.
*
* @return the schema
* @see #builder()
*/
public static Schema schema() {
return builder().build();
}
/**
* Converts a value from its logical format - {@link String} of {@code [x,y,z,...]}
* to its encoded format - a Connect array represented by list of numbers.
*
* @param schema of the encoded value
* @param value the value of the vector
*
* @return the encoded value
*/
public static List<Double> fromLogical(Schema schema, String value) {
return Vectors.fromVectorString(schema, value, Double::parseDouble);
}
}

View File

@ -67,8 +67,8 @@ public void shouldSnapshotAndStreamData() throws Exception {
Assertions.assertThat(recs).hasSize(2);
var rec = ((Struct) recs.get(0).value());
Assertions.assertThat(rec.schema().field("after").schema().field("f_vector").schema().name()).isEqualTo("io.debezium.data.Vector");
Assertions.assertThat(rec.schema().field("after").schema().field("f_halfvec").schema().name()).isEqualTo("io.debezium.data.HalfVector");
Assertions.assertThat(rec.schema().field("after").schema().field("f_vector").schema().name()).isEqualTo("io.debezium.data.DoubleVector");
Assertions.assertThat(rec.schema().field("after").schema().field("f_halfvec").schema().name()).isEqualTo("io.debezium.data.FloatVector");
Assertions.assertThat(rec.schema().field("after").schema().field("f_sparsevec").schema().name()).isEqualTo("io.debezium.data.SparseVector");
Assertions.assertThat(rec.getStruct("after").getArray("f_vector")).isEqualTo(List.of(1.0, 2.0, 3.0));
Assertions.assertThat(rec.getStruct("after").getArray("f_halfvec")).isEqualTo(List.of(101.0f, 102.0f, 103.0f));
@ -76,8 +76,8 @@ public void shouldSnapshotAndStreamData() throws Exception {
Assertions.assertThat(rec.getStruct("after").getStruct("f_sparsevec").getMap("vector")).isEqualTo(Map.of((short) 1, 201.0, (short) 9, 209.0));
rec = ((Struct) recs.get(1).value());
Assertions.assertThat(rec.schema().field("after").schema().field("f_vector").schema().name()).isEqualTo("io.debezium.data.Vector");
Assertions.assertThat(rec.schema().field("after").schema().field("f_halfvec").schema().name()).isEqualTo("io.debezium.data.HalfVector");
Assertions.assertThat(rec.schema().field("after").schema().field("f_vector").schema().name()).isEqualTo("io.debezium.data.DoubleVector");
Assertions.assertThat(rec.schema().field("after").schema().field("f_halfvec").schema().name()).isEqualTo("io.debezium.data.FloatVector");
Assertions.assertThat(rec.schema().field("after").schema().field("f_sparsevec").schema().name()).isEqualTo("io.debezium.data.SparseVector");
Assertions.assertThat(rec.getStruct("after").getArray("f_vector")).isEqualTo(List.of(10.0, 20.0, 30.0));
Assertions.assertThat(rec.getStruct("after").getArray("f_halfvec")).isEqualTo(List.of(110.0f, 120.0f, 130.0f));
@ -104,7 +104,7 @@ public void shouldStreamData() throws Exception {
Assertions.assertThat(recs).hasSize(1);
var rec = ((Struct) recs.get(0).value());
Assertions.assertThat(rec.schema().field("after").schema().field("f_vector").schema().name()).isEqualTo("io.debezium.data.Vector");
Assertions.assertThat(rec.schema().field("after").schema().field("f_vector").schema().name()).isEqualTo("io.debezium.data.DoubleVector");
Assertions.assertThat(rec.getStruct("after").getArray("f_vector")).isEqualTo(List.of(1.0, 2.0, 3.0));
}
}

View File

@ -6,64 +6,15 @@
package io.debezium.connector.postgresql;
import java.util.List;
import java.util.Map;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import io.debezium.connector.postgresql.data.vector.HalfVector;
import io.debezium.connector.postgresql.data.vector.SparseVector;
import io.debezium.connector.postgresql.data.vector.Vector;
public class VectorDatabaseTest {
@Test
public void shouldParseVector() {
final var expectedVector = List.of(10.0, 20.0, 30.0);
Assertions.assertThat(Vector.fromLogical(Vector.schema(), "[10,20,30]")).isEqualTo(expectedVector);
Assertions.assertThat(Vector.fromLogical(Vector.schema(), "[ 10,20,30] ")).isEqualTo(expectedVector);
Assertions.assertThat(Vector.fromLogical(Vector.schema(), " [ 10,20,30 ]")).isEqualTo(expectedVector);
Assertions.assertThat(Vector.fromLogical(Vector.schema(), "[10 ,20 ,30]")).isEqualTo(expectedVector);
Assertions.assertThat(Vector.fromLogical(Vector.schema(), "[10.2 , 20, 30]")).isEqualTo(List.of(10.2, 20.0, 30.0));
Assertions.assertThat(Vector.fromLogical(Vector.schema(), "[10.2e-1 , 20, 30]")).isEqualTo(List.of(1.02, 20.0, 30.0));
}
@Test
public void shouldIgnoreErrorInVectorFormat() {
Assertions.assertThat(Vector.fromLogical(Vector.schema(), "10,20,30]")).isNull();
Assertions.assertThat(Vector.fromLogical(Vector.schema(), "[10,20,30")).isNull();
Assertions.assertThat(Vector.fromLogical(Vector.schema(), "{10,20,30}")).isNull();
}
@Test(expected = NumberFormatException.class)
public void shouldFailOnNumberInVectorFormat() {
Vector.fromLogical(Vector.schema(), "[a10,20,30]");
}
@Test
public void shouldParseHalfVector() {
final var expectedVector = List.of(10.0f, 20.0f, 30.0f);
Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), "[10,20,30]")).isEqualTo(expectedVector);
Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), "[ 10,20,30] ")).isEqualTo(expectedVector);
Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), " [ 10,20,30 ]")).isEqualTo(expectedVector);
Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), "[10 ,20 ,30]")).isEqualTo(expectedVector);
Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), "[10.2 , 20, 30]")).isEqualTo(List.of(10.2f, 20.0f, 30.0f));
Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), "[10.2e-1 , 20, 30]")).isEqualTo(List.of(1.02f, 20.0f, 30.0f));
}
@Test
public void shouldIgnoreErrorInHalfVectorFormat() {
Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), "10,20,30]")).isNull();
Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), "[10,20,30")).isNull();
Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), "{10,20,30}")).isNull();
}
@Test(expected = NumberFormatException.class)
public void shouldFailOnNumberInHalfVectorFormat() {
HalfVector.fromLogical(HalfVector.schema(), "[a10,20,30]");
}
@Test
public void shouldParseSparseVector() {
final var expectedVector = Map.of((short) 1, 10.0, (short) 11, 20.0, (short) 111, 30.0);

View File

@ -14,7 +14,7 @@
public class VectorDatatypeTest {
@Test
public void shouldParseVector() {
public void shouldParseDoubleVector() {
final var expectedVector = List.of(10.0, 20.0, 30.0);
Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "[10,20,30]")).isEqualTo(expectedVector);
Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "[ 10,20,30] ")).isEqualTo(expectedVector);
@ -25,19 +25,19 @@ public void shouldParseVector() {
}
@Test
public void shouldIgnoreErrorInVectorFormat() {
public void shouldIgnoreErrorInDoubleVectorFormat() {
Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "10,20,30]")).isNull();
Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "[10,20,30")).isNull();
Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "{10,20,30}")).isNull();
}
@Test(expected = NumberFormatException.class)
public void shouldFailOnNumberInVectorFormat() {
public void shouldFailOnNumberInDoubleVectorFormat() {
DoubleVector.fromLogical(DoubleVector.schema(), "[a10,20,30]");
}
@Test
public void shouldParseHalfVector() {
public void shouldParseFloatVector() {
final var expectedVector = List.of(10.0f, 20.0f, 30.0f);
Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "[10,20,30]")).isEqualTo(expectedVector);
Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "[ 10,20,30] ")).isEqualTo(expectedVector);
@ -48,14 +48,14 @@ public void shouldParseHalfVector() {
}
@Test
public void shouldIgnoreErrorInHalfVectorFormat() {
public void shouldIgnoreErrorInFloatVectorFormat() {
Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "10,20,30]")).isNull();
Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "[10,20,30")).isNull();
Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "{10,20,30}")).isNull();
}
@Test(expected = NumberFormatException.class)
public void shouldFailOnNumberInHalfVectorFormat() {
public void shouldFailOnNumberInFloatVectorFormat() {
FloatVector.fromLogical(FloatVector.schema(), "[a10,20,30]");
}
}

View File

@ -1926,11 +1926,11 @@ The PostgreSQL connector supports all link:https://github.com/pgvector/pgvector[
|`VECTOR`
|`ARRAY (FLOAT64)`
|`io.debezium.data.Vector` +
|`io.debezium.data.DoubleVector` +
|`HALFVEC`
|`ARRAY (FLOAT32)`
|`io.debezium.data.HalfVector` +
|`io.debezium.data.FloatVector` +
|`SPARSEVEC`
|`STRUCT`