diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PgOid.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PgOid.java index c792df689..7d03c29f1 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PgOid.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PgOid.java @@ -23,4 +23,6 @@ public final class PgOid extends Oid { public static final int TSTZRANGE_OID = 3910; public static final int INET_OID = 869; public static final int INET_ARRAY = 1041; + public static final int CIDR_OID=650; + public static final int CIDR_ARRAY=651; } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java index 4f3ed9afe..2df8b3b61 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java @@ -143,6 +143,7 @@ public SchemaBuilder schemaBuilder(Column column) { return Json.builder(); case PgOid.TSTZRANGE_OID: case PgOid.INET_OID: + case PgOid.CIDR_OID: return SchemaBuilder.string(); case PgOid.UUID: return Uuid.builder(); @@ -166,6 +167,7 @@ public SchemaBuilder schemaBuilder(Column column) { case PgOid.TEXT_ARRAY: case PgOid.BPCHAR_ARRAY: case PgOid.INET_ARRAY: + case PgOid.CIDR_ARRAY: return SchemaBuilder.array(SchemaBuilder.OPTIONAL_STRING_SCHEMA); case PgOid.NUMERIC_ARRAY: return SchemaBuilder.array(numericSchema(column).optional().build()); @@ -275,6 +277,7 @@ public ValueConverter converter(Column column, Field fieldDefn) { case PgOid.TSTZRANGE_OID: case PgOid.JSON: case PgOid.INET_OID: + case PgOid.CIDR_OID: return data -> super.convertString(column, fieldDefn, data); case PgOid.POINT: return data -> convertPoint(column, fieldDefn, data); @@ -297,6 +300,7 @@ public ValueConverter converter(Column column, Field fieldDefn) { case PgOid.BOOL_ARRAY: case PgOid.DATE_ARRAY: case PgOid.INET_ARRAY: + case PgOid.CIDR_ARRAY: return createArrayConverter(column, fieldDefn); // TODO DBZ-459 implement support for these array types; for now we just fall back to the default, i.e. diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoReplicationMessage.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoReplicationMessage.java index aacb2308a..5bbc993bb 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoReplicationMessage.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoReplicationMessage.java @@ -176,6 +176,7 @@ else if (datumMessage.hasDatumString()) { case PgOid.BIT: case PgOid.VARBIT: case PgOid.INET_OID: + case PgOid.CIDR_OID: return datumMessage.hasDatumString() ? datumMessage.getDatumString() : null; case PgOid.DATE: return datumMessage.hasDatumInt32() ? (long) datumMessage.getDatumInt32() : null; @@ -242,6 +243,7 @@ else if (datumMessage.hasDatumString()) { case PgOid.JSON_ARRAY: case PgOid.REF_CURSOR_ARRAY: case PgOid.INET_ARRAY: + case PgOid.CIDR_ARRAY: return getArray(datumMessage, connection, columnType); case PgOid.UNSPECIFIED: diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonReplicationMessage.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonReplicationMessage.java index c100fadf4..23928f068 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonReplicationMessage.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/Wal2JsonReplicationMessage.java @@ -362,10 +362,10 @@ else if (rawValue.isBigInteger()) { case "uuid": case "tstzrange": case "inet": + case "cidr": return rawValue.asString(); // catch-all for other known/builtin PG types // TODO: improve with more specific/useful classes here? - case "cidr": case "macaddr": case "macaddr8": case "pg_lsn": diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java index e15c2f670..6f177832f 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java @@ -94,6 +94,8 @@ public abstract class AbstractRecordsProducerTest { "VALUES ('\u017E\u0161', 'bb', 'cdef', 'abc', 'some text', E'\\\\000\\\\001\\\\002'::bytea, E'\\\\003\\\\004\\\\005'::bytea, 'Hello World')"; protected static final String INSERT_NETWORK_ADDRESS_TYPES_STMT = "INSERT INTO network_address_table (i) " + "VALUES ('192.168.2.0/12')"; + protected static final String INSERT_CIDR_NETWORK_ADDRESS_TYPE_STMT = "INSERT INTO cidr_network_address_table (i) " + + "VALUES ('192.168.100.128/25');"; protected static final String INSERT_NUMERIC_TYPES_STMT = "INSERT INTO numeric_table (si, i, bi, r, db, r_int, db_int, r_nan, db_nan, r_pinf, db_pinf, r_ninf, db_ninf, ss, bs, b) " + "VALUES (1, 123456, 1234567890123, 3.3, 4.44, 3, 4, 'NaN', 'NaN', 'Infinity', 'Infinity', '-Infinity', '-Infinity', 1, 123, true)"; @@ -121,11 +123,11 @@ public abstract class AbstractRecordsProducerTest { protected static final String INSERT_TSTZRANGE_TYPES_STMT = "INSERT INTO tstzrange_table (unbounded_exclusive_range, bounded_inclusive_range) " + "VALUES ('[2017-06-05 11:29:12.549426+00,)', '[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]')"; - protected static final String INSERT_ARRAY_TYPES_STMT = "INSERT INTO array_table (int_array, bigint_array, text_array, char_array, varchar_array, date_array, numeric_array, varnumeric_array, citext_array, inet_array) " + - "VALUES ('{1,2,3}', '{1550166368505037572}', '{\"one\",\"two\",\"three\"}', '{\"cone\",\"ctwo\",\"cthree\"}', '{\"vcone\",\"vctwo\",\"vcthree\"}', '{2016-11-04,2016-11-05,2016-11-06}', '{1.2,3.4,5.6}', '{1.1,2.22,3.333}', '{\"four\",\"five\",\"six\"}', '{\"192.168.2.0/12\",\"192.168.1.1\",\"192.168.0.2/1\"}')"; + protected static final String INSERT_ARRAY_TYPES_STMT = "INSERT INTO array_table (int_array, bigint_array, text_array, char_array, varchar_array, date_array, numeric_array, varnumeric_array, citext_array, inet_array, cidr_array) " + + "VALUES ('{1,2,3}', '{1550166368505037572}', '{\"one\",\"two\",\"three\"}', '{\"cone\",\"ctwo\",\"cthree\"}', '{\"vcone\",\"vctwo\",\"vcthree\"}', '{2016-11-04,2016-11-05,2016-11-06}', '{1.2,3.4,5.6}', '{1.1,2.22,3.333}', '{\"four\",\"five\",\"six\"}', '{\"192.168.2.0/12\",\"192.168.1.1\",\"192.168.0.2/1\"}', '{\"192.168.100.128/25\", \"192.168.0.0/25\", \"192.168.1.0/24\"}')"; - protected static final String INSERT_ARRAY_TYPES_WITH_NULL_VALUES_STMT = "INSERT INTO array_table_with_nulls (int_array, bigint_array, text_array, date_array, numeric_array, varnumeric_array, citext_array, inet_array) " + - "VALUES (null, null, null, null, null, null, null, null)"; + protected static final String INSERT_ARRAY_TYPES_WITH_NULL_VALUES_STMT = "INSERT INTO array_table_with_nulls (int_array, bigint_array, text_array, date_array, numeric_array, varnumeric_array, citext_array, inet_array, cidr_array) " + + "VALUES (null, null, null, null, null, null, null, null, null)"; protected static final String INSERT_POSTGIS_TYPES_STMT = "INSERT INTO public.postgis_table (p, ml) " + "VALUES ('SRID=3187;POINT(174.9479 -36.7208)'::postgis.geometry, 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::postgis.geography)"; @@ -162,7 +164,7 @@ public abstract class AbstractRecordsProducerTest { protected static final Set ALL_STMTS = new HashSet<>(Arrays.asList(INSERT_NUMERIC_TYPES_STMT, INSERT_NUMERIC_DECIMAL_TYPES_STMT_NO_NAN, INSERT_DATE_TIME_TYPES_STMT, INSERT_BIN_TYPES_STMT, INSERT_GEOM_TYPES_STMT, INSERT_TEXT_TYPES_STMT, - INSERT_CASH_TYPES_STMT, INSERT_STRING_TYPES_STMT, INSERT_NETWORK_ADDRESS_TYPES_STMT, + INSERT_CASH_TYPES_STMT, INSERT_STRING_TYPES_STMT, INSERT_CIDR_NETWORK_ADDRESS_TYPE_STMT, INSERT_NETWORK_ADDRESS_TYPES_STMT, INSERT_ARRAY_TYPES_STMT, INSERT_ARRAY_TYPES_WITH_NULL_VALUES_STMT, INSERT_QUOTED_TYPES_STMT, INSERT_POSTGIS_TYPES_STMT, INSERT_POSTGIS_ARRAY_TYPES_STMT)); @@ -365,6 +367,9 @@ protected List schemasAndValuesForStringTypesWithSourceColu protected List schemasAndValuesForNetworkAddressTypes() { return Arrays.asList(new SchemaAndValueField("i", Schema.OPTIONAL_STRING_SCHEMA, "192.168.2.0/12")); } + protected List schemasAndValueForCidrAddressType() { + return Arrays.asList(new SchemaAndValueField("i", Schema.OPTIONAL_STRING_SCHEMA, "192.168.100.128/25")); + } protected List schemasAndValuesForNumericTypesWithSourceColumnTypeInfo() { return Arrays.asList(new SchemaAndValueField("d", @@ -519,7 +524,10 @@ protected List schemasAndValuesForArrayTypes() { new SchemaAndValueField("citext_array", SchemaBuilder.array(SchemaBuilder.OPTIONAL_STRING_SCHEMA).optional().build(), Arrays.asList("four", "five", "six")), new SchemaAndValueField("inet_array", SchemaBuilder.array(SchemaBuilder.OPTIONAL_STRING_SCHEMA).optional().build(), - Arrays.asList("192.168.2.0/12", "192.168.1.1", "192.168.0.2/1")) + Arrays.asList("192.168.2.0/12", "192.168.1.1", "192.168.0.2/1")), + + new SchemaAndValueField("cidr_array", SchemaBuilder.array(SchemaBuilder.OPTIONAL_STRING_SCHEMA).optional().build(), + Arrays.asList("192.168.100.128/25", "192.168.0.0/25", "192.168.1.0/24")) ); } @@ -533,7 +541,9 @@ protected List schemasAndValuesForArrayTypesWithNullValues( new SchemaAndValueField("date_array", SchemaBuilder.array(Date.builder().optional().schema()).optional().build(), null), new SchemaAndValueField("numeric_array", SchemaBuilder.array(Decimal.builder(2).parameter(TestHelper.PRECISION_PARAMETER_KEY, "10").optional().build()).optional().build(), null), new SchemaAndValueField("citext_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), null), - new SchemaAndValueField("inet_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), null) + new SchemaAndValueField("inet_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), null), + new SchemaAndValueField("cidr_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), null) + ); } @@ -627,6 +637,8 @@ protected List schemasAndValuesForTable(String insertTableS return schemasAndValuesForStringTypes(); case INSERT_NETWORK_ADDRESS_TYPES_STMT: return schemasAndValuesForNetworkAddressTypes(); + case INSERT_CIDR_NETWORK_ADDRESS_TYPE_STMT: + return schemasAndValueForCidrAddressType(); case INSERT_TEXT_TYPES_STMT: return schemasAndValuesForTextTypes(); case INSERT_ARRAY_TYPES_STMT: diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java index 8afbdef6a..9b537c006 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java @@ -53,7 +53,7 @@ public class PostgresSchemaIT { private static final String[] TEST_TABLES = new String[] { "public.numeric_table", "public.numeric_decimal_table", "public.string_table", - "public.cash_table", "public.bitbin_table", "public.network_address_table", + "public.cash_table", "public.bitbin_table", "public.network_address_table", "public.cidr_network_address_table", "public.time_table", "public.text_table", "public.geom_table", "public.tstzrange_table", "public.array_table", "\"Quoted_\"\" . Schema\".\"Quoted_\"\" . Table\"", "public.custom_table" @@ -92,6 +92,7 @@ public void shouldLoadSchemaForBuiltinPostgresTypes() throws Exception { Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); assertTableSchema("public.network_address_table", "i", Schema.OPTIONAL_STRING_SCHEMA); + assertTableSchema("public.cidr_network_address_table", "i", Schema.OPTIONAL_STRING_SCHEMA); assertTableSchema("public.cash_table", "csh", Decimal.builder(2).optional().build()); assertTableSchema("public.bitbin_table", "ba, bol, bs, bv", Schema.OPTIONAL_BYTES_SCHEMA, Schema.OPTIONAL_BOOLEAN_SCHEMA, Bits.builder(2).optional().build(), diff --git a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl index 76221ca04..18aa1f9a3 100644 --- a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl +++ b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl @@ -15,6 +15,7 @@ CREATE TABLE numeric_decimal_table (pk SERIAL, PRIMARY KEY(pk)); CREATE TABLE string_table (pk SERIAL, vc VARCHAR(2), vcv CHARACTER VARYING(2), ch CHARACTER(4), c CHAR(3), t TEXT, b BYTEA, bnn BYTEA NOT NULL, ct CITEXT, PRIMARY KEY(pk)); CREATE TABLE network_address_table (pk SERIAL, i INET, PRIMARY KEY(pk)); +CREATE TABLE cidr_network_address_table(pk SERIAL, i CIDR, PRIMARY KEY(pk)); CREATE TABLE cash_table (pk SERIAL, csh MONEY, PRIMARY KEY(pk)); CREATE TABLE bitbin_table (pk SERIAL, ba BYTEA, bol BIT(1), bs BIT(2), bv BIT VARYING(2) , PRIMARY KEY(pk)); CREATE TABLE time_table (pk SERIAL, ts TIMESTAMP, tsneg TIMESTAMP(6) WITHOUT TIME ZONE, ts_ms TIMESTAMP(3), ts_us TIMESTAMP(6), tz TIMESTAMPTZ, date DATE, @@ -24,8 +25,8 @@ CREATE TABLE time_table (pk SERIAL, ts TIMESTAMP, tsneg TIMESTAMP(6) WITHOUT TIM CREATE TABLE text_table (pk SERIAL, j JSON, jb JSONB, x XML, u Uuid, PRIMARY KEY(pk)); CREATE TABLE geom_table (pk SERIAL, p POINT, PRIMARY KEY(pk)); CREATE TABLE tstzrange_table (pk serial, unbounded_exclusive_range tstzrange, bounded_inclusive_range tstzrange, PRIMARY KEY(pk)); -CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], PRIMARY KEY(pk)); -CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], PRIMARY KEY(pk)); +CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], cidr_array CIDR[], PRIMARY KEY(pk)); +CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], cidr_array CIDR[], PRIMARY KEY(pk)); CREATE TABLE custom_table (pk serial, lt ltree, i isbn NOT NULL, n TEXT, PRIMARY KEY(pk)); CREATE TABLE hstore_table (pk serial, hs hstore, PRIMARY KEY(pk)); CREATE TABLE hstore_table_mul (pk serial, hs hstore, PRIMARY KEY(pk));