diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractColumnValue.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractColumnValue.java index f6bb54d4c..c57b53604 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractColumnValue.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/AbstractColumnValue.java @@ -6,11 +6,11 @@ package io.debezium.connector.postgresql.connection; import java.sql.SQLException; +import java.time.Instant; import java.time.LocalDate; import java.time.LocalTime; import java.time.OffsetDateTime; import java.time.OffsetTime; -import java.time.ZoneOffset; import org.apache.kafka.connect.errors.ConnectException; import org.postgresql.geometric.PGbox; @@ -55,8 +55,8 @@ public OffsetDateTime asOffsetDateTime() { } @Override - public OffsetDateTime asOffsetDateTimeWithoutTimeZone() { - return DateTimeFormat.get().timestampToOffsetDateTime(asString(), ZoneOffset.UTC); + public Instant asInstant() { + return DateTimeFormat.get().timestampToInstant(asString()); } @Override diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java index 067e45668..cea321437 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessage.java @@ -82,7 +82,7 @@ public interface ColumnValue { SpecialValueDecimal asDecimal(); LocalDate asLocalDate(); OffsetDateTime asOffsetDateTime(); - OffsetDateTime asOffsetDateTimeWithoutTimeZone(); + Instant asInstant(); LocalTime asLocalTime(); OffsetTime asOffsetTime(); byte[] asByteArray(); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessageColumnValueResolver.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessageColumnValueResolver.java index 79ce59a38..3e5106fc4 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessageColumnValueResolver.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationMessageColumnValueResolver.java @@ -106,7 +106,7 @@ public static Object resolveValue(String columnName, PostgresType type, String f case "timestamp": case "timestamp without time zone": - return value.asOffsetDateTimeWithoutTimeZone(); + return value.asInstant(); case "time": return value.asString(); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/DateTimeFormat.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/DateTimeFormat.java index 04807b7bb..48a3fd117 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/DateTimeFormat.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/DateTimeFormat.java @@ -5,10 +5,6 @@ */ package io.debezium.connector.postgresql.connection.wal2json; -import org.apache.kafka.connect.errors.ConnectException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; @@ -24,6 +20,10 @@ import java.time.temporal.ChronoField; import java.util.function.Supplier; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Transformer for time/date related string representations in JSON messages coming from the wal2json plugin. * @@ -31,9 +31,9 @@ * */ public interface DateTimeFormat { - public OffsetDateTime timestampToOffsetDateTime(final String s, ZoneOffset offset); + public Instant timestampToInstant(final String s); public OffsetDateTime timestampWithTimeZoneToOffsetDateTime(final String s); - public OffsetDateTime systemTimestampToOffsetDateTime(final String s); + public Instant systemTimestampToInstant(final String s); public LocalDate date(final String s); public LocalTime time(final String s); public OffsetTime timeWithTimeZone(final String s); @@ -128,13 +128,6 @@ public OffsetTime timeWithTimeZone(final String s) { return format(TIME_TZ_FORMAT_PATTERN, s, () -> OffsetTime.parse(s, TIME_TZ_FORMAT)).withOffsetSameInstant(ZoneOffset.UTC); } - private long formatTZ(final String pattern, final DateTimeFormatter formatter, final String s) { - return format(pattern, s, () -> { - final Instant ts = Instant.from(formatter.parse(s)); - return ts.getEpochSecond() * 1_000_000_000 + ts.getNano(); - }); - } - private T format(final String pattern, final String s, final Supplier value) { try { return value.get(); @@ -145,8 +138,8 @@ private T format(final String pattern, final String s, final Supplier val } @Override - public OffsetDateTime timestampToOffsetDateTime(String s, ZoneOffset offset) { - return format(TS_FORMAT_PATTERN_HINT, s, () -> LocalDateTime.from(TS_FORMAT.parse(s)).atOffset(offset)); + public Instant timestampToInstant(String s) { + return format(TS_FORMAT_PATTERN_HINT, s, () -> LocalDateTime.from(TS_FORMAT.parse(s)).toInstant(ZoneOffset.UTC)); } @Override @@ -155,8 +148,8 @@ public OffsetDateTime timestampWithTimeZoneToOffsetDateTime(String s) { } @Override - public OffsetDateTime systemTimestampToOffsetDateTime(String s) { - return format(SYSTEM_TS_FORMAT_PATTERN_HINT, s, () -> OffsetDateTime.from(SYSTEM_TS_FORMAT.parse(s))); + public Instant systemTimestampToInstant(String s) { + return format(SYSTEM_TS_FORMAT_PATTERN_HINT, s, () -> OffsetDateTime.from(SYSTEM_TS_FORMAT.parse(s)).toInstant()); } } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java index 9683b8dda..9d5d0307e 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/NonStreamingWal2JsonMessageDecoder.java @@ -54,7 +54,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces final Document message = DocumentReader.floatNumbersAsTextReader().read(content); final long txId = message.getLong("xid"); final String timestamp = message.getString("timestamp"); - final Instant commitTime = dateTime.systemTimestampToOffsetDateTime(timestamp).toInstant(); + final Instant commitTime = dateTime.systemTimestampToInstant(timestamp); final Array changes = message.getArray("change"); // WAL2JSON may send empty changes that still have a txid. These events are from things like vacuum, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java index 7f8db3aae..fdc249d1c 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/wal2json/StreamingWal2JsonMessageDecoder.java @@ -145,7 +145,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces // Correct initial chunk txId = message.getLong("xid"); final String timestamp = message.getString("timestamp"); - commitTime = dateTime.systemTimestampToOffsetDateTime(timestamp).toInstant(); + commitTime = dateTime.systemTimestampToInstant(timestamp); messageInProgress = true; currentChunk = null; } @@ -193,7 +193,7 @@ else if (firstChar == RIGHT_BRACKET) { * This issue is very hard to reproduce so a precaution is taken and metadata are filled with * synthetic values. *

The new wal2json format will be resilient to this situation. - * + * * @param content */ protected void outOfOrderChunk(final byte[] content) { diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/wal2json/ISODateTimeFormatTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/wal2json/ISODateTimeFormatTest.java index d4733e0ac..df1dbfc23 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/wal2json/ISODateTimeFormatTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/connection/wal2json/ISODateTimeFormatTest.java @@ -5,9 +5,6 @@ */ package io.debezium.connector.postgresql.connection.wal2json; -import org.fest.assertions.Assertions; -import org.junit.Test; - import java.time.Duration; import java.time.LocalDate; import java.time.LocalTime; @@ -18,29 +15,29 @@ import java.time.format.TextStyle; import java.util.Locale; +import org.fest.assertions.Assertions; +import org.junit.Test; + public class ISODateTimeFormatTest { private static final String BCE_DISPLAY_NAME = IsoEra.BCE.getDisplayName(TextStyle.SHORT, Locale.getDefault()); @Test - public void testTimestampToOffsetDateTime() { + public void testTimestampToInstant() { ZoneOffset offset = ZoneOffset.UTC; - ZoneOffset otherOffset = ZoneOffset.ofHoursMinutes(2, 30); - Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("2016-11-04 13:51:30", offset)) - .isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 0, offset)); - Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("2016-11-04 13:51:30.123", offset)) - .isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_000_000, offset)); - Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("2016-11-04 13:51:30.123000", offset)) - .isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_000_000, offset)); - Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("2016-11-04 13:51:30.123456", offset)) - .isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_456_000, offset)); - Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("2016-11-04 13:51:30.123456", offset)) - .isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_456_000, offset)); - Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("0002-12-01 17:00:00 " + BCE_DISPLAY_NAME, offset)) - .isEqualTo(OffsetDateTime.of(-1, 12, 1, 17, 0, 0, 0, offset)); - Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("20160-11-04 13:51:30.123456", offset)) - .isEqualTo(OffsetDateTime.of(20160, 11, 4, 13, 51, 30, 123_456_000, offset)); - Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("20160-11-04 13:51:30.123456", otherOffset)) - .isEqualTo(OffsetDateTime.of(20160, 11, 4, 13, 51, 30, 123_456_000, otherOffset)); + Assertions.assertThat(DateTimeFormat.get().timestampToInstant("2016-11-04 13:51:30")) + .isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 0, offset).toInstant()); + Assertions.assertThat(DateTimeFormat.get().timestampToInstant("2016-11-04 13:51:30.123")) + .isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_000_000, offset).toInstant()); + Assertions.assertThat(DateTimeFormat.get().timestampToInstant("2016-11-04 13:51:30.123000")) + .isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_000_000, offset).toInstant()); + Assertions.assertThat(DateTimeFormat.get().timestampToInstant("2016-11-04 13:51:30.123456")) + .isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_456_000, offset).toInstant()); + Assertions.assertThat(DateTimeFormat.get().timestampToInstant("2016-11-04 13:51:30.123456")) + .isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_456_000, offset).toInstant()); + Assertions.assertThat(DateTimeFormat.get().timestampToInstant("0002-12-01 17:00:00 " + BCE_DISPLAY_NAME)) + .isEqualTo(OffsetDateTime.of(-1, 12, 1, 17, 0, 0, 0, offset).toInstant()); + Assertions.assertThat(DateTimeFormat.get().timestampToInstant("20160-11-04 13:51:30.123456")) + .isEqualTo(OffsetDateTime.of(20160, 11, 4, 13, 51, 30, 123_456_000, offset).toInstant()); } @Test @@ -81,14 +78,13 @@ public void testTimeWithTimeZone() { } @Test - public void testSystemTimestampToOffsetDateTime() { - Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2017-10-17 13:51:30Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, 0, ZoneOffset.UTC)); - Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2017-10-17 13:51:30.000Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, 0, ZoneOffset.UTC)); - Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2017-10-17 13:51:30.456Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, Duration.ofMillis(456).getNano(), ZoneOffset.UTC)); - Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2017-10-17 13:51:30.345123Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, 345_123_000, ZoneOffset.UTC)); - Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2018-03-22 12:30:56.824452+05:30")).isEqualTo(OffsetDateTime.of(2018, 3, 22, 12, 30, 56, 824_452_000, ZoneOffset.ofHoursMinutes(5, 30))); - Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2018-03-22 12:30:56.824452+05")).isEqualTo(OffsetDateTime.of(2018, 3, 22, 12, 30, 56, 824_452_000, ZoneOffset.ofHours(5))); - Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("20180-03-22 12:30:56.824452+05")).isEqualTo(OffsetDateTime.of(20180, 3, 22, 12, 30, 56, 824_452_000, ZoneOffset.ofHours(5))); + public void testSystemTimestampToInstant() { + Assertions.assertThat(DateTimeFormat.get().systemTimestampToInstant("2017-10-17 13:51:30Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, 0, ZoneOffset.UTC).toInstant()); + Assertions.assertThat(DateTimeFormat.get().systemTimestampToInstant("2017-10-17 13:51:30.000Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, 0, ZoneOffset.UTC).toInstant()); + Assertions.assertThat(DateTimeFormat.get().systemTimestampToInstant("2017-10-17 13:51:30.456Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, Duration.ofMillis(456).getNano(), ZoneOffset.UTC).toInstant()); + Assertions.assertThat(DateTimeFormat.get().systemTimestampToInstant("2017-10-17 13:51:30.345123Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, 345_123_000, ZoneOffset.UTC).toInstant()); + Assertions.assertThat(DateTimeFormat.get().systemTimestampToInstant("2018-03-22 12:30:56.824452+05:30")).isEqualTo(OffsetDateTime.of(2018, 3, 22, 12, 30, 56, 824_452_000, ZoneOffset.ofHoursMinutes(5, 30)).toInstant()); + Assertions.assertThat(DateTimeFormat.get().systemTimestampToInstant("2018-03-22 12:30:56.824452+05")).isEqualTo(OffsetDateTime.of(2018, 3, 22, 12, 30, 56, 824_452_000, ZoneOffset.ofHours(5)).toInstant()); + Assertions.assertThat(DateTimeFormat.get().systemTimestampToInstant("20180-03-22 12:30:56.824452+05")).isEqualTo(OffsetDateTime.of(20180, 3, 22, 12, 30, 56, 824_452_000, ZoneOffset.ofHours(5)).toInstant()); } - } diff --git a/debezium-core/src/main/java/io/debezium/time/Conversions.java b/debezium-core/src/main/java/io/debezium/time/Conversions.java index be2e0b258..0d0dbe93e 100644 --- a/debezium-core/src/main/java/io/debezium/time/Conversions.java +++ b/debezium-core/src/main/java/io/debezium/time/Conversions.java @@ -10,6 +10,7 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.concurrent.TimeUnit; @@ -120,6 +121,12 @@ protected static LocalDateTime toLocalDateTime(Object obj) { if ( obj == null ) { return null; } + if (obj instanceof OffsetDateTime) { + return ((OffsetDateTime) obj).toLocalDateTime(); + } + if (obj instanceof Instant) { + return ((Instant) obj).atOffset(ZoneOffset.UTC).toLocalDateTime(); + } if ( obj instanceof LocalDateTime) { return (LocalDateTime) obj; }