DBZ-7145 Introduce a new SMT to extract table schema information for the JDBC sink connector

This commit is contained in:
harveyyue 2023-10-30 10:40:01 +08:00 committed by Jiri Pechanec
parent 018ecfe9f6
commit b6924e6efc
7 changed files with 483 additions and 16 deletions
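For context on how the new SMT would be used: a minimal configuration sketch, assuming a hypothetical transform alias "extractSchema"; the SMT relies on "column.propagate.source.type" (or "datatype.propagate.source.type") being set to ".*" so that column name/type parameters are present on the record schema.

import java.util.HashMap;
import java.util.Map;

// Hypothetical connector configuration sketch; the alias "extractSchema" is illustrative.
Map<String, String> props = new HashMap<>();
props.put("column.propagate.source.type", ".*"); // required so column name/type parameters are propagated
props.put("transforms", "extractSchema");
props.put("transforms.extractSchema.type", "io.debezium.transforms.ExtractSchemaToNewRecord");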

View File

@@ -5,6 +5,12 @@
*/
package io.debezium.data;
import static io.debezium.relational.mapping.PropagateSourceMetadataToSchemaParameter.COLUMN_COMMENT_PARAMETER_KEY;
import static io.debezium.relational.mapping.PropagateSourceMetadataToSchemaParameter.COLUMN_NAME_PARAMETER_KEY;
import static io.debezium.relational.mapping.PropagateSourceMetadataToSchemaParameter.TYPE_LENGTH_PARAMETER_KEY;
import static io.debezium.relational.mapping.PropagateSourceMetadataToSchemaParameter.TYPE_NAME_PARAMETER_KEY;
import static io.debezium.relational.mapping.PropagateSourceMetadataToSchemaParameter.TYPE_SCALE_PARAMETER_KEY;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
@@ -12,10 +18,13 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@@ -303,4 +312,80 @@ protected void appendAdditional(String name, Object value) {
}
}
/**
* Copy all properties to a new schema builder
*
* @param source Connect schema
* @return Connect schema builder
*/
public static SchemaBuilder copySchema(Schema source) {
SchemaBuilder builder = org.apache.kafka.connect.transforms.util.SchemaUtil.copySchemaBasics(source);
if (source.isOptional()) {
builder.optional();
}
else {
builder.required();
}
for (org.apache.kafka.connect.data.Field field : source.fields()) {
builder.field(field.name(), field.schema());
}
return builder;
}
/**
* Extract source column type from connect schema's parameters
*
* @param schema Connect schema
* @return the source column type
*/
public static Optional<String> getSourceColumnType(Schema schema) {
return getSchemaParameter(schema, TYPE_NAME_PARAMETER_KEY);
}
/**
* Extract source column length from connect schema's parameters
*
* @param schema Connect schema
* @return the source column length
*/
public static Optional<String> getSourceColumnSize(Schema schema) {
return getSchemaParameter(schema, TYPE_LENGTH_PARAMETER_KEY);
}
/**
* Extract source column scale from connect schema's parameters
*
* @param schema Connect schema
* @return the source column scale
*/
public static Optional<String> getSourceColumnPrecision(Schema schema) {
return getSchemaParameter(schema, TYPE_SCALE_PARAMETER_KEY);
}
/**
* Extract source column name from connect schema's parameters
*
* @param schema Connect schema
* @return the source column name
*/
public static Optional<String> getSourceColumnName(Schema schema) {
return getSchemaParameter(schema, COLUMN_NAME_PARAMETER_KEY);
}
/**
* Extract source column comment from connect schema's parameters
*
* @param schema Connect schema
* @return the source column comment
*/
public static Optional<String> getSourceColumnComment(Schema schema) {
return getSchemaParameter(schema, COLUMN_COMMENT_PARAMETER_KEY);
}
public static Optional<String> getSchemaParameter(Schema schema, String parameterName) {
if (!Objects.isNull(schema.parameters())) {
return Optional.ofNullable(schema.parameters().get(parameterName));
}
return Optional.empty();
}
}
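As a usage note, a short sketch of how the new helpers read the propagated parameters off a Connect field schema; the parameter keys and the Collect/SchemaBuilder calls mirror the test fixture added later in this commit (static imports from PropagateSourceMetadataToSchemaParameter and io.debezium.util.Collect assumed).

Schema nameSchema = SchemaBuilder.string()
        .parameters(Collect.hashMapOf(
                COLUMN_NAME_PARAMETER_KEY, "name",
                TYPE_NAME_PARAMETER_KEY, "varchar",
                TYPE_LENGTH_PARAMETER_KEY, "255"))
        .build();
SchemaUtil.getSourceColumnName(nameSchema);    // Optional[name]
SchemaUtil.getSourceColumnType(nameSchema);    // Optional[varchar]
SchemaUtil.getSourceColumnSize(nameSchema);    // Optional[255]
SchemaUtil.getSourceColumnComment(nameSchema); // Optional.empty(), no comment parameter set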

View File

@@ -102,10 +102,10 @@ public Builder(TableIdToStringMapper tableIdMapper) {
* @return this object so that methods can be chained together; never null
*/
public Builder map(String fullyQualifiedColumnNames, ColumnMapper mapper) {
BiPredicate<TableId, Column> columnMatcher = Predicates.includes(fullyQualifiedColumnNames, (tableId, column) -> fullyQualifiedColumnName(tableId, column));
BiPredicate<TableId, Column> columnMatcher = Predicates.includes(fullyQualifiedColumnNames, this::fullyQualifiedColumnName);
rules.add(new MapperRule(columnMatcher, mapper));
if (tableIdMapper != null) {
columnMatcher = Predicates.includes(fullyQualifiedColumnNames, (tableId, column) -> mappedTableColumnName(tableId, column));
columnMatcher = Predicates.includes(fullyQualifiedColumnNames, this::mappedTableColumnName);
rules.add(new MapperRule(columnMatcher, mapper));
}
return this;
@@ -122,10 +122,10 @@ public String mappedTableColumnName(TableId tableId, Column column) {
}
public Builder mapByDatatype(String columnDatatypes, ColumnMapper mapper) {
BiPredicate<TableId, Column> columnMatcher = Predicates.includes(columnDatatypes, (tableId, column) -> fullyQualifiedColumnDatatype(tableId, column));
BiPredicate<TableId, Column> columnMatcher = Predicates.includes(columnDatatypes, this::fullyQualifiedColumnDatatype);
rules.add(new MapperRule(columnMatcher, mapper));
if (tableIdMapper != null) {
columnMatcher = Predicates.includes(columnDatatypes, (tableId, column) -> mappedTableColumnDatatype(tableId, column));
columnMatcher = Predicates.includes(columnDatatypes, this::mappedTableColumnDatatype);
rules.add(new MapperRule(columnMatcher, mapper));
}
return this;
@@ -328,10 +328,7 @@ public ValueConverter mappingConverterFor(TableId tableId, Column column) {
*/
public ColumnMapper mapperFor(TableId tableId, Column column) {
Optional<MapperRule> matchingRule = rules.stream().filter(rule -> rule.matches(tableId, column)).findFirst();
if (matchingRule.isPresent()) {
return matchingRule.get().mapper;
}
return null;
return matchingRule.map(mapperRule -> mapperRule.mapper).orElse(null);
}
@Immutable

View File

@@ -22,11 +22,11 @@
*/
public class PropagateSourceMetadataToSchemaParameter implements ColumnMapper {
private static final String TYPE_NAME_PARAMETER_KEY = "__debezium.source.column.type";
private static final String TYPE_LENGTH_PARAMETER_KEY = "__debezium.source.column.length";
private static final String TYPE_SCALE_PARAMETER_KEY = "__debezium.source.column.scale";
private static final String COLUMN_COMMENT_PARAMETER_KEY = "__debezium.source.column.comment";
private static final String COLUMN_NAME_PARAMETER_KEY = "__debezium.source.column.name";
public static final String TYPE_NAME_PARAMETER_KEY = "__debezium.source.column.type";
public static final String TYPE_LENGTH_PARAMETER_KEY = "__debezium.source.column.length";
public static final String TYPE_SCALE_PARAMETER_KEY = "__debezium.source.column.scale";
public static final String COLUMN_COMMENT_PARAMETER_KEY = "__debezium.source.column.comment";
public static final String COLUMN_NAME_PARAMETER_KEY = "__debezium.source.column.name";
@Override
public ValueConverter create(Column column) {

View File

@@ -78,6 +78,20 @@ public class SchemaFactory {
private static final String SCHEMA_HISTORY_CHANGE_SCHEMA_NAME = "io.debezium.connector.schema.Change";
private static final int SCHEMA_HISTORY_CHANGE_SCHEMA_VERSION = 1;
/*
* Source schema block's schemas
*/
private static final String SOURCE_SCHEMA_NAME = "io.debezium.connector.source.Schema";
private static final Integer SOURCE_SCHEMA_VERSION = 1;
private static final String SOURCE_SCHEMA_TABLE_SCHEMA_NAME = "io.debezium.connector.source.schema.Table";
private static final Integer SOURCE_SCHEMA_TABLE_SCHEMA_VERSION = 1;
private static final String SOURCE_SCHEMA_COLUMN_SCHEMA_NAME = "io.debezium.connector.source.schema.Column";
private static final Integer SOURCE_SCHEMA_COLUMN_SCHEMA_VERSION = 1;
/*
* Notification schemas
*/
private static final String NOTIFICATION_KEY_SCHEMA_NAME = "io.debezium.connector.common.NotificationKey";
private static final Integer NOTIFICATION_KEY_SCHEMA_VERSION = 1;
private static final String NOTIFICATION_VALUE_SCHEMA_NAME = "io.debezium.connector.common.Notification";
@@ -223,6 +237,35 @@ public Schema schemaHistoryConnectorValueSchema(SchemaNameAdjuster adjuster, Com
.build();
}
public Schema sourceSchemaBlockSchema(SchemaNameAdjuster adjuster) {
return SchemaBuilder.struct()
.name(adjuster.adjust(SOURCE_SCHEMA_NAME))
.version(SOURCE_SCHEMA_VERSION)
.field(ConnectTableChangeSerializer.ID_KEY, Schema.STRING_SCHEMA)
.field(ConnectTableChangeSerializer.TABLE_KEY, sourceSchemaBlockTableSchema(adjuster))
.build();
}
public Schema sourceSchemaBlockTableSchema(SchemaNameAdjuster adjuster) {
return SchemaBuilder.struct()
.name(adjuster.adjust(SOURCE_SCHEMA_TABLE_SCHEMA_NAME))
.version(SOURCE_SCHEMA_TABLE_SCHEMA_VERSION)
.field(ConnectTableChangeSerializer.COLUMNS_KEY, SchemaBuilder.array(sourceSchemaBlockColumnSchema(adjuster)).build())
.build();
}
public Schema sourceSchemaBlockColumnSchema(SchemaNameAdjuster adjuster) {
return SchemaBuilder.struct()
.name(adjuster.adjust(SOURCE_SCHEMA_COLUMN_SCHEMA_NAME))
.version(SOURCE_SCHEMA_COLUMN_SCHEMA_VERSION)
.field(ConnectTableChangeSerializer.NAME_KEY, Schema.STRING_SCHEMA)
.field(ConnectTableChangeSerializer.TYPE_NAME_KEY, Schema.STRING_SCHEMA)
.field(ConnectTableChangeSerializer.LENGTH_KEY, Schema.OPTIONAL_INT32_SCHEMA)
.field(ConnectTableChangeSerializer.SCALE_KEY, Schema.OPTIONAL_INT32_SCHEMA)
.field(ConnectTableChangeSerializer.COMMENT_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.build();
}
public Schema notificationKeySchema(SchemaNameAdjuster adjuster) {
return SchemaBuilder.struct()
.name(adjuster.adjust(NOTIFICATION_KEY_SCHEMA_NAME))

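To illustrate the shape of the new block, a hedged sketch that assembles it with the factory methods above; the literal field keys ("id", "table", "columns", "name", "typeName") are the ConnectTableChangeSerializer constants used in the new schemas, and the choice of the "none" name-adjustment mode is an assumption.

// Sketch only: a source schema block for one table with a single column.
SchemaNameAdjuster adjuster = SchemaNameAdjustmentMode.NONE.createAdjuster(); // assumption: "none" adjustment mode
Struct column = new Struct(SchemaFactory.get().sourceSchemaBlockColumnSchema(adjuster))
        .put("name", "id")
        .put("typeName", "int");
Struct table = new Struct(SchemaFactory.get().sourceSchemaBlockTableSchema(adjuster))
        .put("columns", List.of(column));
Struct sourceSchemaBlock = new Struct(SchemaFactory.get().sourceSchemaBlockSchema(adjuster))
        .put("id", "test_table") // populated from the source block's table name by the SMT
        .put("table", table);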
View File

@@ -0,0 +1,202 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms;
import static io.debezium.config.CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE;
import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static io.debezium.data.Envelope.FieldName.SOURCE;
import static io.debezium.data.SchemaUtil.getSourceColumnComment;
import static io.debezium.data.SchemaUtil.getSourceColumnName;
import static io.debezium.data.SchemaUtil.getSourceColumnPrecision;
import static io.debezium.data.SchemaUtil.getSourceColumnSize;
import static io.debezium.data.SchemaUtil.getSourceColumnType;
import static io.debezium.relational.history.ConnectTableChangeSerializer.COLUMNS_KEY;
import static io.debezium.relational.history.ConnectTableChangeSerializer.COMMENT_KEY;
import static io.debezium.relational.history.ConnectTableChangeSerializer.ID_KEY;
import static io.debezium.relational.history.ConnectTableChangeSerializer.LENGTH_KEY;
import static io.debezium.relational.history.ConnectTableChangeSerializer.NAME_KEY;
import static io.debezium.relational.history.ConnectTableChangeSerializer.SCALE_KEY;
import static io.debezium.relational.history.ConnectTableChangeSerializer.TABLE_KEY;
import static io.debezium.relational.history.ConnectTableChangeSerializer.TYPE_NAME_KEY;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig.SchemaNameAdjustmentMode;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.data.SchemaUtil;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.util.BoundedConcurrentHashMap;
public class ExtractSchemaToNewRecord<R extends ConnectRecord<R>> implements Transformation<R> {
private static final Logger LOGGER = LoggerFactory.getLogger(ExtractSchemaToNewRecord.class);
public static final String SOURCE_SCHEMA_KEY = "sourceSchema";
private final ExtractField<R> afterDelegate = ConnectRecordUtil.extractAfterDelegate();
private final ExtractField<R> beforeDelegate = ConnectRecordUtil.extractBeforeDelegate();
private final BoundedConcurrentHashMap<Schema, NewRecordValueMetadata> recordValueSchemaCache = new BoundedConcurrentHashMap<>(10240);
private final Field.Set configFields = Field.setOf();
private SchemaNameAdjuster schemaNameAdjuster;
private SmtManager<R> smtManager;
@Override
public R apply(R record) {
// Handling tombstone record
if (record.value() == null) {
return record;
}
if (!smtManager.isValidEnvelope(record)) {
return record;
}
R afterRecord = afterDelegate.apply(record);
R beforeRecord = beforeDelegate.apply(record);
Struct oldValue = (Struct) record.value();
// Handling truncate record
if (afterRecord.value() == null && beforeRecord.value() == null) {
return record;
}
NewRecordValueMetadata newRecordValueMetadata = recordValueSchemaCache.computeIfAbsent(record.valueSchema(),
key -> makeUpdatedSchema(key, oldValue, afterRecord));
Struct newValue = new Struct(newRecordValueMetadata.schema);
for (org.apache.kafka.connect.data.Field field : record.valueSchema().fields()) {
Object value = oldValue.get(field);
if (value != null) {
newValue.put(field, value);
}
}
org.apache.kafka.connect.data.Field sourceSchemaField = newRecordValueMetadata.schema.field(SOURCE_SCHEMA_KEY);
newValue.put(sourceSchemaField, newRecordValueMetadata.metadataValue);
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
newRecordValueMetadata.schema,
newValue,
record.timestamp(),
record.headers());
}
@Override
public ConfigDef config() {
final ConfigDef config = new ConfigDef();
Field.group(config, null, configFields.asArray());
return config;
}
@Override
public void close() {
afterDelegate.close();
beforeDelegate.close();
}
@Override
public void configure(Map<String, ?> configs) {
Configuration config = Configuration.from(configs);
smtManager = new SmtManager<>(config);
if (!config.validateAndRecord(validateConfigFields(), LOGGER::error)) {
throw new DebeziumException("Unable to validate config.");
}
schemaNameAdjuster = SchemaNameAdjustmentMode.parse(config.getString(SCHEMA_NAME_ADJUSTMENT_MODE))
.createAdjuster();
}
private Iterable<Field> validateConfigFields() {
return configFields;
}
private NewRecordValueMetadata makeUpdatedSchema(Schema originalSchema, Struct originalValue, R afterRecord) {
// Build the new value schema, adding the "sourceSchema" field
SchemaBuilder builder = SchemaUtil.copySchema(originalSchema);
Schema sourceSchemaBlockSchema = SchemaFactory.get().sourceSchemaBlockSchema(schemaNameAdjuster);
builder.field(SOURCE_SCHEMA_KEY, sourceSchemaBlockSchema);
Schema newValueSchema = builder.build();
// Source Schema struct
Struct sourceSchemaStruct = new Struct(sourceSchemaBlockSchema);
Struct sourceStruct = (Struct) originalValue.get(SOURCE);
sourceSchemaStruct.put(ID_KEY, sourceStruct.get(TABLE_NAME_KEY));
// Table struct
Struct table = new Struct(SchemaFactory.get().sourceSchemaBlockTableSchema(schemaNameAdjuster));
List<Struct> columns = new ArrayList<>();
afterRecord.valueSchema().fields().forEach(field -> {
// Column struct
Struct column = new Struct(SchemaFactory.get().sourceSchemaBlockColumnSchema(schemaNameAdjuster));
Optional<String> nameOpt = getSourceColumnName(field.schema());
Optional<String> typeNameOpt = getSourceColumnType(field.schema());
if (nameOpt.isEmpty() || typeNameOpt.isEmpty()) {
throw new DebeziumException("Ensure that the configuration \"column.propagate.source.type\" " +
"or \"datatype.propagate.source.type\" is enabled and its value is set to \".*\"");
}
nameOpt.ifPresent(s -> column.put(NAME_KEY, s));
typeNameOpt.ifPresent(s -> column.put(TYPE_NAME_KEY, s));
getSourceColumnSize(field.schema()).ifPresent(s -> column.put(LENGTH_KEY, Integer.parseInt(s)));
getSourceColumnPrecision(field.schema()).ifPresent(s -> column.put(SCALE_KEY, Integer.parseInt(s)));
getSourceColumnComment(field.schema()).ifPresent(s -> column.put(COMMENT_KEY, s));
columns.add(column);
});
table.put(COLUMNS_KEY, columns);
sourceSchemaStruct.put(TABLE_KEY, table);
return new NewRecordValueMetadata(newValueSchema, sourceSchemaStruct);
}
private static class NewRecordValueMetadata {
private final Schema schema;
private final Struct metadataValue;
NewRecordValueMetadata(Schema schema, Struct metadataValue) {
this.schema = schema;
this.metadataValue = metadataValue;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
NewRecordValueMetadata metadata = (NewRecordValueMetadata) o;
return Objects.equals(schema, metadata.schema) &&
Objects.equals(metadataValue, metadata.metadataValue);
}
@Override
public int hashCode() {
return Objects.hash(schema, metadataValue);
}
@Override
public String toString() {
return "NewRecordValueMetadata{" + schema + ":" + metadataValue + "}";
}
}
}
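A brief usage sketch of the transform applied programmatically, mirroring the unit tests added below; "sourceRecord" stands for any Debezium change event whose value schema carries the propagated column parameters.

try (ExtractSchemaToNewRecord<SourceRecord> transform = new ExtractSchemaToNewRecord<>()) {
    transform.configure(new HashMap<>());
    SourceRecord enriched = transform.apply(sourceRecord); // sourceRecord: assumed input change event
    Struct sourceSchema = (Struct) ((Struct) enriched.value()).get(ExtractSchemaToNewRecord.SOURCE_SCHEMA_KEY);
    Struct table = (Struct) sourceSchema.get("table"); // holds the "columns" array with name/typeName/length/scale/comment
}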

View File

@@ -5,6 +5,10 @@
*/
package io.debezium.transforms;
import static io.debezium.relational.mapping.PropagateSourceMetadataToSchemaParameter.COLUMN_NAME_PARAMETER_KEY;
import static io.debezium.relational.mapping.PropagateSourceMetadataToSchemaParameter.TYPE_LENGTH_PARAMETER_KEY;
import static io.debezium.relational.mapping.PropagateSourceMetadataToSchemaParameter.TYPE_NAME_PARAMETER_KEY;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
@@ -19,6 +23,7 @@
import io.debezium.data.Envelope;
import io.debezium.pipeline.txmetadata.TransactionMonitor;
import io.debezium.util.Collect;
/**
* A base abstract class for the Extract-based single message transform tests.
@@ -40,14 +45,27 @@ public abstract class AbstractExtractStateTest {
protected static final String DROP_FIELDS_FROM_KEY = "drop.fields.from.key";
protected static final String DROP_FIELDS_KEEP_SCHEMA_COMPATIBLE = "drop.fields.keep.schema.compatible";
Schema idSchema = SchemaBuilder
.int8()
.parameters(Collect.hashMapOf(COLUMN_NAME_PARAMETER_KEY, "id", TYPE_NAME_PARAMETER_KEY, "int"))
.build();
Schema nameSchema = SchemaBuilder
.string()
.parameters(Collect.hashMapOf(
COLUMN_NAME_PARAMETER_KEY, "name",
TYPE_NAME_PARAMETER_KEY, "varchar",
TYPE_LENGTH_PARAMETER_KEY, "255"))
.build();
protected final Schema recordSchema = SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.STRING_SCHEMA)
.field("id", idSchema)
.field("name", nameSchema)
.build();
protected final Schema sourceSchema = SchemaBuilder.struct()
.field("lsn", Schema.INT32_SCHEMA)
.field("ts_ms", Schema.OPTIONAL_INT32_SCHEMA)
.field("db", Schema.OPTIONAL_STRING_SCHEMA)
.field("table", Schema.OPTIONAL_STRING_SCHEMA)
.build();
protected final Envelope envelope = Envelope.defineSchema()
@@ -60,6 +78,8 @@ protected SourceRecord createDeleteRecord() {
final Schema deleteSourceSchema = SchemaBuilder.struct()
.field("lsn", SchemaBuilder.int32())
.field("version", SchemaBuilder.string())
.field("db", Schema.OPTIONAL_STRING_SCHEMA)
.field("table", Schema.OPTIONAL_STRING_SCHEMA)
.build();
Envelope deleteEnvelope = Envelope.defineSchema()
@@ -75,8 +95,10 @@ protected SourceRecord createDeleteRecord() {
before.put("name", "myRecord");
source.put("lsn", 1234);
source.put("version", "version!");
source.put("db", "test_db");
source.put("table", "test_table");
final Struct payload = deleteEnvelope.delete(before, source, Instant.now());
return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", envelope.schema(), payload);
return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", deleteEnvelope.schema(), payload);
}
protected SourceRecord createTombstoneRecord() {
@@ -99,6 +121,8 @@ protected SourceRecord createCreateRecord() {
after.put("name", "myRecord");
source.put("lsn", 1234);
source.put("ts_ms", 12836);
source.put("db", "test_db");
source.put("table", "test_table");
final Struct payload = envelope.create(after, source, Instant.now());
return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", envelope.schema(), payload);
}
@@ -192,6 +216,8 @@ protected SourceRecord createUpdateRecord() {
after.put("id", (byte) 1);
after.put("name", "updatedRecord");
source.put("lsn", 1234);
source.put("db", "test_db");
source.put("table", "test_table");
transaction.put("id", "571");
transaction.put("total_order", 42L);
transaction.put("data_collection_order", 42L);
@@ -258,6 +284,8 @@ protected SourceRecord createUpdateRecordWithKey() {
after.put("id", (byte) 1);
after.put("name", "updatedRecord");
source.put("lsn", 1234);
source.put("db", "test_db");
source.put("table", "test_table");
transaction.put("id", "571");
transaction.put("total_order", 42L);
transaction.put("data_collection_order", 42L);

View File

@@ -0,0 +1,112 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms;
import static io.debezium.transforms.ExtractSchemaToNewRecord.SOURCE_SCHEMA_KEY;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
import io.debezium.DebeziumException;
public class ExtractSchemaToNewRecordTest extends AbstractExtractStateTest {
@Test
public void testHandleCreatedRecord() {
SourceRecord createdRecord = createCreateRecord();
try (ExtractSchemaToNewRecord<SourceRecord> transform = new ExtractSchemaToNewRecord<>()) {
final Map<String, String> props = new HashMap<>();
transform.configure(props);
SourceRecord transformRecord = transform.apply(createdRecord);
assertMetadataBlock(transformRecord);
}
}
@Test
public void testHandleUpdateRecord() {
SourceRecord updatedRecord = createUpdateRecord();
try (ExtractSchemaToNewRecord<SourceRecord> transform = new ExtractSchemaToNewRecord<>()) {
final Map<String, String> props = new HashMap<>();
transform.configure(props);
SourceRecord transformRecord = transform.apply(updatedRecord);
assertMetadataBlock(transformRecord);
}
}
@Test
public void testHandleDeletedRecord() {
SourceRecord deletedRecord = createDeleteRecord();
try (ExtractSchemaToNewRecord<SourceRecord> transform = new ExtractSchemaToNewRecord<>()) {
final Map<String, String> props = new HashMap<>();
transform.configure(props);
SourceRecord transformRecord = transform.apply(deletedRecord);
assertMetadataBlock(transformRecord);
}
}
@Test
public void testHandleTombstoneRecord() {
SourceRecord tombstoneRecord = createTombstoneRecord();
try (ExtractSchemaToNewRecord<SourceRecord> transform = new ExtractSchemaToNewRecord<>()) {
final Map<String, String> props = new HashMap<>();
transform.configure(props);
SourceRecord transformRecord = transform.apply(tombstoneRecord);
assertThat(transformRecord).isEqualTo(tombstoneRecord);
}
}
@Test
public void testHandleTruncatedRecord() {
SourceRecord truncateRecord = createTruncateRecord();
try (ExtractSchemaToNewRecord<SourceRecord> transform = new ExtractSchemaToNewRecord<>()) {
final Map<String, String> props = new HashMap<>();
transform.configure(props);
SourceRecord transformRecord = transform.apply(truncateRecord);
assertThat(transformRecord).isEqualTo(truncateRecord);
}
}
@Test
public void testHandleUpdateRecordWithoutSchemaParameters() {
SourceRecord updateRecord = createUpdateRecordWithKey();
try (ExtractSchemaToNewRecord<SourceRecord> transform = new ExtractSchemaToNewRecord<>()) {
final Map<String, String> props = new HashMap<>();
transform.configure(props);
transform.apply(updateRecord);
}
catch (DebeziumException ex) {
assertThat(ex.getMessage())
.isEqualTo("Ensure that the configuration \"column.propagate.source.type\" or \"datatype.propagate.source.type\" is enabled and its value is set to \".*\"");
}
}
private void assertMetadataBlock(SourceRecord transformRecord) {
Struct metadata = (Struct) ((Struct) transformRecord.value()).get(SOURCE_SCHEMA_KEY);
Struct table = (Struct) metadata.get("table");
List<Struct> columns = (List<Struct>) table.get("columns");
Struct id = columns.get(0);
Struct name = columns.get(1);
assertThat(metadata.get("id")).isEqualTo("test_table");
assertThat(id.get("name")).isEqualTo("id");
assertThat(id.get("typeName")).isEqualTo("int");
assertThat(name.get("name")).isEqualTo("name");
assertThat(name.get("typeName")).isEqualTo("varchar");
assertThat(name.get("length")).isEqualTo(255);
}
}