tet123/documentation/modules/ROOT/pages/configuration/event-flattening.adoc

348 lines
16 KiB
Plaintext
Raw Normal View History

// Category: cdc-using
// Type: assembly
ifdef::community[]
[id="new-record-state-extraction"]
= New Record State Extraction
include::../_attributes.adoc[]
2019-11-25 22:06:33 +01:00
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
2019-11-25 22:06:33 +01:00
toc::[]
[NOTE]
====
This single message transformation (SMT) is supported for only the SQL database connectors. For the MongoDB connector, see the xref:configuration/mongodb-event-flattening.adoc[documentation for the MongoDB equivalent to this SMT].
====
endif::community[]
ifdef::product[]
[id="extracting-source-record-after-state-from-debezium-change-events"]
= Extracting source record `after` state from Debezium change events
endif::product[]
A Debezium database change event has a complex structure that provides a wealth of information. Kafka records that convey Debezium change events contain all of this information.
However, parts of a Kafka ecosystem expect Kafka records that provide a flat structure of field names and values.
To provide this kind of record, Debezium provides the `ExtractNewRecordState` single message transformation (SMT). Configure this transformation when consumers need Kafka records that have a format that is simpler than Kafka records that contain Debezium change events.
The `ExtractNewRecordState` transformation is a
link:https://kafka.apache.org/documentation/#connect_transforms[Kafka Connect SMT].
ifdef::product[]
The transformation is available to only SQL database connectors.
The following topics provide details:
* xref:description-of-debezium-change-event-structure[]
* xref:behavior-of-debezium-extractnewrecordstate-transformation[]
* xref:configuration-of-extractnewrecordstate-transformation[]
* xref:example-of-adding-metadata-to-the-kafka-record-or-its-header[]
* xref:options-for-configuring-extractnewrecordstate-transformation[]
// Type: concept
[id="description-of-debezium-change-event-structure"]
== Description of Debezium change event structure
endif::product[]
ifdef::community[]
== Change event structure
endif::community[]
Debezium generates database change events that have a complex structure.
2020-02-12 19:37:58 +01:00
Each event consists of three parts:
* Metadata, which includes but is not limited to:
** The operation that made the change
** Source information such as the names of the database and table where the change was made
** Time stamp for when the change was made
** Optional transaction information
* Row data before the change
* Row data after the change
For example, the structure of an `UPDATE` change event looks like this:
[source,json,indent=0]
----
{
"op": "u",
"source": {
...
},
"ts_ms" : "...",
"before" : {
"field1" : "oldvalue1",
"field2" : "oldvalue2"
},
"after" : {
"field1" : "newvalue1",
"field2" : "newvalue2"
}
}
----
ifdef::community[]
More details about change event structure are provided in
xref:connectors/index.adoc[the documentation for each connector].
endif::community[]
This complex format provides the most information about changes happening in the system.
However, other connectors or other parts of the Kafka ecosystem usually expect the data in a simple format like this:
[source,json,indent=0]
----
{
"field1" : "newvalue1",
"field2" : "newvalue2"
}
----
To provide the needed Kafka record format for consumers, configure the `ExtractNewRecordState` SMT.
ifdef::community[]
== Behavior
endif::community[]
ifdef::product[]
// Type: concept
[id="behavior-of-debezium-extractnewrecordstate-transformation"]
== Behavior of Debezium `ExtractNewRecordState` transformation
endif::product[]
link:https://github.com/debezium/debezium/blob/master/debezium-core/src/main/java/io/debezium/transforms/ExtractNewRecordState.java[The `ExtractNewRecordState` SMT] extracts the `after` field from a Debezium change event in a Kafka record. The SMT replaces the original change event with only its `after` field to create a simple Kafka record.
You can configure the `ExtractNewRecordState` SMT for a Debezium connector, that is, for a source connector, or for a sink connector. The advantage of configuring `ExtractNewRecordState` for a sink connector is that records stored in Apache Kafka contain whole Debezium change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.
You can configure the transformation to do any of the following:
* Add metadata from the change event to the simplified Kafka record or record header. The default behavior is that the SMT does not add metadata to the simplified Kafka record.
* Keep Kafka records that contain change events for `DELETE` operations in the stream. The default behavior is that the SMT drops Kafka records for `DELETE` operation change events because most consumers cannot yet handle them.
A database `DELETE` operation causes Debezium to generate two Kafka records:
* A record that contains `"op": "d",` the `before` row data, and some other fields.
* A tombstone record that has the same key as the deleted row and a value of `null`. This record is a marker for Apache Kafka. It indicates that
link:https://kafka.apache.org/documentation/#compaction[log compaction] can remove all records that have this key.
Instead of dropping the record that contains the `before` row data, you can configure the `ExtractNewRecordData` SMT to do one of the following:
* Keep the record in the stream and edit it to have only the `"value": "null"` field.
* Keep the record in the stream and edit it to have a `value` field that contains the key/value pairs that were in the `before` field with an added `"__deleted": "true"` entry.
Similary, instead of dropping the tombstone record, you can configure the `ExtractNewRecordData` SMT to keep the tombstone record in the stream.
ifdef::community[]
== Configuration
endif::community[]
ifdef::product[]
// Type: concept
[id="configuration-of-extractnewrecordstate-transformation"]
== Configuration of `ExtractNewRecordState` transformation
endif::product[]
Configure the Debezium `ExtractNewRecordState` SMT in a Kafka Connect source or sink connector `.properties` file. To obtain the default behavior, specify something like the following:
[source]
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
----
As in any Kafka Connect connector `.properties` file, you can set `transforms=` to multiple, comma-separated, SMT aliases in the order in which you want Kafka Connect to apply the SMTs.
The following example sets several `ExtractNewRecordState` options:
[source]
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=table,lsn
----
`drop-tombstones=false`:: Keeps tombstone records for `DELETE` operations in the event stream.
`delete-handling-mode=rewrite`:: For `DELETE` operations, edits the Kafka record by flattening the `value` field that was in the change event. The `value` field directly contains the key/value pairs that were in the `before` field. The SMT adds `__deleted` and sets it to `true`, for example:
+
----
"value": {
"pk": 2,
"cola": null,
"__deleted": "true"
}
----
`add.fields=table,lsn`:: Adds change event metadata for the `table` and `lsn` fields to the simplified Kafka record.
ifdef::community[]
== Adding metadata
endif::community[]
ifdef::product[]
// Type: concept
[id="example-of-adding-metadata-to-the-kafka-record-or-its-header"]
== Example of adding metadata to the Kafka record or its header
endif::product[]
The `ExtractNewRecordState` SMT can add original, change event metadata to the simplified Kafka record or its header. For example, you might want the simplified record or its header to contain any of the following:
* The type of operation that made the change
* The name of the database or table that was changed
* Connector-specific fields such as the Postgres LSN field
ifdef::community[]
For more information on what is available see xref:connectors/index.adoc[the documentation for each connector].
endif::community[]
To add metadata to the simplified Kafka record, specify the `add.fields` option.
To add metadata to the header of the simplified Kafka record, specify the `add.header` option. Each of these options takes a comma separated list of change event field names. Do not specify spaces. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field. For example:
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.fields=op,table,lsn,source.ts_ms
transforms.unwrap.add.headers=db
transforms.unwrap.delete.handling.mode=rewrite
----
With that configuration, a simplified Kafka record would contain something like the following:
----
{ "__op" : "c", __table": "MY_TABLE", "__lsn": "123456789", "__source_ts_ms" : "123456789", ...}
----
Also, simplified Kafka records would have a `__db` header.
In the simplified Kafka record, the SMT prefixes the metadata field names with a double underscore. When you specify a struct, the SMT also inserts an underscore between the struct name and the field name.
To add metadat to a simplified Kafka record that is for a `DELETE` operation, you must also configure `delete.handling.mode=rewrite`.
ifdef::community[]
// Do not include deprecated content in downstream doc
== Determine original operation [DEPRECATED]
_The `operation.header` option is deprecated and scheduled for removal. Please use add.headers instead. If both add.headers and operation.header are specified, the latter will be ignored._
When a Kafka record is flattened the final result won't show whether it was an insert, update or first read
(deletions can be detected via tombstones or rewrites, see link:#options-for-configuring-extractnewrecordstate-transformation[Configuration options]).
To solve this problem Debezium offers an option to propagate the original operation via a header added to the Kafka record.
To enable this feature the option `operation.header` must be set to `true`.
[source]
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.operation.header=true
----
The possible values are the ones from the `op` field of the original change event.
endif::community[]
ifdef::community[]
// Do not include deprecated content in downstream doc
== Adding source metadata fields [DEPRECATED]
_The `add.source.fields` option is deprecated and scheduled for removal. Please use add.fields instead. If both add.fields and add.source.fields are specified, the latter will be ignored._
The SMT can optionally add metadata fields from the original change event's `source` structure to the final flattened record (prefixed with "__"). This functionality can be used to add things like the table from the change event, or connector-specific fields like the Postgres LSN field. For more information on what's available in the source structure see xref:connectors/index.adoc[the documentation for each connector].
For example, the configuration
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.source.fields=table,lsn
----
will add
----
{ "__table": "MY_TABLE", "__lsn": "123456789", ...}
----
to the final flattened record.
For `DELETE` events, this option is only supported when the `delete.handling.mode` option is set to "rewrite".
== Configuration options
endif::community[]
ifdef::product[]
// Type: reference
[id="options-for-configuring-extractnewrecordstate-transformation"]
== Options for configuring `ExtractNewRecordState` transformation
endif::product[]
The following table describes the options that you can specify for the `ExtractNewRecordState` SMT.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[configuration-option-drop-tombstones]]<<configuration-option-drop-tombstones, `drop.tombstones`>>
|`true`
|Debezium generates a tombstone record for each `DELETE` operation. The default behavior is that `ExtractNewRecordState` removes tombstone records from the stream. To keep tombstone records in the stream, specify `drop.tombstones=false`.
|[[configuration-option-delete-handling-mode]]<<configuration-option-delete-handling-mode, `delete.handling.mode`>>
|`drop`
|Debezium generates a change event record for each `DELETE` operation. The default behavior is that `ExtractNewRecordState` removes these records from the stream. To keep Kafka records for `DELETE` operations in the stream, set `delete.handling.mode` to `none` or `rewrite`. +
+
Specify `none` to keep the change event record in the stream. The record contains only `"value": "null"`. +
+
Specify `rewrite` to keep the change event record in the stream and edit the record to have a `value` field that contains the key/value pairs that were in the `before` field and also add `+__deleted: true+` to the `value`. This is another way to indicate that the record has been deleted. +
+
When you specify `rewrite`, the updated simplified records for `DELETE` operations might be all you need to track deleted records. You can consider accepting the default behavior of dropping the tombstone records that the Debezium connector creates.
|[[configuration-option-route-by-field]]<<configuration-option-route-by-field, `route.by.field`>>
|
|To use row data to determine the topic to route the record to, set this option to an `after` field attribute. The SMT routes the record to the topic whose name matches the value of the specified `after` field attribute. For a `DELETE` operation, set this option to a `before` field attribute. +
+
For example, configuration of `route.by.field=destination` routes records to the topic whose name is the value of `after.destination`. The default behavior is that a Debezium connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. +
+
If you are configuring the `ExtractNewRecordState` SMT on a sink connector, setting this option might be useful when the destination topic name dictates the name of the database table that will be updated with the simplified change event record. If the topic name is not correct for your use case, you can configure `route.by.field` to re-route the event.
|[[configuration-option-add-fields]]<<configuration-option-add-fields, `add.fields`>>
|
|Set this option to a comma-separated list, with no spaces, of metadata fields to add to the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example `source.ts_ms`. +
+
When the SMT adds metadata fields to the simplified record, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name. +
+
If you specify a field that is not in the change event record, the SMT adds the field.
|[[configuration-option-add-headers]]<<configuration-option-add-headers, `add.headers`>>
|
|Set this option to a comma-separated list, with no spaces, of metadata fields to add to the header of the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example `source.ts_ms`. +
+
When the SMT adds metadata fields to the simplified record's header, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name. +
+
If you specify a field that is not in the change event record, the SMT does not add the field to the header.
ifdef::community[]
// Do not include deprecated content in downstream doc
|`operation.header` DEPRECATED
|`false`
|_This option is deprecated and scheduled for removal. Please use add.headers instead. If both add.headers and operation.header are specified, the latter will be ignored._
The SMT adds the event operation (as obtained from the `op` field of the original record) as a Kafka record header.
// Do not include deprecated content in downstream doc
|`add.source.fields` DEPRECATED
|
|_This option is deprecated and scheduled for removal. Please use add.fields instead. If both add.fields and add.source.fields are specified, the latter will be ignored._
Fields from the change event's `source` structure to add as metadata (prefixed with "__") to the flattened record.
endif::community[]
|===