This single message transformation (SMT) is supported for only the SQL database connectors. For the MongoDB connector, see the xref:{link-mongodb-event-flattening}[documentation for the MongoDB equivalent to this SMT].
A {prodname} data change event has a complex structure that provides a wealth of information. Kafka records that convey {prodname} change events contain all of this information.
However, parts of a Kafka ecosystem might expect Kafka records that provide a flat structure of field names and values.
To provide this kind of record, {prodname} provides the event flattening single message transformation (SMT). Configure this transformation when consumers need Kafka records that have a format that is simpler than Kafka records that contain {prodname} change events.
The event flattening SMT extracts the `after` field from a {prodname} change event in a Kafka record. The SMT replaces the original change event with only its `after` field to create a simple Kafka record.
You can configure the event flattening SMT for a {prodname} connector or for a sink connector that consumes messages emitted by a {prodname} connector. The advantage of configuring event flattening for a sink connector is that records stored in Apache Kafka contain whole {prodname} change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.
* Keep Kafka records that contain change events for `DELETE` operations in the stream. The default behavior is that the SMT drops Kafka records for `DELETE` operation change events because most consumers cannot yet handle them.
* Keep the record in the stream and edit it to have a `value` field that contains the key/value pairs that were in the `before` field with an added `"__deleted": "true"` entry.
Configure the {prodname} event flattening SMT in a Kafka Connect source or sink connector by adding the SMT configuration details to your connector's configuration. To obtain the default behavior, in a `.properties` file, you would specify something like the following:
As for any Kafka Connect connector configuration, you can set `transforms=` to multiple, comma-separated, SMT aliases in the order in which you want Kafka Connect to apply the SMTs.
`delete.handling.mode=rewrite`:: For `DELETE` operations, edits the Kafka record by flattening the `value` field that was in the change event. The `value` field directly contains the key/value pairs that were in the `before` field. The SMT adds `__deleted` and sets it to `true`, for example:
The event flattening SMT can add original, change event metadata to the simplified Kafka record. For example, you might want the simplified record's header or value to contain any of the following:
Each of these options takes a comma separated list of change event field names. Do not specify spaces. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field. For example:
In the simplified Kafka record, the SMT prefixes the metadata field names with a double underscore. When you specify a struct, the SMT also inserts an underscore between the struct name and the field name.
== Options for applying the event-flattening transformation selectively
In addition to the change event messages that a {prodname} connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions.
Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it's best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages.
For more information about how to apply the SMT selectively, see xref:{link-smt-predicates}#applying-transformation-selectively[Configure an SMT predicate for the transformation].
|{prodname} generates a tombstone record for each `DELETE` operation. The default behavior is that event flattening SMT removes tombstone records from the stream. To keep tombstone records in the stream, specify `drop.tombstones=false`.
|{prodname} generates a change event record for each `DELETE` operation. The default behavior is that event flattening SMT removes these records from the stream. To keep Kafka records for `DELETE` operations in the stream, set `delete.handling.mode` to `none` or `rewrite`. +
Specify `rewrite` to keep the change event record in the stream and edit the record to have a `value` field that contains the key/value pairs that were in the `before` field and also add `+__deleted: true+` to the `value`. This is another way to indicate that the record has been deleted. +
When you specify `rewrite`, the updated simplified records for `DELETE` operations might be all you need to track deleted records. You can consider accepting the default behavior of dropping the tombstone records that the {prodname} connector creates.
|To use row data to determine the topic to route the record to, set this option to an `after` field attribute. The SMT routes the record to the topic whose name matches the value of the specified `after` field attribute. For a `DELETE` operation, set this option to a `before` field attribute. +
For example, configuration of `route.by.field=destination` routes records to the topic whose name is the value of `after.destination`. The default behavior is that a {prodname} connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. +
If you are configuring the event flattening SMT on a sink connector, setting this option might be useful when the destination topic name dictates the name of the database table that will be updated with the simplified change event record. If the topic name is not correct for your use case, you can configure `route.by.field` to re-route the event.
|Set this option to a comma-separated list, with no spaces, of metadata fields to add to the simplified Kafka record's value. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example `source.ts_ms`. +
Optionally, you can override the field name via `<field name>:<new field name>`, e.g. like so: new field name like `version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP`. Please note that the `new field name` is case-sensitive. +
When the SMT adds metadata fields to the simplified record's value, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name. +
|Set this option to a comma-separated list, with no spaces, of metadata fields to add to the header of the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example `source.ts_ms`. +
Optionally, you can override the field name via `<field name>:<new field name>`, e.g. like so: new field name like `version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP`. Please note that the `new field name` is case-sensitive. +
When the SMT adds metadata fields to the simplified record's header, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name. +
+
If you specify a field that is not in the change event record, the SMT does not add the field to the header.