DBZ-85 Added test case and made correction to temporal values

Added an integration test case to diagnose the loss of the fractional seconds from MySQL temporal values. The problem appears to be a bug in the MySQL Binary Log Connector library that we used, and this bug was reported as https://github.com/shyiko/mysql-binlog-connector-java/issues/103. That was fixed in version 0.3.2 of the library, which Stanley was kind enough to release for us.

During testing, though, several issues were discovered in how temporal values are handled and converted from the MySQL events, through the MySQL Binary Log client library, and through the Debezium MySQL connector to conform with Kafka Connect's various temporal logical schema types. Most of the issues involved converting most of the temporal values from local time zone (which is how they are created by the MySQL Binary Log client) into UTC (which is how Kafka Connect expects them). Really, java.util.Date doesn't have time zone information and instead tracks the number of milliseconds past epoch, but the conversion of normal timestamp information to the milliseconds past epoch in UTC depends on the time zone in which that conversion happens.
This commit is contained in:
Randall Hauch 2016-07-19 14:04:42 -05:00
parent 4a84a1d8d9
commit 30777e3345
8 changed files with 697 additions and 52 deletions

View File

@ -11,14 +11,20 @@
import java.util.Set;
import java.util.concurrent.Callable;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.TimeZoneAdapter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
@ -70,6 +76,18 @@ public class MySqlSchema {
/**
* Create a schema component given the supplied {@link MySqlConnectorConfig MySQL connector configuration}.
* <p>
* This component sets up a {@link TimeZoneAdapter} that is specific to how the MySQL Binary Log client library
* works. The {@link AbstractRowsEventDataDeserializer} class has various methods to instantiate the
* {@link java.util.Date}, {@link java.sql.Date}, {@link java.sql.Time}, and {@link java.sql.Timestamp} temporal values,
* where the values for {@link java.util.Date}, {@link java.sql.Date}, and {@link java.sql.Time} are all in terms of
* the <em>local time zone</em> (since it uses {@link java.util.Calendar#getInstance()}), but where the
* {@link java.sql.Timestamp} values are created differently using the milliseconds past epoch and therefore in terms of
* the <em>UTC time zone</em>.
* <p>
* And, because Kafka Connect {@link Time}, {@link Date}, and {@link Timestamp} logical
* schema types all expect the {@link java.util.Date} to be in terms of the <em>UTC time zone</em>, the
* {@link TimeZoneAdapter} also needs to produce {@link java.util.Date} values that will be correct in UTC.
*
* @param config the connector configuration, which is presumed to be valid
* @param serverName the name of the server
@ -80,10 +98,18 @@ public MySqlSchema(Configuration config, String serverName) {
this.tables = new Tables();
this.ddlChanges = new DdlChanges(this.ddlParser.terminator());
this.ddlParser.addListener(ddlChanges);
this.schemaBuilder = new TableSchemaBuilder(schemaNameValidator::validate);
if ( serverName != null ) serverName = serverName.trim();
// Specific to how the MySQL Binary Log client library creates temporal values ...
TimeZoneAdapter tzAdapter = TimeZoneAdapter.create()
.withLocalZoneForUtilDate()
.withLocalZoneForSqlDate()
.withLocalZoneForSqlTime()
.withUtcZoneForSqlTimestamp()
.withUtcTargetZone();
this.schemaBuilder = new TableSchemaBuilder(tzAdapter, schemaNameValidator::validate);
if (serverName != null) serverName = serverName.trim();
this.serverName = serverName;
if ( this.serverName == null || serverName.isEmpty() ) {
if (this.serverName == null || serverName.isEmpty()) {
this.schemaPrefix = "";
} else {
this.schemaPrefix = serverName.endsWith(".") ? serverName : serverName + ".";
@ -97,7 +123,7 @@ public MySqlSchema(Configuration config, String serverName) {
}
// Do not remove the prefix from the subset of config properties ...
Configuration dbHistoryConfig = config.subset(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING, false);
this.dbHistory.configure(dbHistoryConfig,HISTORY_COMPARATOR); // validates
this.dbHistory.configure(dbHistoryConfig, HISTORY_COMPARATOR); // validates
}
/**
@ -212,8 +238,8 @@ protected void changeTablesAndRecordInHistory(SourceInfo source, Callable<Void>
changeFunction.call();
} catch (Exception e) {
this.tables = copy;
if ( e instanceof SQLException) throw (SQLException)e;
this.logger.error("Unexpected error whle changing model of MySQL schemas: {}",e.getMessage(),e);
if (e instanceof SQLException) throw (SQLException) e;
this.logger.error("Unexpected error whle changing model of MySQL schemas: {}", e.getMessage(), e);
}
// At least one table has changed or was removed, so first refresh the Kafka Connect schemas ...
@ -309,12 +335,12 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem
// to the same _affected_ database...
ddlChanges.groupStatementStringsByDatabase((dbName, ddl) -> {
if (filters.databaseFilter().test(dbName)) {
if ( dbName == null ) dbName = "";
if (dbName == null) dbName = "";
statementConsumer.consume(dbName, ddlStatements);
}
});
} else if (filters.databaseFilter().test(databaseName)) {
if ( databaseName == null ) databaseName = "";
if (databaseName == null) databaseName = "";
statementConsumer.consume(databaseName, ddlStatements);
}
}

View File

@ -220,3 +220,12 @@ CREATE TABLE dbz84_integer_types_table (
);
INSERT INTO dbz84_integer_types_table
VALUES(127,-128,128,255, default,201,202,203, default,301,302,303, default,401,402,403, default,501,502,503);
-- DBZ-85 handle fractional part of seconds
CREATE TABLE dbz_85_fractest (
c1 DATE,
c2 TIME(2),
c3 DATETIME(2),
c4 TIMESTAMP(2)
);
INSERT INTO dbz_85_fractest VALUES ('2014-09-08', '17:51:04.777', '2014-09-08 17:51:04.777', '2014-09-08 17:51:04.777');

View File

@ -7,7 +7,13 @@
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.Month;
import java.time.ZoneId;
import org.apache.kafka.connect.data.Struct;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -16,6 +22,7 @@
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.data.Envelope;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.Testing;
@ -69,22 +76,75 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
// Testing.Debug.enable();
SourceRecords records = consumeRecordsByTopic(3 + 2); // 3 schema change record, 2 inserts
Testing.Debug.enable();
SourceRecords records = consumeRecordsByTopic(4 + 3); // 4 schema change record, 3 inserts
stopConnector();
assertThat(records).isNotNull();
assertThat(records.recordsForTopic("regression").size()).isEqualTo(3);
assertThat(records.recordsForTopic("regression").size()).isEqualTo(4);
assertThat(records.recordsForTopic("regression.regression_test.t1464075356413_testtable6").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz84_integer_types_table").size()).isEqualTo(1);
assertThat(records.topics().size()).isEqualTo(3);
assertThat(records.recordsForTopic("regression.regression_test.dbz_85_fractest").size()).isEqualTo(1);
assertThat(records.topics().size()).isEqualTo(4);
assertThat(records.databaseNames().size()).isEqualTo(1);
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(3);
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(4);
assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
records.ddlRecordsForDatabase("regression_test").forEach(this::print);
// Check that all records are valid, can be serialized and deserialized ...
records.forEach(this::validate);
records.forEach(record->{
Struct value = (Struct)record.value();
if ( record.topic().endsWith("dbz_85_fractest")) {
// The microseconds of all three should be exactly 780
Struct after = value.getStruct(Envelope.FieldName.AFTER);
java.util.Date c1 = (java.util.Date)after.get("c1");
java.util.Date c2 = (java.util.Date)after.get("c2");
java.util.Date c3 = (java.util.Date)after.get("c3");
java.util.Date c4 = (java.util.Date)after.get("c4");
Testing.debug("c1 = " + c1.getTime());
Testing.debug("c2 = " + c2.getTime());
Testing.debug("c3 = " + c3.getTime());
Testing.debug("c4 = " + c4.getTime());
assertThat(c1.getTime() % 1000).isEqualTo(0); // date only, no time
assertThat(c2.getTime() % 1000).isEqualTo(780);
assertThat(c3.getTime() % 1000).isEqualTo(780);
assertThat(c4.getTime() % 1000).isEqualTo(780);
assertThat(c1.getTime()).isEqualTo(1410134400000L);
assertThat(c2.getTime()).isEqualTo(64264780L);
assertThat(c3.getTime()).isEqualTo(1410198664780L);
assertThat(c4.getTime()).isEqualTo(1410198664780L);
// None of these Dates have timezone information, so to convert to locals we have to use our local timezone ...
ZoneId utc = ZoneId.of("UTC");
LocalDate localC1 = c1.toInstant().atZone(utc).toLocalDate();
LocalTime localC2 = c2.toInstant().atZone(utc).toLocalTime();
LocalDateTime localC3 = c3.toInstant().atZone(utc).toLocalDateTime();
LocalDateTime localC4 = c4.toInstant().atZone(utc).toLocalDateTime();
// row is ('2014-09-08', '17:51:04.78', '2014-09-08 17:51:04.78', '2014-09-08 17:51:04.78')
final int expectedNanos = 780 * 1000 * 1000;
assertThat(localC1.getYear()).isEqualTo(2014);
assertThat(localC1.getMonth()).isEqualTo(Month.SEPTEMBER);
assertThat(localC1.getDayOfMonth()).isEqualTo(8);
assertThat(localC2.getHour()).isEqualTo(17);
assertThat(localC2.getMinute()).isEqualTo(51);
assertThat(localC2.getSecond()).isEqualTo(4);
assertThat(localC2.getNano()).isEqualTo(expectedNanos);
assertThat(localC3.getYear()).isEqualTo(2014);
assertThat(localC3.getMonth()).isEqualTo(Month.SEPTEMBER);
assertThat(localC3.getDayOfMonth()).isEqualTo(8);
assertThat(localC3.getHour()).isEqualTo(17);
assertThat(localC3.getMinute()).isEqualTo(51);
assertThat(localC3.getSecond()).isEqualTo(4);
assertThat(localC3.getNano()).isEqualTo(expectedNanos);
assertThat(localC4.getYear()).isEqualTo(2014);
assertThat(localC4.getMonth()).isEqualTo(Month.SEPTEMBER);
assertThat(localC4.getDayOfMonth()).isEqualTo(8);
assertThat(localC4.getHour()).isEqualTo(17);
assertThat(localC4.getMinute()).isEqualTo(51);
assertThat(localC4.getSecond()).isEqualTo(4);
assertThat(localC4.getNano()).isEqualTo(expectedNanos);
}
});
}
}

View File

@ -6,6 +6,9 @@
package io.debezium.data;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.Base64;
import java.util.List;
import java.util.Map;
@ -22,7 +25,7 @@
* @author Randall Hauch
*/
public class SchemaUtil {
private SchemaUtil() {
}
@ -228,8 +231,24 @@ public RecordWriter append(Object obj) {
}
appendAdditional("value", record.value());
sb.append('}');
} else if ( obj instanceof java.sql.Time ){
java.sql.Time time = (java.sql.Time)obj;
append(DateTimeFormatter.ISO_LOCAL_TIME.format(time.toLocalTime()));
} else if ( obj instanceof java.sql.Date ){
java.sql.Date date = (java.sql.Date)obj;
append(DateTimeFormatter.ISO_DATE.format(date.toLocalDate()));
} else if ( obj instanceof java.sql.Timestamp ){
java.sql.Timestamp ts = (java.sql.Timestamp)obj;
Instant instant = ts.toInstant();
append(DateTimeFormatter.ISO_INSTANT.format(instant));
} else if ( obj instanceof java.util.Date ){
java.util.Date date = (java.util.Date)obj;
append(DateTimeFormatter.ISO_INSTANT.format(date.toInstant()));
} else if ( obj instanceof TemporalAccessor ){
TemporalAccessor temporal = (TemporalAccessor)obj;
append(DateTimeFormatter.ISO_INSTANT.format(temporal));
} else {
sb.append(obj.toString());
append(obj.toString());
}
return this;
}

View File

@ -0,0 +1,330 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.jdbc;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import io.debezium.annotation.Immutable;
/**
* An adapter that can convert {@link java.util.Date}, {@link java.sql.Date}, {@link java.sql.Time}, and
* {@link java.sql.Timestamp} objects to {@link ZonedDateTime} instances, where the time zone in which the temporal objects
* were created by the database/driver can be adjusted.
*
* @author Randall Hauch
*/
@Immutable
public class TimeZoneAdapter {
private static final LocalDate EPOCH = LocalDate.ofEpochDay(0);
public static final ZoneId UTC = ZoneId.of("UTC");
/**
* Create a new adapter with UTC as the target zone and for a database that uses UTC for all temporal values.
*
* @return the new adapter
*/
public static TimeZoneAdapter create() {
return new TimeZoneAdapter(UTC, UTC, UTC, UTC, UTC);
}
/**
* Create a new adapter for a database that uses the specified zone for all temporal values.
*
* @param zoneId the zone in which all temporal values are created by the database; may not be null
* @return the new adapter
*/
public static TimeZoneAdapter originatingIn(ZoneId zoneId) {
return new TimeZoneAdapter(ZoneId.systemDefault(), zoneId, zoneId, zoneId, zoneId);
}
/**
* Create a new adapter for a database that creates all temporal values in UTC.
*
* @return the new adapter
*/
public static TimeZoneAdapter originatingInUtc() {
return originatingIn(UTC);
}
/**
* Create a new adapter for a database that creates all temporal values in the local system time zone,
* which is the same time zone used by {@link java.util.Calendar#getInstance()}.
*
* @return the new adapter
*/
public static TimeZoneAdapter originatingInLocal() {
return originatingIn(ZoneId.systemDefault()); // same as Calendar.getInstance().getTimeZone().toZoneId()
}
private final ZoneId targetZoneId;
private final ZoneId utilDateZoneId;
private final ZoneId sqlDateZoneId;
private final ZoneId sqlTimeZoneId;
private final ZoneId sqlTimestampZoneId;
/**
* Create an adapter for temporal values defined in terms of the given zone.
*
* @param targetZoneId the zone in which the output temporal values are defined; may not be null
* @param utilDateZoneId the zone in which {@link java.util.Date} values are defined; may not be null
* @param sqlDateZoneId the zone in which {@link java.sql.Date} values are defined; may not be null
* @param sqlTimeZoneId the zone in which {@link java.sql.Time} values are defined; may not be null
* @param sqlTimestampZoneId the zone in which {@link java.sql.Timestamp} values are defined; may not be null
*/
protected TimeZoneAdapter(ZoneId targetZoneId, ZoneId utilDateZoneId, ZoneId sqlDateZoneId, ZoneId sqlTimeZoneId,
ZoneId sqlTimestampZoneId) {
this.targetZoneId = targetZoneId;
this.utilDateZoneId = utilDateZoneId;
this.sqlDateZoneId = sqlDateZoneId;
this.sqlTimeZoneId = sqlTimeZoneId;
this.sqlTimestampZoneId = sqlTimestampZoneId;
}
protected ZoneId targetZoneId() {
return targetZoneId;
}
/**
* Convert the specified database {@link java.util.Date}, {@link java.sql.Date}, {@link java.sql.Time}, or
* {@link java.sql.Timestamp} objects to a date and time in the same time zone in which the database created the
* value. If only {@link java.sql.Time time} information is provided in the input value, the date information will
* be set to the first day of the epoch. If only {@link java.sql.Date date} information is provided in the input
* value, the time information will be at midnight on the specified day.
*
* @param dbDate the database-generated value; may not be null
* @return the date time in the same zone used by the database; never null
*/
public ZonedDateTime toZonedDateTime(java.util.Date dbDate) {
if (dbDate instanceof java.sql.Date) {
return toZonedDateTime((java.sql.Date) dbDate);
}
if (dbDate instanceof java.sql.Time) {
return toZonedDateTime((java.sql.Time) dbDate);
}
if (dbDate instanceof java.sql.Timestamp) {
return toZonedDateTime((java.sql.Timestamp) dbDate);
}
return dbDate.toInstant().atZone(UTC) // milliseconds is in terms of UTC
.withZoneSameInstant(sqlTimeZoneId) // correct value in the zone where it was created
.withZoneSameLocal(targetZoneId); // use same value, but in our desired timezone
}
/**
* Convert the specified database {@link java.sql.Date} to a date (at midnight) in the same time zone in which the
* database created the value.
*
* @param dbDate the database-generated value; may not be null
* @return the date (at midnight) in the same zone used by the database; never null
*/
public ZonedDateTime toZonedDateTime(java.sql.Date dbDate) {
long millis = dbDate.getTime();
Instant instant = Instant.ofEpochMilli(millis).truncatedTo(ChronoUnit.DAYS);
return instant.atZone(sqlDateZoneId).withZoneSameInstant(targetZoneId);
}
/**
* Convert the specified database {@link java.sql.Time} to a time (on the first epoch day) in the same time zone in which
* the database created the value.
*
* @param dbTime the database-generated value; may not be null
* @return the time (on the first epoch day) in the same zone used by the database; never null
*/
public ZonedDateTime toZonedDateTime(java.sql.Time dbTime) {
long millis = dbTime.getTime();
LocalTime local = LocalTime.ofNanoOfDay(millis * 1000 * 1000);
return ZonedDateTime.of(EPOCH, local, UTC) // milliseconds is in terms of UTC
.withZoneSameInstant(sqlTimeZoneId) // correct value in the zone where it was created
.withZoneSameLocal(targetZoneId); // use same value, but in our desired timezone
}
/**
* Convert the specified database {@link java.sql.Timestamp} to a timestamp in the same time zone in which
* the database created the value.
*
* @param dbTimestamp the database-generated value; may not be null
* @return the timestamp in the same zone used by the database; never null
*/
public ZonedDateTime toZonedDateTime(java.sql.Timestamp dbTimestamp) {
return dbTimestamp.toInstant().atZone(UTC) // milliseconds is in terms of UTC
.withZoneSameInstant(sqlTimestampZoneId) // correct value in the zone where it was created
.withZoneSameLocal(targetZoneId); // use same value, but in our desired timezone
}
/**
* Create a new adapter that produces temporal values in the specified time zone.
*
* @param zoneId the zone in which all temporal values are to be defined; may not be null
* @return the new adapter
*/
public TimeZoneAdapter withTargetZone(ZoneId zoneId) {
if (targetZoneId.equals(zoneId)) return this;
return new TimeZoneAdapter(zoneId, utilDateZoneId, sqlDateZoneId, sqlTimeZoneId, sqlTimestampZoneId);
}
/**
* Create a new adapter for a database that uses the specified zone for all temporal values and this adapter's target zone.
*
* @param zoneId the zone in which all temporal values are created by the database; may not be null
* @return the new adapter
*/
public TimeZoneAdapter withZoneForAll(ZoneId zoneId) {
return new TimeZoneAdapter(targetZoneId, zoneId, zoneId, zoneId, zoneId);
}
/**
* Create a new adapter for a database that uses the same time zones as this adapter except it uses the specified
* zone for {@link java.util.Date} temporal values.
*
* @param zoneId the zone in which all {@link java.util.Date} values are created by the database; may not be null
* @return the new adapter; never null
*/
public TimeZoneAdapter withZoneForUtilDate(ZoneId zoneId) {
if (utilDateZoneId.equals(zoneId)) return this;
return new TimeZoneAdapter(targetZoneId, zoneId, sqlDateZoneId, sqlTimeZoneId, sqlTimestampZoneId);
}
/**
* Create a new adapter for a database that uses the same time zones as this adapter except it uses the specified
* zone for {@link java.sql.Date} temporal values.
*
* @param zoneId the zone in which all {@link java.sql.Date} values are created by the database; may not be null
* @return the new adapter; never null
*/
public TimeZoneAdapter withZoneForSqlDate(ZoneId zoneId) {
if (sqlDateZoneId.equals(zoneId)) return this;
return new TimeZoneAdapter(targetZoneId, utilDateZoneId, zoneId, sqlTimeZoneId, sqlTimestampZoneId);
}
/**
* Create a new adapter for a database that uses the same time zones as this adapter except it uses the specified
* zone for {@link java.sql.Time} temporal values.
*
* @param zoneId the zone in which all {@link java.sql.Time} values are created by the database; may not be null
* @return the new adapter; never null
*/
public TimeZoneAdapter withZoneForSqlTime(ZoneId zoneId) {
if (sqlTimeZoneId.equals(zoneId)) return this;
return new TimeZoneAdapter(targetZoneId, utilDateZoneId, sqlDateZoneId, zoneId, sqlTimestampZoneId);
}
/**
* Create a new adapter for a database that uses the same time zones as this adapter except it uses the specified
* zone for {@link java.sql.Timestamp} temporal values.
*
* @param zoneId the zone in which all {@link java.sql.Timestamp} values are created by the database; may not be null
* @return the new adapter; never null
*/
public TimeZoneAdapter withZoneForSqlTimestamp(ZoneId zoneId) {
if (sqlTimestampZoneId.equals(zoneId)) return this;
return new TimeZoneAdapter(targetZoneId, utilDateZoneId, sqlDateZoneId, sqlTimeZoneId, zoneId);
}
/**
* Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
* zone for the target.
*
* @return the new adapter; never null
*/
public TimeZoneAdapter withUtcTargetZone() {
return withTargetZone(UTC);
}
/**
* Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
* zone for {@link java.util.Date} temporal values.
*
* @return the new adapter; never null
*/
public TimeZoneAdapter withUtcZoneForUtilDate() {
return withZoneForUtilDate(UTC);
}
/**
* Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
* zone for {@link java.sql.Date} temporal values.
*
* @return the new adapter; never null
*/
public TimeZoneAdapter withUtcZoneForSqlDate() {
return withZoneForSqlDate(UTC);
}
/**
* Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
* zone for {@link java.sql.Time} temporal values.
*
* @return the new adapter; never null
*/
public TimeZoneAdapter withUtcZoneForSqlTime() {
return withZoneForSqlTime(UTC);
}
/**
* Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
* zone for {@link java.sql.Timestamp} temporal values.
*
* @return the new adapter; never null
*/
public TimeZoneAdapter withUtcZoneForSqlTimestamp() {
return withZoneForSqlTimestamp(UTC);
}
/**
* Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
* zone for the target.
*
* @return the new adapter; never null
*/
public TimeZoneAdapter withLocalTargetZone() {
return withTargetZone(ZoneId.systemDefault());
}
/**
* Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
* zone for {@link java.util.Date} temporal values.
*
* @return the new adapter; never null
*/
public TimeZoneAdapter withLocalZoneForUtilDate() {
return withZoneForUtilDate(ZoneId.systemDefault());
}
/**
* Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
* zone for {@link java.sql.Date} temporal values.
*
* @return the new adapter; never null
*/
public TimeZoneAdapter withLocalZoneForSqlDate() {
return withZoneForSqlDate(ZoneId.systemDefault());
}
/**
* Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
* zone for {@link java.sql.Time} temporal values.
*
* @return the new adapter; never null
*/
public TimeZoneAdapter withLocalZoneForSqlTime() {
return withZoneForSqlTime(ZoneId.systemDefault());
}
/**
* Create a new adapter for a database that uses the same time zones as this adapter except it uses the UTC
* zone for {@link java.sql.Timestamp} temporal values.
*
* @return the new adapter; never null
*/
public TimeZoneAdapter withLocalZoneForSqlTimestamp() {
return withZoneForSqlTimestamp(ZoneId.systemDefault());
}
}

View File

@ -10,11 +10,11 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.time.Instant;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
@ -43,6 +43,7 @@
import io.debezium.data.IsoTimestamp;
import io.debezium.data.SchemaUtil;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.TimeZoneAdapter;
import io.debezium.relational.mapping.ColumnMapper;
import io.debezium.relational.mapping.ColumnMappers;
@ -77,14 +78,26 @@ public class TableSchemaBuilder {
private static final LocalDate EPOCH_DAY = LocalDate.ofEpochDay(0);
private final Function<String, String> schemaNameValidator;
private final TimeZoneAdapter timeZoneAdapter;
/**
* Create a new instance of the builder.
* Create a new instance of the builder that uses the {@link TimeZoneAdapter#create() default time zone adapter}.
*
* @param schemaNameValidator the validation function for schema names; may not be null
*/
public TableSchemaBuilder(Function<String, String> schemaNameValidator) {
this(TimeZoneAdapter.create(),schemaNameValidator);
}
/**
* Create a new instance of the builder.
*
* @param timeZoneAdapter the adapter for temporal objects created by the source database; may not be null
* @param schemaNameValidator the validation function for schema names; may not be null
*/
public TableSchemaBuilder(TimeZoneAdapter timeZoneAdapter, Function<String, String> schemaNameValidator) {
this.schemaNameValidator = schemaNameValidator;
this.timeZoneAdapter = timeZoneAdapter;
}
/**
@ -612,6 +625,7 @@ protected Object handleUnknownData(Column column, Field fieldDefn, Object data)
protected Object convertTimestampWithZone(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
OffsetDateTime dateTime = null;
LoggerFactory.getLogger(getClass()).info("TimestampWithZone: " + data + " , class=" + data.getClass());
if (data instanceof OffsetDateTime) {
// JDBC specification indicates that this will be the canonical object for this JDBC type.
dateTime = (OffsetDateTime) data;
@ -673,6 +687,7 @@ protected OffsetDateTime unexpectedTimestampWithZone(Object value, Field fieldDe
protected Object convertTimeWithZone(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
OffsetTime time = null;
LoggerFactory.getLogger(getClass()).info("TimeWithZone: " + data + " , class=" + data.getClass());
if (data instanceof OffsetTime) {
// JDBC specification indicates that this will be the canonical object for this JDBC type.
time = (OffsetTime) data;
@ -727,15 +742,10 @@ protected OffsetTime unexpectedTimeWithZone(Object value, Field fieldDefn) {
protected Object convertTimestamp(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
java.util.Date date = null;
if (data instanceof java.sql.Timestamp) {
// JDBC specification indicates that this will be the canonical object for this JDBC type.
date = (java.util.Date) data;
} else if (data instanceof java.sql.Date) {
// This should still work, even though it should have just date info
date = (java.util.Date) data;
} else if (data instanceof java.util.Date) {
// Possible that some implementations might use this.
date = (java.util.Date) data;
LoggerFactory.getLogger(getClass()).info("Timestamp: " + data + " , class=" + data.getClass());
if (data instanceof java.util.Date) {
ZonedDateTime zdt = timeZoneAdapter.toZonedDateTime((java.util.Date)data);
date = java.util.Date.from(zdt.toInstant());
} else if (data instanceof java.time.LocalDate) {
// If we get a local date (no TZ info), we need to just convert to a util.Date (no TZ info) ...
java.time.LocalDate local = (java.time.LocalDate) data;
@ -780,16 +790,10 @@ protected java.util.Date unexpectedTimestamp(Object value, Field fieldDefn) {
protected Object convertTime(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
java.util.Date date = null;
if (data instanceof java.sql.Time) {
// JDBC specification indicates that this will be the canonical object for this JDBC type.
// Contains only time info, with the date set to the epoch day ...
date = (java.sql.Date) data;
} else if (data instanceof java.util.Date) {
// Possible that some implementations might use this. We ignore any date info by converting to an
// instant and changing the date to the epoch date, and finally creating a new java.util.Date ...
date = (java.util.Date) data;
Instant instant = Instant.ofEpochMilli(date.getTime()).with(ChronoField.EPOCH_DAY, 0);
date = new java.util.Date(instant.toEpochMilli());
LoggerFactory.getLogger(getClass()).info("Time: " + data + " , class=" + data.getClass());
if (data instanceof java.util.Date) {
ZonedDateTime zdt = timeZoneAdapter.toZonedDateTime((java.util.Date)data);
date = java.util.Date.from(zdt.toInstant());
} else if (data instanceof java.time.LocalTime) {
// If we get a local time (no TZ info), we need to just convert to a util.Date (no TZ info) ...
java.time.LocalTime local = (java.time.LocalTime) data;
@ -834,21 +838,10 @@ protected java.util.Date unexpectedTime(Object value, Field fieldDefn) {
protected Object convertDate(Column column, Field fieldDefn, Object data) {
if (data == null) return null;
java.util.Date date = null;
if (data instanceof java.sql.Date) {
// JDBC specification indicates that this will be the nominal object for this JDBC type.
// Contains only date info, with all time values set to all zeros (e.g. midnight).
// However, the java.sql.Date object *may* contain timezone information for some DBMS+Driver combinations.
// Therefore, first convert it to a local LocalDate, then to a LocalDateTime at midnight, and then to an
// instant in UTC ...
java.sql.Date sqlDate = (java.sql.Date) data;
LocalDate localDate = sqlDate.toLocalDate();
date = java.util.Date.from(localDate.atStartOfDay().toInstant(ZoneOffset.UTC));
} else if (data instanceof java.util.Date) {
// Possible that some implementations might use this. We should be prepared to ignore any time,
// information by truncating to days and creating a new java.util.Date ...
date = (java.util.Date) data;
Instant instant = Instant.ofEpochMilli(date.getTime()).truncatedTo(ChronoUnit.DAYS);
date = new java.util.Date(instant.toEpochMilli());
LoggerFactory.getLogger(getClass()).info("Date: " + data + " , class=" + data.getClass());
if (data instanceof java.util.Date) {
ZonedDateTime zdt = timeZoneAdapter.toZonedDateTime((java.util.Date)data);
date = java.util.Date.from(zdt.toInstant());
} else if (data instanceof java.time.LocalDate) {
// If we get a local date (no TZ info), we need to just convert to a util.Date (no TZ info) ...
java.time.LocalDate local = (java.time.LocalDate) data;

View File

@ -0,0 +1,208 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.jdbc;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.Month;
import java.time.ZonedDateTime;
import java.util.Calendar;
import org.junit.Before;
import org.junit.Test;
import static org.fest.assertions.Assertions.assertThat;
/**
* @author Randall Hauch
*
*/
public class TimeZoneAdapterTest {
private TimeZoneAdapter adapter;
@Before
public void beforeEach() {
adapter = TimeZoneAdapter.create()
.withLocalZoneForUtilDate()
.withLocalZoneForSqlDate()
.withLocalZoneForSqlTime()
.withLocalZoneForSqlTimestamp()
.withUtcTargetZone();
}
@Test
public void shouldAdaptSqlDate() {
// '2014-09-08', '17:51:04.777', '2014-09-08 17:51:04.777', '2014-09-08 17:51:04.777'
java.sql.Date sqlDate = createSqlDate(2014, Month.SEPTEMBER, 8);
ZonedDateTime zdt = adapter.toZonedDateTime(sqlDate);
// The date should match ...
LocalDate date = zdt.toLocalDate();
assertThat(date.getYear()).isEqualTo(2014);
assertThat(date.getMonth()).isEqualTo(Month.SEPTEMBER);
assertThat(date.getDayOfMonth()).isEqualTo(8);
// There should be no time component ...
LocalTime time = zdt.toLocalTime();
assertThat(time.getHour()).isEqualTo(0);
assertThat(time.getMinute()).isEqualTo(0);
assertThat(time.getSecond()).isEqualTo(0);
assertThat(time.getNano()).isEqualTo(0);
// The zone should be our target ...
assertThat(zdt.getZone()).isEqualTo(adapter.targetZoneId());
}
@Test
public void shouldAdaptSqlTime() {
// '17:51:04.777'
java.sql.Time sqlTime = createSqlTime(17, 51, 04, 777);
ZonedDateTime zdt = adapter.toZonedDateTime(sqlTime);
// The date should be at epoch ...
LocalDate date = zdt.toLocalDate();
assertThat(date.getYear()).isEqualTo(1970);
assertThat(date.getMonth()).isEqualTo(Month.JANUARY);
assertThat(date.getDayOfMonth()).isEqualTo(1);
// The time should match exactly ...
LocalTime time = zdt.toLocalTime();
assertThat(time.getHour()).isEqualTo(17);
assertThat(time.getMinute()).isEqualTo(51);
assertThat(time.getSecond()).isEqualTo(4);
assertThat(time.getNano()).isEqualTo(777 * 1000 * 1000);
// The zone should be our target ...
assertThat(zdt.getZone()).isEqualTo(adapter.targetZoneId());
}
@Test
public void shouldAdaptSqlTimestamp() {
adapter = TimeZoneAdapter.create()
.withLocalZoneForSqlTimestamp()
.withUtcTargetZone();
// '2014-09-08 17:51:04.777'
// This technique creates the timestamp using the milliseconds from epoch in terms of the local zone ...
java.sql.Timestamp sqlTimestamp = createSqlTimestamp(2014, Month.SEPTEMBER, 8, 17, 51, 04, 777);
ZonedDateTime zdt = adapter.toZonedDateTime(sqlTimestamp);
// The date should match ...
LocalDate date = zdt.toLocalDate();
assertThat(date.getYear()).isEqualTo(2014);
assertThat(date.getMonth()).isEqualTo(Month.SEPTEMBER);
assertThat(date.getDayOfMonth()).isEqualTo(8);
// The time should match exactly ...
LocalTime time = zdt.toLocalTime();
assertThat(time.getHour()).isEqualTo(17);
assertThat(time.getMinute()).isEqualTo(51);
assertThat(time.getSecond()).isEqualTo(4);
assertThat(time.getNano()).isEqualTo(777 * 1000 * 1000);
// The zone should be our target ...
assertThat(zdt.getZone()).isEqualTo(adapter.targetZoneId());
}
@Test
public void shouldAdaptSqlTimestampViaSecondsAndMillis() {
adapter = TimeZoneAdapter.create()
.withUtcZoneForSqlTimestamp()
.withUtcTargetZone();
// '2014-09-08 17:51:04.777'
// This technique creates the timestamp using the milliseconds from epoch in terms of UTC ...
java.sql.Timestamp sqlTimestamp = createSqlTimestamp(1410198664L, 777);
ZonedDateTime zdt = adapter.toZonedDateTime(sqlTimestamp);
// The date should match ...
LocalDate date = zdt.toLocalDate();
assertThat(date.getYear()).isEqualTo(2014);
assertThat(date.getMonth()).isEqualTo(Month.SEPTEMBER);
assertThat(date.getDayOfMonth()).isEqualTo(8);
// The time should match exactly ...
LocalTime time = zdt.toLocalTime();
assertThat(time.getHour()).isEqualTo(17);
assertThat(time.getMinute()).isEqualTo(51);
assertThat(time.getSecond()).isEqualTo(4);
assertThat(time.getNano()).isEqualTo(777 * 1000 * 1000);
// The zone should be our target ...
assertThat(zdt.getZone()).isEqualTo(adapter.targetZoneId());
}
@Test
public void shouldAdaptUtilDate() {
// '2014-09-08 17:51:04.777'
java.util.Date utilDate = createUtilDate(2014, Month.SEPTEMBER, 8, 17, 51, 04, 777);
ZonedDateTime zdt = adapter.toZonedDateTime(utilDate);
// The date should match ...
LocalDate date = zdt.toLocalDate();
assertThat(date.getYear()).isEqualTo(2014);
assertThat(date.getMonth()).isEqualTo(Month.SEPTEMBER);
assertThat(date.getDayOfMonth()).isEqualTo(8);
// The time should match exactly ...
LocalTime time = zdt.toLocalTime();
assertThat(time.getHour()).isEqualTo(17);
assertThat(time.getMinute()).isEqualTo(51);
assertThat(time.getSecond()).isEqualTo(4);
assertThat(time.getNano()).isEqualTo(777 * 1000 * 1000);
// The zone should be our target ...
assertThat(zdt.getZone()).isEqualTo(adapter.targetZoneId());
}
protected java.sql.Date createSqlDate(int year, Month month, int dayOfMonth) {
Calendar cal = Calendar.getInstance();
cal.clear();
cal.set(Calendar.YEAR, year);
cal.set(Calendar.MONTH, month.getValue() - 1);
cal.set(Calendar.DATE, dayOfMonth);
return new java.sql.Date(cal.getTimeInMillis());
}
protected java.sql.Time createSqlTime(int hourOfDay, int minute, int second, int milliseconds) {
Calendar c = Calendar.getInstance();
c.clear();
c.set(Calendar.HOUR_OF_DAY, hourOfDay);
c.set(Calendar.MINUTE, minute);
c.set(Calendar.SECOND, second);
c.set(Calendar.MILLISECOND, milliseconds);
return new java.sql.Time(c.getTimeInMillis());
}
/**
* This sets the calendar via the milliseconds past epoch, and this behaves differently than actually setting the various
* components of the calendar (see {@link #createSqlTimestamp(int, Month, int, int, int, int, int)}). This is how the
* MySQL Binary Log client library creates timestamps (v2).
*
* @param secondsFromEpoch the number of seconds since epoch
* @param millis the number of milliseconds
* @return the SQL timestamp
*/
protected java.sql.Timestamp createSqlTimestamp(long secondsFromEpoch, int millis) {
Calendar c = Calendar.getInstance();
c.setTimeInMillis(secondsFromEpoch * 1000);
c.set(Calendar.MILLISECOND, millis);
return new java.sql.Timestamp(c.getTimeInMillis());
}
protected java.sql.Timestamp createSqlTimestamp(int year, Month month, int dayOfMonth, int hourOfDay, int minute, int second,
int milliseconds) {
Calendar c = Calendar.getInstance();
c.set(Calendar.YEAR, year);
c.set(Calendar.MONTH, month.getValue() - 1);
c.set(Calendar.DAY_OF_MONTH, dayOfMonth);
c.set(Calendar.HOUR_OF_DAY, hourOfDay);
c.set(Calendar.MINUTE, minute);
c.set(Calendar.SECOND, second);
c.set(Calendar.MILLISECOND, milliseconds);
return new java.sql.Timestamp(c.getTimeInMillis());
}
protected java.util.Date createUtilDate(int year, Month month, int dayOfMonth, int hourOfDay, int minute, int second,
int milliseconds) {
Calendar c = Calendar.getInstance();
c.set(Calendar.YEAR, year);
c.set(Calendar.MONTH, month.getValue() - 1);
c.set(Calendar.DAY_OF_MONTH, dayOfMonth);
c.set(Calendar.HOUR_OF_DAY, hourOfDay);
c.set(Calendar.MINUTE, minute);
c.set(Calendar.SECOND, second);
c.set(Calendar.MILLISECOND, milliseconds);
return c.getTime();
}
}

View File

@ -64,7 +64,7 @@
<version.postgresql.server>9.4</version.postgresql.server>
<version.mysql.server>5.7</version.mysql.server>
<version.mysql.driver>5.1.39</version.mysql.driver>
<version.mysql.binlog>0.3.1</version.mysql.binlog>
<version.mysql.binlog>0.3.2</version.mysql.binlog>
<version.mongo.server>3.2.6</version.mongo.server>
<version.mongo.driver>3.2.2</version.mongo.driver>