DBZ-2790 Handle default values for date & time types

This commit is contained in:
Eric Slep 2021-03-20 19:10:52 +01:00 committed by Gunnar Morling
parent f88f135c43
commit d1917eeff6
3 changed files with 75 additions and 22 deletions

View File

@ -20,6 +20,8 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.core.BaseConnection;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.jdbc.TimestampUtils;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.util.PGmoney;
import org.postgresql.util.PSQLState;
@ -95,7 +97,7 @@ public PostgresConnection(Configuration config, Function<TypeRegistry, PostgresV
this.typeRegistry = new TypeRegistry(this);
final PostgresValueConverter valueConverter = valueConverterBuilder.apply(this.typeRegistry);
this.defaultValueConverter = new PostgresDefaultValueConverter(valueConverter);
this.defaultValueConverter = new PostgresDefaultValueConverter(valueConverter, this.getTimestampUtils());
}
}
@ -453,6 +455,15 @@ public Charset getDatabaseCharset() {
}
}
public TimestampUtils getTimestampUtils() {
try {
return ((PgConnection) this.connection()).getTimestampUtils();
}
catch (SQLException e) {
throw new DebeziumException("Couldn't get timestamp utils from underlying connection", e);
}
}
protected static void defaultSettings(Configuration.Builder builder) {
// we require Postgres 9.4 as the minimum server version since that's where logical replication was first introduced
builder.with("assumeMinServerVersion", "9.4");

View File

@ -6,16 +6,6 @@
package io.debezium.connector.postgresql.connection;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.postgresql.PostgresValueConverter;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Collections;
@ -26,6 +16,19 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.postgresql.jdbc.TimestampUtils;
import org.postgresql.util.PGInterval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.postgresql.PostgresValueConverter;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
/**
* Parses and converts column default values.
*/
@ -56,9 +59,9 @@ private interface DefaultValueMapper {
private final PostgresValueConverter valueConverters;
private final Map<String, DefaultValueMapper> defaultValueMappers;
PostgresDefaultValueConverter(PostgresValueConverter valueConverters) {
PostgresDefaultValueConverter(PostgresValueConverter valueConverters, TimestampUtils timestampUtils) {
this.valueConverters = valueConverters;
this.defaultValueMappers = Collections.unmodifiableMap(createDefaultValueMappers());
this.defaultValueMappers = Collections.unmodifiableMap(createDefaultValueMappers(timestampUtils));
}
Optional<Object> parseDefaultValue(Column column, String defaultValue) {
@ -105,7 +108,7 @@ private Object convertDefaultValue(Object defaultValue, Column column) {
return defaultValue;
}
private static Map<String, DefaultValueMapper> createDefaultValueMappers() {
private static Map<String, DefaultValueMapper> createDefaultValueMappers(TimestampUtils timestampUtils) {
final Map<String, DefaultValueMapper> result = new HashMap<>();
result.put("bit", v -> {
@ -139,7 +142,13 @@ private static Map<String, DefaultValueMapper> createDefaultValueMappers() {
result.put("uuid", v -> UUID.fromString(extractDefault(v, "00000000-0000-0000-0000-000000000000"))); // Sample value: '76019d1a-ad2e-4b22-96e9-1a6d6543c818'::uuid
// Other data types, such as box, bytea, date, time and more are not handled.
result.put("date", v -> timestampUtils.toLocalDateTime(extractDefault(v, "1970-01-01")));
result.put("time", v -> timestampUtils.toLocalTime(extractDefault(v, "00:00")));
result.put("timestamp", v -> timestampUtils.toOffsetDateTime(extractDefault(v, "1970-01-01")));
result.put("timestamptz", v -> timestampUtils.toOffsetDateTime(extractDefault(v, "1970-01-01")));
result.put("interval", v -> new PGInterval(extractDefault(v, "epoch")));
// Other data types, such as box, bytea, and more are not handled.
return result;
}

View File

@ -17,9 +17,13 @@
import java.math.BigDecimal;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.kafka.connect.data.Decimal;
@ -391,14 +395,16 @@ public void shouldProperlyGetDefaultColumnValues() throws Exception {
"char CHAR(10) default 'abcd', " +
"varchar VARCHAR(100) default 'abcde', " +
// cidr
// date
"date DATE default '2021-03-19'::date, " +
"date_func DATE default NOW()::date, " +
"double float8 default 123456789.1234567890123, " +
// inet
"integer INT default 2147483647, " +
"integer_func INT default ABS(-1), " +
"integer_func1 INT default ABS(-1), " +
"integer_func2 INT default DIV(2, 1), " +
"integer_opt INT, " +
// interval
"interval INTERVAL default INTERVAL '1 hour', " +
"interval_func1 INTERVAL default make_interval(hours := 1), " +
"json JSON default '{}', " +
"json_opt JSON, " +
"jsonb JSONB default '{}', " +
@ -416,10 +422,16 @@ public void shouldProperlyGetDefaultColumnValues() throws Exception {
"smallint INT2 default 32767, " +
"text TEXT default 'asdf', " +
"text_func3 TEXT default concat('foo', 'bar', 'baz'), " +
// time
"time_hm TIME default '12:34'::time, " +
"time_hms TIME default '12:34:56'::time, " +
"time_func TIME default NOW()::time, " +
// time with time zone
// timestamp
// timestamp with time zone
"timestamp TIMESTAMP default '2021-03-20 13:44:28'::timestamp, " +
"timestamp_func TIMESTAMP default NOW()::timestamp, " +
"timestamp_opt TIMESTAMP, " +
"timestamptz TIMESTAMPTZ default '2021-03-20 14:44:28 +1'::timestamptz, " +
"timestamptz_func TIMESTAMPTZ default NOW()::timestamptz, " +
"timestamptz_opt TIMESTAMPTZ, " +
// tsquery
// tsvector
// txid_snapshot
@ -447,19 +459,40 @@ public void shouldProperlyGetDefaultColumnValues() throws Exception {
assertColumnDefault("boolean", true, columns);
assertColumnDefault("char", "abcd", columns);
assertColumnDefault("varchar", "abcde", columns);
assertColumnDefault("date", (int) LocalDate.of(2021, 3, 19).toEpochDay(), columns);
assertColumnDefault("date_func", 0, columns);
assertColumnDefault("double", 123456789.1234567890123, columns);
assertColumnDefault("integer", 2147483647, columns);
assertColumnDefault("integer_func", 0, columns);
assertColumnDefault("integer_func1", 0, columns);
assertColumnDefault("integer_func2", 0, columns);
assertColumnDefault("integer_opt", null, columns);
assertColumnDefault("interval", TimeUnit.HOURS.toMicros(1), columns);
assertColumnDefault("interval_func1", 0L, columns);
assertColumnDefault("json", "{}", columns);
assertColumnDefault("json_opt", null, columns);
assertColumnDefault("jsonb", "{}", columns);
assertColumnDefault("numeric", new BigDecimal("12345.67891"), columns);
assertColumnDefault("real", 1234567890.5f, columns);
assertColumnDefault("smallint", (short) 32767, columns);
assertColumnDefault("text", "asdf", columns);
assertColumnDefault("text_func3", "", columns);
assertColumnDefault("time_hm", TimeUnit.SECONDS.toMicros(LocalTime.of(12, 34).toSecondOfDay()), columns);
assertColumnDefault("time_hms", TimeUnit.SECONDS.toMicros(LocalTime.of(12, 34, 56).toSecondOfDay()), columns);
assertColumnDefault("time_func", 0L, columns);
assertColumnDefault("timestamp", TimeUnit.SECONDS.toMicros(1616247868), columns);
assertColumnDefault("timestamp_func", 0L, columns);
assertColumnDefault("timestamp_opt", null, columns);
assertColumnDefault("timestamptz", Instant.ofEpochSecond(1616247868).toString(), columns);
assertColumnDefault("timestamptz_func", Instant.ofEpochSecond(0).toString(), columns);
assertColumnDefault("timestamptz_opt", null, columns);
assertColumnDefault("uuid", "76019d1a-ad2e-4b22-96e9-1a6d6543c818", columns);
assertColumnDefault("uuid_func", "00000000-0000-0000-0000-000000000000", columns);
assertColumnDefault("uuid_opt", null, columns);