From 78477e7622e6aca7cd63e79de76844b1dd3e6fed Mon Sep 17 00:00:00 2001 From: ani-sha Date: Mon, 28 Aug 2023 17:46:54 +0530 Subject: [PATCH] DBZ-6567 Handle DST for test cases, use correct zone rules --- .../transforms/TimezoneConverter.java | 5 + .../transforms/TimezoneConverterTest.java | 128 +++++++++--------- 2 files changed, 69 insertions(+), 64 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java b/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java index f72d94284..5aa139259 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java @@ -254,6 +254,7 @@ private Object getTimestampWithTimezone(String schemaName, Object fieldValue) { switch (schemaName) { case ZonedTimestamp.SCHEMA_NAME: OffsetDateTime offsetDateTime = OffsetDateTime.parse((String) fieldValue, DateTimeFormatter.ISO_OFFSET_DATE_TIME); + zoneOffset = zoneId.getRules().getOffset(offsetDateTime.toLocalDateTime()); OffsetDateTime offsetDateTimeWithZone = offsetDateTime.withOffsetSameInstant(zoneOffset); updatedFieldValue = ZonedTimestamp.toIsoString(offsetDateTimeWithZone, null); break; @@ -266,23 +267,27 @@ private Object getTimestampWithTimezone(String schemaName, Object fieldValue) { long microTimestamp = (long) fieldValue; Instant microInstant = Instant.ofEpochSecond(microTimestamp / 1_000_000, (microTimestamp % 1_000_000) * 1_000); LocalDateTime microLocalDateTime = microInstant.atOffset(ZoneOffset.UTC).toLocalDateTime(); + zoneOffset = zoneId.getRules().getOffset(microLocalDateTime); updatedFieldValue = microLocalDateTime.atOffset(zoneOffset).toInstant().toEpochMilli() * 1_000; break; case NanoTimestamp.SCHEMA_NAME: long nanoTimestamp = (long) fieldValue; Instant nanoInstant = Instant.ofEpochSecond(nanoTimestamp / 1_000_000_000, (nanoTimestamp % 1_000_000_000)); LocalDateTime nanoLocalDateTime = nanoInstant.atOffset(ZoneOffset.UTC).toLocalDateTime(); + zoneOffset = zoneId.getRules().getOffset(nanoLocalDateTime); updatedFieldValue = nanoLocalDateTime.atOffset(zoneOffset).toInstant().toEpochMilli() * 1_000_000; break; case Timestamp.SCHEMA_NAME: Instant instant = Instant.ofEpochMilli((long) fieldValue); LocalDateTime localDateTime = instant.atOffset(ZoneOffset.UTC).toLocalDateTime(); + zoneOffset = zoneId.getRules().getOffset(localDateTime); updatedFieldValue = localDateTime.atOffset(zoneOffset).toInstant().toEpochMilli(); break; case org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME: Date date = (Date) fieldValue; Instant timestampInstant = date.toInstant(); LocalDateTime timestampLocalDateTime = timestampInstant.atOffset(ZoneOffset.UTC).toLocalDateTime(); + zoneOffset = zoneId.getRules().getOffset(timestampLocalDateTime); updatedFieldValue = Date.from(timestampLocalDateTime.atOffset(zoneOffset).toInstant()); break; } diff --git a/debezium-core/src/test/java/io/debezium/transforms/TimezoneConverterTest.java b/debezium-core/src/test/java/io/debezium/transforms/TimezoneConverterTest.java index 643b74f4f..f88e0db6d 100644 --- a/debezium-core/src/test/java/io/debezium/transforms/TimezoneConverterTest.java +++ b/debezium-core/src/test/java/io/debezium/transforms/TimezoneConverterTest.java @@ -62,8 +62,8 @@ public void testMultipleDebeziumTimestamps() { before.put("order_date_micros", 1529507596945104L); before.put("order_date_nanos", 1531481025340000000L); before.put("order_date_timestamp", 1514908810123L); - before.put("order_date_zoned_timestamp", "2018-01-02T11:15:30.123456789+02:00"); - before.put("order_date_zoned_time", "11:15:30.123456789+02:00"); + before.put("order_date_zoned_timestamp", "2018-01-02T11:15:30.123456789+00:00"); + before.put("order_date_zoned_time", "11:15:30.123456789+00:00"); source.put("table", "orders"); source.put("lsn", 1); @@ -92,8 +92,8 @@ public void testMultipleDebeziumTimestamps() { assertThat(transformedAfter.get("order_date_micros")).isEqualTo(1529487796945000L); assertThat(transformedAfter.get("order_date_nanos")).isEqualTo(1531461225340000000L); assertThat(transformedAfter.get("order_date_timestamp")).isEqualTo(1514889010123L); - assertThat(transformedAfter.get("order_date_zoned_timestamp")).isEqualTo("2018-01-02T14:45:30.123456789+05:30"); - assertThat(transformedAfter.get("order_date_zoned_time")).isEqualTo("14:45:30.123456789+05:30"); + assertThat(transformedAfter.get("order_date_zoned_timestamp")).isEqualTo("2018-01-02T16:45:30.123456789+05:30"); + assertThat(transformedAfter.get("order_date_zoned_time")).isEqualTo("16:45:30.123456789+05:30"); } @Test @@ -115,9 +115,9 @@ public void testSingleDebeziumTimestamp() { before.put("id", (byte) 1); before.put("name", "Srikanth"); - before.put("created_at", "2011-01-11T16:40:30.123456789+05:30"); - before.put("updated_at", "2011-02-02T11:04:30.123456789+05:30"); - before.put("order_date", "2011-04-09T13:00:30.123456789+05:30"); + before.put("created_at", "2011-01-11T16:40:30.123456789+00:00"); + before.put("updated_at", "2011-02-02T11:04:30.123456789+00:00"); + before.put("order_date", "2011-04-09T13:00:30.123456789+00:00"); source.put("table", "orders"); source.put("lsn", 1); @@ -143,9 +143,9 @@ public void testSingleDebeziumTimestamp() { final Struct transformedValue = (Struct) transformedRecord.value(); final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); - assertThat(transformedAfter.get("created_at")).isEqualTo("2011-01-11T05:10:30.123456789-06:00"); - assertThat(transformedAfter.get("updated_at")).isEqualTo("2011-02-01T23:34:30.123456789-06:00"); - assertThat(transformedAfter.get("order_date")).isEqualTo("2011-04-09T01:30:30.123456789-06:00"); + assertThat(transformedAfter.get("created_at")).isEqualTo("2011-01-11T11:40:30.123456789-05:00"); + assertThat(transformedAfter.get("updated_at")).isEqualTo("2011-02-02T06:04:30.123456789-05:00"); + assertThat(transformedAfter.get("order_date")).isEqualTo("2011-04-09T08:00:30.123456789-05:00"); } @Test @@ -199,15 +199,15 @@ public void testKafkaConnectTimestamp() { public void testIncludeListWithTablePrefix() { final Map props = new HashMap<>(); props.put("converted.timezone", "Atlantic/Azores"); - props.put("include.list", "source:customers:order_time"); + props.put("include.list", "source:customers:order_time,customers:created_at"); converter.configure(props); Schema recordSchema = SchemaBuilder.struct() .field("id", Schema.INT8_SCHEMA) .field("name", Schema.STRING_SCHEMA) - .field("created_at", ZonedTime.builder().optional().build()) - .field("updated_at", ZonedTime.builder().optional().build()) - .field("order_time", ZonedTime.builder().optional().build()) + .field("created_at", ZonedTimestamp.builder().optional().build()) + .field("updated_at", ZonedTimestamp.builder().optional().build()) + .field("order_time", ZonedTimestamp.builder().optional().build()) .build(); final Struct before = new Struct(recordSchema); @@ -215,9 +215,9 @@ public void testIncludeListWithTablePrefix() { before.put("id", (byte) 1); before.put("name", "John Doe"); - before.put("created_at", "11:55:30+02:00"); - before.put("updated_at", "12:10:10+02:00"); - before.put("order_time", "12:10:10+02:00"); + before.put("created_at", "2020-01-01T11:55:30+00:00"); + before.put("updated_at", "2020-05-01T11:55:30+00:00"); + before.put("order_time", "2020-06-20T11:55:30+00:00"); source.put("table", "customers"); source.put("lsn", 1); @@ -243,9 +243,9 @@ public void testIncludeListWithTablePrefix() { final Struct transformedValue = (Struct) transformedRecord.value(); final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); - assertThat(transformedAfter.get("created_at")).isEqualTo("11:55:30+02:00"); - assertThat(transformedAfter.get("updated_at")).isEqualTo("12:10:10+02:00"); - assertThat(transformedAfter.get("order_time")).isEqualTo("10:10:10Z"); + assertThat(transformedAfter.get("created_at")).isEqualTo("2020-01-01T10:55:30-01:00"); + assertThat(transformedAfter.get("updated_at")).isEqualTo("2020-05-01T11:55:30+00:00"); + assertThat(transformedAfter.get("order_time")).isEqualTo("2020-06-20T11:55:30Z"); } @Test @@ -267,8 +267,8 @@ public void testIncludeListWithTopicPrefix() { before.put("id", (byte) 1); before.put("name", "John Doe"); - before.put("created_at", "11:55:30+02:00"); - before.put("order_time", "12:10:10+02:00"); + before.put("created_at", "11:55:30+00:00"); + before.put("order_time", "12:10:10+00:00"); source.put("table", "customers"); source.put("lsn", 1); @@ -294,8 +294,8 @@ public void testIncludeListWithTopicPrefix() { final Struct transformedValue = (Struct) transformedRecord.value(); final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); - assertThat(transformedAfter.get("created_at")).isEqualTo("11:55:30+02:00"); - assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30"); + assertThat(transformedAfter.get("created_at")).isEqualTo("11:55:30+00:00"); + assertThat(transformedAfter.get("order_time")).isEqualTo("17:40:10+05:30"); } @Test @@ -317,8 +317,8 @@ public void testIncludeListWithNoPrefix() { before.put("id", (byte) 1); before.put("name", "John Doe"); - before.put("created_at", "11:55:30+02:00"); - before.put("order_time", "12:10:10+02:00"); + before.put("created_at", "11:55:30+00:00"); + before.put("order_time", "12:10:10+00:00"); source.put("table", "customers"); source.put("lsn", 1); @@ -344,8 +344,8 @@ public void testIncludeListWithNoPrefix() { final Struct transformedValue = (Struct) transformedRecord.value(); final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); - assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30"); - assertThat(transformedAfter.get("created_at")).isEqualTo("11:55:30+02:00"); + assertThat(transformedAfter.get("order_time")).isEqualTo("17:40:10+05:30"); + assertThat(transformedAfter.get("created_at")).isEqualTo("11:55:30+00:00"); } @@ -369,9 +369,9 @@ public void testExcludeListWithTablePrefix() { before.put("id", (byte) 1); before.put("name", "John Doe"); - before.put("created_at", "11:55:30+02:00"); - before.put("updated_at", "12:10:10+02:00"); - before.put("order_time", "15:40:10+05:30"); + before.put("created_at", "11:55:30+00:00"); + before.put("updated_at", "12:10:10+00:00"); + before.put("order_time", "15:40:10+00:00"); source.put("table", "customers"); source.put("lsn", 1); @@ -397,9 +397,9 @@ public void testExcludeListWithTablePrefix() { final Struct transformedValue = (Struct) transformedRecord.value(); final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); - assertThat(transformedAfter.get("created_at")).isEqualTo("15:25:30+05:30"); - assertThat(transformedAfter.get("updated_at")).isEqualTo("15:40:10+05:30"); - assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30"); + assertThat(transformedAfter.get("created_at")).isEqualTo("17:25:30+05:30"); + assertThat(transformedAfter.get("updated_at")).isEqualTo("17:40:10+05:30"); + assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+00:00"); } @Test @@ -421,8 +421,8 @@ public void testExcludeListWithTopicPrefix() { before.put("id", (byte) 1); before.put("name", "John Doe"); - before.put("created_at", "11:55:30+02:00"); - before.put("order_time", "15:40:10+05:30"); + before.put("created_at", "11:55:30+00:00"); + before.put("order_time", "15:40:10+00:00"); source.put("table", "customers"); source.put("lsn", 1); @@ -448,8 +448,8 @@ public void testExcludeListWithTopicPrefix() { final Struct transformedValue = (Struct) transformedRecord.value(); final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); - assertThat(transformedAfter.get("created_at")).isEqualTo("04:55:30-05:00"); - assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30"); + assertThat(transformedAfter.get("created_at")).isEqualTo("06:55:30-05:00"); + assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+00:00"); } @Test @@ -471,8 +471,8 @@ public void testExcludeListWithNoPrefix() { before.put("id", (byte) 1); before.put("name", "John Doe"); - before.put("created_at", "11:55:30+02:00"); - before.put("order_time", "15:40:10+05:30"); + before.put("created_at", "11:55:30+00:00"); + before.put("order_time", "15:40:10+00:00"); source.put("table", "customers"); source.put("lsn", 1); @@ -498,8 +498,8 @@ public void testExcludeListWithNoPrefix() { final Struct transformedValue = (Struct) transformedRecord.value(); final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); - assertThat(transformedAfter.get("created_at")).isEqualTo("17:55:30+08:00"); - assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30"); + assertThat(transformedAfter.get("created_at")).isEqualTo("19:55:30+08:00"); + assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+00:00"); } @Test @@ -521,8 +521,8 @@ public void testIncludeListMultipleTables() { customersBefore.put("id", (byte) 1); customersBefore.put("name", "John Doe"); - customersBefore.put("created_at", "2020-01-01T11:55:30+02:00"); - customersBefore.put("updated_at", "2020-01-01T15:40:10+05:30"); + customersBefore.put("created_at", "2020-01-01T11:55:30+00:00"); + customersBefore.put("updated_at", "2020-01-01T15:40:10+00:00"); customersSource.put("table", "customers1"); customersSource.put("lsn", 1); @@ -545,8 +545,8 @@ public void testIncludeListMultipleTables() { final SourceRecord transformedCustomersRecord = converter.apply(customersRecord); final Struct transformedCustomersValue = (Struct) transformedCustomersRecord.value(); final Struct transformedCustomersAfter = transformedCustomersValue.getStruct(Envelope.FieldName.AFTER); - assertThat(transformedCustomersAfter.get("created_at")).isEqualTo("2020-01-01T12:55:30+03:00"); - assertThat(transformedCustomersAfter.get("updated_at")).isEqualTo("2020-01-01T13:10:10+03:00"); + assertThat(transformedCustomersAfter.get("created_at")).isEqualTo("2020-01-01T14:55:30+03:00"); + assertThat(transformedCustomersAfter.get("updated_at")).isEqualTo("2020-01-01T18:40:10+03:00"); Schema ordersRecordSchema = SchemaBuilder.struct() .field("id", Schema.INT8_SCHEMA) @@ -560,8 +560,8 @@ public void testIncludeListMultipleTables() { ordersBefore.put("id", (byte) 1); ordersBefore.put("customer_id", (byte) 1); - ordersBefore.put("created_at", "2023-08-01T11:50:45+02:00"); - ordersBefore.put("order_time", "2023-09-01T11:55:30+02:00"); + ordersBefore.put("created_at", "2023-08-01T11:50:45+00:00"); + ordersBefore.put("order_time", "2023-09-01T11:55:30+00:00"); ordersSource.put("table", "orders1"); ordersSource.put("lsn", 1); @@ -584,8 +584,8 @@ public void testIncludeListMultipleTables() { final SourceRecord transformedOrdersRecord = converter.apply(ordersRecord); final Struct transformedOrdersValue = (Struct) transformedOrdersRecord.value(); final Struct transformedOrdersAfter = transformedOrdersValue.getStruct(Envelope.FieldName.AFTER); - assertThat(transformedOrdersAfter.get("created_at")).isEqualTo("2023-08-01T12:50:45+03:00"); - assertThat(transformedOrdersAfter.get("order_time")).isEqualTo("2023-09-01T12:55:30+03:00"); + assertThat(transformedOrdersAfter.get("created_at")).isEqualTo("2023-08-01T14:50:45+03:00"); + assertThat(transformedOrdersAfter.get("order_time")).isEqualTo("2023-09-01T14:55:30+03:00"); } @Test @@ -607,8 +607,8 @@ public void testExcludeListMultipleTables() { customersBefore.put("id", (byte) 1); customersBefore.put("name", "John Doe"); - customersBefore.put("created_at", "2020-01-01T11:55:30+02:00"); - customersBefore.put("updated_at", "2020-01-01T15:40:10+05:30"); + customersBefore.put("created_at", "2020-01-01T11:55:30+00:00"); + customersBefore.put("updated_at", "2020-01-01T15:40:10+00:00"); customersSource.put("table", "customers1"); customersSource.put("lsn", 1); @@ -631,8 +631,8 @@ public void testExcludeListMultipleTables() { final SourceRecord transformedCustomersRecord = converter.apply(customersRecord); final Struct transformedCustomersValue = (Struct) transformedCustomersRecord.value(); final Struct transformedCustomersAfter = transformedCustomersValue.getStruct(Envelope.FieldName.AFTER); - assertThat(transformedCustomersAfter.get("created_at")).isEqualTo("2020-01-01T11:55:30+02:00"); - assertThat(transformedCustomersAfter.get("updated_at")).isEqualTo("2020-01-01T15:40:10+05:30"); + assertThat(transformedCustomersAfter.get("created_at")).isEqualTo("2020-01-01T11:55:30+00:00"); + assertThat(transformedCustomersAfter.get("updated_at")).isEqualTo("2020-01-01T15:40:10+00:00"); Schema ordersRecordSchema = SchemaBuilder.struct() .field("id", Schema.INT8_SCHEMA) @@ -646,8 +646,8 @@ public void testExcludeListMultipleTables() { ordersBefore.put("id", (byte) 1); ordersBefore.put("customer_id", (byte) 1); - ordersBefore.put("created_at", "2023-08-01T11:50:45+02:00"); - ordersBefore.put("order_time", "2023-09-01T11:55:30+02:00"); + ordersBefore.put("created_at", "2023-08-01T11:50:45+00:00"); + ordersBefore.put("order_time", "2023-09-01T11:55:30+00:00"); ordersSource.put("table", "orders1"); ordersSource.put("lsn", 1); @@ -670,8 +670,8 @@ public void testExcludeListMultipleTables() { final SourceRecord transformedOrdersRecord = converter.apply(ordersRecord); final Struct transformedOrdersValue = (Struct) transformedOrdersRecord.value(); final Struct transformedOrdersAfter = transformedOrdersValue.getStruct(Envelope.FieldName.AFTER); - assertThat(transformedOrdersAfter.get("created_at")).isEqualTo("2023-08-01T03:50:45-06:00"); - assertThat(transformedOrdersAfter.get("order_time")).isEqualTo("2023-09-01T03:55:30-06:00"); + assertThat(transformedOrdersAfter.get("created_at")).isEqualTo("2023-08-01T05:50:45-06:00"); + assertThat(transformedOrdersAfter.get("order_time")).isEqualTo("2023-09-01T05:55:30-06:00"); } @Test @@ -723,7 +723,7 @@ public void testExcludeListWithMultipleFields() { final Struct customersBefore = new Struct(customersRecordSchema); customersBefore.put("id", (byte) 1); customersBefore.put("name", "Amy Rose"); - customersBefore.put("order_time", "10:19:25+05:30"); + customersBefore.put("order_time", "10:19:25+00:00"); final Struct customersSource = new Struct(sourceSchema); customersSource.put("table", "customers"); @@ -748,7 +748,7 @@ public void testExcludeListWithMultipleFields() { final Struct transformedCustomersValue = (Struct) transformedCustomersRecord.value(); final Struct transformedCustomersAfter = transformedCustomersValue.getStruct(Envelope.FieldName.AFTER); assertThat(transformedCustomersAfter.get("name")).isEqualTo("Amy Rose"); - assertThat(transformedCustomersAfter.get("order_time")).isEqualTo("10:19:25+05:30"); + assertThat(transformedCustomersAfter.get("order_time")).isEqualTo("10:19:25+00:00"); Schema inventoryRecordSchema = SchemaBuilder.struct() .field("id", Schema.INT8_SCHEMA) @@ -760,8 +760,8 @@ public void testExcludeListWithMultipleFields() { final Struct inventoryBefore = new Struct(inventoryRecordSchema); inventoryBefore.put("id", (byte) 1); inventoryBefore.put("name", "Amy Rose"); - inventoryBefore.put("shipping_time", "19:19:25+05:30"); - inventoryBefore.put("order_time", "10:19:25+05:30"); + inventoryBefore.put("shipping_time", "19:19:25+00:00"); + inventoryBefore.put("order_time", "10:19:25+00:00"); final Struct inventorySource = new Struct(sourceSchema); inventorySource.put("table", "inventory"); @@ -786,8 +786,8 @@ public void testExcludeListWithMultipleFields() { final Struct transformedInventoryValue = (Struct) transformedInventoryRecord.value(); final Struct transformedInventoryAfter = transformedInventoryValue.getStruct(Envelope.FieldName.AFTER); assertThat(transformedInventoryAfter.get("name")).isEqualTo("Amy Rose"); - assertThat(transformedInventoryAfter.get("shipping_time")).isEqualTo("16:49:25+03:00"); - assertThat(transformedInventoryAfter.get("order_time")).isEqualTo("10:19:25+05:30"); + assertThat(transformedInventoryAfter.get("shipping_time")).isEqualTo("22:19:25+03:00"); + assertThat(transformedInventoryAfter.get("order_time")).isEqualTo("10:19:25+00:00"); } }