DBZ-616 Get default length and scale for types
This commit is contained in:
parent
b1362f9db9
commit
2a08a718ba
@ -16,23 +16,27 @@
|
||||
*/
|
||||
public class PostgresType {
|
||||
|
||||
public static final PostgresType UNKNOWN = new PostgresType("unknown", -1, Integer.MIN_VALUE);
|
||||
public static final PostgresType UNKNOWN = new PostgresType("unknown", -1, Integer.MIN_VALUE, TypeRegistry.UNKNOWN_LENGTH, TypeRegistry.UNKNOWN_LENGTH);
|
||||
|
||||
private final String name;
|
||||
private final int oid;
|
||||
private final int jdbcId;
|
||||
private final PostgresType elementType;
|
||||
private final int defaultLength;
|
||||
private final int defaultScale;
|
||||
|
||||
public PostgresType(String name, int oid, int jdbcId) {
|
||||
this(name, oid, jdbcId, null);
|
||||
public PostgresType(String name, int oid, int jdbcId, int defaultLength, int defaultScale) {
|
||||
this(name, oid, jdbcId, defaultLength, defaultScale, null);
|
||||
}
|
||||
|
||||
public PostgresType(String name, int oid, int jdbcId, PostgresType elementType) {
|
||||
public PostgresType(String name, int oid, int jdbcId, int defaultLength, int defaultScale, PostgresType elementType) {
|
||||
Objects.requireNonNull(name);
|
||||
this.name = name;
|
||||
this.oid = oid;
|
||||
this.jdbcId = jdbcId;
|
||||
this.elementType = elementType;
|
||||
this.defaultLength = defaultLength;
|
||||
this.defaultScale = defaultScale;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -74,6 +78,22 @@ public PostgresType getElementType() {
|
||||
return elementType;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return the default length of the type
|
||||
*/
|
||||
public int getDefaultLength() {
|
||||
return defaultLength;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return the default Scale of the type
|
||||
*/
|
||||
public int getDefaultScale() {
|
||||
return defaultScale;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return name.hashCode();
|
||||
@ -88,16 +108,6 @@ public boolean equals(Object obj) {
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
PostgresType other = (PostgresType) obj;
|
||||
if (elementType == null) {
|
||||
if (other.elementType != null)
|
||||
return false;
|
||||
}
|
||||
else if (!elementType.equals(other.elementType))
|
||||
return false;
|
||||
if (jdbcId != other.jdbcId)
|
||||
return false;
|
||||
if (!name.equals(other.name))
|
||||
return false;
|
||||
if (oid != other.oid)
|
||||
return false;
|
||||
return true;
|
||||
|
@ -459,14 +459,14 @@ private boolean schemaChanged(List<ReplicationMessage.Column> columns, Table tab
|
||||
}
|
||||
if (metadataInMessage) {
|
||||
final int localLength = column.length();
|
||||
final int incomingLength = message.getTypeMetadata().getLength().orElse(Column.UNSET_INT_VALUE);
|
||||
final int incomingLength = message.getTypeMetadata().getLength();
|
||||
if (localLength != incomingLength) {
|
||||
logger.info("detected new length for column '{}', old length was '{}', new length is '{}'; refreshing table schema", columnName, localLength,
|
||||
incomingLength);
|
||||
return true;
|
||||
}
|
||||
final int localScale = column.scale();
|
||||
final int incomingScale = message.getTypeMetadata().getScale().orElse(Column.UNSET_INT_VALUE);
|
||||
final int incomingScale = message.getTypeMetadata().getScale();
|
||||
if (localScale != incomingScale) {
|
||||
logger.info("detected new scale for column '{}', old scale was '{}', new scale is '{}'; refreshing table schema", columnName, localScale,
|
||||
incomingScale);
|
||||
@ -521,13 +521,8 @@ private Table tableFromFromMessage(List<ReplicationMessage.Column> columns, Tabl
|
||||
.type(type.getName())
|
||||
.optional(column.isOptional())
|
||||
.nativeType(type.getOid());
|
||||
TypeRegistry.reconcileJdbcOidTypeConstraints(type, columnEditor);
|
||||
if (column.getTypeMetadata().getLength().isPresent()) {
|
||||
columnEditor.length(column.getTypeMetadata().getLength().getAsInt());
|
||||
}
|
||||
if (column.getTypeMetadata().getScale().isPresent()) {
|
||||
columnEditor.scale(column.getTypeMetadata().getScale().getAsInt());
|
||||
}
|
||||
columnEditor.length(column.getTypeMetadata().getLength());
|
||||
columnEditor.scale(column.getTypeMetadata().getScale());
|
||||
return columnEditor.create();
|
||||
})
|
||||
.collect(Collectors.toList())
|
||||
|
@ -12,8 +12,6 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.relational.ColumnEditor;
|
||||
|
||||
/**
|
||||
* A registry of types supported by a PostgreSQL instance. Allows lookup of the types according to
|
||||
* type name or OID.
|
||||
@ -30,6 +28,9 @@ public class TypeRegistry {
|
||||
public static final String TYPE_NAME_GEOGRAPHY_ARRAY = "_geography";
|
||||
public static final String TYPE_NAME_GEOMETRY_ARRAY = "_geometry";
|
||||
|
||||
public static final int NO_TYPE_MODIFIER = -1;
|
||||
public static final int UNKNOWN_LENGTH = -1;
|
||||
|
||||
private static final Map<String, String> LONG_TYPE_NAMES = Collections.unmodifiableMap(getLongTypeNames());
|
||||
|
||||
private static Map<String, String> getLongTypeNames() {
|
||||
@ -218,31 +219,4 @@ public int geographyArrayOid() {
|
||||
public static String normalizeTypeName(String typeName) {
|
||||
return LONG_TYPE_NAMES.getOrDefault(typeName, typeName);
|
||||
}
|
||||
|
||||
/**
|
||||
* JDBC metadata are different for some of the unbounded types from those coming via decoder.
|
||||
* This method sets the type constraints to the values provided by JDBC metadata.
|
||||
*
|
||||
* @param type column type coming from decoder
|
||||
* @param columnEditor the JDBC counterpart of the column
|
||||
*/
|
||||
public static void reconcileJdbcOidTypeConstraints(PostgresType type,
|
||||
final ColumnEditor columnEditor) {
|
||||
switch (type.getName()) {
|
||||
case "money":
|
||||
// JDBC returns scale 0 but decoder plugin returns -1 (unscaled)
|
||||
columnEditor.scale(0);
|
||||
break;
|
||||
case "timestamp":
|
||||
// JDBC returns length/scale 29/6 but decoder plugin returns -1 (unlimited)
|
||||
columnEditor.length(29);
|
||||
columnEditor.scale(6);
|
||||
break;
|
||||
case "time":
|
||||
// JDBC returns length/scale 15/6 but decoder plugin returns -1 (unlimited)
|
||||
columnEditor.length(15);
|
||||
columnEditor.scale(6);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,6 @@
|
||||
*/
|
||||
package io.debezium.connector.postgresql.connection;
|
||||
|
||||
import java.util.OptionalInt;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@ -49,7 +48,7 @@ public static class TypeMetadataImpl implements ReplicationMessage.ColumnTypeMet
|
||||
|
||||
private String[] typeModifiers = {};
|
||||
|
||||
public TypeMetadataImpl(String columnName, String typeWithModifiers, boolean optional) {
|
||||
public TypeMetadataImpl(String columnName, PostgresType type, String typeWithModifiers, boolean optional) {
|
||||
this.optional = optional;
|
||||
Matcher m = TYPE_PATTERN.matcher(typeWithModifiers);
|
||||
if (!m.matches()) {
|
||||
@ -62,12 +61,15 @@ public TypeMetadataImpl(String columnName, String typeWithModifiers, boolean opt
|
||||
}
|
||||
|
||||
// TODO: make this more elegant/type-specific
|
||||
length = type.getDefaultLength();
|
||||
if (typeModifiers.length > 0) {
|
||||
try {
|
||||
this.length = Integer.parseInt(typeModifiers[0]);
|
||||
} catch (NumberFormatException e) {
|
||||
}
|
||||
}
|
||||
|
||||
scale = type.getDefaultScale();
|
||||
if (typeModifiers.length > 1) {
|
||||
try {
|
||||
this.scale = Integer.parseInt(typeModifiers[1]);
|
||||
@ -77,13 +79,13 @@ public TypeMetadataImpl(String columnName, String typeWithModifiers, boolean opt
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalInt getLength() {
|
||||
return length != null ? OptionalInt.of(length) : OptionalInt.empty();
|
||||
public int getLength() {
|
||||
return length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OptionalInt getScale() {
|
||||
return scale != null ? OptionalInt.of(scale) : OptionalInt.empty();
|
||||
public int getScale() {
|
||||
return scale;
|
||||
}
|
||||
|
||||
public String[] getModifiers() {
|
||||
@ -113,7 +115,7 @@ public AbstractReplicationMessageColumn(String columnName, PostgresType type, St
|
||||
|
||||
private void initMetadata() {
|
||||
assert hasMetadata : "Metadata not available";
|
||||
typeMetadata = new TypeMetadataImpl(columnName, typeWithModifiers, optional);
|
||||
typeMetadata = new TypeMetadataImpl(columnName, type, typeWithModifiers, optional);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -16,6 +16,8 @@
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
import org.postgresql.core.BaseConnection;
|
||||
import org.postgresql.core.TypeInfo;
|
||||
import org.postgresql.replication.LogSequenceNumber;
|
||||
import org.postgresql.util.PSQLState;
|
||||
import org.slf4j.Logger;
|
||||
@ -270,12 +272,20 @@ protected int resolveNativeType(String typeName) {
|
||||
|
||||
private static TypeRegistry initTypeRegistry(Connection db, Map<String, Integer> nameToJdbc) {
|
||||
TypeRegistry.Builder typeRegistryBuilder = TypeRegistry.create();
|
||||
final TypeInfo typeInfo = ((BaseConnection)db).getTypeInfo();
|
||||
try {
|
||||
try (final Statement statement = db.createStatement()) {
|
||||
// Read non-array types
|
||||
try (final ResultSet rs = statement.executeQuery(SQL_NON_ARRAY_TYPES)) {
|
||||
while (rs.next()) {
|
||||
typeRegistryBuilder.addType(new PostgresType(rs.getString("name"), rs.getInt("oid"), nameToJdbc.get(rs.getString("name"))));
|
||||
final int oid = rs.getInt("oid");
|
||||
typeRegistryBuilder.addType(new PostgresType(
|
||||
rs.getString("name"),
|
||||
oid,
|
||||
nameToJdbc.get(rs.getString("name")),
|
||||
getSize(typeInfo, oid),
|
||||
getScale(typeInfo, oid)
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@ -283,7 +293,15 @@ private static TypeRegistry initTypeRegistry(Connection db, Map<String, Integer>
|
||||
try (final ResultSet rs = statement.executeQuery(SQL_ARRAY_TYPES)) {
|
||||
while (rs.next()) {
|
||||
// int2vector and oidvector will not be treated as arrays
|
||||
typeRegistryBuilder.addType(new PostgresType(rs.getString("name"), rs.getInt("oid"), nameToJdbc.get(rs.getString("name")), typeRegistryBuilder.get(rs.getInt("element"))));
|
||||
final int oid = rs.getInt("oid");
|
||||
typeRegistryBuilder.addType(new PostgresType(
|
||||
rs.getString("name"),
|
||||
oid,
|
||||
nameToJdbc.get(rs.getString("name")),
|
||||
getSize(typeInfo, oid),
|
||||
getScale(typeInfo, oid),
|
||||
typeRegistryBuilder.get(rs.getInt("element"))
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -294,6 +312,18 @@ private static TypeRegistry initTypeRegistry(Connection db, Map<String, Integer>
|
||||
return typeRegistryBuilder.build();
|
||||
}
|
||||
|
||||
private static int getSize(final TypeInfo typeInfo, final int oid) {
|
||||
int size = typeInfo.getPrecision(oid, TypeRegistry.NO_TYPE_MODIFIER);
|
||||
if (size == 0) {
|
||||
size = typeInfo.getDisplaySize(oid, TypeRegistry.NO_TYPE_MODIFIER);
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
private static int getScale(final TypeInfo typeInfo, final int oid) {
|
||||
return typeInfo.getScale(oid, TypeRegistry.NO_TYPE_MODIFIER);
|
||||
}
|
||||
|
||||
public TypeRegistry getTypeRegistry() {
|
||||
return typeRegistry;
|
||||
}
|
||||
|
@ -7,7 +7,6 @@
|
||||
package io.debezium.connector.postgresql.connection;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.OptionalInt;
|
||||
|
||||
import io.debezium.connector.postgresql.PostgresType;
|
||||
import io.debezium.connector.postgresql.RecordsStreamProducer.PgConnectionSupplier;
|
||||
@ -47,8 +46,8 @@ public interface Column {
|
||||
}
|
||||
|
||||
public interface ColumnTypeMetadata {
|
||||
OptionalInt getLength();
|
||||
OptionalInt getScale();
|
||||
int getLength();
|
||||
int getScale();
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
Reference in New Issue
Block a user