diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchemaFactory.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchemaFactory.java index f37d06216..701891f93 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchemaFactory.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSchemaFactory.java @@ -10,9 +10,7 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.connector.postgresql.data.Ltree; -import io.debezium.connector.postgresql.data.vector.HalfVector; import io.debezium.connector.postgresql.data.vector.SparseVector; -import io.debezium.connector.postgresql.data.vector.Vector; import io.debezium.data.Envelope; import io.debezium.schema.SchemaFactory; import io.debezium.schema.SchemaNameAdjuster; @@ -76,18 +74,6 @@ public SchemaBuilder datatypeLtreeSchema() { .version(Ltree.SCHEMA_VERSION); } - public SchemaBuilder datatypeVectorSchema() { - return SchemaBuilder.array(Schema.FLOAT64_SCHEMA) - .name(Vector.LOGICAL_NAME) - .version(Vector.SCHEMA_VERSION); - } - - public SchemaBuilder datatypeHalfVectorSchema() { - return SchemaBuilder.array(Schema.FLOAT32_SCHEMA) - .name(HalfVector.LOGICAL_NAME) - .version(HalfVector.SCHEMA_VERSION); - } - public SchemaBuilder datatypeSparseVectorSchema() { return SchemaBuilder.struct() .name(SparseVector.LOGICAL_NAME) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java index 960f9df02..37ed2b5bd 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java @@ -55,9 +55,7 @@ import io.debezium.connector.postgresql.PostgresConnectorConfig.HStoreHandlingMode; import io.debezium.connector.postgresql.PostgresConnectorConfig.IntervalHandlingMode; import io.debezium.connector.postgresql.data.Ltree; -import io.debezium.connector.postgresql.data.vector.HalfVector; import io.debezium.connector.postgresql.data.vector.SparseVector; -import io.debezium.connector.postgresql.data.vector.Vector; import io.debezium.connector.postgresql.proto.PgProto; import io.debezium.data.Bits; import io.debezium.data.Json; @@ -67,6 +65,8 @@ import io.debezium.data.geometry.Geography; import io.debezium.data.geometry.Geometry; import io.debezium.data.geometry.Point; +import io.debezium.data.vector.DoubleVector; +import io.debezium.data.vector.FloatVector; import io.debezium.jdbc.JdbcValueConverters; import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.relational.Column; @@ -325,10 +325,10 @@ else if (oidValue == typeRegistry.ltreeOid()) { return Ltree.builder(); } else if (oidValue == typeRegistry.vectorOid()) { - return Vector.builder(); + return DoubleVector.builder(); } else if (oidValue == typeRegistry.halfVectorOid()) { - return HalfVector.builder(); + return FloatVector.builder(); } else if (oidValue == typeRegistry.sparseVectorOid()) { return SparseVector.builder(); @@ -683,13 +683,13 @@ else if (data instanceof PGobject) { private Object convertPgVector(Column column, Field fieldDefn, Object data) { return convertValue(column, fieldDefn, data, Collections.emptyList(), r -> { if (data instanceof byte[] typedData) { - r.deliver(Vector.fromLogical(fieldDefn.schema(), new String(typedData, databaseCharset))); + r.deliver(DoubleVector.fromLogical(fieldDefn.schema(), new String(typedData, databaseCharset))); } if (data instanceof String typedData) { - r.deliver(Vector.fromLogical(fieldDefn.schema(), typedData)); + r.deliver(DoubleVector.fromLogical(fieldDefn.schema(), typedData)); } else if (data instanceof PGobject typedData) { - r.deliver(Vector.fromLogical(fieldDefn.schema(), typedData.getValue())); + r.deliver(DoubleVector.fromLogical(fieldDefn.schema(), typedData.getValue())); } }); } @@ -697,13 +697,13 @@ else if (data instanceof PGobject typedData) { private Object convertPgHalfVector(Column column, Field fieldDefn, Object data) { return convertValue(column, fieldDefn, data, Collections.emptyList(), r -> { if (data instanceof byte[] typedData) { - r.deliver(HalfVector.fromLogical(fieldDefn.schema(), new String(typedData, databaseCharset))); + r.deliver(FloatVector.fromLogical(fieldDefn.schema(), new String(typedData, databaseCharset))); } if (data instanceof String typedData) { - r.deliver(HalfVector.fromLogical(fieldDefn.schema(), typedData)); + r.deliver(FloatVector.fromLogical(fieldDefn.schema(), typedData)); } else if (data instanceof PGobject typedData) { - r.deliver(HalfVector.fromLogical(fieldDefn.schema(), typedData.getValue())); + r.deliver(FloatVector.fromLogical(fieldDefn.schema(), typedData.getValue())); } }); } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/data/vector/HalfVector.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/data/vector/HalfVector.java deleted file mode 100644 index aa9873711..000000000 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/data/vector/HalfVector.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.postgresql.data.vector; - -import java.util.List; - -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; - -import io.debezium.connector.postgresql.PostgresSchemaFactory; - -/** - * A semantic type for a PgVector halfvec type. - * - * @author Jiri Pechanec - */ -public class HalfVector { - - public static final String LOGICAL_NAME = "io.debezium.data.HalfVector"; - public static int SCHEMA_VERSION = 1; - - /** - * Returns a {@link SchemaBuilder} for a halfvec field. You can use the resulting SchemaBuilder - * to set additional schema settings such as required/optional, default value, and documentation. - * - * @return the schema builder - */ - public static SchemaBuilder builder() { - return PostgresSchemaFactory.get().datatypeHalfVectorSchema(); - } - - /** - * Returns a {@link SchemaBuilder} for a halfvec field, with all other default Schema settings. - * - * @return the schema - * @see #builder() - */ - public static Schema schema() { - return builder().build(); - } - - /** - * Converts a value from its logical format - {@link String} of {@code [x,y,z,...]} - * to its encoded format - a Connect array represented by list of numbers. - * - * @param schema of the encoded value - * @param value the value of the vector - * - * @return the encoded value - */ - public static List fromLogical(Schema schema, String value) { - return Vectors.fromVectorString(schema, value, Float::parseFloat); - } -} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/data/vector/Vector.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/data/vector/Vector.java deleted file mode 100644 index bd8269ee7..000000000 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/data/vector/Vector.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.postgresql.data.vector; - -import java.util.List; - -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; - -import io.debezium.connector.postgresql.PostgresSchemaFactory; - -/** - * A semantic type for a PgVector vector type. - * - * @author Jiri Pechanec - */ -public class Vector { - - public static final String LOGICAL_NAME = "io.debezium.data.Vector"; - public static int SCHEMA_VERSION = 1; - - /** - * Returns a {@link SchemaBuilder} for a vector field. You can use the resulting SchemaBuilder - * to set additional schema settings such as required/optional, default value, and documentation. - * - * @return the schema builder - */ - public static SchemaBuilder builder() { - return PostgresSchemaFactory.get().datatypeVectorSchema(); - } - - /** - * Returns a {@link SchemaBuilder} for a vector field, with all other default Schema settings. - * - * @return the schema - * @see #builder() - */ - public static Schema schema() { - return builder().build(); - } - - /** - * Converts a value from its logical format - {@link String} of {@code [x,y,z,...]} - * to its encoded format - a Connect array represented by list of numbers. - * - * @param schema of the encoded value - * @param value the value of the vector - * - * @return the encoded value - */ - public static List fromLogical(Schema schema, String value) { - return Vectors.fromVectorString(schema, value, Double::parseDouble); - } -} diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/VectorDatabaseIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/VectorDatabaseIT.java index 4e9dd1586..c6ef4bf06 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/VectorDatabaseIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/VectorDatabaseIT.java @@ -67,8 +67,8 @@ public void shouldSnapshotAndStreamData() throws Exception { Assertions.assertThat(recs).hasSize(2); var rec = ((Struct) recs.get(0).value()); - Assertions.assertThat(rec.schema().field("after").schema().field("f_vector").schema().name()).isEqualTo("io.debezium.data.Vector"); - Assertions.assertThat(rec.schema().field("after").schema().field("f_halfvec").schema().name()).isEqualTo("io.debezium.data.HalfVector"); + Assertions.assertThat(rec.schema().field("after").schema().field("f_vector").schema().name()).isEqualTo("io.debezium.data.DoubleVector"); + Assertions.assertThat(rec.schema().field("after").schema().field("f_halfvec").schema().name()).isEqualTo("io.debezium.data.FloatVector"); Assertions.assertThat(rec.schema().field("after").schema().field("f_sparsevec").schema().name()).isEqualTo("io.debezium.data.SparseVector"); Assertions.assertThat(rec.getStruct("after").getArray("f_vector")).isEqualTo(List.of(1.0, 2.0, 3.0)); Assertions.assertThat(rec.getStruct("after").getArray("f_halfvec")).isEqualTo(List.of(101.0f, 102.0f, 103.0f)); @@ -76,8 +76,8 @@ public void shouldSnapshotAndStreamData() throws Exception { Assertions.assertThat(rec.getStruct("after").getStruct("f_sparsevec").getMap("vector")).isEqualTo(Map.of((short) 1, 201.0, (short) 9, 209.0)); rec = ((Struct) recs.get(1).value()); - Assertions.assertThat(rec.schema().field("after").schema().field("f_vector").schema().name()).isEqualTo("io.debezium.data.Vector"); - Assertions.assertThat(rec.schema().field("after").schema().field("f_halfvec").schema().name()).isEqualTo("io.debezium.data.HalfVector"); + Assertions.assertThat(rec.schema().field("after").schema().field("f_vector").schema().name()).isEqualTo("io.debezium.data.DoubleVector"); + Assertions.assertThat(rec.schema().field("after").schema().field("f_halfvec").schema().name()).isEqualTo("io.debezium.data.FloatVector"); Assertions.assertThat(rec.schema().field("after").schema().field("f_sparsevec").schema().name()).isEqualTo("io.debezium.data.SparseVector"); Assertions.assertThat(rec.getStruct("after").getArray("f_vector")).isEqualTo(List.of(10.0, 20.0, 30.0)); Assertions.assertThat(rec.getStruct("after").getArray("f_halfvec")).isEqualTo(List.of(110.0f, 120.0f, 130.0f)); @@ -104,7 +104,7 @@ public void shouldStreamData() throws Exception { Assertions.assertThat(recs).hasSize(1); var rec = ((Struct) recs.get(0).value()); - Assertions.assertThat(rec.schema().field("after").schema().field("f_vector").schema().name()).isEqualTo("io.debezium.data.Vector"); + Assertions.assertThat(rec.schema().field("after").schema().field("f_vector").schema().name()).isEqualTo("io.debezium.data.DoubleVector"); Assertions.assertThat(rec.getStruct("after").getArray("f_vector")).isEqualTo(List.of(1.0, 2.0, 3.0)); } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/VectorDatabaseTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/VectorDatabaseTest.java index 2e04a7643..852b49593 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/VectorDatabaseTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/VectorDatabaseTest.java @@ -6,64 +6,15 @@ package io.debezium.connector.postgresql; -import java.util.List; import java.util.Map; import org.assertj.core.api.Assertions; import org.junit.Test; -import io.debezium.connector.postgresql.data.vector.HalfVector; import io.debezium.connector.postgresql.data.vector.SparseVector; -import io.debezium.connector.postgresql.data.vector.Vector; public class VectorDatabaseTest { - @Test - public void shouldParseVector() { - final var expectedVector = List.of(10.0, 20.0, 30.0); - Assertions.assertThat(Vector.fromLogical(Vector.schema(), "[10,20,30]")).isEqualTo(expectedVector); - Assertions.assertThat(Vector.fromLogical(Vector.schema(), "[ 10,20,30] ")).isEqualTo(expectedVector); - Assertions.assertThat(Vector.fromLogical(Vector.schema(), " [ 10,20,30 ]")).isEqualTo(expectedVector); - Assertions.assertThat(Vector.fromLogical(Vector.schema(), "[10 ,20 ,30]")).isEqualTo(expectedVector); - Assertions.assertThat(Vector.fromLogical(Vector.schema(), "[10.2 , 20, 30]")).isEqualTo(List.of(10.2, 20.0, 30.0)); - Assertions.assertThat(Vector.fromLogical(Vector.schema(), "[10.2e-1 , 20, 30]")).isEqualTo(List.of(1.02, 20.0, 30.0)); - } - - @Test - public void shouldIgnoreErrorInVectorFormat() { - Assertions.assertThat(Vector.fromLogical(Vector.schema(), "10,20,30]")).isNull(); - Assertions.assertThat(Vector.fromLogical(Vector.schema(), "[10,20,30")).isNull(); - Assertions.assertThat(Vector.fromLogical(Vector.schema(), "{10,20,30}")).isNull(); - } - - @Test(expected = NumberFormatException.class) - public void shouldFailOnNumberInVectorFormat() { - Vector.fromLogical(Vector.schema(), "[a10,20,30]"); - } - - @Test - public void shouldParseHalfVector() { - final var expectedVector = List.of(10.0f, 20.0f, 30.0f); - Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), "[10,20,30]")).isEqualTo(expectedVector); - Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), "[ 10,20,30] ")).isEqualTo(expectedVector); - Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), " [ 10,20,30 ]")).isEqualTo(expectedVector); - Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), "[10 ,20 ,30]")).isEqualTo(expectedVector); - Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), "[10.2 , 20, 30]")).isEqualTo(List.of(10.2f, 20.0f, 30.0f)); - Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), "[10.2e-1 , 20, 30]")).isEqualTo(List.of(1.02f, 20.0f, 30.0f)); - } - - @Test - public void shouldIgnoreErrorInHalfVectorFormat() { - Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), "10,20,30]")).isNull(); - Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), "[10,20,30")).isNull(); - Assertions.assertThat(HalfVector.fromLogical(HalfVector.schema(), "{10,20,30}")).isNull(); - } - - @Test(expected = NumberFormatException.class) - public void shouldFailOnNumberInHalfVectorFormat() { - HalfVector.fromLogical(HalfVector.schema(), "[a10,20,30]"); - } - @Test public void shouldParseSparseVector() { final var expectedVector = Map.of((short) 1, 10.0, (short) 11, 20.0, (short) 111, 30.0); diff --git a/debezium-core/src/test/java/io/debezium/data/vector/VectorDatatypeTest.java b/debezium-core/src/test/java/io/debezium/data/vector/VectorDatatypeTest.java index 73c308ca0..21b84da7b 100644 --- a/debezium-core/src/test/java/io/debezium/data/vector/VectorDatatypeTest.java +++ b/debezium-core/src/test/java/io/debezium/data/vector/VectorDatatypeTest.java @@ -14,7 +14,7 @@ public class VectorDatatypeTest { @Test - public void shouldParseVector() { + public void shouldParseDoubleVector() { final var expectedVector = List.of(10.0, 20.0, 30.0); Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "[10,20,30]")).isEqualTo(expectedVector); Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "[ 10,20,30] ")).isEqualTo(expectedVector); @@ -25,19 +25,19 @@ public void shouldParseVector() { } @Test - public void shouldIgnoreErrorInVectorFormat() { + public void shouldIgnoreErrorInDoubleVectorFormat() { Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "10,20,30]")).isNull(); Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "[10,20,30")).isNull(); Assertions.assertThat(DoubleVector.fromLogical(DoubleVector.schema(), "{10,20,30}")).isNull(); } @Test(expected = NumberFormatException.class) - public void shouldFailOnNumberInVectorFormat() { + public void shouldFailOnNumberInDoubleVectorFormat() { DoubleVector.fromLogical(DoubleVector.schema(), "[a10,20,30]"); } @Test - public void shouldParseHalfVector() { + public void shouldParseFloatVector() { final var expectedVector = List.of(10.0f, 20.0f, 30.0f); Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "[10,20,30]")).isEqualTo(expectedVector); Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "[ 10,20,30] ")).isEqualTo(expectedVector); @@ -48,14 +48,14 @@ public void shouldParseHalfVector() { } @Test - public void shouldIgnoreErrorInHalfVectorFormat() { + public void shouldIgnoreErrorInFloatVectorFormat() { Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "10,20,30]")).isNull(); Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "[10,20,30")).isNull(); Assertions.assertThat(FloatVector.fromLogical(FloatVector.schema(), "{10,20,30}")).isNull(); } @Test(expected = NumberFormatException.class) - public void shouldFailOnNumberInHalfVectorFormat() { + public void shouldFailOnNumberInFloatVectorFormat() { FloatVector.fromLogical(FloatVector.schema(), "[a10,20,30]"); } } diff --git a/documentation/modules/ROOT/pages/connectors/postgresql.adoc b/documentation/modules/ROOT/pages/connectors/postgresql.adoc index ed352d38b..6200c711a 100644 --- a/documentation/modules/ROOT/pages/connectors/postgresql.adoc +++ b/documentation/modules/ROOT/pages/connectors/postgresql.adoc @@ -1926,11 +1926,11 @@ The PostgreSQL connector supports all link:https://github.com/pgvector/pgvector[ |`VECTOR` |`ARRAY (FLOAT64)` -|`io.debezium.data.Vector` + +|`io.debezium.data.DoubleVector` + |`HALFVEC` |`ARRAY (FLOAT32)` -|`io.debezium.data.HalfVector` + +|`io.debezium.data.FloatVector` + |`SPARSEVEC` |`STRUCT`