DBZ-1335 Process unknown datatypes for snapshotted arrays

This commit is contained in:
Jiri Pechanec 2019-06-18 10:50:59 +02:00 committed by Gunnar Morling
parent 851a3ef676
commit badae96e1d
5 changed files with 27 additions and 19 deletions

View File

@ -23,6 +23,7 @@
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@ -36,7 +37,9 @@
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.geometric.PGpoint;
import org.postgresql.jdbc.PgArray;
import org.postgresql.util.HStoreConverter;
import org.postgresql.util.PGInterval;
import org.postgresql.util.PGobject;
@ -786,12 +789,24 @@ else if (data instanceof PgProto.Point) {
protected Object convertArray(Column column, Field fieldDefn, ValueConverter elementConverter, Object data) {
return convertValue(column, fieldDefn, data, Collections.emptyList(), (r) -> {
// RecordStreamProducer and RecordsSnapshotProducer should ensure this arrives as a list
if (data instanceof List) {
r.deliver(((List<?>) data).stream()
.map(elementConverter::convert)
.collect(Collectors.toList()));
}
else if (data instanceof PgArray) {
try {
final Object[] values = (Object[]) ((PgArray) data).getArray();
final List<Object> converted = new ArrayList<>(values.length);
for (Object value: values) {
converted.add(elementConverter.convert(value));
}
r.deliver(converted);
}
catch (SQLException e) {
throw new ConnectException("Failed to read value of array " + column.name());
}
}
});
}
@ -841,6 +856,9 @@ protected int getTimePrecision(Column column) {
*/
@Override
protected Object convertBinary(Column column, Field fieldDefn, Object data) {
if (data instanceof PgArray) {
data = ((PgArray) data).toString();
}
return super.convertBinary(column, fieldDefn, (data instanceof PGobject) ? ((PGobject) data).getValue() : data);
}
}

View File

@ -6,14 +6,12 @@
package io.debezium.connector.postgresql;
import java.sql.Array;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@ -357,13 +355,7 @@ private Object valueForColumn(ResultSet rs, int colIdx, ResultSetMetaData metaDa
logger.trace("Type is: {}", type);
if (type.isArrayType()) {
Array array = rs.getArray(colIdx);
if (array == null) {
return null;
}
return Arrays.asList((Object[]) array.getArray());
return rs.getArray(colIdx);
}
switch (type.getOid()) {

View File

@ -12,7 +12,6 @@
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
@ -190,9 +189,7 @@ public Object getValue(String columnName, PostgresType type, String fullType, Va
if (type.isArrayType()) {
try {
final String dataString = rawValue.asString();
PgArray arrayData = new PgArray(connection.get(), type.getOid(), dataString);
Object deserializedArray = arrayData.getArray();
return Arrays.asList((Object[]) deserializedArray);
return new PgArray(connection.get(), type.getOid(), dataString);
}
catch (SQLException e) {
LOGGER.warn("Unexpected exception trying to process PgArray ({}) column '{}', {}", fullType, columnName, e);

View File

@ -156,8 +156,8 @@ public abstract class AbstractRecordsProducerTest {
protected static final String INSERT_QUOTED_TYPES_STMT = "INSERT INTO \"Quoted_\"\" . Schema\".\"Quoted_\"\" . Table\" (\"Quoted_\"\" . Text_Column\") " +
"VALUES ('some text')";
protected static final String INSERT_CUSTOM_TYPES_STMT = "INSERT INTO custom_table (lt, i, n) " +
"VALUES ('Top.Collections.Pictures.Astronomy.Galaxies', '978-0-393-04002-9', NULL)";
protected static final String INSERT_CUSTOM_TYPES_STMT = "INSERT INTO custom_table (lt, i, n, lt_array) " +
"VALUES ('Top.Collections.Pictures.Astronomy.Galaxies', '978-0-393-04002-9', NULL, '{\"Ship.Frigate\",\"Ship.Destroyer\"}')";
protected static final String INSERT_HSTORE_TYPE_STMT = "INSERT INTO hstore_table (hs) VALUES ('\"key\" => \"val\"'::hstore)";
@ -707,7 +707,8 @@ protected List<SchemaAndValueField> schemasAndValuesForNumericTypesUsingStringEn
protected List<SchemaAndValueField> schemasAndValuesForCustomTypes() {
return Arrays.asList(new SchemaAndValueField("lt", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("Top.Collections.Pictures.Astronomy.Galaxies".getBytes())),
new SchemaAndValueField("i", Schema.BYTES_SCHEMA, ByteBuffer.wrap("0-393-04002-X".getBytes())),
new SchemaAndValueField("n", Schema.OPTIONAL_STRING_SCHEMA, null));
new SchemaAndValueField("n", Schema.OPTIONAL_STRING_SCHEMA, null),
new SchemaAndValueField("lt_array", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("{Ship.Frigate,Ship.Destroyer}".getBytes())));
}

View File

@ -28,7 +28,7 @@ CREATE TABLE geom_table (pk SERIAL, p POINT, PRIMARY KEY(pk));
CREATE TABLE range_table (pk SERIAL, unbounded_exclusive_tsrange TSRANGE, bounded_inclusive_tsrange TSRANGE, unbounded_exclusive_tstzrange TSTZRANGE, bounded_inclusive_tstzrange TSTZRANGE, unbounded_exclusive_daterange DATERANGE, bounded_exclusive_daterange DATERANGE, int4_number_range INT4RANGE, numerange NUMRANGE, int8_number_range INT8RANGE, PRIMARY KEY(pk));
CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], cidr_array CIDR[], macaddr_array MACADDR[], tsrange_array TSRANGE[], tstzrange_array TSTZRANGE[], daterange_array DATERANGE[], int4range_array INT4RANGE[],numerange_array NUMRANGE[], int8range_array INT8RANGE[], PRIMARY KEY(pk));
CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], cidr_array CIDR[], macaddr_array MACADDR[], tsrange_array TSRANGE[], tstzrange_array TSTZRANGE[], daterange_array DATERANGE[], int4range_array INT4RANGE[], numerange_array NUMRANGE[], int8range_array INT8RANGE[], PRIMARY KEY(pk));
CREATE TABLE custom_table (pk serial, lt ltree, i isbn NOT NULL, n TEXT, PRIMARY KEY(pk));
CREATE TABLE custom_table (pk serial, lt ltree, i isbn NOT NULL, n TEXT, lt_array ltree[], PRIMARY KEY(pk));
CREATE TABLE hstore_table (pk serial, hs hstore, PRIMARY KEY(pk));
CREATE TABLE hstore_table_mul (pk serial, hs hstore, PRIMARY KEY(pk));
CREATE TABLE hstore_table_with_null (pk serial, hs hstore, PRIMARY KEY(pk));