diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java index 8010d8318..b7b0fcced 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java @@ -20,6 +20,8 @@ import org.apache.kafka.connect.errors.ConnectException; import org.postgresql.core.BaseConnection; +import org.postgresql.jdbc.PgConnection; +import org.postgresql.jdbc.TimestampUtils; import org.postgresql.replication.LogSequenceNumber; import org.postgresql.util.PGmoney; import org.postgresql.util.PSQLState; @@ -95,7 +97,7 @@ public PostgresConnection(Configuration config, Function defaultValueMappers; - PostgresDefaultValueConverter(PostgresValueConverter valueConverters) { + PostgresDefaultValueConverter(PostgresValueConverter valueConverters, TimestampUtils timestampUtils) { this.valueConverters = valueConverters; - this.defaultValueMappers = Collections.unmodifiableMap(createDefaultValueMappers()); + this.defaultValueMappers = Collections.unmodifiableMap(createDefaultValueMappers(timestampUtils)); } Optional parseDefaultValue(Column column, String defaultValue) { @@ -105,7 +108,7 @@ private Object convertDefaultValue(Object defaultValue, Column column) { return defaultValue; } - private static Map createDefaultValueMappers() { + private static Map createDefaultValueMappers(TimestampUtils timestampUtils) { final Map result = new HashMap<>(); result.put("bit", v -> { @@ -139,7 +142,13 @@ private static Map createDefaultValueMappers() { result.put("uuid", v -> UUID.fromString(extractDefault(v, "00000000-0000-0000-0000-000000000000"))); // Sample value: '76019d1a-ad2e-4b22-96e9-1a6d6543c818'::uuid - // Other data types, such as box, bytea, date, time and more are not handled. + result.put("date", v -> timestampUtils.toLocalDateTime(extractDefault(v, "1970-01-01"))); + result.put("time", v -> timestampUtils.toLocalTime(extractDefault(v, "00:00"))); + result.put("timestamp", v -> timestampUtils.toOffsetDateTime(extractDefault(v, "1970-01-01"))); + result.put("timestamptz", v -> timestampUtils.toOffsetDateTime(extractDefault(v, "1970-01-01"))); + result.put("interval", v -> new PGInterval(extractDefault(v, "epoch"))); + + // Other data types, such as box, bytea, and more are not handled. return result; } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java index f40f07c80..97f8a5d68 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresSchemaIT.java @@ -17,9 +17,13 @@ import java.math.BigDecimal; import java.sql.SQLException; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.apache.kafka.connect.data.Decimal; @@ -391,14 +395,16 @@ public void shouldProperlyGetDefaultColumnValues() throws Exception { "char CHAR(10) default 'abcd', " + "varchar VARCHAR(100) default 'abcde', " + // cidr - // date + "date DATE default '2021-03-19'::date, " + + "date_func DATE default NOW()::date, " + "double float8 default 123456789.1234567890123, " + // inet "integer INT default 2147483647, " + - "integer_func INT default ABS(-1), " + + "integer_func1 INT default ABS(-1), " + "integer_func2 INT default DIV(2, 1), " + "integer_opt INT, " + - // interval + "interval INTERVAL default INTERVAL '1 hour', " + + "interval_func1 INTERVAL default make_interval(hours := 1), " + "json JSON default '{}', " + "json_opt JSON, " + "jsonb JSONB default '{}', " + @@ -416,10 +422,16 @@ public void shouldProperlyGetDefaultColumnValues() throws Exception { "smallint INT2 default 32767, " + "text TEXT default 'asdf', " + "text_func3 TEXT default concat('foo', 'bar', 'baz'), " + - // time + "time_hm TIME default '12:34'::time, " + + "time_hms TIME default '12:34:56'::time, " + + "time_func TIME default NOW()::time, " + // time with time zone - // timestamp - // timestamp with time zone + "timestamp TIMESTAMP default '2021-03-20 13:44:28'::timestamp, " + + "timestamp_func TIMESTAMP default NOW()::timestamp, " + + "timestamp_opt TIMESTAMP, " + + "timestamptz TIMESTAMPTZ default '2021-03-20 14:44:28 +1'::timestamptz, " + + "timestamptz_func TIMESTAMPTZ default NOW()::timestamptz, " + + "timestamptz_opt TIMESTAMPTZ, " + // tsquery // tsvector // txid_snapshot @@ -447,19 +459,40 @@ public void shouldProperlyGetDefaultColumnValues() throws Exception { assertColumnDefault("boolean", true, columns); assertColumnDefault("char", "abcd", columns); assertColumnDefault("varchar", "abcde", columns); + + assertColumnDefault("date", (int) LocalDate.of(2021, 3, 19).toEpochDay(), columns); + assertColumnDefault("date_func", 0, columns); + assertColumnDefault("double", 123456789.1234567890123, columns); assertColumnDefault("integer", 2147483647, columns); - assertColumnDefault("integer_func", 0, columns); + assertColumnDefault("integer_func1", 0, columns); assertColumnDefault("integer_func2", 0, columns); assertColumnDefault("integer_opt", null, columns); + + assertColumnDefault("interval", TimeUnit.HOURS.toMicros(1), columns); + assertColumnDefault("interval_func1", 0L, columns); + assertColumnDefault("json", "{}", columns); assertColumnDefault("json_opt", null, columns); assertColumnDefault("jsonb", "{}", columns); + assertColumnDefault("numeric", new BigDecimal("12345.67891"), columns); assertColumnDefault("real", 1234567890.5f, columns); assertColumnDefault("smallint", (short) 32767, columns); + assertColumnDefault("text", "asdf", columns); assertColumnDefault("text_func3", "", columns); + + assertColumnDefault("time_hm", TimeUnit.SECONDS.toMicros(LocalTime.of(12, 34).toSecondOfDay()), columns); + assertColumnDefault("time_hms", TimeUnit.SECONDS.toMicros(LocalTime.of(12, 34, 56).toSecondOfDay()), columns); + assertColumnDefault("time_func", 0L, columns); + assertColumnDefault("timestamp", TimeUnit.SECONDS.toMicros(1616247868), columns); + assertColumnDefault("timestamp_func", 0L, columns); + assertColumnDefault("timestamp_opt", null, columns); + assertColumnDefault("timestamptz", Instant.ofEpochSecond(1616247868).toString(), columns); + assertColumnDefault("timestamptz_func", Instant.ofEpochSecond(0).toString(), columns); + assertColumnDefault("timestamptz_opt", null, columns); + assertColumnDefault("uuid", "76019d1a-ad2e-4b22-96e9-1a6d6543c818", columns); assertColumnDefault("uuid_func", "00000000-0000-0000-0000-000000000000", columns); assertColumnDefault("uuid_opt", null, columns);