DBZ-1255 Using Instant for representing timestamps

This commit is contained in:
Gunnar Morling 2019-08-15 09:38:06 +02:00
parent fd5841ab6d
commit 94eb75257b
8 changed files with 51 additions and 55 deletions

View File

@ -6,11 +6,11 @@
package io.debezium.connector.postgresql.connection;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.geometric.PGbox;
@ -55,8 +55,8 @@ public OffsetDateTime asOffsetDateTime() {
}
@Override
public OffsetDateTime asOffsetDateTimeWithoutTimeZone() {
return DateTimeFormat.get().timestampToOffsetDateTime(asString(), ZoneOffset.UTC);
public Instant asInstant() {
return DateTimeFormat.get().timestampToInstant(asString());
}
@Override

View File

@ -82,7 +82,7 @@ public interface ColumnValue<T> {
SpecialValueDecimal asDecimal();
LocalDate asLocalDate();
OffsetDateTime asOffsetDateTime();
OffsetDateTime asOffsetDateTimeWithoutTimeZone();
Instant asInstant();
LocalTime asLocalTime();
OffsetTime asOffsetTime();
byte[] asByteArray();

View File

@ -106,7 +106,7 @@ public static Object resolveValue(String columnName, PostgresType type, String f
case "timestamp":
case "timestamp without time zone":
return value.asOffsetDateTimeWithoutTimeZone();
return value.asInstant();
case "time":
return value.asString();

View File

@ -5,10 +5,6 @@
*/
package io.debezium.connector.postgresql.connection.wal2json;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
@ -24,6 +20,10 @@
import java.time.temporal.ChronoField;
import java.util.function.Supplier;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Transformer for time/date related string representations in JSON messages coming from the wal2json plugin.
*
@ -31,9 +31,9 @@
*
*/
public interface DateTimeFormat {
public OffsetDateTime timestampToOffsetDateTime(final String s, ZoneOffset offset);
public Instant timestampToInstant(final String s);
public OffsetDateTime timestampWithTimeZoneToOffsetDateTime(final String s);
public OffsetDateTime systemTimestampToOffsetDateTime(final String s);
public Instant systemTimestampToInstant(final String s);
public LocalDate date(final String s);
public LocalTime time(final String s);
public OffsetTime timeWithTimeZone(final String s);
@ -128,13 +128,6 @@ public OffsetTime timeWithTimeZone(final String s) {
return format(TIME_TZ_FORMAT_PATTERN, s, () -> OffsetTime.parse(s, TIME_TZ_FORMAT)).withOffsetSameInstant(ZoneOffset.UTC);
}
private long formatTZ(final String pattern, final DateTimeFormatter formatter, final String s) {
return format(pattern, s, () -> {
final Instant ts = Instant.from(formatter.parse(s));
return ts.getEpochSecond() * 1_000_000_000 + ts.getNano();
});
}
private <T> T format(final String pattern, final String s, final Supplier<T> value) {
try {
return value.get();
@ -145,8 +138,8 @@ private <T> T format(final String pattern, final String s, final Supplier<T> val
}
@Override
public OffsetDateTime timestampToOffsetDateTime(String s, ZoneOffset offset) {
return format(TS_FORMAT_PATTERN_HINT, s, () -> LocalDateTime.from(TS_FORMAT.parse(s)).atOffset(offset));
public Instant timestampToInstant(String s) {
return format(TS_FORMAT_PATTERN_HINT, s, () -> LocalDateTime.from(TS_FORMAT.parse(s)).toInstant(ZoneOffset.UTC));
}
@Override
@ -155,8 +148,8 @@ public OffsetDateTime timestampWithTimeZoneToOffsetDateTime(String s) {
}
@Override
public OffsetDateTime systemTimestampToOffsetDateTime(String s) {
return format(SYSTEM_TS_FORMAT_PATTERN_HINT, s, () -> OffsetDateTime.from(SYSTEM_TS_FORMAT.parse(s)));
public Instant systemTimestampToInstant(String s) {
return format(SYSTEM_TS_FORMAT_PATTERN_HINT, s, () -> OffsetDateTime.from(SYSTEM_TS_FORMAT.parse(s)).toInstant());
}
}
}

View File

@ -54,7 +54,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
final Document message = DocumentReader.floatNumbersAsTextReader().read(content);
final long txId = message.getLong("xid");
final String timestamp = message.getString("timestamp");
final Instant commitTime = dateTime.systemTimestampToOffsetDateTime(timestamp).toInstant();
final Instant commitTime = dateTime.systemTimestampToInstant(timestamp);
final Array changes = message.getArray("change");
// WAL2JSON may send empty changes that still have a txid. These events are from things like vacuum,

View File

@ -145,7 +145,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
// Correct initial chunk
txId = message.getLong("xid");
final String timestamp = message.getString("timestamp");
commitTime = dateTime.systemTimestampToOffsetDateTime(timestamp).toInstant();
commitTime = dateTime.systemTimestampToInstant(timestamp);
messageInProgress = true;
currentChunk = null;
}
@ -193,7 +193,7 @@ else if (firstChar == RIGHT_BRACKET) {
* This issue is very hard to reproduce so a precaution is taken and metadata are filled with
* synthetic values.
* <p>The new wal2json format will be resilient to this situation.
*
*
* @param content
*/
protected void outOfOrderChunk(final byte[] content) {

View File

@ -5,9 +5,6 @@
*/
package io.debezium.connector.postgresql.connection.wal2json;
import org.fest.assertions.Assertions;
import org.junit.Test;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalTime;
@ -18,29 +15,29 @@
import java.time.format.TextStyle;
import java.util.Locale;
import org.fest.assertions.Assertions;
import org.junit.Test;
public class ISODateTimeFormatTest {
private static final String BCE_DISPLAY_NAME = IsoEra.BCE.getDisplayName(TextStyle.SHORT, Locale.getDefault());
@Test
public void testTimestampToOffsetDateTime() {
public void testTimestampToInstant() {
ZoneOffset offset = ZoneOffset.UTC;
ZoneOffset otherOffset = ZoneOffset.ofHoursMinutes(2, 30);
Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("2016-11-04 13:51:30", offset))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 0, offset));
Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("2016-11-04 13:51:30.123", offset))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_000_000, offset));
Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("2016-11-04 13:51:30.123000", offset))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_000_000, offset));
Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("2016-11-04 13:51:30.123456", offset))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_456_000, offset));
Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("2016-11-04 13:51:30.123456", offset))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_456_000, offset));
Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("0002-12-01 17:00:00 " + BCE_DISPLAY_NAME, offset))
.isEqualTo(OffsetDateTime.of(-1, 12, 1, 17, 0, 0, 0, offset));
Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("20160-11-04 13:51:30.123456", offset))
.isEqualTo(OffsetDateTime.of(20160, 11, 4, 13, 51, 30, 123_456_000, offset));
Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("20160-11-04 13:51:30.123456", otherOffset))
.isEqualTo(OffsetDateTime.of(20160, 11, 4, 13, 51, 30, 123_456_000, otherOffset));
Assertions.assertThat(DateTimeFormat.get().timestampToInstant("2016-11-04 13:51:30"))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 0, offset).toInstant());
Assertions.assertThat(DateTimeFormat.get().timestampToInstant("2016-11-04 13:51:30.123"))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_000_000, offset).toInstant());
Assertions.assertThat(DateTimeFormat.get().timestampToInstant("2016-11-04 13:51:30.123000"))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_000_000, offset).toInstant());
Assertions.assertThat(DateTimeFormat.get().timestampToInstant("2016-11-04 13:51:30.123456"))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_456_000, offset).toInstant());
Assertions.assertThat(DateTimeFormat.get().timestampToInstant("2016-11-04 13:51:30.123456"))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_456_000, offset).toInstant());
Assertions.assertThat(DateTimeFormat.get().timestampToInstant("0002-12-01 17:00:00 " + BCE_DISPLAY_NAME))
.isEqualTo(OffsetDateTime.of(-1, 12, 1, 17, 0, 0, 0, offset).toInstant());
Assertions.assertThat(DateTimeFormat.get().timestampToInstant("20160-11-04 13:51:30.123456"))
.isEqualTo(OffsetDateTime.of(20160, 11, 4, 13, 51, 30, 123_456_000, offset).toInstant());
}
@Test
@ -81,14 +78,13 @@ public void testTimeWithTimeZone() {
}
@Test
public void testSystemTimestampToOffsetDateTime() {
Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2017-10-17 13:51:30Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, 0, ZoneOffset.UTC));
Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2017-10-17 13:51:30.000Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, 0, ZoneOffset.UTC));
Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2017-10-17 13:51:30.456Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, Duration.ofMillis(456).getNano(), ZoneOffset.UTC));
Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2017-10-17 13:51:30.345123Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, 345_123_000, ZoneOffset.UTC));
Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2018-03-22 12:30:56.824452+05:30")).isEqualTo(OffsetDateTime.of(2018, 3, 22, 12, 30, 56, 824_452_000, ZoneOffset.ofHoursMinutes(5, 30)));
Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2018-03-22 12:30:56.824452+05")).isEqualTo(OffsetDateTime.of(2018, 3, 22, 12, 30, 56, 824_452_000, ZoneOffset.ofHours(5)));
Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("20180-03-22 12:30:56.824452+05")).isEqualTo(OffsetDateTime.of(20180, 3, 22, 12, 30, 56, 824_452_000, ZoneOffset.ofHours(5)));
public void testSystemTimestampToInstant() {
Assertions.assertThat(DateTimeFormat.get().systemTimestampToInstant("2017-10-17 13:51:30Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, 0, ZoneOffset.UTC).toInstant());
Assertions.assertThat(DateTimeFormat.get().systemTimestampToInstant("2017-10-17 13:51:30.000Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, 0, ZoneOffset.UTC).toInstant());
Assertions.assertThat(DateTimeFormat.get().systemTimestampToInstant("2017-10-17 13:51:30.456Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, Duration.ofMillis(456).getNano(), ZoneOffset.UTC).toInstant());
Assertions.assertThat(DateTimeFormat.get().systemTimestampToInstant("2017-10-17 13:51:30.345123Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, 345_123_000, ZoneOffset.UTC).toInstant());
Assertions.assertThat(DateTimeFormat.get().systemTimestampToInstant("2018-03-22 12:30:56.824452+05:30")).isEqualTo(OffsetDateTime.of(2018, 3, 22, 12, 30, 56, 824_452_000, ZoneOffset.ofHoursMinutes(5, 30)).toInstant());
Assertions.assertThat(DateTimeFormat.get().systemTimestampToInstant("2018-03-22 12:30:56.824452+05")).isEqualTo(OffsetDateTime.of(2018, 3, 22, 12, 30, 56, 824_452_000, ZoneOffset.ofHours(5)).toInstant());
Assertions.assertThat(DateTimeFormat.get().systemTimestampToInstant("20180-03-22 12:30:56.824452+05")).isEqualTo(OffsetDateTime.of(20180, 3, 22, 12, 30, 56, 824_452_000, ZoneOffset.ofHours(5)).toInstant());
}
}

View File

@ -10,6 +10,7 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;
@ -120,6 +121,12 @@ protected static LocalDateTime toLocalDateTime(Object obj) {
if ( obj == null ) {
return null;
}
if (obj instanceof OffsetDateTime) {
return ((OffsetDateTime) obj).toLocalDateTime();
}
if (obj instanceof Instant) {
return ((Instant) obj).atOffset(ZoneOffset.UTC).toLocalDateTime();
}
if ( obj instanceof LocalDateTime) {
return (LocalDateTime) obj;
}