Fixes some more tests around date handling in the MySQL connector
This commit is contained in:
parent
096885ec8d
commit
eaf295fbf0
@ -232,7 +232,7 @@ protected boolean isBinlogAvailable() {
|
||||
}
|
||||
});
|
||||
} catch (SQLException e) {
|
||||
throw new ConnectException("Unexpected error while connnecting to MySQL and looking for binary logs: " + e.getMessage());
|
||||
throw new ConnectException("Unexpected error while connecting to MySQL and looking for binary logs: ", e);
|
||||
}
|
||||
|
||||
// And compare with the one we're supposed to use ...
|
||||
@ -257,7 +257,7 @@ protected boolean isGtidModeEnabled() {
|
||||
}
|
||||
});
|
||||
} catch (SQLException e) {
|
||||
throw new ConnectException("Unexpected error while connnecting to MySQL and looking at GTID mode: " + e.getMessage());
|
||||
throw new ConnectException("Unexpected error while connecting to MySQL and looking at GTID mode: ", e);
|
||||
}
|
||||
|
||||
return !"OFF".equalsIgnoreCase(mode.get());
|
||||
@ -277,7 +277,7 @@ protected String knownGtidSet() {
|
||||
}
|
||||
});
|
||||
} catch (SQLException e) {
|
||||
throw new ConnectException("Unexpected error while connnecting to MySQL and looking at GTID mode: " + e.getMessage());
|
||||
throw new ConnectException("Unexpected error while connecting to MySQL and looking at GTID mode: ", e);
|
||||
}
|
||||
|
||||
return gtidSetStr.get();
|
||||
|
@ -5,6 +5,8 @@
|
||||
*/
|
||||
package io.debezium.connector.mysql;
|
||||
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.sql.SQLException;
|
||||
import java.time.LocalDate;
|
||||
@ -12,14 +14,13 @@
|
||||
import java.time.LocalTime;
|
||||
import java.time.Month;
|
||||
import java.time.ZoneId;
|
||||
|
||||
import java.time.ZonedDateTime;
|
||||
import java.time.temporal.ChronoField;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.fest.assertions.Assertions.assertThat;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
|
||||
import io.debezium.data.Envelope;
|
||||
@ -110,12 +111,23 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
|
||||
assertThat(c2.getTime() % 1000).isEqualTo(780);
|
||||
assertThat(c3.getTime() % 1000).isEqualTo(780);
|
||||
assertThat(c4.getTime() % 1000).isEqualTo(780);
|
||||
assertThat(c1.getTime()).isEqualTo(1410134400000L);
|
||||
assertThat(c2.getTime()).isEqualTo(64264780L);
|
||||
assertThat(c3.getTime()).isEqualTo(1410198664780L);
|
||||
assertThat(c4.getTime()).isEqualTo(1410198664780L);
|
||||
// None of these Dates have timezone information, so to convert to locals we have to use our local timezone ...
|
||||
ZoneId utc = ZoneId.of("UTC");
|
||||
ZoneId defaultTZ = ZoneId.systemDefault();
|
||||
LocalDate expectedDate = LocalDate.of(2014, 9, 8);
|
||||
// the time is stored as 17:51:04.777 but rounded up to 780 due to the column configs
|
||||
LocalTime expectedTime = LocalTime.of(17, 51, 4).plus(780, ChronoUnit.MILLIS);
|
||||
// c1 '2014-09-08' is stored as a MySQL DATE (without any time) in the local TZ and then converted to
|
||||
// a truncated UTC by the connector, so we must assert against the same thing....
|
||||
ZonedDateTime expectedC1UTC = ZonedDateTime.of(expectedDate, LocalTime.of(0, 0), defaultTZ)
|
||||
.withZoneSameInstant(utc)
|
||||
.truncatedTo(ChronoUnit.DAYS);
|
||||
assertThat(c1.getTime()).isEqualTo(expectedC1UTC.toInstant().toEpochMilli());
|
||||
ZonedDateTime expectedC2UTC = ZonedDateTime.of(LocalDate.ofEpochDay(0), expectedTime, utc);
|
||||
assertThat(c2.getTime()).isEqualTo(expectedC2UTC.toInstant().toEpochMilli());
|
||||
ZonedDateTime expectedC3UTC = ZonedDateTime.of(expectedDate, expectedTime, utc);
|
||||
assertThat(c3.getTime()).isEqualTo(expectedC3UTC.toInstant().toEpochMilli());
|
||||
assertThat(c4.getTime()).isEqualTo(expectedC3UTC.toInstant().toEpochMilli());
|
||||
// None of these Dates have timezone information, so to convert to locals we have to use our local timezone ...
|
||||
LocalDate localC1 = c1.toInstant().atZone(utc).toLocalDate();
|
||||
LocalTime localC2 = c2.toInstant().atZone(utc).toLocalTime();
|
||||
LocalDateTime localC3 = c3.toInstant().atZone(utc).toLocalDateTime();
|
||||
@ -124,7 +136,7 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
|
||||
final int expectedNanos = 780 * 1000 * 1000;
|
||||
assertThat(localC1.getYear()).isEqualTo(2014);
|
||||
assertThat(localC1.getMonth()).isEqualTo(Month.SEPTEMBER);
|
||||
assertThat(localC1.getDayOfMonth()).isEqualTo(8);
|
||||
assertThat(localC1.getDayOfMonth()).isEqualTo(expectedC1UTC.get(ChronoField.DAY_OF_MONTH));
|
||||
assertThat(localC2.getHour()).isEqualTo(17);
|
||||
assertThat(localC2.getMinute()).isEqualTo(51);
|
||||
assertThat(localC2.getSecond()).isEqualTo(4);
|
||||
|
@ -298,21 +298,11 @@ protected int consumeRecords(int numberOfRecords, Consumer<SourceRecord> recordC
|
||||
* @throws InterruptedException if the thread was interrupted while waiting for a record to be returned
|
||||
*/
|
||||
protected SourceRecords consumeRecordsByTopic(int numRecords) throws InterruptedException {
|
||||
return consumeRecordsByTopic(numRecords, new SourceRecords());
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to consume and capture exactly the specified number of records from the connector.
|
||||
*
|
||||
* @param numRecords the number of records that should be consumed
|
||||
* @param records the collector into which all consumed messages should be placed
|
||||
* @return the actual number of records that were consumed
|
||||
* @throws InterruptedException if the thread was interrupted while waiting for a record to be returned
|
||||
*/
|
||||
protected SourceRecords consumeRecordsByTopic(int numRecords, SourceRecords records) throws InterruptedException {
|
||||
SourceRecords records = new SourceRecords();
|
||||
consumeRecords(numRecords, records::add);
|
||||
return records;
|
||||
}
|
||||
|
||||
|
||||
protected class SourceRecords {
|
||||
private final List<SourceRecord> records = new ArrayList<>();
|
||||
|
Loading…
Reference in New Issue
Block a user