From 236ea1a15fdc6141501521d8a1040c1b350bf972 Mon Sep 17 00:00:00 2001 From: Anton Kondratev Date: Mon, 27 Apr 2020 22:17:48 +0300 Subject: [PATCH] DBZ-1990 Support for Postgres Json and Jsonb array columns --- .../postgresql/PostgresValueConverter.java | 9 +++++---- .../AbstractRecordsProducerTest.java | 18 ++++++++++++------ .../test/resources/postgres_create_tables.ddl | 4 ++-- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java index 22fb4632c..14a92a0bb 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java @@ -228,6 +228,9 @@ public SchemaBuilder schemaBuilder(Column column) { return SchemaBuilder.array(org.apache.kafka.connect.data.Date.builder().optional().build()); case PgOid.UUID_ARRAY: return SchemaBuilder.array(Uuid.builder().optional().build()); + case PgOid.JSONB_ARRAY: + case PgOid.JSON_ARRAY: + return SchemaBuilder.array(Json.builder().optional().build()); case PgOid.TIME_ARRAY: case PgOid.TIMETZ_ARRAY: case PgOid.TIMESTAMP_ARRAY: @@ -240,8 +243,6 @@ public SchemaBuilder schemaBuilder(Column column) { case PgOid.VARBIT_ARRAY: case PgOid.XML_ARRAY: case PgOid.POINT_ARRAY: - case PgOid.JSONB_ARRAY: - case PgOid.JSON_ARRAY: case PgOid.REF_CURSOR_ARRAY: // These array types still need to be implemented. The superclass won't handle them so // we return null here until we can code schema implementations for them. @@ -377,6 +378,8 @@ public ValueConverter converter(Column column, Field fieldDefn) { case PgOid.NUM_RANGE_ARRAY: case PgOid.INT8RANGE_ARRAY: case PgOid.UUID_ARRAY: + case PgOid.JSONB_ARRAY: + case PgOid.JSON_ARRAY: return createArrayConverter(column, fieldDefn); // TODO DBZ-459 implement support for these array types; for now we just fall back to the default, i.e. @@ -393,8 +396,6 @@ public ValueConverter converter(Column column, Field fieldDefn) { case PgOid.VARBIT_ARRAY: case PgOid.XML_ARRAY: case PgOid.POINT_ARRAY: - case PgOid.JSONB_ARRAY: - case PgOid.JSON_ARRAY: case PgOid.REF_CURSOR_ARRAY: return super.converter(column, fieldDefn); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java index 6f9b9e3a2..c75b49f6a 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java @@ -155,15 +155,17 @@ public abstract class AbstractRecordsProducerTest extends AbstractConnectorTest + "VALUES ('[2019-03-31 15:30:00, infinity)', '[2019-03-31 15:30:00, 2019-04-30 15:30:00]', '[2017-06-05 11:29:12.549426+00,)', '[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]', '[2019-03-31, infinity)', '[2019-03-31, 2019-04-30)', '[1000,6000)', '[5.3,6.3)', '[1000000,6000000)')"; - protected static final String INSERT_ARRAY_TYPES_STMT = "INSERT INTO array_table (int_array, bigint_array, text_array, char_array, varchar_array, date_array, numeric_array, varnumeric_array, citext_array, inet_array, cidr_array, macaddr_array, tsrange_array, tstzrange_array, daterange_array, int4range_array, numerange_array, int8range_array, uuid_array) " + protected static final String INSERT_ARRAY_TYPES_STMT = "INSERT INTO array_table (int_array, bigint_array, text_array, char_array, varchar_array, date_array, numeric_array, varnumeric_array, citext_array, inet_array, cidr_array, macaddr_array, tsrange_array, tstzrange_array, daterange_array, int4range_array, numerange_array, int8range_array, uuid_array, json_array, jsonb_array) " + "VALUES ('{1,2,3}', '{1550166368505037572}', '{\"one\",\"two\",\"three\"}', '{\"cone\",\"ctwo\",\"cthree\"}', '{\"vcone\",\"vctwo\",\"vcthree\"}', '{2016-11-04,2016-11-05,2016-11-06}', '{1.2,3.4,5.6}', '{1.1,2.22,3.333}', '{\"four\",\"five\",\"six\"}', '{\"192.168.2.0/12\",\"192.168.1.1\",\"192.168.0.2/1\"}', '{\"192.168.100.128/25\", \"192.168.0.0/25\", \"192.168.1.0/24\"}', '{\"08:00:2b:01:02:03\", \"08-00-2b-01-02-03\", \"08002b:010203\"}'," + - "'{\"[2019-03-31 15:30:00, infinity)\", \"[2019-03-31 15:30:00, 2019-04-30 15:30:00]\"}', '{\"[2017-06-05 11:29:12.549426+00,)\", \"[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]\"}', '{\"[2019-03-31, infinity)\", \"[2019-03-31, 2019-04-30)\"}', '{\"[1,6)\", \"[1,4)\"}', '{\"[5.3,6.3)\", \"[10.0,20.0)\"}', '{\"[1000000,6000000)\", \"[5000,9000)\"}', '{\"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11\", \"f0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11\"}')"; - - protected static final String INSERT_ARRAY_TYPES_WITH_NULL_VALUES_STMT = "INSERT INTO array_table_with_nulls (int_array, bigint_array, text_array, date_array, numeric_array, varnumeric_array, citext_array, inet_array, cidr_array, macaddr_array, tsrange_array, tstzrange_array, daterange_array, int4range_array, numerange_array, int8range_array, uuid_array) " + "'{\"[2019-03-31 15:30:00, infinity)\", \"[2019-03-31 15:30:00, 2019-04-30 15:30:00]\"}', '{\"[2017-06-05 11:29:12.549426+00,)\", \"[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]\"}', '{\"[2019-03-31, infinity)\", \"[2019-03-31, 2019-04-30)\"}', '{\"[1,6)\", \"[1,4)\"}', '{\"[5.3,6.3)\", \"[10.0,20.0)\"}', '{\"[1000000,6000000)\", \"[5000,9000)\"}', '{\"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11\", \"f0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11\"}'," + - "VALUES (null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)"; + "array['{\"bar\": \"baz\"}','{\"foo\": \"qux\"}']::json[], array['{\"bar\": \"baz\"}','{\"foo\": \"qux\"}']::jsonb[])"; + + protected static final String INSERT_ARRAY_TYPES_WITH_NULL_VALUES_STMT = "INSERT INTO array_table_with_nulls (int_array, bigint_array, text_array, date_array, numeric_array, varnumeric_array, citext_array, inet_array, cidr_array, macaddr_array, tsrange_array, tstzrange_array, daterange_array, int4range_array, numerange_array, int8range_array, uuid_array, json_array, jsonb_array) " + + + "VALUES (null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)"; protected static final String INSERT_POSTGIS_TYPES_STMT = "INSERT INTO public.postgis_table (p, ml) " + "VALUES ('SRID=3187;POINT(174.9479 -36.7208)'::postgis.geometry, 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::postgis.geography)"; @@ -695,7 +697,11 @@ protected List schemasAndValuesForArrayTypes() { new SchemaAndValueField("int8range_array", SchemaBuilder.array(SchemaBuilder.OPTIONAL_STRING_SCHEMA).optional().build(), Arrays.asList("[1000000,6000000)", "[5000,9000)")), new SchemaAndValueField("uuid_array", SchemaBuilder.array(Uuid.builder().optional().build()).optional().build(), - Arrays.asList("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11", "f0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11"))); + Arrays.asList("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11", "f0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11")), + new SchemaAndValueField("json_array", SchemaBuilder.array(Json.builder().optional().build()).optional().build(), + Arrays.asList("{\"bar\": \"baz\"}", "{\"foo\": \"qux\"}")), + new SchemaAndValueField("jsonb_array", SchemaBuilder.array(Json.builder().optional().build()).optional().build(), + Arrays.asList("{\"bar\": \"baz\"}", "{\"foo\": \"qux\"}"))); } protected List schemasAndValuesForArrayTypesWithNullValues() { diff --git a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl index 785f32929..af37c59d7 100644 --- a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl +++ b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl @@ -35,8 +35,8 @@ CREATE TABLE time_table (pk SERIAL, ts TIMESTAMP, tsneg TIMESTAMP(6) WITHOUT TIM CREATE TABLE text_table (pk SERIAL, j JSON, jb JSONB, x XML, u Uuid, PRIMARY KEY(pk)); CREATE TABLE geom_table (pk SERIAL, p POINT, PRIMARY KEY(pk)); CREATE TABLE range_table (pk SERIAL, unbounded_exclusive_tsrange TSRANGE, bounded_inclusive_tsrange TSRANGE, unbounded_exclusive_tstzrange TSTZRANGE, bounded_inclusive_tstzrange TSTZRANGE, unbounded_exclusive_daterange DATERANGE, bounded_exclusive_daterange DATERANGE, int4_number_range INT4RANGE, numerange NUMRANGE, int8_number_range INT8RANGE, PRIMARY KEY(pk)); -CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], cidr_array CIDR[], macaddr_array MACADDR[], tsrange_array TSRANGE[], tstzrange_array TSTZRANGE[], daterange_array DATERANGE[], int4range_array INT4RANGE[],numerange_array NUMRANGE[], int8range_array INT8RANGE[], uuid_array UUID[], PRIMARY KEY(pk)); -CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], cidr_array CIDR[], macaddr_array MACADDR[], tsrange_array TSRANGE[], tstzrange_array TSTZRANGE[], daterange_array DATERANGE[], int4range_array INT4RANGE[], numerange_array NUMRANGE[], int8range_array INT8RANGE[], uuid_array UUID[], PRIMARY KEY(pk)); +CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], cidr_array CIDR[], macaddr_array MACADDR[], tsrange_array TSRANGE[], tstzrange_array TSTZRANGE[], daterange_array DATERANGE[], int4range_array INT4RANGE[],numerange_array NUMRANGE[], int8range_array INT8RANGE[], uuid_array UUID[], json_array json[], jsonb_array jsonb[], PRIMARY KEY(pk)); +CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], cidr_array CIDR[], macaddr_array MACADDR[], tsrange_array TSRANGE[], tstzrange_array TSTZRANGE[], daterange_array DATERANGE[], int4range_array INT4RANGE[], numerange_array NUMRANGE[], int8range_array INT8RANGE[], uuid_array UUID[], json_array json[], jsonb_array jsonb[], PRIMARY KEY(pk)); CREATE TABLE custom_table (pk serial, lt ltree, i isbn NOT NULL, n TEXT, lt_array ltree[], PRIMARY KEY(pk)); CREATE TABLE hstore_table (pk serial, hs hstore, PRIMARY KEY(pk)); CREATE TABLE hstore_table_mul (pk serial, hs hstore, hsarr hstore[], PRIMARY KEY(pk));