DBZ-762 Adding support for CITEXT array columns
This commit is contained in:
parent
daeeaa74cd
commit
f9c1cfc134
@ -422,13 +422,15 @@ private Object[] columnValues(List<ReplicationMessage.Column> columns, TableId t
|
||||
List<String> columnNames = table.columnNames();
|
||||
// JSON does not deliver a list of all columns for REPLICA IDENTITY DEFAULT
|
||||
Object[] values = new Object[columns.size() < columnNames.size() ? columnNames.size() : columns.size()];
|
||||
columns.forEach(message -> {
|
||||
|
||||
for (ReplicationMessage.Column column : columns) {
|
||||
//DBZ-298 Quoted column names will be sent like that in messages, but stored unquoted in the column names
|
||||
String columnName = Strings.unquoteIdentifierPart(message.getName());
|
||||
String columnName = Strings.unquoteIdentifierPart(column.getName());
|
||||
int position = columnNames.indexOf(columnName);
|
||||
assert position >= 0;
|
||||
values[position] = message.getValue(this::typeResolverConnection, taskContext.config().includeUnknownDatatypes());
|
||||
});
|
||||
values[position] = column.getValue(this::typeResolverConnection, taskContext.config().includeUnknownDatatypes());
|
||||
}
|
||||
|
||||
return values;
|
||||
}
|
||||
|
||||
|
@ -110,6 +110,11 @@ private List<ReplicationMessage.Column> transform(List<PgProto.DatumMessage> mes
|
||||
public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) {
|
||||
return PgProtoReplicationMessage.this.getValue(datum, connection, includeUnknownDatatypes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return datum.toString();
|
||||
}
|
||||
};
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
@ -240,14 +245,14 @@ else if (datumMessage.hasDatumString()) {
|
||||
|
||||
default:
|
||||
PostgresType type = typeRegistry.get(columnType);
|
||||
if (type.getOid() == typeRegistry.geometryOid() || type.getOid() == typeRegistry.geographyOid() ) {
|
||||
if (type.getOid() == typeRegistry.geometryOid() || type.getOid() == typeRegistry.geographyOid() || type.getOid() == typeRegistry.citextOid() ) {
|
||||
return datumMessage.getDatumBytes().toByteArray();
|
||||
}
|
||||
if (type.getOid() == typeRegistry.geometryArrayOid() || type.getOid() == typeRegistry.geographyArrayOid() ) {
|
||||
if (type.getOid() == typeRegistry.geometryArrayOid() || type.getOid() == typeRegistry.geographyArrayOid() || type.getOid() == typeRegistry.citextArrayOid() ) {
|
||||
return getArray(datumMessage, connection, columnType);
|
||||
}
|
||||
|
||||
// unknown datatype is sent by decoder as binary value
|
||||
// unknown data type is sent by decoder as binary value
|
||||
if (includeUnknownDatatypes && datumMessage.hasDatumBytes()) {
|
||||
return datumMessage.getDatumBytes().toByteArray();
|
||||
}
|
||||
|
@ -117,11 +117,11 @@ public abstract class AbstractRecordsProducerTest {
|
||||
protected static final String INSERT_TSTZRANGE_TYPES_STMT = "INSERT INTO tstzrange_table (unbounded_exclusive_range, bounded_inclusive_range) " +
|
||||
"VALUES ('[2017-06-05 11:29:12.549426+00,)', '[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]')";
|
||||
|
||||
protected static final String INSERT_ARRAY_TYPES_STMT = "INSERT INTO array_table (int_array, bigint_array, text_array, char_array, varchar_array, date_array, numeric_array, varnumeric_array) " +
|
||||
"VALUES ('{1,2,3}', '{1550166368505037572}', '{\"one\",\"two\",\"three\"}', '{\"cone\",\"ctwo\",\"cthree\"}', '{\"vcone\",\"vctwo\",\"vcthree\"}', '{2016-11-04,2016-11-05,2016-11-06}', '{1.2,3.4,5.6}', '{1.1,2.22,3.333}')";
|
||||
protected static final String INSERT_ARRAY_TYPES_STMT = "INSERT INTO array_table (int_array, bigint_array, text_array, char_array, varchar_array, date_array, numeric_array, varnumeric_array, citext_array) " +
|
||||
"VALUES ('{1,2,3}', '{1550166368505037572}', '{\"one\",\"two\",\"three\"}', '{\"cone\",\"ctwo\",\"cthree\"}', '{\"vcone\",\"vctwo\",\"vcthree\"}', '{2016-11-04,2016-11-05,2016-11-06}', '{1.2,3.4,5.6}', '{1.1,2.22,3.333}', '{\"four\",\"five\",\"six\"}')";
|
||||
|
||||
protected static final String INSERT_ARRAY_TYPES_WITH_NULL_VALUES_STMT = "INSERT INTO array_table_with_nulls (int_array, bigint_array, text_array, date_array, numeric_array, varnumeric_array) " +
|
||||
"VALUES (null, null, null, null, null, null)";
|
||||
protected static final String INSERT_ARRAY_TYPES_WITH_NULL_VALUES_STMT = "INSERT INTO array_table_with_nulls (int_array, bigint_array, text_array, date_array, numeric_array, varnumeric_array, citext_array) " +
|
||||
"VALUES (null, null, null, null, null, null, null)";
|
||||
|
||||
protected static final String INSERT_POSTGIS_TYPES_STMT = "INSERT INTO public.postgis_table (p, ml) " +
|
||||
"VALUES ('SRID=3187;POINT(174.9479 -36.7208)'::postgis.geometry, 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::postgis.geography)";
|
||||
@ -383,7 +383,9 @@ protected List<SchemaAndValueField> schemasAndValuesForArrayTypes() {
|
||||
new BigDecimal("5.60")
|
||||
)),
|
||||
new SchemaAndValueField("varnumeric_array", SchemaBuilder.array(VariableScaleDecimal.builder().optional().build()).optional().build(),
|
||||
varnumArray)
|
||||
varnumArray),
|
||||
new SchemaAndValueField("citext_array", SchemaBuilder.array(SchemaBuilder.OPTIONAL_STRING_SCHEMA).optional().build(),
|
||||
Arrays.asList("four", "five", "six"))
|
||||
);
|
||||
}
|
||||
|
||||
@ -395,7 +397,8 @@ protected List<SchemaAndValueField> schemasAndValuesForArrayTypesWithNullValues(
|
||||
new SchemaAndValueField("char_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), null),
|
||||
new SchemaAndValueField("varchar_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), null),
|
||||
new SchemaAndValueField("date_array", SchemaBuilder.array(Date.builder().optional().schema()).optional().build(), null),
|
||||
new SchemaAndValueField("numeric_array", SchemaBuilder.array(Decimal.builder(2).optional().build()).optional().build(), null)
|
||||
new SchemaAndValueField("numeric_array", SchemaBuilder.array(Decimal.builder(2).optional().build()).optional().build(), null),
|
||||
new SchemaAndValueField("citext_array", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), null)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -23,8 +23,8 @@ CREATE TABLE time_table (pk SERIAL, ts TIMESTAMP, tsneg TIMESTAMP(6) WITHOUT TIM
|
||||
CREATE TABLE text_table (pk SERIAL, j JSON, jb JSONB, x XML, u Uuid, PRIMARY KEY(pk));
|
||||
CREATE TABLE geom_table (pk SERIAL, p POINT, PRIMARY KEY(pk));
|
||||
CREATE TABLE tstzrange_table (pk serial, unbounded_exclusive_range tstzrange, bounded_inclusive_range tstzrange, PRIMARY KEY(pk));
|
||||
CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], PRIMARY KEY(pk));
|
||||
CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], PRIMARY KEY(pk));
|
||||
CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], PRIMARY KEY(pk));
|
||||
CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], PRIMARY KEY(pk));
|
||||
CREATE TABLE custom_table (pk serial, lt ltree, i isbn, n TEXT, ct CITEXT, PRIMARY KEY(pk));
|
||||
|
||||
DROP SCHEMA IF EXISTS "Quoted_"" . Schema" CASCADE;
|
||||
|
Loading…
Reference in New Issue
Block a user