diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java index f5edaf6e4..b99a7881e 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java @@ -232,7 +232,7 @@ protected boolean isBinlogAvailable() { } }); } catch (SQLException e) { - throw new ConnectException("Unexpected error while connnecting to MySQL and looking for binary logs: " + e.getMessage()); + throw new ConnectException("Unexpected error while connecting to MySQL and looking for binary logs: ", e); } // And compare with the one we're supposed to use ... @@ -257,7 +257,7 @@ protected boolean isGtidModeEnabled() { } }); } catch (SQLException e) { - throw new ConnectException("Unexpected error while connnecting to MySQL and looking at GTID mode: " + e.getMessage()); + throw new ConnectException("Unexpected error while connecting to MySQL and looking at GTID mode: ", e); } return !"OFF".equalsIgnoreCase(mode.get()); @@ -277,7 +277,7 @@ protected String knownGtidSet() { } }); } catch (SQLException e) { - throw new ConnectException("Unexpected error while connnecting to MySQL and looking at GTID mode: " + e.getMessage()); + throw new ConnectException("Unexpected error while connecting to MySQL and looking at GTID mode: ", e); } return gtidSetStr.get(); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java index a900d7cc6..a2afcfab6 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java @@ -5,6 +5,8 @@ */ package io.debezium.connector.mysql; +import static org.fest.assertions.Assertions.assertThat; + import java.nio.file.Path; import java.sql.SQLException; import java.time.LocalDate; @@ -12,14 +14,13 @@ import java.time.LocalTime; import java.time.Month; import java.time.ZoneId; - +import java.time.ZonedDateTime; +import java.time.temporal.ChronoField; +import java.time.temporal.ChronoUnit; import org.apache.kafka.connect.data.Struct; import org.junit.After; import org.junit.Before; import org.junit.Test; - -import static org.fest.assertions.Assertions.assertThat; - import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode; import io.debezium.data.Envelope; @@ -110,12 +111,23 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws assertThat(c2.getTime() % 1000).isEqualTo(780); assertThat(c3.getTime() % 1000).isEqualTo(780); assertThat(c4.getTime() % 1000).isEqualTo(780); - assertThat(c1.getTime()).isEqualTo(1410134400000L); - assertThat(c2.getTime()).isEqualTo(64264780L); - assertThat(c3.getTime()).isEqualTo(1410198664780L); - assertThat(c4.getTime()).isEqualTo(1410198664780L); - // None of these Dates have timezone information, so to convert to locals we have to use our local timezone ... ZoneId utc = ZoneId.of("UTC"); + ZoneId defaultTZ = ZoneId.systemDefault(); + LocalDate expectedDate = LocalDate.of(2014, 9, 8); + // the time is stored as 17:51:04.777 but rounded up to 780 due to the column configs + LocalTime expectedTime = LocalTime.of(17, 51, 4).plus(780, ChronoUnit.MILLIS); + // c1 '2014-09-08' is stored as a MySQL DATE (without any time) in the local TZ and then converted to + // a truncated UTC by the connector, so we must assert against the same thing.... + ZonedDateTime expectedC1UTC = ZonedDateTime.of(expectedDate, LocalTime.of(0, 0), defaultTZ) + .withZoneSameInstant(utc) + .truncatedTo(ChronoUnit.DAYS); + assertThat(c1.getTime()).isEqualTo(expectedC1UTC.toInstant().toEpochMilli()); + ZonedDateTime expectedC2UTC = ZonedDateTime.of(LocalDate.ofEpochDay(0), expectedTime, utc); + assertThat(c2.getTime()).isEqualTo(expectedC2UTC.toInstant().toEpochMilli()); + ZonedDateTime expectedC3UTC = ZonedDateTime.of(expectedDate, expectedTime, utc); + assertThat(c3.getTime()).isEqualTo(expectedC3UTC.toInstant().toEpochMilli()); + assertThat(c4.getTime()).isEqualTo(expectedC3UTC.toInstant().toEpochMilli()); + // None of these Dates have timezone information, so to convert to locals we have to use our local timezone ... LocalDate localC1 = c1.toInstant().atZone(utc).toLocalDate(); LocalTime localC2 = c2.toInstant().atZone(utc).toLocalTime(); LocalDateTime localC3 = c3.toInstant().atZone(utc).toLocalDateTime(); @@ -124,7 +136,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws final int expectedNanos = 780 * 1000 * 1000; assertThat(localC1.getYear()).isEqualTo(2014); assertThat(localC1.getMonth()).isEqualTo(Month.SEPTEMBER); - assertThat(localC1.getDayOfMonth()).isEqualTo(8); + assertThat(localC1.getDayOfMonth()).isEqualTo(expectedC1UTC.get(ChronoField.DAY_OF_MONTH)); assertThat(localC2.getHour()).isEqualTo(17); assertThat(localC2.getMinute()).isEqualTo(51); assertThat(localC2.getSecond()).isEqualTo(4); diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index 817414891..7b065ae17 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -298,21 +298,11 @@ protected int consumeRecords(int numberOfRecords, Consumer recordC * @throws InterruptedException if the thread was interrupted while waiting for a record to be returned */ protected SourceRecords consumeRecordsByTopic(int numRecords) throws InterruptedException { - return consumeRecordsByTopic(numRecords, new SourceRecords()); - } - - /** - * Try to consume and capture exactly the specified number of records from the connector. - * - * @param numRecords the number of records that should be consumed - * @param records the collector into which all consumed messages should be placed - * @return the actual number of records that were consumed - * @throws InterruptedException if the thread was interrupted while waiting for a record to be returned - */ - protected SourceRecords consumeRecordsByTopic(int numRecords, SourceRecords records) throws InterruptedException { + SourceRecords records = new SourceRecords(); consumeRecords(numRecords, records::add); return records; } + protected class SourceRecords { private final List records = new ArrayList<>();