DBZ-5283 Document ExtractChangedRecordState SMT

Chris Cranford 2023-01-17 13:39:52 -05:00 committed by Jiri Pechanec
parent f8eb32cf6f
commit a330c39f8b
2 changed files with 150 additions and 0 deletions

@ -24,6 +24,7 @@
** xref:transformations/topic-routing.adoc[Topic Routing]
** xref:transformations/event-flattening.adoc[New Record State Extraction]
** xref:transformations/mongodb-event-flattening.adoc[MongoDB New Document State Extraction]
** xref:transformations/event-changes.adoc[Event Changes]
** xref:transformations/outbox-event-router.adoc[Outbox Event Router]
** xref:transformations/mongodb-outbox-event-router.adoc[MongoDB Outbox Event Router]
** xref:transformations/filtering.adoc[Message Filtering]

@ -0,0 +1,149 @@
:page-aliases: configuration/event-changes.adoc
[id="event-record-changes"]
= Event Record Changes
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]
ifdef::community[]
[NOTE]
====
This single message transformation (SMT) is supported only for the SQL database connectors.
====
endif::community[]
A {prodname} data change event has a complex structure that provides a wealth of information.
However, parts of the change data capture pipeline might require knowledge of which fields changed or remained unchanged when the event occurred in the source system.
To provide this information, {prodname} provides an event record changes single message transformation (SMT).
The event changes transformation is a
link:https://kafka.apache.org/documentation/#connect_transforms[Kafka Connect SMT].
== Change event structure
{prodname} generates data change events that have a complex structure.
Each event consists of three parts:
* Metadata, which includes but is not limited to:
** The operation that made the change
** Source information such as the names of the database and table where the change was made
** Time stamp for when the change was made
** Optional transaction information
* Row data before the change
* Row data after the change
For example, part of the structure of an `UPDATE` change event looks like this:
[source,json,indent=0]
----
{
"op": "u",
"source": {
...
},
"ts_ms" : "...",
"before" : {
"field1" : "oldvalue1",
"field2" : "oldvalue2"
},
"after" : {
"field1" : "newvalue1",
"field2" : "newvalue2"
}
}
----
ifdef::community[]
More details about change event structure are provided in
xref:{link-connectors}[the documentation for each connector].
endif::community[]
This complex format provides the most information about changes happening in the system.
However, other connectors or other parts of the Kafka ecosystem might need to know which fields changed and which did not.
The event changes SMT exposes that information in event headers.
[[event-changes-behavior]]
== Behavior
The event changes SMT extracts the `before` and `after` fields from a {prodname} `UPDATE` change event in a Kafka record.
The SMT compares each field in these two event state structures and, depending on its configuration, writes the names of the fields that changed, the fields that did not change, or both, to record headers.
If the event represents an `INSERT` or `DELETE`, this single message transformation has no effect.
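
The comparison described above can be sketched in a few lines of Python.
This is only an illustration of the idea, not the SMT's actual implementation; the function name `changed_and_unchanged` and the sample event are hypothetical.

[source,python]
----
def changed_and_unchanged(event):
    """Return (changed, unchanged) field-name lists for an update event.

    Events that are not updates yield two empty lists, mirroring the
    documented no-op for inserts and deletes.
    """
    if event.get("op") != "u":
        return [], []
    before = event.get("before") or {}
    after = event.get("after") or {}
    changed, unchanged = [], []
    for name, new_value in after.items():
        # A field counts as unchanged only if it exists in both states
        # with an equal value.
        if name in before and before[name] == new_value:
            unchanged.append(name)
        else:
            changed.append(name)
    return changed, unchanged


# Hypothetical update event with one changed and one unchanged field.
event = {
    "op": "u",
    "before": {"field1": "oldvalue1", "field2": "same"},
    "after": {"field1": "newvalue1", "field2": "same"},
}
changed, unchanged = changed_and_unchanged(event)
print(changed)    # ['field1']
print(unchanged)  # ['field2']
----
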
You can configure the event changes SMT for a {prodname} connector or for a sink connector that consumes messages emitted by a {prodname} connector.
The advantage of configuring the event changes SMT for a sink connector is that the records stored in Apache Kafka contain whole {prodname} change events.
The decision to apply the SMT to a source or sink connector depends on your particular use case.
You can configure the transformation to do any of the following:
* Record the fields that changed in the `UPDATE` in a user-configured header.
* Record the fields that did not change in the `UPDATE` in a user-configured header.
You can configure the SMT to record the changed fields, the unchanged fields, or both, depending on your use case.
== Configuration
Configure the {prodname} event changes SMT in a Kafka Connect source or sink connector by adding the SMT configuration details to your connector's configuration.
The default configuration, which does not add any headers, looks like the following in a `.properties` file:
[source]
----
transforms=changes,...
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
----
As with any Kafka Connect connector configuration, you can set `transforms=` to multiple, comma-separated SMT aliases in the order in which you want Kafka Connect to apply the SMTs.
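
To illustrate why the order can matter, the following Python sketch builds a connector configuration as a dictionary and prints it as JSON, the form in which connector configurations are typically submitted to Kafka Connect in distributed mode.
It assumes a second, hypothetical alias `unwrap` for the event flattening SMT, which replaces the event envelope with the `after` state; under that assumption, `changes` is listed first so that it still sees both `before` and `after`.

[source,python]
----
import json

# Hypothetical SMT aliases: "changes" is applied first, then "unwrap".
connector_config = {
    "transforms": "changes,unwrap",
    "transforms.changes.type": "io.debezium.transforms.ExtractChangedRecordState",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
}

# Kafka Connect applies the SMTs in the order in which the aliases appear.
aliases = connector_config["transforms"].split(",")
print(aliases)  # ['changes', 'unwrap']
print(json.dumps(connector_config, indent=2))
----
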
The following `.properties` example sets several event changes SMT options:
[source]
----
transforms=changes,...
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.changes.header.changed.name=Changed
transforms.changes.header.unchanged.name=Unchanged
----
`header.changed.name`:: Specifies the name of the header that will include a comma-separated list of fields that were changed.
`header.unchanged.name`:: Specifies the name of the header that will include a comma-separated list of fields that were not changed.
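
On the consuming side, an application can read these headers to decide how to handle a record.
The following Python sketch assumes that each header value arrives as a UTF-8 encoded, comma-separated string and that headers are exposed as a list of `(key, value)` pairs; the actual wire format depends on the header converter configured for Kafka Connect, and the helper name `fields_from_header` is hypothetical.

[source,python]
----
def fields_from_header(headers, name):
    """Return the field names carried by header `name`, or [] if absent.

    `headers` is assumed to be a list of (key, value) pairs in which each
    value is a bytes object.
    """
    for key, value in headers:
        if key == name and value:
            text = value.decode("utf-8")
            # Split on commas and drop empty entries and stray whitespace.
            return [field.strip() for field in text.split(",") if field.strip()]
    return []


# Hypothetical headers, named as in the preceding configuration example.
headers = [("Changed", b"field1"), ("Unchanged", b"field2,field3")]
print(fields_from_header(headers, "Changed"))    # ['field1']
print(fields_from_header(headers, "Unchanged"))  # ['field2', 'field3']
----
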
.Customizing the configuration
The connector might emit many types of event messages (heartbeat messages, tombstone messages, or metadata messages about transactions or schema changes).
To apply the transformation to a subset of events, you can define xref:options-for-applying-the-event-flattening-transformation-selectively[an SMT predicate statement that selectively applies the transformation] to specific events only.
[id="options-for-applying-the-event-flattening-transformation-selectively"]
== Options for applying the event changes transformation selectively
In addition to the change event messages that a {prodname} connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions.
Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it's best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages.
For more information about how to apply the SMT selectively, see xref:{link-smt-predicates}#applying-transformations-selectively[Configure an SMT predicate for the transformation].
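
As a rough sketch of what such a predicate can look like, the following Python snippet assembles the relevant settings as a dictionary.
It assumes Kafka Connect's built-in `TopicNameMatches` predicate, a hypothetical predicate alias `isHeartbeat`, and that heartbeat messages are written to topics whose names start with `__debezium-heartbeat`; adjust the pattern to your own topic naming.
The regular expression check only approximates how Kafka Connect would evaluate the pattern.

[source,python]
----
import re

# Hypothetical predicate alias and pattern; adjust to your topic names.
predicate_settings = {
    "predicates": "isHeartbeat",
    "predicates.isHeartbeat.type":
        "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.isHeartbeat.pattern": r"__debezium-heartbeat.*",
    # Apply the SMT only when the predicate does NOT match.
    "transforms.changes.predicate": "isHeartbeat",
    "transforms.changes.negate": "true",
}


def smt_applies(topic):
    """Approximate whether the SMT would run for a record on `topic`."""
    pattern = predicate_settings["predicates.isHeartbeat.pattern"]
    is_heartbeat = re.fullmatch(pattern, topic) is not None
    return not is_heartbeat


# Hypothetical topic names.
print(smt_applies("server1.inventory.customers"))   # True
print(smt_applies("__debezium-heartbeat.server1"))  # False
----
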
ifdef::community[]
[id="configuration-options"]
endif::community[]
== Configuration options
The following table describes the options that you can specify to configure the event changes SMT.
.Descriptions of event changes SMT configuration options
[cols="30%a,25%a,45%a",subs="+attributes",options="header"]
|===
|Option
|Default
|Description
|[[extract-changes-header-changed-name]]<<extract-changes-header-changed-name, `+header.changed.name+`>>
|
|Specifies the name of the header that will include a comma-separated list of fields that were changed.
|[[extract-changes-header-unchanged-name]]<<extract-changes-header-unchanged-name, `+header.unchanged.name+`>>
|
|Specifies the name of the header that will include a comma-separated list of fields that were not changed.