From 2c74dd74a30b5c59a541c2d9e4ac7e6854c3ed34 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Mon, 29 Jan 2018 04:39:39 +0100 Subject: [PATCH] DBZ-577 Numeric arrays correctly working --- .../postgresql/PostgresValueConverter.java | 4 +- .../AbstractRecordsProducerTest.java | 43 ++++++++++++++++--- .../test/resources/postgres_create_tables.ddl | 4 +- 3 files changed, 42 insertions(+), 9 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java index a011915f7..6d2b50b7c 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java @@ -127,7 +127,7 @@ public SchemaBuilder schemaBuilder(Column column) { case PgOid.BPCHAR_ARRAY: return SchemaBuilder.array(SchemaBuilder.OPTIONAL_STRING_SCHEMA); case PgOid.NUMERIC_ARRAY: - return SchemaBuilder.array(numericSchema(column).optional()); + return SchemaBuilder.array(numericSchema(column).optional().build()); case PgOid.FLOAT4_ARRAY: return SchemaBuilder.array(Schema.OPTIONAL_FLOAT32_SCHEMA); case PgOid.FLOAT8_ARRAY: @@ -278,6 +278,8 @@ private ValueConverter createArrayConverter(Column column, Field fieldDefn) { .jdbcType(schema.columnTypeNameToJdbcTypeId(PostgresSchema.parse(elementTypeName).table())) .type(elementTypeName) .optional(true) + .scale(column.scale()) + .length(column.length()) .create(); final Field elementField = new Field(elementColumnName, 0, schemaBuilder(elementColumn).build()); final ValueConverter elementConverter = converter(elementColumn, elementField); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java index 3433809ad..187fac0b2 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java @@ -22,6 +22,7 @@ import java.time.Month; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -94,11 +95,11 @@ public abstract class AbstractRecordsProducerTest { "VALUES ('[2017-06-05 11:29:12.549426+00,)', '[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]')"; - protected static final String INSERT_ARRAY_TYPES_STMT = "INSERT INTO array_table (int_array, bigint_array, text_array, char_array, varchar_array, date_array) " + - "VALUES ('{1,2,3}', '{1550166368505037572}', '{\"one\",\"two\",\"three\"}', '{\"cone\",\"ctwo\",\"cthree\"}', '{\"vcone\",\"vctwo\",\"vcthree\"}', '{2016-11-04,2016-11-05,2016-11-06}')"; + protected static final String INSERT_ARRAY_TYPES_STMT = "INSERT INTO array_table (int_array, bigint_array, text_array, char_array, varchar_array, date_array, numeric_array, varnumeric_array) " + + "VALUES ('{1,2,3}', '{1550166368505037572}', '{\"one\",\"two\",\"three\"}', '{\"cone\",\"ctwo\",\"cthree\"}', '{\"vcone\",\"vctwo\",\"vcthree\"}', '{2016-11-04,2016-11-05,2016-11-06}', '{1.2,3.4,5.6}', '{1.1,2.22,3.333}')"; - protected static final String INSERT_ARRAY_TYPES_WITH_NULL_VALUES_STMT = "INSERT INTO array_table_with_nulls (int_array, bigint_array, text_array, date_array) " + - "VALUES (null, null, null, null)"; + protected static final String INSERT_ARRAY_TYPES_WITH_NULL_VALUES_STMT = "INSERT INTO array_table_with_nulls (int_array, bigint_array, text_array, date_array, numeric_array, varnumeric_array) " + + "VALUES (null, null, null, null, null, null)"; protected static final String INSERT_POSTGIS_TYPES_STMT = "INSERT INTO public.postgis_table (p, ml) " + "VALUES ('SRID=3187;POINT(174.9479 -36.7208)'::postgis.geometry, 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::postgis.geography)"; @@ -240,6 +241,18 @@ protected List schemaAndValuesForMoneyTypes() { } protected List schemasAndValuesForArrayTypes() { + Struct element; + final List varnumArray = new ArrayList<>(); + element = new Struct(VariableScaleDecimal.schema()); + element.put("scale", 1).put("value", new BigDecimal("1.1").unscaledValue().toByteArray()); + varnumArray.add(element); + element = new Struct(VariableScaleDecimal.schema()); + element.put("scale", 2).put("value", new BigDecimal("2.22").unscaledValue().toByteArray()); + varnumArray.add(element); + element = new Struct(VariableScaleDecimal.schema()); + element.put("scale", 3).put("value", new BigDecimal("3.333").unscaledValue().toByteArray()); + varnumArray.add(element); + return Arrays.asList(new SchemaAndValueField("int_array", SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).optional().build(), Arrays.asList(1, 2, 3)), new SchemaAndValueField("bigint_array", SchemaBuilder.array(Schema.OPTIONAL_INT64_SCHEMA).optional().build(), @@ -255,7 +268,15 @@ protected List schemasAndValuesForArrayTypes() { (int)LocalDate.of(2016, Month.NOVEMBER, 4).toEpochDay(), (int)LocalDate.of(2016, Month.NOVEMBER, 5).toEpochDay(), (int)LocalDate.of(2016, Month.NOVEMBER, 6).toEpochDay() - )) + )), + new SchemaAndValueField("numeric_array", SchemaBuilder.array(Decimal.builder(2).optional().build()).optional().build(), + Arrays.asList( + new BigDecimal("1.20"), + new BigDecimal("3.40"), + new BigDecimal("5.60") + )), + new SchemaAndValueField("varnumeric_array", SchemaBuilder.array(VariableScaleDecimal.builder().optional().build()).optional().build(), + varnumArray) ); } @@ -266,7 +287,8 @@ protected List schemasAndValuesForArrayTypesWithNullValues( new SchemaAndValueField("text_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), null), new SchemaAndValueField("char_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), null), new SchemaAndValueField("varchar_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), null), - new SchemaAndValueField("date_array", SchemaBuilder.array(Date.builder().optional().schema()).optional().build(), null) + new SchemaAndValueField("date_array", SchemaBuilder.array(Date.builder().optional().schema()).optional().build(), null), + new SchemaAndValueField("numeric_array", SchemaBuilder.array(Decimal.builder(2).optional().build()).optional().build(), null) ); } @@ -442,6 +464,15 @@ private void assertValue(Struct content) { // assert the value type; for List all implementation types (e.g. immutable ones) are acceptable if(actualValue instanceof List) { assertTrue("Incorrect value type for " + fieldName, value instanceof List); + final List actualValueList = (List)actualValue; + final List valueList = (List)value; + assertEquals("List size don't match for " + fieldName, valueList.size(), actualValueList.size()); + if (!valueList.isEmpty() && valueList.iterator().next() instanceof Struct) { + for (int i = 0; i < valueList.size(); i++) { + assertStruct((Struct)valueList.get(i), (Struct)actualValueList.get(i)); + } + return; + } } else { assertEquals("Incorrect value type for " + fieldName, value.getClass(), actualValue.getClass()); diff --git a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl index 96d15872a..0363a7fca 100644 --- a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl +++ b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl @@ -18,8 +18,8 @@ CREATE TABLE time_table (pk SERIAL, ts TIMESTAMP, tz TIMESTAMPTZ, date DATE, ti CREATE TABLE text_table (pk SERIAL, j JSON, jb JSONB, x XML, u Uuid, PRIMARY KEY(pk)); CREATE TABLE geom_table (pk SERIAL, p POINT, PRIMARY KEY(pk)); CREATE TABLE tstzrange_table (pk serial, unbounded_exclusive_range tstzrange, bounded_inclusive_range tstzrange, PRIMARY KEY(pk)); -CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], PRIMARY KEY(pk)); -CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], PRIMARY KEY(pk)); +CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], PRIMARY KEY(pk)); +CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], PRIMARY KEY(pk)); CREATE TABLE custom_table (pk serial, lt ltree, i isbn, n TEXT, PRIMARY KEY(pk)); DROP SCHEMA IF EXISTS "Quoted_"" . Schema" CASCADE;