DBZ-1452 Misc. fixes and refactoring;

* Adding support for "transaction" struct
* Documentation updates
* Only one "_" as separator between struct and field name
This commit is contained in:
Gunnar Morling 2020-02-12 13:00:17 +01:00 committed by Chris Cranford
parent 49bbec319b
commit 61c4f834a6
5 changed files with 238 additions and 152 deletions

View File

@ -35,6 +35,7 @@ Christian Posta
Cliff Wheadon
Collin Van Dyck
Cyril Scetbon
Daan Roosen
David Chen
David Feinblum
David Leibovic

View File

@ -7,9 +7,14 @@
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
@ -30,8 +35,10 @@
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.data.Envelope;
import io.debezium.pipeline.txmetadata.TransactionMonitor;
import io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DeleteHandling;
import io.debezium.util.BoundedConcurrentHashMap;
import io.debezium.util.Strings;
/**
* Debezium generates CDC (<code>Envelope</code>) records that are struct of values containing values
@ -45,30 +52,27 @@
* <p>
* The SMT by default drops the tombstone message created by Debezium and converts the delete message into
* a tombstone message that can be dropped, too, if required.
* * <p>
* The SMT also has the option to insert fields from the original record's 'source' struct into the new
* unwrapped record prefixed with "__" (for example __lsn in Postgres, or __file in MySQL)
* <p>
* The SMT also has the option to insert fields from the original record (e.g. 'op' or 'source.ts_ms' into the
* unwrapped record or ad them as header attributes.
*
* @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate
* @author Jiri Pechanec
*/
public class ExtractNewRecordState<R extends ConnectRecord<R>> implements Transformation<R> {
private static final Logger LOGGER = LoggerFactory.getLogger(ExtractNewRecordState.class);
private static final String PURPOSE = "source field insertion";
private static final int SCHEMA_CACHE_SIZE = 64;
private static final String OPERATION_FIELD_NAME = "op";
private static final String TIMESTAMP_FIELD_NAME = "ts_ms";
private static final String SOURCE_STRUCT_NAME = "source";
private static final Logger LOGGER = LoggerFactory.getLogger(ExtractNewRecordState.class);
private static final Pattern FIELD_SEPARATOR = Pattern.compile("\\.");
private boolean dropTombstones;
private DeleteHandling handleDeletes;
private boolean addOperationHeader;
private String[] addSourceFields;
private String[] addFields;
private String[] addHeaders;
private Map<String, String[]> fieldStructMap;
private List<FieldReference> additionalHeaders;
private List<FieldReference> additionalFields;
private String routeByField;
private final ExtractField<R> afterDelegate = new ExtractField.Value<R>();
private final ExtractField<R> beforeDelegate = new ExtractField.Value<R>();
@ -95,13 +99,8 @@ public void configure(final Map<String, ?> configs) {
addSourceFields = config.getString(ExtractNewRecordStateConfigDefinition.ADD_SOURCE_FIELDS).isEmpty() ? null
: config.getString(ExtractNewRecordStateConfigDefinition.ADD_SOURCE_FIELDS).split(",");
addFields = config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS).isEmpty() ? null
: config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS).split(",");
addHeaders = config.getString(ExtractNewRecordStateConfigDefinition.ADD_HEADERS).isEmpty() ? null
: config.getString(ExtractNewRecordStateConfigDefinition.ADD_HEADERS).split(",");
fieldStructMap = new HashMap<>();
additionalFields = FieldReference.fromConfiguration(config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS));
additionalHeaders = FieldReference.fromConfiguration(config.getString(ExtractNewRecordStateConfigDefinition.ADD_HEADERS));
String routeFieldConfig = config.getString(ExtractNewRecordStateConfigDefinition.ROUTE_BY_FIELD);
routeByField = routeFieldConfig.isEmpty() ? null : routeFieldConfig;
@ -135,13 +134,9 @@ public R apply(final R record) {
LOGGER.trace("Tombstone {} arrived and requested to be dropped", record.key());
return null;
}
if (addHeaders != null) {
fieldStructMap = makeFieldMap(addHeaders);
Headers headersToAdd = makeHeaders(fieldStructMap, (Struct) record.value());
for (String header : addHeaders) {
record.headers().add(headersToAdd.lastWithName(makeFieldName(fieldStructMap.get(header))));
}
if (!additionalHeaders.isEmpty()) {
Headers headersToAdd = makeHeaders(additionalHeaders, (Struct) record.value());
headersToAdd.forEach(h -> record.headers().add(h));
}
else if (addOperationHeader) {
operation = Envelope.Operation.DELETE;
@ -155,13 +150,9 @@ else if (addOperationHeader) {
return record;
}
if (addHeaders != null) {
fieldStructMap = makeFieldMap(addHeaders);
Headers headersToAdd = makeHeaders(fieldStructMap, (Struct) record.value());
for (String header : addHeaders) {
record.headers().add(headersToAdd.lastWithName(makeFieldName(fieldStructMap.get(header))));
}
if (!additionalHeaders.isEmpty()) {
Headers headersToAdd = makeHeaders(additionalHeaders, (Struct) record.value());
headersToAdd.forEach(h -> record.headers().add(h));
}
else if (addOperationHeader) {
LOGGER.warn("operation.header has been deprecated and is scheduled for removal. Use add.headers instead.");
@ -192,7 +183,7 @@ else if (addOperationHeader) {
case REWRITE:
LOGGER.trace("Delete message {} requested to be rewritten", record.key());
R oldRecord = beforeDelegate.apply(record);
oldRecord = addFields != null ? addFields(addFields, record, oldRecord) : addSourceFields(addSourceFields, record, oldRecord);
oldRecord = !additionalFields.isEmpty() ? addFields(additionalFields, record, oldRecord) : addSourceFields(addSourceFields, record, oldRecord);
return removedDelegate.apply(oldRecord);
default:
@ -207,7 +198,7 @@ else if (addOperationHeader) {
newRecord = setTopic(newTopicName, newRecord);
}
newRecord = addFields != null ? addFields(addFields, record, newRecord) : addSourceFields(addSourceFields, record, newRecord);
newRecord = !additionalFields.isEmpty() ? addFields(additionalFields, record, newRecord) : addSourceFields(addSourceFields, record, newRecord);
// Handling insert and update records
switch (handleDeletes) {
@ -233,44 +224,26 @@ private R setTopic(String updatedTopicValue, R record) {
record.timestamp());
}
private Headers makeHeaders(Map<String, String[]> fieldStructMap, Struct originalRecordValue) {
// Create an Headers object which contains the headers to be added
/**
* Create an Headers object which contains the headers to be added.
*/
private Headers makeHeaders(List<FieldReference> additionalHeaders, Struct originalRecordValue) {
Headers headers = new ConnectHeaders();
String[] fieldNameParts;
String fieldName;
for (String field : fieldStructMap.keySet()) {
fieldNameParts = fieldStructMap.get(field);
fieldName = getFieldName(fieldNameParts);
if (field.equals(OPERATION_FIELD_NAME) || field.equals(TIMESTAMP_FIELD_NAME)) {
headers.add(makeFieldName(fieldNameParts), originalRecordValue.get(fieldName), originalRecordValue.schema().field(fieldName).schema());
}
else {
Struct struct = originalRecordValue.getStruct(SOURCE_STRUCT_NAME);
if (struct.schema().field(fieldName) == null) {
throw new ConfigException("Field specified in 'add.headers' does not exist: " + field);
}
headers.add(makeFieldName(fieldNameParts), struct.get(fieldName), struct.schema().field(fieldName).schema());
}
for (FieldReference fieldReference : additionalHeaders) {
headers.add(fieldReference.getNewFieldName(), fieldReference.getValue(originalRecordValue),
fieldReference.getSchema(originalRecordValue.schema()));
}
return headers;
}
private R addFields(String[] addFields, R originalRecord, R unwrappedRecord) {
// Return if no fields to add
if (addFields == null) {
return unwrappedRecord;
}
Map<String, String[]> fieldStructMap = makeFieldMap(addFields);
private R addFields(List<FieldReference> additionalFields, R originalRecord, R unwrappedRecord) {
final Struct value = requireStruct(unwrappedRecord.value(), PURPOSE);
Struct originalRecordValue = (Struct) originalRecord.value();
Schema updatedSchema = schemaUpdateCache.computeIfAbsent(value.schema(),
s -> makeUpdatedSchema(fieldStructMap, value.schema(), originalRecordValue));
s -> makeUpdatedSchema(additionalFields, value.schema(), originalRecordValue));
// Update the value with the new fields
Struct updatedValue = new Struct(updatedSchema);
@ -279,14 +252,8 @@ private R addFields(String[] addFields, R originalRecord, R unwrappedRecord) {
}
for (String field : addFields) {
if (field.equals(OPERATION_FIELD_NAME) || field.equals(TIMESTAMP_FIELD_NAME)) {
updatedValue = updateValue(fieldStructMap.get(field), updatedValue, originalRecordValue);
}
else {
updatedValue = updateValue(fieldStructMap.get(field), updatedValue, originalRecordValue.getStruct(SOURCE_STRUCT_NAME));
}
for (FieldReference fieldReference : additionalFields) {
updatedValue = updateValue(fieldReference, updatedValue, originalRecordValue);
}
return unwrappedRecord.newRecord(
@ -299,7 +266,7 @@ private R addFields(String[] addFields, R originalRecord, R unwrappedRecord) {
unwrappedRecord.timestamp());
}
private Schema makeUpdatedSchema(Map<String, String[]> fieldStructMap, Schema schema, Struct originalRecordValue) {
private Schema makeUpdatedSchema(List<FieldReference> additionalFields, Schema schema, Struct originalRecordValue) {
// Get fields from original schema
SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
for (org.apache.kafka.connect.data.Field field : schema.fields()) {
@ -307,61 +274,19 @@ private Schema makeUpdatedSchema(Map<String, String[]> fieldStructMap, Schema sc
}
// Update the schema with the new fields
for (String field : fieldStructMap.keySet()) {
if (field.equals(OPERATION_FIELD_NAME) || field.equals(TIMESTAMP_FIELD_NAME)) {
builder = updateSchema(fieldStructMap.get(field), field, builder, originalRecordValue.schema());
}
else {
builder = updateSchema(fieldStructMap.get(field), field, builder, originalRecordValue.getStruct(SOURCE_STRUCT_NAME).schema());
}
for (FieldReference fieldReference : additionalFields) {
builder = updateSchema(fieldReference, builder, originalRecordValue.schema());
}
return builder.build();
}
private SchemaBuilder updateSchema(String[] fieldNameParts, String field, SchemaBuilder builder, Schema structSchema) {
String fieldName = getFieldName(fieldNameParts);
if (structSchema.field(fieldName) == null) {
throw new ConfigException("Field specified in 'add.fields' does not exist: " + field);
}
builder.field(
makeFieldName(fieldNameParts), structSchema.field(fieldName).schema());
return builder;
private SchemaBuilder updateSchema(FieldReference fieldReference, SchemaBuilder builder, Schema originalRecordSchema) {
return builder.field(fieldReference.getNewFieldName(), fieldReference.getSchema(originalRecordSchema));
}
private Struct updateValue(String[] fieldNameParts, Struct updatedValue, Struct struct) {
String fieldName = getFieldName(fieldNameParts);
return updatedValue.put(makeFieldName(fieldNameParts), struct.get(fieldName));
}
private Map<String, String[]> makeFieldMap(String[] fields) {
// Convert the provided fields into a Map with the provided value as key and an the struct and field as value (in an array)
Map<String, String[]> fieldMap = new LinkedHashMap<>();
for (String field : fields) {
fieldMap.computeIfAbsent(field, s -> field.split("\\."));
}
return fieldMap;
}
private String getFieldName(String[] fieldNameParts) {
// Returns the field name without an (optional) struct specification
return fieldNameParts[fieldNameParts.length - 1];
}
private String makeFieldName(String[] fieldNameParts) {
// Makes a name for the field in the unwrapped record
if (fieldNameParts.length > 1) {
return ExtractNewRecordStateConfigDefinition.METADATA_FIELD_PREFIX + fieldNameParts[0] +
ExtractNewRecordStateConfigDefinition.METADATA_FIELD_PREFIX + fieldNameParts[1];
}
else {
return ExtractNewRecordStateConfigDefinition.METADATA_FIELD_PREFIX + fieldNameParts[0];
}
private Struct updateValue(FieldReference fieldReference, Struct updatedValue, Struct struct) {
return updatedValue.put(fieldReference.getNewFieldName(), fieldReference.getValue(struct));
}
private R addSourceFields(String[] addSourceFields, R originalRecord, R unwrappedRecord) {
@ -431,4 +356,100 @@ public void close() {
updatedDelegate.close();
}
/**
* Represents a field that should be added to the outgoing record as a header
* attribute or struct field.
*/
private static class FieldReference {
/**
* The struct ("source", "transaction") hosting the given field, or {@code null} for "op" and "ts_ms".
*/
private final String struct;
/**
* The simple field name.
*/
private final String field;
/**
* The name for the outgoing attribute/field, e.g. "__op" or "__source_ts_ms".
*/
private final String newFieldName;
private FieldReference(String field) {
String[] parts = FIELD_SEPARATOR.split(field);
if (parts.length == 1) {
this.struct = determineStruct(parts[0]);
this.field = parts[0];
this.newFieldName = ExtractNewRecordStateConfigDefinition.METADATA_FIELD_PREFIX + field;
}
else if (parts.length == 2) {
this.struct = parts[0];
if (!(this.struct.equals(Envelope.FieldName.SOURCE) || this.struct.equals(Envelope.FieldName.TRANSACTION))) {
throw new IllegalArgumentException("Unexpected field name: " + field);
}
this.field = parts[1];
this.newFieldName = ExtractNewRecordStateConfigDefinition.METADATA_FIELD_PREFIX + this.struct + "_" + this.field;
}
else {
throw new IllegalArgumentException("Unexpected field name: " + field);
}
}
/**
* Determines the struct hosting the given unqualified field.
*/
private static String determineStruct(String simpleFieldName) {
if (simpleFieldName.equals(Envelope.FieldName.OPERATION) || simpleFieldName.equals(Envelope.FieldName.TIMESTAMP)) {
return null;
}
else if (simpleFieldName.equals(TransactionMonitor.DEBEZIUM_TRANSACTION_ID_KEY) ||
simpleFieldName.equals(TransactionMonitor.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY) ||
simpleFieldName.equals(TransactionMonitor.DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY)) {
return Envelope.FieldName.TRANSACTION;
}
else {
return Envelope.FieldName.SOURCE;
}
}
static List<FieldReference> fromConfiguration(String addHeadersConfig) {
if (Strings.isNullOrEmpty(addHeadersConfig)) {
return Collections.emptyList();
}
else {
return Arrays.stream(addHeadersConfig.split(","))
.map(String::trim)
.map(FieldReference::new)
.collect(Collectors.toList());
}
}
String getNewFieldName() {
return newFieldName;
}
Object getValue(Struct originalRecordValue) {
Struct parentStruct = struct != null ? (Struct) originalRecordValue.get(struct) : originalRecordValue;
// transaction is optional; e.g. not present during snapshotting atm.
return parentStruct != null ? parentStruct.get(field) : null;
}
Schema getSchema(Schema originalRecordSchema) {
Schema parentSchema = struct != null ? originalRecordSchema.field(struct).schema() : originalRecordSchema;
org.apache.kafka.connect.data.Field schemaField = parentSchema.field(field);
if (schemaField == null) {
throw new IllegalArgumentException("Unexpected field name: " + field);
}
return SchemaUtil.copySchemaBasics(schemaField.schema()).optional().build();
}
}
}

View File

@ -93,8 +93,9 @@ public static DeleteHandling parse(String value, String defaultValue) {
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.LOW)
.withDefault(false)
.withDescription("Adds the operation {@link FieldName#OPERATION operation} as a header." +
"Its key is '" + ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY + "'");
.withDescription("DEPRECATED. Please use the 'add.fields' option instead. "
+ "Adds the operation type of the change event as a header."
+ "Its key is '" + ExtractNewRecordStateConfigDefinition.DEBEZIUM_OPERATION_HEADER_KEY + "'");
public static final Field ROUTE_BY_FIELD = Field.create("route.by.field")
.withDisplayName("The column which determines how the events will be routed, the value will replace the topic name.")
@ -109,17 +110,18 @@ public static DeleteHandling parse(String value, String defaultValue) {
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.LOW)
.withDefault("")
.withDescription("Adds each field listed from the 'source' element of the payload, prefixed with source__ "
+ "Example: 'version,connector' would add source__version and source__connector fields");
.withDescription("DEPRECATED. Please use the 'add.fields' option instead. "
+ "Adds each field listed from the 'source' element of the payload, prefixed with__ "
+ "Example: 'version,connector' would add __version and __connector fields");
public static final Field ADD_FIELDS = Field.create("add.fields")
.withDisplayName("Adds the specified fields to the message if they exist.")
.withDisplayName("Adds the specified field(s) to the message if they exist.")
.withType(ConfigDef.Type.LIST)
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.LOW)
.withDefault("")
.withDescription("Adds each field listed, prefixed with __ (or __source__ if the struct is specified) "
+ "Example: 'version,connector,source.ts_ms' would add __version, __connector and __source__ts_ms fields");
.withDescription("Adds each field listed, prefixed with __ (or __<struct>_ if the struct is specified) "
+ "Example: 'version,connector,source.ts_ms' would add __version, __connector and __source_ts_ms fields");
public static final Field ADD_HEADERS = Field.create("add.headers")
.withDisplayName("Adds the specified fields to the header if they exist.")
@ -127,7 +129,7 @@ public static DeleteHandling parse(String value, String defaultValue) {
.withWidth(ConfigDef.Width.LONG)
.withImportance(ConfigDef.Importance.LOW)
.withDefault("")
.withDescription("Adds each field listed to the header, __ (or __source__ if the struct is specified) "
+ "Example: 'version,connector,source.ts_ms' would add __version, __connector and __source__ts_ms fields");
.withDescription("Adds each field listed to the header, __ (or __<struct>_ if the struct is specified) "
+ "Example: 'version,connector,source.ts_ms' would add __version, __connector and __source_ts_ms fields");
}

View File

@ -22,6 +22,7 @@
import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
import io.debezium.pipeline.txmetadata.TransactionMonitor;
/**
* @author Jiri Pechanec
@ -124,13 +125,18 @@ private SourceRecord createUpdateRecord() {
final Struct before = new Struct(recordSchema);
final Struct after = new Struct(recordSchema);
final Struct source = new Struct(sourceSchema);
final Struct transaction = new Struct(TransactionMonitor.TRANSACTION_BLOCK_SCHEMA);
before.put("id", (byte) 1);
before.put("name", "myRecord");
after.put("id", (byte) 1);
after.put("name", "updatedRecord");
source.put("lsn", 1234);
transaction.put("id", "571");
transaction.put("total_order", 42L);
transaction.put("data_collection_order", 42L);
final Struct payload = envelope.update(before, after, source, Instant.now());
payload.put("transaction", transaction);
return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", envelope.schema(), payload);
}
@ -179,7 +185,9 @@ private String getSourceRecordHeaderByKey(SourceRecord record, String headerKey)
return null;
}
return operationHeader.next().value().toString();
Object value = operationHeader.next().value();
return value != null ? value.toString() : null;
}
@Test
@ -321,6 +329,7 @@ public void testAddSourceField() {
}
@Test
@FixFor("DBZ-1452")
public void testAddField() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
@ -334,34 +343,56 @@ public void testAddField() {
}
@Test
@FixFor("DBZ-1452")
public void testAddFields() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "op,lsn");
props.put(ADD_FIELDS, "op,lsn,id");
transform.configure(props);
final SourceRecord updateRecord = createUpdateRecord();
final SourceRecord unwrapped = transform.apply(updateRecord);
assertThat(((Struct) unwrapped.value()).get("__op")).isEqualTo(Envelope.Operation.UPDATE.code());
assertThat(((Struct) unwrapped.value()).get("__lsn")).isEqualTo(1234);
assertThat(((Struct) unwrapped.value()).get("__id")).isEqualTo("571");
}
}
@Test
@FixFor("DBZ-1452")
public void testAddFieldsForMissingOptionalField() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "op,lsn,id");
transform.configure(props);
final SourceRecord createRecord = createCreateRecord();
final SourceRecord unwrapped = transform.apply(createRecord);
assertThat(((Struct) unwrapped.value()).get("__op")).isEqualTo(Envelope.Operation.CREATE.code());
assertThat(((Struct) unwrapped.value()).get("__lsn")).isEqualTo(1234);
assertThat(((Struct) unwrapped.value()).get("__id")).isEqualTo(null);
}
}
@Test
@FixFor("DBZ-1452")
public void testAddFieldsSpecifyStruct() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "op,source.lsn");
props.put(ADD_FIELDS, "op,source.lsn,transaction.id,transaction.total_order");
transform.configure(props);
final SourceRecord createRecord = createCreateRecord();
final SourceRecord unwrapped = transform.apply(createRecord);
assertThat(((Struct) unwrapped.value()).get("__op")).isEqualTo(Envelope.Operation.CREATE.code());
assertThat(((Struct) unwrapped.value()).get("__source__lsn")).isEqualTo(1234);
final SourceRecord updateRecord = createUpdateRecord();
final SourceRecord unwrapped = transform.apply(updateRecord);
assertThat(((Struct) unwrapped.value()).get("__op")).isEqualTo(Envelope.Operation.UPDATE.code());
assertThat(((Struct) unwrapped.value()).get("__source_lsn")).isEqualTo(1234);
assertThat(((Struct) unwrapped.value()).get("__transaction_id")).isEqualTo("571");
assertThat(((Struct) unwrapped.value()).get("__transaction_total_order")).isEqualTo(42L);
}
}
@Test
@FixFor("DBZ-1452")
public void testAddHeader() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
@ -377,36 +408,64 @@ public void testAddHeader() {
}
@Test
@FixFor("DBZ-1452")
public void testAddHeaders() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "op,lsn");
props.put(ADD_HEADERS, "op,lsn,id");
transform.configure(props);
final SourceRecord createRecord = createCreateRecord();
final SourceRecord unwrapped = transform.apply(createRecord);
assertThat(unwrapped.headers()).hasSize(2);
final SourceRecord updateRecord = createUpdateRecord();
final SourceRecord unwrapped = transform.apply(updateRecord);
assertThat(unwrapped.headers()).hasSize(3);
String headerValue = getSourceRecordHeaderByKey(unwrapped, "__op");
assertThat(headerValue).isEqualTo(Envelope.Operation.CREATE.code());
assertThat(headerValue).isEqualTo(Envelope.Operation.UPDATE.code());
headerValue = getSourceRecordHeaderByKey(unwrapped, "__lsn");
assertThat(headerValue).isEqualTo(String.valueOf(1234));
headerValue = getSourceRecordHeaderByKey(unwrapped, "__id");
assertThat(headerValue).isEqualTo(String.valueOf(571L));
}
}
@Test
public void testAddHeadersSpecifyStruct() {
@FixFor("DBZ-1452")
public void testAddHeadersForMissingOptionalField() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "op,source.lsn");
props.put(ADD_HEADERS, "op,lsn,id");
transform.configure(props);
final SourceRecord createRecord = createCreateRecord();
final SourceRecord unwrapped = transform.apply(createRecord);
assertThat(unwrapped.headers()).hasSize(2);
assertThat(unwrapped.headers()).hasSize(3);
String headerValue = getSourceRecordHeaderByKey(unwrapped, "__op");
assertThat(headerValue).isEqualTo(Envelope.Operation.CREATE.code());
headerValue = getSourceRecordHeaderByKey(unwrapped, "__source__lsn");
headerValue = getSourceRecordHeaderByKey(unwrapped, "__lsn");
assertThat(headerValue).isEqualTo(String.valueOf(1234));
headerValue = getSourceRecordHeaderByKey(unwrapped, "__id");
assertThat(headerValue).isNull();
}
}
@Test
@FixFor("DBZ-1452")
public void testAddHeadersSpecifyStruct() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "op,source.lsn,transaction.id,transaction.total_order");
transform.configure(props);
final SourceRecord updateRecord = createUpdateRecord();
final SourceRecord unwrapped = transform.apply(updateRecord);
assertThat(unwrapped.headers()).hasSize(4);
String headerValue = getSourceRecordHeaderByKey(unwrapped, "__op");
assertThat(headerValue).isEqualTo(Envelope.Operation.UPDATE.code());
headerValue = getSourceRecordHeaderByKey(unwrapped, "__source_lsn");
assertThat(headerValue).isEqualTo(String.valueOf(1234));
headerValue = getSourceRecordHeaderByKey(unwrapped, "__transaction_id");
assertThat(headerValue).isEqualTo(String.valueOf(571L));
headerValue = getSourceRecordHeaderByKey(unwrapped, "__transaction_total_order");
assertThat(headerValue).isEqualTo(String.valueOf(42L));
}
}
@ -512,6 +571,7 @@ public void testAddSourceFieldsHandleDeleteRewrite() {
}
@Test
@FixFor("DBZ-1452")
public void testAddFieldHandleDeleteRewrite() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
@ -527,6 +587,7 @@ public void testAddFieldHandleDeleteRewrite() {
}
@Test
@FixFor("DBZ-1452")
public void testAddFieldsHandleDeleteRewrite() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
@ -543,6 +604,7 @@ public void testAddFieldsHandleDeleteRewrite() {
}
@Test
@FixFor("DBZ-1452")
public void testAddFieldsSpecifyStructHandleDeleteRewrite() {
try (final ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>()) {
final Map<String, String> props = new HashMap<>();
@ -554,7 +616,7 @@ public void testAddFieldsSpecifyStructHandleDeleteRewrite() {
final SourceRecord unwrapped = transform.apply(deleteRecord);
assertThat(((Struct) unwrapped.value()).getString("__deleted")).isEqualTo("true");
assertThat(((Struct) unwrapped.value()).get("__op")).isEqualTo(Envelope.Operation.DELETE.code());
assertThat(((Struct) unwrapped.value()).get("__source__lsn")).isEqualTo(1234);
assertThat(((Struct) unwrapped.value()).get("__source_lsn")).isEqualTo(1234);
}
}

View File

@ -14,10 +14,10 @@ This SMT is supported only for the SQL database connectors, it does not work wit
See xref:configuration/mongodb-event-flattening.adoc[here] for the MongoDB equivalent to this SMT.
====
Debezium generates a data change in a form of a complex message structure.
The message consists of three parts:
Debezium generates data change events in a form of a complex message structure.
Each events consists of three parts:
* metadata, comprising the type of operation, information on the event source and a timestamp
* metadata, comprising the type of operation, information on the event source, a timestamp, and optionally transaction information
* the row data before change
* the row data after change
@ -62,7 +62,7 @@ It
* extracts the `after` field from change events and replaces the original event just with this part
* optionally filters delete and tombstone records, as per the capabilities and requirements of downstream consumers
* optionally adds metadata fields from the change event
* optionally adds metadata fields from the change event to the outgoing flattened record
* optionally add metadata fields to the header
The SMT can be applied either to a source connector (Debezium) or a sink connector.
@ -102,7 +102,7 @@ The SMT by default filters out *both* delete records as widely used sink connect
The SMT can optionally add metadata fields from the original change event to the final flattened record. This functionality can be used to add things like the operation or the table from the change event, or connector-specific fields like the Postgres LSN field. For more information on what's available see xref:connectors/index.adoc[the documentation for each connector].
In case of duplicate field names (e.g. "ts_ms" exists twice), the struct should be specified to get the correct field (e.g. "source.ts_ms"). The fields will be prefixed with "\\__" or "__<struct>__", depending on the specification of the struct. Please use a comma separated list without spaces.
In case of duplicate field names (e.g. "ts_ms" exists twice), the struct should be specified to get the correct field (e.g. "source.ts_ms"). The fields will be prefixed with "\\__" or "__<struct>_", depending on the specification of the struct. Please use a comma separated list without spaces.
For example, the configuration
@ -115,7 +115,7 @@ transforms.unwrap.add.fields=op,table,lsn,source.ts_ms
will add
----
{ "__op" : "c", __table": "MY_TABLE", "__lsn": "123456789", "__source__ts_ms" : "123456789", ...}
{ "__op" : "c", __table": "MY_TABLE", "__lsn": "123456789", "__source_ts_ms" : "123456789", ...}
----
to the final flattened record.
@ -126,7 +126,7 @@ For `DELETE` events, this option is only supported when the `delete.handling.mod
The SMT can optionally add metadata fields from the original change event to the header of the final flattened record. This functionality can be used to add things like the operation or the table from the change event, or connector-specific fields like the Postgres LSN field. For more information on what's available see xref:connectors/index.adoc[the documentation for each connector].
In case of duplicate field names (e.g. "ts_ms" exists twice), the struct should be specified to get the correct field (e.g. "source.ts_ms"). The fields will be prefixed with "\\__" or "__<struct>__", depending on the specification of the struct. Please use a comma separated list without spaces.
In case of duplicate field names (e.g. "ts_ms" exists twice), the struct should be specified to get the correct field (e.g. "source.ts_ms"). The fields will be prefixed with "\\__" or "__<struct>_", depending on the specification of the struct. Please use a comma separated list without spaces.
For example, the configuration
@ -136,7 +136,7 @@ transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.headers=op,table,lsn,source.ts_ms
----
will add headers `__op`, `__table`, `__lsn` and `__source_ts_ms` to the outgoing record.
=== Determine original operation [DEPRECATED]