DBZ-578 Process timestamps without timezones as in server timezone

This commit is contained in:
Jiri Pechanec 2018-03-06 14:55:20 +01:00 committed by Gunnar Morling
parent 3f20c8d8cc
commit 29f8891b4f
27 changed files with 332 additions and 97 deletions

View File

@ -103,6 +103,7 @@
-->
<docker.filter>debezium/mysql-server-test-database</docker.filter>
<docker.skip>false</docker.skip>
<docker.initimage>ln -s /usr/share/zoneinfo/US/Samoa /etc/localtime</docker.initimage>
</properties>
<build>
<plugins>
@ -144,6 +145,7 @@
</run>
<build>
<from>mysql/mysql-server:${version.mysql.server}</from>
<runCmds>${docker.initimage}</runCmds>
<assembly>
<inline>
<fileSets>
@ -191,6 +193,7 @@
</run>
<build>
<from>mysql/mysql-server:${version.mysql.server}</from>
<runCmds>${docker.initimage}</runCmds>
<assembly>
<inline>
<fileSets>
@ -242,6 +245,7 @@
</run>
<build>
<from>mysql/mysql-server:${version.mysql.server}</from>
<runCmds>${docker.initimage}</runCmds>
<assembly>
<inline>
<fileSets>

View File

@ -7,6 +7,10 @@
import java.io.IOException;
import java.io.Serializable;
import java.sql.SQLException;
import java.time.DateTimeException;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.BitSet;
import java.util.EnumMap;
import java.util.HashMap;
@ -16,6 +20,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import com.github.shyiko.mysql.binlog.event.RowsQueryEventData;
@ -49,6 +54,7 @@
import io.debezium.function.BlockingConsumer;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.heartbeat.OffsetPosition;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
@ -211,24 +217,54 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
}
}
};
ZoneId serverTimezone = ZoneOffset.UTC;
try (final JdbcConnection db = connectionContext.jdbc()) {
AtomicReference<String> zoneName = new AtomicReference<String>(db.config().getString("serverTimezone"));
if (zoneName.get() == null) {
db.query("SHOW VARIABLES LIKE '%time_zone%'", rs -> {
if (rs.next()) {
zoneName.set(rs.getString(2));
}
});
}
if ("SYSTEM".equalsIgnoreCase(zoneName.get())) {
db.query("SHOW VARIABLES LIKE '%system_time_zone%'", rs -> {
if (rs.next()) {
zoneName.set(rs.getString(2));
}
});
}
try {
if (zoneName.get() != null) {
serverTimezone = ZoneId.of(zoneName.get());
}
}
catch (DateTimeException e) {
logger.warn("Invalid zone name '{}', defaulting to '{}'. Please set 'database.serverTimezone' to explicitly define database server timezone", zoneName.get(), serverTimezone);
}
}
catch (SQLException e) {
throw new ConnectException("Could not get timezone setting from database", e);
}
logger.info("Using timezone '{}' as baseline for converting to epoch");
// Add our custom deserializers ...
eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
eventDeserializer.setEventDataDeserializer(EventType.WRITE_ROWS,
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId, serverTimezone));
eventDeserializer.setEventDataDeserializer(EventType.UPDATE_ROWS,
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId, serverTimezone));
eventDeserializer.setEventDataDeserializer(EventType.DELETE_ROWS,
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId, serverTimezone));
eventDeserializer.setEventDataDeserializer(EventType.EXT_WRITE_ROWS,
new RowDeserializers.WriteRowsDeserializer(
tableMapEventByTableId).setMayContainExtraInformation(true));
tableMapEventByTableId, serverTimezone).setMayContainExtraInformation(true));
eventDeserializer.setEventDataDeserializer(EventType.EXT_UPDATE_ROWS,
new RowDeserializers.UpdateRowsDeserializer(
tableMapEventByTableId).setMayContainExtraInformation(true));
tableMapEventByTableId, serverTimezone).setMayContainExtraInformation(true));
eventDeserializer.setEventDataDeserializer(EventType.EXT_DELETE_ROWS,
new RowDeserializers.DeleteRowsDeserializer(
tableMapEventByTableId).setMayContainExtraInformation(true));
tableMapEventByTableId, serverTimezone).setMayContainExtraInformation(true));
client.setEventDeserializer(eventDeserializer);
// Set up for JMX ...

View File

@ -19,6 +19,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.config.Configuration.Builder;
import io.debezium.config.Field;
import io.debezium.connector.mysql.MySqlConnectorConfig.EventProcessingFailureHandlingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode;
@ -35,6 +36,8 @@
public class MySqlJdbcContext implements AutoCloseable {
protected static final String MYSQL_CONNECTION_URL = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=${useSSL}&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=convertToNull";
protected static final String JDBC_PROPERTY_LEGACY_DATETIME = "useLegacyDatetimeCode";
protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory(MYSQL_CONNECTION_URL);
protected final Logger logger = LoggerFactory.getLogger(getClass());
@ -49,11 +52,23 @@ public MySqlJdbcContext(Configuration config) {
// to give us better JDBC database metadata behavior, including using UTF-8 for the client-side character encoding
// per https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-charsets.html
boolean useSSL = sslModeEnabled();
Configuration jdbcConfig = config.filter(x -> !(x.startsWith(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING) || x.equals(MySqlConnectorConfig.DATABASE_HISTORY.name())))
.subset("database.", true)
.edit()
.with("useSSL", Boolean.toString(useSSL))
.build();
Configuration jdbcConfig = config
.filter(x -> !(x.startsWith(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING) || x.equals(MySqlConnectorConfig.DATABASE_HISTORY.name())))
.subset("database.", true);
Builder jdbcConfigBuilder = jdbcConfig
.edit()
.with("useSSL", Boolean.toString(useSSL));
final String legacyDateTime = jdbcConfig.getString(JDBC_PROPERTY_LEGACY_DATETIME);
if (legacyDateTime == null) {
jdbcConfigBuilder.with(JDBC_PROPERTY_LEGACY_DATETIME, "false");
}
else if ("true".equals(legacyDateTime)) {
logger.warn("'" + JDBC_PROPERTY_LEGACY_DATETIME + "'" + " is set to 'true'. This setting is not recommended and can result in timezone issues.");
}
jdbcConfig = jdbcConfigBuilder.build();
String driverClassName = jdbcConfig.getString(MySqlConnectorConfig.JDBC_DRIVER);
this.jdbc = new JdbcConnection(jdbcConfig,
JdbcConnection.patternBasedFactory(MYSQL_CONNECTION_URL, driverClassName, getClass().getClassLoader()));

View File

@ -12,8 +12,10 @@
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoField;
@ -277,6 +279,8 @@ public ValueConverter converter(Column column, Field fieldDefn) {
case Types.TIME:
if (adaptiveTimeMicrosecondsPrecisionMode)
return data -> convertDurationToMicroseconds(column, fieldDefn, data);
case Types.TIMESTAMP:
return ((ValueConverter)(data-> convertTimestampToUTC(column, fieldDefn, data))).and(super.converter(column, fieldDefn));
default:
break;
}
@ -803,4 +807,14 @@ protected Object convertDurationToMicroseconds(Column column, Field fieldDefn, O
}
return handleUnknownData(column, fieldDefn, data);
}
protected Object convertTimestampToUTC(Column column, Field fieldDefn, Object data) {
if (data == null) {
return null;
}
if (!(data instanceof Timestamp)) {
return data;
}
return LocalDateTime.ofInstant(((Timestamp)data).toInstant(), ZoneOffset.UTC);
}
}

View File

@ -15,6 +15,7 @@
import java.time.OffsetDateTime;
import java.time.Year;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Calendar;
import java.util.Map;
@ -53,9 +54,11 @@ class RowDeserializers {
* {@link OffsetDateTime} objects, respectively.
*/
public static class DeleteRowsDeserializer extends DeleteRowsEventDataDeserializer {
private final ZoneId serverTimezone;
public DeleteRowsDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
public DeleteRowsDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId, ZoneId serverTimezone) {
super(tableMapEventByTableId);
this.serverTimezone = serverTimezone;
}
@Override
@ -75,12 +78,12 @@ protected Serializable deserializeDate(ByteArrayInputStream inputStream) throws
@Override
protected Serializable deserializeDatetime(ByteArrayInputStream inputStream) throws IOException {
return RowDeserializers.deserializeDatetime(inputStream);
return RowDeserializers.deserializeDatetime(inputStream, serverTimezone);
}
@Override
protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) throws IOException {
return RowDeserializers.deserializeDatetimeV2(meta, inputStream);
return RowDeserializers.deserializeDatetimeV2(meta, inputStream, serverTimezone);
}
@Override
@ -115,9 +118,11 @@ protected Serializable deserializeYear(ByteArrayInputStream inputStream) throws
* {@link OffsetDateTime} objects, respectively.
*/
public static class UpdateRowsDeserializer extends UpdateRowsEventDataDeserializer {
private final ZoneId serverTimezone;
public UpdateRowsDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
public UpdateRowsDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId, ZoneId serverTimezone) {
super(tableMapEventByTableId);
this.serverTimezone = serverTimezone;
}
@Override
@ -137,12 +142,12 @@ protected Serializable deserializeDate(ByteArrayInputStream inputStream) throws
@Override
protected Serializable deserializeDatetime(ByteArrayInputStream inputStream) throws IOException {
return RowDeserializers.deserializeDatetime(inputStream);
return RowDeserializers.deserializeDatetime(inputStream, serverTimezone);
}
@Override
protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) throws IOException {
return RowDeserializers.deserializeDatetimeV2(meta, inputStream);
return RowDeserializers.deserializeDatetimeV2(meta, inputStream, serverTimezone);
}
@Override
@ -177,9 +182,11 @@ protected Serializable deserializeYear(ByteArrayInputStream inputStream) throws
* {@link OffsetDateTime} objects, respectively.
*/
public static class WriteRowsDeserializer extends WriteRowsEventDataDeserializer {
private final ZoneId serverTimezone;
public WriteRowsDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
public WriteRowsDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId, ZoneId serverTimezone) {
super(tableMapEventByTableId);
this.serverTimezone = serverTimezone;
}
@Override
@ -199,12 +206,12 @@ protected Serializable deserializeDate(ByteArrayInputStream inputStream) throws
@Override
protected Serializable deserializeDatetime(ByteArrayInputStream inputStream) throws IOException {
return RowDeserializers.deserializeDatetime(inputStream);
return RowDeserializers.deserializeDatetime(inputStream, serverTimezone);
}
@Override
protected Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) throws IOException {
return RowDeserializers.deserializeDatetimeV2(meta, inputStream);
return RowDeserializers.deserializeDatetimeV2(meta, inputStream, serverTimezone);
}
@Override
@ -363,7 +370,7 @@ protected static Serializable deserializeTimeV2(int meta, ByteArrayInputStream i
* @return the {@link LocalDateTime} object
* @throws IOException if there is an error reading from the binlog event data
*/
protected static Serializable deserializeDatetime(ByteArrayInputStream inputStream) throws IOException {
protected static Serializable deserializeDatetime(ByteArrayInputStream inputStream, ZoneId serverTimezone) throws IOException {
int[] split = split(inputStream.readLong(8), 100, 6);
int year = split[5];
int month = split[4]; // 1-based month number
@ -375,7 +382,8 @@ protected static Serializable deserializeDatetime(ByteArrayInputStream inputStre
if (year == 0 || month == 0 || day == 0) {
return null;
}
return LocalDateTime.of(year, month, day, hours, minutes, seconds, nanoOfSecond);
final LocalDateTime localTime = LocalDateTime.of(year, month, day, hours, minutes, seconds, nanoOfSecond);
return LocalDateTime.ofInstant(localTime.atZone(serverTimezone).toInstant(), ZoneOffset.UTC);
}
/**
@ -389,7 +397,7 @@ protected static Serializable deserializeDatetime(ByteArrayInputStream inputStre
* @return the {@link LocalDateTime} object
* @throws IOException if there is an error reading from the binlog event data
*/
protected static Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream) throws IOException {
protected static Serializable deserializeDatetimeV2(int meta, ByteArrayInputStream inputStream, ZoneId serverTimezone) throws IOException {
/*
* (in big endian)
*
@ -416,7 +424,8 @@ protected static Serializable deserializeDatetimeV2(int meta, ByteArrayInputStre
if (year == 0 || month == 0 || day == 0) {
return null;
}
return LocalDateTime.of(year, month, day, hours, minutes, seconds, nanoOfSecond);
final LocalDateTime localTime = LocalDateTime.of(year, month, day, hours, minutes, seconds, nanoOfSecond);
return LocalDateTime.ofInstant(localTime.atZone(serverTimezone).toInstant(), ZoneOffset.UTC);
}
/**
@ -431,7 +440,7 @@ protected static Serializable deserializeDatetimeV2(int meta, ByteArrayInputStre
protected static Serializable deserializeTimestamp(ByteArrayInputStream inputStream) throws IOException {
long epochSecond = inputStream.readLong(4);
int nanoSeconds = 0; // no fractional seconds
return ZonedDateTime.ofInstant(Instant.ofEpochSecond(epochSecond, nanoSeconds), ZoneId.systemDefault());
return ZonedDateTime.ofInstant(Instant.ofEpochSecond(epochSecond, nanoSeconds), ZoneOffset.UTC);
}
/**
@ -447,7 +456,7 @@ protected static Serializable deserializeTimestamp(ByteArrayInputStream inputStr
protected static Serializable deserializeTimestampV2(int meta, ByteArrayInputStream inputStream) throws IOException {
long epochSecond = bigEndianLong(inputStream.read(4), 0, 4);
int nanoSeconds = deserializeFractionalSecondsInNanos(meta, inputStream);
return ZonedDateTime.ofInstant(Instant.ofEpochSecond(epochSecond, nanoSeconds), ZoneId.systemDefault());
return ZonedDateTime.ofInstant(Instant.ofEpochSecond(epochSecond, nanoSeconds), ZoneOffset.UTC);
}
/**

View File

@ -16,6 +16,7 @@
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -266,8 +267,8 @@ public void shouldHandleTimestampTimezones() throws Exception {
assertThat(sourceRecords.size()).isEqualTo(1);
// MySQL container is in UTC and the test time is during summer time period
ZonedDateTime expectedTimestamp = ZonedDateTime.ofInstant(
LocalDateTime.parse("2014-09-08T17:51:04.780").atZone(ZoneId.of("UTC")).toInstant(),
ZoneId.systemDefault());
LocalDateTime.parse("2014-09-09T04:51:04.780").atZone(ZoneId.of("UTC")).toInstant(),
ZoneId.ofOffset("", ZoneOffset.UTC));
String expectedTimestampString = expectedTimestamp.format(ZonedTimestamp.FORMATTER);
SourceRecord sourceRecord = sourceRecords.get(0);
Struct value = (Struct) sourceRecord.value();

View File

@ -23,7 +23,6 @@
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.TemporalAdjuster;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
@ -79,6 +78,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
.with("database.serverTimezone", DATABASE.timezone())
.build();
// Start the connector ...
start(MySqlConnector.class, config);
@ -173,8 +173,8 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
ZoneOffset.UTC);
assertThat(c3DateTime.getYear()).isEqualTo(2014);
assertThat(c3DateTime.getMonth()).isEqualTo(Month.SEPTEMBER);
assertThat(c3DateTime.getDayOfMonth()).isEqualTo(8);
assertThat(c3DateTime.getHour()).isEqualTo(17);
assertThat(c3DateTime.getDayOfMonth()).isEqualTo(9);
assertThat(c3DateTime.getHour()).isEqualTo(4);
assertThat(c3DateTime.getMinute()).isEqualTo(51);
assertThat(c3DateTime.getSecond()).isEqualTo(4);
assertThat(c3DateTime.getNano()).isEqualTo((int) TimeUnit.MILLISECONDS.toNanos(780));
@ -224,13 +224,8 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
String c4 = after.getString("c4"); // timestamp
OffsetDateTime c4DateTime = OffsetDateTime.parse(c4, ZonedTimestamp.FORMATTER);
// We're running the connector in the same timezone as the server, so the timezone in the timestamp
// should match our current TZ's offset at the given time...
ZoneOffset expectedOffset = ZonedDateTime.of(
LocalDate.of(1970, 1, 1).atTime(0, 0),
TimeZone.getDefault().toZoneId()
).getOffset();
assertThat(c4DateTime.getOffset()).isEqualTo(expectedOffset);
// Timestamp is stored as UTC
assertThat(c4DateTime.getOffset()).isEqualTo(ZoneOffset.UTC);
// In case the timestamp string not in our timezone, convert to UTC so we can compare ...
c4DateTime = c4DateTime.withOffsetSameInstant(ZoneOffset.of("Z"));
@ -341,6 +336,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshotAndConnect
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
.with(MySqlConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.CONNECT)
.with("database.serverTimezone", DATABASE.timezone())
.build();
// Start the connector ...
start(MySqlConnector.class, config);
@ -432,8 +428,8 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshotAndConnect
ZoneOffset.UTC);
assertThat(c3DateTime.getYear()).isEqualTo(2014);
assertThat(c3DateTime.getMonth()).isEqualTo(Month.SEPTEMBER);
assertThat(c3DateTime.getDayOfMonth()).isEqualTo(8);
assertThat(c3DateTime.getHour()).isEqualTo(17);
assertThat(c3DateTime.getDayOfMonth()).isEqualTo(9);
assertThat(c3DateTime.getHour()).isEqualTo(4);
assertThat(c3DateTime.getMinute()).isEqualTo(51);
assertThat(c3DateTime.getSecond()).isEqualTo(4);
assertThat(c3DateTime.getNano()).isEqualTo((int) TimeUnit.MILLISECONDS.toNanos(780));
@ -484,13 +480,8 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshotAndConnect
String c4 = after.getString("c4"); // MySQL timestamp, so always ZonedTimestamp
OffsetDateTime c4DateTime = OffsetDateTime.parse(c4, ZonedTimestamp.FORMATTER);
// We're running the connector in the same timezone as the server, so the timezone in the timestamp
// should match our TZ's offset at the given time...
ZoneOffset expectedOffset = ZonedDateTime.of(
LocalDate.of(1970, 1, 1).atTime(0, 0),
TimeZone.getDefault().toZoneId()
).getOffset();
assertThat(c4DateTime.getOffset()).isEqualTo(expectedOffset);
// Timestamp is stored as UTC
assertThat(c4DateTime.getOffset()).isEqualTo(ZoneOffset.UTC);
// In case the timestamp string not in our timezone, convert to UTC so we can compare ...
c4DateTime = c4DateTime.withOffsetSameInstant(ZoneOffset.of("Z"));
@ -545,6 +536,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshotAndConnect
public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLException, InterruptedException {
// Use the DB configuration to define the connector's configuration ...
config = DATABASE.defaultConfig().build();
// Start the connector ...
start(MySqlConnector.class, config);
@ -613,9 +605,11 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
// c3 DATETIME(2),
// c4 TIMESTAMP(2)
//
// Instant in UTC - 2014-09-09 04:51:04.777
// Instant in US/Samoa - 2014-09-08 17:51:04.777
// {"c1" : "16321", "c2" : "17:51:04.777", "c3" : "1410198664780", "c4" : "2014-09-08T17:51:04.78-05:00"}
// '2014-09-08'
// '2014-09-08' - date type is not dependent on timezone so no shift is needed
Integer c1 = after.getInt32("c1"); // epoch days
LocalDate c1Date = LocalDate.ofEpochDay(c1);
assertThat(c1Date.getYear()).isEqualTo(2014);
@ -623,7 +617,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
assertThat(c1Date.getDayOfMonth()).isEqualTo(8);
assertThat(io.debezium.time.Date.toEpochDay(c1Date, ADJUSTER)).isEqualTo(c1);
// '17:51:04.777'
// '17:51:04.777' - time is Duration so no timeshift is needed
Long c2 = after.getInt64("c2");
Duration c2Time = Duration.ofNanos(c2 * 1_000);
assertThat(c2Time.toHours()).isEqualTo(17);
@ -633,38 +627,32 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
assertThat(c2Time.toNanos()).isEqualTo(64264780000000L);
assertThat(c2Time).isEqualTo(Duration.ofHours(17).plusMinutes(51).plusSeconds(4).plusMillis(780));
// '2014-09-08 17:51:04.777'
// '2014-09-08 17:51:04.777' -> '2014-09-09 04:51:04.78'
Long c3 = after.getInt64("c3"); // epoch millis
long c3Seconds = c3 / 1000;
long c3Millis = c3 % 1000;
LocalDateTime c3DateTime = LocalDateTime.ofEpochSecond(c3Seconds,
(int) TimeUnit.MILLISECONDS.toNanos(c3Millis),
ZoneOffset.UTC);
(int) TimeUnit.MILLISECONDS.toNanos(c3Millis),
ZoneOffset.UTC);
assertThat(c3DateTime.getYear()).isEqualTo(2014);
assertThat(c3DateTime.getMonth()).isEqualTo(Month.SEPTEMBER);
assertThat(c3DateTime.getDayOfMonth()).isEqualTo(8);
assertThat(c3DateTime.getHour()).isEqualTo(17);
assertThat(c3DateTime.getDayOfMonth()).isEqualTo(9);
assertThat(c3DateTime.getHour()).isEqualTo(4);
assertThat(c3DateTime.getMinute()).isEqualTo(51);
assertThat(c3DateTime.getSecond()).isEqualTo(4);
assertThat(c3DateTime.getNano()).isEqualTo((int) TimeUnit.MILLISECONDS.toNanos(780));
assertThat(io.debezium.time.Timestamp.toEpochMillis(c3DateTime, ADJUSTER)).isEqualTo(c3);
// '2014-09-08 17:51:04.777'
// '2014-09-08 17:51:04.777' -> '2014-09-09 04:51:04.78'
String c4 = after.getString("c4"); // timestamp
ZoneOffset localOffset = ZonedDateTime.of(
LocalDate.of(2014, 9, 8).atTime(0, 0),
TimeZone.getDefault().toZoneId()
).getOffset();
OffsetDateTime c4DateTime = OffsetDateTime.parse(c4, ZonedTimestamp.FORMATTER);
// In case the timestamp string not in our timezone, convert to ours so we can compare ...
c4DateTime = c4DateTime.withOffsetSameInstant(localOffset);
assertThat(c4DateTime.getYear()).isEqualTo(2014);
assertThat(c4DateTime.getMonth()).isEqualTo(Month.SEPTEMBER);
assertThat(c4DateTime.getDayOfMonth()).isEqualTo(8);
assertThat(c4DateTime.getDayOfMonth()).isEqualTo(9);
// Difference depends upon whether the zone we're in is also using DST as it is on the date in question ...
assertThat(c4DateTime.getHour() == 16 || c4DateTime.getHour() == 17).isTrue();
assertThat(c4DateTime.getHour()).isEqualTo(4);
assertThat(c4DateTime.getMinute()).isEqualTo(51);
assertThat(c4DateTime.getSecond()).isEqualTo(4);
assertThat(c4DateTime.getNano()).isEqualTo((int) TimeUnit.MILLISECONDS.toNanos(780));
@ -933,7 +921,7 @@ private void assertTimestamp(String c4) {
// '2014-09-08 17:51:04.777'
// MySQL container is in UTC and the test time is during summer time period
ZonedDateTime expectedTimestamp = ZonedDateTime.ofInstant(
LocalDateTime.parse("2014-09-08T17:51:04.780").atZone(ZoneId.of("UTC")).toInstant(),
LocalDateTime.parse("2014-09-08T17:51:04.780").atZone(ZoneId.of("US/Samoa")).toInstant(),
ZoneId.systemDefault());
ZoneId defaultZoneId = ZoneId.systemDefault();
ZonedDateTime c4DateTime = ZonedDateTime.parse(c4, ZonedTimestamp.FORMATTER).withZoneSameInstant(defaultZoneId);

View File

@ -11,6 +11,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -35,6 +36,8 @@
*
*/
public class UniqueDatabase {
private static final ZoneId TIMEZONE = ZoneId.of("US/Samoa");
private static final String DEFAULT_DATABASE = "mysql";
private static final String[] CREATE_DATABASE_DDL = new String[] {
"CREATE DATABASE $DBNAME$;",
@ -194,4 +197,11 @@ public Configuration.Builder defaultConfig() {
public String getIdentifier() {
return identifier;
}
/**
* @return timezone in which the database is located
*/
public ZoneId timezone() {
return TIMEZONE;
}
}

View File

@ -23,9 +23,10 @@
<postgres.db.name>postgres</postgres.db.name>
<postgres.encoding>UTF8</postgres.encoding>
<postgres.system.lang>en_US.utf8</postgres.system.lang>
<docker.filter>debezium/postgres:${version.postgres.server}</docker.filter>
<postgres.image>debezium/postgres:${version.postgres.server}</postgres.image>
<docker.skip>false</docker.skip>
<docker.showLogs>true</docker.showLogs>
<docker.initimage>ln -fs /usr/share/zoneinfo/US/Samoa /etc/localtime &amp;&amp; echo timezone=US/Samoa >> /usr/share/postgresql/postgresql.conf.sample</docker.initimage>
<protobuf.output.directory>${project.basedir}/generated-sources</protobuf.output.directory>
</properties>
@ -130,7 +131,7 @@
<images>
<image>
<!-- A Docker image using the Postgres Server with the DBZ decoderbufs plugin -->
<name>${docker.filter}</name>
<name>debezium/postgres-server-test-database</name>
<run>
<namingStrategy>none</namingStrategy>
<env>
@ -153,6 +154,10 @@
<log>(?s)PostgreSQL init process complete.*database system is ready to accept connections</log>
</wait>
</run>
<build>
<from>${postgres.image}</from>
<runCmds>${docker.initimage}</runCmds>
</build>
</image>
</images>
</configuration>
@ -165,6 +170,7 @@
<id>start</id>
<phase>pre-integration-test</phase>
<goals>
<goal>build</goal>
<goal>start</goal>
</goals>
</execution>

View File

@ -7,6 +7,7 @@
package io.debezium.connector.postgresql;
import java.math.BigDecimal;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -653,6 +654,15 @@ public String getPostgresPluginName() {
+ "'string' uses string to represent values (including the special ones like NaN or Infinity); "
+ "'double' represents values using Java's 'double', which may not offer the precision but will be far easier to use in consumers.");
public static final Field SERVER_ZONE_OFFSET = Field.create("server.zone.offset")
.withDisplayName("Timezone offset of database server")
.withType(Type.STRING)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withValidation(Field::isZoneOffset)
.withDescription("Specifies timezone offset of the timezone where database server is located. "
+ "This value is used to create timestamps without timezones to the value as defined by server timezone.");
public static final Field STATUS_UPDATE_INTERVAL_MS = Field.create("status.update.interval.ms")
.withDisplayName("Status update interval (ms)")
.withType(Type.INT) // Postgres doesn't accept long for this value
@ -702,7 +712,8 @@ public String getPostgresPluginName() {
SSL_MODE, SSL_CLIENT_CERT, SSL_CLIENT_KEY_PASSWORD,
SSL_ROOT_CERT, SSL_CLIENT_KEY, SNAPSHOT_LOCK_TIMEOUT_MS, ROWS_FETCH_SIZE, SSL_SOCKET_FACTORY,
STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE, INCLUDE_UNKNOWN_DATATYPES,
SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, CommonConnectorConfig.TOMBSTONES_ON_DELETE);
SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, CommonConnectorConfig.TOMBSTONES_ON_DELETE,
SERVER_ZONE_OFFSET);
private final Configuration config;
private final String serverName;
@ -754,6 +765,10 @@ protected Integer statusUpdateIntervalMillis() {
return config.getInteger(STATUS_UPDATE_INTERVAL_MS, null);
}
protected ZoneOffset serverZoneOffset() {
return ZoneOffset.of(config.getString(SERVER_ZONE_OFFSET, "Z"));
}
protected TemporalPrecisionMode temporalPrecisionMode() {
return temporalPrecisionMode;
}
@ -840,7 +855,7 @@ protected static ConfigDef configDef() {
DROP_SLOT_ON_STOP, SSL_SOCKET_FACTORY, STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE);
Field.group(config, "Events", SCHEMA_WHITELIST, SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,
COLUMN_BLACKLIST, INCLUDE_UNKNOWN_DATATYPES, SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE,
CommonConnectorConfig.TOMBSTONES_ON_DELETE);
CommonConnectorConfig.TOMBSTONES_ON_DELETE, SERVER_ZONE_OFFSET);
Field.group(config, "Connector", TOPIC_SELECTION_STRATEGY, CommonConnectorConfig.POLL_INTERVAL_MS, CommonConnectorConfig.MAX_BATCH_SIZE, CommonConnectorConfig.MAX_QUEUE_SIZE,
SNAPSHOT_MODE, SNAPSHOT_LOCK_TIMEOUT_MS, TIME_PRECISION_MODE, DECIMAL_HANDLING_MODE, ROWS_FETCH_SIZE);
return config;

View File

@ -65,7 +65,7 @@ protected PostgresSchema(PostgresConnectorConfig config, TypeRegistry typeRegist
this.topicSelector = topicSelector;
this.valueConverter = new PostgresValueConverter(config.decimalHandlingMode(), config.temporalPrecisionMode(),
ZoneOffset.UTC, null, config.includeUnknownDatatypes(), typeRegistry);
ZoneOffset.UTC, null, config.includeUnknownDatatypes(), typeRegistry, config.serverZoneOffset());
this.schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
this.schemaBuilder = new TableSchemaBuilder(valueConverter, this.schemaNameAdjuster, SourceInfo.SCHEMA);

View File

@ -60,6 +60,7 @@ protected ReplicationConnection createReplicationConnection() throws SQLExceptio
.dropSlotOnClose(config.dropSlotOnStop())
.statusUpdateIntervalMillis(config.statusUpdateIntervalMillis())
.withTypeRegistry(schema.getTypeRegistry())
.withServerTimezone(config.serverZoneOffset())
.build();
}

View File

@ -10,11 +10,13 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.Date;
@ -92,12 +94,18 @@ public class PostgresValueConverter extends JdbcValueConverters {
*/
private final boolean includeUnknownDatatypes;
/**
* Offset of timezone used by the database server
*/
private final ZoneOffset serverZoneOffset;
private final TypeRegistry typeRegistry;
protected PostgresValueConverter(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset, BigIntUnsignedMode bigIntUnsignedMode, boolean includeUnknownDatatypes, TypeRegistry typeRegistry) {
protected PostgresValueConverter(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, ZoneOffset defaultOffset, BigIntUnsignedMode bigIntUnsignedMode, boolean includeUnknownDatatypes, TypeRegistry typeRegistry, ZoneOffset serverZoneOffset) {
super(decimalMode, temporalPrecisionMode, defaultOffset, null, bigIntUnsignedMode);
this.includeUnknownDatatypes = includeUnknownDatatypes;
this.typeRegistry = typeRegistry;
this.serverZoneOffset = serverZoneOffset;
}
@Override
@ -217,6 +225,8 @@ public ValueConverter converter(Column column, Field fieldDefn) {
return convertBits(column, fieldDefn);
case PgOid.INTERVAL:
return data -> convertInterval(column, fieldDefn, data);
case PgOid.TIMESTAMP:
return ((ValueConverter)(data-> convertTimestampToUTC(column, fieldDefn, data))).and(super.converter(column, fieldDefn));
case PgOid.TIMESTAMPTZ:
return data -> convertTimestampWithZone(column, fieldDefn, data);
case PgOid.TIMETZ:
@ -622,6 +632,20 @@ public static Optional<SpecialValueDecimal> toSpecialValue(String value) {
return Optional.empty();
}
protected Object convertTimestampToUTC(Column column, Field fieldDefn, Object data) {
if (data == null) {
return null;
}
if (!(data instanceof Timestamp)) {
return data;
}
final Timestamp timestamp = (Timestamp) data;
final LocalDateTime serverLocalTime = timestamp.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
final LocalDateTime utcTime = LocalDateTime
.ofInstant(serverLocalTime.atOffset(serverZoneOffset).toInstant(), ZoneOffset.UTC);
return utcTime;
}
@Override
protected int getTimePrecision(Column column) {
return column.scale();

View File

@ -8,6 +8,7 @@
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.time.ZoneOffset;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
@ -29,8 +30,9 @@ public interface MessageDecoder {
* @param buffer - binary representation of replication message
* @param processor - message processing on arrival
* @param typeRegistry - registry with known types
* @param serverTimezone - a timezone in which the database server is located
*/
void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException;
void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry, ZoneOffset serverTimezone) throws SQLException, InterruptedException;
/**
* Allows MessageDecoder to configure options with which the replication stream is started.

View File

@ -10,6 +10,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
@ -45,6 +46,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
private final Integer statusUpdateIntervalMillis;
private final MessageDecoder messageDecoder;
private final TypeRegistry typeRegistry;
private final ZoneOffset serverTimezone;
private long defaultStartingPos;
@ -64,8 +66,9 @@ private PostgresReplicationConnection(Configuration config,
PostgresConnectorConfig.LogicalDecoder plugin,
boolean dropSlotOnClose,
Integer statusUpdateIntervalMillis,
TypeRegistry typeRegistry) {
super(config, PostgresConnection.FACTORY, null, PostgresReplicationConnection::defaultSettings);
TypeRegistry typeRegistry,
ZoneOffset serverTimezone) {
super(config, PostgresConnection.FACTORY, null ,PostgresReplicationConnection::defaultSettings);
this.originalConfig = config;
this.slotName = slotName;
@ -74,6 +77,7 @@ private PostgresReplicationConnection(Configuration config,
this.statusUpdateIntervalMillis = statusUpdateIntervalMillis;
this.messageDecoder = plugin.messageDecoder();
this.typeRegistry = typeRegistry;
this.serverTimezone = serverTimezone;
try {
initReplicationSlot();
@ -215,7 +219,7 @@ public void readPending(ReplicationMessageProcessor processor) throws SQLExcepti
private void deserializeMessages(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
lastReceivedLSN = stream.getLastReceiveLSN();
messageDecoder.processMessage(buffer, processor, typeRegistry);
messageDecoder.processMessage(buffer, processor, typeRegistry, serverTimezone);
}
@Override
@ -324,6 +328,7 @@ protected static class ReplicationConnectionBuilder implements Builder {
private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE;
private Integer statusUpdateIntervalMillis;
private TypeRegistry typeRegistry;
private ZoneOffset serverTimezone;
protected ReplicationConnectionBuilder(Configuration config) {
assert config != null;
@ -359,7 +364,7 @@ public ReplicationConnectionBuilder statusUpdateIntervalMillis(final Integer sta
@Override
public ReplicationConnection build() {
assert plugin != null : "Decoding plugin name is not set";
return new PostgresReplicationConnection(config, slotName, plugin, dropSlotOnClose, statusUpdateIntervalMillis, typeRegistry);
return new PostgresReplicationConnection(config, slotName, plugin, dropSlotOnClose, statusUpdateIntervalMillis, typeRegistry, serverTimezone);
}
@Override
@ -367,5 +372,11 @@ public Builder withTypeRegistry(TypeRegistry typeRegistry) {
this.typeRegistry = typeRegistry;
return this;
}
@Override
public Builder withServerTimezone(ZoneOffset serverTimezone) {
this.serverTimezone = serverTimezone;
return this;
}
}
}

View File

@ -7,6 +7,7 @@
package io.debezium.connector.postgresql.connection;
import java.sql.SQLException;
import java.time.ZoneOffset;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
@ -129,10 +130,13 @@ interface Builder {
Builder withTypeRegistry(TypeRegistry typeRegistry);
Builder withServerTimezone(ZoneOffset serverTimezone);
/**
* Creates a new {@link ReplicationConnection} instance
* @return a connection, never null
*/
ReplicationConnection build();
}
}

View File

@ -7,6 +7,7 @@
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.time.ZoneOffset;
import java.util.Arrays;
import org.apache.kafka.connect.errors.ConnectException;
@ -30,7 +31,7 @@
public class PgProtoMessageDecoder implements MessageDecoder {
@Override
public void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
public void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry, ZoneOffset serverTimezone) throws SQLException, InterruptedException {
try {
if (!buffer.hasArray()) {
throw new IllegalStateException(
@ -45,7 +46,7 @@ public void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor
message.getNewTupleCount(),
message.getNewTypeinfoCount()));
}
processor.process(new PgProtoReplicationMessage(message, typeRegistry));
processor.process(new PgProtoReplicationMessage(message, typeRegistry, serverTimezone));
} catch (InvalidProtocolBufferException e) {
throw new ConnectException(e);
}

View File

@ -9,6 +9,9 @@
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
@ -31,6 +34,7 @@
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.connector.postgresql.proto.PgProto;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.time.Conversions;
import io.debezium.util.Strings;
/**
@ -44,10 +48,12 @@ class PgProtoReplicationMessage implements ReplicationMessage {
private final PgProto.RowMessage rawMessage;
private final TypeRegistry typeRegistry;
private final ZoneOffset serverTimezone;
public PgProtoReplicationMessage(PgProto.RowMessage rawMessage, TypeRegistry typeRegistry) {
public PgProtoReplicationMessage(PgProto.RowMessage rawMessage, TypeRegistry typeRegistry, ZoneOffset serverTimezone) {
this.rawMessage = rawMessage;
this.typeRegistry = typeRegistry;
this.serverTimezone = serverTimezone;
}
@Override
@ -169,6 +175,14 @@ else if (datumMessage.hasDatumString()) {
case PgOid.DATE:
return datumMessage.hasDatumInt32() ? (long) datumMessage.getDatumInt32() : null;
case PgOid.TIMESTAMP:
if (!datumMessage.hasDatumInt64()) {
return null;
}
// these types are sent by the plugin as LONG - microseconds since Unix Epoch
// but we'll convert them to nanos which is the smallest unit
final LocalDateTime serverLocal = Conversions.toLocalDateTimeUTC(datumMessage.getDatumInt64());
final Instant utc = serverLocal.atOffset(serverTimezone).toInstant();
return Conversions.toEpochNanos(utc);
case PgOid.TIMESTAMPTZ:
case PgOid.TIME:
if (!datumMessage.hasDatumInt64()) {

View File

@ -8,6 +8,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Iterator;
@ -42,7 +43,7 @@ public class NonStreamingWal2JsonMessageDecoder implements MessageDecoder {
private boolean containsMetadata = false;
@Override
public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry, ZoneOffset serverTimezone) throws SQLException, InterruptedException {
try {
if (!buffer.hasArray()) {
throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
@ -59,7 +60,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
Iterator<Entry> it = changes.iterator();
while (it.hasNext()) {
Value value = it.next().getValue();
processor.process(new Wal2JsonReplicationMessage(txId, commitTime, value.asDocument(), containsMetadata, !it.hasNext(), typeRegistry));
processor.process(new Wal2JsonReplicationMessage(txId, commitTime, value.asDocument(), containsMetadata, !it.hasNext(), typeRegistry, serverTimezone));
}
} catch (final IOException e) {
throw new ConnectException(e);

View File

@ -8,6 +8,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.time.ZoneOffset;
import java.util.Arrays;
import org.apache.kafka.connect.errors.ConnectException;
@ -106,7 +107,7 @@ public class StreamingWal2JsonMessageDecoder implements MessageDecoder {
private long commitTime;
@Override
public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor, TypeRegistry typeRegistry, ZoneOffset serverTimezone) throws SQLException, InterruptedException {
try {
if (!buffer.hasArray()) {
throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
@ -145,14 +146,14 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
}
else if (firstChar == COMMA) {
// following changes, they have an extra comma at the start of message
doProcessMessage(processor, typeRegistry, currentChunk, false);
doProcessMessage(processor, typeRegistry, currentChunk, serverTimezone, false);
replaceFirstNonWhiteChar(content, SPACE);
currentChunk = content;
}
else if (firstChar == RIGHT_BRACKET) {
// No more changes
if (currentChunk != null) {
doProcessMessage(processor, typeRegistry, currentChunk, true);
doProcessMessage(processor, typeRegistry, currentChunk, serverTimezone, true);
}
messageInProgress = false;
}
@ -197,12 +198,12 @@ private boolean isWhitespace(byte c) {
return (c >= TAB && c <= CR) || c == SPACE;
}
private void doProcessMessage(ReplicationMessageProcessor processor, TypeRegistry typeRegistry, byte[] content, boolean lastMessage)
private void doProcessMessage(ReplicationMessageProcessor processor, TypeRegistry typeRegistry, byte[] content, ZoneOffset serverTimezone, boolean lastMessage)
throws IOException, SQLException, InterruptedException {
final Document change = DocumentReader.floatNumbersAsTextReader().read(content);
LOGGER.trace("Change arrived for decoding {}", change);
processor.process(new Wal2JsonReplicationMessage(txId, commitTime, change, containsMetadata, lastMessage, typeRegistry));
processor.process(new Wal2JsonReplicationMessage(txId, commitTime, change, containsMetadata, lastMessage, typeRegistry, serverTimezone));
}
@Override

View File

@ -8,6 +8,9 @@
import java.math.BigDecimal;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -38,6 +41,7 @@
import io.debezium.document.Array;
import io.debezium.document.Document;
import io.debezium.document.Value;
import io.debezium.time.Conversions;
import io.debezium.util.Strings;
/**
@ -55,14 +59,16 @@ class Wal2JsonReplicationMessage implements ReplicationMessage {
private final boolean hasMetadata;
private final boolean lastEventForLsn;
private final TypeRegistry typeRegistry;
private final ZoneOffset serverTimezone;
public Wal2JsonReplicationMessage(long txId, long commitTime, Document rawMessage, boolean hasMetadata, boolean lastEventForLsn, TypeRegistry typeRegistry) {
public Wal2JsonReplicationMessage(long txId, long commitTime, Document rawMessage, boolean hasMetadata, boolean lastEventForLsn, TypeRegistry typeRegistry, ZoneOffset serverTimezone) {
this.txId = txId;
this.commitTime = commitTime;
this.rawMessage = rawMessage;
this.hasMetadata = hasMetadata;
this.lastEventForLsn = lastEventForLsn;
this.typeRegistry = typeRegistry;
this.serverTimezone = serverTimezone;
}
@Override
@ -253,7 +259,9 @@ else if (rawValue.isBigInteger()) {
case "timestamp":
case "timestamp without time zone":
return DateTimeFormat.get().timestamp(rawValue.asString());
final LocalDateTime serverLocal = Conversions.fromNanosToLocalDateTimeUTC(DateTimeFormat.get().timestamp(rawValue.asString()));
final Instant utc = serverLocal.atOffset(serverTimezone).toInstant();
return Conversions.toEpochNanos(utc);
case "time":
case "time without time zone":

View File

@ -296,9 +296,9 @@ protected List<SchemaAndValueField> schemaAndValuesForBinTypes() {
}
protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypes() {
long expectedTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("2016-11-04T13:51:30.123456"), null);
long expectedTsMs = Timestamp.toEpochMillis(LocalDateTime.parse("2016-11-04T13:51:30.123456"), null);
long expectedNegTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("1936-10-25T22:10:12.608"), null);
long expectedTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("2016-11-05T00:51:30.123456"), null); // Need to compare it with value in UTC
long expectedTsMs = Timestamp.toEpochMillis(LocalDateTime.parse("2016-11-05T00:51:30.123456"), null); // Need to compare it with value in UTC
long expectedNegTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("1936-10-26T09:10:12.608"), null); // Need to compare it with value in UTC
String expectedTz = "2016-11-04T11:51:30Z"; //timestamp is stored with TZ, should be read back with UTC
int expectedDate = Date.toEpochDay(LocalDate.parse("2016-11-04"), null);
long expectedTi = LocalTime.parse("13:51:30").toNanoOfDay() / 1_000;
@ -321,9 +321,9 @@ protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypes() {
}
protected List<SchemaAndValueField> schemaAndValuesForDateTimeTypesAdaptiveTimeMicroseconds() {
long expectedTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("2016-11-04T13:51:30.123456"), null);
long expectedTsMs = Timestamp.toEpochMillis(LocalDateTime.parse("2016-11-04T13:51:30.123456"), null);
long expectedNegTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("1936-10-25T22:10:12.608"), null);
long expectedTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("2016-11-05T00:51:30.123456"), null); // Need to compare it with value in UTC
long expectedTsMs = Timestamp.toEpochMillis(LocalDateTime.parse("2016-11-05T00:51:30.123456"), null); // Need to compare it with value in UTC
long expectedNegTs = MicroTimestamp.toEpochMicros(LocalDateTime.parse("1936-10-26T09:10:12.608"), null); // Need to compare it with value in UTC
String expectedTz = "2016-11-04T11:51:30Z"; //timestamp is stored with TZ, should be read back with UTC
int expectedDate = Date.toEpochDay(LocalDate.parse("2016-11-04"), null);
long expectedTi = LocalTime.parse("13:51:30").toNanoOfDay() / 1_000;

View File

@ -45,7 +45,9 @@ public void before() throws Exception {
TestHelper.executeDDL("postgres_create_tables.ddl");
TestHelper.executeDDL("postgis_create_tables.ddl");
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().build());
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SERVER_ZONE_OFFSET, TestHelper.databaseTimeZone())
.build());
TopicSelector selector = TopicSelector.create(config);
context = new PostgresTaskContext(
config,
@ -180,6 +182,7 @@ public void shouldGenerateSnapshotsForDefaultDatatypesAdaptiveMicroseconds() thr
PostgresConnectorConfig config = new PostgresConnectorConfig(
TestHelper.defaultConfig()
.with(PostgresConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS)
.with(PostgresConnectorConfig.SERVER_ZONE_OFFSET, TestHelper.databaseTimeZone())
.build());
TopicSelector selector = TopicSelector.create(config);

View File

@ -63,7 +63,10 @@ public void before() throws Exception {
"CREATE TABLE table_with_interval (id SERIAL PRIMARY KEY, title VARCHAR(512) NOT NULL, time_limit INTERVAL DEFAULT '60 days'::INTERVAL NOT NULL);" +
"INSERT INTO test_table(text) VALUES ('insert');";
TestHelper.execute(statements);
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).build());
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
.with(PostgresConnectorConfig.SERVER_ZONE_OFFSET, TestHelper.databaseTimeZone())
.build());
setupRecordsProducer(config);
}
@ -644,7 +647,6 @@ private void setupRecordsProducer(PostgresConnectorConfig config) {
new PostgresSchema(config, TestHelper.getTypeRegistry(), selector),
selector
);
recordsProducer = new RecordsStreamProducer(context, new SourceInfo(config.serverName()));
}

View File

@ -13,6 +13,7 @@
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.SQLException;
import java.time.ZoneOffset;
import java.util.Set;
import java.util.stream.Collectors;
@ -52,6 +53,7 @@ public static ReplicationConnection createForReplication(String slotName, boolea
.withSlot(slotName)
.withTypeRegistry(getTypeRegistry())
.dropSlotOnClose(dropOnClose)
.withServerTimezone(databaseTimeZone())
.build();
}
@ -176,4 +178,8 @@ protected static boolean shouldSSLConnectionFail() {
protected static int waitTimeForRecords() {
return Integer.parseInt(System.getProperty(TEST_PROPERTY_PREFIX + "records.waittime", "2"));
}
protected static ZoneOffset databaseTimeZone() {
return ZoneOffset.of(System.getProperty(TEST_PROPERTY_PREFIX + "database.timeoffset", "-11:00"));
}
}

View File

@ -5,6 +5,8 @@
*/
package io.debezium.config;
import java.time.DateTimeException;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -1189,4 +1191,15 @@ public static int isDouble(Configuration config, Field field, ValidationOutput p
return 0;
}
public static int isZoneOffset(Configuration config, Field field, ValidationOutput problems) {
String value = config.getString(field);
if (value == null) return 0;
try {
ZoneOffset.of(value);
} catch (DateTimeException e) {
problems.accept(field, value, "A zone offset string representation is expected");
return 1;
}
return 0;
}
}

View File

@ -6,9 +6,11 @@
package io.debezium.time;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;
/**
@ -16,9 +18,10 @@
*
* @author Randall Hauch
*/
final class Conversions {
public final class Conversions {
static final long MILLISECONDS_PER_SECOND = TimeUnit.SECONDS.toMillis(1);
static final long MICROSECONDS_PER_SECOND = TimeUnit.SECONDS.toMicros(1);
static final long MICROSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toMicros(1);
static final long NANOSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
static final long NANOSECONDS_PER_MICROSECOND = TimeUnit.MICROSECONDS.toNanos(1);
@ -184,4 +187,47 @@ static long toEpochNanos(LocalDate date) {
return epochDay * Conversions.NANOSECONDS_PER_DAY;
}
/**
* Get the UTC-based {@link LocalDateTime} for given microseconds epoch
*
* @param microseconds - timestamp in microseconds
* @return timestamp in UTC timezone
*/
public static LocalDateTime toLocalDateTimeUTC(long microseconds) {
long seconds = microseconds / MICROSECONDS_PER_SECOND;
// typecasting is safe as microseconds and nanoseconds in second fit in int range
int microsecondsOfSecond = (int)(microseconds % MICROSECONDS_PER_SECOND);
if (microsecondsOfSecond < 0) {
seconds--;
microsecondsOfSecond = (int)Conversions.MICROSECONDS_PER_SECOND + microsecondsOfSecond;
}
return LocalDateTime.ofEpochSecond(seconds, (int)(microsecondsOfSecond * NANOSECONDS_PER_MICROSECOND), ZoneOffset.UTC);
}
/**
* Get the UTC-based {@link LocalDateTime} for given nanoseconds epoch
*
* @param nanoseconds - timestamp in nanoseconds
* @return timestamp in UTC timezone
*/
public static LocalDateTime fromNanosToLocalDateTimeUTC(long nanoseconds) {
long seconds = nanoseconds / NANOSECONDS_PER_SECOND;
// typecasting is safe as microseconds and nanoseconds in second fit in int range
int nanosecondsOfSecond = (int)(nanoseconds % NANOSECONDS_PER_SECOND);
if (nanosecondsOfSecond < 0) {
seconds--;
nanosecondsOfSecond = (int)Conversions.NANOSECONDS_PER_SECOND + nanosecondsOfSecond;
}
return LocalDateTime.ofEpochSecond(seconds, nanosecondsOfSecond, ZoneOffset.UTC);
}
/**
* Get the number of nanoseconds past epoch of the given {@link Instant}.
*
* @param instant the Java instant value
* @return the epoch nanoseconds
*/
public static long toEpochNanos(Instant instant) {
return TimeUnit.NANOSECONDS.convert(instant.getEpochSecond() * MICROSECONDS_PER_SECOND + instant.getNano() / NANOSECONDS_PER_MICROSECOND, TimeUnit.MICROSECONDS);
}
}