diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
index d625ea321..d88f02916 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java
@@ -70,6 +70,7 @@ public class PostgresConnection extends JdbcConnection {
     private static final Duration PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS = Duration.ofSeconds(2);
 
     private final TypeRegistry typeRegistry;
+    private final PostgresDefaultValueConverter defaultValueConverter;
 
     /**
      * Creates a Postgres connection using the supplied configuration.
@@ -82,6 +83,7 @@ public class PostgresConnection extends JdbcConnection {
     public PostgresConnection(Configuration config, boolean provideTypeRegistry) {
         super(config, FACTORY, PostgresConnection::validateServerVersion, PostgresConnection::defaultSettings);
         this.typeRegistry = provideTypeRegistry ? new TypeRegistry(this) : null;
+        this.defaultValueConverter = new PostgresDefaultValueConverter(null);
     }
 
     /**
@@ -507,12 +509,29 @@ protected Optional readTableColumn(ResultSet columnMetadata, Table
                 column.scale(nativeType.getDefaultScale());
             }
 
+            // Column 13 of the DatabaseMetaData#getColumns result set is COLUMN_DEF, the default value expression.
+            final String defaultValue = columnMetadata.getString(13);
+            if (defaultValue != null) {
+                getDefaultValue(column.create(), defaultValue).ifPresent(column::defaultValue);
+            }
+
             return Optional.of(column);
         }
 
         return Optional.empty();
     }
 
+    /**
+     * Converts the default value expression read from the column metadata into a default value for the column.
+     *
+     * @param column the column the default belongs to
+     * @param defaultValue the default value expression; the caller in this class only passes non-null values
+     * @return the converted default, or an empty optional if the converter could not produce one
+     */
+    protected Optional<Object> getDefaultValue(Column column, String defaultValue) {
+        return defaultValueConverter.parseDefaultValue(column, defaultValue);
+    }
+
     public TypeRegistry getTypeRegistry() {
         Objects.requireNonNull(typeRegistry, "Connection does not provide type registry");
         return typeRegistry;
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresDefaultValueConverter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresDefaultValueConverter.java
new file mode 100644
index 000000000..972ce2852
--- /dev/null
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresDefaultValueConverter.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.postgresql.connection;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.debezium.annotation.ThreadSafe;
+import io.debezium.connector.postgresql.PostgresValueConverter;
+import io.debezium.relational.Column;
+import io.debezium.relational.ValueConverter;
+
+/**
+ * Parses the default value expressions reported for table columns and converts them to the values used as
+ * column defaults.
+ * <p>
+ * Only literal defaults of the data types registered in {@link #createDefaultValueMappers()} are handled;
+ * expressions that would need evaluation, and all other data types, yield no default value.
+ */
+@ThreadSafe
+class PostgresDefaultValueConverter {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresDefaultValueConverter.class);
+
+    /**
+     * Matches the first single-quoted literal of a default expression. A doubled quote ({@code ''}) inside the
+     * literal is the SQL escape for one quote character and therefore does not terminate the literal.
+     */
+    private static final Pattern QUOTED_LITERAL = Pattern.compile("'((?:[^']|'')*)'");
+
+    /**
+     * Converts JDBC string representation of a default column value to an object.
+     */
+    @FunctionalInterface
+    private interface DefaultValueMapper {
+
+        /**
+         * Parses string to an object.
+         *
+         * @param value string representation
+         * @return value
+         * @throws Exception if there is a parsing error
+         */
+        Object parse(String value) throws Exception;
+    }
+
+    private final PostgresValueConverter valueConverters;
+    private final Map<String, DefaultValueMapper> defaultValueMappers;
+
+    /**
+     * Creates a converter for column default values.
+     *
+     * @param valueConverters converters applied to the parsed default; may be {@code null}, in which case the
+     *            parsed value is returned without further conversion
+     */
+    PostgresDefaultValueConverter(PostgresValueConverter valueConverters) {
+        this.valueConverters = valueConverters;
+        this.defaultValueMappers = Collections.unmodifiableMap(createDefaultValueMappers());
+    }
+
+    /**
+     * Parses the default value expression of a column.
+     *
+     * @param column the column the default belongs to; its type name selects the parser
+     * @param defaultValue the default value expression as reported for the column
+     * @return the converted default value, or an empty optional if the column is a serial column, the type is
+     *         not supported, the expression cannot be parsed or the conversion yields no value
+     */
+    Optional<Object> parseDefaultValue(Column column, String defaultValue) {
+        final String dataType = column.typeName();
+
+        // Constant-first comparison so that a missing type name ends up in the "no mapper" branch below.
+        if ("serial".equals(dataType) || "bigserial".equals(dataType)) {
+            LOGGER.debug("Ignoring db generated default type '{}'", dataType);
+            return Optional.empty();
+        }
+
+        final DefaultValueMapper mapper = defaultValueMappers.get(dataType);
+        if (mapper == null) {
+            LOGGER.warn("Mapper for type '{}' not found.", dataType);
+            return Optional.empty();
+        }
+
+        try {
+            final Object rawDefaultValue = mapper.parse(defaultValue);
+            // Optional.of would throw on a null conversion result and misreport it as a parse failure.
+            return Optional.ofNullable(convertDefaultValue(rawDefaultValue, column));
+        }
+        catch (Exception e) {
+            LOGGER.warn("Cannot parse column default value '{}' to type '{}'. Expression evaluation is not supported.", defaultValue, dataType);
+            LOGGER.debug("Parsing failed due to error", e);
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Runs the parsed default through the value converter of the column, if converters are available.
+     *
+     * @param defaultValue the parsed default value; may be {@code null}
+     * @param column the column the default belongs to
+     * @return the converted value, or {@code defaultValue} itself when no conversion applies
+     */
+    private Object convertDefaultValue(Object defaultValue, Column column) {
+        // Without converters or without a value there is nothing to convert.
+        if (valueConverters == null || defaultValue == null) {
+            return defaultValue;
+        }
+
+        final SchemaBuilder schemaBuilder = valueConverters.schemaBuilder(column);
+        if (schemaBuilder == null) {
+            return defaultValue;
+        }
+        final Schema schema = schemaBuilder.build();
+        // In order to get the valueConverter for this column, we have to create a field;
+        // The index value -1 in the field will never be used when converting the default value;
+        // So we can set any number here;
+        final Field field = new Field(column.name(), -1, schema);
+        final ValueConverter valueConverter = valueConverters.converter(column, field);
+        Object result = valueConverter.convert(defaultValue);
+        if ((result instanceof BigDecimal) && column.scale().isPresent() && column.scale().get() > ((BigDecimal) result).scale()) {
+            // Note that as the scale is increased only, the rounding is more cosmetic.
+            result = ((BigDecimal) result).setScale(column.scale().get(), RoundingMode.HALF_EVEN);
+        }
+        return result;
+    }
+
+    /**
+     * Builds the parsers for the supported data types, keyed by the type name of the column.
+     */
+    private static Map<String, DefaultValueMapper> createDefaultValueMappers() {
+        final Map<String, DefaultValueMapper> result = new HashMap<>();
+
+        result.put("bit", v -> {
+            String defaultValue = extractDefault(v);
+            if (defaultValue.length() == 1) {
+                // treat as a bool
+                return "1".equals(defaultValue);
+            }
+            return defaultValue;
+        }); // Sample values: `B'1'::"bit"`, `B'11'::"bit"`
+        result.put("varbit", PostgresDefaultValueConverter::extractDefault); // Sample value: B'110'::"bit"
+
+        result.put("bool", v -> Boolean.parseBoolean(extractDefault(v))); // Sample value: true
+
+        result.put("bpchar", PostgresDefaultValueConverter::extractDefault); // Sample value: 'abcd'::bpchar
+        result.put("varchar", PostgresDefaultValueConverter::extractDefault); // Sample value: 'abcde'::character varying
+        result.put("text", PostgresDefaultValueConverter::extractDefault); // Sample value: 'asdf'::text
+
+        // Every numeric parser unquotes first, as a number may be reported as a quoted, cast literal
+        // (see the int8 and float8 samples; negative values are typically reported this way as well).
+        result.put("numeric", v -> new BigDecimal(extractDefault(v))); // Sample value: 12345.67891
+        result.put("float4", v -> Float.parseFloat(extractDefault(v))); // Sample value: 1.234
+        result.put("float8", v -> Double.parseDouble(extractDefault(v))); // Sample values: `1.234`, `'12345678901234567890'::numeric`
+        result.put("int2", v -> Short.parseShort(extractDefault(v))); // Sample value: 32767
+        result.put("int4", v -> Integer.parseInt(extractDefault(v))); // Sample value: 123
+        result.put("int8", v -> Long.parseLong(extractDefault(v))); // Sample values: `123`, `'9223372036854775807'::bigint`
+
+        result.put("json", PostgresDefaultValueConverter::extractDefault); // Sample value: '{}'::json
+        result.put("jsonb", PostgresDefaultValueConverter::extractDefault); // Sample value: '{}'::jsonb
+        result.put("xml", PostgresDefaultValueConverter::extractDefault); // Sample value: 'bar'::xml
+
+        // Other data types, such as box, bytea, date, time and more are not handled.
+        return result;
+    }
+
+    /**
+     * Extracts the literal from a default value expression.
+     * <p>
+     * Values are either "raw", such as `1234`, or "type casted", such as `'9223372036854775807'::bigint`.
+     * If the value does NOT contain a single quote it is assumed to be a raw value. Otherwise the content of
+     * the first quoted literal is returned, with doubled quotes ({@code ''}) unescaped to a single quote.
+     *
+     * @param defaultValue the default value expression
+     * @return the raw value or the unquoted literal
+     * @throws IllegalArgumentException if the expression contains a quote but no complete quoted literal
+     */
+    private static String extractDefault(String defaultValue) {
+        if (!defaultValue.contains("'")) {
+            return defaultValue;
+        }
+
+        final Matcher matcher = QUOTED_LITERAL.matcher(defaultValue);
+        if (!matcher.find()) {
+            throw new IllegalArgumentException("Unterminated quoted literal in default value: " + defaultValue);
+        }
+        return matcher.group(1).replace("''", "'");
+    }
+}
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorTaskIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorTaskIT.java
index 3d6da2ec6..197edf5df 100644
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorTaskIT.java
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorTaskIT.java
@@ -6,7 +6,6 @@
 
 package io.debezium.connector.postgresql;
 
-import java.nio.charset.Charset;
 import java.sql.SQLException;
 import java.time.Duration;
 
@@ -49,8 +48,7 @@ public void retryOnFailureToCreateConnection() throws Exception {
         postgresConnectorTask.createReplicationConnection(new FakeContext(config, new PostgresSchema(
                 config,
                 null,
-                Charset.forName("UTF-8"),
-                PostgresTopicSelector.create(config))), true, true, 3, Duration.ofSeconds(2));
+                PostgresTopicSelector.create(config), null)), true, true, 3, Duration.ofSeconds(2));
 
         // Verify retry happened for 10 seconds
         long endTime = System.currentTimeMillis();
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java
index daa661225..2f373d5d8 100644
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java
@@ -10,12 +10,15 @@ import static
io.debezium.relational.RelationalDatabaseConnectorConfig.SCHEMA_BLACKLIST;
 import static io.debezium.relational.RelationalDatabaseConnectorConfig.SCHEMA_EXCLUDE_LIST;
 import static org.fest.assertions.Assertions.assertThat;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.math.BigDecimal;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.stream.IntStream;
 
 import org.apache.kafka.connect.data.Decimal;
@@ -40,6 +43,7 @@
 import io.debezium.doc.FixFor;
 import io.debezium.junit.SkipTestRule;
 import io.debezium.junit.SkipWhenDatabaseVersion;
+import io.debezium.relational.Column;
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.relational.TableSchema;
@@ -370,6 +374,104 @@ public void shouldPopulateToastableColumnsCache() throws Exception {
         }
     }
 
+    /**
+     * Creates a table with literal defaults for a range of column types and verifies the default value exposed
+     * by each column after a schema refresh.
+     */
+    @Test
+    public void shouldProperlyGetDefaultColumnValues() throws Exception {
+        String ddl = "DROP TABLE IF EXISTS default_column_test; CREATE TABLE default_column_test (" +
+                "pk SERIAL, " +
+                "bigint BIGINT default 9223372036854775807, " +
+                "bit_as_boolean BIT(1) default B'1', " +
+                "bit BIT(2) default B'11', " +
+                "varbit VARBIT(5) default B'110', " +
+                "boolean BOOLEAN not null default TRUE, " +
+                // box
+                // bytea
+                "char CHAR(10) default 'abcd', " +
+                "varchar VARCHAR(100) default 'abcde', " +
+                // cidr
+                // date
+                "double float8 default 123456789.1234567890123, " +
+                // inet
+                "integer INT default 2147483647, " +
+                // interval
+                "json JSON default '{}', " +
+                "jsonb JSONB default '{}', " +
+                // line
+                // lseg
+                // macaddr
+                // macaddr8
+                // money
+                "numeric NUMERIC(10, 5) default 12345.67891, " +
+                // path
+                // pg_lsn
+                // point
+                // polygon
+                "real FLOAT4 default 1234567890.5, " +
+                "smallint INT2 default 32767, " +
+                "text TEXT default 'asdf', " +
+                // time
+                // time with time zone
+                // timestamp
+                // timestamp with time zone
+                // tsquery
+                // tsvector
+                // txid_snapshot
+                // uuid
+                "xml XML default 'bar'" +
+                ");";
+
+        PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().build());
+        schema = TestHelper.getSchema(config);
+
+        try (PostgresConnection connection = TestHelper.createWithTypeRegistry()) {
+            connection.execute(ddl);
+            schema.refresh(connection, false);
+
+            List<Column> columns = tableFor("public.default_column_test").columns();
+            assertColumnDefault("bigint", 9223372036854775807L, columns);
+            assertColumnDefault("bit_as_boolean", true, columns);
+            assertColumnDefault("bit", new byte[]{ 3 }, columns);
+            assertColumnDefault("varbit", new byte[]{ 6 }, columns);
+            assertColumnDefault("boolean", true, columns);
+            assertColumnDefault("char", "abcd", columns);
+            assertColumnDefault("varchar", "abcde", columns);
+            assertColumnDefault("double", 123456789.1234567890123, columns);
+            assertColumnDefault("integer", 2147483647, columns);
+            assertColumnDefault("json", "{}", columns);
+            assertColumnDefault("jsonb", "{}", columns);
+            assertColumnDefault("numeric", new BigDecimal("12345.67891"), columns);
+            assertColumnDefault("real", 1234567890.5f, columns);
+            assertColumnDefault("smallint", (short) 32767, columns);
+            assertColumnDefault("text", "asdf", columns);
+            assertColumnDefault("xml", "bar", columns);
+        }
+    }
+
+    /**
+     * Asserts that the named column exists and carries the expected default value.
+     *
+     * @param columnName name of the column to look up in {@code columns}
+     * @param expectedDefault the expected default; {@code byte[]} values are compared element-wise
+     * @param columns the columns of the table under test
+     */
+    private void assertColumnDefault(String columnName, Object expectedDefault, List<Column> columns) {
+        Column column = columns.stream()
+                .filter(c -> c.name().equals(columnName))
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("Column '" + columnName + "' not found"));
+
+        if (expectedDefault instanceof byte[]) {
+            byte[] expectedBytes = (byte[]) expectedDefault;
+            byte[] defaultBytes = (byte[]) column.defaultValue();
+            assertArrayEquals(expectedBytes, defaultBytes);
+        }
+        else {
+            assertThat(column.defaultValue()).isEqualTo(expectedDefault);
+        }
+    }
+
     protected void assertKeySchema(String fullyQualifiedTableName, String fields, Schema... types) {
         TableSchema tableSchema = schemaFor(fullyQualifiedTableName);
         Schema keySchema = tableSchema.keySchema();
diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java
index d24512b1d..c45143e5b 100644
--- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java
+++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/TestHelper.java
@@ -17,6 +17,7 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.time.Duration;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -117,7 +118,13 @@ public static PostgresConnection create() {
      * @return the PostgresConnection instance; never null
      */
     public static PostgresConnection createWithTypeRegistry() {
-        return new PostgresConnection(defaultJdbcConfig(), true);
+        PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().build());
+        TypeRegistry typeRegistry = new TypeRegistry();
+
+        return new PostgresConnection(
+                defaultJdbcConfig(),
+                typeRegistry,
+                getPostgresValueConverter(typeRegistry, config));
     }
 
     /**
@@ -185,9 +192,11 @@ public static void dropAllSchemas() throws SQLException {
     }
 
     public static TypeRegistry getTypeRegistry() {
-        try (final PostgresConnection connection = new PostgresConnection(defaultJdbcConfig(), true)) {
-            return connection.getTypeRegistry();
+        TypeRegistry typeRegistry = new TypeRegistry();
+        try (final PostgresConnection connection = new PostgresConnection(defaultJdbcConfig(), typeRegistry, null)) {
+            // Creating the connection primes the type registry.
         }
+        return typeRegistry;
     }
 
     public static PostgresSchema getSchema(PostgresConnectorConfig config) {
@@ -198,8 +207,8 @@ public static PostgresSchema getSchema(PostgresConnectorConfig config, TypeRegis
         return new PostgresSchema(
                 config,
                 typeRegistry,
-                Charset.forName("UTF-8"),
-                PostgresTopicSelector.create(config));
+                PostgresTopicSelector.create(config),
+                getPostgresValueConverter(typeRegistry, config));
     }
 
     protected static Set schemaNames() throws SQLException {
@@ -362,4 +371,22 @@ private static List getOpenIdleTransactions(PostgresConnection connectio
         });
     }
 
+    /**
+     * Creates a {@link PostgresValueConverter} for tests from the given type registry and connector
+     * configuration, passing UTF-8 and {@link ZoneOffset#UTC} for the charset and zone offset arguments.
+     */
+    private static PostgresValueConverter getPostgresValueConverter(TypeRegistry typeRegistry, PostgresConnectorConfig config) {
+        return new PostgresValueConverter(
+                Charset.forName("UTF-8"),
+                config.getDecimalMode(),
+                config.getTemporalPrecisionMode(),
+                ZoneOffset.UTC,
+                null,
+                config.includeUnknownDatatypes(),
+                typeRegistry,
+                config.hStoreHandlingMode(),
+                config.binaryHandlingMode(),
+                config.intervalHandlingMode(),
+                config.toastedValuePlaceholder());
+    }
 }