DBZ-1164 Supporting TIME columns with value of "24:00:00"

This commit is contained in:
GraySmog 2019-05-02 16:06:53 +08:00 committed by Gunnar Morling
parent 9afe06945e
commit 9b52ac86e8
6 changed files with 105 additions and 5 deletions

View File

@ -19,6 +19,7 @@
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.time.OffsetTime; import java.time.OffsetTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.Duration;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder; import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField; import java.time.temporal.ChronoField;
@ -29,8 +30,11 @@
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import io.debezium.util.Strings;
import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
@ -291,6 +295,8 @@ public ValueConverter converter(Column column, Field fieldDefn) {
return convertBits(column, fieldDefn); return convertBits(column, fieldDefn);
case PgOid.INTERVAL: case PgOid.INTERVAL:
return data -> convertInterval(column, fieldDefn, data); return data -> convertInterval(column, fieldDefn, data);
case PgOid.TIME:
return data -> convertTwentyFourHourTime(column, fieldDefn, data);
case PgOid.TIMESTAMP: case PgOid.TIMESTAMP:
return ((ValueConverter) (data -> convertTimestampToLocalDateTime(column, fieldDefn, data))).and(super.converter(column, fieldDefn)); return ((ValueConverter) (data -> convertTimestampToLocalDateTime(column, fieldDefn, data))).and(super.converter(column, fieldDefn));
case PgOid.TIMESTAMPTZ: case PgOid.TIMESTAMPTZ:
@ -642,6 +648,73 @@ else if (data instanceof String) {
return super.convertTimeWithZone(column, fieldDefn, data); return super.convertTimeWithZone(column, fieldDefn, data);
} }
/**
 * Converts the value of a {@code TIME} column, special-casing {@code 24:00:00}.
 * <p>
 * A value equal to exactly one day is returned directly as "one day" expressed in the unit
 * selected by the active time precision mode; every other value is handed to the converter
 * supplied by the super class.
 *
 * @param column the column definition; used to look up the time precision in adaptive mode
 * @param fieldDefn the field definition passed through to the super class converter
 * @param data the raw column value; a {@link Long} compared against nanoseconds-per-day, or a
 *            {@link String} of the form {@code HH:MM:SS[.fraction]}; anything else is delegated as-is
 * @return the converted value
 */
private Object convertTwentyFourHourTime(Column column, Field fieldDefn, Object data) {
    final long nanosecondsPerDay = TimeUnit.DAYS.toNanos(1);

    // "24:00:00" expressed in the unit emitted for this column; nanoseconds unless a mode below applies
    Object twentyFourHour = nanosecondsPerDay;
    if (adaptiveTimeMicrosecondsPrecisionMode) {
        twentyFourHour = nanosecondsPerDay / 1_000;
    }
    if (adaptiveTimePrecisionMode) {
        final int precision = getTimePrecision(column);
        // The branches must be mutually exclusive: a precision <= 3 also satisfies <= 6, and
        // would otherwise be overwritten with the microsecond value.
        if (precision <= 3) {
            // NOTE(review): emitted as int on the assumption that the millisecond time schema
            // is INT32 (86_400_000 fits comfortably) -- confirm against the schema builder.
            twentyFourHour = (int) (nanosecondsPerDay / 1_000_000);
        }
        else if (precision <= 6) {
            twentyFourHour = nanosecondsPerDay / 1_000;
        }
    }

    // during streaming
    if (data instanceof Long) {
        if (((Long) data).longValue() == nanosecondsPerDay) {
            return twentyFourHour;
        }
        return super.converter(column, fieldDefn).convert(data);
    }
    // during snapshotting
    if (data instanceof String) {
        final Duration duration = stringToDuration((String) data);
        if (duration.toNanos() == nanosecondsPerDay) {
            return twentyFourHour;
        }
        return super.converter(column, fieldDefn).convert(duration);
    }

    return super.converter(column, fieldDefn).convert(data);
}
/**
 * Matches {@code HH:MM:SS} with an optional {@code .fraction} suffix. Group 1 = hours,
 * group 2 = minutes, group 3 = seconds, group 5 = fractional digits (absent when there is no dot).
 * Compiled once, since {@link Pattern} instances are immutable and safe to share.
 */
private static final Pattern TIME_COLUMN_VALUE_PATTERN = Pattern.compile("([0-9]+):([0-9]+):([0-9]+)(\\.([0-9]*))?");

/**
 * Parses the textual form of a {@code TIME} column value into a {@link Duration} measured from
 * midnight. Unlike {@code java.time.LocalTime}, this accepts an hour value of {@code 24}.
 *
 * @param timeString the value to parse, e.g. {@code 24:00:00} or {@code 13:51:30.123}
 * @return the duration represented by the hours, minutes, seconds and fraction of the input
 * @throws IllegalArgumentException if the input does not have the form {@code HH:MM:SS[.fraction]}
 */
private static Duration stringToDuration(String timeString) {
    final Matcher matcher = TIME_COLUMN_VALUE_PATTERN.matcher(timeString);
    if (!matcher.matches()) {
        throw new IllegalArgumentException("Unexpected format for TIME column: " + timeString);
    }

    final long hours = Long.parseLong(matcher.group(1));
    final long minutes = Long.parseLong(matcher.group(2));
    final long seconds = Long.parseLong(matcher.group(3));

    long nanoSeconds = 0;
    final String fraction = matcher.group(5);
    if (fraction != null) {
        // Left-justify the fractional digits in a 9-character field filled with '0' so that
        // the result can be read as a nanosecond count.
        nanoSeconds = Long.parseLong(Strings.justifyLeft(fraction, 9, '0'));
    }

    // The pattern admits digits only, so no component can be negative.
    return Duration.ofHours(hours)
            .plusMinutes(minutes)
            .plusSeconds(seconds)
            .plusNanos(nanoSeconds);
}
private static LocalDateTime nanosToLocalDateTimeUTC(long epocNanos) { private static LocalDateTime nanosToLocalDateTimeUTC(long epocNanos) {
// the pg plugin stores date/time info as microseconds since epoch // the pg plugin stores date/time info as microseconds since epoch
BigInteger epochMicrosBigInt = BigInteger.valueOf(epocNanos); BigInteger epochMicrosBigInt = BigInteger.valueOf(epocNanos);

View File

@ -374,12 +374,12 @@ private Object valueForColumn(ResultSet rs, int colIdx, ResultSetMetaData metaDa
Optional<SpecialValueDecimal> value = PostgresValueConverter.toSpecialValue(s); Optional<SpecialValueDecimal> value = PostgresValueConverter.toSpecialValue(s);
return value.isPresent() ? value.get() : new SpecialValueDecimal(rs.getBigDecimal(colIdx)); return value.isPresent() ? value.get() : new SpecialValueDecimal(rs.getBigDecimal(colIdx));
case PgOid.TIME:
// To handle time 24:00:00 supported by TIME columns, read the column as a string.
case PgOid.TIMETZ: case PgOid.TIMETZ:
// In order to guarantee that we resolve TIMETZ columns with proper microsecond precision, // In order to guarantee that we resolve TIMETZ columns with proper microsecond precision,
// read the column as a string instead and then re-parse inside the converter. // read the column as a string instead and then re-parse inside the converter.
return rs.getString(colIdx); return rs.getString(colIdx);
default: default:
Object x = rs.getObject(colIdx); Object x = rs.getObject(colIdx);
if(x != null) { if(x != null) {

View File

@ -269,6 +269,8 @@ else if (rawValue.isBigInteger()) {
return Conversions.toEpochNanos(serverLocal.toInstant(ZoneOffset.UTC)); return Conversions.toEpochNanos(serverLocal.toInstant(ZoneOffset.UTC));
case "time": case "time":
return rawValue.asString();
case "time without time zone": case "time without time zone":
return DateTimeFormat.get().time(rawValue.asString()); return DateTimeFormat.get().time(rawValue.asString());

View File

@ -80,9 +80,9 @@ public abstract class AbstractRecordsProducerTest {
protected static final Pattern INSERT_TABLE_MATCHING_PATTERN = Pattern.compile("insert into (.*)\\(.*\\) VALUES .*", Pattern.CASE_INSENSITIVE); protected static final Pattern INSERT_TABLE_MATCHING_PATTERN = Pattern.compile("insert into (.*)\\(.*\\) VALUES .*", Pattern.CASE_INSENSITIVE);
protected static final String INSERT_CASH_TYPES_STMT = "INSERT INTO cash_table (csh) VALUES ('$1234.11')"; protected static final String INSERT_CASH_TYPES_STMT = "INSERT INTO cash_table (csh) VALUES ('$1234.11')";
protected static final String INSERT_DATE_TIME_TYPES_STMT = "INSERT INTO time_table(ts, tsneg, ts_ms, ts_us, tz, date, ti, tip, ttz, tptz, it) " + protected static final String INSERT_DATE_TIME_TYPES_STMT = "INSERT INTO time_table(ts, tsneg, ts_ms, ts_us, tz, date, ti, tip, ttf, ttz, tptz, it) " +
"VALUES ('2016-11-04T13:51:30.123456'::TIMESTAMP, '1936-10-25T22:10:12.608'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456+02:00'::TIMESTAMPTZ, " + "VALUES ('2016-11-04T13:51:30.123456'::TIMESTAMP, '1936-10-25T22:10:12.608'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456+02:00'::TIMESTAMPTZ, " +
"'2016-11-04'::DATE, '13:51:30'::TIME, '13:51:30.123'::TIME, '13:51:30.123789+02:00'::TIMETZ, '13:51:30.123+02:00'::TIMETZ, " + "'2016-11-04'::DATE, '13:51:30'::TIME, '13:51:30.123'::TIME, '24:00:00'::TIME, '13:51:30.123789+02:00'::TIMETZ, '13:51:30.123+02:00'::TIMETZ, " +
"'P1Y2M3DT4H5M0S'::INTERVAL)"; "'P1Y2M3DT4H5M0S'::INTERVAL)";
protected static final String INSERT_BIN_TYPES_STMT = "INSERT INTO bitbin_table (ba, bol, bs, bv) " + protected static final String INSERT_BIN_TYPES_STMT = "INSERT INTO bitbin_table (ba, bol, bs, bv) " +
"VALUES (E'\\\\001\\\\002\\\\003'::bytea, '0'::bit(1), '11'::bit(2), '00'::bit(2))"; "VALUES (E'\\\\001\\\\002\\\\003'::bytea, '0'::bit(1), '11'::bit(2), '00'::bit(2))";
@ -500,6 +500,7 @@ protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypes() {
int expectedDate = Date.toEpochDay(LocalDate.parse("2016-11-04"), null); int expectedDate = Date.toEpochDay(LocalDate.parse("2016-11-04"), null);
long expectedTi = LocalTime.parse("13:51:30").toNanoOfDay() / 1_000; long expectedTi = LocalTime.parse("13:51:30").toNanoOfDay() / 1_000;
long expectedTiPrecision = LocalTime.parse("13:51:30.123").toNanoOfDay() / 1_000_000; long expectedTiPrecision = LocalTime.parse("13:51:30.123").toNanoOfDay() / 1_000_000;
long expectedTtf = TimeUnit.DAYS.toNanos(1) / 1_000;
String expectedTtz = "11:51:30.123789Z"; //time is stored with TZ, should be read back at GMT String expectedTtz = "11:51:30.123789Z"; //time is stored with TZ, should be read back at GMT
String expectedTtzPrecision = "11:51:30.123Z"; String expectedTtzPrecision = "11:51:30.123Z";
double interval = MicroDuration.durationMicros(1, 2, 3, 4, 5, 0, MicroDuration.DAYS_PER_MONTH_AVG); double interval = MicroDuration.durationMicros(1, 2, 3, 4, 5, 0, MicroDuration.DAYS_PER_MONTH_AVG);
@ -512,6 +513,7 @@ protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypes() {
new SchemaAndValueField("date", Date.builder().optional().build(), expectedDate), new SchemaAndValueField("date", Date.builder().optional().build(), expectedDate),
new SchemaAndValueField("ti", MicroTime.builder().optional().build(), expectedTi), new SchemaAndValueField("ti", MicroTime.builder().optional().build(), expectedTi),
new SchemaAndValueField("tip", Time.builder().optional().build(), (int) expectedTiPrecision), new SchemaAndValueField("tip", Time.builder().optional().build(), (int) expectedTiPrecision),
new SchemaAndValueField("ttf", MicroTime.builder().optional().build(), expectedTtf),
new SchemaAndValueField("ttz", ZonedTime.builder().optional().build(), expectedTtz), new SchemaAndValueField("ttz", ZonedTime.builder().optional().build(), expectedTtz),
new SchemaAndValueField("tptz", ZonedTime.builder().optional().build(), expectedTtzPrecision), new SchemaAndValueField("tptz", ZonedTime.builder().optional().build(), expectedTtzPrecision),
new SchemaAndValueField("it", MicroDuration.builder().optional().build(), interval)); new SchemaAndValueField("it", MicroDuration.builder().optional().build(), interval));

View File

@ -538,4 +538,27 @@ public void shouldGenerateSnapshotForMacaddr8Datatype() throws Exception {
consumer.process(record -> assertReadRecord(record, expectedValueByTopicName)); consumer.process(record -> assertReadRecord(record, expectedValueByTopicName));
} }
/**
 * Snapshots {@code time_table} after running the shared date/time insert statement and checks
 * the emitted record against {@code schemaAndValuesForDateTimeTypes()}.
 */
@Test
@FixFor("DBZ-1164")
public void shouldGenerateSnapshotForTwentyFourHourTime() throws Exception {
    // start from freshly created tables
    TestHelper.dropAllSchemas();
    TestHelper.executeDDL("postgres_create_tables.ddl");

    final PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(TestHelper.defaultConfig().build());
    snapshotProducer = buildNoStreamProducer(context, connectorConfig);

    // exactly one record is expected, from the "public" schema
    final TestConsumer snapshotConsumer = testConsumer(1, "public");

    // insert date and time data
    TestHelper.execute(INSERT_DATE_TIME_TYPES_STMT);

    snapshotProducer.start(snapshotConsumer, failure -> {
    });
    snapshotConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);

    final Map<String, List<SchemaAndValueField>> expectedByTopic = Collect.hashMapOf(
            "public.time_table", schemaAndValuesForDateTimeTypes());
    snapshotConsumer.process(sourceRecord -> assertReadRecord(sourceRecord, expectedByTopic));
}
} }

View File

@ -20,7 +20,7 @@ CREATE TABLE macaddr_table(pk SERIAL, m MACADDR, PRIMARY KEY(pk));
CREATE TABLE cash_table (pk SERIAL, csh MONEY, PRIMARY KEY(pk)); CREATE TABLE cash_table (pk SERIAL, csh MONEY, PRIMARY KEY(pk));
CREATE TABLE bitbin_table (pk SERIAL, ba BYTEA, bol BIT(1), bs BIT(2), bv BIT VARYING(2) , PRIMARY KEY(pk)); CREATE TABLE bitbin_table (pk SERIAL, ba BYTEA, bol BIT(1), bs BIT(2), bv BIT VARYING(2) , PRIMARY KEY(pk));
CREATE TABLE time_table (pk SERIAL, ts TIMESTAMP, tsneg TIMESTAMP(6) WITHOUT TIME ZONE, ts_ms TIMESTAMP(3), ts_us TIMESTAMP(6), tz TIMESTAMPTZ, date DATE, CREATE TABLE time_table (pk SERIAL, ts TIMESTAMP, tsneg TIMESTAMP(6) WITHOUT TIME ZONE, ts_ms TIMESTAMP(3), ts_us TIMESTAMP(6), tz TIMESTAMPTZ, date DATE,
ti TIME, tip TIME(3), ti TIME, tip TIME(3), ttf TIME,
ttz TIME WITH TIME ZONE, tptz TIME(3) WITH TIME ZONE, ttz TIME WITH TIME ZONE, tptz TIME(3) WITH TIME ZONE,
it INTERVAL, tsp TIMESTAMP (0) WITH TIME ZONE, PRIMARY KEY(pk)); it INTERVAL, tsp TIMESTAMP (0) WITH TIME ZONE, PRIMARY KEY(pk));
CREATE TABLE text_table (pk SERIAL, j JSON, jb JSONB, x XML, u Uuid, PRIMARY KEY(pk)); CREATE TABLE text_table (pk SERIAL, j JSON, jb JSONB, x XML, u Uuid, PRIMARY KEY(pk));