From badae96e1d132f0c5d71458599e3b5e1c857811a Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Tue, 18 Jun 2019 10:50:59 +0200 Subject: [PATCH] DBZ-1335 Process unknown datatypes for snapshotted arrays --- .../postgresql/PostgresValueConverter.java | 22 +++++++++++++++++-- .../postgresql/RecordsSnapshotProducer.java | 10 +-------- .../wal2json/Wal2JsonReplicationMessage.java | 5 +---- .../AbstractRecordsProducerTest.java | 7 +++--- .../test/resources/postgres_create_tables.ddl | 2 +- 5 files changed, 27 insertions(+), 19 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java index 90a8b8a57..0f982c98c 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java @@ -23,6 +23,7 @@ import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.time.temporal.ChronoField; +import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; @@ -36,7 +37,9 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.errors.ConnectException; import org.postgresql.geometric.PGpoint; +import org.postgresql.jdbc.PgArray; import org.postgresql.util.HStoreConverter; import org.postgresql.util.PGInterval; import org.postgresql.util.PGobject; @@ -786,12 +789,24 @@ else if (data instanceof PgProto.Point) { protected Object convertArray(Column column, Field fieldDefn, ValueConverter elementConverter, Object data) { return convertValue(column, fieldDefn, data, Collections.emptyList(), (r) -> { - // RecordStreamProducer and RecordsSnapshotProducer should 
ensure this arrives as a list if (data instanceof List) { r.deliver(((List) data).stream() .map(elementConverter::convert) .collect(Collectors.toList())); } + else if (data instanceof PgArray) { + try { + final Object[] values = (Object[]) ((PgArray) data).getArray(); + final List converted = new ArrayList<>(values.length); + for (Object value: values) { + converted.add(elementConverter.convert(value)); + } + r.deliver(converted); + } + catch (SQLException e) { + throw new ConnectException("Failed to read value of array " + column.name(), e); + } + } }); } @@ -841,6 +856,9 @@ protected int getTimePrecision(Column column) { */ @Override protected Object convertBinary(Column column, Field fieldDefn, Object data) { - return super.convertBinary(column, fieldDefn, (data instanceof PGobject)?((PGobject) data).getValue():data); + if (data instanceof PgArray) { + data = ((PgArray) data).toString(); + } + return super.convertBinary(column, fieldDefn, (data instanceof PGobject) ? ((PGobject) data).getValue() : data); } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java index 2c0a38956..fbf58347b 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsSnapshotProducer.java @@ -6,14 +6,12 @@ package io.debezium.connector.postgresql; -import java.sql.Array; import java.sql.Connection; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; -import java.util.Arrays; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -357,13 +355,7 @@ private Object valueForColumn(ResultSet rs, int colIdx, ResultSetMetaData metaDa 
logger.trace("Type is: {}", type); if (type.isArrayType()) { - Array array = rs.getArray(colIdx); - - if (array == null) { - return null; - } - - return Arrays.asList((Object[]) array.getArray()); + return rs.getArray(colIdx); } switch (type.getOid()) { diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonReplicationMessage.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonReplicationMessage.java index 287081cb2..088957805 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonReplicationMessage.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonReplicationMessage.java @@ -12,7 +12,6 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.regex.Matcher; @@ -190,9 +189,7 @@ public Object getValue(String columnName, PostgresType type, String fullType, Va if (type.isArrayType()) { try { final String dataString = rawValue.asString(); - PgArray arrayData = new PgArray(connection.get(), type.getOid(), dataString); - Object deserializedArray = arrayData.getArray(); - return Arrays.asList((Object[]) deserializedArray); + return new PgArray(connection.get(), type.getOid(), dataString); } catch (SQLException e) { LOGGER.warn("Unexpected exception trying to process PgArray ({}) column '{}', {}", fullType, columnName, e); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java index ec15e564b..187213261 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java +++ 
b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java @@ -156,8 +156,8 @@ public abstract class AbstractRecordsProducerTest { protected static final String INSERT_QUOTED_TYPES_STMT = "INSERT INTO \"Quoted_\"\" . Schema\".\"Quoted_\"\" . Table\" (\"Quoted_\"\" . Text_Column\") " + "VALUES ('some text')"; - protected static final String INSERT_CUSTOM_TYPES_STMT = "INSERT INTO custom_table (lt, i, n) " + - "VALUES ('Top.Collections.Pictures.Astronomy.Galaxies', '978-0-393-04002-9', NULL)"; + protected static final String INSERT_CUSTOM_TYPES_STMT = "INSERT INTO custom_table (lt, i, n, lt_array) " + + "VALUES ('Top.Collections.Pictures.Astronomy.Galaxies', '978-0-393-04002-9', NULL, '{\"Ship.Frigate\",\"Ship.Destroyer\"}')"; protected static final String INSERT_HSTORE_TYPE_STMT = "INSERT INTO hstore_table (hs) VALUES ('\"key\" => \"val\"'::hstore)"; @@ -707,7 +707,8 @@ protected List schemasAndValuesForNumericTypesUsingStringEn protected List schemasAndValuesForCustomTypes() { return Arrays.asList(new SchemaAndValueField("lt", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("Top.Collections.Pictures.Astronomy.Galaxies".getBytes())), new SchemaAndValueField("i", Schema.BYTES_SCHEMA, ByteBuffer.wrap("0-393-04002-X".getBytes())), - new SchemaAndValueField("n", Schema.OPTIONAL_STRING_SCHEMA, null)); + new SchemaAndValueField("n", Schema.OPTIONAL_STRING_SCHEMA, null), + new SchemaAndValueField("lt_array", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("{Ship.Frigate,Ship.Destroyer}".getBytes()))); } diff --git a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl index 2d1fbe182..b3565d774 100644 --- a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl +++ b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl @@ -28,7 +28,7 @@ CREATE TABLE geom_table (pk SERIAL, p POINT, PRIMARY 
KEY(pk)); CREATE TABLE range_table (pk SERIAL, unbounded_exclusive_tsrange TSRANGE, bounded_inclusive_tsrange TSRANGE, unbounded_exclusive_tstzrange TSTZRANGE, bounded_inclusive_tstzrange TSTZRANGE, unbounded_exclusive_daterange DATERANGE, bounded_exclusive_daterange DATERANGE, int4_number_range INT4RANGE, numerange NUMRANGE, int8_number_range INT8RANGE, PRIMARY KEY(pk)); CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], cidr_array CIDR[], macaddr_array MACADDR[], tsrange_array TSRANGE[], tstzrange_array TSTZRANGE[], daterange_array DATERANGE[], int4range_array INT4RANGE[],numerange_array NUMRANGE[], int8range_array INT8RANGE[], PRIMARY KEY(pk)); CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], cidr_array CIDR[], macaddr_array MACADDR[], tsrange_array TSRANGE[], tstzrange_array TSTZRANGE[], daterange_array DATERANGE[], int4range_array INT4RANGE[], numerange_array NUMRANGE[], int8range_array INT8RANGE[], PRIMARY KEY(pk)); -CREATE TABLE custom_table (pk serial, lt ltree, i isbn NOT NULL, n TEXT, PRIMARY KEY(pk)); +CREATE TABLE custom_table (pk serial, lt ltree, i isbn NOT NULL, n TEXT, lt_array ltree[], PRIMARY KEY(pk)); CREATE TABLE hstore_table (pk serial, hs hstore, PRIMARY KEY(pk)); CREATE TABLE hstore_table_mul (pk serial, hs hstore, PRIMARY KEY(pk)); CREATE TABLE hstore_table_with_null (pk serial, hs hstore, PRIMARY KEY(pk));