DBZ-6567 SMT for handling timezone based fields

This commit is contained in:
ani-sha 2023-08-14 20:43:45 +05:30 committed by Jiri Pechanec
parent 8ae86b047e
commit 22500839de
2 changed files with 1290 additions and 0 deletions

View File

@ -0,0 +1,497 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TimeZone;
import java.util.regex.Pattern;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.data.Envelope;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTime;
import io.debezium.time.ZonedTimestamp;
/**
*
* @author Anisha Mohanty
*
*/
public class TimezoneConverter<R extends ConnectRecord<R>> implements Transformation<R> {
private static final Logger LOGGER = LoggerFactory.getLogger(TimezoneConverter.class);
private static final Field CONVERTED_TIMEZONE = Field.create("converted.timezone")
.withDisplayName("Converted Timezone")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.HIGH)
.withValidation(Field::isRequired)
.withDescription("A string that represents the timezone to which the time-based fields should be converted."
+ "The format can be geographic (e.g. America/Los_Angeles), or it can be a UTC offset in the format of +/-hh:mm, (e.g. +08:00).");
private static final Field INCLUDE_LIST = Field.create("include.list")
.withDisplayName("Include List")
.withType(ConfigDef.Type.LIST)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.HIGH)
.withValidation(Field::isListOfRegex)
.withDescription("A comma-separated list of rules that specify what events should be evaluated for timezone conversion, using one of the following formats: "
+ "`source:<tablename>`:: Matches only Debezium change events with a source information block with the specified table name. All time-based fields will be converted. "
+ "`source:<tablename>:<fieldname>`:: Matches only Debezium change events with a source information with the specified table name. Only the specified field name will be converted. "
+ "`topic:<topicname>`:: Matches the specified topic name, converting all time-based fields. "
+ "`topic:<topicname>:<fieldname>`:: Matches the specified topic name, converting only the specified field name. "
+ "`<matchname>:<fieldname>`:: Uses a heuristic matching algorithm to matches the source information block table name if the source information block exists, otherwise matches against the topic name. The conversion is applied only to to the specified field name. ");
private static final Field EXCLUDE_LIST = Field.create("exclude.list")
.withDisplayName("Exclude List")
.withType(ConfigDef.Type.LIST)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.HIGH)
.withValidation(Field::isListOfRegex)
.withDescription("A comma-separated list of rules that specify what events should be excluded from timezone conversion, using one of the following formats: "
+ "`source:<tablename>`:: Matches only Debezium change events with a source information block with the specified table name. All time-based fields will be excluded. "
+ "`source:<tablename>:<fieldnames>`:: Matches only Debezium change events with a source information with the specified table name. Only the specified field name will be excluded. "
+ "`topic:<topicname>`:: Matches the specified topic name, excluding all time-based fields. "
+ "`topic:<topicname>:<fieldnames>`:: Matches the specified topic name, excluding only the specified field name. "
+ "`<matchname>:<fieldnames>`:: Uses a heuristic matching algorithm to matches the source information block table name if the source information block exists, otherwise matches against the topic name. The conversion is applied only to to the specified field name. ");
private SmtManager<R> smtManager;
private String convertedTimezone;
private List<String> includeList;
private List<String> excludeList;
private static final String SOURCE = "source";
private static final String TOPIC = "topic";
private static final Pattern TIMEZONE_PATTERN = Pattern.compile("^[+-]\\d{2}:\\d{2}$");
private static final Pattern LIST_PATTERN = Pattern.compile("^\\[(source|topic|[\".\\w\\s_]+):([\".\\w\\s_]+(?::[\".\\w\\s_]+)?(?:,|]$))+$");
private final Map<String, Set<String>> topicFieldsMap = new HashMap<>();
private final Map<String, Set<String>> tableFieldsMap = new HashMap<>();
private final Map<String, Set<String>> noPrefixFieldsMap = new HashMap<>();
private static final List<String> SUPPORTED_TIMESTAMP_LOGICAL_NAMES = List.of(
MicroTimestamp.SCHEMA_NAME,
NanoTimestamp.SCHEMA_NAME,
Timestamp.SCHEMA_NAME,
ZonedTimestamp.SCHEMA_NAME,
ZonedTime.SCHEMA_NAME,
org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME);
@Override
public ConfigDef config() {
final ConfigDef config = new ConfigDef();
Field.group(config, null, CONVERTED_TIMEZONE, INCLUDE_LIST, EXCLUDE_LIST);
return config;
}
@Override
public R apply(R record) {
if (record.value() == null || !smtManager.isValidEnvelope(record)) {
return record;
}
Struct value = (Struct) record.value();
Schema schema = value.schema();
String table = getTableFromSource(value);
String topic = record.topic();
if (includeList.isEmpty() && excludeList.isEmpty()) {
handleAllRecords(value, table, topic);
}
else if (!includeList.isEmpty()) {
handleInclude(value, table, topic);
}
else {
handleExclude(value, table, topic);
}
Struct updatedEnvelopeValue = handleEnvelopeValue(schema, value);
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
record.valueSchema(),
updatedEnvelopeValue,
record.timestamp(),
record.headers());
}
@Override
public void configure(Map<String, ?> configs) {
final Configuration config = Configuration.from(configs);
smtManager = new SmtManager<>(config);
smtManager.validate(config, Field.setOf(CONVERTED_TIMEZONE, INCLUDE_LIST, EXCLUDE_LIST));
convertedTimezone = config.getString(CONVERTED_TIMEZONE);
includeList = config.getList(INCLUDE_LIST);
excludeList = config.getList(EXCLUDE_LIST);
validateConfiguration();
if (!excludeList.isEmpty()) {
collectTablesAndTopics(excludeList);
}
else if (!includeList.isEmpty()) {
collectTablesAndTopics(includeList);
}
}
private void collectTablesAndTopics(List<String> list) {
String commonPrefix = null;
for (String item : list) {
FieldItem parseItem = parseItem(item);
String prefix = parseItem.prefix;
String matchName = parseItem.getMatchName();
String field = parseItem.getFieldName();
if (prefix != null) {
commonPrefix = prefix;
}
if (Objects.equals(commonPrefix, TOPIC)) {
if (!topicFieldsMap.containsKey(matchName)) {
topicFieldsMap.put(matchName, new HashSet<>());
}
topicFieldsMap.get(matchName).add(field);
}
else if (Objects.equals(commonPrefix, SOURCE)) {
if (!tableFieldsMap.containsKey(matchName)) {
tableFieldsMap.put(matchName, new HashSet<>());
}
tableFieldsMap.get(matchName).add(field);
}
else {
if (!noPrefixFieldsMap.containsKey(matchName)) {
noPrefixFieldsMap.put(matchName, new HashSet<>());
}
noPrefixFieldsMap.get(matchName).add(field);
}
}
}
private void validateConfiguration() {
if (!includeList.isEmpty()) {
if (!LIST_PATTERN.matcher(includeList.toString()).matches()) {
throw new DebeziumException(
"Invalid include list format. Please specify a list of rules in the format of \"source:<tablename>:<fieldnames>\", \"topic:<topicname>:<fieldnames>\", \"<matchname>:<fieldnames>\"");
}
}
else if (!excludeList.isEmpty()) {
if (!LIST_PATTERN.matcher(excludeList.toString()).matches()) {
throw new DebeziumException(
"Invalid exclude list format. Please specify a list of rules in the format of \"source:<tablename>:<fieldnames>\", \"topic:<topicname>:<fieldnames>\", \"<matchname>:<fieldnames>\"");
}
}
if (!validateTimezoneString()) {
throw new DebeziumException(
"Invalid timezone format. Please specify either a geographic timezone (e.g. America/Los_Angeles) or a UTC offset in the format of +/-hh:mm, (e.g. +08:00).");
}
if (!includeList.isEmpty() && !excludeList.isEmpty()) {
throw new DebeziumException("Both include and exclude lists are specified. Please specify only one.");
}
}
private boolean validateTimezoneString() {
if (TIMEZONE_PATTERN.matcher(convertedTimezone).matches()) {
return true;
}
else if (ZoneId.getAvailableZoneIds().contains(convertedTimezone)) {
return true;
}
else {
return Arrays.asList(TimeZone.getAvailableIDs()).contains(convertedTimezone);
}
}
@Override
public void close() {
}
private enum Type {
ALL,
INCLUDE,
EXCLUDE
}
private Object getTimestampWithTimezone(String schemaName, Object fieldValue) {
Object updatedFieldValue = fieldValue;
ZoneId zoneId = ZoneId.of(convertedTimezone);
ZoneOffset zoneOffset = zoneId.getRules().getOffset(Instant.now());
switch (schemaName) {
case ZonedTimestamp.SCHEMA_NAME:
OffsetDateTime offsetDateTime = OffsetDateTime.parse((String) fieldValue, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
OffsetDateTime offsetDateTimeWithZone = offsetDateTime.withOffsetSameInstant(zoneOffset);
updatedFieldValue = ZonedTimestamp.toIsoString(offsetDateTimeWithZone, null);
break;
case ZonedTime.SCHEMA_NAME:
OffsetTime offsetTime = OffsetTime.parse((String) fieldValue, DateTimeFormatter.ISO_OFFSET_TIME);
OffsetTime offsetTimeWithZone = offsetTime.withOffsetSameInstant(zoneOffset);
updatedFieldValue = ZonedTime.toIsoString(offsetTimeWithZone, null);
break;
case MicroTimestamp.SCHEMA_NAME:
long microTimestamp = (long) fieldValue;
Instant microInstant = Instant.ofEpochSecond(microTimestamp / 1_000_000, (microTimestamp % 1_000_000) * 1_000);
LocalDateTime microLocalDateTime = microInstant.atOffset(ZoneOffset.UTC).toLocalDateTime();
updatedFieldValue = microLocalDateTime.atOffset(zoneOffset).toInstant().toEpochMilli() * 1_000;
break;
case NanoTimestamp.SCHEMA_NAME:
long nanoTimestamp = (long) fieldValue;
Instant nanoInstant = Instant.ofEpochSecond(nanoTimestamp / 1_000_000_000, (nanoTimestamp % 1_000_000_000));
LocalDateTime nanoLocalDateTime = nanoInstant.atOffset(ZoneOffset.UTC).toLocalDateTime();
updatedFieldValue = nanoLocalDateTime.atOffset(zoneOffset).toInstant().toEpochMilli() * 1_000_000;
break;
case Timestamp.SCHEMA_NAME:
Instant instant = Instant.ofEpochMilli((long) fieldValue);
LocalDateTime localDateTime = instant.atOffset(ZoneOffset.UTC).toLocalDateTime();
updatedFieldValue = localDateTime.atOffset(zoneOffset).toInstant().toEpochMilli();
break;
case org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME:
Date date = (Date) fieldValue;
Instant timestampInstant = date.toInstant();
LocalDateTime timestampLocalDateTime = timestampInstant.atOffset(ZoneOffset.UTC).toLocalDateTime();
updatedFieldValue = Date.from(timestampLocalDateTime.atOffset(zoneOffset).toInstant());
break;
}
return updatedFieldValue;
}
private boolean isTimeField(String schemaName) {
return schemaName != null && SUPPORTED_TIMESTAMP_LOGICAL_NAMES.contains(schemaName);
}
private void handleStructs(Struct value, Type type, String matchName, Set<String> fields) {
if (type == null || matchName == null) {
return;
}
Struct before = getStruct(value, Envelope.FieldName.BEFORE);
Struct after = getStruct(value, Envelope.FieldName.AFTER);
if (before != null) {
handleValueForFields(before, type, fields);
}
if (after != null) {
handleValueForFields(after, type, fields);
}
}
private void handleValueForFields(Struct value, Type type, Set<String> fields) {
Schema schema = value.schema();
for (org.apache.kafka.connect.data.Field field : schema.fields()) {
if ((type == Type.ALL
|| type == Type.INCLUDE && fields.contains(field.name())
|| (type == Type.EXCLUDE && !fields.contains(field.name())))) {
handleValueForField(value, field);
}
}
}
private void handleValueForField(Struct struct, org.apache.kafka.connect.data.Field field) {
String fieldName = field.name();
Schema schema = field.schema();
if (isTimeField(schema.name())) {
Object newValue = getTimestampWithTimezone(schema.name(), struct.get(fieldName));
struct.put(fieldName, newValue);
}
}
private Struct handleEnvelopeValue(Schema schema, Struct value) {
final Struct updatedEnvelopeValue = new Struct(schema);
Struct updatedAfterValue = getStruct(value, Envelope.FieldName.AFTER);
Struct updatedBeforeValue = getStruct(value, Envelope.FieldName.BEFORE);
if (updatedAfterValue != null) {
updatedEnvelopeValue.put(Envelope.FieldName.AFTER, updatedAfterValue);
}
if (updatedBeforeValue != null) {
updatedEnvelopeValue.put(Envelope.FieldName.BEFORE, updatedBeforeValue);
}
return updatedEnvelopeValue;
}
private Struct getStruct(Struct struct, String structName) {
try {
return Requirements.requireStructOrNull(struct.getStruct(structName), "");
}
catch (DataException ignored) {
}
return null;
}
private String getTableFromSource(Struct value) {
try {
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
return source.getString("table");
}
catch (DataException ignored) {
}
return null;
}
private static class FieldItem {
private final String prefix;
private final String matchName;
private final String fieldName;
FieldItem(String prefix, String matchName, String fieldName) {
this.prefix = prefix;
this.matchName = matchName;
this.fieldName = fieldName;
}
public String getPrefix() {
return prefix;
}
public String getMatchName() {
return matchName;
}
public String getFieldName() {
return fieldName;
}
}
private FieldItem parseItem(String item) {
String prefix = null;
String matchName = null;
String fieldName = null;
String[] parts = item.split(":");
if (parts.length == 1) {
// table or topic
matchName = parts[0];
}
else if (parts.length >= 2 && parts.length <= 3) {
if (parts[0].equalsIgnoreCase(SOURCE) || parts[0].equalsIgnoreCase(TOPIC)) {
// source:table or topic:topic
// source:table1,table2 or topic:topic1,topic2
prefix = parts[0];
matchName = parts[1];
if (parts.length == 3) {
// source:table:field or topic:topic:field
fieldName = parts[2];
}
}
else {
// table:field or topic:field
matchName = parts[0];
fieldName = parts[1];
}
}
return new FieldItem(prefix, matchName, fieldName);
}
private static class MatchFieldsResult {
private final String matchName;
private final Set<String> fields;
MatchFieldsResult(String matchName, Set<String> fields) {
this.matchName = matchName;
this.fields = fields;
}
public String getMatchName() {
return matchName;
}
public Set<String> getFields() {
return fields;
}
}
private MatchFieldsResult handleMatchNameAndFields(String table, String topic) {
String matchName = null;
Set<String> fields = Collections.emptySet();
if (topicFieldsMap.containsKey(topic)) {
matchName = topic;
fields = topicFieldsMap.get(topic);
}
else if (tableFieldsMap.containsKey(table)) {
matchName = table;
fields = tableFieldsMap.get(table);
}
else if (noPrefixFieldsMap.containsKey(topic)) {
matchName = topic;
fields = noPrefixFieldsMap.get(topic);
}
else if (noPrefixFieldsMap.containsKey(table)) {
matchName = table;
fields = noPrefixFieldsMap.get(table);
}
return new MatchFieldsResult(matchName, fields);
}
private void handleInclude(Struct value, String table, String topic) {
MatchFieldsResult matchFieldsResult = handleMatchNameAndFields(table, topic);
String matchName = matchFieldsResult.getMatchName();
Set<String> fields = matchFieldsResult.getFields();
if (matchName != null) {
if (!fields.contains(null)) {
handleStructs(value, Type.INCLUDE, matchName, fields);
}
else {
handleStructs(value, Type.ALL, matchName, fields);
}
}
else {
handleStructs(value, Type.ALL, table, Set.of(""));
}
}
private void handleExclude(Struct value, String table, String topic) {
MatchFieldsResult matchFieldsResult = handleMatchNameAndFields(table, topic);
String matchName = matchFieldsResult.getMatchName();
Set<String> fields = matchFieldsResult.getFields();
if (matchName == null) {
handleStructs(value, Type.ALL, table != null ? table : topic, Set.of(""));
}
else if (!fields.contains(null)) {
handleStructs(value, Type.EXCLUDE, matchName, fields);
}
}
private void handleAllRecords(Struct value, String table, String topic) {
if (!topicFieldsMap.containsKey(topic) && !tableFieldsMap.containsKey(table) && !noPrefixFieldsMap.containsKey(table)) {
handleStructs(value, Type.ALL, table != null ? table : topic, Set.of(""));
}
}
}

View File

@ -0,0 +1,793 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
import io.debezium.DebeziumException;
import io.debezium.data.Envelope;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.NanoTimestamp;
import io.debezium.time.Timestamp;
import io.debezium.time.ZonedTime;
import io.debezium.time.ZonedTimestamp;
public class TimezoneConverterTest {
private final TimezoneConverter<SourceRecord> converter = new TimezoneConverter();
protected final Schema sourceSchema = SchemaBuilder.struct()
.field("table", Schema.STRING_SCHEMA)
.field("lsn", Schema.INT32_SCHEMA)
.field("ts_ms", Schema.OPTIONAL_INT32_SCHEMA)
.build();
@Test
public void testMultipleDebeziumTimestamps() {
final Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "+05:30");
converter.configure(props);
Schema recordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("order_date_micros", MicroTimestamp.builder().optional().build())
.field("order_date_nanos", NanoTimestamp.builder().optional().build())
.field("order_date_timestamp", Timestamp.builder().optional().build())
.field("order_date_zoned_timestamp", ZonedTimestamp.builder().optional().build())
.field("order_date_zoned_time", ZonedTime.builder().optional().build())
.build();
final Struct before = new Struct(recordSchema);
final Struct source = new Struct(sourceSchema);
before.put("id", (byte) 1);
before.put("name", "Srikanth");
before.put("order_date_micros", 1529507596945104L);
before.put("order_date_nanos", 1531481025340000000L);
before.put("order_date_timestamp", 1514908810123L);
before.put("order_date_zoned_timestamp", "2018-01-02T11:15:30.123456789+02:00");
before.put("order_date_zoned_time", "11:15:30.123456789+02:00");
source.put("table", "orders");
source.put("lsn", 1);
source.put("ts_ms", 123456789);
final Envelope envelope = Envelope.defineSchema()
.withName("dummy.Envelope")
.withRecord(recordSchema)
.withSource(sourceSchema)
.build();
final Struct payload = envelope.create(before, source, Instant.now());
SourceRecord record = new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"db.server1.table1",
envelope.schema(),
payload);
final SourceRecord transformedRecord = converter.apply(record);
final Struct transformedValue = (Struct) transformedRecord.value();
final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedAfter.get("order_date_micros")).isEqualTo(1529487796945000L);
assertThat(transformedAfter.get("order_date_nanos")).isEqualTo(1531461225340000000L);
assertThat(transformedAfter.get("order_date_timestamp")).isEqualTo(1514889010123L);
assertThat(transformedAfter.get("order_date_zoned_timestamp")).isEqualTo("2018-01-02T14:45:30.123456789+05:30");
assertThat(transformedAfter.get("order_date_zoned_time")).isEqualTo("14:45:30.123456789+05:30");
}
@Test
public void testSingleDebeziumTimestamp() {
final Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "Pacific/Easter");
converter.configure(props);
Schema recordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("created_at", ZonedTimestamp.builder().optional().build())
.field("updated_at", ZonedTimestamp.builder().optional().build())
.field("order_date", ZonedTimestamp.builder().optional().build())
.build();
final Struct before = new Struct(recordSchema);
final Struct source = new Struct(sourceSchema);
before.put("id", (byte) 1);
before.put("name", "Srikanth");
before.put("created_at", "2011-01-11T16:40:30.123456789+05:30");
before.put("updated_at", "2011-02-02T11:04:30.123456789+05:30");
before.put("order_date", "2011-04-09T13:00:30.123456789+05:30");
source.put("table", "orders");
source.put("lsn", 1);
source.put("ts_ms", 123456789);
final Envelope envelope = Envelope.defineSchema()
.withName("dummy.Envelope")
.withRecord(recordSchema)
.withSource(sourceSchema)
.build();
final Struct payload = envelope.create(before, source, Instant.now());
SourceRecord record = new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"db.server1.table1",
envelope.schema(),
payload);
final SourceRecord transformedRecord = converter.apply(record);
final Struct transformedValue = (Struct) transformedRecord.value();
final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedAfter.get("created_at")).isEqualTo("2011-01-11T05:10:30.123456789-06:00");
assertThat(transformedAfter.get("updated_at")).isEqualTo("2011-02-01T23:34:30.123456789-06:00");
assertThat(transformedAfter.get("order_date")).isEqualTo("2011-04-09T01:30:30.123456789-06:00");
}
@Test
public void testKafkaConnectTimestamp() {
Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "Africa/Cairo");
converter.configure(props);
Schema recordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("created_at", org.apache.kafka.connect.data.Timestamp.builder().optional().build())
.build();
final Struct before = new Struct(recordSchema);
final Struct source = new Struct(sourceSchema);
before.put("id", (byte) 1);
before.put("name", "Pierre Wright");
before.put("created_at", Date.from(LocalDateTime.of(2018, 3, 27, 2, 0, 0).toInstant(ZoneOffset.UTC)));
source.put("table", "orders");
source.put("lsn", 1);
source.put("ts_ms", 123456789);
final Envelope envelope = Envelope.defineSchema()
.withName("dummy.Envelope")
.withRecord(recordSchema)
.withSource(sourceSchema)
.build();
final Struct payload = envelope.create(before, source, Instant.now());
SourceRecord record = new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"db.server1.table1",
envelope.schema(),
payload);
final SourceRecord transformedRecord = converter.apply(record);
final Struct transformedValue = (Struct) transformedRecord.value();
final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedAfter.get("created_at")).isInstanceOf(Date.class);
assertThat(transformedAfter.get("created_at")).isEqualTo(Date.from(LocalDateTime.of(2018, 3, 27, 0, 0, 0).toInstant(ZoneOffset.UTC)));
}
@Test
public void testIncludeListWithTablePrefix() {
final Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "Atlantic/Azores");
props.put("include.list", "source:customers:order_time");
converter.configure(props);
Schema recordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("created_at", ZonedTime.builder().optional().build())
.field("updated_at", ZonedTime.builder().optional().build())
.field("order_time", ZonedTime.builder().optional().build())
.build();
final Struct before = new Struct(recordSchema);
final Struct source = new Struct(sourceSchema);
before.put("id", (byte) 1);
before.put("name", "John Doe");
before.put("created_at", "11:55:30+02:00");
before.put("updated_at", "12:10:10+02:00");
before.put("order_time", "12:10:10+02:00");
source.put("table", "customers");
source.put("lsn", 1);
source.put("ts_ms", 123456789);
final Envelope envelope = Envelope.defineSchema()
.withName("dummy.Envelope")
.withRecord(recordSchema)
.withSource(sourceSchema)
.build();
final Struct payload = envelope.create(before, source, Instant.now());
SourceRecord record = new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"db.server1.table1",
envelope.schema(),
payload);
final SourceRecord transformedRecord = converter.apply(record);
final Struct transformedValue = (Struct) transformedRecord.value();
final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedAfter.get("created_at")).isEqualTo("11:55:30+02:00");
assertThat(transformedAfter.get("updated_at")).isEqualTo("12:10:10+02:00");
assertThat(transformedAfter.get("order_time")).isEqualTo("10:10:10Z");
}
@Test
public void testIncludeListWithTopicPrefix() {
final Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "+05:30");
props.put("include.list", "topic:db.server1.table1:order_time");
converter.configure(props);
Schema recordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("created_at", ZonedTime.builder().optional().build())
.field("order_time", ZonedTime.builder().optional().build())
.build();
final Struct before = new Struct(recordSchema);
final Struct source = new Struct(sourceSchema);
before.put("id", (byte) 1);
before.put("name", "John Doe");
before.put("created_at", "11:55:30+02:00");
before.put("order_time", "12:10:10+02:00");
source.put("table", "customers");
source.put("lsn", 1);
source.put("ts_ms", 123456789);
final Envelope envelope = Envelope.defineSchema()
.withName("dummy.Envelope")
.withRecord(recordSchema)
.withSource(sourceSchema)
.build();
final Struct payload = envelope.create(before, source, Instant.now());
SourceRecord record = new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"db.server1.table1",
envelope.schema(),
payload);
final SourceRecord transformedRecord = converter.apply(record);
final Struct transformedValue = (Struct) transformedRecord.value();
final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedAfter.get("created_at")).isEqualTo("11:55:30+02:00");
assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30");
}
@Test
public void testIncludeListWithNoPrefix() {
final Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "Asia/Kolkata");
props.put("include.list", "db.server1.table1:order_time");
converter.configure(props);
Schema recordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("created_at", ZonedTime.builder().optional().build())
.field("order_time", ZonedTime.builder().optional().build())
.build();
final Struct before = new Struct(recordSchema);
final Struct source = new Struct(sourceSchema);
before.put("id", (byte) 1);
before.put("name", "John Doe");
before.put("created_at", "11:55:30+02:00");
before.put("order_time", "12:10:10+02:00");
source.put("table", "customers");
source.put("lsn", 1);
source.put("ts_ms", 123456789);
final Envelope envelope = Envelope.defineSchema()
.withName("dummy.Envelope")
.withRecord(recordSchema)
.withSource(sourceSchema)
.build();
final Struct payload = envelope.create(before, source, Instant.now());
SourceRecord record = new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"db.server1.table1",
envelope.schema(),
payload);
final SourceRecord transformedRecord = converter.apply(record);
final Struct transformedValue = (Struct) transformedRecord.value();
final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30");
assertThat(transformedAfter.get("created_at")).isEqualTo("11:55:30+02:00");
}
@Test
public void testExcludeListWithTablePrefix() {
final Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "+05:30");
props.put("exclude.list", "source:customers:order_time");
converter.configure(props);
Schema recordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("created_at", ZonedTime.builder().optional().build())
.field("updated_at", ZonedTime.builder().optional().build())
.field("order_time", ZonedTime.builder().optional().build())
.build();
final Struct before = new Struct(recordSchema);
final Struct source = new Struct(sourceSchema);
before.put("id", (byte) 1);
before.put("name", "John Doe");
before.put("created_at", "11:55:30+02:00");
before.put("updated_at", "12:10:10+02:00");
before.put("order_time", "15:40:10+05:30");
source.put("table", "customers");
source.put("lsn", 1);
source.put("ts_ms", 123456789);
final Envelope envelope = Envelope.defineSchema()
.withName("dummy.Envelope")
.withRecord(recordSchema)
.withSource(sourceSchema)
.build();
final Struct payload = envelope.create(before, source, Instant.now());
SourceRecord record = new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"db.server1.table1",
envelope.schema(),
payload);
final SourceRecord transformedRecord = converter.apply(record);
final Struct transformedValue = (Struct) transformedRecord.value();
final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedAfter.get("created_at")).isEqualTo("15:25:30+05:30");
assertThat(transformedAfter.get("updated_at")).isEqualTo("15:40:10+05:30");
assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30");
}
@Test
public void testExcludeListWithTopicPrefix() {
final Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "America/Chicago");
props.put("exclude.list", "topic:db.server1.table1:order_time");
converter.configure(props);
Schema recordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("created_at", ZonedTime.builder().optional().build())
.field("order_time", ZonedTime.builder().optional().build())
.build();
final Struct before = new Struct(recordSchema);
final Struct source = new Struct(sourceSchema);
before.put("id", (byte) 1);
before.put("name", "John Doe");
before.put("created_at", "11:55:30+02:00");
before.put("order_time", "15:40:10+05:30");
source.put("table", "customers");
source.put("lsn", 1);
source.put("ts_ms", 123456789);
final Envelope envelope = Envelope.defineSchema()
.withName("dummy.Envelope")
.withRecord(recordSchema)
.withSource(sourceSchema)
.build();
final Struct payload = envelope.create(before, source, Instant.now());
SourceRecord record = new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"db.server1.table1",
envelope.schema(),
payload);
final SourceRecord transformedRecord = converter.apply(record);
final Struct transformedValue = (Struct) transformedRecord.value();
final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedAfter.get("created_at")).isEqualTo("04:55:30-05:00");
assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30");
}
@Test
public void testExcludeListWithNoPrefix() {
final Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "+08:00");
props.put("exclude.list", "db.server1.table1:order_time");
converter.configure(props);
Schema recordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("created_at", ZonedTime.builder().optional().build())
.field("order_time", ZonedTime.builder().optional().build())
.build();
final Struct before = new Struct(recordSchema);
final Struct source = new Struct(sourceSchema);
before.put("id", (byte) 1);
before.put("name", "John Doe");
before.put("created_at", "11:55:30+02:00");
before.put("order_time", "15:40:10+05:30");
source.put("table", "customers");
source.put("lsn", 1);
source.put("ts_ms", 123456789);
final Envelope envelope = Envelope.defineSchema()
.withName("dummy.Envelope")
.withRecord(recordSchema)
.withSource(sourceSchema)
.build();
final Struct payload = envelope.create(before, source, Instant.now());
SourceRecord record = new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"db.server1.table1",
envelope.schema(),
payload);
final SourceRecord transformedRecord = converter.apply(record);
final Struct transformedValue = (Struct) transformedRecord.value();
final Struct transformedAfter = transformedValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedAfter.get("created_at")).isEqualTo("17:55:30+08:00");
assertThat(transformedAfter.get("order_time")).isEqualTo("15:40:10+05:30");
}
@Test
public void testIncludeListMultipleTables() {
final Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "+03:00");
props.put("include.list", "source:customers1,orders1,public1");
converter.configure(props);
Schema customersRecordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("created_at", ZonedTimestamp.builder().optional().build())
.field("updated_at", ZonedTimestamp.builder().optional().build())
.build();
final Struct customersBefore = new Struct(customersRecordSchema);
final Struct customersSource = new Struct(sourceSchema);
customersBefore.put("id", (byte) 1);
customersBefore.put("name", "John Doe");
customersBefore.put("created_at", "2020-01-01T11:55:30+02:00");
customersBefore.put("updated_at", "2020-01-01T15:40:10+05:30");
customersSource.put("table", "customers1");
customersSource.put("lsn", 1);
customersSource.put("ts_ms", 123456789);
final Envelope customersEnvelope = Envelope.defineSchema()
.withName("dummy.Envelope")
.withRecord(customersRecordSchema)
.withSource(sourceSchema)
.build();
final Struct customersPayload = customersEnvelope.create(customersBefore, customersSource, Instant.now());
SourceRecord customersRecord = new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"db.server1.customers",
customersEnvelope.schema(),
customersPayload);
final SourceRecord transformedCustomersRecord = converter.apply(customersRecord);
final Struct transformedCustomersValue = (Struct) transformedCustomersRecord.value();
final Struct transformedCustomersAfter = transformedCustomersValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedCustomersAfter.get("created_at")).isEqualTo("2020-01-01T12:55:30+03:00");
assertThat(transformedCustomersAfter.get("updated_at")).isEqualTo("2020-01-01T13:10:10+03:00");
Schema ordersRecordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("customer_id", Schema.INT8_SCHEMA)
.field("created_at", ZonedTimestamp.builder().optional().build())
.field("order_time", ZonedTimestamp.builder().optional().build())
.build();
final Struct ordersBefore = new Struct(ordersRecordSchema);
final Struct ordersSource = new Struct(sourceSchema);
ordersBefore.put("id", (byte) 1);
ordersBefore.put("customer_id", (byte) 1);
ordersBefore.put("created_at", "2023-08-01T11:50:45+02:00");
ordersBefore.put("order_time", "2023-09-01T11:55:30+02:00");
ordersSource.put("table", "orders1");
ordersSource.put("lsn", 1);
ordersSource.put("ts_ms", 123456789);
final Envelope ordersEnvelope = Envelope.defineSchema()
.withName("dummy.Envelope")
.withRecord(ordersRecordSchema)
.withSource(sourceSchema)
.build();
final Struct ordersPayload = ordersEnvelope.create(ordersBefore, ordersSource, Instant.now());
SourceRecord ordersRecord = new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"db.server1.orders",
ordersEnvelope.schema(),
ordersPayload);
final SourceRecord transformedOrdersRecord = converter.apply(ordersRecord);
final Struct transformedOrdersValue = (Struct) transformedOrdersRecord.value();
final Struct transformedOrdersAfter = transformedOrdersValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedOrdersAfter.get("created_at")).isEqualTo("2023-08-01T12:50:45+03:00");
assertThat(transformedOrdersAfter.get("order_time")).isEqualTo("2023-09-01T12:55:30+03:00");
}
@Test
public void testExcludeListMultipleTables() {
final Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "-06:00");
props.put("exclude.list", "topic:db.server1.customers,db.server1.public");
converter.configure(props);
Schema customersRecordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("created_at", ZonedTimestamp.builder().optional().build())
.field("updated_at", ZonedTimestamp.builder().optional().build())
.build();
final Struct customersBefore = new Struct(customersRecordSchema);
final Struct customersSource = new Struct(sourceSchema);
customersBefore.put("id", (byte) 1);
customersBefore.put("name", "John Doe");
customersBefore.put("created_at", "2020-01-01T11:55:30+02:00");
customersBefore.put("updated_at", "2020-01-01T15:40:10+05:30");
customersSource.put("table", "customers1");
customersSource.put("lsn", 1);
customersSource.put("ts_ms", 123456789);
final Envelope customersEnvelope = Envelope.defineSchema()
.withName("dummy.Envelope")
.withRecord(customersRecordSchema)
.withSource(sourceSchema)
.build();
final Struct customersPayload = customersEnvelope.create(customersBefore, customersSource, Instant.now());
SourceRecord customersRecord = new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"db.server1.customers",
customersEnvelope.schema(),
customersPayload);
final SourceRecord transformedCustomersRecord = converter.apply(customersRecord);
final Struct transformedCustomersValue = (Struct) transformedCustomersRecord.value();
final Struct transformedCustomersAfter = transformedCustomersValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedCustomersAfter.get("created_at")).isEqualTo("2020-01-01T11:55:30+02:00");
assertThat(transformedCustomersAfter.get("updated_at")).isEqualTo("2020-01-01T15:40:10+05:30");
Schema ordersRecordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("customer_id", Schema.INT8_SCHEMA)
.field("created_at", ZonedTimestamp.builder().optional().build())
.field("order_time", ZonedTimestamp.builder().optional().build())
.build();
final Struct ordersBefore = new Struct(ordersRecordSchema);
final Struct ordersSource = new Struct(sourceSchema);
ordersBefore.put("id", (byte) 1);
ordersBefore.put("customer_id", (byte) 1);
ordersBefore.put("created_at", "2023-08-01T11:50:45+02:00");
ordersBefore.put("order_time", "2023-09-01T11:55:30+02:00");
ordersSource.put("table", "orders1");
ordersSource.put("lsn", 1);
ordersSource.put("ts_ms", 123456789);
final Envelope ordersEnvelope = Envelope.defineSchema()
.withName("dummy.Envelope")
.withRecord(ordersRecordSchema)
.withSource(sourceSchema)
.build();
final Struct ordersPayload = ordersEnvelope.create(ordersBefore, ordersSource, Instant.now());
SourceRecord ordersRecord = new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"db.server1.orders",
ordersEnvelope.schema(),
ordersPayload);
final SourceRecord transformedOrdersRecord = converter.apply(ordersRecord);
final Struct transformedOrdersValue = (Struct) transformedOrdersRecord.value();
final Struct transformedOrdersAfter = transformedOrdersValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedOrdersAfter.get("created_at")).isEqualTo("2023-08-01T03:50:45-06:00");
assertThat(transformedOrdersAfter.get("order_time")).isEqualTo("2023-09-01T03:55:30-06:00");
}
@Test
public void testBothIncludeExcludeList() {
final Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "+05:30");
props.put("include.list", "topic:db.server1.customers,db.server1.public");
props.put("exclude.list", "topic:db.server1.customers,db.server1.public");
assertThat(catchThrowable(() -> converter.configure(props))).isInstanceOf(DebeziumException.class);
assertThat(catchThrowable(() -> converter.configure(props))).hasMessageContaining("Both include and exclude lists are specified. Please specify only one.");
}
@Test
public void testWithNoConfig() {
final Map<String, String> props = new HashMap<>();
props.put("include.list", "topic:db.server1.customers");
assertThat(catchThrowable(() -> converter.configure(props))).isInstanceOf(ConfigException.class);
assertThat(catchThrowable(() -> converter.configure(props)))
.hasMessageContaining("Invalid value null for configuration converted.timezone: The 'converted.timezone' value is invalid: A value is required");
}
@Test
public void testWithInvalidConfig() {
final Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "Asia");
props.put("include.list", "topic:db.server1.customers");
assertThat(catchThrowable(() -> converter.configure(props))).isInstanceOf(DebeziumException.class);
assertThat(catchThrowable(() -> converter.configure(props)))
.hasMessageContaining(
"Invalid timezone format. Please specify either a geographic timezone (e.g. America/Los_Angeles) or a UTC offset in the format of +/-hh:mm, (e.g. +08:00).");
}
@Test
public void testExcludeListWithMultipleFields() {
Map<String, String> props = new HashMap<>();
props.put("converted.timezone", "Europe/Moscow");
props.put("exclude.list", "topic:db.server1.customers:order_time,db.server1.inventory:order_time");
converter.configure(props);
Schema customersRecordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("order_time", ZonedTime.builder().optional().build())
.build();
final Struct customersBefore = new Struct(customersRecordSchema);
customersBefore.put("id", (byte) 1);
customersBefore.put("name", "Amy Rose");
customersBefore.put("order_time", "10:19:25+05:30");
final Struct customersSource = new Struct(sourceSchema);
customersSource.put("table", "customers");
customersSource.put("lsn", 1);
customersSource.put("ts_ms", 123456789);
final Envelope customersEnvelope = Envelope.defineSchema()
.withName("dummy.Envelope")
.withRecord(customersRecordSchema)
.withSource(sourceSchema)
.build();
final Struct customersPayload = customersEnvelope.create(customersBefore, customersSource, Instant.now());
SourceRecord customersRecord = new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"db.server1.customers",
customersEnvelope.schema(),
customersPayload);
final SourceRecord transformedCustomersRecord = converter.apply(customersRecord);
final Struct transformedCustomersValue = (Struct) transformedCustomersRecord.value();
final Struct transformedCustomersAfter = transformedCustomersValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedCustomersAfter.get("name")).isEqualTo("Amy Rose");
assertThat(transformedCustomersAfter.get("order_time")).isEqualTo("10:19:25+05:30");
Schema inventoryRecordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("shipping_time", ZonedTime.builder().optional().build())
.field("order_time", ZonedTime.builder().optional().build())
.build();
final Struct inventoryBefore = new Struct(inventoryRecordSchema);
inventoryBefore.put("id", (byte) 1);
inventoryBefore.put("name", "Amy Rose");
inventoryBefore.put("shipping_time", "19:19:25+05:30");
inventoryBefore.put("order_time", "10:19:25+05:30");
final Struct inventorySource = new Struct(sourceSchema);
inventorySource.put("table", "inventory");
inventorySource.put("lsn", 1);
inventorySource.put("ts_ms", 123456789);
final Envelope inventoryEnvelope = Envelope.defineSchema()
.withName("dummy.Envelope")
.withRecord(inventoryRecordSchema)
.withSource(sourceSchema)
.build();
final Struct inventoryPayload = inventoryEnvelope.create(inventoryBefore, inventorySource, Instant.now());
SourceRecord inventoryRecord = new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"db.server1.inventory",
inventoryEnvelope.schema(),
inventoryPayload);
final SourceRecord transformedInventoryRecord = converter.apply(inventoryRecord);
final Struct transformedInventoryValue = (Struct) transformedInventoryRecord.value();
final Struct transformedInventoryAfter = transformedInventoryValue.getStruct(Envelope.FieldName.AFTER);
assertThat(transformedInventoryAfter.get("name")).isEqualTo("Amy Rose");
assertThat(transformedInventoryAfter.get("shipping_time")).isEqualTo("16:49:25+03:00");
assertThat(transformedInventoryAfter.get("order_time")).isEqualTo("10:19:25+05:30");
}
}