DBZ-DBZ-1255 Preventing value overflows for TIMESTAMP;

* Generally avoiding conversion to nano-seconds to accomodate for larger values
* Generally avoiding the usage of long to convey temporal values internally:
 * Instant is used for TIMESTAMP
 * OffsetDateTime for TIMESTAMPTZ
 * Duration for TIME (LocalTime cannot be used as it doesn't support 24:00:00 as possible with Postgres)
 * OffsetTime for TIMETZ
* Avoiding usage of methods under test for calculating expected values in tests
This commit is contained in:
Gunnar Morling 2019-08-19 14:39:09 +02:00
parent 94eb75257b
commit 08ef1418d9
15 changed files with 231 additions and 259 deletions

View File

@ -9,14 +9,10 @@
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
@ -303,7 +299,7 @@ public ValueConverter converter(Column column, Field fieldDefn) {
case PgOid.INTERVAL:
return data -> convertInterval(column, fieldDefn, data);
case PgOid.TIME:
return data -> convertTwentyFourHourTime(column, fieldDefn, data);
return data -> convertTime(column, fieldDefn, data);
case PgOid.TIMESTAMP:
return ((ValueConverter) (data -> convertTimestampToLocalDateTime(column, fieldDefn, data))).and(super.converter(column, fieldDefn));
case PgOid.TIMESTAMPTZ:
@ -432,6 +428,15 @@ private ValueConverter createArrayConverter(Column column, Field fieldDefn) {
return data -> convertArray(column, fieldDefn, elementConverter, data);
}
@Override
protected Object convertTime(Column column, Field fieldDefn, Object data) {
if (data instanceof String) {
data = Strings.asDuration((String) data);
}
return super.convertTime(column, fieldDefn, data);
}
protected Object convertDecimal(Column column, Field fieldDefn, Object data, DecimalMode mode) {
SpecialValueDecimal value;
BigDecimal newDecimal;
@ -599,45 +604,9 @@ protected Object convertInterval(Column column, Field fieldDefn, Object data) {
});
}
@Override
protected Object convertTimestampToEpochMillis(Column column, Field fieldDefn, Object data) {
if (data instanceof Long) {
data = nanosToLocalDateTimeUTC((Long) data);
}
return super.convertTimestampToEpochMillis(column, fieldDefn, data);
}
@Override
protected Object convertTimestampToEpochMicros(Column column, Field fieldDefn, Object data) {
if (data instanceof Long) {
data = nanosToLocalDateTimeUTC((Long) data);
}
return super.convertTimestampToEpochMicros(column, fieldDefn, data);
}
@Override
protected Object convertTimestampToEpochNanos(Column column, Field fieldDefn, Object data) {
if (data instanceof Long) {
data = nanosToLocalDateTimeUTC((Long) data);
}
return super.convertTimestampToEpochNanos(column, fieldDefn, data);
}
@Override
protected Object convertTimestampToEpochMillisAsDate(Column column, Field fieldDefn, Object data) {
if (data instanceof Long) {
data = nanosToLocalDateTimeUTC((Long) data);
}
return super.convertTimestampToEpochMillisAsDate(column, fieldDefn, data);
}
@Override
protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object data) {
if (data instanceof Long) {
LocalDateTime localDateTime = nanosToLocalDateTimeUTC((Long) data);
data = OffsetDateTime.of(localDateTime, ZoneOffset.UTC);
}
else if (data instanceof java.util.Date) {
if (data instanceof java.util.Date) {
// any Date like subclasses will be given to us by the JDBC driver, which uses the local VM TZ, so we need to go
// back to GMT
data = OffsetDateTime.ofInstant(((Date) data).toInstant(), ZoneOffset.UTC);
@ -648,13 +617,8 @@ else if (data instanceof java.util.Date) {
@Override
protected Object convertTimeWithZone(Column column, Field fieldDefn, Object data) {
// during streaming
if (data instanceof Long) {
LocalTime localTime = LocalTime.ofNanoOfDay((Long) data);
data = OffsetTime.of(localTime, ZoneOffset.UTC);
}
// during snapshotting
else if (data instanceof String) {
// during snapshotting; already receiving OffsetTime @ UTC during streaming
if (data instanceof String) {
// The TIMETZ column is returned as a String which we initially parse here
// The parsed offset-time potentially has a zone-offset from the data, shift it after to GMT.
final OffsetTime offsetTime = OffsetTime.parse((String) data, TIME_WITH_TIMEZONE_FORMATTER);
@ -664,49 +628,6 @@ else if (data instanceof String) {
return super.convertTimeWithZone(column, fieldDefn, data);
}
private Object convertTwentyFourHourTime(Column column, Field fieldDefn, Object data) {
long twentyFourHour = NANO_SECONDS_PER_DAY;
if (adaptiveTimeMicrosecondsPrecisionMode) {
twentyFourHour = NANO_SECONDS_PER_DAY / 1_000;
}
if (adaptiveTimePrecisionMode) {
if (getTimePrecision(column) <= 3) {
twentyFourHour = NANO_SECONDS_PER_DAY / 1_000_000;
}
if (getTimePrecision(column) <= 6) {
twentyFourHour = NANO_SECONDS_PER_DAY / 1_000;
}
}
// during streaming
if (data instanceof Long) {
if ((Long) data == NANO_SECONDS_PER_DAY) {
return twentyFourHour;
}
return super.converter(column, fieldDefn).convert(data);
}
// during snapshotting
else if (data instanceof String) {
Duration d = Strings.asDuration((String) data);
if (d.equals(ONE_DAY)) {
return twentyFourHour;
}
return super.converter(column, fieldDefn).convert(d);
}
return super.converter(column, fieldDefn).convert(data);
}
private static LocalDateTime nanosToLocalDateTimeUTC(long epocNanos) {
// the pg plugin stores date/time info as microseconds since epoch
BigInteger epochMicrosBigInt = BigInteger.valueOf(epocNanos);
BigInteger[] secondsAndNanos = epochMicrosBigInt.divideAndRemainder(BigInteger.valueOf(TimeUnit.SECONDS.toNanos(1)));
return LocalDateTime.ofInstant(Instant.ofEpochSecond(secondsAndNanos[0].longValue(), secondsAndNanos[1].longValue()),
ZoneOffset.UTC);
}
protected Object convertGeometry(Column column, Field fieldDefn, Object data) {
final PostgisGeometry empty = PostgisGeometry.createEmpty();
return convertValue(column, fieldDefn, data, io.debezium.data.geometry.Geometry.createValue(fieldDefn.schema(), empty.getWkb(), empty.getSrid()), (r) -> {

View File

@ -11,6 +11,7 @@
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.geometric.PGbox;
@ -45,13 +46,13 @@ public LocalTime asLocalTime() {
}
@Override
public OffsetTime asOffsetTime() {
public OffsetTime asOffsetTimeUtc() {
return DateTimeFormat.get().timeWithTimeZone(asString());
}
@Override
public OffsetDateTime asOffsetDateTime() {
return DateTimeFormat.get().timestampWithTimeZoneToOffsetDateTime(asString());
public OffsetDateTime asOffsetDateTimeAtUtc() {
return DateTimeFormat.get().timestampWithTimeZoneToOffsetDateTime(asString()).withOffsetSameInstant(ZoneOffset.UTC);
}
@Override

View File

@ -81,10 +81,10 @@ public interface ColumnValue<T> {
Double asDouble();
SpecialValueDecimal asDecimal();
LocalDate asLocalDate();
OffsetDateTime asOffsetDateTime();
OffsetDateTime asOffsetDateTimeAtUtc();
Instant asInstant();
LocalTime asLocalTime();
OffsetTime asOffsetTime();
OffsetTime asOffsetTimeUtc();
byte[] asByteArray();
PGbox asBox();
PGcircle asCircle();

View File

@ -102,7 +102,7 @@ public static Object resolveValue(String columnName, PostgresType type, String f
case "timestamp with time zone":
case "timestamptz":
return value.asOffsetDateTime();
return value.asOffsetDateTimeAtUtc();
case "timestamp":
case "timestamp without time zone":
@ -116,7 +116,7 @@ public static Object resolveValue(String columnName, PostgresType type, String f
case "time with time zone":
case "timetz":
return value.asOffsetTime();
return value.asOffsetTimeUtc();
case "bytea":
return value.asByteArray();

View File

@ -9,13 +9,13 @@
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -187,23 +187,28 @@ else if (datumMessage.hasDatumString()) {
return null;
}
// these types are sent by the plugin as LONG - microseconds since Unix Epoch
// but we'll convert them to nanos which is the smallest unit
final LocalDateTime serverLocal = Conversions.toLocalDateTimeUTC(datumMessage.getDatumInt64());
return Conversions.toEpochNanos(serverLocal.toInstant(ZoneOffset.UTC));
return Conversions.toInstantFromMicros(datumMessage.getDatumInt64());
case PgOid.TIMESTAMPTZ:
case PgOid.TIME:
if (!datumMessage.hasDatumInt64()) {
return null;
}
// these types are sent by the plugin as LONG - microseconds since Unix Epoch
// but we'll convert them to nanos which is the smallest unit
return TimeUnit.NANOSECONDS.convert(datumMessage.getDatumInt64(), TimeUnit.MICROSECONDS);
return Conversions.toInstantFromMicros(datumMessage.getDatumInt64()).atOffset(ZoneOffset.UTC);
case PgOid.TIME:
if (!datumMessage.hasDatumInt64()) {
return null;
}
// these types are sent by the plugin as LONG - microseconds since Unix Epoch
return Duration.of(datumMessage.getDatumInt64(), ChronoUnit.MICROS);
case PgOid.TIMETZ:
if (!datumMessage.hasDatumDouble()) {
return null;
}
// the value is sent as a double microseconds, convert to nano
return BigDecimal.valueOf(datumMessage.getDatumDouble() * 1000).longValue();
// the value is sent as a double microseconds
return Conversions.toInstantFromMicros((long) datumMessage.getDatumDouble())
.atOffset(ZoneOffset.UTC)
.toOffsetTime();
case PgOid.INTERVAL:
// these are sent as doubles by the plugin since their storage is larger than 8 bytes
return datumMessage.hasDatumDouble() ? datumMessage.getDatumDouble() : null;

View File

@ -21,7 +21,9 @@
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.Month;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
@ -94,10 +96,12 @@ public abstract class AbstractRecordsProducerTest extends AbstractConnectorTest
protected static final Pattern INSERT_TABLE_MATCHING_PATTERN = Pattern.compile("insert into (.*)\\(.*\\) VALUES .*", Pattern.CASE_INSENSITIVE);
protected static final String INSERT_CASH_TYPES_STMT = "INSERT INTO cash_table (csh) VALUES ('$1234.11')";
protected static final String INSERT_DATE_TIME_TYPES_STMT = "INSERT INTO time_table(ts, tsneg, ts_ms, ts_us, tz, date, ti, tip, ttf, ttz, tptz, it) " +
protected static final String INSERT_DATE_TIME_TYPES_STMT = "INSERT INTO time_table(ts, tsneg, ts_ms, ts_us, tz, date, ti, tip, ttf, ttz, tptz, it, ts_large, ts_large_us, ts_large_ms, tz_large) " +
"VALUES ('2016-11-04T13:51:30.123456'::TIMESTAMP, '1936-10-25T22:10:12.608'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456+02:00'::TIMESTAMPTZ, " +
"'2016-11-04'::DATE, '13:51:30'::TIME, '13:51:30.123'::TIME, '24:00:00'::TIME, '13:51:30.123789+02:00'::TIMETZ, '13:51:30.123+02:00'::TIMETZ, " +
"'P1Y2M3DT4H5M0S'::INTERVAL)";
"'P1Y2M3DT4H5M0S'::INTERVAL," +
"'21016-11-04T13:51:30.123456'::TIMESTAMP, '21016-11-04T13:51:30.123457'::TIMESTAMP, '21016-11-04T13:51:30.124'::TIMESTAMP," +
"'21016-11-04T13:51:30.123456+07:00'::TIMESTAMPTZ)";
protected static final String INSERT_BIN_TYPES_STMT = "INSERT INTO bitbin_table (ba, bol, bs, bv) " +
"VALUES (E'\\\\001\\\\002\\\\003'::bytea, '0'::bit(1), '11'::bit(2), '00'::bit(2))";
protected static final String INSERT_GEOM_TYPES_STMT = "INSERT INTO geom_table(p) VALUES ('(1,1)'::point)";
@ -534,10 +538,19 @@ protected List<SchemaAndValueField> schemaAndValuesForBinTypes() {
new SchemaAndValueField("bv", Bits.builder(2).optional().build(), new byte[] { 0, 0 }));
}
private long asEpochMillis(String timestamp) {
return LocalDateTime.parse(timestamp).atOffset(ZoneOffset.UTC).toInstant().toEpochMilli();
}
private long asEpochMicros(String timestamp) {
Instant instant = LocalDateTime.parse(timestamp).atOffset(ZoneOffset.UTC).toInstant();
return instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1_000;
}
protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypes() {
long expectedTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("2016-11-04T13:51:30.123456"), null);
long expectedTsMs = Timestamp.toEpochMillis(LocalDateTime.parse("2016-11-04T13:51:30.123456"), null);
long expectedNegTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("1936-10-25T22:10:12.608"), null);
long expectedTs = asEpochMicros("2016-11-04T13:51:30.123456");
long expectedTsMs = asEpochMillis("2016-11-04T13:51:30.123456");
long expectedNegTs = asEpochMicros("1936-10-25T22:10:12.608");
String expectedTz = "2016-11-04T11:51:30.123456Z"; //timestamp is stored with TZ, should be read back with UTC
int expectedDate = Date.toEpochDay(LocalDate.parse("2016-11-04"), null);
long expectedTi = LocalTime.parse("13:51:30").toNanoOfDay() / 1_000;
@ -547,6 +560,12 @@ protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypes() {
String expectedTtzPrecision = "11:51:30.123Z";
double interval = MicroDuration.durationMicros(1, 2, 3, 4, 5, 0, MicroDuration.DAYS_PER_MONTH_AVG);
long expectedTsLarge = OffsetDateTime.of(21016, 11, 4, 13, 51, 30, 0, ZoneOffset.UTC).toInstant().toEpochMilli() * 1000 + 123456;
long expectedTsLargeUs = OffsetDateTime.of(21016, 11, 4, 13, 51, 30, 0, ZoneOffset.UTC).toInstant().toEpochMilli() * 1000 + 123457;
long expectedTsLargeMs = OffsetDateTime.of(21016, 11, 4, 13, 51, 30, 124000000, ZoneOffset.UTC).toInstant().toEpochMilli();
String expectedTzLarge = "+21016-11-04T06:51:30.123456Z";
return Arrays.asList(new SchemaAndValueField("ts", MicroTimestamp.builder().optional().build(), expectedTs),
new SchemaAndValueField("tsneg", MicroTimestamp.builder().optional().build(), expectedNegTs),
new SchemaAndValueField("ts_ms", Timestamp.builder().optional().build(), expectedTsMs),
@ -558,19 +577,28 @@ protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypes() {
new SchemaAndValueField("ttf", MicroTime.builder().optional().build(), expectedTtf),
new SchemaAndValueField("ttz", ZonedTime.builder().optional().build(), expectedTtz),
new SchemaAndValueField("tptz", ZonedTime.builder().optional().build(), expectedTtzPrecision),
new SchemaAndValueField("it", MicroDuration.builder().optional().build(), interval));
new SchemaAndValueField("it", MicroDuration.builder().optional().build(), interval),
new SchemaAndValueField("ts_large", MicroTimestamp.builder().optional().build(), expectedTsLarge),
new SchemaAndValueField("ts_large_us", MicroTimestamp.builder().optional().build(), expectedTsLargeUs),
new SchemaAndValueField("ts_large_ms", Timestamp.builder().optional().build(), expectedTsLargeMs),
new SchemaAndValueField("tz_large", ZonedTimestamp.builder().optional().build(), expectedTzLarge)
);
}
protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypesAdaptiveTimeMicroseconds() {
long expectedTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("2016-11-04T13:51:30.123456"), null);
long expectedTsMs = Timestamp.toEpochMillis(LocalDateTime.parse("2016-11-04T13:51:30.123456"), null);
long expectedNegTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("1936-10-25T22:10:12.608"), null);
long expectedTs = asEpochMicros("2016-11-04T13:51:30.123456");
long expectedTsMs = asEpochMillis("2016-11-04T13:51:30.123456");
long expectedNegTs = asEpochMicros("1936-10-25T22:10:12.608");
String expectedTz = "2016-11-04T11:51:30.123456Z"; //timestamp is stored with TZ, should be read back with UTC
int expectedDate = Date.toEpochDay(LocalDate.parse("2016-11-04"), null);
long expectedTi = LocalTime.parse("13:51:30").toNanoOfDay() / 1_000;
String expectedTtz = "11:51:30.123789Z"; //time is stored with TZ, should be read back at GMT
double interval = MicroDuration.durationMicros(1, 2, 3, 4, 5, 0, MicroDuration.DAYS_PER_MONTH_AVG);
long expectedTsLarge = OffsetDateTime.of(21016, 11, 4, 13, 51, 30, 0, ZoneOffset.UTC).toInstant().toEpochMilli() * 1000 + 123456;
long expectedTsLargeUs = OffsetDateTime.of(21016, 11, 4, 13, 51, 30, 0, ZoneOffset.UTC).toInstant().toEpochMilli() * 1000 + 123457;
long expectedTsLargeMs = OffsetDateTime.of(21016, 11, 4, 13, 51, 30, 124000000, ZoneOffset.UTC).toInstant().toEpochMilli();
return Arrays.asList(new SchemaAndValueField("ts", MicroTimestamp.builder().optional().build(), expectedTs),
new SchemaAndValueField("tsneg", MicroTimestamp.builder().optional().build(), expectedNegTs),
new SchemaAndValueField("ts_ms", Timestamp.builder().optional().build(), expectedTsMs),
@ -579,7 +607,11 @@ protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypesAdaptiveTimeM
new SchemaAndValueField("date", Date.builder().optional().build(), expectedDate),
new SchemaAndValueField("ti", MicroTime.builder().optional().build(), expectedTi),
new SchemaAndValueField("ttz", ZonedTime.builder().optional().build(), expectedTtz),
new SchemaAndValueField("it", MicroDuration.builder().optional().build(), interval));
new SchemaAndValueField("it", MicroDuration.builder().optional().build(), interval),
new SchemaAndValueField("ts_large", MicroTimestamp.builder().optional().build(), expectedTsLarge),
new SchemaAndValueField("ts_large_us", MicroTimestamp.builder().optional().build(), expectedTsLargeUs),
new SchemaAndValueField("ts_large_ms", Timestamp.builder().optional().build(), expectedTsLargeMs)
);
}
protected List<SchemaAndValueField> schemaAndValuesForMoneyTypes() {

View File

@ -7,22 +7,31 @@ CREATE TABLE numeric_table (pk SERIAL, si SMALLINT, i INTEGER, bi BIGINT,
r_pinf REAL, db_pinf DOUBLE PRECISION,
r_ninf REAL, db_ninf DOUBLE PRECISION,
ss SMALLSERIAL, bs BIGSERIAL, b BOOLEAN, PRIMARY KEY(pk));
-- no suffix -fixed scale, zs - zero scale, vs - variable scale
CREATE TABLE numeric_decimal_table (pk SERIAL,
d DECIMAL(3,2), dzs DECIMAL(4), dvs DECIMAL, d_nn DECIMAL(3,2) NOT NULL, n NUMERIC(6,4), nzs NUMERIC(4), nvs NUMERIC,
d_int DECIMAL(3,2), dzs_int DECIMAL(4), dvs_int DECIMAL, n_int NUMERIC(6,4), nzs_int NUMERIC(4), nvs_int NUMERIC,
d_nan DECIMAL(3,2), dzs_nan DECIMAL(4), dvs_nan DECIMAL, n_nan NUMERIC(6,4), nzs_nan NUMERIC(4), nvs_nan NUMERIC,
PRIMARY KEY(pk));
CREATE TABLE string_table (pk SERIAL, vc VARCHAR(2), vcv CHARACTER VARYING(2), ch CHARACTER(4), c CHAR(3), t TEXT, b BYTEA, bnn BYTEA NOT NULL, ct CITEXT, PRIMARY KEY(pk));
CREATE TABLE network_address_table (pk SERIAL, i INET, PRIMARY KEY(pk));
CREATE TABLE cidr_network_address_table (pk SERIAL, i CIDR, PRIMARY KEY(pk));
CREATE TABLE macaddr_table(pk SERIAL, m MACADDR, PRIMARY KEY(pk));
CREATE TABLE cash_table (pk SERIAL, csh MONEY, PRIMARY KEY(pk));
CREATE TABLE bitbin_table (pk SERIAL, ba BYTEA, bol BIT(1), bs BIT(2), bv BIT VARYING(2) , PRIMARY KEY(pk));
CREATE TABLE time_table (pk SERIAL, ts TIMESTAMP, tsneg TIMESTAMP(6) WITHOUT TIME ZONE, ts_ms TIMESTAMP(3), ts_us TIMESTAMP(6), tz TIMESTAMPTZ, date DATE,
ti TIME, tip TIME(3), ttf TIME,
ttz TIME WITH TIME ZONE, tptz TIME(3) WITH TIME ZONE,
it INTERVAL, tsp TIMESTAMP (0) WITH TIME ZONE, PRIMARY KEY(pk));
it INTERVAL, tsp TIMESTAMP (0) WITH TIME ZONE,
ts_large TIMESTAMP,
ts_large_us TIMESTAMP(6),
ts_large_ms TIMESTAMP(3),
tz_large TIMESTAMPTZ,
PRIMARY KEY(pk));
CREATE TABLE text_table (pk SERIAL, j JSON, jb JSONB, x XML, u Uuid, PRIMARY KEY(pk));
CREATE TABLE geom_table (pk SERIAL, p POINT, PRIMARY KEY(pk));
CREATE TABLE range_table (pk SERIAL, unbounded_exclusive_tsrange TSRANGE, bounded_inclusive_tsrange TSRANGE, unbounded_exclusive_tstzrange TSTZRANGE, bounded_inclusive_tstzrange TSTZRANGE, unbounded_exclusive_daterange DATERANGE, bounded_exclusive_daterange DATERANGE, int4_number_range INT4RANGE, numerange NUMRANGE, int8_number_range INT8RANGE, PRIMARY KEY(pk));

View File

@ -318,19 +318,7 @@ public ValueConverter converter(Column column, Field fieldDefn) {
}
return (data) -> convertDateToEpochDaysAsDate(column, fieldDefn, data);
case Types.TIME:
if(adaptiveTimeMicrosecondsPrecisionMode) {
return data -> convertTimeToMicrosPastMidnight(column, fieldDefn, data);
}
if (adaptiveTimePrecisionMode) {
if (getTimePrecision(column) <= 3) {
return data -> convertTimeToMillisPastMidnight(column, fieldDefn, data);
}
if (getTimePrecision(column) <= 6) {
return data -> convertTimeToMicrosPastMidnight(column, fieldDefn, data);
}
return (data) -> convertTimeToNanosPastMidnight(column, fieldDefn, data);
}
return (data) -> convertTimeToMillisPastMidnightAsDate(column, fieldDefn, data);
return (data) -> convertTime(column, fieldDefn, data);
case Types.TIMESTAMP:
if (adaptiveTimePrecisionMode || adaptiveTimeMicrosecondsPrecisionMode) {
if (getTimePrecision(column) <= 3) {
@ -422,6 +410,25 @@ protected Object convertTimeWithZone(Column column, Field fieldDefn, Object data
});
}
protected Object convertTime(Column column, Field fieldDefn, Object data) {
if(adaptiveTimeMicrosecondsPrecisionMode) {
return convertTimeToMicrosPastMidnight(column, fieldDefn, data);
}
if (adaptiveTimePrecisionMode) {
if (getTimePrecision(column) <= 3) {
return convertTimeToMillisPastMidnight(column, fieldDefn, data);
}
if (getTimePrecision(column) <= 6) {
return convertTimeToMicrosPastMidnight(column, fieldDefn, data);
}
return convertTimeToNanosPastMidnight(column, fieldDefn, data);
}
// "connect" mode
else {
return convertTimeToMillisPastMidnightAsDate(column, fieldDefn, data);
}
}
/**
* Converts a value object for an expected JDBC type of {@link Types#TIMESTAMP} to {@link Timestamp} values, or milliseconds
* past epoch.

View File

@ -109,10 +109,6 @@ protected static LocalTime toLocalTime(Object obj) {
throw new IllegalArgumentException("Time values must use number of milliseconds greater than 0 and less than 86400000000000");
}
}
if ( obj instanceof Long) {
// Assume the value is the epoch day number
return LocalTime.ofNanoOfDay((Long) obj);
}
throw new IllegalArgumentException("Unable to convert to LocalTime from unexpected value '" + obj + "' of type " + obj.getClass().getName());
}
@ -175,83 +171,15 @@ protected static LocalDateTime toLocalDateTime(Object obj) {
throw new IllegalArgumentException("Unable to convert to LocalTime from unexpected value '" + obj + "' of type " + obj.getClass().getName());
}
/**
* Get the number of nanoseconds past epoch of the given {@link LocalDateTime}.
*
* @param timestamp the Java timestamp value
* @return the epoch nanoseconds
*/
static long toEpochNanos(LocalDateTime timestamp) {
long nanoInDay = timestamp.toLocalTime().toNanoOfDay();
long nanosOfDay = toEpochNanos(timestamp.toLocalDate());
return nanosOfDay + nanoInDay;
}
/**
* Get the number of nanoseconds past epoch of the given {@link LocalDate}.
*
* @param date the Java date value
* @return the epoch nanoseconds
*/
static long toEpochNanos(LocalDate date) {
long epochDay = date.toEpochDay();
return epochDay * Conversions.NANOSECONDS_PER_DAY;
}
/**
* Get the UTC-based {@link LocalDateTime} for given microseconds epoch
*
* @param microseconds - timestamp in microseconds
* @return timestamp in UTC timezone
*/
public static LocalDateTime toLocalDateTimeUTC(long microseconds) {
long seconds = microseconds / MICROSECONDS_PER_SECOND;
// typecasting is safe as microseconds and nanoseconds in second fit in int range
int microsecondsOfSecond = (int) (microseconds % MICROSECONDS_PER_SECOND);
if (microsecondsOfSecond < 0) {
seconds--;
microsecondsOfSecond = (int) Conversions.MICROSECONDS_PER_SECOND + microsecondsOfSecond;
}
return LocalDateTime.ofEpochSecond(seconds, (int) (microsecondsOfSecond * NANOSECONDS_PER_MICROSECOND), ZoneOffset.UTC);
}
/**
* Get the UTC-based {@link LocalDateTime} for given nanoseconds epoch
*
* @param nanoseconds - timestamp in nanoseconds
* @return timestamp in UTC timezone
*/
public static LocalDateTime fromNanosToLocalDateTimeUTC(long nanoseconds) {
long seconds = nanoseconds / NANOSECONDS_PER_SECOND;
// typecasting is safe as microseconds and nanoseconds in second fit in int range
int nanosecondsOfSecond = (int) (nanoseconds % NANOSECONDS_PER_SECOND);
if (nanosecondsOfSecond < 0) {
seconds--;
nanosecondsOfSecond = (int) Conversions.NANOSECONDS_PER_SECOND + nanosecondsOfSecond;
}
return LocalDateTime.ofEpochSecond(seconds, nanosecondsOfSecond, ZoneOffset.UTC);
}
/**
* Get the number of nanoseconds past epoch of the given {@link Instant}.
*
* @param instant the Java instant value
* @return the epoch nanoseconds
*/
public static long toEpochNanos(Instant instant) {
return TimeUnit.NANOSECONDS.convert(instant.getEpochSecond() * MICROSECONDS_PER_SECOND + instant.getNano() / NANOSECONDS_PER_MICROSECOND, TimeUnit.MICROSECONDS);
}
public static long toEpochMicros(Instant instant) {
return TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
}
public static Instant toInstant(long epochNanos) {
return Instant.ofEpochSecond(0, epochNanos);
}
public static Instant toInstantFromMicros(long epochMicros) {
return toInstant(TimeUnit.MICROSECONDS.toNanos(epochMicros));
public static Instant toInstantFromMicros(long microsSinceEpoch) {
return Instant.ofEpochSecond(
TimeUnit.MICROSECONDS.toSeconds(microsSinceEpoch),
TimeUnit.MICROSECONDS.toNanos(microsSinceEpoch % TimeUnit.SECONDS.toMicros(1))
);
}
public static Instant toInstantFromMillis(long epochMillis) {

View File

@ -5,6 +5,7 @@
*/
package io.debezium.time;
import java.time.Duration;
import java.time.LocalTime;
import java.time.temporal.TemporalAdjuster;
@ -64,6 +65,11 @@ public static Schema schema() {
* @throws IllegalArgumentException if the value is not an instance of the acceptable types
*/
public static long toMicroOfDay(Object value, TemporalAdjuster adjuster) {
// conversion to nanos is fine as TIME values won't exceed long range
if (value instanceof Duration) {
return ((Duration)value).toNanos() / 1_000;
}
LocalTime time = Conversions.toLocalTime(value);
if (adjuster != null) {
time = time.with(adjuster);

View File

@ -6,6 +6,7 @@
package io.debezium.time;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.TemporalAdjuster;
import org.apache.kafka.connect.data.Schema;
@ -69,8 +70,7 @@ public static long toEpochMicros(Object value, TemporalAdjuster adjuster) {
if (adjuster != null) {
dateTime = dateTime.with(adjuster);
}
long epochNanos = Conversions.toEpochNanos(dateTime);
return Math.floorDiv(epochNanos, Conversions.NANOSECONDS_PER_MICROSECOND);
return Conversions.toEpochMicros(dateTime.toInstant(ZoneOffset.UTC));
}
private MicroTimestamp() {

View File

@ -5,6 +5,7 @@
*/
package io.debezium.time;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.temporal.TemporalAdjuster;
@ -69,7 +70,30 @@ public static long toEpochNanos(Object value, TemporalAdjuster adjuster) {
if ( adjuster != null) {
dateTime = dateTime.with(adjuster);
}
return Conversions.toEpochNanos(dateTime);
return toEpochNanos(dateTime);
}
/**
* Get the number of nanoseconds past epoch of the given {@link LocalDateTime}.
*
* @param timestamp the Java timestamp value
* @return the epoch nanoseconds
*/
private static long toEpochNanos(LocalDateTime timestamp) {
long nanoInDay = timestamp.toLocalTime().toNanoOfDay();
long nanosOfDay = toEpochNanos(timestamp.toLocalDate());
return nanosOfDay + nanoInDay;
}
/**
* Get the number of nanoseconds past epoch of the given {@link LocalDate}.
*
* @param date the Java date value
* @return the epoch nanoseconds
*/
private static long toEpochNanos(LocalDate date) {
long epochDay = date.toEpochDay();
return epochDay * Conversions.NANOSECONDS_PER_DAY;
}
private NanoTimestamp() {

View File

@ -5,6 +5,7 @@
*/
package io.debezium.time;
import java.time.Duration;
import java.time.LocalTime;
import java.time.temporal.TemporalAdjuster;
@ -64,6 +65,10 @@ public static Schema schema() {
* @throws IllegalArgumentException if the value is not an instance of the acceptable types
*/
public static int toMilliOfDay(Object value, TemporalAdjuster adjuster) {
if (value instanceof Duration) {
// int conversion is ok for the range of TIME
return (int) ((Duration) value).toMillis();
}
LocalTime time = Conversions.toLocalTime(value);
if (adjuster != null) {
time = time.with(adjuster);

View File

@ -6,6 +6,7 @@
package io.debezium.time;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.TemporalAdjuster;
import org.apache.kafka.connect.data.Schema;
@ -72,8 +73,8 @@ public static long toEpochMillis(Object value, TemporalAdjuster adjuster) {
if (adjuster != null) {
dateTime = dateTime.with(adjuster);
}
long epochNanos = Conversions.toEpochNanos(dateTime);
return Math.floorDiv(epochNanos, Conversions.NANOSECONDS_PER_MILLISECOND);
return dateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
}
private Timestamp() {

View File

@ -5,16 +5,18 @@
*/
package io.debezium.time;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.fail;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
/**
* @author Randall Hauch
*
@ -170,9 +172,40 @@ public void shouldReturnLocalTimeInstanceWhenConvertingSqlTimeToLocalTime() {
}
@Test
public void shouldReturnLocalTimeInstanceWhenConvertingLongToLocalTime() {
LocalTime now = LocalTime.now();
long nanoOfDay = now.toNanoOfDay();
assertThat(Conversions.toLocalTime(nanoOfDay)).isEqualTo(now);
public void shouldReturnCorrectInstantWhenConvertingMicroSecondsSinceEpoch() {
// value obtained from Postgres for '21016-11-04T13:51:30.123456'::TIMESTAMP
long usSinceEpoch = 601060312290123456L;
assertThat(Conversions.toInstantFromMicros(usSinceEpoch)).isEqualTo(OffsetDateTime.of(21016, 11, 4, 13, 51, 30, 123456000, ZoneOffset.UTC).toInstant());
Instant source = LocalDateTime.parse("1970-01-01T00:00:01.250").toInstant(ZoneOffset.UTC);
long epochMicros = Conversions.toEpochMicros(source);
Instant target = Conversions.toInstantFromMicros(epochMicros);
assertThat(target).isEqualTo(source);
source = LocalDateTime.parse("1969-12-31T23:59:58.750000").toInstant(ZoneOffset.UTC);
epochMicros = Conversions.toEpochMicros(source);
target = Conversions.toInstantFromMicros(epochMicros);
assertThat(target).isEqualTo(source);
source = LocalDateTime.parse("1969-12-31T23:59:58.250000").toInstant(ZoneOffset.UTC);
epochMicros = Conversions.toEpochMicros(source);
target = Conversions.toInstantFromMicros(epochMicros);
assertThat(target).isEqualTo(source);
source = OffsetDateTime.of(21016, 11, 4, 13, 51, 30, 123456000, ZoneOffset.UTC).toInstant();
epochMicros = Conversions.toEpochMicros(source);
target = Conversions.toInstantFromMicros(epochMicros);
assertThat(target).isEqualTo(source);
// Postgres' timestamp is 294276 AD, but this should be good enough
source = OffsetDateTime.of(294247, 1, 10, 4, 0, 54, 775_807_000, ZoneOffset.UTC).toInstant();
epochMicros = Conversions.toEpochMicros(source);
target = Conversions.toInstantFromMicros(epochMicros);
assertThat(target).isEqualTo(source);
source = OffsetDateTime.of(-4713, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC).toInstant();
epochMicros = Conversions.toEpochMicros(source);
target = Conversions.toInstantFromMicros(epochMicros);
assertThat(target).isEqualTo(source);
}
}