diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java index e37c0024b..38d234ced 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java @@ -422,13 +422,15 @@ private Object[] columnValues(List columns, TableId t List columnNames = table.columnNames(); // JSON does not deliver a list of all columns for REPLICA IDENTITY DEFAULT Object[] values = new Object[columns.size() < columnNames.size() ? columnNames.size() : columns.size()]; - columns.forEach(message -> { + + for (ReplicationMessage.Column column : columns) { //DBZ-298 Quoted column names will be sent like that in messages, but stored unquoted in the column names - String columnName = Strings.unquoteIdentifierPart(message.getName()); + String columnName = Strings.unquoteIdentifierPart(column.getName()); int position = columnNames.indexOf(columnName); assert position >= 0; - values[position] = message.getValue(this::typeResolverConnection, taskContext.config().includeUnknownDatatypes()); - }); + values[position] = column.getValue(this::typeResolverConnection, taskContext.config().includeUnknownDatatypes()); + } + return values; } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoReplicationMessage.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoReplicationMessage.java index 23e483acb..6412ef6ab 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoReplicationMessage.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgproto/PgProtoReplicationMessage.java @@ -110,6 +110,11 @@ private List transform(List mes public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) { return PgProtoReplicationMessage.this.getValue(datum, connection, includeUnknownDatatypes); } + + @Override + public String toString() { + return datum.toString(); + } }; }) .collect(Collectors.toList()); @@ -240,14 +245,14 @@ else if (datumMessage.hasDatumString()) { default: PostgresType type = typeRegistry.get(columnType); - if (type.getOid() == typeRegistry.geometryOid() || type.getOid() == typeRegistry.geographyOid() ) { + if (type.getOid() == typeRegistry.geometryOid() || type.getOid() == typeRegistry.geographyOid() || type.getOid() == typeRegistry.citextOid() ) { return datumMessage.getDatumBytes().toByteArray(); } - if (type.getOid() == typeRegistry.geometryArrayOid() || type.getOid() == typeRegistry.geographyArrayOid() ) { + if (type.getOid() == typeRegistry.geometryArrayOid() || type.getOid() == typeRegistry.geographyArrayOid() || type.getOid() == typeRegistry.citextArrayOid() ) { return getArray(datumMessage, connection, columnType); } - // unknown datatype is sent by decoder as binary value + // unknown data type is sent by decoder as binary value if (includeUnknownDatatypes && datumMessage.hasDatumBytes()) { return datumMessage.getDatumBytes().toByteArray(); } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java index 910d83073..b7447f9a0 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/AbstractRecordsProducerTest.java @@ -117,11 +117,11 @@ public abstract class AbstractRecordsProducerTest { protected static final String INSERT_TSTZRANGE_TYPES_STMT = "INSERT INTO tstzrange_table (unbounded_exclusive_range, bounded_inclusive_range) " + "VALUES ('[2017-06-05 11:29:12.549426+00,)', '[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]')"; - protected static final String INSERT_ARRAY_TYPES_STMT = "INSERT INTO array_table (int_array, bigint_array, text_array, char_array, varchar_array, date_array, numeric_array, varnumeric_array) " + - "VALUES ('{1,2,3}', '{1550166368505037572}', '{\"one\",\"two\",\"three\"}', '{\"cone\",\"ctwo\",\"cthree\"}', '{\"vcone\",\"vctwo\",\"vcthree\"}', '{2016-11-04,2016-11-05,2016-11-06}', '{1.2,3.4,5.6}', '{1.1,2.22,3.333}')"; + protected static final String INSERT_ARRAY_TYPES_STMT = "INSERT INTO array_table (int_array, bigint_array, text_array, char_array, varchar_array, date_array, numeric_array, varnumeric_array, citext_array) " + + "VALUES ('{1,2,3}', '{1550166368505037572}', '{\"one\",\"two\",\"three\"}', '{\"cone\",\"ctwo\",\"cthree\"}', '{\"vcone\",\"vctwo\",\"vcthree\"}', '{2016-11-04,2016-11-05,2016-11-06}', '{1.2,3.4,5.6}', '{1.1,2.22,3.333}', '{\"four\",\"five\",\"six\"}')"; - protected static final String INSERT_ARRAY_TYPES_WITH_NULL_VALUES_STMT = "INSERT INTO array_table_with_nulls (int_array, bigint_array, text_array, date_array, numeric_array, varnumeric_array) " + - "VALUES (null, null, null, null, null, null)"; + protected static final String INSERT_ARRAY_TYPES_WITH_NULL_VALUES_STMT = "INSERT INTO array_table_with_nulls (int_array, bigint_array, text_array, date_array, numeric_array, varnumeric_array, citext_array) " + + "VALUES (null, null, null, null, null, null, null)"; protected static final String INSERT_POSTGIS_TYPES_STMT = "INSERT INTO public.postgis_table (p, ml) " + "VALUES ('SRID=3187;POINT(174.9479 -36.7208)'::postgis.geometry, 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::postgis.geography)"; @@ -383,7 +383,9 @@ protected List schemasAndValuesForArrayTypes() { new BigDecimal("5.60") )), new SchemaAndValueField("varnumeric_array", SchemaBuilder.array(VariableScaleDecimal.builder().optional().build()).optional().build(), - varnumArray) + varnumArray), + new SchemaAndValueField("citext_array", SchemaBuilder.array(SchemaBuilder.OPTIONAL_STRING_SCHEMA).optional().build(), + Arrays.asList("four", "five", "six")) ); } @@ -395,7 +397,8 @@ protected List schemasAndValuesForArrayTypesWithNullValues( new SchemaAndValueField("char_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), null), new SchemaAndValueField("varchar_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), null), new SchemaAndValueField("date_array", SchemaBuilder.array(Date.builder().optional().schema()).optional().build(), null), - new SchemaAndValueField("numeric_array", SchemaBuilder.array(Decimal.builder(2).optional().build()).optional().build(), null) + new SchemaAndValueField("numeric_array", SchemaBuilder.array(Decimal.builder(2).optional().build()).optional().build(), null), + new SchemaAndValueField("citext_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), null) ); } diff --git a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl index 28cb7dea1..c3527c1dc 100644 --- a/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl +++ b/debezium-connector-postgres/src/test/resources/postgres_create_tables.ddl @@ -23,8 +23,8 @@ CREATE TABLE time_table (pk SERIAL, ts TIMESTAMP, tsneg TIMESTAMP(6) WITHOUT TIM CREATE TABLE text_table (pk SERIAL, j JSON, jb JSONB, x XML, u Uuid, PRIMARY KEY(pk)); CREATE TABLE geom_table (pk SERIAL, p POINT, PRIMARY KEY(pk)); CREATE TABLE tstzrange_table (pk serial, unbounded_exclusive_range tstzrange, bounded_inclusive_range tstzrange, PRIMARY KEY(pk)); -CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], PRIMARY KEY(pk)); -CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], PRIMARY KEY(pk)); +CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], PRIMARY KEY(pk)); +CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], PRIMARY KEY(pk)); CREATE TABLE custom_table (pk serial, lt ltree, i isbn, n TEXT, ct CITEXT, PRIMARY KEY(pk)); DROP SCHEMA IF EXISTS "Quoted_"" . Schema" CASCADE;