DBZ-762 Added support for CITEXT column type

This commit is contained in:
Olavi Mustanoja 2018-06-25 06:02:33 +03:00 committed by Gunnar Morling
parent 4f2e7d24b8
commit c401a98166
6 changed files with 72 additions and 9 deletions

View File

@ -99,6 +99,7 @@ protected PostgresValueConverter(DecimalMode decimalMode, TemporalPrecisionMode
@Override @Override
public SchemaBuilder schemaBuilder(Column column) { public SchemaBuilder schemaBuilder(Column column) {
int oidValue = column.nativeType(); int oidValue = column.nativeType();
switch (oidValue) { switch (oidValue) {
case PgOid.BIT: case PgOid.BIT:
case PgOid.BIT_ARRAY: case PgOid.BIT_ARRAY:
@ -180,12 +181,18 @@ public SchemaBuilder schemaBuilder(Column column) {
else if (oidValue == typeRegistry.geographyOid()) { else if (oidValue == typeRegistry.geographyOid()) {
return Geography.builder(); return Geography.builder();
} }
else if (oidValue == typeRegistry.citextOid()) {
return SchemaBuilder.string();
}
else if (oidValue == typeRegistry.geometryArrayOid()) { else if (oidValue == typeRegistry.geometryArrayOid()) {
return SchemaBuilder.array(Geometry.builder().optional().build()); return SchemaBuilder.array(Geometry.builder().optional().build());
} }
else if (oidValue == typeRegistry.geographyArrayOid()) { else if (oidValue == typeRegistry.geographyArrayOid()) {
return SchemaBuilder.array(Geography.builder().optional().build()); return SchemaBuilder.array(Geography.builder().optional().build());
} }
else if (oidValue == typeRegistry.citextArrayOid()) {
return SchemaBuilder.array(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
}
final SchemaBuilder jdbcSchemaBuilder = super.schemaBuilder(column); final SchemaBuilder jdbcSchemaBuilder = super.schemaBuilder(column);
if (jdbcSchemaBuilder == null) { if (jdbcSchemaBuilder == null) {
return includeUnknownDatatypes ? SchemaBuilder.bytes() : null; return includeUnknownDatatypes ? SchemaBuilder.bytes() : null;
@ -275,7 +282,10 @@ public ValueConverter converter(Column column, Field fieldDefn) {
else if (oidValue == typeRegistry.geographyOid()) { else if (oidValue == typeRegistry.geographyOid()) {
return data -> convertGeography(column, fieldDefn, data); return data -> convertGeography(column, fieldDefn, data);
} }
else if (oidValue == typeRegistry.geometryArrayOid() || oidValue == typeRegistry.geographyArrayOid()) { else if (oidValue == typeRegistry.citextOid()) {
return data -> convertCitext(column, fieldDefn, data);
}
else if (oidValue == typeRegistry.geometryArrayOid() || oidValue == typeRegistry.geographyArrayOid() || oidValue == typeRegistry.citextArrayOid()) {
return createArrayConverter(column, fieldDefn); return createArrayConverter(column, fieldDefn);
} }
final ValueConverter jdbcConverter = super.converter(column, fieldDefn); final ValueConverter jdbcConverter = super.converter(column, fieldDefn);
@ -544,6 +554,27 @@ protected Object convertGeography(Column column, Field fieldDefn, Object data) {
return handleUnknownData(column, fieldDefn, data); return handleUnknownData(column, fieldDefn, data);
} }
protected Object convertCitext(Column column, Field fieldDefn, Object data) {
if (data == null) {
data = fieldDefn.schema().defaultValue();
}
if (data == null) {
if (column.isOptional()) return null;
return "";
}
if (data instanceof byte[]) {
return new String((byte[]) data);
} else if (data instanceof String) {
return data;
} else if (data instanceof PGobject) {
return ((PGobject) data).getValue();
}
return handleUnknownData(column, fieldDefn, data);
}
/** /**
* Converts a value representing a Postgres point for a column, to a Kafka Connect value. * Converts a value representing a Postgres point for a column, to a Kafka Connect value.
* *

View File

@ -26,8 +26,10 @@ public class TypeRegistry {
public static final String TYPE_NAME_GEOGRAPHY = "geography"; public static final String TYPE_NAME_GEOGRAPHY = "geography";
public static final String TYPE_NAME_GEOMETRY = "geometry"; public static final String TYPE_NAME_GEOMETRY = "geometry";
public static final String TYPE_NAME_CITEXT = "citext";
public static final String TYPE_NAME_GEOGRAPHY_ARRAY = "_geography"; public static final String TYPE_NAME_GEOGRAPHY_ARRAY = "_geography";
public static final String TYPE_NAME_GEOMETRY_ARRAY = "_geometry"; public static final String TYPE_NAME_GEOMETRY_ARRAY = "_geometry";
public static final String TYPE_NAME_CITEXT_ARRAY = "_citext";
public static final int NO_TYPE_MODIFIER = -1; public static final int NO_TYPE_MODIFIER = -1;
public static final int UNKNOWN_LENGTH = -1; public static final int UNKNOWN_LENGTH = -1;
@ -63,8 +65,10 @@ public static final class Builder {
private final Map<Integer, PostgresType> oidToType = new HashMap<>(); private final Map<Integer, PostgresType> oidToType = new HashMap<>();
private int geometryOid = Integer.MIN_VALUE; private int geometryOid = Integer.MIN_VALUE;
private int geographyOid = Integer.MIN_VALUE; private int geographyOid = Integer.MIN_VALUE;
private int citextOid = Integer.MIN_VALUE;
private int geometryArrayOid = Integer.MIN_VALUE; private int geometryArrayOid = Integer.MIN_VALUE;
private int geographyArrayOid = Integer.MIN_VALUE; private int geographyArrayOid = Integer.MIN_VALUE;
private int citextArrayOid = Integer.MIN_VALUE;
private Builder() { private Builder() {
} }
@ -86,12 +90,18 @@ public Builder addType(PostgresType type) {
else if (TYPE_NAME_GEOGRAPHY.equals(type.getName())) { else if (TYPE_NAME_GEOGRAPHY.equals(type.getName())) {
geographyOid = type.getOid(); geographyOid = type.getOid();
} }
else if (TYPE_NAME_CITEXT.equals(type.getName())) {
citextOid = type.getOid();
}
else if (TYPE_NAME_GEOMETRY_ARRAY.equals(type.getName())) { else if (TYPE_NAME_GEOMETRY_ARRAY.equals(type.getName())) {
geometryArrayOid = type.getOid(); geometryArrayOid = type.getOid();
} }
else if (TYPE_NAME_GEOGRAPHY_ARRAY.equals(type.getName())) { else if (TYPE_NAME_GEOGRAPHY_ARRAY.equals(type.getName())) {
geographyArrayOid = type.getOid(); geographyArrayOid = type.getOid();
} }
else if (TYPE_NAME_CITEXT_ARRAY.equals(type.getName())) {
citextArrayOid = type.getOid();
}
return this; return this;
} }
@ -109,7 +119,7 @@ public PostgresType get(int oid) {
* @return initialized type registry * @return initialized type registry
*/ */
public TypeRegistry build() { public TypeRegistry build() {
return new TypeRegistry(nameToType, oidToType, geometryOid, geographyOid, geometryArrayOid, geographyArrayOid); return new TypeRegistry(nameToType, oidToType, geometryOid, geographyOid, citextOid, geometryArrayOid, geographyArrayOid, citextArrayOid);
} }
} }
@ -121,18 +131,22 @@ public static Builder create(TypeInfo typeInfo) {
private final Map<Integer, PostgresType> oidToType; private final Map<Integer, PostgresType> oidToType;
private final int geometryOid; private final int geometryOid;
private final int geographyOid; private final int geographyOid;
private final int citextOid;
private final int geometryArrayOid; private final int geometryArrayOid;
private final int geographyArrayOid; private final int geographyArrayOid;
private final int citextArrayOid;
private TypeRegistry(Map<String, PostgresType> nameToType, Map<Integer, PostgresType> oidToType, private TypeRegistry(Map<String, PostgresType> nameToType, Map<Integer, PostgresType> oidToType,
int geometryOid, int geographyOid, int geometryArrayOid, int geographyArrayOid) { int geometryOid, int geographyOid, int citextOid, int geometryArrayOid, int geographyArrayOid, int citextArrayOid) {
this.nameToType = Collections.unmodifiableMap(nameToType); this.nameToType = Collections.unmodifiableMap(nameToType);
this.oidToType = Collections.unmodifiableMap(oidToType); this.oidToType = Collections.unmodifiableMap(oidToType);
this.geometryOid = geometryOid; this.geometryOid = geometryOid;
this.geographyOid = geographyOid; this.geographyOid = geographyOid;
this.citextOid = citextOid;
this.geometryArrayOid = geometryArrayOid; this.geometryArrayOid = geometryArrayOid;
this.geographyArrayOid = geographyArrayOid; this.geographyArrayOid = geographyArrayOid;
this.citextArrayOid = citextArrayOid;
} }
/** /**
@ -197,6 +211,14 @@ public int geographyOid() {
return geographyOid; return geographyOid;
} }
/**
*
* @return OID for {@code CITEXT} type of this PostgreSQL instance
*/
public int citextOid() {
return citextOid;
}
/** /**
* *
* @return OID for array of {@code GEOMETRY} type of this PostgreSQL instance * @return OID for array of {@code GEOMETRY} type of this PostgreSQL instance
@ -213,6 +235,14 @@ public int geographyArrayOid() {
return geographyArrayOid; return geographyArrayOid;
} }
/**
*
* @return OID for array of {@code CITEXT} type of this PostgreSQL instance
*/
public int citextArrayOid() {
return citextArrayOid;
}
/** /**
* Converts a type name in long (readable) format like <code>boolean</code> to s standard * Converts a type name in long (readable) format like <code>boolean</code> to s standard
* data type name like <code>bool</code>. * data type name like <code>bool</code>.

View File

@ -135,8 +135,8 @@ public abstract class AbstractRecordsProducerTest {
protected static final String INSERT_QUOTED_TYPES_STMT = "INSERT INTO \"Quoted_\"\" . Schema\".\"Quoted_\"\" . Table\" (\"Quoted_\"\" . Text_Column\") " + protected static final String INSERT_QUOTED_TYPES_STMT = "INSERT INTO \"Quoted_\"\" . Schema\".\"Quoted_\"\" . Table\" (\"Quoted_\"\" . Text_Column\") " +
"VALUES ('some text')"; "VALUES ('some text')";
protected static final String INSERT_CUSTOM_TYPES_STMT = "INSERT INTO custom_table (lt, i, n) " + protected static final String INSERT_CUSTOM_TYPES_STMT = "INSERT INTO custom_table (lt, i, n, ct) " +
"VALUES ('Top.Collections.Pictures.Astronomy.Galaxies', '978-0-393-04002-9', NULL)"; "VALUES ('Top.Collections.Pictures.Astronomy.Galaxies', '978-0-393-04002-9', NULL, 'Hello World')";
protected static final Set<String> ALL_STMTS = new HashSet<>(Arrays.asList(INSERT_NUMERIC_TYPES_STMT, INSERT_NUMERIC_DECIMAL_TYPES_STMT_NO_NAN, protected static final Set<String> ALL_STMTS = new HashSet<>(Arrays.asList(INSERT_NUMERIC_TYPES_STMT, INSERT_NUMERIC_DECIMAL_TYPES_STMT_NO_NAN,
INSERT_DATE_TIME_TYPES_STMT, INSERT_DATE_TIME_TYPES_STMT,
@ -467,7 +467,8 @@ protected List<SchemaAndValueField> schemasAndValuesForNumericTypesUsingStringEn
protected List<SchemaAndValueField> schemasAndValuesForCustomTypes() { protected List<SchemaAndValueField> schemasAndValuesForCustomTypes() {
return Arrays.asList(new SchemaAndValueField("lt", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("Top.Collections.Pictures.Astronomy.Galaxies".getBytes())), return Arrays.asList(new SchemaAndValueField("lt", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("Top.Collections.Pictures.Astronomy.Galaxies".getBytes())),
new SchemaAndValueField("i", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("0-393-04002-X".getBytes())), new SchemaAndValueField("i", Schema.OPTIONAL_BYTES_SCHEMA, ByteBuffer.wrap("0-393-04002-X".getBytes())),
new SchemaAndValueField("n", Schema.OPTIONAL_STRING_SCHEMA, null)); new SchemaAndValueField("n", Schema.OPTIONAL_STRING_SCHEMA, null),
new SchemaAndValueField("ct", Schema.OPTIONAL_STRING_SCHEMA, "Hello World"));
} }

View File

@ -127,8 +127,8 @@ public void shouldLoadSchemaForExtensionPostgresTypes() throws Exception {
try (PostgresConnection connection = TestHelper.create()) { try (PostgresConnection connection = TestHelper.create()) {
schema.refresh(connection, false); schema.refresh(connection, false);
assertTablesIncluded(TEST_TABLES); assertTablesIncluded(TEST_TABLES);
assertTableSchema("public.custom_table", "lt, i", assertTableSchema("public.custom_table", "lt, i, ct",
Schema.OPTIONAL_BYTES_SCHEMA, Schema.OPTIONAL_BYTES_SCHEMA); Schema.OPTIONAL_BYTES_SCHEMA, Schema.OPTIONAL_BYTES_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
} }
} }

View File

@ -7,3 +7,4 @@ CREATE SCHEMA public;
-- load contrib extensions for testing non-builtin types -- load contrib extensions for testing non-builtin types
CREATE EXTENSION IF NOT EXISTS ltree SCHEMA public; CREATE EXTENSION IF NOT EXISTS ltree SCHEMA public;
CREATE EXTENSION IF NOT EXISTS isn SCHEMA public; CREATE EXTENSION IF NOT EXISTS isn SCHEMA public;
CREATE EXTENSION IF NOT EXISTS citext SCHEMA public;

View File

@ -25,7 +25,7 @@ CREATE TABLE geom_table (pk SERIAL, p POINT, PRIMARY KEY(pk));
CREATE TABLE tstzrange_table (pk serial, unbounded_exclusive_range tstzrange, bounded_inclusive_range tstzrange, PRIMARY KEY(pk)); CREATE TABLE tstzrange_table (pk serial, unbounded_exclusive_range tstzrange, bounded_inclusive_range tstzrange, PRIMARY KEY(pk));
CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], PRIMARY KEY(pk)); CREATE TABLE array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], PRIMARY KEY(pk));
CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], PRIMARY KEY(pk)); CREATE TABLE array_table_with_nulls (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], PRIMARY KEY(pk));
CREATE TABLE custom_table (pk serial, lt ltree, i isbn, n TEXT, PRIMARY KEY(pk)); CREATE TABLE custom_table (pk serial, lt ltree, i isbn, n TEXT, ct CITEXT, PRIMARY KEY(pk));
DROP SCHEMA IF EXISTS "Quoted_"" . Schema" CASCADE; DROP SCHEMA IF EXISTS "Quoted_"" . Schema" CASCADE;
CREATE SCHEMA "Quoted_"" . Schema"; CREATE SCHEMA "Quoted_"" . Schema";