DBZ-2790 Implement default values for postgres

Using the `SqlServerDefaultValueConverter` class as a starting point,
this commit implements default value support for postgres.
This commit is contained in:
Kevin Pullin 2020-11-24 12:10:24 -08:00 committed by Gunnar Morling
parent 9cc6239bc0
commit d2ab8baddf
5 changed files with 285 additions and 8 deletions

View File

@ -70,6 +70,7 @@ public class PostgresConnection extends JdbcConnection {
private static final Duration PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS = Duration.ofSeconds(2);
private final TypeRegistry typeRegistry;
private final PostgresDefaultValueConverter defaultValueConverter;
/**
* Creates a Postgres connection using the supplied configuration.
@ -82,6 +83,7 @@ public class PostgresConnection extends JdbcConnection {
/**
 * Creates a Postgres connection from the supplied configuration.
 *
 * @param config the connection configuration handed to the parent {@code JdbcConnection}
 * @param provideTypeRegistry {@code true} to build a {@link TypeRegistry} bound to this connection;
 *            {@code false} leaves the registry unset ({@code null})
 */
public PostgresConnection(Configuration config, boolean provideTypeRegistry) {
    super(config, FACTORY, PostgresConnection::validateServerVersion, PostgresConnection::defaultSettings);
    if (provideTypeRegistry) {
        this.typeRegistry = new TypeRegistry(this);
    }
    else {
        this.typeRegistry = null;
    }
    // No value converter is supplied here, so parsed defaults are kept in their raw parsed form.
    this.defaultValueConverter = new PostgresDefaultValueConverter(null);
}
/**
@ -507,12 +509,21 @@ protected Optional<ColumnEditor> readTableColumn(ResultSet columnMetadata, Table
column.scale(nativeType.getDefaultScale());
}
final String defaultValue = columnMetadata.getString(13);
if (defaultValue != null) {
getDefaultValue(column.create(), defaultValue).ifPresent(column::defaultValue);
}
return Optional.of(column);
}
return Optional.empty();
}
/**
 * Resolves the default value of a column from its textual default expression.
 *
 * @param column the column the default belongs to
 * @param defaultValue the default expression as reported by the column metadata
 * @return the parsed default, or an empty {@link Optional} when the converter yields none
 */
protected Optional<Object> getDefaultValue(Column column, String defaultValue) {
    final Optional<Object> parsedDefault = defaultValueConverter.parseDefaultValue(column, defaultValue);
    return parsedDefault;
}
public TypeRegistry getTypeRegistry() {
Objects.requireNonNull(typeRegistry, "Connection does not provide type registry");
return typeRegistry;

View File

@ -0,0 +1,156 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.connection;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.postgresql.PostgresValueConverter;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
/**
 * Parses and converts column default values.
 * <p>
 * The default expression reported for a column (for example {@code 123},
 * {@code 'abcd'::bpchar} or {@code B'11'::"bit"}) is first parsed by a per-type
 * {@link DefaultValueMapper} and then, when a {@link PostgresValueConverter} was supplied,
 * passed through the value converter of the column. Expressions that cannot be parsed
 * (function calls, unsupported types, ...) yield an empty result instead of failing.
 * <p>
 * All instance state is held in {@code final} fields and the mapper table is wrapped in an
 * unmodifiable map.
 */
@ThreadSafe
class PostgresDefaultValueConverter {

    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresDefaultValueConverter.class);

    /**
     * Matches the first single-quoted literal of a default expression. Inside the literal a
     * doubled quote ({@code ''}) stands for one literal quote character, so it must not be
     * mistaken for the end of the literal.
     */
    private static final Pattern QUOTED_LITERAL = Pattern.compile("'((?:[^']|'')*)'");

    /**
     * Converts JDBC string representation of a default column value to an object.
     */
    @FunctionalInterface
    private interface DefaultValueMapper {

        /**
         * Parses string to an object.
         *
         * @param value string representation
         * @return value
         * @throws Exception if there is a parsing error
         */
        Object parse(String value) throws Exception;
    }

    private final PostgresValueConverter valueConverters;
    private final Map<String, DefaultValueMapper> defaultValueMappers;

    /**
     * @param valueConverters converter applied to parsed defaults; may be {@code null}, in which
     *            case the parsed value is returned without further conversion
     */
    PostgresDefaultValueConverter(PostgresValueConverter valueConverters) {
        this.valueConverters = valueConverters;
        this.defaultValueMappers = Collections.unmodifiableMap(createDefaultValueMappers());
    }

    /**
     * Parses the default expression of a column.
     *
     * @param column the column the default belongs to; its type name selects the mapper
     * @param defaultValue the textual default expression
     * @return the parsed (and, if possible, converted) default; empty for database generated
     *         defaults, unsupported types, unparsable expressions or a {@code null} conversion result
     */
    Optional<Object> parseDefaultValue(Column column, String defaultValue) {
        final String dataType = column.typeName();

        // Constant-first comparison keeps this safe should the type name ever be null.
        if ("serial".equals(dataType) || "bigserial".equals(dataType)) {
            LOGGER.debug("Ignoring db generated default type '{}'", dataType);
            return Optional.empty();
        }

        final DefaultValueMapper mapper = defaultValueMappers.get(dataType);
        if (mapper == null) {
            LOGGER.warn("Mapper for type '{}' not found.", dataType);
            return Optional.empty();
        }

        try {
            final Object rawDefaultValue = mapper.parse(defaultValue);
            final Object convertedDefaultValue = convertDefaultValue(rawDefaultValue, column);
            // ofNullable: a null conversion result means "no default", not a parsing failure.
            return Optional.ofNullable(convertedDefaultValue);
        }
        catch (Exception e) {
            LOGGER.warn("Cannot parse column default value '{}' to type '{}'. Expression evaluation is not supported.", defaultValue, dataType);
            LOGGER.debug("Parsing failed due to error", e);
            return Optional.empty();
        }
    }

    /**
     * Runs a parsed default through the value converter of the column.
     *
     * @param defaultValue the parsed default, possibly {@code null}
     * @param column the column the default belongs to
     * @return the converted value, or {@code defaultValue} unchanged when no converter or no
     *         schema is available
     */
    private Object convertDefaultValue(Object defaultValue, Column column) {
        // Conversion only makes sense with a converter at hand and an actual value to convert.
        if (valueConverters == null || defaultValue == null) {
            return defaultValue;
        }

        final SchemaBuilder schemaBuilder = valueConverters.schemaBuilder(column);
        if (schemaBuilder == null) {
            return defaultValue;
        }

        final Schema schema = schemaBuilder.build();
        // A field is needed to look up the value converter of the column. Its index (-1) is
        // never used while converting a default value, so any number would do.
        final Field field = new Field(column.name(), -1, schema);
        final ValueConverter valueConverter = valueConverters.converter(column, field);

        Object result = valueConverter.convert(defaultValue);
        if ((result instanceof BigDecimal) && column.scale().isPresent() && column.scale().get() > ((BigDecimal) result).scale()) {
            // Note that as the scale is increased only, the rounding is more cosmetic.
            result = ((BigDecimal) result).setScale(column.scale().get(), RoundingMode.HALF_EVEN);
        }
        return result;
    }

    /**
     * Builds the table of per-type parsers, keyed by the column type name.
     * <p>
     * Every mapper goes through {@link #extractDefault(String)} so that both raw values
     * ({@code 123}) and quoted/cast values ({@code '-1'::integer}) are accepted.
     */
    private static Map<String, DefaultValueMapper> createDefaultValueMappers() {
        final Map<String, DefaultValueMapper> result = new HashMap<>();

        // Sample values: `B'1'::"bit"`, `B'11'::"bit"`
        result.put("bit", v -> {
            final String defaultValue = extractDefault(v);
            if (defaultValue.length() == 1) {
                // treat as a bool
                return "1".equals(defaultValue);
            }
            return defaultValue;
        });
        result.put("varbit", PostgresDefaultValueConverter::extractDefault); // Sample value: B'110'::"bit"

        result.put("bool", v -> Boolean.parseBoolean(extractDefault(v))); // Sample value: true

        result.put("bpchar", PostgresDefaultValueConverter::extractDefault); // Sample value: 'abcd'::bpchar
        result.put("varchar", PostgresDefaultValueConverter::extractDefault); // Sample value: 'abcde'::character varying
        result.put("text", PostgresDefaultValueConverter::extractDefault); // Sample value: 'asdf'::text

        result.put("numeric", v -> new BigDecimal(extractDefault(v))); // Sample values: `12345.67891`, `'-1.5'::numeric`
        result.put("float4", v -> Float.parseFloat(extractDefault(v))); // Sample value: 1.234
        result.put("float8", v -> Double.parseDouble(extractDefault(v))); // Sample values: `1.234`, `'12345678901234567890'::numeric`
        result.put("int2", v -> Short.parseShort(extractDefault(v))); // Sample value: 32767
        result.put("int4", v -> Integer.parseInt(extractDefault(v))); // Sample values: `123`, `'-1'::integer`
        result.put("int8", v -> Long.parseLong(extractDefault(v))); // Sample values: `123`, `'9223372036854775807'::bigint`

        result.put("json", PostgresDefaultValueConverter::extractDefault); // Sample value: '{}'::json
        result.put("jsonb", PostgresDefaultValueConverter::extractDefault); // Sample value: '{}'::jsonb
        result.put("xml", PostgresDefaultValueConverter::extractDefault); // Sample value: '<foo>bar</foo>'::xml

        // Other data types, such as box, bytea, date, time and more are not handled.

        return result;
    }

    /**
     * Extracts the literal part of a default expression.
     * <p>
     * Values are either "raw", such as {@code 1234}, or "type casted", such as
     * {@code '9223372036854775807'::bigint}. A value without any single quote is assumed to be
     * raw and returned as is. Otherwise the content of the first quoted literal is returned,
     * with doubled quotes ({@code ''}) collapsed to a single quote.
     *
     * @param defaultValue the default expression
     * @return the literal content
     * @throws IllegalArgumentException if the expression contains a quote but no complete
     *             quoted literal
     */
    private static String extractDefault(String defaultValue) {
        if (!defaultValue.contains("'")) {
            return defaultValue;
        }

        final Matcher matcher = QUOTED_LITERAL.matcher(defaultValue);
        if (!matcher.find()) {
            throw new IllegalArgumentException("No quoted literal found in default value: " + defaultValue);
        }
        return matcher.group(1).replace("''", "'");
    }
}

View File

@ -6,7 +6,6 @@
package io.debezium.connector.postgresql;
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.time.Duration;
@ -49,8 +48,7 @@ public void retryOnFailureToCreateConnection() throws Exception {
postgresConnectorTask.createReplicationConnection(new FakeContext(config, new PostgresSchema(
config,
null,
Charset.forName("UTF-8"),
PostgresTopicSelector.create(config))), true, true, 3, Duration.ofSeconds(2));
PostgresTopicSelector.create(config), null)), true, true, 3, Duration.ofSeconds(2));
// Verify retry happened for 10 seconds
long endTime = System.currentTimeMillis();

View File

@ -10,12 +10,15 @@
import static io.debezium.relational.RelationalDatabaseConnectorConfig.SCHEMA_BLACKLIST;
import static io.debezium.relational.RelationalDatabaseConnectorConfig.SCHEMA_EXCLUDE_LIST;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.kafka.connect.data.Decimal;
@ -40,6 +43,7 @@
import io.debezium.doc.FixFor;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
@ -370,6 +374,91 @@ public void shouldPopulateToastableColumnsCache() throws Exception {
}
}
/**
 * Creates a table with one defaulted column per supported type and verifies the default
 * value read back for each of them.
 */
@Test
public void shouldProperlyGetDefaultColumnValues() throws Exception {
    // One entry per column, in alphabetical order of the Postgres type.
    // Types deliberately left out: box, bytea, cidr, date, inet, interval, line, lseg, macaddr,
    // macaddr8, money, path, pg_lsn, point, polygon, time, time with time zone, timestamp,
    // timestamp with time zone, tsquery, tsvector, txid_snapshot, uuid.
    final String columnDefinitions = String.join(", ",
            "pk SERIAL",
            "bigint BIGINT default 9223372036854775807",
            "bit_as_boolean BIT(1) default B'1'",
            "bit BIT(2) default B'11'",
            "varbit VARBIT(5) default B'110'",
            "boolean BOOLEAN not null default TRUE",
            "char CHAR(10) default 'abcd'",
            "varchar VARCHAR(100) default 'abcde'",
            "double float8 default 123456789.1234567890123",
            "integer INT default 2147483647",
            "json JSON default '{}'",
            "jsonb JSONB default '{}'",
            "numeric NUMERIC(10, 5) default 12345.67891",
            "real FLOAT4 default 1234567890.5",
            "smallint INT2 default 32767",
            "text TEXT default 'asdf'",
            "xml XML default '<foo>bar</foo>'");
    final String ddl = "DROP TABLE IF EXISTS default_column_test; CREATE TABLE default_column_test ("
            + columnDefinitions
            + ");";

    schema = TestHelper.getSchema(new PostgresConnectorConfig(TestHelper.defaultConfig().build()));

    try (PostgresConnection connection = TestHelper.createWithTypeRegistry()) {
        connection.execute(ddl);
        schema.refresh(connection, false);

        final List<Column> tableColumns = tableFor("public.default_column_test").columns();

        // Integral types
        assertColumnDefault("bigint", 9223372036854775807L, tableColumns);
        assertColumnDefault("integer", 2147483647, tableColumns);
        assertColumnDefault("smallint", (short) 32767, tableColumns);

        // Bit and boolean types; multi-bit values are expected as byte arrays
        assertColumnDefault("bit_as_boolean", true, tableColumns);
        assertColumnDefault("bit", new byte[]{ 3 }, tableColumns);
        assertColumnDefault("varbit", new byte[]{ 6 }, tableColumns);
        assertColumnDefault("boolean", true, tableColumns);

        // Character types
        assertColumnDefault("char", "abcd", tableColumns);
        assertColumnDefault("varchar", "abcde", tableColumns);
        assertColumnDefault("text", "asdf", tableColumns);

        // Fractional types
        assertColumnDefault("double", 123456789.1234567890123, tableColumns);
        assertColumnDefault("numeric", new BigDecimal("12345.67891"), tableColumns);
        assertColumnDefault("real", 1234567890.5f, tableColumns);

        // Document types
        assertColumnDefault("json", "{}", tableColumns);
        assertColumnDefault("jsonb", "{}", tableColumns);
        assertColumnDefault("xml", "<foo>bar</foo>", tableColumns);
    }
}
/**
 * Asserts that the named column carries the expected default value.
 * <p>
 * {@code byte[]} expectations are compared element-wise; everything else via {@code equals}.
 * Every failure (unknown column, missing default, mismatch) is reported as an assertion
 * failure naming the column and both values.
 *
 * @param columnName name of the column to look up in {@code columns}
 * @param expectedDefault the expected default value; may be {@code null} to expect no default
 * @param columns the columns of the table under test
 */
private void assertColumnDefault(String columnName, Object expectedDefault, List<Column> columns) {
    final Column column = columns.stream()
            .filter(c -> c.name().equals(columnName))
            .findFirst()
            .orElseThrow(() -> new AssertionError("Column '" + columnName + "' not found"));

    final Object actualDefault = column.defaultValue();
    final String message = "Unexpected default value for column '" + columnName + "'";

    if (expectedDefault instanceof byte[]) {
        assertTrue(message + ": expected a byte[] but was <" + actualDefault + ">", actualDefault instanceof byte[]);
        assertArrayEquals(message, (byte[]) expectedDefault, (byte[]) actualDefault);
    }
    else {
        // Compare starting from the expectation so that a missing (null) default fails the
        // assertion instead of raising a NullPointerException.
        final boolean matches = expectedDefault == null ? actualDefault == null : expectedDefault.equals(actualDefault);
        assertTrue(message + ": expected <" + expectedDefault + "> but was <" + actualDefault + ">", matches);
    }
}
protected void assertKeySchema(String fullyQualifiedTableName, String fields, Schema... types) {
TableSchema tableSchema = schemaFor(fullyQualifiedTableName);
Schema keySchema = tableSchema.keySchema();

View File

@ -17,6 +17,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@ -117,7 +118,13 @@ public static PostgresConnection create() {
* @return the PostgresConnection instance; never null
*/
public static PostgresConnection createWithTypeRegistry() {
// NOTE(review): this unconditional return makes every statement below it unreachable.
// It looks like the pre-change line of a diff rendered without +/- markers, with the
// statements below being its replacement - confirm against the repository before relying on it.
return new PostgresConnection(defaultJdbcConfig(), true);
// Builds a connector config from the default test configuration; it is only used to
// create the value converter passed to the connection below.
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().build());
// The same registry instance is handed to both the connection and the value converter.
TypeRegistry typeRegistry = new TypeRegistry();
return new PostgresConnection(
defaultJdbcConfig(),
typeRegistry,
getPostgresValueConverter(typeRegistry, config));
}
/**
@ -185,9 +192,11 @@ public static void dropAllSchemas() throws SQLException {
}
public static TypeRegistry getTypeRegistry() {
try (final PostgresConnection connection = new PostgresConnection(defaultJdbcConfig(), true)) {
return connection.getTypeRegistry();
TypeRegistry typeRegistry = new TypeRegistry();
try (final PostgresConnection connection = new PostgresConnection(defaultJdbcConfig(), typeRegistry, null)) {
// Creating the connection primes the type registry.
}
return typeRegistry;
}
public static PostgresSchema getSchema(PostgresConnectorConfig config) {
@ -198,8 +207,8 @@ public static PostgresSchema getSchema(PostgresConnectorConfig config, TypeRegis
return new PostgresSchema(
config,
typeRegistry,
Charset.forName("UTF-8"),
PostgresTopicSelector.create(config));
PostgresTopicSelector.create(config),
getPostgresValueConverter(typeRegistry, config));
}
protected static Set<String> schemaNames() throws SQLException {
@ -362,4 +371,18 @@ private static List<String> getOpenIdleTransactions(PostgresConnection connectio
});
}
/**
 * Builds the {@link PostgresValueConverter} used by the test helpers.
 * <p>
 * The charset is fixed to UTF-8 and the zone offset to UTC; every other setting is taken
 * from the supplied connector configuration.
 *
 * @param typeRegistry the type registry handed to the converter
 * @param config the connector configuration supplying the handling modes
 * @return a new value converter
 */
private static PostgresValueConverter getPostgresValueConverter(TypeRegistry typeRegistry, PostgresConnectorConfig config) {
    final Charset utf8 = Charset.forName("UTF-8");
    final ZoneOffset utcOffset = ZoneOffset.UTC;

    final PostgresValueConverter converter = new PostgresValueConverter(
            utf8,
            config.getDecimalMode(),
            config.getTemporalPrecisionMode(),
            utcOffset,
            null,
            config.includeUnknownDatatypes(),
            typeRegistry,
            config.hStoreHandlingMode(),
            config.binaryHandlingMode(),
            config.intervalHandlingMode(),
            config.toastedValuePlaceholder());
    return converter;
}
}