DBZ-6567 Handle DST for test cases, use correct zone rules

This commit is contained in:
ani-sha 2023-08-28 17:46:54 +05:30 committed by Jiri Pechanec
parent 22500839de
commit 78477e7622
2 changed files with 69 additions and 64 deletions

View File

@ -254,6 +254,7 @@ private Object getTimestampWithTimezone(String schemaName, Object fieldValue) {
switch (schemaName) {
case ZonedTimestamp.SCHEMA_NAME:
OffsetDateTime offsetDateTime = OffsetDateTime.parse((String) fieldValue, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
zoneOffset = zoneId.getRules().getOffset(offsetDateTime.toLocalDateTime());
OffsetDateTime offsetDateTimeWithZone = offsetDateTime.withOffsetSameInstant(zoneOffset);
updatedFieldValue = ZonedTimestamp.toIsoString(offsetDateTimeWithZone, null);
break;
@ -266,23 +267,27 @@ private Object getTimestampWithTimezone(String schemaName, Object fieldValue) {
long microTimestamp = (long) fieldValue;
Instant microInstant = Instant.ofEpochSecond(microTimestamp / 1_000_000, (microTimestamp % 1_000_000) * 1_000);
LocalDateTime microLocalDateTime = microInstant.atOffset(ZoneOffset.UTC).toLocalDateTime();
zoneOffset = zoneId.getRules().getOffset(microLocalDateTime);
updatedFieldValue = microLocalDateTime.atOffset(zoneOffset).toInstant().toEpochMilli() * 1_000;
break;
case NanoTimestamp.SCHEMA_NAME:
long nanoTimestamp = (long) fieldValue;
Instant nanoInstant = Instant.ofEpochSecond(nanoTimestamp / 1_000_000_000, (nanoTimestamp % 1_000_000_000));
LocalDateTime nanoLocalDateTime = nanoInstant.atOffset(ZoneOffset.UTC).toLocalDateTime();
zoneOffset = zoneId.getRules().getOffset(nanoLocalDateTime);
updatedFieldValue = nanoLocalDateTime.atOffset(zoneOffset).toInstant().toEpochMilli() * 1_000_000;
break;
case Timestamp.SCHEMA_NAME:
Instant instant = Instant.ofEpochMilli((long) fieldValue);
LocalDateTime localDateTime = instant.atOffset(ZoneOffset.UTC).toLocalDateTime();
zoneOffset = zoneId.getRules().getOffset(localDateTime);
updatedFieldValue = localDateTime.atOffset(zoneOffset).toInstant().toEpochMilli();
break;
case org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME:
Date date = (Date) fieldValue;
Instant timestampInstant = date.toInstant();
LocalDateTime timestampLocalDateTime = timestampInstant.atOffset(ZoneOffset.UTC).toLocalDateTime();
zoneOffset = zoneId.getRules().getOffset(timestampLocalDateTime);
updatedFieldValue = Date.from(timestampLocalDateTime.atOffset(zoneOffset).toInstant());
break;
}

View File

@ -62,8 +62,8 @@ public void testMultipleDebeziumTimestamps() {
before.put("order_date_micros", 1529507596945104L);
before.put("order_date_nanos", 1531481025340000000L);
before.put("order_date_timestamp", 1514908810123L);
before.put("order_date_zoned_timestamp", "2018-01-02T11:15:30.123456789+02:00");
before.put("order_date_zoned_time", "11:15:30.123456789+02:00");
before.put("order_date_zoned_timestamp", "2018-01-02T11:15:30.123456789+00:00");
before.put("order_date_zoned_time", "11:15:30.123456789+00:00");
source.put("table", "orders");
source.put("lsn", 1);
@ -92,8 +92,8 @@ public void testMultipleDebeziumTimestamps() {
assertThat(transformedAfter.get("order_date_micros")).isEqualTo(1529487796945000L);
assertThat(transformedAfter.get("order_date_nanos")).isEqualTo(1531461225340000000L);
assertThat(transformedAfter.get("order_date_timestamp")).isEqualTo(1514889010123L);
assertThat(transformedAfter.get("order_date_zoned_timestamp")).isEqualTo("2018-01-02T14:45:30.123456789+05:30");
assertThat(transformedAfter.get("order_date_zoned_time")).isEqualTo("14:45:30.123456789+05:30");
assertThat(transformedAfter.get("order_date_zoned_timestamp")).isEqualTo("2018-01-02T16:45:30.123456789+05:30");
assertThat(transformedAfter.get("order_date_zoned_time")).isEqualTo("16:45:30.123456789+05:30");
}
@Test
@ -115,9 +115,9 @@ public void testSingleDebeziumTimestamp() {
before.put("id", (byte) 1);
before.put("name", "Srikanth");
before.put("created_at", "2011-01-11T16:40:30.123456789+05:30");
before.put("updated_at", "2011-02-02T11:04:30.123456789+05:30");
before.put("order_date", "2011-04-09T13:00:30.123456789+05:30");
before.put("created_at", "2011-01-11T16:40:30.123456789+00:00");
before.put("updated_at", "2011-02-02T11:04:30.123456789+00:00");
before.put("order_date", "2011-04-09T13:00:30.123456789+00:00");
source.put("table", "orders");
source.put("lsn", 1);
@ -143,9 +143,9 @@ public void testSingleDebeziumTimestamp() {
final Struct transformedValue = (Struct) transformedRecord.value();
final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedAfter.get("created_at")).isEqualTo("2011-01-11T05:10:30.123456789-06:00");
assertThat(transformedAfter.get("updated_at")).isEqualTo("2011-02-01T23:34:30.123456789-06:00");
assertThat(transformedAfter.get("order_date")).isEqualTo("2011-04-09T01:30:30.123456789-06:00");
assertThat(transformedAfter.get("created_at")).isEqualTo("2011-01-11T11:40:30.123456789-05:00");
assertThat(transformedAfter.get("updated_at")).isEqualTo("2011-02-02T06:04:30.123456789-05:00");
assertThat(transformedAfter.get("order_date")).isEqualTo("2011-04-09T08:00:30.123456789-05:00");
}
@Test
@ -199,15 +199,15 @@ public void testKafkaConnectTimestamp() {
public void testIncludeListWithTablePrefix() {
final Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "Atlantic/Azores");
props.put("include.list", "source:customers:order_time");
props.put("include.list", "source:customers:order_time,customers:created_at");
converter.configure(props);
Schema recordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("created_at", ZonedTime.builder().optional().build())
.field("updated_at", ZonedTime.builder().optional().build())
.field("order_time", ZonedTime.builder().optional().build())
.field("created_at", ZonedTimestamp.builder().optional().build())
.field("updated_at", ZonedTimestamp.builder().optional().build())
.field("order_time", ZonedTimestamp.builder().optional().build())
.build();
final Struct before = new Struct(recordSchema);
@ -215,9 +215,9 @@ public void testIncludeListWithTablePrefix() {
before.put("id", (byte) 1);
before.put("name", "John Doe");
before.put("created_at", "11:55:30+02:00");
before.put("updated_at", "12:10:10+02:00");
before.put("order_time", "12:10:10+02:00");
before.put("created_at", "2020-01-01T11:55:30+00:00");
before.put("updated_at", "2020-05-01T11:55:30+00:00");
before.put("order_time", "2020-06-20T11:55:30+00:00");
source.put("table", "customers");
source.put("lsn", 1);
@ -243,9 +243,9 @@ public void testIncludeListWithTablePrefix() {
final Struct transformedValue = (Struct) transformedRecord.value();
final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedAfter.get("created_at")).isEqualTo("11:55:30+02:00");
assertThat(transformedAfter.get("updated_at")).isEqualTo("12:10:10+02:00");
assertThat(transformedAfter.get("order_time")).isEqualTo("10:10:10Z");
assertThat(transformedAfter.get("created_at")).isEqualTo("2020-01-01T10:55:30-01:00");
assertThat(transformedAfter.get("updated_at")).isEqualTo("2020-05-01T11:55:30+00:00");
assertThat(transformedAfter.get("order_time")).isEqualTo("2020-06-20T11:55:30Z");
}
@Test
@ -267,8 +267,8 @@ public void testIncludeListWithTopicPrefix() {
before.put("id", (byte) 1);
before.put("name", "John Doe");
before.put("created_at", "11:55:30+02:00");
before.put("order_time", "12:10:10+02:00");
before.put("created_at", "11:55:30+00:00");
before.put("order_time", "12:10:10+00:00");
source.put("table", "customers");
source.put("lsn", 1);
@ -294,8 +294,8 @@ public void testIncludeListWithTopicPrefix() {
final Struct transformedValue = (Struct) transformedRecord.value();
final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedAfter.get("created_at")).isEqualTo("11:55:30+02:00");
assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30");
assertThat(transformedAfter.get("created_at")).isEqualTo("11:55:30+00:00");
assertThat(transformedAfter.get("order_time")).isEqualTo("17:40:10+05:30");
}
@Test
@ -317,8 +317,8 @@ public void testIncludeListWithNoPrefix() {
before.put("id", (byte) 1);
before.put("name", "John Doe");
before.put("created_at", "11:55:30+02:00");
before.put("order_time", "12:10:10+02:00");
before.put("created_at", "11:55:30+00:00");
before.put("order_time", "12:10:10+00:00");
source.put("table", "customers");
source.put("lsn", 1);
@ -344,8 +344,8 @@ public void testIncludeListWithNoPrefix() {
final Struct transformedValue = (Struct) transformedRecord.value();
final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30");
assertThat(transformedAfter.get("created_at")).isEqualTo("11:55:30+02:00");
assertThat(transformedAfter.get("order_time")).isEqualTo("17:40:10+05:30");
assertThat(transformedAfter.get("created_at")).isEqualTo("11:55:30+00:00");
}
@ -369,9 +369,9 @@ public void testExcludeListWithTablePrefix() {
before.put("id", (byte) 1);
before.put("name", "John Doe");
before.put("created_at", "11:55:30+02:00");
before.put("updated_at", "12:10:10+02:00");
before.put("order_time", "15:40:10+05:30");
before.put("created_at", "11:55:30+00:00");
before.put("updated_at", "12:10:10+00:00");
before.put("order_time", "15:40:10+00:00");
source.put("table", "customers");
source.put("lsn", 1);
@ -397,9 +397,9 @@ public void testExcludeListWithTablePrefix() {
final Struct transformedValue = (Struct) transformedRecord.value();
final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedAfter.get("created_at")).isEqualTo("15:25:30+05:30");
assertThat(transformedAfter.get("updated_at")).isEqualTo("15:40:10+05:30");
assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30");
assertThat(transformedAfter.get("created_at")).isEqualTo("17:25:30+05:30");
assertThat(transformedAfter.get("updated_at")).isEqualTo("17:40:10+05:30");
assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+00:00");
}
@Test
@ -421,8 +421,8 @@ public void testExcludeListWithTopicPrefix() {
before.put("id", (byte) 1);
before.put("name", "John Doe");
before.put("created_at", "11:55:30+02:00");
before.put("order_time", "15:40:10+05:30");
before.put("created_at", "11:55:30+00:00");
before.put("order_time", "15:40:10+00:00");
source.put("table", "customers");
source.put("lsn", 1);
@ -448,8 +448,8 @@ public void testExcludeListWithTopicPrefix() {
final Struct transformedValue = (Struct) transformedRecord.value();
final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedAfter.get("created_at")).isEqualTo("04:55:30-05:00");
assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30");
assertThat(transformedAfter.get("created_at")).isEqualTo("06:55:30-05:00");
assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+00:00");
}
@Test
@ -471,8 +471,8 @@ public void testExcludeListWithNoPrefix() {
before.put("id", (byte) 1);
before.put("name", "John Doe");
before.put("created_at", "11:55:30+02:00");
before.put("order_time", "15:40:10+05:30");
before.put("created_at", "11:55:30+00:00");
before.put("order_time", "15:40:10+00:00");
source.put("table", "customers");
source.put("lsn", 1);
@ -498,8 +498,8 @@ public void testExcludeListWithNoPrefix() {
final Struct transformedValue = (Struct) transformedRecord.value();
final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedAfter.get("created_at")).isEqualTo("17:55:30+08:00");
assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30");
assertThat(transformedAfter.get("created_at")).isEqualTo("19:55:30+08:00");
assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+00:00");
}
@Test
@ -521,8 +521,8 @@ public void testIncludeListMultipleTables() {
customersBefore.put("id", (byte) 1);
customersBefore.put("name", "John Doe");
customersBefore.put("created_at", "2020-01-01T11:55:30+02:00");
customersBefore.put("updated_at", "2020-01-01T15:40:10+05:30");
customersBefore.put("created_at", "2020-01-01T11:55:30+00:00");
customersBefore.put("updated_at", "2020-01-01T15:40:10+00:00");
customersSource.put("table", "customers1");
customersSource.put("lsn", 1);
@ -545,8 +545,8 @@ public void testIncludeListMultipleTables() {
final SourceRecord transformedCustomersRecord = converter.apply(customersRecord);
final Struct transformedCustomersValue = (Struct) transformedCustomersRecord.value();
final Struct transformedCustomersAfter = transformedCustomersValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedCustomersAfter.get("created_at")).isEqualTo("2020-01-01T12:55:30+03:00");
assertThat(transformedCustomersAfter.get("updated_at")).isEqualTo("2020-01-01T13:10:10+03:00");
assertThat(transformedCustomersAfter.get("created_at")).isEqualTo("2020-01-01T14:55:30+03:00");
assertThat(transformedCustomersAfter.get("updated_at")).isEqualTo("2020-01-01T18:40:10+03:00");
Schema ordersRecordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
@ -560,8 +560,8 @@ public void testIncludeListMultipleTables() {
ordersBefore.put("id", (byte) 1);
ordersBefore.put("customer_id", (byte) 1);
ordersBefore.put("created_at", "2023-08-01T11:50:45+02:00");
ordersBefore.put("order_time", "2023-09-01T11:55:30+02:00");
ordersBefore.put("created_at", "2023-08-01T11:50:45+00:00");
ordersBefore.put("order_time", "2023-09-01T11:55:30+00:00");
ordersSource.put("table", "orders1");
ordersSource.put("lsn", 1);
@ -584,8 +584,8 @@ public void testIncludeListMultipleTables() {
final SourceRecord transformedOrdersRecord = converter.apply(ordersRecord);
final Struct transformedOrdersValue = (Struct) transformedOrdersRecord.value();
final Struct transformedOrdersAfter = transformedOrdersValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedOrdersAfter.get("created_at")).isEqualTo("2023-08-01T12:50:45+03:00");
assertThat(transformedOrdersAfter.get("order_time")).isEqualTo("2023-09-01T12:55:30+03:00");
assertThat(transformedOrdersAfter.get("created_at")).isEqualTo("2023-08-01T14:50:45+03:00");
assertThat(transformedOrdersAfter.get("order_time")).isEqualTo("2023-09-01T14:55:30+03:00");
}
@Test
@ -607,8 +607,8 @@ public void testExcludeListMultipleTables() {
customersBefore.put("id", (byte) 1);
customersBefore.put("name", "John Doe");
customersBefore.put("created_at", "2020-01-01T11:55:30+02:00");
customersBefore.put("updated_at", "2020-01-01T15:40:10+05:30");
customersBefore.put("created_at", "2020-01-01T11:55:30+00:00");
customersBefore.put("updated_at", "2020-01-01T15:40:10+00:00");
customersSource.put("table", "customers1");
customersSource.put("lsn", 1);
@ -631,8 +631,8 @@ public void testExcludeListMultipleTables() {
final SourceRecord transformedCustomersRecord = converter.apply(customersRecord);
final Struct transformedCustomersValue = (Struct) transformedCustomersRecord.value();
final Struct transformedCustomersAfter = transformedCustomersValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedCustomersAfter.get("created_at")).isEqualTo("2020-01-01T11:55:30+02:00");
assertThat(transformedCustomersAfter.get("updated_at")).isEqualTo("2020-01-01T15:40:10+05:30");
assertThat(transformedCustomersAfter.get("created_at")).isEqualTo("2020-01-01T11:55:30+00:00");
assertThat(transformedCustomersAfter.get("updated_at")).isEqualTo("2020-01-01T15:40:10+00:00");
Schema ordersRecordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
@ -646,8 +646,8 @@ public void testExcludeListMultipleTables() {
ordersBefore.put("id", (byte) 1);
ordersBefore.put("customer_id", (byte) 1);
ordersBefore.put("created_at", "2023-08-01T11:50:45+02:00");
ordersBefore.put("order_time", "2023-09-01T11:55:30+02:00");
ordersBefore.put("created_at", "2023-08-01T11:50:45+00:00");
ordersBefore.put("order_time", "2023-09-01T11:55:30+00:00");
ordersSource.put("table", "orders1");
ordersSource.put("lsn", 1);
@ -670,8 +670,8 @@ public void testExcludeListMultipleTables() {
final SourceRecord transformedOrdersRecord = converter.apply(ordersRecord);
final Struct transformedOrdersValue = (Struct) transformedOrdersRecord.value();
final Struct transformedOrdersAfter = transformedOrdersValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedOrdersAfter.get("created_at")).isEqualTo("2023-08-01T03:50:45-06:00");
assertThat(transformedOrdersAfter.get("order_time")).isEqualTo("2023-09-01T03:55:30-06:00");
assertThat(transformedOrdersAfter.get("created_at")).isEqualTo("2023-08-01T05:50:45-06:00");
assertThat(transformedOrdersAfter.get("order_time")).isEqualTo("2023-09-01T05:55:30-06:00");
}
@Test
@ -723,7 +723,7 @@ public void testExcludeListWithMultipleFields() {
final Struct customersBefore = new Struct(customersRecordSchema);
customersBefore.put("id", (byte) 1);
customersBefore.put("name", "Amy Rose");
customersBefore.put("order_time", "10:19:25+05:30");
customersBefore.put("order_time", "10:19:25+00:00");
final Struct customersSource = new Struct(sourceSchema);
customersSource.put("table", "customers");
@ -748,7 +748,7 @@ public void testExcludeListWithMultipleFields() {
final Struct transformedCustomersValue = (Struct) transformedCustomersRecord.value();
final Struct transformedCustomersAfter = transformedCustomersValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedCustomersAfter.get("name")).isEqualTo("Amy Rose");
assertThat(transformedCustomersAfter.get("order_time")).isEqualTo("10:19:25+05:30");
assertThat(transformedCustomersAfter.get("order_time")).isEqualTo("10:19:25+00:00");
Schema inventoryRecordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
@ -760,8 +760,8 @@ public void testExcludeListWithMultipleFields() {
final Struct inventoryBefore = new Struct(inventoryRecordSchema);
inventoryBefore.put("id", (byte) 1);
inventoryBefore.put("name", "Amy Rose");
inventoryBefore.put("shipping_time", "19:19:25+05:30");
inventoryBefore.put("order_time", "10:19:25+05:30");
inventoryBefore.put("shipping_time", "19:19:25+00:00");
inventoryBefore.put("order_time", "10:19:25+00:00");
final Struct inventorySource = new Struct(sourceSchema);
inventorySource.put("table", "inventory");
@ -786,8 +786,8 @@ public void testExcludeListWithMultipleFields() {
final Struct transformedInventoryValue = (Struct) transformedInventoryRecord.value();
final Struct transformedInventoryAfter = transformedInventoryValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedInventoryAfter.get("name")).isEqualTo("Amy Rose");
assertThat(transformedInventoryAfter.get("shipping_time")).isEqualTo("16:49:25+03:00");
assertThat(transformedInventoryAfter.get("order_time")).isEqualTo("10:19:25+05:30");
assertThat(transformedInventoryAfter.get("shipping_time")).isEqualTo("22:19:25+03:00");
assertThat(transformedInventoryAfter.get("order_time")).isEqualTo("10:19:25+00:00");
}
}