22c6901e3c
* Describing new MongoDB SMT option * Adding missing docs for operation.header for relational SMT
160 lines
5.9 KiB
Plaintext
160 lines
5.9 KiB
Plaintext
= New Record State Extraction
|
|
include::../_attributes.adoc[]
|
|
:linkattrs:
|
|
:icons: font
|
|
:source-highlighter: highlight.js
|
|
|
|
[NOTE]
|
|
====
|
|
This SMT is supported only for the SQL database connectors, it does not work with the MongoDB connector.
|
|
See xref:configuration/mongodb-event-flattening.adoc[here] for the MongoDB equivalent to this SMT.
|
|
====
|
|
|
|
Debezium generates a data change in a form of a complex message structure.
|
|
The message consists of three parts:
|
|
|
|
* metadata, comprising the type of operation, information on the event source and a timestamp
|
|
* the row data before change
|
|
* the row data after change
|
|
|
|
E.g. the general message structure for an `update` change looks like this:
|
|
|
|
[source,json,indent=0]
|
|
----
|
|
{
|
|
"op": "u",
|
|
"source": {
|
|
...
|
|
},
|
|
"ts_ms" : "...",
|
|
"before" : {
|
|
"field1" : "oldvalue1",
|
|
"field2" : "oldvalue2"
|
|
},
|
|
"after" : {
|
|
"field1" : "newvalue1",
|
|
"field2" : "newvalue2"
|
|
}
|
|
}
|
|
----
|
|
|
|
More details about the message structure are provided in xref:connectors/index.adoc[the documentation for each connector].
|
|
|
|
This format allows the user to get most information about changes happening in the system.
|
|
The downside of using the complex format is that other connectors or other parts of the Kafka ecosystem usually expect the data in a simple message format that can generally be described like so:
|
|
|
|
[source,json,indent=0]
|
|
----
|
|
{
|
|
"field1" : "newvalue1",
|
|
"field2" : "newvalue2"
|
|
}
|
|
----
|
|
|
|
Debezium provides https://kafka.apache.org/documentation/#connect_transforms[a single message transformation] that crosses the bridge between the complex and simple formats, the https://github.com/debezium/debezium/blob/master/debezium-core/src/main/java/io/debezium/transforms/ExtractNewRecordState.java[ExtractNewRecordState] SMT.
|
|
|
|
The SMT provides three main functions.
|
|
It
|
|
|
|
* extracts the `after` field from change events and replaces the original event just with this part
|
|
* optionally filters delete and tombstone records, as per the capabilities and requirements of downstream consumers
|
|
* optionally adds metadata fields from the change event's `source` field, prefixing them with "__"
|
|
|
|
The SMT can be applied either to a source connector (Debezium) or a sink connector.
|
|
We generally recommend to apply the transformation on the sink side as it means that the messages stored in Apache Kafka will contain the whole context.
|
|
The final decision depends on use case for each user.
|
|
|
|
== Configuration
|
|
The configuration is a part of source/sink task connector and is expressed in a set of properties:
|
|
|
|
[source]
|
|
----
|
|
transforms=unwrap,...
|
|
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
|
|
transforms.unwrap.drop.tombstones=false
|
|
transforms.unwrap.delete.handling.mode=rewrite
|
|
transforms.unwrap.add.source.fields=table,lsn
|
|
----
|
|
|
|
=== Record filtering for delete records
|
|
|
|
The SMT provides a special handling for events that signal a `delete` operation.
|
|
When a `DELETE` is executed on a datasource then Debezium generates two events:
|
|
|
|
* a record with `d` operation that contains only old row data
|
|
* (optionally) a record with `null` value and the same key (a "tombstone" message). This record serves as a marker for Apache Kafka that all messages with this key can be removed from the topic during https://kafka.apache.org/documentation/#compaction[log compaction].
|
|
|
|
Upon processing these two records, the SMT can pass on the `d` record as is,
|
|
convert it into another tombstone record or drop it.
|
|
The original tombstone message can be passed on as is or also be dropped.
|
|
|
|
[NOTE]
|
|
====
|
|
The SMT by default filters out *both* delete records as widely used sink connectors do not support handling of tombstone messages at this point.
|
|
====
|
|
|
|
=== Determine original operation
|
|
|
|
When a message is flattened the final result won't show whether it was an insert, update or first read
|
|
(deletions can be detected via tombstones or rewrites, see link:#configuration_options[Configuration options]).
|
|
|
|
To solve this problem Debezium offers an option to propagate the original operation via a header added to the message.
|
|
To enable this feature the option `operation.header` must be set to `true`.
|
|
|
|
[source]
|
|
----
|
|
transforms=unwrap,...
|
|
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
|
|
transforms.unwrap.operation.header=true
|
|
----
|
|
|
|
The possible values are the ones from the `op` field of the original change event.
|
|
|
|
|
|
=== Adding source metadata fields
|
|
|
|
The SMT can optionally add metadata fields from the original change event's `source` structure to the final flattened record (prefixed with "__"). This functionality can be used to add things like the table from the change event, or connector-specific fields like the Postgres LSN field. For more information on what's available in the source structure see xref:connectors/index.adoc[the documentation for each connector].
|
|
|
|
For example, the configuration
|
|
|
|
----
|
|
transforms=unwrap,...
|
|
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
|
|
transforms.unwrap.add.source.fields=table,lsn
|
|
----
|
|
|
|
will add
|
|
|
|
----
|
|
{ "__table": "MY_TABLE", "__lsn": "123456789", ...}
|
|
----
|
|
|
|
to the final flattened record.
|
|
|
|
For `DELETE` events, this option is only supported when the `delete.handling.mode` option is set to "rewrite".
|
|
|
|
[[configuration_options]]
|
|
== Configuration options
|
|
[cols="35%a,10%a,55%a",width=100,options="header,footer",role="table table-bordered table-striped"]
|
|
|=======================
|
|
|Property
|
|
|Default
|
|
|Description
|
|
|
|
|`operation.header`
|
|
|`false`
|
|
|The SMT adds the event operation (as obtained from the `op` field of the original record) as a message header.
|
|
|
|
|`drop.tombstones`
|
|
|`true`
|
|
|The SMT removes the tombstone generated by Debezium from the stream.
|
|
|
|
|`delete.handling.mode`
|
|
|`drop`
|
|
|The SMT can `drop` (the default), `rewrite` or pass delete events (`none`). The rewrite mode will add a `__deleted` column with true/false values based on record operation.
|
|
|
|
|`add.source.fields`
|
|
|
|
|
|Fields from the change event's `source` structure to add as metadata (prefixed with "__") to the flattened record
|
|
|=======================
|