DBZ-1990 Support for Postgres Json and Jsonb array columns

This commit is contained in:
Anton Kondratev 2020-04-27 22:17:48 +03:00
parent e9632c2227
commit 236ea1a15f
3 changed files with 19 additions and 12 deletions

View File

@ -228,6 +228,9 @@ public SchemaBuilder schemaBuilder(Column column) {
return SchemaBuilder.array(org.apache.kafka.connect.data.Date.builder().optional().build());
case PgOid.UUID_ARRAY:
return SchemaBuilder.array(Uuid.builder().optional().build());
case PgOid.JSONB_ARRAY:
case PgOid.JSON_ARRAY:
return SchemaBuilder.array(Json.builder().optional().build());
case PgOid.TIME_ARRAY:
case PgOid.TIMETZ_ARRAY:
case PgOid.TIMESTAMP_ARRAY:
@ -240,8 +243,6 @@ public SchemaBuilder schemaBuilder(Column column) {
case PgOid.VARBIT_ARRAY:
case PgOid.XML_ARRAY:
case PgOid.POINT_ARRAY:
case PgOid.JSONB_ARRAY:
case PgOid.JSON_ARRAY:
case PgOid.REF_CURSOR_ARRAY:
// These array types still need to be implemented. The superclass won't handle them so
// we return null here until we can code schema implementations for them.
@ -377,6 +378,8 @@ public ValueConverter converter(Column column, Field fieldDefn) {
case PgOid.NUM_RANGE_ARRAY:
case PgOid.INT8RANGE_ARRAY:
case PgOid.UUID_ARRAY:
case PgOid.JSONB_ARRAY:
case PgOid.JSON_ARRAY:
return createArrayConverter(column, fieldDefn);
// TODO DBZ-459 implement support for these array types; for now we just fall back to the default, i.e.
@ -393,8 +396,6 @@ public ValueConverter converter(Column column, Field fieldDefn) {
case PgOid.VARBIT_ARRAY:
case PgOid.XML_ARRAY:
case PgOid.POINT_ARRAY:
case PgOid.JSONB_ARRAY:
case PgOid.JSON_ARRAY:
case PgOid.REF_CURSOR_ARRAY:
return super.converter(column, fieldDefn);

View File

@ -155,15 +155,17 @@ public abstract class AbstractRecordsProducerTest extends AbstractConnectorTest
+
"VALUES ('[2019-03-31 15:30:00, infinity)', '[2019-03-31 15:30:00, 2019-04-30 15:30:00]', '[2017-06-05 11:29:12.549426+00,)', '[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]', '[2019-03-31, infinity)', '[2019-03-31, 2019-04-30)', '[1000,6000)', '[5.3,6.3)', '[1000000,6000000)')";
protected static final String INSERT_ARRAY_TYPES_STMT = "INSERT INTO array_table (int_array, bigint_array, text_array, char_array, varchar_array, date_array, numeric_array, varnumeric_array, citext_array, inet_array, cidr_array, macaddr_array, tsrange_array, tstzrange_array, daterange_array, int4range_array, numerange_array, int8range_array, uuid_array) "
protected static final String INSERT_ARRAY_TYPES_STMT = "INSERT INTO array_table (int_array, bigint_array, text_array, char_array, varchar_array, date_array, numeric_array, varnumeric_array, citext_array, inet_array, cidr_array, macaddr_array, tsrange_array, tstzrange_array, daterange_array, int4range_array, numerange_array, int8range_array, uuid_array, json_array, jsonb_array) "
+
"VALUES ('{1,2,3}', '{1550166368505037572}', '{\"one\",\"two\",\"three\"}', '{\"cone\",\"ctwo\",\"cthree\"}', '{\"vcone\",\"vctwo\",\"vcthree\"}', '{2016-11-04,2016-11-05,2016-11-06}', '{1.2,3.4,5.6}', '{1.1,2.22,3.333}', '{\"four\",\"five\",\"six\"}', '{\"192.168.2.0/12\",\"192.168.1.1\",\"192.168.0.2/1\"}', '{\"192.168.100.128/25\", \"192.168.0.0/25\", \"192.168.1.0/24\"}', '{\"08:00:2b:01:02:03\", \"08-00-2b-01-02-03\", \"08002b:010203\"}',"
+
"'{\"[2019-03-31 15:30:00, infinity)\", \"[2019-03-31 15:30:00, 2019-04-30 15:30:00]\"}', '{\"[2017-06-05 11:29:12.549426+00,)\", \"[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]\"}', '{\"[2019-03-31, infinity)\", \"[2019-03-31, 2019-04-30)\"}', '{\"[1,6)\", \"[1,4)\"}', '{\"[5.3,6.3)\", \"[10.0,20.0)\"}', '{\"[1000000,6000000)\", \"[5000,9000)\"}', '{\"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11\", \"f0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11\"}')";
protected static final String INSERT_ARRAY_TYPES_WITH_NULL_VALUES_STMT = "INSERT INTO array_table_with_nulls (int_array, bigint_array, text_array, date_array, numeric_array, varnumeric_array, citext_array, inet_array, cidr_array, macaddr_array, tsrange_array, tstzrange_array, daterange_array, int4range_array, numerange_array, int8range_array, uuid_array) "
"'{\"[2019-03-31 15:30:00, infinity)\", \"[2019-03-31 15:30:00, 2019-04-30 15:30:00]\"}', '{\"[2017-06-05 11:29:12.549426+00,)\", \"[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]\"}', '{\"[2019-03-31, infinity)\", \"[2019-03-31, 2019-04-30)\"}', '{\"[1,6)\", \"[1,4)\"}', '{\"[5.3,6.3)\", \"[10.0,20.0)\"}', '{\"[1000000,6000000)\", \"[5000,9000)\"}', '{\"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11\", \"f0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11\"}',"
+
"VALUES (null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)";
"array['{\"bar\": \"baz\"}','{\"foo\": \"qux\"}']::json[], array['{\"bar\": \"baz\"}','{\"foo\": \"qux\"}']::jsonb[])";
protected static final String INSERT_ARRAY_TYPES_WITH_NULL_VALUES_STMT = "INSERT INTO array_table_with_nulls (int_array, bigint_array, text_array, date_array, numeric_array, varnumeric_array, citext_array, inet_array, cidr_array, macaddr_array, tsrange_array, tstzrange_array, daterange_array, int4range_array, numerange_array, int8range_array, uuid_array, json_array, jsonb_array) "
+
"VALUES (null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)";
protected static final String INSERT_POSTGIS_TYPES_STMT = "INSERT INTO public.postgis_table (p, ml) " +
"VALUES ('SRID=3187;POINT(174.9479 -36.7208)'::postgis.geometry, 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::postgis.geography)";
@ -695,7 +697,11 @@ protected List<SchemaAndValueField> schemasAndValuesForArrayTypes() {
new SchemaAndValueField("int8range_array", SchemaBuilder.array(SchemaBuilder.OPTIONAL_STRING_SCHEMA).optional().build(),
Arrays.asList("[1000000,6000000)", "[5000,9000)")),
new SchemaAndValueField("uuid_array", SchemaBuilder.array(Uuid.builder().optional().build()).optional().build(),
Arrays.asList("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11", "f0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11")));
Arrays.asList("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11", "f0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11")),
new SchemaAndValueField("json_array", SchemaBuilder.array(Json.builder().optional().build()).optional().build(),
Arrays.asList("{\"bar\": \"baz\"}", "{\"foo\": \"qux\"}")),
new SchemaAndValueField("jsonb_array", SchemaBuilder.array(Json.builder().optional().build()).optional().build(),
Arrays.asList("{\"bar\": \"baz\"}", "{\"foo\": \"qux\"}")));
}
protected List<SchemaAndValueField> schemasAndValuesForArrayTypesWithNullValues() {

View File

@ -35,8 +35,8 @@ CREATE TABLE time_table (pk SERIAL, ts TIMESTAMP, tsneg TIMESTAMP(6) WITHOUT TIM
CREATE TABLE text_table (pk SERIAL, j JSON, jb JSONB, x XML, u Uuid, PRIMARY KEY(pk));
CREATE TABLE geom_table (pk SERIAL, p POINT, PRIMARY KEY(pk));
CREATE TABLE range_table (pk SERIAL, unbounded_exclusive_tsrange TSRANGE, bounded_inclusive_tsrange TSRANGE, unbounded_exclusive_tstzrange TSTZRANGE, bounded_inclusive_tstzrange TSTZRANGE, unbounded_exclusive_daterange DATERANGE, bounded_exclusive_daterange DATERANGE, int4_number_range INT4RANGE, numerange NUMRANGE, int8_number_range INT8RANGE, PRIMARY KEY(pk));
CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], cidr_array CIDR[], macaddr_array MACADDR[], tsrange_array TSRANGE[], tstzrange_array TSTZRANGE[], daterange_array DATERANGE[], int4range_array INT4RANGE[],numerange_array NUMRANGE[], int8range_array INT8RANGE[], uuid_array UUID[], PRIMARY KEY(pk));
CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], cidr_array CIDR[], macaddr_array MACADDR[], tsrange_array TSRANGE[], tstzrange_array TSTZRANGE[], daterange_array DATERANGE[], int4range_array INT4RANGE[], numerange_array NUMRANGE[], int8range_array INT8RANGE[], uuid_array UUID[], PRIMARY KEY(pk));
CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], cidr_array CIDR[], macaddr_array MACADDR[], tsrange_array TSRANGE[], tstzrange_array TSTZRANGE[], daterange_array DATERANGE[], int4range_array INT4RANGE[],numerange_array NUMRANGE[], int8range_array INT8RANGE[], uuid_array UUID[], json_array json[], jsonb_array jsonb[], PRIMARY KEY(pk));
CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], cidr_array CIDR[], macaddr_array MACADDR[], tsrange_array TSRANGE[], tstzrange_array TSTZRANGE[], daterange_array DATERANGE[], int4range_array INT4RANGE[], numerange_array NUMRANGE[], int8range_array INT8RANGE[], uuid_array UUID[], json_array json[], jsonb_array jsonb[], PRIMARY KEY(pk));
CREATE TABLE custom_table (pk serial, lt ltree, i isbn NOT NULL, n TEXT, lt_array ltree[], PRIMARY KEY(pk));
CREATE TABLE hstore_table (pk serial, hs hstore, PRIMARY KEY(pk));
CREATE TABLE hstore_table_mul (pk serial, hs hstore, hsarr hstore[], PRIMARY KEY(pk));