DBZ-1413 Align domain type support across all 3 decoders
This commit is contained in:
parent
9586be7d4e
commit
547ba8e0db
@ -102,19 +102,30 @@ public PostgresType getBaseType() {
|
||||
*
|
||||
* @return the default length of the type
|
||||
*/
|
||||
public int getDefaultLength() {
|
||||
if (typeInfo == null) {
|
||||
return TypeRegistry.UNKNOWN_LENGTH;
|
||||
}
|
||||
if (modifiers == TypeRegistry.NO_TYPE_MODIFIER && baseType != null) {
|
||||
return baseType.getDefaultLength();
|
||||
}
|
||||
int size = typeInfo.getPrecision(oid, modifiers);
|
||||
if (size == 0) {
|
||||
size = typeInfo.getDisplaySize(oid, modifiers);
|
||||
}
|
||||
return size;
|
||||
}
|
||||
public int getDefaultLength() {
|
||||
if (typeInfo == null) {
|
||||
return TypeRegistry.UNKNOWN_LENGTH;
|
||||
}
|
||||
if (baseType != null) {
|
||||
if (modifiers == TypeRegistry.NO_TYPE_MODIFIER) {
|
||||
return baseType.getDefaultLength();
|
||||
}
|
||||
else {
|
||||
int size = typeInfo.getPrecision(baseType.getOid(), modifiers);
|
||||
if (size == 0) {
|
||||
size = typeInfo.getDisplaySize(baseType.getOid(), modifiers);
|
||||
}
|
||||
if (size != 0 && size != Integer.MAX_VALUE) {
|
||||
return size;
|
||||
}
|
||||
}
|
||||
}
|
||||
int size = typeInfo.getPrecision(oid, modifiers);
|
||||
if (size == 0) {
|
||||
size = typeInfo.getDisplaySize(oid, modifiers);
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
@ -124,8 +135,13 @@ public int getDefaultScale() {
|
||||
if (typeInfo == null) {
|
||||
return TypeRegistry.UNKNOWN_LENGTH;
|
||||
}
|
||||
if (modifiers == TypeRegistry.NO_TYPE_MODIFIER && baseType != null) {
|
||||
return baseType.getDefaultScale();
|
||||
if (baseType != null) {
|
||||
if (modifiers == TypeRegistry.NO_TYPE_MODIFIER) {
|
||||
return baseType.getDefaultScale();
|
||||
}
|
||||
else {
|
||||
return typeInfo.getScale(baseType.getOid(), modifiers);
|
||||
}
|
||||
}
|
||||
return typeInfo.getScale(oid, modifiers);
|
||||
}
|
||||
|
@ -12,7 +12,6 @@
|
||||
import java.nio.charset.Charset;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Timestamp;
|
||||
import java.sql.Types;
|
||||
import java.time.Duration;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.OffsetTime;
|
||||
@ -283,16 +282,6 @@ else if (oidValue == typeRegistry.ltreeArrayOid()) {
|
||||
return SchemaBuilder.array(Ltree.builder().optional().build());
|
||||
}
|
||||
|
||||
if (column.jdbcType() == Types.DISTINCT) {
|
||||
final PostgresType domainType = typeRegistry.get(oidValue);
|
||||
if (domainType != null) {
|
||||
final PostgresType baseType = domainType.getBaseType();
|
||||
if (baseType != null) {
|
||||
return schemaBuilder(baseType.getOid(), column);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final SchemaBuilder jdbcSchemaBuilder = super.schemaBuilder(column);
|
||||
if (jdbcSchemaBuilder == null) {
|
||||
return includeUnknownDatatypes ? SchemaBuilder.bytes() : null;
|
||||
@ -437,17 +426,6 @@ else if (oidValue == typeRegistry.geometryArrayOid() ||
|
||||
return createArrayConverter(column, fieldDefn);
|
||||
}
|
||||
|
||||
if (column.jdbcType() == Types.DISTINCT) {
|
||||
// The driver represents domain types as 'd', DISTINCT
|
||||
final PostgresType domainType = typeRegistry.get(oidValue);
|
||||
if (domainType != null) {
|
||||
final PostgresType baseType = domainType.getBaseType();
|
||||
if (baseType != null) {
|
||||
return converter(baseType.getOid(), column, fieldDefn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final ValueConverter jdbcConverter = super.converter(column, fieldDefn);
|
||||
if (jdbcConverter == null) {
|
||||
return includeUnknownDatatypes ? data -> convertBinary(column, fieldDefn, data) : null;
|
||||
@ -935,4 +913,54 @@ protected Object convertString(Column column, Field fieldDefn, Object data) {
|
||||
}
|
||||
return super.convertString(column, fieldDefn, data);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param column
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
protected SchemaBuilder distinctSchema(Column column) {
|
||||
return schemaBuilder(getColumnWithDomainJdbcType(column, typeRegistry.get(column.nativeType())));
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides a ValueConverter that properly resolves the domain type to base type for data of a given column
|
||||
*
|
||||
* @param column the column definition; never null
|
||||
* @param fieldDefn the field definition; never null
|
||||
* @return the value converter to convert the supplied data
|
||||
*/
|
||||
@Override
|
||||
protected ValueConverter convertDistinct(Column column, Field fieldDefn) {
|
||||
return converter(getColumnWithDomainJdbcType(column, typeRegistry.get(column.nativeType())), fieldDefn);
|
||||
}
|
||||
|
||||
/**
|
||||
* For a given column and type, traverse type hierarchy and return a column based on base type's JDBC type
|
||||
*
|
||||
* @param column the column
|
||||
* @param postgresType the column's postgres type
|
||||
* @return A new {@link Column} instance with appropriate native JDBC type
|
||||
*/
|
||||
private static Column getColumnWithDomainJdbcType(Column column, PostgresType postgresType) {
|
||||
PostgresType baseType = postgresType;
|
||||
while (!baseType.isBaseType()) {
|
||||
baseType = baseType.getBaseType();
|
||||
}
|
||||
|
||||
// This is necessary for situations where PostgresValueConverter delegates schema and converter resolution
|
||||
// to JdbcValueConverters where the resolution is based on the column's jdbcType. For columns that use
|
||||
// domain alias types, this is the OID to the alias type, not the actual base type.
|
||||
//
|
||||
// For example:
|
||||
// CREATE DOMAIN bool2 bool default false;
|
||||
// CREATE TABLE (pk serial, data bool2 not null);
|
||||
//
|
||||
// This guarantees that when the data column's schema and value converter is resolved, it's based on the
|
||||
// fact the resolved base type is bool, not some oid that resolves to an unhandled type.
|
||||
//
|
||||
// Perhaps there are better ways - TBD.
|
||||
return column.edit().jdbcType(baseType.getJdbcId()).nativeType(baseType.getOid()).create();
|
||||
}
|
||||
}
|
||||
|
@ -325,8 +325,7 @@ private void prime() {
|
||||
modifiers,
|
||||
typeInfo,
|
||||
baseType,
|
||||
null
|
||||
));
|
||||
null));
|
||||
}
|
||||
}
|
||||
|
||||
@ -352,8 +351,7 @@ private void prime() {
|
||||
modifiers,
|
||||
typeInfo,
|
||||
baseType,
|
||||
get((int) rs.getLong("element"))
|
||||
));
|
||||
get((int) rs.getLong("element"))));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -376,9 +374,9 @@ private PostgresType resolveUnknownType(String name) {
|
||||
final TypeInfo typeInfo = ((BaseConnection) connection).getTypeInfo();
|
||||
final SqlTypeMapper sqlTypeMapper = new SqlTypeMapper(connection, typeInfo);
|
||||
|
||||
try(final PreparedStatement statement = connection.prepareStatement(SQL_NON_ARRAY_TYPE_NAME_LOOKUP)) {
|
||||
try (final PreparedStatement statement = connection.prepareStatement(SQL_NON_ARRAY_TYPE_NAME_LOOKUP)) {
|
||||
statement.setString(1, name);
|
||||
try(final ResultSet rs = statement.executeQuery()) {
|
||||
try (final ResultSet rs = statement.executeQuery()) {
|
||||
while (rs.next()) {
|
||||
final int oid = (int) rs.getLong("oid");
|
||||
final int baseOid = (int) rs.getLong("baseoid");
|
||||
@ -397,8 +395,7 @@ private PostgresType resolveUnknownType(String name) {
|
||||
modifiers,
|
||||
typeInfo,
|
||||
baseType,
|
||||
null
|
||||
);
|
||||
null);
|
||||
|
||||
addType(result);
|
||||
return result;
|
||||
@ -420,9 +417,9 @@ private PostgresType resolveUnknownType(int lookupOid) {
|
||||
final TypeInfo typeInfo = ((BaseConnection) connection).getTypeInfo();
|
||||
final SqlTypeMapper sqlTypeMapper = new SqlTypeMapper(connection, typeInfo);
|
||||
|
||||
try(final PreparedStatement statement = connection.prepareStatement(SQL_NON_ARRAY_TYPE_OID_LOOKUP)) {
|
||||
try (final PreparedStatement statement = connection.prepareStatement(SQL_NON_ARRAY_TYPE_OID_LOOKUP)) {
|
||||
statement.setLong(1, lookupOid);
|
||||
try(final ResultSet rs = statement.executeQuery()) {
|
||||
try (final ResultSet rs = statement.executeQuery()) {
|
||||
while (rs.next()) {
|
||||
final int oid = (int) rs.getLong("oid");
|
||||
final int baseOid = (int) rs.getLong("baseoid");
|
||||
@ -441,8 +438,7 @@ private PostgresType resolveUnknownType(int lookupOid) {
|
||||
modifiers,
|
||||
typeInfo,
|
||||
baseType,
|
||||
null
|
||||
);
|
||||
null);
|
||||
|
||||
addType(result);
|
||||
return result;
|
||||
@ -511,7 +507,7 @@ public int getSqlType(String typeName) throws SQLException {
|
||||
try {
|
||||
return sqlTypesByPgTypeNames.get(typeName);
|
||||
}
|
||||
catch(Exception e) {
|
||||
catch (Exception e) {
|
||||
LOGGER.warn("Failed to obtain SQL type information for type {} via custom statement, falling back to TypeInfo#getSQLType()", typeName, e);
|
||||
return typeInfo.getSQLType(typeName);
|
||||
}
|
||||
|
@ -7,8 +7,6 @@
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalTime;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.OffsetTime;
|
||||
import java.time.ZoneOffset;
|
||||
@ -21,11 +19,15 @@
|
||||
import org.postgresql.geometric.PGpath;
|
||||
import org.postgresql.geometric.PGpoint;
|
||||
import org.postgresql.geometric.PGpolygon;
|
||||
import org.postgresql.jdbc.PgArray;
|
||||
import org.postgresql.util.PGInterval;
|
||||
import org.postgresql.util.PGmoney;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier;
|
||||
import io.debezium.connector.postgresql.PostgresType;
|
||||
import io.debezium.connector.postgresql.TypeRegistry;
|
||||
import io.debezium.connector.postgresql.connection.wal2json.DateTimeFormat;
|
||||
|
||||
/**
|
||||
@ -36,12 +38,17 @@ public abstract class AbstractColumnValue<T> implements ReplicationMessage.Colum
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractColumnValue.class);
|
||||
|
||||
@Override
|
||||
public LocalDate asLocalDate() {
|
||||
public Object asLocalDate() {
|
||||
return DateTimeFormat.get().date(asString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public LocalTime asLocalTime() {
|
||||
public Object asTime() {
|
||||
return asString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asLocalTime() {
|
||||
return DateTimeFormat.get().time(asString());
|
||||
}
|
||||
|
||||
@ -61,7 +68,7 @@ public Instant asInstant() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public PGbox asBox() {
|
||||
public Object asBox() {
|
||||
try {
|
||||
return new PGbox(asString());
|
||||
}
|
||||
@ -72,7 +79,7 @@ public PGbox asBox() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public PGcircle asCircle() {
|
||||
public Object asCircle() {
|
||||
try {
|
||||
return new PGcircle(asString());
|
||||
}
|
||||
@ -83,7 +90,7 @@ public PGcircle asCircle() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public PGInterval asInterval() {
|
||||
public Object asInterval() {
|
||||
try {
|
||||
return new PGInterval(asString());
|
||||
}
|
||||
@ -94,7 +101,7 @@ public PGInterval asInterval() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public PGline asLine() {
|
||||
public Object asLine() {
|
||||
try {
|
||||
return new PGline(asString());
|
||||
}
|
||||
@ -105,7 +112,7 @@ public PGline asLine() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public PGlseg asLseg() {
|
||||
public Object asLseg() {
|
||||
try {
|
||||
return new PGlseg(asString());
|
||||
}
|
||||
@ -116,7 +123,7 @@ public PGlseg asLseg() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public PGmoney asMoney() {
|
||||
public Object asMoney() {
|
||||
try {
|
||||
return new PGmoney(asString());
|
||||
}
|
||||
@ -127,7 +134,7 @@ public PGmoney asMoney() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public PGpath asPath() {
|
||||
public Object asPath() {
|
||||
try {
|
||||
return new PGpath(asString());
|
||||
}
|
||||
@ -138,7 +145,7 @@ public PGpath asPath() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public PGpoint asPoint() {
|
||||
public Object asPoint() {
|
||||
try {
|
||||
return new PGpoint(asString());
|
||||
}
|
||||
@ -149,7 +156,7 @@ public PGpoint asPoint() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public PGpolygon asPolygon() {
|
||||
public Object asPolygon() {
|
||||
try {
|
||||
return new PGpolygon(asString());
|
||||
}
|
||||
@ -158,4 +165,34 @@ public PGpolygon asPolygon() {
|
||||
throw new ConnectException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isArray(PostgresType type) {
|
||||
return type.isArrayType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asArray(String columnName, PostgresType type, String fullType, PgConnectionSupplier connection) {
|
||||
try {
|
||||
final String dataString = asString();
|
||||
return new PgArray(connection.get(), type.getOid(), dataString);
|
||||
}
|
||||
catch (SQLException e) {
|
||||
LOGGER.warn("Unexpected exception trying to process PgArray ({}) column '{}', {}", fullType, columnName, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asDefault(TypeRegistry typeRegistry, int columnType, String columnName, String fullType, boolean includeUnknownDatatypes,
|
||||
PgConnectionSupplier connection) {
|
||||
if (includeUnknownDatatypes) {
|
||||
// this includes things like PostGIS geoemetries or other custom types
|
||||
// leave up to the downstream message recipient to deal with
|
||||
LOGGER.debug("processing column '{}' with unknown data type '{}' as byte array", columnName, fullType);
|
||||
return asString();
|
||||
}
|
||||
LOGGER.debug("Unknown column type {} for column {} – ignoring", fullType, columnName);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -7,25 +7,13 @@
|
||||
package io.debezium.connector.postgresql.connection;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalTime;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.OffsetTime;
|
||||
import java.util.List;
|
||||
|
||||
import org.postgresql.geometric.PGbox;
|
||||
import org.postgresql.geometric.PGcircle;
|
||||
import org.postgresql.geometric.PGline;
|
||||
import org.postgresql.geometric.PGlseg;
|
||||
import org.postgresql.geometric.PGpath;
|
||||
import org.postgresql.geometric.PGpoint;
|
||||
import org.postgresql.geometric.PGpolygon;
|
||||
import org.postgresql.util.PGInterval;
|
||||
import org.postgresql.util.PGmoney;
|
||||
|
||||
import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier;
|
||||
import io.debezium.connector.postgresql.PostgresType;
|
||||
import io.debezium.data.SpecialValueDecimal;
|
||||
import io.debezium.connector.postgresql.TypeRegistry;
|
||||
|
||||
/**
|
||||
* An abstract representation of a replication message that is sent by a PostgreSQL logical decoding plugin and
|
||||
@ -93,37 +81,45 @@ public interface ColumnValue<T> {
|
||||
|
||||
Double asDouble();
|
||||
|
||||
SpecialValueDecimal asDecimal();
|
||||
Object asDecimal();
|
||||
|
||||
LocalDate asLocalDate();
|
||||
Object asLocalDate();
|
||||
|
||||
OffsetDateTime asOffsetDateTimeAtUtc();
|
||||
|
||||
Instant asInstant();
|
||||
|
||||
LocalTime asLocalTime();
|
||||
Object asTime();
|
||||
|
||||
Object asLocalTime();
|
||||
|
||||
OffsetTime asOffsetTimeUtc();
|
||||
|
||||
byte[] asByteArray();
|
||||
|
||||
PGbox asBox();
|
||||
Object asBox();
|
||||
|
||||
PGcircle asCircle();
|
||||
Object asCircle();
|
||||
|
||||
PGInterval asInterval();
|
||||
Object asInterval();
|
||||
|
||||
PGline asLine();
|
||||
Object asLine();
|
||||
|
||||
PGlseg asLseg();
|
||||
Object asLseg();
|
||||
|
||||
PGmoney asMoney();
|
||||
Object asMoney();
|
||||
|
||||
PGpath asPath();
|
||||
Object asPath();
|
||||
|
||||
PGpoint asPoint();
|
||||
Object asPoint();
|
||||
|
||||
PGpolygon asPolygon();
|
||||
Object asPolygon();
|
||||
|
||||
boolean isArray(PostgresType type);
|
||||
|
||||
Object asArray(String columnName, PostgresType type, String fullType, PgConnectionSupplier connection);
|
||||
|
||||
Object asDefault(TypeRegistry typeRegistry, int columnType, String columnName, String fullType, boolean includeUnknownDatatypes, PgConnectionSupplier connection);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -5,14 +5,13 @@
|
||||
*/
|
||||
package io.debezium.connector.postgresql.connection;
|
||||
|
||||
import java.sql.SQLException;
|
||||
|
||||
import org.postgresql.jdbc.PgArray;
|
||||
import org.postgresql.util.PGmoney;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier;
|
||||
import io.debezium.connector.postgresql.PostgresType;
|
||||
import io.debezium.connector.postgresql.TypeRegistry;
|
||||
import io.debezium.connector.postgresql.connection.ReplicationMessage.ColumnValue;
|
||||
|
||||
/**
|
||||
@ -31,28 +30,22 @@ public class ReplicationMessageColumnValueResolver {
|
||||
* @param value the column value
|
||||
* @param connection a postgres connection supplier
|
||||
* @param includeUnknownDatatypes true to include unknown data types, false otherwise
|
||||
* @param typeRegistry the postgres type registry
|
||||
* @return
|
||||
*/
|
||||
public static Object resolveValue(String columnName, PostgresType type, String fullType, ColumnValue value, final PgConnectionSupplier connection,
|
||||
boolean includeUnknownDatatypes) {
|
||||
boolean includeUnknownDatatypes, TypeRegistry typeRegistry) {
|
||||
if (value.isNull()) {
|
||||
// nulls are null
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!type.isBaseType()) {
|
||||
return resolveValue(columnName, type.getBaseType(), fullType, value, connection, includeUnknownDatatypes);
|
||||
return resolveValue(columnName, type.getBaseType(), fullType, value, connection, includeUnknownDatatypes, typeRegistry);
|
||||
}
|
||||
|
||||
if (type.isArrayType()) {
|
||||
try {
|
||||
final String dataString = value.asString();
|
||||
return new PgArray(connection.get(), type.getOid(), dataString);
|
||||
}
|
||||
catch (SQLException e) {
|
||||
LOGGER.warn("Unexpected exception trying to process PgArray ({}) column '{}', {}", fullType, columnName, e);
|
||||
}
|
||||
return null;
|
||||
if (value.isArray(type)) {
|
||||
return value.asArray(columnName, type, fullType, connection);
|
||||
}
|
||||
|
||||
switch (type.getName()) {
|
||||
@ -114,7 +107,7 @@ public static Object resolveValue(String columnName, PostgresType type, String f
|
||||
return value.asInstant();
|
||||
|
||||
case "time":
|
||||
return value.asString();
|
||||
return value.asTime();
|
||||
|
||||
case "time without time zone":
|
||||
return value.asLocalTime();
|
||||
@ -140,7 +133,8 @@ public static Object resolveValue(String columnName, PostgresType type, String f
|
||||
case "lseg":
|
||||
return value.asLseg();
|
||||
case "money":
|
||||
return value.asMoney().val;
|
||||
final Object v = value.asMoney();
|
||||
return (v instanceof PGmoney) ? ((PGmoney) v).val : v;
|
||||
case "path":
|
||||
return value.asPath();
|
||||
case "point":
|
||||
@ -185,14 +179,6 @@ public static Object resolveValue(String columnName, PostgresType type, String f
|
||||
break;
|
||||
}
|
||||
|
||||
if (includeUnknownDatatypes) {
|
||||
// this includes things like PostGIS geometries or other custom types.
|
||||
// leave up to the downstream message recipient to deal with.
|
||||
LOGGER.debug("processing column '{}' with unknown data type '{}' as byte array", columnName,
|
||||
fullType);
|
||||
return value.asString();
|
||||
}
|
||||
LOGGER.debug("Unknown column type {} for column {} – ignoring", fullType, columnName);
|
||||
return null;
|
||||
return value.asDefault(typeRegistry, type.getOid(), columnName, fullType, includeUnknownDatatypes, connection);
|
||||
}
|
||||
}
|
||||
|
@ -563,7 +563,8 @@ private static List<Column> resolveColumnsFromStreamTupleData(ByteBuffer buffer,
|
||||
new AbstractReplicationMessageColumn(columnName, columnType, typeExpression, optional, true) {
|
||||
@Override
|
||||
public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) {
|
||||
return PgOutputReplicationMessage.getValue(columnName, columnType, typeExpression, valueStr, connection, includeUnknownDatatypes);
|
||||
return PgOutputReplicationMessage.getValue(columnName, columnType, typeExpression, valueStr, connection, includeUnknownDatatypes,
|
||||
typeRegistry);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -10,6 +10,7 @@
|
||||
|
||||
import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier;
|
||||
import io.debezium.connector.postgresql.PostgresType;
|
||||
import io.debezium.connector.postgresql.TypeRegistry;
|
||||
import io.debezium.connector.postgresql.connection.ReplicationMessage;
|
||||
import io.debezium.connector.postgresql.connection.ReplicationMessageColumnValueResolver;
|
||||
|
||||
@ -90,8 +91,8 @@ public boolean shouldSchemaBeSynchronized() {
|
||||
* @return the value; may be null
|
||||
*/
|
||||
public static Object getValue(String columnName, PostgresType type, String fullType, String rawValue, final PgConnectionSupplier connection,
|
||||
boolean includeUnknownDataTypes) {
|
||||
boolean includeUnknownDataTypes, TypeRegistry typeRegistry) {
|
||||
final PgOutputColumnValue columnValue = new PgOutputColumnValue(rawValue);
|
||||
return ReplicationMessageColumnValueResolver.resolveValue(columnName, type, fullType, columnValue, connection, includeUnknownDataTypes);
|
||||
return ReplicationMessageColumnValueResolver.resolveValue(columnName, type, fullType, columnValue, connection, includeUnknownDataTypes, typeRegistry);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,364 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.connector.postgresql.connection.pgproto;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.nio.charset.Charset;
|
||||
import java.sql.SQLException;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalTime;
|
||||
import java.time.OffsetDateTime;
|
||||
import java.time.OffsetTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.postgresql.geometric.PGpoint;
|
||||
import org.postgresql.jdbc.PgArray;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.connector.postgresql.PgOid;
|
||||
import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier;
|
||||
import io.debezium.connector.postgresql.PostgresType;
|
||||
import io.debezium.connector.postgresql.PostgresValueConverter;
|
||||
import io.debezium.connector.postgresql.TypeRegistry;
|
||||
import io.debezium.connector.postgresql.connection.AbstractColumnValue;
|
||||
import io.debezium.connector.postgresql.connection.wal2json.DateTimeFormat;
|
||||
import io.debezium.connector.postgresql.proto.PgProto;
|
||||
import io.debezium.data.SpecialValueDecimal;
|
||||
import io.debezium.time.Conversions;
|
||||
|
||||
/**
|
||||
* @author Chris Cranford
|
||||
*/
|
||||
public class PgProtoColumnValue extends AbstractColumnValue<PgProto.DatumMessage> {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(PgProtoColumnValue.class);
|
||||
|
||||
private PgProto.DatumMessage value;
|
||||
|
||||
public PgProtoColumnValue(PgProto.DatumMessage value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PgProto.DatumMessage getRawValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isNull() {
|
||||
return value.hasDatumMissing();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String asString() {
|
||||
if (value.hasDatumString()) {
|
||||
return value.getDatumString();
|
||||
}
|
||||
else if (value.hasDatumBytes()) {
|
||||
return new String(asByteArray(), Charset.forName("UTF-8"));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean asBoolean() {
|
||||
if (value.hasDatumBool()) {
|
||||
return value.getDatumBool();
|
||||
}
|
||||
|
||||
final String s = asString();
|
||||
if (s != null) {
|
||||
if (s.equalsIgnoreCase("t")) {
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
else if (s.equalsIgnoreCase("f")) {
|
||||
return Boolean.FALSE;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer asInteger() {
|
||||
if (value.hasDatumInt32()) {
|
||||
return value.getDatumInt32();
|
||||
}
|
||||
|
||||
final String s = asString();
|
||||
return s != null ? Integer.valueOf(s) : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long asLong() {
|
||||
if (value.hasDatumInt64()) {
|
||||
return value.getDatumInt64();
|
||||
}
|
||||
|
||||
final String s = asString();
|
||||
return s != null ? Long.valueOf(s) : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Float asFloat() {
|
||||
if (value.hasDatumFloat()) {
|
||||
return value.getDatumFloat();
|
||||
}
|
||||
|
||||
final String s = asString();
|
||||
return s != null ? Float.valueOf(s) : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Double asDouble() {
|
||||
if (value.hasDatumDouble()) {
|
||||
return value.getDatumDouble();
|
||||
}
|
||||
|
||||
final String s = asString();
|
||||
return s != null ? Double.valueOf(s) : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asDecimal() {
|
||||
if (value.hasDatumDouble()) {
|
||||
return value.getDatumDouble();
|
||||
}
|
||||
|
||||
final String s = asString();
|
||||
if (s != null) {
|
||||
return PostgresValueConverter.toSpecialValue(s).orElseGet(() -> new SpecialValueDecimal(new BigDecimal(s)));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] asByteArray() {
|
||||
return value.hasDatumBytes() ? value.getDatumBytes().toByteArray() : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asLocalDate() {
|
||||
if (value.hasDatumInt32()) {
|
||||
return (long) value.getDatumInt32();
|
||||
}
|
||||
|
||||
final String s = asString();
|
||||
return s != null ? DateTimeFormat.get().date(s) : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asTime() {
|
||||
if (value.hasDatumInt64()) {
|
||||
return Duration.of(value.getDatumInt64(), ChronoUnit.MICROS);
|
||||
}
|
||||
|
||||
final String s = asString();
|
||||
if (s != null) {
|
||||
LocalTime localTime = DateTimeFormat.get().time(s);
|
||||
return Duration.of(localTime.toNanoOfDay(), ChronoUnit.NANOS);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OffsetTime asOffsetTimeUtc() {
|
||||
if (value.hasDatumDouble()) {
|
||||
return Conversions.toInstantFromMicros((long) value.getDatumDouble()).atOffset(ZoneOffset.UTC).toOffsetTime();
|
||||
}
|
||||
|
||||
final String s = asString();
|
||||
return s != null ? DateTimeFormat.get().timeWithTimeZone(s) : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OffsetDateTime asOffsetDateTimeAtUtc() {
|
||||
if (value.hasDatumInt64()) {
|
||||
return Conversions.toInstantFromMicros(value.getDatumInt64()).atOffset(ZoneOffset.UTC);
|
||||
}
|
||||
|
||||
final String s = asString();
|
||||
return s != null ? DateTimeFormat.get().timestampWithTimeZoneToOffsetDateTime(s).withOffsetSameInstant(ZoneOffset.UTC) : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Instant asInstant() {
|
||||
if (value.hasDatumInt64()) {
|
||||
return Conversions.toInstantFromMicros(value.getDatumInt64());
|
||||
}
|
||||
|
||||
final String s = asString();
|
||||
return s != null ? DateTimeFormat.get().timestampToInstant(asString()) : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asLocalTime() {
|
||||
return asTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asBox() {
|
||||
return asByteArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asCircle() {
|
||||
return asByteArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asInterval() {
|
||||
if (value.hasDatumDouble()) {
|
||||
return value.getDatumDouble();
|
||||
}
|
||||
|
||||
final String s = asString();
|
||||
return s != null ? super.asInterval() : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asLine() {
|
||||
return asByteArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asLseg() {
|
||||
return asByteArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asMoney() {
|
||||
if (value.hasDatumInt64()) {
|
||||
return value.getDatumInt64();
|
||||
}
|
||||
return super.asMoney();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asPath() {
|
||||
return asByteArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asPoint() {
|
||||
if (value.hasDatumPoint()) {
|
||||
PgProto.Point datumPoint = datumPoint = value.getDatumPoint();
|
||||
return new PGpoint(datumPoint.getX(), datumPoint.getY());
|
||||
}
|
||||
else if (value.hasDatumBytes()) {
|
||||
return super.asPoint();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asPolygon() {
|
||||
return asByteArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isArray(PostgresType type) {
|
||||
final int oidValue = type.getOid();
|
||||
switch (oidValue) {
|
||||
case PgOid.INT2_ARRAY:
|
||||
case PgOid.INT4_ARRAY:
|
||||
case PgOid.INT8_ARRAY:
|
||||
case PgOid.TEXT_ARRAY:
|
||||
case PgOid.NUMERIC_ARRAY:
|
||||
case PgOid.FLOAT4_ARRAY:
|
||||
case PgOid.FLOAT8_ARRAY:
|
||||
case PgOid.BOOL_ARRAY:
|
||||
case PgOid.DATE_ARRAY:
|
||||
case PgOid.TIME_ARRAY:
|
||||
case PgOid.TIMETZ_ARRAY:
|
||||
case PgOid.TIMESTAMP_ARRAY:
|
||||
case PgOid.TIMESTAMPTZ_ARRAY:
|
||||
case PgOid.BYTEA_ARRAY:
|
||||
case PgOid.VARCHAR_ARRAY:
|
||||
case PgOid.OID_ARRAY:
|
||||
case PgOid.BPCHAR_ARRAY:
|
||||
case PgOid.MONEY_ARRAY:
|
||||
case PgOid.NAME_ARRAY:
|
||||
case PgOid.INTERVAL_ARRAY:
|
||||
case PgOid.CHAR_ARRAY:
|
||||
case PgOid.VARBIT_ARRAY:
|
||||
case PgOid.UUID_ARRAY:
|
||||
case PgOid.XML_ARRAY:
|
||||
case PgOid.POINT_ARRAY:
|
||||
case PgOid.JSONB_ARRAY:
|
||||
case PgOid.JSON_ARRAY:
|
||||
case PgOid.REF_CURSOR_ARRAY:
|
||||
case PgOid.INET_ARRAY:
|
||||
case PgOid.CIDR_ARRAY:
|
||||
case PgOid.MACADDR_ARRAY:
|
||||
case PgOid.MACADDR8_ARRAY:
|
||||
case PgOid.TSRANGE_ARRAY:
|
||||
case PgOid.TSTZRANGE_ARRAY:
|
||||
case PgOid.DATERANGE_ARRAY:
|
||||
case PgOid.INT4RANGE_ARRAY:
|
||||
case PgOid.NUM_RANGE_ARRAY:
|
||||
case PgOid.INT8RANGE_ARRAY:
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asArray(String columnName, PostgresType type, String fullType, PgConnectionSupplier connection) {
|
||||
// Currently the logical decoding plugin sends unhandled types as a byte array containing the string
|
||||
// representation (in Postgres) of the array value.
|
||||
// The approach to decode this is sub-optimal but the only way to improve this is to update the plugin.
|
||||
// Reasons for it being sub-optimal include:
|
||||
// 1. It requires a Postgres JDBC connection to deserialize
|
||||
// 2. The byte-array is a serialised string but we make the assumption its UTF-8 encoded (which it will
|
||||
// be in most cases)
|
||||
// 3. For larger arrays and especially 64-bit integers and the like it is less efficient sending string
|
||||
// representations over the wire.
|
||||
try {
|
||||
byte[] data = asByteArray();
|
||||
if (data == null) {
|
||||
return null;
|
||||
}
|
||||
String dataString = new String(data, Charset.forName("UTF-8"));
|
||||
PgArray arrayData = new PgArray(connection.get(), (int) value.getColumnType(), dataString);
|
||||
Object deserializedArray = arrayData.getArray();
|
||||
return Arrays.asList((Object[]) deserializedArray);
|
||||
}
|
||||
catch (SQLException e) {
|
||||
LOGGER.warn("Unexpected exception trying to process PgArray column '{}'", value.getColumnName(), e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object asDefault(TypeRegistry typeRegistry, int columnType, String columnName, String fullType, boolean includeUnknownDatatypes,
|
||||
PgConnectionSupplier connection) {
|
||||
final PostgresType type = typeRegistry.get(columnType);
|
||||
if (type.getOid() == typeRegistry.geometryOid() || type.getOid() == typeRegistry.geographyOid() || type.getOid() == typeRegistry.citextOid()) {
|
||||
return asByteArray();
|
||||
}
|
||||
if (type.getOid() == typeRegistry.hstoreOid()) {
|
||||
return asByteArray();
|
||||
}
|
||||
if (type.getOid() == typeRegistry.geometryArrayOid() ||
|
||||
type.getOid() == typeRegistry.geographyArrayOid() ||
|
||||
type.getOid() == typeRegistry.citextArrayOid() ||
|
||||
type.getOid() == typeRegistry.hstoreArrayOid()) {
|
||||
return asArray(columnName, type, fullType, connection);
|
||||
}
|
||||
// unknown data type is sent by decoder as binary value
|
||||
if (includeUnknownDatatypes) {
|
||||
return asByteArray();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
@ -6,38 +6,23 @@
|
||||
|
||||
package io.debezium.connector.postgresql.connection.pgproto;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.nio.charset.Charset;
|
||||
import java.sql.SQLException;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.ZoneOffset;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.apache.kafka.connect.data.Field;
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
import org.postgresql.geometric.PGpoint;
|
||||
import org.postgresql.jdbc.PgArray;
|
||||
import org.postgresql.util.PGInterval;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.connector.postgresql.PgOid;
|
||||
import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier;
|
||||
import io.debezium.connector.postgresql.PostgresType;
|
||||
import io.debezium.connector.postgresql.PostgresValueConverter;
|
||||
import io.debezium.connector.postgresql.TypeRegistry;
|
||||
import io.debezium.connector.postgresql.UnchangedToastedReplicationMessageColumn;
|
||||
import io.debezium.connector.postgresql.connection.AbstractReplicationMessageColumn;
|
||||
import io.debezium.connector.postgresql.connection.ReplicationMessage;
|
||||
import io.debezium.connector.postgresql.connection.ReplicationMessageColumnValueResolver;
|
||||
import io.debezium.connector.postgresql.proto.PgProto;
|
||||
import io.debezium.data.SpecialValueDecimal;
|
||||
import io.debezium.time.Conversions;
|
||||
import io.debezium.util.Strings;
|
||||
|
||||
/**
|
||||
@ -113,12 +98,14 @@ private List<ReplicationMessage.Column> transform(List<PgProto.DatumMessage> mes
|
||||
return new UnchangedToastedReplicationMessageColumn(columnName, type, typeInfo.map(PgProto.TypeInfo::getModifier).orElse(null),
|
||||
typeInfo.map(PgProto.TypeInfo::getValueOptional).orElse(Boolean.FALSE), hasTypeMetadata());
|
||||
}
|
||||
return new AbstractReplicationMessageColumn(columnName, type, typeInfo.map(PgProto.TypeInfo::getModifier).orElse(null),
|
||||
|
||||
final String fullType = typeInfo.map(PgProto.TypeInfo::getModifier).orElse(null);
|
||||
return new AbstractReplicationMessageColumn(columnName, type, fullType,
|
||||
typeInfo.map(PgProto.TypeInfo::getValueOptional).orElse(Boolean.FALSE), hasTypeMetadata()) {
|
||||
|
||||
@Override
|
||||
public Object getValue(PgConnectionSupplier connection, boolean includeUnknownDatatypes) {
|
||||
return PgProtoReplicationMessage.this.getValue(datum, connection, includeUnknownDatatypes);
|
||||
return PgProtoReplicationMessage.this.getValue(columnName, type, fullType, datum, connection, includeUnknownDatatypes);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -135,203 +122,9 @@ public boolean isLastEventForLsn() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the Protobuf value for a {@link io.debezium.connector.postgresql.proto.PgProto.DatumMessage plugin message} to
|
||||
* a Java value based on the type of the column from the message. This value will be converted later on if necessary by the
|
||||
* {@link PostgresValueConverter#converter(Column, Field)} instance to match whatever the Connect schema type expects.
|
||||
*
|
||||
* Note that the logic here is tightly coupled (i.e. dependent) on the Postgres plugin logic which writes the actual
|
||||
* Protobuf messages.
|
||||
*
|
||||
* @param a supplier to get a connection to Postgres instance for array handling
|
||||
* @return the value; may be null
|
||||
*/
|
||||
public Object getValue(PgProto.DatumMessage datumMessage, PgConnectionSupplier connection, boolean includeUnknownDatatypes) {
|
||||
if (datumMessage.hasDatumMissing()) {
|
||||
return UnchangedToastedReplicationMessageColumn.UNCHANGED_TOAST_VALUE;
|
||||
}
|
||||
|
||||
int columnType = (int) datumMessage.getColumnType();
|
||||
switch (columnType) {
|
||||
case PgOid.BOOL:
|
||||
return datumMessage.hasDatumBool() ? datumMessage.getDatumBool() : null;
|
||||
case PgOid.INT2:
|
||||
case PgOid.INT4:
|
||||
return datumMessage.hasDatumInt32() ? datumMessage.getDatumInt32() : null;
|
||||
case PgOid.INT8:
|
||||
case PgOid.OID:
|
||||
case PgOid.MONEY:
|
||||
return datumMessage.hasDatumInt64() ? datumMessage.getDatumInt64() : null;
|
||||
case PgOid.FLOAT4:
|
||||
return datumMessage.hasDatumFloat() ? datumMessage.getDatumFloat() : null;
|
||||
case PgOid.FLOAT8:
|
||||
return datumMessage.hasDatumDouble() ? datumMessage.getDatumDouble() : null;
|
||||
case PgOid.NUMERIC:
|
||||
if (datumMessage.hasDatumDouble()) {
|
||||
// For backwards compatibility only to enable independent upgrade of Postgres plug-in
|
||||
return datumMessage.getDatumDouble();
|
||||
}
|
||||
else if (datumMessage.hasDatumString()) {
|
||||
final String s = datumMessage.getDatumString();
|
||||
return PostgresValueConverter.toSpecialValue(s).orElseGet(() -> new SpecialValueDecimal(new BigDecimal(s)));
|
||||
}
|
||||
return null;
|
||||
case PgOid.CHAR:
|
||||
case PgOid.VARCHAR:
|
||||
case PgOid.BPCHAR:
|
||||
case PgOid.TEXT:
|
||||
case PgOid.JSON:
|
||||
case PgOid.JSONB_OID:
|
||||
case PgOid.XML:
|
||||
case PgOid.UUID:
|
||||
case PgOid.BIT:
|
||||
case PgOid.VARBIT:
|
||||
case PgOid.INET_OID:
|
||||
case PgOid.CIDR_OID:
|
||||
case PgOid.MACADDR_OID:
|
||||
case PgOid.MACADDR8_OID:
|
||||
return datumMessage.hasDatumString() ? datumMessage.getDatumString() : null;
|
||||
case PgOid.DATE:
|
||||
return datumMessage.hasDatumInt32() ? (long) datumMessage.getDatumInt32() : null;
|
||||
case PgOid.TIMESTAMP:
|
||||
if (!datumMessage.hasDatumInt64()) {
|
||||
return null;
|
||||
}
|
||||
// these types are sent by the plugin as LONG - microseconds since Unix Epoch
|
||||
return Conversions.toInstantFromMicros(datumMessage.getDatumInt64());
|
||||
case PgOid.TIMESTAMPTZ:
|
||||
if (!datumMessage.hasDatumInt64()) {
|
||||
return null;
|
||||
}
|
||||
// these types are sent by the plugin as LONG - microseconds since Unix Epoch
|
||||
return Conversions.toInstantFromMicros(datumMessage.getDatumInt64()).atOffset(ZoneOffset.UTC);
|
||||
case PgOid.TIME:
|
||||
if (!datumMessage.hasDatumInt64()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// these types are sent by the plugin as LONG - microseconds since Unix Epoch
|
||||
return Duration.of(datumMessage.getDatumInt64(), ChronoUnit.MICROS);
|
||||
case PgOid.TIMETZ:
|
||||
if (!datumMessage.hasDatumDouble()) {
|
||||
return null;
|
||||
}
|
||||
// the value is sent as a double microseconds
|
||||
return Conversions.toInstantFromMicros((long) datumMessage.getDatumDouble())
|
||||
.atOffset(ZoneOffset.UTC)
|
||||
.toOffsetTime();
|
||||
case PgOid.INTERVAL:
|
||||
// these are sent as doubles by the plugin since their storage is larger than 8 bytes
|
||||
try {
|
||||
return datumMessage.hasDatumDouble() ? datumMessage.getDatumDouble()
|
||||
: datumMessage.hasDatumString() ? new PGInterval(datumMessage.getDatumString()) : null;
|
||||
}
|
||||
catch (SQLException e) {
|
||||
throw new ConnectException("Could not convert interval value");
|
||||
}
|
||||
// the plugin will send back a TZ formatted string
|
||||
case PgOid.BYTEA:
|
||||
return datumMessage.hasDatumBytes() ? datumMessage.getDatumBytes().toByteArray() : null;
|
||||
case PgOid.POINT: {
|
||||
PgProto.Point datumPoint = datumMessage.getDatumPoint();
|
||||
return new PGpoint(datumPoint.getX(), datumPoint.getY());
|
||||
}
|
||||
case PgOid.TSRANGE_OID:
|
||||
case PgOid.TSTZRANGE_OID:
|
||||
case PgOid.DATERANGE_OID:
|
||||
case PgOid.INT4RANGE_OID:
|
||||
case PgOid.NUM_RANGE_OID:
|
||||
case PgOid.INT8RANGE_OID:
|
||||
return datumMessage.hasDatumBytes() ? new String(datumMessage.getDatumBytes().toByteArray(), Charset.forName("UTF-8")) : null;
|
||||
case PgOid.INT2_ARRAY:
|
||||
case PgOid.INT4_ARRAY:
|
||||
case PgOid.INT8_ARRAY:
|
||||
case PgOid.TEXT_ARRAY:
|
||||
case PgOid.NUMERIC_ARRAY:
|
||||
case PgOid.FLOAT4_ARRAY:
|
||||
case PgOid.FLOAT8_ARRAY:
|
||||
case PgOid.BOOL_ARRAY:
|
||||
case PgOid.DATE_ARRAY:
|
||||
case PgOid.TIME_ARRAY:
|
||||
case PgOid.TIMETZ_ARRAY:
|
||||
case PgOid.TIMESTAMP_ARRAY:
|
||||
case PgOid.TIMESTAMPTZ_ARRAY:
|
||||
case PgOid.BYTEA_ARRAY:
|
||||
case PgOid.VARCHAR_ARRAY:
|
||||
case PgOid.OID_ARRAY:
|
||||
case PgOid.BPCHAR_ARRAY:
|
||||
case PgOid.MONEY_ARRAY:
|
||||
case PgOid.NAME_ARRAY:
|
||||
case PgOid.INTERVAL_ARRAY:
|
||||
case PgOid.CHAR_ARRAY:
|
||||
case PgOid.VARBIT_ARRAY:
|
||||
case PgOid.UUID_ARRAY:
|
||||
case PgOid.XML_ARRAY:
|
||||
case PgOid.POINT_ARRAY:
|
||||
case PgOid.JSONB_ARRAY:
|
||||
case PgOid.JSON_ARRAY:
|
||||
case PgOid.REF_CURSOR_ARRAY:
|
||||
case PgOid.INET_ARRAY:
|
||||
case PgOid.CIDR_ARRAY:
|
||||
case PgOid.MACADDR_ARRAY:
|
||||
case PgOid.MACADDR8_ARRAY:
|
||||
case PgOid.TSRANGE_ARRAY:
|
||||
case PgOid.TSTZRANGE_ARRAY:
|
||||
case PgOid.DATERANGE_ARRAY:
|
||||
case PgOid.INT4RANGE_ARRAY:
|
||||
case PgOid.NUM_RANGE_ARRAY:
|
||||
case PgOid.INT8RANGE_ARRAY:
|
||||
return getArray(datumMessage, connection, columnType);
|
||||
|
||||
case PgOid.UNSPECIFIED:
|
||||
return null;
|
||||
|
||||
default:
|
||||
PostgresType type = typeRegistry.get(columnType);
|
||||
if (type.getOid() == typeRegistry.geometryOid() || type.getOid() == typeRegistry.geographyOid() || type.getOid() == typeRegistry.citextOid()) {
|
||||
return datumMessage.getDatumBytes().toByteArray();
|
||||
}
|
||||
if (type.getOid() == typeRegistry.hstoreOid()) {
|
||||
return datumMessage.getDatumBytes().toByteArray();
|
||||
}
|
||||
if (type.getOid() == typeRegistry.geometryArrayOid() ||
|
||||
type.getOid() == typeRegistry.geographyArrayOid() ||
|
||||
type.getOid() == typeRegistry.citextArrayOid() ||
|
||||
type.getOid() == typeRegistry.hstoreArrayOid()) {
|
||||
return getArray(datumMessage, connection, columnType);
|
||||
}
|
||||
// unknown data type is sent by decoder as binary value
|
||||
if (includeUnknownDatatypes && datumMessage.hasDatumBytes()) {
|
||||
return datumMessage.getDatumBytes().toByteArray();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private Object getArray(PgProto.DatumMessage datumMessage, PgConnectionSupplier connection, int columnType) {
|
||||
// Currently the logical decoding plugin sends unhandled types as a byte array containing the string
|
||||
// representation (in Postgres) of the array value.
|
||||
// The approach to decode this is sub-optimal but the only way to improve this is to update the plugin.
|
||||
// Reasons for it being sub-optimal include:
|
||||
// 1. It requires a Postgres JDBC connection to deserialize
|
||||
// 2. The byte-array is a serialised string but we make the assumption its UTF-8 encoded (which it will
|
||||
// be in most cases)
|
||||
// 3. For larger arrays and especially 64-bit integers and the like it is less efficient sending string
|
||||
// representations over the wire.
|
||||
try {
|
||||
byte[] data = datumMessage.hasDatumBytes() ? datumMessage.getDatumBytes().toByteArray() : null;
|
||||
if (data == null) {
|
||||
return null;
|
||||
}
|
||||
String dataString = new String(data, Charset.forName("UTF-8"));
|
||||
PgArray arrayData = new PgArray(connection.get(), columnType, dataString);
|
||||
Object deserializedArray = arrayData.getArray();
|
||||
return Arrays.asList((Object[]) deserializedArray);
|
||||
}
|
||||
catch (SQLException e) {
|
||||
LOGGER.warn("Unexpected exception trying to process PgArray column '{}'", datumMessage.getColumnName(), e);
|
||||
}
|
||||
return null;
|
||||
public Object getValue(String columnName, PostgresType type, String fullType, PgProto.DatumMessage datumMessage, final PgConnectionSupplier connection,
|
||||
boolean includeUnknownDatatypes) {
|
||||
final PgProtoColumnValue columnValue = new PgProtoColumnValue(datumMessage);
|
||||
return ReplicationMessageColumnValueResolver.resolveValue(columnName, type, fullType, columnValue, connection, includeUnknownDatatypes, typeRegistry);
|
||||
}
|
||||
}
|
||||
|
@ -40,17 +40,41 @@ public String asString() {
|
||||
|
||||
@Override
|
||||
public Boolean asBoolean() {
|
||||
return value.asBoolean();
|
||||
if (value.isBoolean()) {
|
||||
return value.asBoolean();
|
||||
}
|
||||
else if (value.isString()) {
|
||||
return "t".equalsIgnoreCase(value.asString());
|
||||
}
|
||||
else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer asInteger() {
|
||||
return value.asInteger();
|
||||
if (value.isNumber()) {
|
||||
return value.asInteger();
|
||||
}
|
||||
else if (value.isString()) {
|
||||
return Integer.valueOf(value.asString());
|
||||
}
|
||||
else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long asLong() {
|
||||
return value.asLong();
|
||||
if (value.isNumber()) {
|
||||
return value.asLong();
|
||||
}
|
||||
else if (value.isString()) {
|
||||
return Long.valueOf(value.asString());
|
||||
}
|
||||
else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -168,7 +168,7 @@ private String parseType(String columnName, String typeWithModifiers) {
|
||||
public Object getValue(String columnName, PostgresType type, String fullType, Value rawValue, final PgConnectionSupplier connection,
|
||||
boolean includeUnknownDatatypes) {
|
||||
final Wal2JsonColumnValue columnValue = new Wal2JsonColumnValue(rawValue);
|
||||
return ReplicationMessageColumnValueResolver.resolveValue(columnName, type, fullType, columnValue, connection, includeUnknownDatatypes);
|
||||
return ReplicationMessageColumnValueResolver.resolveValue(columnName, type, fullType, columnValue, connection, includeUnknownDatatypes, typeRegistry);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -493,9 +493,9 @@ public void shouldGenerateSnapshotForDataTypeAlias() throws Exception {
|
||||
TestHelper.execute("INSERT INTO alias_table (salary, salary2, a, area) values (7.25, 8.25, 12345.123, 12345.123);");
|
||||
|
||||
buildNoStreamProducer(TestHelper.defaultConfig()
|
||||
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)
|
||||
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
|
||||
.with("column.propagate.source.type", "public.alias_table.area"));
|
||||
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)
|
||||
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
|
||||
.with("column.propagate.source.type", "public.alias_table.area"));
|
||||
|
||||
final TestConsumer consumer = testConsumer(1, "public");
|
||||
consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
|
||||
@ -503,7 +503,7 @@ public void shouldGenerateSnapshotForDataTypeAlias() throws Exception {
|
||||
// Specifying alias money2 results in JDBC type '2001' for 'salary2'
|
||||
// Specifying money results in JDBC type '8' for 'salary'
|
||||
|
||||
consumer.process(record ->assertReadRecord(record, Collect.hashMapOf("public.alias_table", schemaAndValueForMoneyAliasType())));
|
||||
consumer.process(record -> assertReadRecord(record, Collect.hashMapOf("public.alias_table", schemaAndValueForMoneyAliasType())));
|
||||
}
|
||||
|
||||
private List<SchemaAndValueField> schemaAndValueForMoneyAliasType() {
|
||||
|
@ -17,7 +17,12 @@
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.LocalTime;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@ -48,11 +53,18 @@
|
||||
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule;
|
||||
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs;
|
||||
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot;
|
||||
import io.debezium.data.Bits;
|
||||
import io.debezium.data.Envelope;
|
||||
import io.debezium.data.Json;
|
||||
import io.debezium.data.SpecialValueDecimal;
|
||||
import io.debezium.data.Uuid;
|
||||
import io.debezium.data.VariableScaleDecimal;
|
||||
import io.debezium.data.VerifyRecord;
|
||||
import io.debezium.data.Xml;
|
||||
import io.debezium.data.geometry.Point;
|
||||
import io.debezium.doc.FixFor;
|
||||
import io.debezium.heartbeat.Heartbeat;
|
||||
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
|
||||
import io.debezium.jdbc.TemporalPrecisionMode;
|
||||
import io.debezium.junit.ConditionalFail;
|
||||
import io.debezium.junit.ShouldFailWhen;
|
||||
@ -60,6 +72,12 @@
|
||||
import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
|
||||
import io.debezium.relational.Table;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.time.Date;
|
||||
import io.debezium.time.MicroDuration;
|
||||
import io.debezium.time.MicroTime;
|
||||
import io.debezium.time.MicroTimestamp;
|
||||
import io.debezium.time.ZonedTime;
|
||||
import io.debezium.time.ZonedTimestamp;
|
||||
import io.debezium.util.Stopwatch;
|
||||
import io.debezium.util.Testing;
|
||||
|
||||
@ -1466,12 +1484,13 @@ public void shouldStreamChangesForDataTypeAlias() throws Exception {
|
||||
TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, data VARCHAR(50), salary money, salary2 money2, PRIMARY KEY(pk));");
|
||||
|
||||
startConnector(config -> config
|
||||
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)
|
||||
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
|
||||
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
|
||||
.with(PostgresConnectorConfig.TABLE_WHITELIST, "public.alias_table"),
|
||||
false
|
||||
);
|
||||
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)
|
||||
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
|
||||
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
|
||||
.with(PostgresConnectorConfig.TABLE_WHITELIST, "public.alias_table"),
|
||||
false);
|
||||
|
||||
waitForStreamingToStart();
|
||||
|
||||
consumer = testConsumer(1);
|
||||
executeAndWait("INSERT INTO alias_table (data, salary, salary2) values ('hello', 7.25, 8.25);");
|
||||
@ -1483,8 +1502,7 @@ public void shouldStreamChangesForDataTypeAlias() throws Exception {
|
||||
new SchemaAndValueField("pk", SchemaBuilder.INT32_SCHEMA, 1),
|
||||
new SchemaAndValueField("data", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "hello"),
|
||||
new SchemaAndValueField("salary", Decimal.builder(2).optional().build(), new BigDecimal(7.25)),
|
||||
new SchemaAndValueField("salary2", Decimal.builder(2).optional().build(), new BigDecimal(8.25))
|
||||
);
|
||||
new SchemaAndValueField("salary2", Decimal.builder(2).optional().build(), new BigDecimal(8.25)));
|
||||
|
||||
assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER);
|
||||
assertThat(consumer.isEmpty()).isTrue();
|
||||
@ -1495,21 +1513,20 @@ public void shouldStreamChangesForDataTypeAlias() throws Exception {
|
||||
public void shouldStreamChangesForDomainAliasAlterTable() throws Exception {
|
||||
TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, data VARCHAR(50), salary money, PRIMARY KEY(pk));");
|
||||
startConnector(config -> config
|
||||
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)
|
||||
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
|
||||
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
|
||||
.with(PostgresConnectorConfig.TABLE_WHITELIST, "public.alias_table")
|
||||
.with("column.propagate.source.type", "public.alias_table.salary3"),
|
||||
false
|
||||
);
|
||||
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)
|
||||
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
|
||||
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
|
||||
.with(PostgresConnectorConfig.TABLE_WHITELIST, "public.alias_table")
|
||||
.with("column.propagate.source.type", "public.alias_table.salary3"),
|
||||
false);
|
||||
|
||||
waitForStreamingToStart();
|
||||
|
||||
// Now that streaming has started, alter the table schema
|
||||
TestHelper.execute("CREATE DOMAIN money2 AS money DEFAULT 0.0;");
|
||||
TestHelper.execute("CREATE DOMAIN money3 AS numeric(8,3) DEFAULT 0.0;");
|
||||
TestHelper.execute("ALTER TABLE alias_table ADD COLUMN salary2 money2 NOT NULL");
|
||||
TestHelper.execute("ALTER TABLE alias_table ADD COLUMN salary3 money3 NOT NULL");
|
||||
TestHelper.execute("ALTER TABLE alias_table ADD COLUMN salary2 money2 NOT NULL;");
|
||||
TestHelper.execute("ALTER TABLE alias_table ADD COLUMN salary3 money3 NOT NULL;");
|
||||
|
||||
consumer = testConsumer(1);
|
||||
executeAndWait("INSERT INTO alias_table (data, salary, salary2, salary3) values ('hello', 7.25, 8.25, 123.456);");
|
||||
@ -1524,15 +1541,301 @@ public void shouldStreamChangesForDomainAliasAlterTable() throws Exception {
|
||||
new SchemaAndValueField("salary2", Decimal.builder(2).build(), new BigDecimal(8.25)),
|
||||
new SchemaAndValueField("salary3", SchemaBuilder.float64()
|
||||
.parameter(TestHelper.TYPE_NAME_PARAMETER_KEY, "MONEY3")
|
||||
.parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "2147483647")
|
||||
.parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0")
|
||||
.build(), 123.456)
|
||||
);
|
||||
.parameter(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8")
|
||||
.parameter(TestHelper.TYPE_SCALE_PARAMETER_KEY, "3")
|
||||
.build(), 123.456));
|
||||
|
||||
assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER);
|
||||
assertThat(consumer.isEmpty()).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1413")
|
||||
public void shouldStreamDomainAliasWithProperModifiers() throws Exception {
|
||||
TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, PRIMARY KEY(pk));");
|
||||
startConnector(config -> config
|
||||
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)
|
||||
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
|
||||
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
|
||||
.with(PostgresConnectorConfig.TABLE_WHITELIST, "public.alias_table"),
|
||||
false);
|
||||
|
||||
waitForStreamingToStart();
|
||||
|
||||
TestHelper.execute("CREATE DOMAIN varbit2 AS varbit(3);");
|
||||
TestHelper.execute("ALTER TABLE public.alias_table ADD COLUMN value varbit2 NOT NULL;");
|
||||
|
||||
consumer = testConsumer(1);
|
||||
executeAndWait("INSERT INTO public.alias_table (value) VALUES (B'101');");
|
||||
|
||||
SourceRecord rec = assertRecordInserted("public.alias_table", PK_FIELD, 1);
|
||||
assertSourceInfo(rec, "postgres", "public", "alias_table");
|
||||
|
||||
List<SchemaAndValueField> expected = Arrays.asList(
|
||||
new SchemaAndValueField(PK_FIELD, SchemaBuilder.INT32_SCHEMA, 1),
|
||||
new SchemaAndValueField("value", Bits.builder(3).build(), new byte[]{ 5, 0 }));
|
||||
|
||||
assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER);
|
||||
assertThat(consumer.isEmpty()).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1413")
|
||||
public void shouldStreamValuesForAliasLikeBaseTypes() throws Exception {
|
||||
// todo: should test all these combinations in RecordsSnapshotProducerIT
|
||||
|
||||
TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, PRIMARY KEY (pk));");
|
||||
startConnector(config -> config
|
||||
.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.DOUBLE)
|
||||
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
|
||||
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
|
||||
.with(PostgresConnectorConfig.TABLE_WHITELIST, "public.alias_table"),
|
||||
false);
|
||||
|
||||
waitForStreamingToStart();
|
||||
|
||||
// note: skipped macaddr8 as that is only supported on PG10+ but was manually tested
|
||||
TestHelper.execute("CREATE DOMAIN bit2 AS BIT(3);");
|
||||
TestHelper.execute("CREATE DOMAIN smallint2 AS smallint;");
|
||||
TestHelper.execute("CREATE DOMAIN integer2 as integer;");
|
||||
TestHelper.execute("CREATE DOMAIN bigint2 as bigint;");
|
||||
TestHelper.execute("CREATE DOMAIN real2 as real;");
|
||||
TestHelper.execute("CREATE DOMAIN bool2 AS BOOL DEFAULT false;");
|
||||
TestHelper.execute("CREATE DOMAIN float82 as float8;");
|
||||
TestHelper.execute("CREATE DOMAIN numeric2 as numeric(6,2);");
|
||||
TestHelper.execute("CREATE DOMAIN string2 AS varchar(25) DEFAULT NULL;");
|
||||
TestHelper.execute("CREATE DOMAIN date2 AS date;");
|
||||
TestHelper.execute("CREATE DOMAIN time2 as time;");
|
||||
TestHelper.execute("CREATE DOMAIN timetz2 as timetz;");
|
||||
TestHelper.execute("CREATE DOMAIN timestamp2 as timestamp;");
|
||||
TestHelper.execute("CREATE DOMAIN timestamptz2 AS timestamptz;");
|
||||
TestHelper.execute("CREATE DOMAIN timewotz2 as time without time zone;");
|
||||
TestHelper.execute("CREATE DOMAIN box2 as box;");
|
||||
TestHelper.execute("CREATE DOMAIN circle2 as circle;");
|
||||
TestHelper.execute("CREATE DOMAIN interval2 as interval;");
|
||||
TestHelper.execute("CREATE DOMAIN line2 as line;");
|
||||
TestHelper.execute("CREATE DOMAIN lseg2 as lseg;");
|
||||
TestHelper.execute("CREATE DOMAIN path2 as path;");
|
||||
TestHelper.execute("CREATE DOMAIN point2 as point;");
|
||||
TestHelper.execute("CREATE DOMAIN polygon2 as polygon;");
|
||||
TestHelper.execute("CREATE DOMAIN char2 as char;");
|
||||
TestHelper.execute("CREATE DOMAIN text2 as text;");
|
||||
TestHelper.execute("CREATE DOMAIN json2 as json;");
|
||||
TestHelper.execute("CREATE DOMAIN xml2 as xml;");
|
||||
TestHelper.execute("CREATE DOMAIN uuid2 as uuid;");
|
||||
TestHelper.execute("CREATE DOMAIN varbit2 as varbit(3);");
|
||||
TestHelper.execute("CREATE DOMAIN inet2 as inet;");
|
||||
TestHelper.execute("CREATE DOMAIN cidr2 as cidr;");
|
||||
TestHelper.execute("CREATE DOMAIN macaddr2 as macaddr;");
|
||||
|
||||
TestHelper.execute("ALTER TABLE alias_table "
|
||||
+ "ADD COLUMN bit_base bit(3) NOT NULL, ADD COLUMN bit_alias bit2 NOT NULL, "
|
||||
+ "ADD COLUMN smallint_base smallint NOT NULL, ADD COLUMN smallint_alias smallint2 NOT NULL, "
|
||||
+ "ADD COLUMN integer_base integer NOT NULL, ADD COLUMN integer_alias integer2 NOT NULL, "
|
||||
+ "ADD COLUMN bigint_base bigint NOT NULL, ADD COLUMN bigint_alias bigint2 NOT NULL, "
|
||||
+ "ADD COLUMN real_base real NOT NULL, ADD COLUMN real_alias real2 NOT NULL, "
|
||||
+ "ADD COLUMN float8_base float8 NOT NULL, ADD COLUMN float8_alias float82 NOT NULL, "
|
||||
+ "ADD COLUMN numeric_base numeric(6,2) NOT NULL, ADD COLUMN numeric_alias numeric2 NOT NULL, "
|
||||
+ "ADD COLUMN bool_base bool NOT NULL, ADD COLUMN bool_alias bool2 NOT NULL, "
|
||||
+ "ADD COLUMN string_base varchar(25) NOT NULL, ADD COLUMN string_alias string2 NOT NULL, "
|
||||
+ "ADD COLUMN date_base date NOT NULL, ADD COLUMN date_alias date2 NOT NULL, "
|
||||
+ "ADD COLUMN time_base time NOT NULL, ADD COLUMN time_alias time2 NOT NULL, "
|
||||
+ "ADD COLUMN timetz_base timetz NOT NULL, ADD COLUMN timetz_alias timetz2 NOT NULL, "
|
||||
+ "ADD COLUMN timestamp_base timestamp NOT NULL, ADD COLUMN timestamp_alias timestamp2 NOT NULL, "
|
||||
+ "ADD COLUMN timestamptz_base timestamptz NOT NULL, ADD COLUMN timestamptz_alias timestamptz2 NOT NULL, "
|
||||
+ "ADD COLUMN timewottz_base time without time zone NOT NULL, ADD COLUMN timewottz_alias timewotz2 NOT NULL, "
|
||||
+ "ADD COLUMN box_base box NOT NULL, ADD COLUMN box_alias box2 NOT NULL, "
|
||||
+ "ADD COLUMN circle_base circle NOT NULL, ADD COLUMN circle_alias circle2 NOT NULL, "
|
||||
+ "ADD COLUMN interval_base interval NOT NULL, ADD COLUMN interval_alias interval2 NOT NULL, "
|
||||
+ "ADD COLUMN line_base line NOT NULL, ADD COLUMN line_alias line2 NOT NULL, "
|
||||
+ "ADD COLUMN lseg_base lseg NOT NULL, ADD COLUMN lseg_alias lseg2 NOT NULL, "
|
||||
+ "ADD COLUMN path_base path NOT NULL, ADD COLUMN path_alias path2 NOT NULL, "
|
||||
+ "ADD COLUMN point_base point NOT NULL, ADD COLUMN point_alias point2 NOT NULL, "
|
||||
+ "ADD COLUMN polygon_base polygon NOT NULL, ADD COLUMN polygon_alias polygon2 NOT NULL, "
|
||||
+ "ADD COLUMN char_base char NOT NULL, ADD COLUMN char_alias char2 NOT NULL, "
|
||||
+ "ADD COLUMN text_base text NOT NULL, ADD COLUMN text_alias text2 NOT NULL, "
|
||||
+ "ADD COLUMN json_base json NOT NULL, ADD COLUMN json_alias json2 NOT NULL, "
|
||||
+ "ADD COLUMN xml_base xml NOT NULL, ADD COLUMN xml_alias xml2 NOT NULL, "
|
||||
+ "ADD COLUMN uuid_base UUID NOT NULL, ADD COLUMN uuid_alias uuid2 NOT NULL, "
|
||||
+ "ADD COLUMN varbit_base varbit(3) NOT NULL, ADD COLUMN varbit_alias varbit2 NOT NULL,"
|
||||
+ "ADD COLUMN inet_base inet NOT NULL, ADD COLUMN inet_alias inet2 NOT NULL, "
|
||||
+ "ADD COLUMN cidr_base cidr NOT NULL, ADD COLUMN cidr_alias cidr2 NOT NULL, "
|
||||
+ "ADD COLUMN macaddr_base macaddr NOT NULL, ADD COLUMN macaddr_alias macaddr2 NOT NULL");
|
||||
|
||||
consumer = testConsumer(1);
|
||||
executeAndWait("INSERT INTO alias_table ("
|
||||
+ "bit_base, bit_alias, "
|
||||
+ "smallint_base, smallint_alias, "
|
||||
+ "integer_base, integer_alias, "
|
||||
+ "bigint_base, bigint_alias, "
|
||||
+ "real_base, real_alias, "
|
||||
+ "float8_base, float8_alias, "
|
||||
+ "numeric_base, numeric_alias, "
|
||||
+ "bool_base, bool_alias, "
|
||||
+ "string_base, string_alias, "
|
||||
+ "date_base, date_alias, "
|
||||
+ "time_base, time_alias, "
|
||||
+ "timetz_base, timetz_alias, "
|
||||
+ "timestamp_base, timestamp_alias, "
|
||||
+ "timestamptz_base, timestamptz_alias, "
|
||||
+ "timewottz_base, timewottz_alias, "
|
||||
+ "box_base, box_alias, "
|
||||
+ "circle_base, circle_alias, "
|
||||
+ "interval_base, interval_alias, "
|
||||
+ "line_base, line_alias, "
|
||||
+ "lseg_base, lseg_alias, "
|
||||
+ "path_base, path_alias, "
|
||||
+ "point_base, point_alias, "
|
||||
+ "polygon_base, polygon_alias, "
|
||||
+ "char_base, char_alias, "
|
||||
+ "text_base, text_alias, "
|
||||
+ "json_base, json_alias, "
|
||||
+ "xml_base, xml_alias, "
|
||||
+ "uuid_base, uuid_alias, "
|
||||
+ "varbit_base, varbit_alias, "
|
||||
+ "inet_base, inet_alias, "
|
||||
+ "cidr_base, cidr_alias, "
|
||||
+ "macaddr_base, macaddr_alias "
|
||||
+ ") VALUES ("
|
||||
+ "B'101', B'101', "
|
||||
+ "1, 1, "
|
||||
+ "1, 1, "
|
||||
+ "1000, 1000, "
|
||||
+ "3.14, 3.14, "
|
||||
+ "3.14, 3.14, "
|
||||
+ "1234.12, 1234.12, "
|
||||
+ "true, true, "
|
||||
+ "'hello', 'hello', "
|
||||
+ "'2019-10-02', '2019-10-02', "
|
||||
+ "'01:02:03', '01:02:03', "
|
||||
+ "'01:02:03.123789Z', '01:02:03.123789Z', "
|
||||
+ "'2019-10-02T01:02:03.123456', '2019-10-02T01:02:03.123456', "
|
||||
+ "'2019-10-02T13:51:30.123456+02:00'::TIMESTAMPTZ, '2019-10-02T13:51:30.123456+02:00'::TIMESTAMPTZ, "
|
||||
+ "'01:02:03', '01:02:03', "
|
||||
+ "'(0,0),(1,1)', '(0,0),(1,1)', "
|
||||
+ "'10,4,10', '10,4,10', "
|
||||
+ "'1 year 2 months 3 days 4 hours 5 minutes 6 seconds', '1 year 2 months 3 days 4 hours 5 minutes 6 seconds', "
|
||||
+ "'(0,0),(0,1)', '(0,0),(0,1)', "
|
||||
+ "'((0,0),(0,1))', '((0,0),(0,1))', "
|
||||
+ "'((0,0),(0,1),(0,2))', '((0,0),(0,1),(0,2))', "
|
||||
+ "'(1,1)', '(1,1)', "
|
||||
+ "'((0,0),(0,1),(1,0),(0,0))', '((0,0),(0,1),(1,0),(0,0))', "
|
||||
+ "'a', 'a', "
|
||||
+ "'Hello World', 'Hello World', "
|
||||
+ "'{\"key\": \"value\"}', '{\"key\": \"value\"}', "
|
||||
+ "XML('<foo>Hello</foo>'), XML('<foo>Hello</foo>'), "
|
||||
+ "'40e6215d-b5c6-4896-987c-f30f3678f608', '40e6215d-b5c6-4896-987c-f30f3678f608', "
|
||||
+ "B'101', B'101', "
|
||||
+ "'192.168.0.1', '192.168.0.1', "
|
||||
+ "'192.168/24', '192.168/24', "
|
||||
+ "'08:00:2b:01:02:03', '08:00:2b:01:02:03' "
|
||||
+ ");");
|
||||
|
||||
SourceRecord rec = assertRecordInserted("public.alias_table", PK_FIELD, 1);
|
||||
assertSourceInfo(rec, "postgres", "public", "alias_table");
|
||||
|
||||
// check with Jiri if we intend to have these differences
|
||||
final ByteBuffer boxByteBuffer;
|
||||
final ByteBuffer circleByteBuffer;
|
||||
final ByteBuffer lineByteBuffer;
|
||||
final ByteBuffer lsegByteBuffer;
|
||||
final ByteBuffer pathByteBuffer;
|
||||
final ByteBuffer polygonByteBuffer;
|
||||
if (TestHelper.decoderPlugin() == PostgresConnectorConfig.LogicalDecoder.DECODERBUFS) {
|
||||
boxByteBuffer = ByteBuffer.wrap("(1,1),(0,0)".getBytes());
|
||||
circleByteBuffer = ByteBuffer.wrap("<(10,4),10>".getBytes());
|
||||
lineByteBuffer = ByteBuffer.wrap("{-1,0,0}".getBytes());
|
||||
lsegByteBuffer = ByteBuffer.wrap("[(0,0),(0,1)]".getBytes());
|
||||
pathByteBuffer = ByteBuffer.wrap("((0,0),(0,1),(0,2))".getBytes());
|
||||
polygonByteBuffer = ByteBuffer.wrap("((0,0),(0,1),(1,0),(0,0))".getBytes());
|
||||
}
|
||||
else {
|
||||
boxByteBuffer = ByteBuffer.wrap("(1.0,1.0),(0.0,0.0)".getBytes());
|
||||
circleByteBuffer = ByteBuffer.wrap("<(10.0,4.0),10.0>".getBytes());
|
||||
lineByteBuffer = ByteBuffer.wrap("{-1.0,0.0,0.0}".getBytes());
|
||||
lsegByteBuffer = ByteBuffer.wrap("[(0.0,0.0),(0.0,1.0)]".getBytes());
|
||||
pathByteBuffer = ByteBuffer.wrap("((0.0,0.0),(0.0,1.0),(0.0,2.0))".getBytes());
|
||||
polygonByteBuffer = ByteBuffer.wrap("((0.0,0.0),(0.0,1.0),(1.0,0.0),(0.0,0.0))".getBytes());
|
||||
}
|
||||
|
||||
List<SchemaAndValueField> expected = Arrays.asList(
|
||||
new SchemaAndValueField(PK_FIELD, SchemaBuilder.INT32_SCHEMA, 1),
|
||||
new SchemaAndValueField("bit_base", Bits.builder(3).build(), new byte[]{ 5, 0 }),
|
||||
new SchemaAndValueField("bit_alias", Bits.builder(3).build(), new byte[]{ 5, 0 }),
|
||||
new SchemaAndValueField("smallint_base", SchemaBuilder.INT16_SCHEMA, (short) 1),
|
||||
new SchemaAndValueField("smallint_alias", SchemaBuilder.INT16_SCHEMA, (short) 1),
|
||||
new SchemaAndValueField("integer_base", SchemaBuilder.INT32_SCHEMA, 1),
|
||||
new SchemaAndValueField("integer_alias", SchemaBuilder.INT32_SCHEMA, 1),
|
||||
new SchemaAndValueField("bigint_base", SchemaBuilder.INT64_SCHEMA, 1000L),
|
||||
new SchemaAndValueField("bigint_alias", SchemaBuilder.INT64_SCHEMA, 1000L),
|
||||
new SchemaAndValueField("real_base", SchemaBuilder.FLOAT32_SCHEMA, 3.14f),
|
||||
new SchemaAndValueField("real_alias", SchemaBuilder.FLOAT32_SCHEMA, 3.14f),
|
||||
new SchemaAndValueField("float8_base", SchemaBuilder.FLOAT64_SCHEMA, 3.14),
|
||||
new SchemaAndValueField("float8_alias", SchemaBuilder.FLOAT64_SCHEMA, 3.14),
|
||||
new SchemaAndValueField("numeric_base", SpecialValueDecimal.builder(DecimalMode.DOUBLE, 4, 2).build(), 1234.12),
|
||||
new SchemaAndValueField("numeric_alias", SpecialValueDecimal.builder(DecimalMode.DOUBLE, 4, 2).build(), 1234.12),
|
||||
new SchemaAndValueField("bool_base", SchemaBuilder.BOOLEAN_SCHEMA, true),
|
||||
new SchemaAndValueField("bool_alias", SchemaBuilder.BOOLEAN_SCHEMA, true),
|
||||
new SchemaAndValueField("string_base", SchemaBuilder.STRING_SCHEMA, "hello"),
|
||||
new SchemaAndValueField("string_alias", SchemaBuilder.STRING_SCHEMA, "hello"),
|
||||
new SchemaAndValueField("date_base", Date.builder().build(), Date.toEpochDay(LocalDate.parse("2019-10-02"), null)),
|
||||
new SchemaAndValueField("date_alias", Date.builder().build(), Date.toEpochDay(LocalDate.parse("2019-10-02"), null)),
|
||||
new SchemaAndValueField("time_base", MicroTime.builder().build(), LocalTime.parse("01:02:03").toNanoOfDay() / 1_000),
|
||||
new SchemaAndValueField("time_alias", MicroTime.builder().build(), LocalTime.parse("01:02:03").toNanoOfDay() / 1_000),
|
||||
new SchemaAndValueField("timetz_base", ZonedTime.builder().build(), "01:02:03.123789Z"),
|
||||
new SchemaAndValueField("timetz_alias", ZonedTime.builder().build(), "01:02:03.123789Z"),
|
||||
new SchemaAndValueField("timestamp_base", MicroTimestamp.builder().build(), asEpochMicros("2019-10-02T01:02:03.123456")),
|
||||
new SchemaAndValueField("timestamp_alias", MicroTimestamp.builder().build(), asEpochMicros("2019-10-02T01:02:03.123456")),
|
||||
new SchemaAndValueField("timestamptz_base", ZonedTimestamp.builder().build(), "2019-10-02T11:51:30.123456Z"),
|
||||
new SchemaAndValueField("timestamptz_alias", ZonedTimestamp.builder().build(), "2019-10-02T11:51:30.123456Z"),
|
||||
new SchemaAndValueField("timewottz_base", MicroTime.builder().build(), LocalTime.parse("01:02:03").toNanoOfDay() / 1_000),
|
||||
new SchemaAndValueField("timewottz_alias", MicroTime.builder().build(), LocalTime.parse("01:02:03").toNanoOfDay() / 1_000),
|
||||
new SchemaAndValueField("interval_base", MicroDuration.builder().build(),
|
||||
MicroDuration.durationMicros(1, 2, 3, 4, 5, 6, MicroDuration.DAYS_PER_MONTH_AVG)),
|
||||
new SchemaAndValueField("interval_alias", MicroDuration.builder().build(),
|
||||
MicroDuration.durationMicros(1, 2, 3, 4, 5, 6, MicroDuration.DAYS_PER_MONTH_AVG)),
|
||||
new SchemaAndValueField("box_base", SchemaBuilder.BYTES_SCHEMA, boxByteBuffer),
|
||||
new SchemaAndValueField("box_alias", SchemaBuilder.BYTES_SCHEMA, boxByteBuffer),
|
||||
new SchemaAndValueField("circle_base", SchemaBuilder.BYTES_SCHEMA, circleByteBuffer),
|
||||
new SchemaAndValueField("circle_alias", SchemaBuilder.BYTES_SCHEMA, circleByteBuffer),
|
||||
new SchemaAndValueField("line_base", SchemaBuilder.BYTES_SCHEMA, lineByteBuffer),
|
||||
new SchemaAndValueField("line_alias", SchemaBuilder.BYTES_SCHEMA, lineByteBuffer),
|
||||
new SchemaAndValueField("lseg_base", SchemaBuilder.BYTES_SCHEMA, lsegByteBuffer),
|
||||
new SchemaAndValueField("lseg_alias", SchemaBuilder.BYTES_SCHEMA, lsegByteBuffer),
|
||||
new SchemaAndValueField("path_base", SchemaBuilder.BYTES_SCHEMA, pathByteBuffer),
|
||||
new SchemaAndValueField("path_alias", SchemaBuilder.BYTES_SCHEMA, pathByteBuffer),
|
||||
new SchemaAndValueField("point_base", Point.builder().build(), Point.createValue(Point.builder().build(), 1, 1)),
|
||||
new SchemaAndValueField("point_alias", Point.builder().build(), Point.createValue(Point.builder().build(), 1, 1)),
|
||||
new SchemaAndValueField("polygon_base", SchemaBuilder.BYTES_SCHEMA, polygonByteBuffer),
|
||||
new SchemaAndValueField("polygon_alias", SchemaBuilder.BYTES_SCHEMA, polygonByteBuffer),
|
||||
new SchemaAndValueField("char_base", SchemaBuilder.STRING_SCHEMA, "a"),
|
||||
new SchemaAndValueField("char_alias", SchemaBuilder.STRING_SCHEMA, "a"),
|
||||
new SchemaAndValueField("text_base", SchemaBuilder.STRING_SCHEMA, "Hello World"),
|
||||
new SchemaAndValueField("text_alias", SchemaBuilder.STRING_SCHEMA, "Hello World"),
|
||||
new SchemaAndValueField("json_base", Json.builder().build(), "{\"key\": \"value\"}"),
|
||||
new SchemaAndValueField("json_alias", Json.builder().build(), "{\"key\": \"value\"}"),
|
||||
new SchemaAndValueField("xml_base", Xml.builder().build(), "<foo>Hello</foo>"),
|
||||
new SchemaAndValueField("xml_alias", Xml.builder().build(), "<foo>Hello</foo>"),
|
||||
new SchemaAndValueField("uuid_base", Uuid.builder().build(), "40e6215d-b5c6-4896-987c-f30f3678f608"),
|
||||
new SchemaAndValueField("uuid_alias", Uuid.builder().build(), "40e6215d-b5c6-4896-987c-f30f3678f608"),
|
||||
new SchemaAndValueField("varbit_base", Bits.builder(3).build(), new byte[]{ 5, 0 }),
|
||||
new SchemaAndValueField("varbit_alias", Bits.builder(3).build(), new byte[]{ 5, 0 }),
|
||||
new SchemaAndValueField("inet_base", SchemaBuilder.STRING_SCHEMA, "192.168.0.1"),
|
||||
new SchemaAndValueField("inet_alias", SchemaBuilder.STRING_SCHEMA, "192.168.0.1"),
|
||||
new SchemaAndValueField("cidr_base", SchemaBuilder.STRING_SCHEMA, "192.168.0.0/24"),
|
||||
new SchemaAndValueField("cidr_alias", SchemaBuilder.STRING_SCHEMA, "192.168.0.0/24"),
|
||||
new SchemaAndValueField("macaddr_base", SchemaBuilder.STRING_SCHEMA, "08:00:2b:01:02:03"),
|
||||
new SchemaAndValueField("macaddr_alias", SchemaBuilder.STRING_SCHEMA, "08:00:2b:01:02:03"));
|
||||
|
||||
assertRecordSchemaAndValues(expected, rec, Envelope.FieldName.AFTER);
|
||||
assertThat(consumer.isEmpty()).isTrue();
|
||||
}
|
||||
|
||||
private long asEpochMicros(String timestamp) {
|
||||
Instant instant = LocalDateTime.parse(timestamp).atOffset(ZoneOffset.UTC).toInstant();
|
||||
return instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1_000;
|
||||
}
|
||||
|
||||
private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(PostgresConnectorConfig.SchemaRefreshMode mode, boolean tablesBeforeStart)
|
||||
throws Exception {
|
||||
if (tablesBeforeStart) {
|
||||
|
@ -138,6 +138,9 @@ public JdbcValueConverters(DecimalMode decimalMode, TemporalPrecisionMode tempor
|
||||
|
||||
@Override
|
||||
public SchemaBuilder schemaBuilder(Column column) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
System.out.println("JdbcValueConverters#schemaBuilder(" + column.jdbcType() + ")");
|
||||
}
|
||||
switch (column.jdbcType()) {
|
||||
case Types.NULL:
|
||||
logger.warn("Unexpected JDBC type: NULL");
|
||||
@ -245,9 +248,11 @@ public SchemaBuilder schemaBuilder(Column column) {
|
||||
// often treated as a string, but we'll generalize and treat it as a byte array
|
||||
return SchemaBuilder.bytes();
|
||||
|
||||
case Types.DISTINCT:
|
||||
return distinctSchema(column);
|
||||
|
||||
// Unhandled types
|
||||
case Types.ARRAY:
|
||||
case Types.DISTINCT:
|
||||
case Types.JAVA_OBJECT:
|
||||
case Types.OTHER:
|
||||
case Types.REF:
|
||||
@ -340,9 +345,11 @@ public ValueConverter converter(Column column, Field fieldDefn) {
|
||||
case Types.ROWID:
|
||||
return (data) -> convertRowId(column, fieldDefn, data);
|
||||
|
||||
case Types.DISTINCT:
|
||||
return convertDistinct(column, fieldDefn);
|
||||
|
||||
// Unhandled types
|
||||
case Types.ARRAY:
|
||||
case Types.DISTINCT:
|
||||
case Types.JAVA_OBJECT:
|
||||
case Types.OTHER:
|
||||
case Types.REF:
|
||||
@ -772,7 +779,15 @@ protected Object convertTinyInt(Column column, Field fieldDefn, Object data) {
|
||||
* @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls
|
||||
*/
|
||||
protected Object convertSmallInt(Column column, Field fieldDefn, Object data) {
|
||||
System.out.println("convertSmallInt(" + column.name() + ") with data " + (data != null ? data.getClass() : "null"));
|
||||
if (data == null) {
|
||||
System.out.println("pause");
|
||||
}
|
||||
return convertValue(column, fieldDefn, data, SHORT_FALSE, (r) -> {
|
||||
System.out.println("convertSmallInt '" + data + "' (" + data.getClass() + ").");
|
||||
if (data == null) {
|
||||
System.out.println("pause2");
|
||||
}
|
||||
if (data instanceof Short) {
|
||||
r.deliver(data);
|
||||
}
|
||||
@ -1239,4 +1254,12 @@ protected Object convertValue(Column column, Field fieldDefn, Object data, Objec
|
||||
private boolean supportsLargeTimeValues() {
|
||||
return adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode;
|
||||
}
|
||||
|
||||
protected SchemaBuilder distinctSchema(Column column) {
|
||||
return null;
|
||||
}
|
||||
|
||||
protected ValueConverter convertDistinct(Column column, Field fieldDefn) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user