tet123/documentation/modules/ROOT/pages/transformations/event-flattening.adoc

357 lines
19 KiB
Plaintext
Raw Normal View History

2021-08-20 12:21:04 +02:00
:page-aliases: configuration/event-flattening.adoc
// Category: debezium-using
// Type: assembly
// ModuleID: extracting-source-record-after-state-from-debezium-change-events
// Title: Extracting source record `after` state from {prodname} change events
[id="new-record-state-extraction"]
= New Record State Extraction
2019-11-25 22:06:33 +01:00
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
2019-11-25 22:06:33 +01:00
toc::[]
{prodname} connectors emits data change messages to represent each operation that they capture from a source database.
The messages that a connector sends to Apache Kafka have a complex structure that faithfully represent the details of the original database event.
Although this complex message format accurately details information about changes that happen in the system, the format might not be suitable for some downstream consumers.
Sink connectors, or other parts of the Kafka ecosystem might require messages that are formatted so that field names and values are presented in a simplified, flattened structure.
To simplify the format of the event records that the {prodname} connectors produce, you can use the {prodname} event flattening single message transformation (SMT).
Configure the transformation to support consumers that require Kafka records to be in a format that is simpler than the default format that that the connector produces.
Depending on your particular use case, you can apply the SMT to a {prodname} connector, or to a sink connector that consumes messages that the {prodname} connector produces.
To enable Apache Kafka to retain the {prodname} change event messages in their original format, configure the SMT for a sink connector.
The event flattening transformation is a
link:https://kafka.apache.org/documentation/#connect_transforms[Kafka Connect SMT].
[NOTE]
====
The information in this chapter describes the event flattening single message transformation (SMT) for {prodname} SQL-based database connectors.
For information about an equivalent SMT for the {prodname} MongoDB connector, see {link-prefix}:{link-mongodb-event-flattening}#mongodb-new-document-state-extraction[MongoDB New Document State Extraction].
====
ifdef::product[]
The following topics provide details:
* xref:description-of-debezium-change-event-structure[]
* xref:behavior-of-debezium-event-flattening-transformation[]
* xref:configuration-of-debezium-event-flattening-transformation[]
* xref:example-of-adding-debezium-metadata-to-the-kafka-record[]
* xref:options-for-configuring-debezium-event-flattening-transformation[]
endif::product[]
// Type: concept
// ModuleID: description-of-debezium-change-event-structure
// Title: Description of {prodname} change event structure
== Change event structure
{prodname} generates data change events that have a complex structure.
2020-02-12 19:37:58 +01:00
Each event consists of three parts:
* Metadata, which includes but is not limited to:
** The type of operation that changed the data.
** Source information, such as the names of the database and the table in which the change occurred.
** Timestamp that identifies when the change was made.
** Optional transaction information.
* Row data before the change
* Row data after the change
The following example shows part of the message structure for an `UPDATE` change event:
[source,json,indent=0]
----
{
"op": "u",
"source": {
...
},
"ts_ms" : "...",
2024-02-17 02:15:16 +01:00
"ts_us" : "...",
"ts_ns" : "...",
"before" : {
"field1" : "oldvalue1",
"field2" : "oldvalue2"
},
"after" : {
"field1" : "newvalue1",
"field2" : "newvalue2"
}
}
----
For more information about the change event structure for a connector, see
ifdef::product[]
the documentation for the connector.
endif::product[]
ifdef::community[]
xref:{link-connectors}[the documentation for the connector].
endif::community[]
After the event flattening SMT processes the message in the previous example, it simplifies the message format, resulting in the message in the following example:
[source,json,indent=0]
----
{
"field1" : "newvalue1",
"field2" : "newvalue2"
}
----
// Type: concept
// ModuleID: behavior-of-debezium-event-flattening-transformation
// Title: Behavior of {prodname} event flattening transformation
[[event-flattening-behavior]]
== Behavior
The event flattening SMT extracts the `after` field from a {prodname} change event in a Kafka record. The SMT replaces the original change event with only its `after` field to create a simple Kafka record.
You can configure the event flattening SMT for a {prodname} connector or for a sink connector that consumes messages emitted by a {prodname} connector. The advantage of configuring event flattening for a sink connector is that records stored in Apache Kafka contain whole {prodname} change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.
You can configure the transformation to do any of the following:
* Add metadata from the change event to the simplified Kafka record. The default behavior is that the SMT does not add metadata.
* Keep Kafka records that contain change events for `DELETE` operations in the stream. The default behavior is that the SMT drops Kafka records for `DELETE` operation change events because most consumers cannot yet handle them.
A database `DELETE` operation causes {prodname} to generate two Kafka records:
* A record that contains `"op": "d",` the `before` row data, and some other fields.
* A tombstone record that has the same key as the deleted row and a value of `null`. This record is a marker for Apache Kafka. It indicates that
link:{link-kafka-docs}/#compaction[log compaction] can remove all records that have this key.
Instead of dropping the record that contains the `before` row data, you can configure the event flattening SMT to do one of the following:
* Keep the record in the stream and edit it to have only the `"value": "null"` field.
* Keep the record in the stream and edit it to have a `value` field that contains the key/value pairs that were in the `before` field with an added `"__deleted": "true"` entry.
Similarly, instead of dropping the tombstone record, you can configure the event flattening SMT to keep the tombstone record in the stream.
// Type: concept
// ModuleID: configuration-of-debezium-event-flattening-transformation
// Title: Configuration of {prodname} event flattening transformation
== Configuration
Configure the {prodname} event flattening SMT in a Kafka Connect source or sink connector by adding the SMT configuration details to your connector's configuration.
For example, to obtain the default behavior of the transformation, add it to the connector configuration without specifying any options, as in the following example:
[source]
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
----
As with any Kafka Connect connector configuration, you can set `transforms=` to multiple, comma-separated, SMT aliases in the order in which you want Kafka Connect to apply the SMTs.
The following `.properties` example sets several event flattening SMT options:
[source]
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=table,lsn
----
`drop.tombstones=false`:: Keeps tombstone records for `DELETE` operations in the event stream.
`delete.handling.mode=rewrite`:: For `DELETE` operations, edits the Kafka record by flattening the `value` field that was in the change event. The `value` field directly contains the key/value pairs that were in the `before` field. The SMT adds `__deleted` and sets it to `true`, for example:
+
[source,json,indent=0]
----
"value": {
"pk": 2,
"cola": null,
"__deleted": "true"
}
----
`add.fields=table,lsn`:: Adds change event metadata for the `table` and `lsn` fields to the simplified Kafka record.
.Customizing the configuration
The connector might emit many types of event messages (heartbeat messages, tombstone messages, or metadata messages about transactions or schema changes).
2023-10-10 00:14:21 +02:00
To apply the transformation to a subset of events, you can define xref:options-for-applying-the-event-flattening-transformation-selectively[an SMT predicate statement that selectively applies the transformation] to specific events only.
// Type: concept
// ModuleID: example-of-adding-debezium-metadata-to-the-kafka-record
// Title: Example of adding {prodname} metadata to the Kafka record
== Adding metadata
You can configure the event flattening SMT to add original change event metadata to the simplified Kafka record.
For example, you might want the simplified record's header or value to contain any of the following:
* The type of operation that made the change
* The name of the database or table that was changed
* Connector-specific fields such as the Postgres LSN field
ifdef::community[]
For more information on what is available see xref:{link-connectors}[the documentation for each connector].
endif::community[]
2021-10-14 16:57:26 +02:00
To add metadata to the simplified Kafka record's header, specify the `add.headers` option.
To add metadata to the simplified Kafka record's value, specify the `add.fields` option.
Each of these options takes a comma separated list of change event field names. Do not specify spaces. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field. For example:
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.fields=op,table,lsn,source.ts_ms
transforms.unwrap.add.headers=db
transforms.unwrap.delete.handling.mode=rewrite
----
With that configuration, a simplified Kafka record would contain something like the following:
[source,json,indent=0]
----
{
...
"__op" : "c",
"__table": "MY_TABLE",
"__lsn": "123456789",
"__source_ts_ms" : "123456789",
...
}
----
Also, simplified Kafka records would have a `__db` header.
In the simplified Kafka record, the SMT prefixes the metadata field names with a double underscore. When you specify a struct, the SMT also inserts an underscore between the struct name and the field name.
To add metadata to a simplified Kafka record that is for a `DELETE` operation, you must also configure `delete.handling.mode=rewrite`.
// Type: concept
// Title: Options for applying the event flattening transformation selectively
// ModuleID: options-for-applying-the-event-flattening-transformation-selectively
2023-10-10 00:14:21 +02:00
[id="options-for-applying-the-event-flattening-transformation-selectively"]
== Options for applying the event-flattening transformation selectively
In addition to the change event messages that a {prodname} connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions.
Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it's best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages.
For more information about how to apply the SMT selectively, see {link-prefix}:{link-smt-predicates}#applying-the-event-flattening-transformation-selectively[Configure an SMT predicate for the transformation].
ifdef::community[]
[id="configuration-options"]
endif::community[]
// Type: reference
// ModuleID: options-for-configuring-debezium-event-flattening-transformation
// Title: Options for configuring {prodname} event flattening transformation
== Configuration options
The following table describes the options that you can specify to configure the event flattening SMT.
.Descriptions of event flattening SMT configuration options
[cols="30%a,25%a,45%a",subs="+attributes",options="header"]
|===
|Option
|Default
|Description
|[[extract-new-record-state-drop-tombstones]]xref:extract-new-record-state-drop-tombstones[`drop.tombstones`]
|`true`
|{prodname} generates a tombstone record for each `DELETE` operation. The default behavior is that event flattening SMT removes tombstone records from the stream. To keep tombstone records in the stream, specify `drop.tombstones=false`.
[NOTE]
====
This option is scheduled for removal in a future release.
In its place, use the xref:extract-new-record-state-delete-tombstone-handling-mode[`delete.tombstone.handling.mode`] option.
====
|[[extract-new-record-state-delete-handling-mode]]xref:extract-new-record-state-delete-handling-mode[`delete.handling{zwsp}.mode`]
|`drop`
|{prodname} generates a change event record for each `DELETE` operation. The default behavior is that event flattening SMT removes these records from the stream. To keep Kafka records for `DELETE` operations in the stream, set `delete.handling.mode` to `none` or `rewrite`. +
+
Specify `none` to keep the change event record in the stream. The record contains only `"value": "null"`. +
+
Specify `rewrite` to keep the change event record in the stream and edit the record to have a `value` field that contains the key/value pairs that were in the `before` field and also add `+__deleted: true+` to the `value`. This is another way to indicate that the record has been deleted. +
+
When you specify `rewrite`, the updated simplified records for `DELETE` operations might be all you need to track deleted records. You can consider accepting the default behavior of dropping the tombstone records that the {prodname} connector creates.
[NOTE]
====
This option is scheduled for removal in a future release.
In its place, use the xref:extract-new-record-state-delete-tombstone-handling-mode[`delete.tombstone.handling.mode`] option.
====
|[[extract-new-record-state-delete-tombstone-handling-mode]]xref:extract-new-record-state-delete-tombstone-handling-mode[`delete.tombstone.handling.mode`]
|No default
|{prodname} generates a change event record for each `DELETE` operation. This setting determines how the event flattening SMT handles `DELETE` events from the stream.
[NOTE]
====
The setting for this option takes precedence over any conflicting settings that you might configure for the deprecated xref:extract-new-record-state-drop-tombstones[`drop.tombstones`] or xref:extract-new-record-state-delete-handling-mode[`delete.handling.mode`] options.
====
Set one of the following options:
`drop`:: The SMT removes both the `DELETE` event and `TOMBSTONE` from the stream.
`tombstone` (default):: The SMT retains `TOMBSTONE` records in the stream.
The `TOMBSTONE` record contains only the following value: `"value": "null"`.
`rewrite`:: The SMT retains the change event record in the stream and makes the following changes:
* Adds a `value` field to the record that contains the key/value pairs from the `before` field of the original record.
* Adds `+__deleted: true+` to the `value` of the record.
* Removes `TOMBSTONE` records.
+
This setting provides another way to indicate that the record has been deleted.
`rewrite-with-tombstone`:: The SMT behaves as it does when you select the `rewrite` option, except that it also retains `TOMBSTONE` records.
|[[extract-new-record-state-route-by-field]]xref:extract-new-record-state-route-by-field[`route.by.field`]
|
|To use row data to determine the topic to route the record to, set this option to an `after` field attribute. The SMT routes the record to the topic whose name matches the value of the specified `after` field attribute. For a `DELETE` operation, set this option to a `before` field attribute. +
+
For example, configuration of `route.by.field=destination` routes records to the topic whose name is the value of `after.destination`. The default behavior is that a {prodname} connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. +
+
If you are configuring the event flattening SMT on a sink connector, setting this option might be useful when the destination topic name dictates the name of the database table that will be updated with the simplified change event record. If the topic name is not correct for your use case, you can configure `route.by.field` to re-route the event.
|[[extract-new-record-state-add-fields-prefix]]xref:extract-new-record-state-add-fields-prefix[`add.fields.prefix`]
| __ (double-underscore)
|Set this optional string to prefix a field.
|[[extract-new-record-state-add-fields]]xref:extract-new-record-state-add-fields[`add.fields`]
|
|Set this option to a comma-separated list, with no spaces, of metadata fields to add to the simplified Kafka record's value. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example `source.ts_ms`. +
+
Optionally, you can override the field name via `<field name>:<new field name>`, e.g. like so: new field name like `version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP`. Please note that the `new field name` is case-sensitive. +
+
When the SMT adds metadata fields to the simplified record's value, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name. +
+
If you specify a field that is not in the change event record, the SMT still adds the field to the record's value.
|[[extract-new-record-state-add-headers-prefix]]xref:extract-new-record-state-add-headers-prefix[`add.headers.prefix`]
| __ (double-underscore)
|Set this optional string to prefix a header.
|[[extract-new-record-state-add-headers]]xref:extract-new-record-state-add-headers[`add.headers`]
|
|Set this option to a comma-separated list, with no spaces, of metadata fields to add to the header of the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example `source.ts_ms`. +
+
Optionally, you can override the field name via `<field name>:<new field name>`, e.g. like so: new field name like `version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP`. Please note that the `new field name` is case-sensitive. +
+
When the SMT adds metadata fields to the simplified record's header, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name. +
+
If you specify a field that is not in the change event record, the SMT does not add the field to the header.
|[[extract-new-record-state-drop-fields-header-name]]xref:extract-new-record-state-drop-fields-header-name[`drop.fields.header.name`]
|
2023-01-24 21:57:59 +01:00
|The Kafka message header name to use for listing field names in the source message that you want to drop from the output message.
|[[extract-new-record-state-drop-fields-from-key]]xref:extract-new-record-state-drop-fields-from-key[`drop.fields.from.key`]
|`false`
2023-01-24 21:57:59 +01:00
|Specifies whether you want the SMT to remove fields that are listed in `drop.fields.header.name` from the event's key.
|[[extract-new-record-state-drop-fields-keep-schema-compatible]]xref:extract-new-record-state-drop-fields-keep-schema-compatible[`drop.fields.keep.schema.compatible`]
|`true`
2023-01-24 21:57:59 +01:00
|Specifies whether you want the SMT to remove non-optional fields that are included in the xref:extract-new-record-state-drop-fields-header-name[`drop.fields.header.name`] configuration property. +
+
2023-01-24 21:57:59 +01:00
By default, the SMT only removes fields that are marked `optional`.
|===