[DBZ-1255,DBZ-1205] Fix for parsing dates with year larger than 9999. Avoid using nonoseconds and parse the time directly to OffsetDateTime.

This commit is contained in:
Ivan Luzyanin 2019-08-12 22:24:07 -07:00 committed by Gunnar Morling
parent ef30b8a8cf
commit fa10cc502d
7 changed files with 116 additions and 80 deletions

View File

@ -7,8 +7,8 @@
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
@ -26,7 +26,6 @@
import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.connection.wal2json.DateTimeFormat;
import io.debezium.time.Conversions;
/**
* @author Chris Cranford
@ -51,14 +50,13 @@ public OffsetTime asOffsetTime() {
}
@Override
public long asTimestampWithTimeZone() {
return DateTimeFormat.get().timestampWithTimeZone(asString());
public OffsetDateTime asOffsetDateTime() {
return DateTimeFormat.get().timestampWithTimeZoneToOffsetDateTime(asString());
}
@Override
public long asTimestampWithoutTimeZone() {
final LocalDateTime serverLocal = Conversions.fromNanosToLocalDateTimeUTC(DateTimeFormat.get().timestamp(asString()));
return Conversions.toEpochNanos(serverLocal.toInstant(ZoneOffset.UTC));
public OffsetDateTime asOffsetDateTimeWithoutTimeZone() {
return DateTimeFormat.get().timestampToOffsetDateTime(asString(), ZoneOffset.UTC);
}
@Override

View File

@ -6,10 +6,7 @@
package io.debezium.connector.postgresql.connection;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.OffsetTime;
import java.time.*;
import java.util.List;
import org.postgresql.geometric.PGbox;
@ -80,8 +77,8 @@ public interface ColumnValue<T> {
Double asDouble();
SpecialValueDecimal asDecimal();
LocalDate asLocalDate();
long asTimestampWithTimeZone();
long asTimestampWithoutTimeZone();
OffsetDateTime asOffsetDateTime();
OffsetDateTime asOffsetDateTimeWithoutTimeZone();
LocalTime asLocalTime();
OffsetTime asOffsetTime();
byte[] asByteArray();

View File

@ -102,11 +102,11 @@ public static Object resolveValue(String columnName, PostgresType type, String f
case "timestamp with time zone":
case "timestamptz":
return value.asTimestampWithTimeZone();
return value.asOffsetDateTime();
case "timestamp":
case "timestamp without time zone":
return value.asTimestampWithoutTimeZone();
return value.asOffsetDateTimeWithoutTimeZone();
case "time":
return value.asString();

View File

@ -5,25 +5,25 @@
*/
package io.debezium.connector.postgresql.connection.wal2json;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
import java.time.format.SignStyle;
import java.time.format.TextStyle;
import java.time.temporal.ChronoField;
import java.util.function.Supplier;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.time.NanoTimestamp;
/**
* Transformer for time/date related string representations in JSON messages coming from the wal2json plugin.
*
@ -31,9 +31,9 @@
*
*/
public interface DateTimeFormat {
public long timestamp(final String s);
public long timestampWithTimeZone(final String s);
public long systemTimestamp(final String s);
public OffsetDateTime timestampToOffsetDateTime(final String s, ZoneOffset offset);
public OffsetDateTime timestampWithTimeZoneToOffsetDateTime(final String s);
public OffsetDateTime systemTimestampToOffsetDateTime(final String s);
public LocalDate date(final String s);
public LocalTime time(final String s);
public OffsetTime timeWithTimeZone(final String s);
@ -44,20 +44,33 @@ public static DateTimeFormat get() {
public static class ISODateTimeFormat implements DateTimeFormat {
private static final Logger LOGGER = LoggerFactory.getLogger(ISODateTimeFormat.class);
private static final String TS_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss[.S]";
// This formatter is similar to standard Java's ISO_LOCAL_DATE. But this one is
// using 'YEAR_OF_ERA + SignStyle.NEVER' instead of 'YEAR+SignStyle.EXCEEDS_PAD'
// to support ChronoField.ERA at the end of the date string.
private static final DateTimeFormatter NON_ISO_LOCAL_DATE = new DateTimeFormatterBuilder()
.appendValue(ChronoField.YEAR_OF_ERA, 4, 10, SignStyle.NEVER)
.appendLiteral('-')
.appendValue(ChronoField.MONTH_OF_YEAR, 2)
.appendLiteral('-')
.appendValue(ChronoField.DAY_OF_MONTH, 2)
.toFormatter();
private static final String TS_FORMAT_PATTERN_HINT = "y..y-MM-dd HH:mm:ss[.S]";
private static final DateTimeFormatter TS_FORMAT = new DateTimeFormatterBuilder()
.appendPattern("yyyy-MM-dd HH:mm:ss")
.appendFraction(ChronoField.MICRO_OF_SECOND, 0, 6, true)
.append(NON_ISO_LOCAL_DATE)
.appendLiteral(' ')
.append(DateTimeFormatter.ISO_LOCAL_TIME)
.optionalStart()
.appendLiteral(" ")
.appendText(ChronoField.ERA, TextStyle.SHORT)
.optionalEnd()
.toFormatter();
private static final String TS_TZ_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss[.S]X";
private static final String TS_TZ_FORMAT_PATTERN_HINT = "y..y-MM-dd HH:mm:ss[.S]X";
private static final DateTimeFormatter TS_TZ_FORMAT = new DateTimeFormatterBuilder()
.appendPattern("yyyy-MM-dd HH:mm:ss")
.appendFraction(ChronoField.MICRO_OF_SECOND, 0, 6, true)
.append(NON_ISO_LOCAL_DATE)
.appendLiteral(' ')
.append(DateTimeFormatter.ISO_LOCAL_TIME)
.appendOffset("+HH:mm", "")
.optionalStart()
.appendLiteral(" ")
@ -65,10 +78,11 @@ public static class ISODateTimeFormat implements DateTimeFormat {
.optionalEnd()
.toFormatter();
private static final String SYSTEM_TS_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSSX";
private static final String SYSTEM_TS_FORMAT_PATTERN_HINT = "y..y-MM-dd HH:mm:ss.SSSSSSX";
private static final DateTimeFormatter SYSTEM_TS_FORMAT = new DateTimeFormatterBuilder()
.appendPattern("yyyy-MM-dd HH:mm:ss")
.appendFraction(ChronoField.MICRO_OF_SECOND, 0, 6, true)
.append(NON_ISO_LOCAL_DATE)
.appendLiteral(' ')
.append(DateTimeFormatter.ISO_LOCAL_TIME)
.appendOffset("+HH:mm", "Z")
.optionalStart()
.appendLiteral(" ")
@ -76,8 +90,14 @@ public static class ISODateTimeFormat implements DateTimeFormat {
.optionalEnd()
.toFormatter();
private static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd[ GG]";
private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern(DATE_FORMAT_PATTERN);
private static final String DATE_FORMAT_OPT_ERA_PATTERN_HINT = "y..y-MM-dd[ GG]";
private static final DateTimeFormatter DATE_FORMAT_OPT_ERA = new DateTimeFormatterBuilder()
.append(NON_ISO_LOCAL_DATE)
.optionalStart()
.appendLiteral(' ')
.appendText(ChronoField.ERA, TextStyle.SHORT)
.optionalEnd()
.toFormatter();
private static final String TIME_FORMAT_PATTERN = "HH:mm:ss[.S]";
private static final DateTimeFormatter TIME_FORMAT = new DateTimeFormatterBuilder()
@ -89,26 +109,13 @@ public static class ISODateTimeFormat implements DateTimeFormat {
private static final String TIME_TZ_FORMAT_PATTERN = "HH:mm:ss[.S]X";
private static final DateTimeFormatter TIME_TZ_FORMAT = new DateTimeFormatterBuilder()
.appendPattern("HH:mm:ss")
.optionalStart()
.appendFraction(ChronoField.MICRO_OF_SECOND, 0, 6, true)
.optionalEnd()
.append(DateTimeFormatter.ISO_LOCAL_TIME)
.appendOffset("+HH:mm", "")
.toFormatter();
@Override
public long timestamp(final String s) {
return format(TS_FORMAT_PATTERN, s, () -> NanoTimestamp.toEpochNanos(LocalDateTime.parse(s, TS_FORMAT), null));
}
@Override
public long timestampWithTimeZone(final String s) {
return formatTZ(TS_TZ_FORMAT_PATTERN, TS_TZ_FORMAT, s);
}
@Override
public LocalDate date(final String s) {
return format(DATE_FORMAT_PATTERN, s, () -> LocalDate.parse(s, DATE_FORMAT));
return format(DATE_FORMAT_OPT_ERA_PATTERN_HINT, s, () -> LocalDate.parse(s, DATE_FORMAT_OPT_ERA));
}
@Override
@ -121,11 +128,6 @@ public OffsetTime timeWithTimeZone(final String s) {
return format(TIME_TZ_FORMAT_PATTERN, s, () -> OffsetTime.parse(s, TIME_TZ_FORMAT)).withOffsetSameInstant(ZoneOffset.UTC);
}
@Override
public long systemTimestamp(final String s) {
return formatTZ(SYSTEM_TS_FORMAT_PATTERN, SYSTEM_TS_FORMAT, s);
}
private long formatTZ(final String pattern, final DateTimeFormatter formatter, final String s) {
return format(pattern, s, () -> {
final Instant ts = Instant.from(formatter.parse(s));
@ -141,5 +143,20 @@ private <T> T format(final String pattern, final String s, final Supplier<T> val
throw new ConnectException(e);
}
}
@Override
public OffsetDateTime timestampToOffsetDateTime(String s, ZoneOffset offset) {
return format(TS_FORMAT_PATTERN_HINT, s, () -> LocalDateTime.from(TS_FORMAT.parse(s)).atOffset(offset));
}
@Override
public OffsetDateTime timestampWithTimeZoneToOffsetDateTime(String s) {
return format(TS_TZ_FORMAT_PATTERN_HINT, s, () -> OffsetDateTime.from(TS_TZ_FORMAT.parse(s)));
}
@Override
public OffsetDateTime systemTimestampToOffsetDateTime(String s) {
return format(SYSTEM_TS_FORMAT_PATTERN_HINT, s, () -> OffsetDateTime.from(SYSTEM_TS_FORMAT.parse(s)));
}
}
}

View File

@ -25,7 +25,6 @@
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.document.Value;
import io.debezium.time.Conversions;
/**
* A non-streaming version of JSON deserialization of a message sent by
@ -55,7 +54,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
final Document message = DocumentReader.floatNumbersAsTextReader().read(content);
final long txId = message.getLong("xid");
final String timestamp = message.getString("timestamp");
final Instant commitTime = Conversions.toInstant(dateTime.systemTimestamp(timestamp));
final Instant commitTime = dateTime.systemTimestampToOffsetDateTime(timestamp).toInstant();
final Array changes = message.getArray("change");
// WAL2JSON may send empty changes that still have a txid. These events are from things like vacuum,

View File

@ -21,7 +21,6 @@
import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.time.Conversions;
/**
* <p>JSON deserialization of a message sent by
@ -146,7 +145,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
// Correct initial chunk
txId = message.getLong("xid");
final String timestamp = message.getString("timestamp");
commitTime = Conversions.toInstant(dateTime.systemTimestamp(timestamp));
commitTime = dateTime.systemTimestampToOffsetDateTime(timestamp).toInstant();
messageInProgress = true;
currentChunk = null;
}

View File

@ -8,8 +8,10 @@
import org.fest.assertions.Assertions;
import org.junit.Test;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.time.chrono.IsoEra;
@ -20,29 +22,52 @@ public class ISODateTimeFormatTest {
private static final String BCE_DISPLAY_NAME = IsoEra.BCE.getDisplayName(TextStyle.SHORT, Locale.getDefault());
@Test
public void testTimestamp() {
Assertions.assertThat(DateTimeFormat.get().timestamp("2016-11-04 13:51:30")).isEqualTo(1478267490_000_000_000l);
Assertions.assertThat(DateTimeFormat.get().timestamp("2016-11-04 13:51:30.123")).isEqualTo(1478267490_123_000_000l);
Assertions.assertThat(DateTimeFormat.get().timestamp("2016-11-04 13:51:30.123000")).isEqualTo(1478267490_123_000_000l);
Assertions.assertThat(DateTimeFormat.get().timestamp("2016-11-04 13:51:30.123456")).isEqualTo(1478267490_123_456_000l);
Assertions.assertThat(DateTimeFormat.get().timestamp("2016-11-04 13:51:30.123456")).isEqualTo(1478267490_123_456_000l);
Assertions.assertThat(DateTimeFormat.get().timestamp("0002-12-01 17:00:00 " + BCE_DISPLAY_NAME)).isEqualTo(-6829604178_871_345_152l);
public void testTimestampToOffsetDateTime() {
ZoneOffset offset = ZoneOffset.UTC;
ZoneOffset otherOffset = ZoneOffset.ofHoursMinutes(2, 30);
Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("2016-11-04 13:51:30", offset))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 0, offset));
Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("2016-11-04 13:51:30.123", offset))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_000_000, offset));
Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("2016-11-04 13:51:30.123000", offset))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_000_000, offset));
Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("2016-11-04 13:51:30.123456", offset))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_456_000, offset));
Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("2016-11-04 13:51:30.123456", offset))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_456_000, offset));
Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("0002-12-01 17:00:00 " + BCE_DISPLAY_NAME, offset))
.isEqualTo(OffsetDateTime.of(-1, 12, 1, 17, 0, 0, 0, offset));
Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("20160-11-04 13:51:30.123456", offset))
.isEqualTo(OffsetDateTime.of(20160, 11, 4, 13, 51, 30, 123_456_000, offset));
Assertions.assertThat(DateTimeFormat.get().timestampToOffsetDateTime("20160-11-04 13:51:30.123456", otherOffset))
.isEqualTo(OffsetDateTime.of(20160, 11, 4, 13, 51, 30, 123_456_000, otherOffset));
}
@Test
public void testTimestampWithTimeZone() {
Assertions.assertThat(DateTimeFormat.get().timestampWithTimeZone("2016-11-04 13:51:30+02")).isEqualTo(1478260290_000_000_000l);
Assertions.assertThat(DateTimeFormat.get().timestampWithTimeZone("2016-11-04 13:51:30.123+02")).isEqualTo(1478260290_123_000_000l);
Assertions.assertThat(DateTimeFormat.get().timestampWithTimeZone("2016-11-04 13:51:30.123000+02")).isEqualTo(1478260290_123_000_000l);
Assertions.assertThat(DateTimeFormat.get().timestampWithTimeZone("2016-11-04 13:51:30.123789+02")).isEqualTo(1478260290_123_789_000l);
Assertions.assertThat(DateTimeFormat.get().timestampWithTimeZone("2016-11-04 13:51:30.123789+02:30")).isEqualTo(1478258490_123_789_000l);
Assertions.assertThat(DateTimeFormat.get().timestampWithTimeZone("2016-11-04 13:51:30.123789+02:30 " + BCE_DISPLAY_NAME)).isEqualTo(3399351806_090_650_312l);
public void testTimestampWithTimeZoneToOffsetTime() {
Assertions.assertThat(DateTimeFormat.get().timestampWithTimeZoneToOffsetDateTime("2016-11-04 13:51:30+02"))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 0, ZoneOffset.ofHours(2)));
Assertions.assertThat(DateTimeFormat.get().timestampWithTimeZoneToOffsetDateTime("2016-11-04 13:51:30.123+02"))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_000_000, ZoneOffset.ofHours(2)));
Assertions.assertThat(DateTimeFormat.get().timestampWithTimeZoneToOffsetDateTime("2016-11-04 13:51:30.123000+02"))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_000_000, ZoneOffset.ofHours(2)));
Assertions.assertThat(DateTimeFormat.get().timestampWithTimeZoneToOffsetDateTime("2016-11-04 13:51:30.123789+02"))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_789_000, ZoneOffset.ofHours(2)));
Assertions.assertThat(DateTimeFormat.get().timestampWithTimeZoneToOffsetDateTime("2016-11-04 13:51:30.123789+02:30"))
.isEqualTo(OffsetDateTime.of(2016, 11, 4, 13, 51, 30, 123_789_000, ZoneOffset.ofHoursMinutes(2, 30)));
Assertions.assertThat(DateTimeFormat.get().timestampWithTimeZoneToOffsetDateTime("2016-11-04 13:51:30.123789+02:30 " + BCE_DISPLAY_NAME))
.isEqualTo(OffsetDateTime.of(-2015, 11, 4, 13, 51, 30, 123_789_000, ZoneOffset.ofHoursMinutes(2, 30)));
Assertions.assertThat(DateTimeFormat.get().timestampWithTimeZoneToOffsetDateTime("20160-11-04 13:51:30.123789+02:30 " + BCE_DISPLAY_NAME))
.isEqualTo(OffsetDateTime.of(-20159, 11, 4, 13, 51, 30, 123_789_000, ZoneOffset.ofHoursMinutes(2, 30)));
}
@Test
public void testDate() {
Assertions.assertThat(DateTimeFormat.get().date("2016-11-04")).isEqualTo(LocalDate.of(2016, 11, 4));
Assertions.assertThat(DateTimeFormat.get().date("2016-11-04 " + BCE_DISPLAY_NAME)).isEqualTo(LocalDate.of(-2015, 11, 4));
Assertions.assertThat(DateTimeFormat.get().date("20160-11-04")).isEqualTo(LocalDate.of(20160, 11, 4));
Assertions.assertThat(DateTimeFormat.get().date("20160-11-04 " + BCE_DISPLAY_NAME)).isEqualTo(LocalDate.of(-20159, 11, 4));
Assertions.assertThat(DateTimeFormat.get().date("12345678-11-04")).isEqualTo(LocalDate.of(12345678, 11, 4));
}
@Test
@ -56,13 +81,14 @@ public void testTimeWithTimeZone() {
}
@Test
public void testSystemTimestamp() {
Assertions.assertThat(DateTimeFormat.get().systemTimestamp("2017-10-17 13:51:30Z")).isEqualTo(1508248290_000_000_000l);
Assertions.assertThat(DateTimeFormat.get().systemTimestamp("2017-10-17 13:51:30.000Z")).isEqualTo(1508248290_000_000_000l);
Assertions.assertThat(DateTimeFormat.get().systemTimestamp("2017-10-17 13:51:30.456Z")).isEqualTo(1508248290_456_000_000l);
Assertions.assertThat(DateTimeFormat.get().systemTimestamp("2017-10-17 13:51:30.345123Z")).isEqualTo(1508248290_345_123_000l);
Assertions.assertThat(DateTimeFormat.get().systemTimestamp("2018-03-22 12:30:56.824452+05:30")).isEqualTo(1521702056_824_452_000l);
Assertions.assertThat(DateTimeFormat.get().systemTimestamp("2018-03-22 12:30:56.824452+05")).isEqualTo(1521703856_824_452_000l);
public void testSystemTimestampToOffsetDateTime() {
Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2017-10-17 13:51:30Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, 0, ZoneOffset.UTC));
Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2017-10-17 13:51:30.000Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, 0, ZoneOffset.UTC));
Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2017-10-17 13:51:30.456Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, Duration.ofMillis(456).getNano(), ZoneOffset.UTC));
Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2017-10-17 13:51:30.345123Z")).isEqualTo(OffsetDateTime.of(2017, 10, 17, 13, 51, 30, 345_123_000, ZoneOffset.UTC));
Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2018-03-22 12:30:56.824452+05:30")).isEqualTo(OffsetDateTime.of(2018, 3, 22, 12, 30, 56, 824_452_000, ZoneOffset.ofHoursMinutes(5, 30)));
Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("2018-03-22 12:30:56.824452+05")).isEqualTo(OffsetDateTime.of(2018, 3, 22, 12, 30, 56, 824_452_000, ZoneOffset.ofHours(5)));
Assertions.assertThat(DateTimeFormat.get().systemTimestampToOffsetDateTime("20180-03-22 12:30:56.824452+05")).isEqualTo(OffsetDateTime.of(20180, 3, 22, 12, 30, 56, 824_452_000, ZoneOffset.ofHours(5)));
}
}