DBZ-1643 Test timestamp TZ
This commit is contained in:
parent
7c079ca5e5
commit
a80892f628
@ -9,11 +9,13 @@
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.time.Instant;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.TimeZone;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ -221,6 +223,55 @@ public void readOnlyApplicationIntent() throws Exception {
|
||||
stopConnector();
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-1643")
|
||||
public void timestampAndTimezone() throws Exception {
|
||||
final int RECORDS_PER_TABLE = 5;
|
||||
final int TABLES = 2;
|
||||
final int ID_START = 10;
|
||||
|
||||
final TimeZone currentTimeZone = TimeZone.getDefault();
|
||||
try {
|
||||
TimeZone.setDefault(TimeZone.getTimeZone("Atlantic/Cape_Verde"));
|
||||
final Configuration config = TestHelper.defaultConfig()
|
||||
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
|
||||
.build();
|
||||
|
||||
start(SqlServerConnector.class, config);
|
||||
assertConnectorIsRunning();
|
||||
|
||||
// Wait for snapshot completion
|
||||
consumeRecordsByTopic(1);
|
||||
|
||||
final Instant now = Instant.now();
|
||||
final Instant lowerBound = now.minusSeconds(5 * 60);
|
||||
final Instant upperBound = now.plusSeconds(5 * 60);
|
||||
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
||||
final int id = ID_START + i;
|
||||
connection.execute(
|
||||
"INSERT INTO tablea VALUES(" + id + ", 'a')");
|
||||
connection.execute(
|
||||
"INSERT INTO tableb VALUES(" + id + ", 'b')");
|
||||
}
|
||||
|
||||
final SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
|
||||
final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
|
||||
final List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
|
||||
Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
|
||||
Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);
|
||||
for (int i = 0; i < RECORDS_PER_TABLE; i++) {
|
||||
final SourceRecord recordA = tableA.get(i);
|
||||
final long timestamp = ((Struct) recordA.value()).getStruct("source").getInt64("ts_ms");
|
||||
final Instant instant = Instant.ofEpochMilli(timestamp);
|
||||
Assertions.assertThat(instant.isAfter(lowerBound) && instant.isBefore(upperBound)).isTrue();
|
||||
}
|
||||
stopConnector();
|
||||
}
|
||||
finally {
|
||||
TimeZone.setDefault(currentTimeZone);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void deleteWithoutTombstone() throws Exception {
|
||||
final int RECORDS_PER_TABLE = 5;
|
||||
|
Loading…
Reference in New Issue
Block a user