diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java index 34b62b504..f255e86b3 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java @@ -23,6 +23,7 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.temporal.TemporalAdjuster; +import java.util.TimeZone; import java.util.concurrent.TimeUnit; import org.apache.kafka.connect.data.Struct; @@ -829,6 +830,94 @@ else if (record.topic().endsWith("dbz_342_timetest")) { } } + @Test + public void shouldConsumeDatesCorrectlyWhenClientTimezonePrecedesServerTimezoneUsingSnapshot() throws SQLException, InterruptedException { + TimeZone originalTimeZone = TimeZone.getDefault(); + try { + // Set the timezone of the JVM to an offset that is earlier than the + // MySQL server's offset (GMT-11). This causes MySQL JDBC to + // shift the returned dates to one day earlier. + // See: https://bugs.mysql.com/bug.php?id=91112 + // Since v0.10, Debezium uses its own date parsing logic and is + // therefore not subject to the bug described above, but this test + // protects against future regressions. + TimeZone.setDefault(TimeZone.getTimeZone("GMT-12")); + + // Use the DB configuration to define the connector's configuration ... + config = DATABASE.defaultConfig() + .with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName("dbz_85_fractest")) + .with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, true) + .build(); + // Start the connector ... + start(MySqlConnector.class, config); + + // --------------------------------------------------------------------------------------------------------------- + // Consume all of the events due to startup and initialization of the database + // --------------------------------------------------------------------------------------------------------------- + // Testing.Debug.enable(); + int numDdlRecords = 6; + int numDataRecords = 1; + SourceRecords records = consumeRecordsByTopic(numDdlRecords + numDataRecords); + stopConnector(); + assertThat(records).isNotNull(); + assertThat(records.recordsForTopic(DATABASE.getServerName()).size()).isEqualTo(numDdlRecords); + assertThat(records.recordsForTopic(DATABASE.topicForTable("dbz_85_fractest")).size()).isEqualTo(1); + assertThat(records.topics().size()).isEqualTo(2); + + records.forEach(this::validate); + records.forEach(record -> { + Struct value = (Struct) record.value(); + if (record.topic().endsWith("dbz_85_fractest")) { + Struct after = value.getStruct(Envelope.FieldName.AFTER); + + // '2014-09-08' + Integer c1 = after.getInt32("c1"); // epoch days + LocalDate c1Date = LocalDate.ofEpochDay(c1); + assertThat(c1Date.getYear()).isEqualTo(2014); + assertThat(c1Date.getMonth()).isEqualTo(Month.SEPTEMBER); + assertThat(c1Date.getDayOfMonth()).isEqualTo(8); + assertThat(io.debezium.time.Date.toEpochDay(c1Date, ADJUSTER)).isEqualTo(c1); + + // '17:51:04.777' + Long c2 = after.getInt64("c2"); + Duration c2Time = Duration.ofNanos(c2 * 1_000); + assertThat(c2Time.toHours()).isEqualTo(17); + assertThat(c2Time.toMinutes()).isEqualTo(1071); + assertThat(c2Time.getSeconds()).isEqualTo(64264); + assertThat(c2Time.getNano()).isEqualTo(780000000); + assertThat(c2Time.toNanos()).isEqualTo(64264780000000L); + assertThat(c2Time).isEqualTo(Duration.ofHours(17).plusMinutes(51).plusSeconds(4).plusMillis(780)); + + // '2014-09-08 17:51:04.777' + // DATETIME is a logical date and time, it doesn't contain any TZ information; + // it is mapped to a point on the time line by interpreting the value at UTC + Long c3 = after.getInt64("c3"); // epoch millis + long c3Seconds = c3 / 1000; + long c3Millis = c3 % 1000; + LocalDateTime c3DateTime = LocalDateTime.ofEpochSecond(c3Seconds, + (int) TimeUnit.MILLISECONDS.toNanos(c3Millis), + ZoneOffset.UTC); + + assertThat(c3DateTime.getYear()).isEqualTo(2014); + assertThat(c3DateTime.getMonth()).isEqualTo(Month.SEPTEMBER); + assertThat(c3DateTime.getDayOfMonth()).isEqualTo(8); + assertThat(c3DateTime.getHour()).isEqualTo(17); + assertThat(c3DateTime.getMinute()).isEqualTo(51); + assertThat(c3DateTime.getSecond()).isEqualTo(4); + assertThat(c3DateTime.getNano()).isEqualTo((int) TimeUnit.MILLISECONDS.toNanos(780)); + assertThat(io.debezium.time.Timestamp.toEpochMillis(c3DateTime, ADJUSTER)).isEqualTo(c3); + + // '2014-09-08 17:51:04.777' + String c4 = after.getString("c4"); // timestamp + assertTimestamp(c4); + } + }); + } + finally { + TimeZone.setDefault(originalTimeZone); + } + } + @Test @FixFor("DBZ-147") public void shouldConsumeAllEventsFromDecimalTableInDatabaseUsingBinlogAndNoSnapshot() throws SQLException, InterruptedException {