tet123/documentation/modules/ROOT/pages/configuration/event-flattening.adoc

237 lines
10 KiB
Plaintext
Raw Normal View History

= New Record State Extraction
include::../_attributes.adoc[]
2019-11-25 22:06:33 +01:00
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
2019-11-25 22:06:33 +01:00
toc::[]
[NOTE]
====
This SMT is supported only for the SQL database connectors, it does not work with the MongoDB connector.
See xref:configuration/mongodb-event-flattening.adoc[here] for the MongoDB equivalent to this SMT.
====
Debezium generates data change events in a form of a complex message structure.
2020-02-12 19:37:58 +01:00
Each event consists of three parts:
* metadata, comprising the type of operation, information on the event source, a timestamp, and optionally transaction information
* the row data before change
* the row data after change
E.g. the general message structure for an `update` change looks like this:
[source,json,indent=0]
----
{
"op": "u",
"source": {
...
},
"ts_ms" : "...",
"before" : {
"field1" : "oldvalue1",
"field2" : "oldvalue2"
},
"after" : {
"field1" : "newvalue1",
"field2" : "newvalue2"
}
}
----
More details about the message structure are provided in xref:connectors/index.adoc[the documentation for each connector].
This format allows the user to get most information about changes happening in the system.
The downside of using the complex format is that other connectors or other parts of the Kafka ecosystem usually expect the data in a simple message format that can generally be described like so:
[source,json,indent=0]
----
{
"field1" : "newvalue1",
"field2" : "newvalue2"
}
----
Debezium provides {link-kafka-docs}/#connect_transforms[a single message transformation] that crosses the bridge between the complex and simple formats, the https://github.com/debezium/debezium/blob/master/debezium-core/src/main/java/io/debezium/transforms/ExtractNewRecordState.java[ExtractNewRecordState] SMT.
The SMT provides three main functions.
It
* extracts the `after` field from change events and replaces the original event just with this part
* optionally filters delete and tombstone records, as per the capabilities and requirements of downstream consumers
* optionally adds metadata fields from the change event to the outgoing flattened record
* optionally add metadata fields to the header
The SMT can be applied either to a source connector (Debezium) or a sink connector.
We generally recommend to apply the transformation on the sink side as it means that the messages stored in Apache Kafka will contain the whole context.
The final decision depends on use case for each user.
== Configuration
The configuration is a part of source/sink task connector and is expressed in a set of properties:
[source]
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.source.fields=table,lsn
----
=== Record filtering for delete records
The SMT provides a special handling for events that signal a `delete` operation.
When a `DELETE` is executed on a datasource then Debezium generates two events:
* a record with `d` operation that contains only old row data
* (optionally) a record with `null` value and the same key (a "tombstone" message). This record serves as a marker for Apache Kafka that all messages with this key can be removed from the topic during {link-kafka-docs}/#compaction[log compaction].
Upon processing these two records, the SMT can pass on the `d` record as is,
convert it into another tombstone record or drop it.
The original tombstone message can be passed on as is or also be dropped.
[NOTE]
====
The SMT by default filters out *both* delete records as widely used sink connectors do not support handling of tombstone messages at this point.
====
=== Adding metadata fields to the message
The SMT can optionally add metadata fields from the original change event to the final flattened record. This functionality can be used to add things like the operation or the table from the change event, or connector-specific fields like the Postgres LSN field. For more information on what's available see xref:connectors/index.adoc[the documentation for each connector].
In case of duplicate field names (e.g. "ts_ms" exists twice), the struct should be specified to get the correct field (e.g. "source.ts_ms"). The fields will be prefixed with "\\__" or "__<struct>_", depending on the specification of the struct. Please use a comma separated list without spaces.
For example, the configuration
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.fields=op,table,lsn,source.ts_ms
----
will add
----
{ "__op" : "c", __table": "MY_TABLE", "__lsn": "123456789", "__source_ts_ms" : "123456789", ...}
----
to the final flattened record.
For `DELETE` events, this option is only supported when the `delete.handling.mode` option is set to "rewrite".
=== Adding metadata fields to the header
The SMT can optionally add metadata fields from the original change event to the header of the final flattened record. This functionality can be used to add things like the operation or the table from the change event, or connector-specific fields like the Postgres LSN field. For more information on what's available see xref:connectors/index.adoc[the documentation for each connector].
In case of duplicate field names (e.g. "ts_ms" exists twice), the struct should be specified to get the correct field (e.g. "source.ts_ms"). The fields will be prefixed with "\\__" or "__<struct>_", depending on the specification of the struct. Please use a comma separated list without spaces.
For example, the configuration
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.headers=op,table,lsn,source.ts_ms
----
will add headers `__op`, `__table`, `__lsn` and `__source_ts_ms` to the outgoing record.
ifndef::cdc-product[]
// Do not include deprecated content in downstream doc
=== Determine original operation [DEPRECATED]
_The `operation.header` option is deprecated and scheduled for removal. Please use add.headers instead. If both add.headers and operation.header are specified, the latter will be ignored._
When a message is flattened the final result won't show whether it was an insert, update or first read
(deletions can be detected via tombstones or rewrites, see link:#configuration_options[Configuration options]).
To solve this problem Debezium offers an option to propagate the original operation via a header added to the message.
To enable this feature the option `operation.header` must be set to `true`.
[source]
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.operation.header=true
----
The possible values are the ones from the `op` field of the original change event.
endif::cdc-product[]
ifndef::cdc-product[]
// Do not include deprecated content in downstream doc
=== Adding source metadata fields [DEPRECATED]
_The `add.source.fields` option is deprecated and scheduled for removal. Please use add.fields instead. If both add.fields and add.source.fields are specified, the latter will be ignored._
The SMT can optionally add metadata fields from the original change event's `source` structure to the final flattened record (prefixed with "__"). This functionality can be used to add things like the table from the change event, or connector-specific fields like the Postgres LSN field. For more information on what's available in the source structure see xref:connectors/index.adoc[the documentation for each connector].
For example, the configuration
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.source.fields=table,lsn
----
will add
----
{ "__table": "MY_TABLE", "__lsn": "123456789", ...}
----
to the final flattened record.
For `DELETE` events, this option is only supported when the `delete.handling.mode` option is set to "rewrite".
endif::cdc-product[]
[[configuration_options]]
== Configuration options
[cols="35%a,10%a,55%a",options="header"]
|=======================
|Property
|Default
|Description
|`drop.tombstones`
|`true`
|The SMT removes the tombstone generated by Debezium from the stream.
|`delete.handling.mode`
|`drop`
|The SMT can `drop` (the default), `rewrite` or pass delete events (`none`). The rewrite mode will add a `__deleted` column with true/false values based on record operation.
|`route.by.field`
|
|The column which determines how the events will be routed, the value will the topic name; obtained from the old record state for delete events, and from the new record state otherwise
|`add.fields`
|
|Specify a list of metadata fields to add to the flattened message. In case of duplicate field names (e.g. "ts_ms" exists twice), the struct should be specified to get the correct field (e.g. "source.ts_ms"). The fields will be prefixed with "\\__" or "__<struct>__", depending on the specification of the struct. Please use a comma separated list without spaces.
|`add.headers`
|
|Specify a list of metadata fields to add to the header of the flattened message. In case of duplicate field names (e.g. "ts_ms" exists twice), the struct should be specified to get the correct field (e.g. "source.ts_ms"). The fields will be prefixed with "\\__" or "__<struct>__", depending on the specification of the struct. Please use a comma separated list without spaces.
ifndef::cdc-product[]
// Do not include deprecated content in downstream doc
|`operation.header` DEPRECATED
|`false`
|_This option is deprecated and scheduled for removal. Please use add.headers instead. If both add.headers and operation.header are specified, the latter will be ignored._
The SMT adds the event operation (as obtained from the `op` field of the original record) as a message header.
endif::cdc-product[]
ifndef::cdc-product[]
// Do not include deprecated content in downstream doc
|`add.source.fields` DEPRECATED
|
|_This option is deprecated and scheduled for removal. Please use add.fields instead. If both add.fields and add.source.fields are specified, the latter will be ignored._
2020-03-10 12:15:22 +01:00
Fields from the change event's `source` structure to add as metadata (prefixed with "__") to the flattened record.
endif::cdc-product[]
|=======================