DBZ-2614 Support for infinite timestamps

This commit is contained in:
Jiri Pechanec 2020-10-12 12:29:14 +02:00 committed by Gunnar Morling
parent 1f79c15a48
commit 1edad510ff
8 changed files with 107 additions and 9 deletions

View File

@ -40,6 +40,7 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.PGStatement;
import org.postgresql.geometric.PGpoint;
import org.postgresql.jdbc.PgArray;
import org.postgresql.util.HStoreConverter;
@ -66,6 +67,7 @@
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
import io.debezium.time.Conversions;
import io.debezium.time.Interval;
import io.debezium.time.MicroDuration;
import io.debezium.time.MicroTime;
@ -86,6 +88,18 @@
*/
public class PostgresValueConverter extends JdbcValueConverters {
public static final Timestamp POSITIVE_INFINITY_TIMESTAMP = new Timestamp(PGStatement.DATE_POSITIVE_INFINITY);
public static final Instant POSITIVE_INFINITY_INSTANT = Conversions.toInstantFromMicros(PGStatement.DATE_POSITIVE_INFINITY);
public static final LocalDateTime POSITIVE_INFINITY_LOCAL_DATE_TIME = LocalDateTime.ofInstant(POSITIVE_INFINITY_INSTANT, ZoneOffset.UTC);
public static final OffsetDateTime POSITIVE_INFINITY_OFFSET_DATE_TIME = OffsetDateTime.ofInstant(Conversions.toInstantFromMillis(PGStatement.DATE_POSITIVE_INFINITY),
ZoneOffset.UTC);
public static final Timestamp NEGATIVE_INFINITY_TIMESTAMP = new Timestamp(PGStatement.DATE_NEGATIVE_INFINITY);
public static final Instant NEGATIVE_INFINITY_INSTANT = Conversions.toInstantFromMicros(PGStatement.DATE_NEGATIVE_INFINITY);
public static final LocalDateTime NEGATIVE_INFINITY_LOCAL_DATE_TIME = LocalDateTime.ofInstant(NEGATIVE_INFINITY_INSTANT, ZoneOffset.UTC);
public static final OffsetDateTime NEGATIVE_INFINITY_OFFSET_DATE_TIME = OffsetDateTime.ofInstant(Conversions.toInstantFromMillis(PGStatement.DATE_NEGATIVE_INFINITY),
ZoneOffset.UTC);
/**
* Variable scale decimal/numeric is defined by metadata
* scale - 0
@ -750,6 +764,13 @@ protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object
data = OffsetDateTime.ofInstant(((Date) data).toInstant(), ZoneOffset.UTC);
}
if (POSITIVE_INFINITY_OFFSET_DATE_TIME.equals(data)) {
return "infinity";
}
else if (NEGATIVE_INFINITY_OFFSET_DATE_TIME.equals(data)) {
return "-infinity";
}
return super.convertTimestampWithZone(column, fieldDefn, data);
}
@ -939,6 +960,13 @@ protected Object convertTimestampToLocalDateTime(Column column, Field fieldDefn,
}
final Timestamp timestamp = (Timestamp) data;
if (POSITIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
return POSITIVE_INFINITY_LOCAL_DATE_TIME;
}
else if (NEGATIVE_INFINITY_TIMESTAMP.equals(timestamp)) {
return NEGATIVE_INFINITY_LOCAL_DATE_TIME;
}
final Instant instant = timestamp.toInstant();
final LocalDateTime utcTime = LocalDateTime
.ofInstant(instant, ZoneOffset.systemDefault());

View File

@ -28,6 +28,7 @@
import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier;
import io.debezium.connector.postgresql.PostgresType;
import io.debezium.connector.postgresql.PostgresValueConverter;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.wal2json.DateTimeFormat;
@ -60,11 +61,23 @@ public OffsetTime asOffsetTimeUtc() {
@Override
public OffsetDateTime asOffsetDateTimeAtUtc() {
if ("infinity".equals(asString())) {
return PostgresValueConverter.POSITIVE_INFINITY_OFFSET_DATE_TIME;
}
else if ("-infinity".equals(asString())) {
return PostgresValueConverter.NEGATIVE_INFINITY_OFFSET_DATE_TIME;
}
return DateTimeFormat.get().timestampWithTimeZoneToOffsetDateTime(asString()).withOffsetSameInstant(ZoneOffset.UTC);
}
@Override
public Instant asInstant() {
if ("infinity".equals(asString())) {
return PostgresValueConverter.POSITIVE_INFINITY_INSTANT;
}
else if ("-infinity".equals(asString())) {
return PostgresValueConverter.NEGATIVE_INFINITY_INSTANT;
}
return DateTimeFormat.get().timestampToInstant(asString());
}

View File

@ -23,7 +23,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.connector.postgresql.PgOid;
import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.PgConnectionSupplier;
import io.debezium.connector.postgresql.PostgresType;
@ -195,8 +194,13 @@ public OffsetTime asOffsetTimeUtc() {
@Override
public OffsetDateTime asOffsetDateTimeAtUtc() {
if (value.hasDatumInt64()) {
if (value.getDatumInt64() < TIMESTAMP_MIN || value.getDatumInt64() >= TIMESTAMP_MAX) {
throw new DebeziumException("Infinite value '" + value.getDatumInt64() + "' arrived from database, this is not supported yet");
if (value.getDatumInt64() >= TIMESTAMP_MAX) {
LOGGER.trace("Infinite(+) value '{}' arrived from database", value.getDatumInt64());
return PostgresValueConverter.POSITIVE_INFINITY_OFFSET_DATE_TIME;
}
else if (value.getDatumInt64() < TIMESTAMP_MIN) {
LOGGER.trace("Infinite(-) value '{}' arrived from database", value.getDatumInt64());
return PostgresValueConverter.NEGATIVE_INFINITY_OFFSET_DATE_TIME;
}
return Conversions.toInstantFromMicros(value.getDatumInt64()).atOffset(ZoneOffset.UTC);
}
@ -208,8 +212,13 @@ public OffsetDateTime asOffsetDateTimeAtUtc() {
@Override
public Instant asInstant() {
if (value.hasDatumInt64()) {
if (value.getDatumInt64() < TIMESTAMP_MIN || value.getDatumInt64() >= TIMESTAMP_MAX) {
throw new DebeziumException("Infinite value '" + value.getDatumInt64() + "' arrived from database, this is not supported yet");
if (value.getDatumInt64() >= TIMESTAMP_MAX) {
LOGGER.trace("Infinite(+) value '{}' arrived from database", value.getDatumInt64());
return PostgresValueConverter.POSITIVE_INFINITY_INSTANT;
}
else if (value.getDatumInt64() < TIMESTAMP_MIN) {
LOGGER.trace("Infinite(-) value '{}' arrived from database", value.getDatumInt64());
return PostgresValueConverter.NEGATIVE_INFINITY_INSTANT;
}
return Conversions.toInstantFromMicros(value.getDatumInt64());
}

View File

@ -18,6 +18,7 @@
import java.time.format.SignStyle;
import java.time.format.TextStyle;
import java.time.temporal.ChronoField;
import java.time.temporal.TemporalAccessor;
import java.util.function.Supplier;
import org.apache.kafka.connect.errors.ConnectException;
@ -83,6 +84,16 @@ public static class ISODateTimeFormat implements DateTimeFormat {
.appendText(ChronoField.ERA, TextStyle.SHORT)
.optionalEnd()
.toFormatter();
private static final DateTimeFormatter TS_TZ_WITH_SECONDS_FORMAT = new DateTimeFormatterBuilder()
.append(NON_ISO_LOCAL_DATE)
.appendLiteral(' ')
.append(DateTimeFormatter.ISO_LOCAL_TIME)
.appendOffset("+HH:MM:SS", "")
.optionalStart()
.appendLiteral(" ")
.appendText(ChronoField.ERA, TextStyle.SHORT)
.optionalEnd()
.toFormatter();
private static final String SYSTEM_TS_FORMAT_PATTERN_HINT = "y..y-MM-dd HH:mm:ss.SSSSSSX";
private static final DateTimeFormatter SYSTEM_TS_FORMAT = new DateTimeFormatterBuilder()
@ -151,7 +162,21 @@ public Instant timestampToInstant(String s) {
@Override
public OffsetDateTime timestampWithTimeZoneToOffsetDateTime(String s) {
return format(TS_TZ_FORMAT_PATTERN_HINT, s, () -> OffsetDateTime.from(TS_TZ_FORMAT.parse(s)));
return format(TS_TZ_FORMAT_PATTERN_HINT, s, () -> {
TemporalAccessor parsedTimestamp;
// Usually the timestamp contains only hour offset and optionally minutes
// For very large negative timestamps the offset could contain seconds
// The standard parsing library does not allow both optional minutes and seconds in offset,
// so it is necessary to parse it with optional minutes and if that fails then retyr with
// seconds
try {
parsedTimestamp = TS_TZ_FORMAT.parse(s);
}
catch (DateTimeParseException e) {
parsedTimestamp = TS_TZ_WITH_SECONDS_FORMAT.parse(s);
}
return OffsetDateTime.from(parsedTimestamp);
});
}
@Override

View File

@ -54,6 +54,7 @@
import org.apache.kafka.connect.source.SourceTask;
import org.junit.Rule;
import org.junit.rules.TestRule;
import org.postgresql.jdbc.PgStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -105,7 +106,7 @@ public abstract class AbstractRecordsProducerTest extends AbstractConnectorTest
protected static final String INSERT_CASH_TYPES_STMT = "INSERT INTO cash_table (csh) VALUES ('$1234.11')";
protected static final String INSERT_NEGATIVE_CASH_TYPES_STMT = "INSERT INTO cash_table (csh) VALUES ('($1234.11)')";
protected static final String INSERT_NULL_CASH_TYPES_STMT = "INSERT INTO cash_table (csh) VALUES (NULL)";
protected static final String INSERT_DATE_TIME_TYPES_STMT = "INSERT INTO time_table(ts, tsneg, ts_ms, ts_us, tz, date, ti, tip, ttf, ttz, tptz, it, ts_large, ts_large_us, ts_large_ms, tz_large, ts_max, ts_min, tz_max, tz_min) "
protected static final String INSERT_DATE_TIME_TYPES_STMT = "INSERT INTO time_table(ts, tsneg, ts_ms, ts_us, tz, date, ti, tip, ttf, ttz, tptz, it, ts_large, ts_large_us, ts_large_ms, tz_large, ts_max, ts_min, tz_max, tz_min, ts_pinf, ts_ninf, tz_pinf, tz_ninf) "
+
"VALUES ('2016-11-04T13:51:30.123456'::TIMESTAMP, '1936-10-25T22:10:12.608'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456+02:00'::TIMESTAMPTZ, "
+
@ -116,7 +117,11 @@ public abstract class AbstractRecordsProducerTest extends AbstractConnectorTest
"'294247-01-01T23:59:59.999999'::TIMESTAMP," +
"'4713-12-31T23:59:59.999999 BC'::TIMESTAMP," +
"'294247-01-01T23:59:59.999999+00:00'::TIMESTAMPTZ," +
"'4714-12-31T23:59:59.999999Z BC'::TIMESTAMPTZ"
"'4714-12-31T23:59:59.999999Z BC'::TIMESTAMPTZ," +
"'infinity'::TIMESTAMP," +
"'-infinity'::TIMESTAMP," +
"'infinity'::TIMESTAMPTZ," +
"'-infinity'::TIMESTAMPTZ"
+ ")";
protected static final String INSERT_BIN_TYPES_STMT = "INSERT INTO bitbin_table (ba, bol, bol2, bs, bs7, bv, bv2, bvl, bvunlimited1, bvunlimited2) " +
"VALUES (E'\\\\001\\\\002\\\\003'::bytea, '0'::bit(1), '1'::bit(1), '11'::bit(2), '1'::bit(7), '00'::bit(2), '000000110000001000000001'::bit(24)," +
@ -645,7 +650,11 @@ protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypes() {
new SchemaAndValueField("ts_max", MicroTimestamp.builder().optional().build(), 9223371331200000000L - 1L),
new SchemaAndValueField("ts_min", MicroTimestamp.builder().optional().build(), -1L).assertWithCondition(largeNegativeTimestamp),
new SchemaAndValueField("tz_max", ZonedTimestamp.builder().optional().build(), "+294247-01-01T23:59:59.999999Z"),
new SchemaAndValueField("tz_min", ZonedTimestamp.builder().optional().build(), "").assertWithCondition(largeNegativeTzTimestamp));
new SchemaAndValueField("tz_min", ZonedTimestamp.builder().optional().build(), "").assertWithCondition(largeNegativeTzTimestamp),
new SchemaAndValueField("ts_pinf", MicroTimestamp.builder().optional().build(), PgStatement.DATE_POSITIVE_INFINITY),
new SchemaAndValueField("ts_ninf", MicroTimestamp.builder().optional().build(), PgStatement.DATE_NEGATIVE_INFINITY),
new SchemaAndValueField("tz_pinf", ZonedTimestamp.builder().optional().build(), "infinity"),
new SchemaAndValueField("tz_ninf", ZonedTimestamp.builder().optional().build(), "-infinity"));
}
protected List<SchemaAndValueField> schemaAndValuesForTimeArrayTypes() {

View File

@ -35,6 +35,10 @@ CREATE TABLE time_table (pk SERIAL, ts TIMESTAMP, tsneg TIMESTAMP(6) WITHOUT TIM
ts_min TIMESTAMP(6),
tz_max TIMESTAMPTZ,
tz_min TIMESTAMPTZ,
ts_pinf TIMESTAMP(6),
ts_ninf TIMESTAMP(6),
tz_pinf TIMESTAMPTZ,
tz_ninf TIMESTAMPTZ,
PRIMARY KEY(pk));
CREATE TABLE text_table (pk SERIAL, j JSON, jb JSONB, x XML, u Uuid, PRIMARY KEY(pk));

View File

@ -188,4 +188,10 @@ public static Instant toInstantFromMicros(long microsSinceEpoch) {
TimeUnit.MICROSECONDS.toSeconds(microsSinceEpoch),
TimeUnit.MICROSECONDS.toNanos(microsSinceEpoch % TimeUnit.SECONDS.toMicros(1)));
}
public static Instant toInstantFromMillis(long millisecondSinceEpoch) {
return Instant.ofEpochSecond(
TimeUnit.MILLISECONDS.toSeconds(millisecondSinceEpoch),
TimeUnit.MILLISECONDS.toNanos(millisecondSinceEpoch % TimeUnit.SECONDS.toMillis(1)));
}
}

View File

@ -1498,6 +1498,10 @@ Such columns are converted into an equivalent Kafka Connect value based on UTC.
The timezone of the JVM running Kafka Connect and {prodname} does not affect this conversion.
PostgreSQL supports using `+/-infinite` values in `TIMESTAMP` columns.
These special values are converted to timestamps with value `9223372036825200000` in case of positive infinity or `-9223372036832400000` in case of negative infinity.
This behaviour mimics the standard behaviour of PostgreSQL JDBC driver - see `org.postgresql.PGStatement` interface for reference.
[id="postgresql-decimal-types"]
=== Decimal types