From 22500839deb92a9796c91161d10a488111b57f8c Mon Sep 17 00:00:00 2001 From: ani-sha Date: Mon, 14 Aug 2023 20:43:45 +0530 Subject: [PATCH] DBZ-6567 SMT for handling timezone based fields --- .../transforms/TimezoneConverter.java | 497 +++++++++++ .../transforms/TimezoneConverterTest.java | 793 ++++++++++++++++++ 2 files changed, 1290 insertions(+) create mode 100644 debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java create mode 100644 debezium-core/src/test/java/io/debezium/transforms/TimezoneConverterTest.java diff --git a/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java b/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java new file mode 100644 index 000000000..f72d94284 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/transforms/TimezoneConverter.java @@ -0,0 +1,497 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.transforms; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TimeZone; +import java.util.regex.Pattern; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.transforms.Transformation; +import org.apache.kafka.connect.transforms.util.Requirements; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.DebeziumException; +import io.debezium.config.Configuration; +import io.debezium.config.Field; +import io.debezium.data.Envelope; +import io.debezium.time.MicroTimestamp; +import io.debezium.time.NanoTimestamp; +import io.debezium.time.Timestamp; +import io.debezium.time.ZonedTime; +import io.debezium.time.ZonedTimestamp; + +/** + * + * @author Anisha Mohanty + * + */ + +public class TimezoneConverter> implements Transformation { + private static final Logger LOGGER = LoggerFactory.getLogger(TimezoneConverter.class); + + private static final Field CONVERTED_TIMEZONE = Field.create("converted.timezone") + .withDisplayName("Converted Timezone") + .withType(ConfigDef.Type.STRING) + .withWidth(ConfigDef.Width.MEDIUM) + .withImportance(ConfigDef.Importance.HIGH) + .withValidation(Field::isRequired) + .withDescription("A string that represents the timezone to which the time-based fields should be converted." + + "The format can be geographic (e.g. America/Los_Angeles), or it can be a UTC offset in the format of +/-hh:mm, (e.g. +08:00)."); + + private static final Field INCLUDE_LIST = Field.create("include.list") + .withDisplayName("Include List") + .withType(ConfigDef.Type.LIST) + .withWidth(ConfigDef.Width.MEDIUM) + .withImportance(ConfigDef.Importance.HIGH) + .withValidation(Field::isListOfRegex) + .withDescription("A comma-separated list of rules that specify what events should be evaluated for timezone conversion, using one of the following formats: " + + "`source:`:: Matches only Debezium change events with a source information block with the specified table name. All time-based fields will be converted. " + + "`source::`:: Matches only Debezium change events with a source information with the specified table name. Only the specified field name will be converted. " + + "`topic:`:: Matches the specified topic name, converting all time-based fields. " + + "`topic::`:: Matches the specified topic name, converting only the specified field name. " + + "`:`:: Uses a heuristic matching algorithm to matches the source information block table name if the source information block exists, otherwise matches against the topic name. The conversion is applied only to to the specified field name. "); + + private static final Field EXCLUDE_LIST = Field.create("exclude.list") + .withDisplayName("Exclude List") + .withType(ConfigDef.Type.LIST) + .withWidth(ConfigDef.Width.MEDIUM) + .withImportance(ConfigDef.Importance.HIGH) + .withValidation(Field::isListOfRegex) + .withDescription("A comma-separated list of rules that specify what events should be excluded from timezone conversion, using one of the following formats: " + + "`source:`:: Matches only Debezium change events with a source information block with the specified table name. All time-based fields will be excluded. " + + "`source::`:: Matches only Debezium change events with a source information with the specified table name. Only the specified field name will be excluded. " + + "`topic:`:: Matches the specified topic name, excluding all time-based fields. " + + "`topic::`:: Matches the specified topic name, excluding only the specified field name. " + + "`:`:: Uses a heuristic matching algorithm to matches the source information block table name if the source information block exists, otherwise matches against the topic name. The conversion is applied only to to the specified field name. "); + + private SmtManager smtManager; + private String convertedTimezone; + private List includeList; + private List excludeList; + private static final String SOURCE = "source"; + private static final String TOPIC = "topic"; + private static final Pattern TIMEZONE_PATTERN = Pattern.compile("^[+-]\\d{2}:\\d{2}$"); + private static final Pattern LIST_PATTERN = Pattern.compile("^\\[(source|topic|[\".\\w\\s_]+):([\".\\w\\s_]+(?::[\".\\w\\s_]+)?(?:,|]$))+$"); + private final Map> topicFieldsMap = new HashMap<>(); + private final Map> tableFieldsMap = new HashMap<>(); + private final Map> noPrefixFieldsMap = new HashMap<>(); + private static final List SUPPORTED_TIMESTAMP_LOGICAL_NAMES = List.of( + MicroTimestamp.SCHEMA_NAME, + NanoTimestamp.SCHEMA_NAME, + Timestamp.SCHEMA_NAME, + ZonedTimestamp.SCHEMA_NAME, + ZonedTime.SCHEMA_NAME, + org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME); + + @Override + public ConfigDef config() { + final ConfigDef config = new ConfigDef(); + Field.group(config, null, CONVERTED_TIMEZONE, INCLUDE_LIST, EXCLUDE_LIST); + return config; + } + + @Override + public R apply(R record) { + if (record.value() == null || !smtManager.isValidEnvelope(record)) { + return record; + } + + Struct value = (Struct) record.value(); + Schema schema = value.schema(); + + String table = getTableFromSource(value); + String topic = record.topic(); + + if (includeList.isEmpty() && excludeList.isEmpty()) { + handleAllRecords(value, table, topic); + } + else if (!includeList.isEmpty()) { + handleInclude(value, table, topic); + } + else { + handleExclude(value, table, topic); + } + + Struct updatedEnvelopeValue = handleEnvelopeValue(schema, value); + return record.newRecord( + record.topic(), + record.kafkaPartition(), + record.keySchema(), + record.key(), + record.valueSchema(), + updatedEnvelopeValue, + record.timestamp(), + record.headers()); + } + + @Override + public void configure(Map configs) { + final Configuration config = Configuration.from(configs); + smtManager = new SmtManager<>(config); + smtManager.validate(config, Field.setOf(CONVERTED_TIMEZONE, INCLUDE_LIST, EXCLUDE_LIST)); + + convertedTimezone = config.getString(CONVERTED_TIMEZONE); + includeList = config.getList(INCLUDE_LIST); + excludeList = config.getList(EXCLUDE_LIST); + + validateConfiguration(); + + if (!excludeList.isEmpty()) { + collectTablesAndTopics(excludeList); + } + else if (!includeList.isEmpty()) { + collectTablesAndTopics(includeList); + } + } + + private void collectTablesAndTopics(List list) { + String commonPrefix = null; + for (String item : list) { + FieldItem parseItem = parseItem(item); + String prefix = parseItem.prefix; + String matchName = parseItem.getMatchName(); + String field = parseItem.getFieldName(); + + if (prefix != null) { + commonPrefix = prefix; + } + + if (Objects.equals(commonPrefix, TOPIC)) { + if (!topicFieldsMap.containsKey(matchName)) { + topicFieldsMap.put(matchName, new HashSet<>()); + } + topicFieldsMap.get(matchName).add(field); + } + else if (Objects.equals(commonPrefix, SOURCE)) { + if (!tableFieldsMap.containsKey(matchName)) { + tableFieldsMap.put(matchName, new HashSet<>()); + } + tableFieldsMap.get(matchName).add(field); + } + else { + if (!noPrefixFieldsMap.containsKey(matchName)) { + noPrefixFieldsMap.put(matchName, new HashSet<>()); + } + noPrefixFieldsMap.get(matchName).add(field); + } + } + } + + private void validateConfiguration() { + if (!includeList.isEmpty()) { + if (!LIST_PATTERN.matcher(includeList.toString()).matches()) { + throw new DebeziumException( + "Invalid include list format. Please specify a list of rules in the format of \"source::\", \"topic::\", \":\""); + } + } + else if (!excludeList.isEmpty()) { + if (!LIST_PATTERN.matcher(excludeList.toString()).matches()) { + throw new DebeziumException( + "Invalid exclude list format. Please specify a list of rules in the format of \"source::\", \"topic::\", \":\""); + } + } + + if (!validateTimezoneString()) { + throw new DebeziumException( + "Invalid timezone format. Please specify either a geographic timezone (e.g. America/Los_Angeles) or a UTC offset in the format of +/-hh:mm, (e.g. +08:00)."); + } + + if (!includeList.isEmpty() && !excludeList.isEmpty()) { + throw new DebeziumException("Both include and exclude lists are specified. Please specify only one."); + } + } + + private boolean validateTimezoneString() { + if (TIMEZONE_PATTERN.matcher(convertedTimezone).matches()) { + return true; + } + else if (ZoneId.getAvailableZoneIds().contains(convertedTimezone)) { + return true; + } + else { + return Arrays.asList(TimeZone.getAvailableIDs()).contains(convertedTimezone); + } + } + + @Override + public void close() { + } + + private enum Type { + ALL, + INCLUDE, + EXCLUDE + } + + private Object getTimestampWithTimezone(String schemaName, Object fieldValue) { + Object updatedFieldValue = fieldValue; + ZoneId zoneId = ZoneId.of(convertedTimezone); + ZoneOffset zoneOffset = zoneId.getRules().getOffset(Instant.now()); + switch (schemaName) { + case ZonedTimestamp.SCHEMA_NAME: + OffsetDateTime offsetDateTime = OffsetDateTime.parse((String) fieldValue, DateTimeFormatter.ISO_OFFSET_DATE_TIME); + OffsetDateTime offsetDateTimeWithZone = offsetDateTime.withOffsetSameInstant(zoneOffset); + updatedFieldValue = ZonedTimestamp.toIsoString(offsetDateTimeWithZone, null); + break; + case ZonedTime.SCHEMA_NAME: + OffsetTime offsetTime = OffsetTime.parse((String) fieldValue, DateTimeFormatter.ISO_OFFSET_TIME); + OffsetTime offsetTimeWithZone = offsetTime.withOffsetSameInstant(zoneOffset); + updatedFieldValue = ZonedTime.toIsoString(offsetTimeWithZone, null); + break; + case MicroTimestamp.SCHEMA_NAME: + long microTimestamp = (long) fieldValue; + Instant microInstant = Instant.ofEpochSecond(microTimestamp / 1_000_000, (microTimestamp % 1_000_000) * 1_000); + LocalDateTime microLocalDateTime = microInstant.atOffset(ZoneOffset.UTC).toLocalDateTime(); + updatedFieldValue = microLocalDateTime.atOffset(zoneOffset).toInstant().toEpochMilli() * 1_000; + break; + case NanoTimestamp.SCHEMA_NAME: + long nanoTimestamp = (long) fieldValue; + Instant nanoInstant = Instant.ofEpochSecond(nanoTimestamp / 1_000_000_000, (nanoTimestamp % 1_000_000_000)); + LocalDateTime nanoLocalDateTime = nanoInstant.atOffset(ZoneOffset.UTC).toLocalDateTime(); + updatedFieldValue = nanoLocalDateTime.atOffset(zoneOffset).toInstant().toEpochMilli() * 1_000_000; + break; + case Timestamp.SCHEMA_NAME: + Instant instant = Instant.ofEpochMilli((long) fieldValue); + LocalDateTime localDateTime = instant.atOffset(ZoneOffset.UTC).toLocalDateTime(); + updatedFieldValue = localDateTime.atOffset(zoneOffset).toInstant().toEpochMilli(); + break; + case org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME: + Date date = (Date) fieldValue; + Instant timestampInstant = date.toInstant(); + LocalDateTime timestampLocalDateTime = timestampInstant.atOffset(ZoneOffset.UTC).toLocalDateTime(); + updatedFieldValue = Date.from(timestampLocalDateTime.atOffset(zoneOffset).toInstant()); + break; + } + return updatedFieldValue; + } + + private boolean isTimeField(String schemaName) { + return schemaName != null && SUPPORTED_TIMESTAMP_LOGICAL_NAMES.contains(schemaName); + } + + private void handleStructs(Struct value, Type type, String matchName, Set fields) { + if (type == null || matchName == null) { + return; + } + + Struct before = getStruct(value, Envelope.FieldName.BEFORE); + Struct after = getStruct(value, Envelope.FieldName.AFTER); + + if (before != null) { + handleValueForFields(before, type, fields); + } + if (after != null) { + handleValueForFields(after, type, fields); + } + } + + private void handleValueForFields(Struct value, Type type, Set fields) { + Schema schema = value.schema(); + for (org.apache.kafka.connect.data.Field field : schema.fields()) { + if ((type == Type.ALL + || type == Type.INCLUDE && fields.contains(field.name()) + || (type == Type.EXCLUDE && !fields.contains(field.name())))) { + handleValueForField(value, field); + } + } + } + + private void handleValueForField(Struct struct, org.apache.kafka.connect.data.Field field) { + String fieldName = field.name(); + Schema schema = field.schema(); + + if (isTimeField(schema.name())) { + Object newValue = getTimestampWithTimezone(schema.name(), struct.get(fieldName)); + struct.put(fieldName, newValue); + } + } + + private Struct handleEnvelopeValue(Schema schema, Struct value) { + final Struct updatedEnvelopeValue = new Struct(schema); + Struct updatedAfterValue = getStruct(value, Envelope.FieldName.AFTER); + Struct updatedBeforeValue = getStruct(value, Envelope.FieldName.BEFORE); + if (updatedAfterValue != null) { + updatedEnvelopeValue.put(Envelope.FieldName.AFTER, updatedAfterValue); + } + if (updatedBeforeValue != null) { + updatedEnvelopeValue.put(Envelope.FieldName.BEFORE, updatedBeforeValue); + } + return updatedEnvelopeValue; + } + + private Struct getStruct(Struct struct, String structName) { + try { + return Requirements.requireStructOrNull(struct.getStruct(structName), ""); + } + catch (DataException ignored) { + } + return null; + } + + private String getTableFromSource(Struct value) { + try { + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + return source.getString("table"); + } + catch (DataException ignored) { + } + return null; + } + + private static class FieldItem { + private final String prefix; + private final String matchName; + private final String fieldName; + + FieldItem(String prefix, String matchName, String fieldName) { + this.prefix = prefix; + this.matchName = matchName; + this.fieldName = fieldName; + } + + public String getPrefix() { + return prefix; + } + + public String getMatchName() { + return matchName; + } + + public String getFieldName() { + return fieldName; + } + } + + private FieldItem parseItem(String item) { + String prefix = null; + String matchName = null; + String fieldName = null; + + String[] parts = item.split(":"); + if (parts.length == 1) { + // table or topic + matchName = parts[0]; + } + else if (parts.length >= 2 && parts.length <= 3) { + if (parts[0].equalsIgnoreCase(SOURCE) || parts[0].equalsIgnoreCase(TOPIC)) { + // source:table or topic:topic + // source:table1,table2 or topic:topic1,topic2 + prefix = parts[0]; + matchName = parts[1]; + if (parts.length == 3) { + // source:table:field or topic:topic:field + fieldName = parts[2]; + } + } + else { + // table:field or topic:field + matchName = parts[0]; + fieldName = parts[1]; + } + } + return new FieldItem(prefix, matchName, fieldName); + } + + private static class MatchFieldsResult { + private final String matchName; + private final Set fields; + + MatchFieldsResult(String matchName, Set fields) { + this.matchName = matchName; + this.fields = fields; + } + + public String getMatchName() { + return matchName; + } + + public Set getFields() { + return fields; + } + } + + private MatchFieldsResult handleMatchNameAndFields(String table, String topic) { + String matchName = null; + Set fields = Collections.emptySet(); + + if (topicFieldsMap.containsKey(topic)) { + matchName = topic; + fields = topicFieldsMap.get(topic); + } + else if (tableFieldsMap.containsKey(table)) { + matchName = table; + fields = tableFieldsMap.get(table); + } + else if (noPrefixFieldsMap.containsKey(topic)) { + matchName = topic; + fields = noPrefixFieldsMap.get(topic); + } + else if (noPrefixFieldsMap.containsKey(table)) { + matchName = table; + fields = noPrefixFieldsMap.get(table); + } + + return new MatchFieldsResult(matchName, fields); + } + + private void handleInclude(Struct value, String table, String topic) { + MatchFieldsResult matchFieldsResult = handleMatchNameAndFields(table, topic); + String matchName = matchFieldsResult.getMatchName(); + Set fields = matchFieldsResult.getFields(); + + if (matchName != null) { + if (!fields.contains(null)) { + handleStructs(value, Type.INCLUDE, matchName, fields); + } + else { + handleStructs(value, Type.ALL, matchName, fields); + } + } + else { + handleStructs(value, Type.ALL, table, Set.of("")); + } + } + + private void handleExclude(Struct value, String table, String topic) { + MatchFieldsResult matchFieldsResult = handleMatchNameAndFields(table, topic); + String matchName = matchFieldsResult.getMatchName(); + Set fields = matchFieldsResult.getFields(); + + if (matchName == null) { + handleStructs(value, Type.ALL, table != null ? table : topic, Set.of("")); + } + else if (!fields.contains(null)) { + handleStructs(value, Type.EXCLUDE, matchName, fields); + } + } + + private void handleAllRecords(Struct value, String table, String topic) { + if (!topicFieldsMap.containsKey(topic) && !tableFieldsMap.containsKey(table) && !noPrefixFieldsMap.containsKey(table)) { + handleStructs(value, Type.ALL, table != null ? table : topic, Set.of("")); + } + } +} diff --git a/debezium-core/src/test/java/io/debezium/transforms/TimezoneConverterTest.java b/debezium-core/src/test/java/io/debezium/transforms/TimezoneConverterTest.java new file mode 100644 index 000000000..643b74f4f --- /dev/null +++ b/debezium-core/src/test/java/io/debezium/transforms/TimezoneConverterTest.java @@ -0,0 +1,793 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.transforms; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Test; + +import io.debezium.DebeziumException; +import io.debezium.data.Envelope; +import io.debezium.time.MicroTimestamp; +import io.debezium.time.NanoTimestamp; +import io.debezium.time.Timestamp; +import io.debezium.time.ZonedTime; +import io.debezium.time.ZonedTimestamp; + +public class TimezoneConverterTest { + private final TimezoneConverter converter = new TimezoneConverter(); + protected final Schema sourceSchema = SchemaBuilder.struct() + .field("table", Schema.STRING_SCHEMA) + .field("lsn", Schema.INT32_SCHEMA) + .field("ts_ms", Schema.OPTIONAL_INT32_SCHEMA) + .build(); + + @Test + public void testMultipleDebeziumTimestamps() { + final Map props = new HashMap<>(); + props.put("converted.timezone", "+05:30"); + converter.configure(props); + + Schema recordSchema = SchemaBuilder.struct() + .field("id", Schema.INT8_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .field("order_date_micros", MicroTimestamp.builder().optional().build()) + .field("order_date_nanos", NanoTimestamp.builder().optional().build()) + .field("order_date_timestamp", Timestamp.builder().optional().build()) + .field("order_date_zoned_timestamp", ZonedTimestamp.builder().optional().build()) + .field("order_date_zoned_time", ZonedTime.builder().optional().build()) + .build(); + + final Struct before = new Struct(recordSchema); + final Struct source = new Struct(sourceSchema); + + before.put("id", (byte) 1); + before.put("name", "Srikanth"); + before.put("order_date_micros", 1529507596945104L); + before.put("order_date_nanos", 1531481025340000000L); + before.put("order_date_timestamp", 1514908810123L); + before.put("order_date_zoned_timestamp", "2018-01-02T11:15:30.123456789+02:00"); + before.put("order_date_zoned_time", "11:15:30.123456789+02:00"); + + source.put("table", "orders"); + source.put("lsn", 1); + source.put("ts_ms", 123456789); + + final Envelope envelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(recordSchema) + .withSource(sourceSchema) + .build(); + + final Struct payload = envelope.create(before, source, Instant.now()); + + SourceRecord record = new SourceRecord( + new HashMap<>(), + new HashMap<>(), + "db.server1.table1", + envelope.schema(), + payload); + + final SourceRecord transformedRecord = converter.apply(record); + + final Struct transformedValue = (Struct) transformedRecord.value(); + final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); + + assertThat(transformedAfter.get("order_date_micros")).isEqualTo(1529487796945000L); + assertThat(transformedAfter.get("order_date_nanos")).isEqualTo(1531461225340000000L); + assertThat(transformedAfter.get("order_date_timestamp")).isEqualTo(1514889010123L); + assertThat(transformedAfter.get("order_date_zoned_timestamp")).isEqualTo("2018-01-02T14:45:30.123456789+05:30"); + assertThat(transformedAfter.get("order_date_zoned_time")).isEqualTo("14:45:30.123456789+05:30"); + } + + @Test + public void testSingleDebeziumTimestamp() { + final Map props = new HashMap<>(); + props.put("converted.timezone", "Pacific/Easter"); + converter.configure(props); + + Schema recordSchema = SchemaBuilder.struct() + .field("id", Schema.INT8_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .field("created_at", ZonedTimestamp.builder().optional().build()) + .field("updated_at", ZonedTimestamp.builder().optional().build()) + .field("order_date", ZonedTimestamp.builder().optional().build()) + .build(); + + final Struct before = new Struct(recordSchema); + final Struct source = new Struct(sourceSchema); + + before.put("id", (byte) 1); + before.put("name", "Srikanth"); + before.put("created_at", "2011-01-11T16:40:30.123456789+05:30"); + before.put("updated_at", "2011-02-02T11:04:30.123456789+05:30"); + before.put("order_date", "2011-04-09T13:00:30.123456789+05:30"); + + source.put("table", "orders"); + source.put("lsn", 1); + source.put("ts_ms", 123456789); + + final Envelope envelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(recordSchema) + .withSource(sourceSchema) + .build(); + + final Struct payload = envelope.create(before, source, Instant.now()); + + SourceRecord record = new SourceRecord( + new HashMap<>(), + new HashMap<>(), + "db.server1.table1", + envelope.schema(), + payload); + + final SourceRecord transformedRecord = converter.apply(record); + + final Struct transformedValue = (Struct) transformedRecord.value(); + final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); + + assertThat(transformedAfter.get("created_at")).isEqualTo("2011-01-11T05:10:30.123456789-06:00"); + assertThat(transformedAfter.get("updated_at")).isEqualTo("2011-02-01T23:34:30.123456789-06:00"); + assertThat(transformedAfter.get("order_date")).isEqualTo("2011-04-09T01:30:30.123456789-06:00"); + } + + @Test + public void testKafkaConnectTimestamp() { + Map props = new HashMap<>(); + props.put("converted.timezone", "Africa/Cairo"); + converter.configure(props); + + Schema recordSchema = SchemaBuilder.struct() + .field("id", Schema.INT8_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .field("created_at", org.apache.kafka.connect.data.Timestamp.builder().optional().build()) + .build(); + + final Struct before = new Struct(recordSchema); + final Struct source = new Struct(sourceSchema); + + before.put("id", (byte) 1); + before.put("name", "Pierre Wright"); + before.put("created_at", Date.from(LocalDateTime.of(2018, 3, 27, 2, 0, 0).toInstant(ZoneOffset.UTC))); + + source.put("table", "orders"); + source.put("lsn", 1); + source.put("ts_ms", 123456789); + + final Envelope envelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(recordSchema) + .withSource(sourceSchema) + .build(); + + final Struct payload = envelope.create(before, source, Instant.now()); + + SourceRecord record = new SourceRecord( + new HashMap<>(), + new HashMap<>(), + "db.server1.table1", + envelope.schema(), + payload); + + final SourceRecord transformedRecord = converter.apply(record); + + final Struct transformedValue = (Struct) transformedRecord.value(); + final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); + + assertThat(transformedAfter.get("created_at")).isInstanceOf(Date.class); + assertThat(transformedAfter.get("created_at")).isEqualTo(Date.from(LocalDateTime.of(2018, 3, 27, 0, 0, 0).toInstant(ZoneOffset.UTC))); + } + + @Test + public void testIncludeListWithTablePrefix() { + final Map props = new HashMap<>(); + props.put("converted.timezone", "Atlantic/Azores"); + props.put("include.list", "source:customers:order_time"); + converter.configure(props); + + Schema recordSchema = SchemaBuilder.struct() + .field("id", Schema.INT8_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .field("created_at", ZonedTime.builder().optional().build()) + .field("updated_at", ZonedTime.builder().optional().build()) + .field("order_time", ZonedTime.builder().optional().build()) + .build(); + + final Struct before = new Struct(recordSchema); + final Struct source = new Struct(sourceSchema); + + before.put("id", (byte) 1); + before.put("name", "John Doe"); + before.put("created_at", "11:55:30+02:00"); + before.put("updated_at", "12:10:10+02:00"); + before.put("order_time", "12:10:10+02:00"); + + source.put("table", "customers"); + source.put("lsn", 1); + source.put("ts_ms", 123456789); + + final Envelope envelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(recordSchema) + .withSource(sourceSchema) + .build(); + + final Struct payload = envelope.create(before, source, Instant.now()); + + SourceRecord record = new SourceRecord( + new HashMap<>(), + new HashMap<>(), + "db.server1.table1", + envelope.schema(), + payload); + + final SourceRecord transformedRecord = converter.apply(record); + + final Struct transformedValue = (Struct) transformedRecord.value(); + final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); + + assertThat(transformedAfter.get("created_at")).isEqualTo("11:55:30+02:00"); + assertThat(transformedAfter.get("updated_at")).isEqualTo("12:10:10+02:00"); + assertThat(transformedAfter.get("order_time")).isEqualTo("10:10:10Z"); + } + + @Test + public void testIncludeListWithTopicPrefix() { + final Map props = new HashMap<>(); + props.put("converted.timezone", "+05:30"); + props.put("include.list", "topic:db.server1.table1:order_time"); + converter.configure(props); + + Schema recordSchema = SchemaBuilder.struct() + .field("id", Schema.INT8_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .field("created_at", ZonedTime.builder().optional().build()) + .field("order_time", ZonedTime.builder().optional().build()) + .build(); + + final Struct before = new Struct(recordSchema); + final Struct source = new Struct(sourceSchema); + + before.put("id", (byte) 1); + before.put("name", "John Doe"); + before.put("created_at", "11:55:30+02:00"); + before.put("order_time", "12:10:10+02:00"); + + source.put("table", "customers"); + source.put("lsn", 1); + source.put("ts_ms", 123456789); + + final Envelope envelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(recordSchema) + .withSource(sourceSchema) + .build(); + + final Struct payload = envelope.create(before, source, Instant.now()); + + SourceRecord record = new SourceRecord( + new HashMap<>(), + new HashMap<>(), + "db.server1.table1", + envelope.schema(), + payload); + + final SourceRecord transformedRecord = converter.apply(record); + + final Struct transformedValue = (Struct) transformedRecord.value(); + final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); + + assertThat(transformedAfter.get("created_at")).isEqualTo("11:55:30+02:00"); + assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30"); + } + + @Test + public void testIncludeListWithNoPrefix() { + final Map props = new HashMap<>(); + props.put("converted.timezone", "Asia/Kolkata"); + props.put("include.list", "db.server1.table1:order_time"); + converter.configure(props); + + Schema recordSchema = SchemaBuilder.struct() + .field("id", Schema.INT8_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .field("created_at", ZonedTime.builder().optional().build()) + .field("order_time", ZonedTime.builder().optional().build()) + .build(); + + final Struct before = new Struct(recordSchema); + final Struct source = new Struct(sourceSchema); + + before.put("id", (byte) 1); + before.put("name", "John Doe"); + before.put("created_at", "11:55:30+02:00"); + before.put("order_time", "12:10:10+02:00"); + + source.put("table", "customers"); + source.put("lsn", 1); + source.put("ts_ms", 123456789); + + final Envelope envelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(recordSchema) + .withSource(sourceSchema) + .build(); + + final Struct payload = envelope.create(before, source, Instant.now()); + + SourceRecord record = new SourceRecord( + new HashMap<>(), + new HashMap<>(), + "db.server1.table1", + envelope.schema(), + payload); + + final SourceRecord transformedRecord = converter.apply(record); + + final Struct transformedValue = (Struct) transformedRecord.value(); + final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); + + assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30"); + assertThat(transformedAfter.get("created_at")).isEqualTo("11:55:30+02:00"); + + } + + @Test + public void testExcludeListWithTablePrefix() { + final Map props = new HashMap<>(); + props.put("converted.timezone", "+05:30"); + props.put("exclude.list", "source:customers:order_time"); + converter.configure(props); + + Schema recordSchema = SchemaBuilder.struct() + .field("id", Schema.INT8_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .field("created_at", ZonedTime.builder().optional().build()) + .field("updated_at", ZonedTime.builder().optional().build()) + .field("order_time", ZonedTime.builder().optional().build()) + .build(); + + final Struct before = new Struct(recordSchema); + final Struct source = new Struct(sourceSchema); + + before.put("id", (byte) 1); + before.put("name", "John Doe"); + before.put("created_at", "11:55:30+02:00"); + before.put("updated_at", "12:10:10+02:00"); + before.put("order_time", "15:40:10+05:30"); + + source.put("table", "customers"); + source.put("lsn", 1); + source.put("ts_ms", 123456789); + + final Envelope envelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(recordSchema) + .withSource(sourceSchema) + .build(); + + final Struct payload = envelope.create(before, source, Instant.now()); + + SourceRecord record = new SourceRecord( + new HashMap<>(), + new HashMap<>(), + "db.server1.table1", + envelope.schema(), + payload); + + final SourceRecord transformedRecord = converter.apply(record); + + final Struct transformedValue = (Struct) transformedRecord.value(); + final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); + + assertThat(transformedAfter.get("created_at")).isEqualTo("15:25:30+05:30"); + assertThat(transformedAfter.get("updated_at")).isEqualTo("15:40:10+05:30"); + assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30"); + } + + @Test + public void testExcludeListWithTopicPrefix() { + final Map props = new HashMap<>(); + props.put("converted.timezone", "America/Chicago"); + props.put("exclude.list", "topic:db.server1.table1:order_time"); + converter.configure(props); + + Schema recordSchema = SchemaBuilder.struct() + .field("id", Schema.INT8_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .field("created_at", ZonedTime.builder().optional().build()) + .field("order_time", ZonedTime.builder().optional().build()) + .build(); + + final Struct before = new Struct(recordSchema); + final Struct source = new Struct(sourceSchema); + + before.put("id", (byte) 1); + before.put("name", "John Doe"); + before.put("created_at", "11:55:30+02:00"); + before.put("order_time", "15:40:10+05:30"); + + source.put("table", "customers"); + source.put("lsn", 1); + source.put("ts_ms", 123456789); + + final Envelope envelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(recordSchema) + .withSource(sourceSchema) + .build(); + + final Struct payload = envelope.create(before, source, Instant.now()); + + SourceRecord record = new SourceRecord( + new HashMap<>(), + new HashMap<>(), + "db.server1.table1", + envelope.schema(), + payload); + + final SourceRecord transformedRecord = converter.apply(record); + + final Struct transformedValue = (Struct) transformedRecord.value(); + final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); + + assertThat(transformedAfter.get("created_at")).isEqualTo("04:55:30-05:00"); + assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30"); + } + + @Test + public void testExcludeListWithNoPrefix() { + final Map props = new HashMap<>(); + props.put("converted.timezone", "+08:00"); + props.put("exclude.list", "db.server1.table1:order_time"); + converter.configure(props); + + Schema recordSchema = SchemaBuilder.struct() + .field("id", Schema.INT8_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .field("created_at", ZonedTime.builder().optional().build()) + .field("order_time", ZonedTime.builder().optional().build()) + .build(); + + final Struct before = new Struct(recordSchema); + final Struct source = new Struct(sourceSchema); + + before.put("id", (byte) 1); + before.put("name", "John Doe"); + before.put("created_at", "11:55:30+02:00"); + before.put("order_time", "15:40:10+05:30"); + + source.put("table", "customers"); + source.put("lsn", 1); + source.put("ts_ms", 123456789); + + final Envelope envelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(recordSchema) + .withSource(sourceSchema) + .build(); + + final Struct payload = envelope.create(before, source, Instant.now()); + + SourceRecord record = new SourceRecord( + new HashMap<>(), + new HashMap<>(), + "db.server1.table1", + envelope.schema(), + payload); + + final SourceRecord transformedRecord = converter.apply(record); + + final Struct transformedValue = (Struct) transformedRecord.value(); + final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER); + + assertThat(transformedAfter.get("created_at")).isEqualTo("17:55:30+08:00"); + assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30"); + } + + @Test + public void testIncludeListMultipleTables() { + final Map props = new HashMap<>(); + props.put("converted.timezone", "+03:00"); + props.put("include.list", "source:customers1,orders1,public1"); + converter.configure(props); + + Schema customersRecordSchema = SchemaBuilder.struct() + .field("id", Schema.INT8_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .field("created_at", ZonedTimestamp.builder().optional().build()) + .field("updated_at", ZonedTimestamp.builder().optional().build()) + .build(); + + final Struct customersBefore = new Struct(customersRecordSchema); + final Struct customersSource = new Struct(sourceSchema); + + customersBefore.put("id", (byte) 1); + customersBefore.put("name", "John Doe"); + customersBefore.put("created_at", "2020-01-01T11:55:30+02:00"); + customersBefore.put("updated_at", "2020-01-01T15:40:10+05:30"); + + customersSource.put("table", "customers1"); + customersSource.put("lsn", 1); + customersSource.put("ts_ms", 123456789); + + final Envelope customersEnvelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(customersRecordSchema) + .withSource(sourceSchema) + .build(); + + final Struct customersPayload = customersEnvelope.create(customersBefore, customersSource, Instant.now()); + SourceRecord customersRecord = new SourceRecord( + new HashMap<>(), + new HashMap<>(), + "db.server1.customers", + customersEnvelope.schema(), + customersPayload); + + final SourceRecord transformedCustomersRecord = converter.apply(customersRecord); + final Struct transformedCustomersValue = (Struct) transformedCustomersRecord.value(); + final Struct transformedCustomersAfter = transformedCustomersValue.getStruct(Envelope.FieldName.AFTER); + assertThat(transformedCustomersAfter.get("created_at")).isEqualTo("2020-01-01T12:55:30+03:00"); + assertThat(transformedCustomersAfter.get("updated_at")).isEqualTo("2020-01-01T13:10:10+03:00"); + + Schema ordersRecordSchema = SchemaBuilder.struct() + .field("id", Schema.INT8_SCHEMA) + .field("customer_id", Schema.INT8_SCHEMA) + .field("created_at", ZonedTimestamp.builder().optional().build()) + .field("order_time", ZonedTimestamp.builder().optional().build()) + .build(); + + final Struct ordersBefore = new Struct(ordersRecordSchema); + final Struct ordersSource = new Struct(sourceSchema); + + ordersBefore.put("id", (byte) 1); + ordersBefore.put("customer_id", (byte) 1); + ordersBefore.put("created_at", "2023-08-01T11:50:45+02:00"); + ordersBefore.put("order_time", "2023-09-01T11:55:30+02:00"); + + ordersSource.put("table", "orders1"); + ordersSource.put("lsn", 1); + ordersSource.put("ts_ms", 123456789); + + final Envelope ordersEnvelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(ordersRecordSchema) + .withSource(sourceSchema) + .build(); + + final Struct ordersPayload = ordersEnvelope.create(ordersBefore, ordersSource, Instant.now()); + SourceRecord ordersRecord = new SourceRecord( + new HashMap<>(), + new HashMap<>(), + "db.server1.orders", + ordersEnvelope.schema(), + ordersPayload); + + final SourceRecord transformedOrdersRecord = converter.apply(ordersRecord); + final Struct transformedOrdersValue = (Struct) transformedOrdersRecord.value(); + final Struct transformedOrdersAfter = transformedOrdersValue.getStruct(Envelope.FieldName.AFTER); + assertThat(transformedOrdersAfter.get("created_at")).isEqualTo("2023-08-01T12:50:45+03:00"); + assertThat(transformedOrdersAfter.get("order_time")).isEqualTo("2023-09-01T12:55:30+03:00"); + } + + @Test + public void testExcludeListMultipleTables() { + final Map props = new HashMap<>(); + props.put("converted.timezone", "-06:00"); + props.put("exclude.list", "topic:db.server1.customers,db.server1.public"); + converter.configure(props); + + Schema customersRecordSchema = SchemaBuilder.struct() + .field("id", Schema.INT8_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .field("created_at", ZonedTimestamp.builder().optional().build()) + .field("updated_at", ZonedTimestamp.builder().optional().build()) + .build(); + + final Struct customersBefore = new Struct(customersRecordSchema); + final Struct customersSource = new Struct(sourceSchema); + + customersBefore.put("id", (byte) 1); + customersBefore.put("name", "John Doe"); + customersBefore.put("created_at", "2020-01-01T11:55:30+02:00"); + customersBefore.put("updated_at", "2020-01-01T15:40:10+05:30"); + + customersSource.put("table", "customers1"); + customersSource.put("lsn", 1); + customersSource.put("ts_ms", 123456789); + + final Envelope customersEnvelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(customersRecordSchema) + .withSource(sourceSchema) + .build(); + + final Struct customersPayload = customersEnvelope.create(customersBefore, customersSource, Instant.now()); + SourceRecord customersRecord = new SourceRecord( + new HashMap<>(), + new HashMap<>(), + "db.server1.customers", + customersEnvelope.schema(), + customersPayload); + + final SourceRecord transformedCustomersRecord = converter.apply(customersRecord); + final Struct transformedCustomersValue = (Struct) transformedCustomersRecord.value(); + final Struct transformedCustomersAfter = transformedCustomersValue.getStruct(Envelope.FieldName.AFTER); + assertThat(transformedCustomersAfter.get("created_at")).isEqualTo("2020-01-01T11:55:30+02:00"); + assertThat(transformedCustomersAfter.get("updated_at")).isEqualTo("2020-01-01T15:40:10+05:30"); + + Schema ordersRecordSchema = SchemaBuilder.struct() + .field("id", Schema.INT8_SCHEMA) + .field("customer_id", Schema.INT8_SCHEMA) + .field("created_at", ZonedTimestamp.builder().optional().build()) + .field("order_time", ZonedTimestamp.builder().optional().build()) + .build(); + + final Struct ordersBefore = new Struct(ordersRecordSchema); + final Struct ordersSource = new Struct(sourceSchema); + + ordersBefore.put("id", (byte) 1); + ordersBefore.put("customer_id", (byte) 1); + ordersBefore.put("created_at", "2023-08-01T11:50:45+02:00"); + ordersBefore.put("order_time", "2023-09-01T11:55:30+02:00"); + + ordersSource.put("table", "orders1"); + ordersSource.put("lsn", 1); + ordersSource.put("ts_ms", 123456789); + + final Envelope ordersEnvelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(ordersRecordSchema) + .withSource(sourceSchema) + .build(); + + final Struct ordersPayload = ordersEnvelope.create(ordersBefore, ordersSource, Instant.now()); + SourceRecord ordersRecord = new SourceRecord( + new HashMap<>(), + new HashMap<>(), + "db.server1.orders", + ordersEnvelope.schema(), + ordersPayload); + + final SourceRecord transformedOrdersRecord = converter.apply(ordersRecord); + final Struct transformedOrdersValue = (Struct) transformedOrdersRecord.value(); + final Struct transformedOrdersAfter = transformedOrdersValue.getStruct(Envelope.FieldName.AFTER); + assertThat(transformedOrdersAfter.get("created_at")).isEqualTo("2023-08-01T03:50:45-06:00"); + assertThat(transformedOrdersAfter.get("order_time")).isEqualTo("2023-09-01T03:55:30-06:00"); + } + + @Test + public void testBothIncludeExcludeList() { + final Map props = new HashMap<>(); + props.put("converted.timezone", "+05:30"); + props.put("include.list", "topic:db.server1.customers,db.server1.public"); + props.put("exclude.list", "topic:db.server1.customers,db.server1.public"); + + assertThat(catchThrowable(() -> converter.configure(props))).isInstanceOf(DebeziumException.class); + assertThat(catchThrowable(() -> converter.configure(props))).hasMessageContaining("Both include and exclude lists are specified. Please specify only one."); + } + + @Test + public void testWithNoConfig() { + final Map props = new HashMap<>(); + props.put("include.list", "topic:db.server1.customers"); + + assertThat(catchThrowable(() -> converter.configure(props))).isInstanceOf(ConfigException.class); + assertThat(catchThrowable(() -> converter.configure(props))) + .hasMessageContaining("Invalid value null for configuration converted.timezone: The 'converted.timezone' value is invalid: A value is required"); + } + + @Test + public void testWithInvalidConfig() { + final Map props = new HashMap<>(); + props.put("converted.timezone", "Asia"); + props.put("include.list", "topic:db.server1.customers"); + + assertThat(catchThrowable(() -> converter.configure(props))).isInstanceOf(DebeziumException.class); + assertThat(catchThrowable(() -> converter.configure(props))) + .hasMessageContaining( + "Invalid timezone format. Please specify either a geographic timezone (e.g. America/Los_Angeles) or a UTC offset in the format of +/-hh:mm, (e.g. +08:00)."); + } + + @Test + public void testExcludeListWithMultipleFields() { + Map props = new HashMap<>(); + props.put("converted.timezone", "Europe/Moscow"); + props.put("exclude.list", "topic:db.server1.customers:order_time,db.server1.inventory:order_time"); + converter.configure(props); + + Schema customersRecordSchema = SchemaBuilder.struct() + .field("id", Schema.INT8_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .field("order_time", ZonedTime.builder().optional().build()) + .build(); + + final Struct customersBefore = new Struct(customersRecordSchema); + customersBefore.put("id", (byte) 1); + customersBefore.put("name", "Amy Rose"); + customersBefore.put("order_time", "10:19:25+05:30"); + + final Struct customersSource = new Struct(sourceSchema); + customersSource.put("table", "customers"); + customersSource.put("lsn", 1); + customersSource.put("ts_ms", 123456789); + + final Envelope customersEnvelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(customersRecordSchema) + .withSource(sourceSchema) + .build(); + + final Struct customersPayload = customersEnvelope.create(customersBefore, customersSource, Instant.now()); + SourceRecord customersRecord = new SourceRecord( + new HashMap<>(), + new HashMap<>(), + "db.server1.customers", + customersEnvelope.schema(), + customersPayload); + + final SourceRecord transformedCustomersRecord = converter.apply(customersRecord); + final Struct transformedCustomersValue = (Struct) transformedCustomersRecord.value(); + final Struct transformedCustomersAfter = transformedCustomersValue.getStruct(Envelope.FieldName.AFTER); + assertThat(transformedCustomersAfter.get("name")).isEqualTo("Amy Rose"); + assertThat(transformedCustomersAfter.get("order_time")).isEqualTo("10:19:25+05:30"); + + Schema inventoryRecordSchema = SchemaBuilder.struct() + .field("id", Schema.INT8_SCHEMA) + .field("name", Schema.STRING_SCHEMA) + .field("shipping_time", ZonedTime.builder().optional().build()) + .field("order_time", ZonedTime.builder().optional().build()) + .build(); + + final Struct inventoryBefore = new Struct(inventoryRecordSchema); + inventoryBefore.put("id", (byte) 1); + inventoryBefore.put("name", "Amy Rose"); + inventoryBefore.put("shipping_time", "19:19:25+05:30"); + inventoryBefore.put("order_time", "10:19:25+05:30"); + + final Struct inventorySource = new Struct(sourceSchema); + inventorySource.put("table", "inventory"); + inventorySource.put("lsn", 1); + inventorySource.put("ts_ms", 123456789); + + final Envelope inventoryEnvelope = Envelope.defineSchema() + .withName("dummy.Envelope") + .withRecord(inventoryRecordSchema) + .withSource(sourceSchema) + .build(); + + final Struct inventoryPayload = inventoryEnvelope.create(inventoryBefore, inventorySource, Instant.now()); + SourceRecord inventoryRecord = new SourceRecord( + new HashMap<>(), + new HashMap<>(), + "db.server1.inventory", + inventoryEnvelope.schema(), + inventoryPayload); + + final SourceRecord transformedInventoryRecord = converter.apply(inventoryRecord); + final Struct transformedInventoryValue = (Struct) transformedInventoryRecord.value(); + final Struct transformedInventoryAfter = transformedInventoryValue.getStruct(Envelope.FieldName.AFTER); + assertThat(transformedInventoryAfter.get("name")).isEqualTo("Amy Rose"); + assertThat(transformedInventoryAfter.get("shipping_time")).isEqualTo("16:49:25+03:00"); + assertThat(transformedInventoryAfter.get("order_time")).isEqualTo("10:19:25+05:30"); + + } +}