diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresType.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresType.java index f33ea9e7c..839301121 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresType.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresType.java @@ -16,23 +16,27 @@ */ public class PostgresType { - public static final PostgresType UNKNOWN = new PostgresType("unknown", -1, Integer.MIN_VALUE); + public static final PostgresType UNKNOWN = new PostgresType("unknown", -1, Integer.MIN_VALUE, TypeRegistry.UNKNOWN_LENGTH, TypeRegistry.UNKNOWN_LENGTH); private final String name; private final int oid; private final int jdbcId; private final PostgresType elementType; + private final int defaultLength; + private final int defaultScale; - public PostgresType(String name, int oid, int jdbcId) { - this(name, oid, jdbcId, null); + public PostgresType(String name, int oid, int jdbcId, int defaultLength, int defaultScale) { + this(name, oid, jdbcId, defaultLength, defaultScale, null); } - public PostgresType(String name, int oid, int jdbcId, PostgresType elementType) { + public PostgresType(String name, int oid, int jdbcId, int defaultLength, int defaultScale, PostgresType elementType) { Objects.requireNonNull(name); this.name = name; this.oid = oid; this.jdbcId = jdbcId; this.elementType = elementType; + this.defaultLength = defaultLength; + this.defaultScale = defaultScale; } /** @@ -74,6 +78,22 @@ public PostgresType getElementType() { return elementType; } + /** + * + * @return the default length of the type + */ + public int getDefaultLength() { + return defaultLength; + } + + /** + * + * @return the default Scale of the type + */ + public int getDefaultScale() { + return defaultScale; + } + @Override public int hashCode() { return name.hashCode(); @@ -88,16 +108,6 @@ public boolean equals(Object obj) { if (getClass() != obj.getClass()) return false; PostgresType other = (PostgresType) obj; - if (elementType == null) { - if (other.elementType != null) - return false; - } - else if (!elementType.equals(other.elementType)) - return false; - if (jdbcId != other.jdbcId) - return false; - if (!name.equals(other.name)) - return false; if (oid != other.oid) return false; return true; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java index 5dee7d658..1935307db 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java @@ -459,14 +459,14 @@ private boolean schemaChanged(List columns, Table tab } if (metadataInMessage) { final int localLength = column.length(); - final int incomingLength = message.getTypeMetadata().getLength().orElse(Column.UNSET_INT_VALUE); + final int incomingLength = message.getTypeMetadata().getLength(); if (localLength != incomingLength) { logger.info("detected new length for column '{}', old length was '{}', new length is '{}'; refreshing table schema", columnName, localLength, incomingLength); return true; } final int localScale = column.scale(); - final int incomingScale = message.getTypeMetadata().getScale().orElse(Column.UNSET_INT_VALUE); + final int incomingScale = message.getTypeMetadata().getScale(); if (localScale != incomingScale) { logger.info("detected new scale for column '{}', old scale was '{}', new scale is '{}'; refreshing table schema", columnName, localScale, incomingScale); @@ -521,13 +521,8 @@ private Table tableFromFromMessage(List columns, Tabl .type(type.getName()) .optional(column.isOptional()) .nativeType(type.getOid()); - TypeRegistry.reconcileJdbcOidTypeConstraints(type, columnEditor); - if (column.getTypeMetadata().getLength().isPresent()) { - columnEditor.length(column.getTypeMetadata().getLength().getAsInt()); - } - if (column.getTypeMetadata().getScale().isPresent()) { - columnEditor.scale(column.getTypeMetadata().getScale().getAsInt()); - } + columnEditor.length(column.getTypeMetadata().getLength()); + columnEditor.scale(column.getTypeMetadata().getScale()); return columnEditor.create(); }) .collect(Collectors.toList()) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java index 825e60a76..c82c23e79 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/TypeRegistry.java @@ -12,8 +12,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.debezium.relational.ColumnEditor; - /** * A registry of types supported by a PostgreSQL instance. Allows lookup of the types according to * type name or OID. @@ -30,6 +28,9 @@ public class TypeRegistry { public static final String TYPE_NAME_GEOGRAPHY_ARRAY = "_geography"; public static final String TYPE_NAME_GEOMETRY_ARRAY = "_geometry"; + public static final int NO_TYPE_MODIFIER = -1; + public static final int UNKNOWN_LENGTH = -1; + private static final Map LONG_TYPE_NAMES = Collections.unmodifiableMap(getLongTypeNames()); private static Map getLongTypeNames() { @@ -218,31 +219,4 @@ public int geographyArrayOid() { public static String normalizeTypeName(String typeName) { return LONG_TYPE_NAMES.getOrDefault(typeName, typeName); } - - /** - * JDBC metadata are different for some of the unbounded types from those coming via decoder. - * This method sets the type constraints to the values provided by JDBC metadata. - * - * @param type column type coming from decoder - * @param columnEditor the JDBC counterpart of the column - */ - public static void reconcileJdbcOidTypeConstraints(PostgresType type, - final ColumnEditor columnEditor) { - switch (type.getName()) { - case "money": - // JDBC returns scale 0 but decoder plugin returns -1 (unscaled) - columnEditor.scale(0); - break; - case "timestamp": - // JDBC returns length/scale 29/6 but decoder plugin returns -1 (unlimited) - columnEditor.length(29); - columnEditor.scale(6); - break; - case "time": - // JDBC returns length/scale 15/6 but decoder plugin returns -1 (unlimited) - columnEditor.length(15); - columnEditor.scale(6); - break; - } - } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractReplicationMessageColumn.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractReplicationMessageColumn.java index 7534ae9fb..e27e6d512 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractReplicationMessageColumn.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractReplicationMessageColumn.java @@ -5,7 +5,6 @@ */ package io.debezium.connector.postgresql.connection; -import java.util.OptionalInt; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -49,7 +48,7 @@ public static class TypeMetadataImpl implements ReplicationMessage.ColumnTypeMet private String[] typeModifiers = {}; - public TypeMetadataImpl(String columnName, String typeWithModifiers, boolean optional) { + public TypeMetadataImpl(String columnName, PostgresType type, String typeWithModifiers, boolean optional) { this.optional = optional; Matcher m = TYPE_PATTERN.matcher(typeWithModifiers); if (!m.matches()) { @@ -62,12 +61,15 @@ public TypeMetadataImpl(String columnName, String typeWithModifiers, boolean opt } // TODO: make this more elegant/type-specific + length = type.getDefaultLength(); if (typeModifiers.length > 0) { try { this.length = Integer.parseInt(typeModifiers[0]); } catch (NumberFormatException e) { } } + + scale = type.getDefaultScale(); if (typeModifiers.length > 1) { try { this.scale = Integer.parseInt(typeModifiers[1]); @@ -77,13 +79,13 @@ public TypeMetadataImpl(String columnName, String typeWithModifiers, boolean opt } @Override - public OptionalInt getLength() { - return length != null ? OptionalInt.of(length) : OptionalInt.empty(); + public int getLength() { + return length; } @Override - public OptionalInt getScale() { - return scale != null ? OptionalInt.of(scale) : OptionalInt.empty(); + public int getScale() { + return scale; } public String[] getModifiers() { @@ -113,7 +115,7 @@ public AbstractReplicationMessageColumn(String columnName, PostgresType type, St private void initMetadata() { assert hasMetadata : "Metadata not available"; - typeMetadata = new TypeMetadataImpl(columnName, typeWithModifiers, optional); + typeMetadata = new TypeMetadataImpl(columnName, type, typeWithModifiers, optional); } /** diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java index c4de262fe..d515efd1f 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java @@ -16,6 +16,8 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.connect.errors.ConnectException; +import org.postgresql.core.BaseConnection; +import org.postgresql.core.TypeInfo; import org.postgresql.replication.LogSequenceNumber; import org.postgresql.util.PSQLState; import org.slf4j.Logger; @@ -270,12 +272,20 @@ protected int resolveNativeType(String typeName) { private static TypeRegistry initTypeRegistry(Connection db, Map nameToJdbc) { TypeRegistry.Builder typeRegistryBuilder = TypeRegistry.create(); + final TypeInfo typeInfo = ((BaseConnection)db).getTypeInfo(); try { try (final Statement statement = db.createStatement()) { // Read non-array types try (final ResultSet rs = statement.executeQuery(SQL_NON_ARRAY_TYPES)) { while (rs.next()) { - typeRegistryBuilder.addType(new PostgresType(rs.getString("name"), rs.getInt("oid"), nameToJdbc.get(rs.getString("name")))); + final int oid = rs.getInt("oid"); + typeRegistryBuilder.addType(new PostgresType( + rs.getString("name"), + oid, + nameToJdbc.get(rs.getString("name")), + getSize(typeInfo, oid), + getScale(typeInfo, oid) + )); } } @@ -283,7 +293,15 @@ private static TypeRegistry initTypeRegistry(Connection db, Map try (final ResultSet rs = statement.executeQuery(SQL_ARRAY_TYPES)) { while (rs.next()) { // int2vector and oidvector will not be treated as arrays - typeRegistryBuilder.addType(new PostgresType(rs.getString("name"), rs.getInt("oid"), nameToJdbc.get(rs.getString("name")), typeRegistryBuilder.get(rs.getInt("element")))); + final int oid = rs.getInt("oid"); + typeRegistryBuilder.addType(new PostgresType( + rs.getString("name"), + oid, + nameToJdbc.get(rs.getString("name")), + getSize(typeInfo, oid), + getScale(typeInfo, oid), + typeRegistryBuilder.get(rs.getInt("element")) + )); } } } @@ -294,6 +312,18 @@ private static TypeRegistry initTypeRegistry(Connection db, Map return typeRegistryBuilder.build(); } + private static int getSize(final TypeInfo typeInfo, final int oid) { + int size = typeInfo.getPrecision(oid, TypeRegistry.NO_TYPE_MODIFIER); + if (size == 0) { + size = typeInfo.getDisplaySize(oid, TypeRegistry.NO_TYPE_MODIFIER); + } + return size; + } + + private static int getScale(final TypeInfo typeInfo, final int oid) { + return typeInfo.getScale(oid, TypeRegistry.NO_TYPE_MODIFIER); + } + public TypeRegistry getTypeRegistry() { return typeRegistry; } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java index 2daa532d0..213f64052 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java @@ -7,7 +7,6 @@ package io.debezium.connector.postgresql.connection; import java.util.List; -import java.util.OptionalInt; import io.debezium.connector.postgresql.PostgresType; import io.debezium.connector.postgresql.RecordsStreamProducer.PgConnectionSupplier; @@ -47,8 +46,8 @@ public interface Column { } public interface ColumnTypeMetadata { - OptionalInt getLength(); - OptionalInt getScale(); + int getLength(); + int getScale(); } /**