DBZ-2554 Edit topic routing/event flattening SMTs format/style/annotations/conditions

This commit is contained in:
Tova Cohen 2020-09-17 16:27:35 -04:00 committed by Chris Cranford
parent 134c80b562
commit e95d5467b0
2 changed files with 72 additions and 114 deletions

View File

@ -1,9 +1,10 @@
// Category: debezium-using
// Type: assembly
ifdef::community[]
// ModuleID: extracting-source-record-after-state-from-debezium-change-events
// Title: Extracting source record `after` state from {prodname} change events
[id="new-record-state-extraction"]
= New Record State Extraction
:toc:
:toc-placement: macro
:linkattrs:
@ -12,43 +13,36 @@ ifdef::community[]
toc::[]
ifdef::community[]
[NOTE]
====
This single message transformation (SMT) is supported for only the SQL database connectors. For the MongoDB connector, see the {link-prefix}:{link-mongodb-event-flattening}[documentation for the MongoDB equivalent to this SMT].
====
endif::community[]
ifdef::product[]
[id="extracting-source-record-after-state-from-debezium-change-events"]
= Extracting source record `after` state from {prodname} change events
endif::product[]
A {prodname} data change event has a complex structure that provides a wealth of information. Kafka records that convey {prodname} change events contain all of this information.
However, parts of a Kafka ecosystem might expect Kafka records that provide a flat structure of field names and values.
To provide this kind of record, {prodname} provides the `ExtractNewRecordState` single message transformation (SMT). Configure this transformation when consumers need Kafka records that have a format that is simpler than Kafka records that contain {prodname} change events.
To provide this kind of record, {prodname} provides the event flattening single message transformation (SMT). Configure this transformation when consumers need Kafka records that have a format that is simpler than Kafka records that contain {prodname} change events.
The `ExtractNewRecordState` transformation is a
The event flattening transformation is a
link:https://kafka.apache.org/documentation/#connect_transforms[Kafka Connect SMT].
ifdef::product[]
The transformation is available to only SQL database connectors.
This transformation is available only to SQL database connectors.
The following topics provide details:
* xref:description-of-debezium-change-event-structure[]
* xref:behavior-of-debezium-extractnewrecordstate-transformation[]
* xref:configuration-of-extractnewrecordstate-transformation[]
* xref:example-of-adding-metadata-to-the-kafka-record[]
* xref:options-for-configuring-extractnewrecordstate-transformation[]
// Type: concept
[id="description-of-debezium-change-event-structure"]
== Description of {prodname} change event structure
* xref:behavior-of-debezium-event-flattening-transformation[]
* xref:configuration-of-debezium-event-flattening-transformation[]
* xref:example-of-adding-debezium-metadata-to-the-kafka-record[]
* xref:options-for-configuring-debezium-event-flattening-transformation[]
endif::product[]
ifdef::community[]
// Type: concept
// ModuleID: description-of-debezium-change-event-structure
// Title: Description of {prodname} change event structure
== Change event structure
endif::community[]
{prodname} generates data change events that have a complex structure.
Each event consists of three parts:
@ -63,7 +57,7 @@ Each event consists of three parts:
* Row data before the change
* Row data after the change
For example, the structure of an `UPDATE` change event looks like this:
For example, part of the structure of an `UPDATE` change event looks like this:
[source,json,indent=0]
----
@ -100,21 +94,16 @@ However, other connectors or other parts of the Kafka ecosystem usually expect t
}
----
To provide the needed Kafka record format for consumers, configure the `ExtractNewRecordState` SMT.
To provide the needed Kafka record format for consumers, configure the event flattening SMT.
ifdef::community[]
== Behavior
endif::community[]
ifdef::product[]
// Type: concept
[id="behavior-of-debezium-extractnewrecordstate-transformation"]
== Behavior of {prodname} `ExtractNewRecordState` transformation
endif::product[]
// ModuleID: behavior-of-debezium-event-flattening-transformation
// Title: Behavior of {prodname} event flattening transformation
== Behavior
The `ExtractNewRecordState` SMT extracts the `after` field from a {prodname} change event in a Kafka record. The SMT replaces the original change event with only its `after` field to create a simple Kafka record.
The event flattening SMT extracts the `after` field from a {prodname} change event in a Kafka record. The SMT replaces the original change event with only its `after` field to create a simple Kafka record.
You can configure the `ExtractNewRecordState` SMT for a {prodname} connector or for a sink connector that consumes messages emitted by a {prodname} connector. The advantage of configuring `ExtractNewRecordState` for a sink connector is that records stored in Apache Kafka contain whole {prodname} change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.
You can configure the event flattening SMT for a {prodname} connector or for a sink connector that consumes messages emitted by a {prodname} connector. The advantage of configuring event flattening for a sink connector is that records stored in Apache Kafka contain whole {prodname} change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.
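For example, assuming a source table with hypothetical `id` and `first_name` columns, a sketch of the value of the simplified Kafka record that the SMT produces might look like this:
[source,json,indent=0]
----
{
    "id": 1004,
    "first_name": "Anne Marie"
}
----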
You can configure the transformation to do any of the following:
@ -128,25 +117,20 @@ A database `DELETE` operation causes {prodname} to generate two Kafka records:
* A tombstone record that has the same key as the deleted row and a value of `null`. This record is a marker for Apache Kafka. It indicates that
link:https://kafka.apache.org/documentation/#compaction[log compaction] can remove all records that have this key.
Instead of dropping the record that contains the `before` row data, you can configure the `ExtractNewRecordData` SMT to do one of the following:
Instead of dropping the record that contains the `before` row data, you can configure the event flattening SMT to do one of the following:
* Keep the record in the stream and edit it to have only the `"value": "null"` field.
* Keep the record in the stream and edit it to have a `value` field that contains the key/value pairs that were in the `before` field, with an added `"__deleted": "true"` entry.
Similarly, instead of dropping the tombstone record, you can configure the `ExtractNewRecordData` SMT to keep the tombstone record in the stream.
Similarly, instead of dropping the tombstone record, you can configure the event flattening SMT to keep the tombstone record in the stream.
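For example, a minimal sketch of a configuration that keeps both `DELETE` records and tombstone records in the stream might look like this; the `unwrap` alias matches the configuration examples later in this section, and both options are described in the configuration options table:
[source]
----
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite
----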
ifdef::community[]
== Configuration
endif::community[]
ifdef::product[]
// Type: concept
[id="configuration-of-extractnewrecordstate-transformation"]
== Configuration of `ExtractNewRecordState` transformation
endif::product[]
// ModuleID: configuration-of-debezium-event-flattening-transformation
// Title: Configuration of {prodname} event flattening transformation
== Configuration
Configure the {prodname} `ExtractNewRecordState` SMT in a Kafka Connect source or sink connector by adding the SMT configuration details to your connector's configuration. To obtain the default behavior, in a `.properties` file, you would specify something like the following:
Configure the {prodname} event flattening SMT in a Kafka Connect source or sink connector by adding the SMT configuration details to your connector's configuration. To obtain the default behavior, in a `.properties` file, you would specify something like the following:
[source]
----
@ -156,7 +140,7 @@ transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
As for any Kafka Connect connector configuration, you can set `transforms=` to multiple, comma-separated SMT aliases in the order in which you want Kafka Connect to apply the SMTs.
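For example, a sketch that chains the event flattening SMT with the topic routing SMT described in its own section of the documentation might look like the following; the `route` alias is arbitrary, and the fully qualified class name of the topic routing SMT is an assumption here:
[source]
----
transforms=unwrap,route
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.route.type=io.debezium.transforms.ByLogicalTableRouter
----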
The following `.properties` example sets several `ExtractNewRecordState` options:
The following `.properties` example sets several event flattening SMT options:
[source]
----
@ -182,17 +166,12 @@ transforms.unwrap.add.fields=table,lsn
`add.fields=table,lsn`:: Adds change event metadata for the `table` and `lsn` fields to the simplified Kafka record.
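Assuming the default double-underscore prefix for added metadata fields (the prefix is configurable with `add.fields.prefix`), a sketch of the resulting simplified record value might look like this; the `id` and `first_name` row columns are hypothetical:
[source,json,indent=0]
----
{
    "id": 1004,
    "first_name": "Anne Marie",
    "__table": "customers",
    "__lsn": 33174873
}
----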
ifdef::community[]
== Adding metadata
endif::community[]
ifdef::product[]
// Type: concept
[id="example-of-adding-metadata-to-the-kafka-record"]
== Example of adding metadata to the Kafka record
endif::product[]
// ModuleID: example-of-adding-debezium-metadata-to-the-kafka-record
// Title: Example of adding {prodname} metadata to the Kafka record
== Adding metadata
The `ExtractNewRecordState` SMT can add original change event metadata to the simplified Kafka record. For example, you might want the simplified record's header or value to contain any of the following:
The event flattening SMT can add original change event metadata to the simplified Kafka record. For example, you might want the simplified record's header or value to contain any of the following:
* The type of operation that made the change
* The name of the database or table that was changed
@ -236,33 +215,31 @@ To add metadata to a simplified Kafka record that is for a `DELETE` operation, y
ifdef::community[]
[id="configuration-options"]
== Configuration options
endif::community[]
ifdef::product[]
// Type: reference
[id="options-for-configuring-extractnewrecordstate-transformation"]
== Options for configuring `ExtractNewRecordState` transformation
endif::product[]
// ModuleID: options-for-configuring-debezium-event-flattening-transformation
// Title: Options for configuring {prodname} event flattening transformation
== Configuration options
The following table describes the options that you can specify for the `ExtractNewRecordState` SMT.
The following table describes the options that you can specify to configure the event flattening SMT.
[cols="30%a,25%a,45%a"]
.Descriptions of event flattening SMT configuration options
[cols="30%a,25%a,45%a",subs="+attributes"]
|===
|Property
|Option
|Default
|Description
[id="extract-new-record-state-drop-tombstones"]
|{link-prefix}:{link-event-flattening}#extract-new-record-state-drop-tombstones[`drop.tombstones`]
|`true`
|{prodname} generates a tombstone record for each `DELETE` operation. The default behavior is that `ExtractNewRecordState` removes tombstone records from the stream. To keep tombstone records in the stream, specify `drop.tombstones=false`.
|{prodname} generates a tombstone record for each `DELETE` operation. The default behavior is that the event flattening SMT removes tombstone records from the stream. To keep tombstone records in the stream, specify `drop.tombstones=false`.
[id="extract-new-record-state-delete-handling-mode"]
|{link-prefix}:{link-event-flattening}#extract-new-record-state-delete-handling-mode[`delete.handling.mode`]
|{link-prefix}:{link-event-flattening}#extract-new-record-state-delete-handling-mode[`delete.handling{zwsp}.mode`]
|`drop`
|{prodname} generates a change event record for each `DELETE` operation. The default behavior is that `ExtractNewRecordState` removes these records from the stream. To keep Kafka records for `DELETE` operations in the stream, set `delete.handling.mode` to `none` or `rewrite`. +
|{prodname} generates a change event record for each `DELETE` operation. The default behavior is that the event flattening SMT removes these records from the stream. To keep Kafka records for `DELETE` operations in the stream, set `delete.handling.mode` to `none` or `rewrite`. +
+
Specify `none` to keep the change event record in the stream. The record contains only `"value": "null"`. +
+
@ -277,7 +254,7 @@ When you specify `rewrite`, the updated simplified records for `DELETE` operati
+
For example, configuration of `route.by.field=destination` routes records to the topic whose name is the value of `after.destination`. The default behavior is that a {prodname} connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. +
+
If you are configuring the `ExtractNewRecordState` SMT on a sink connector, setting this option might be useful when the destination topic name dictates the name of the database table that will be updated with the simplified change event record. If the topic name is not correct for your use case, you can configure `route.by.field` to re-route the event.
If you are configuring the event flattening SMT on a sink connector, setting this option might be useful when the destination topic name dictates the name of the database table that will be updated with the simplified change event record. If the topic name is not correct for your use case, you can configure `route.by.field` to re-route the event.
[id="extract-new-record-state-add-fields-prefix"]
|{link-prefix}:{link-event-flattening}#extract-new-record-state-add-fields-prefix[`add.fields.prefix`]

View File

@ -1,7 +1,8 @@
// Category: debezium-using
// Type: assembly
// ModuleID: routing-debezium-event-records-to-topics-that-you-specify
// Title: Routing {prodname} event records to topics that you specify
ifdef::community[]
[id="topic-routing"]
= Topic Routing
:toc:
@ -11,15 +12,9 @@ ifdef::community[]
:source-highlighter: highlight.js
toc::[]
endif::community[]
ifdef::product[]
[id="routing-change-event-records-to-topics-that-you-specify"]
= Routing change event records to topics that you specify
endif::product[]
Each Kafka record that contains a data change event has a default destination topic. If you need to, you can re-route records to topics that you specify before the records reach the Kafka Connect converter.
To do this, {prodname} provides the `ByLogicalTableRouter` single message transformation (SMT). Configure this transformation in the {prodname} connector's Kafka Connect configuration. Configuration options enable you to specify the following:
To do this, {prodname} provides the topic routing single message transformation (SMT). Configure this transformation in the {prodname} connector's Kafka Connect configuration. Configuration options enable you to specify the following:
* An expression for identifying the records to re-route
* An expression that resolves to the destination topic
@ -27,7 +22,7 @@ To do this, {prodname} provides the `ByLogicalTableRouter` single message transf
It is up to you to ensure that the transformation configuration provides the behavior that you want. {prodname} does not validate the behavior that results from your configuration of the transformation.
The `ByLogicalTableRouter` transformation is a
The topic routing transformation is a
link:https://kafka.apache.org/documentation/#connect_transforms[Kafka Connect SMT].
ifdef::product[]
@ -36,18 +31,13 @@ The following topics provide details:
* xref:use-case-for-routing-records-to-topics-that-you-specify[]
* xref:example-of-routing-records-for-multiple-tables-to-one-topic[]
* xref:ensuring-unique-keys-across-records-routed-to-the-same-topic[]
* xref:options-for-configuring-bylogicaltablerouter-transformation[]
* xref:options-for-configuring-topic-routing-transformation[]
endif::product[]
ifdef::community[]
== Use case
endif::community[]
ifdef::product[]
// Type: concept
[id="use-case-for-routing-records-to-topics-that-you-specify"]
== Use case for routing records to topics that you specify
endif::product[]
// ModuleID: use-case-for-routing-records-to-topics-that-you-specify
// Title: Use case for routing records to topics that you specify
== Use case
The default behavior is that a {prodname} connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. In other words, a topic receives records for one physical table. When you want a topic to receive records for more than one physical table, you must configure the {prodname} connector to re-route the records to that topic.
@ -58,19 +48,14 @@ You can re-route change event records for tables in any of the shards to the sam
.Partitioned PostgreSQL tables
When the {prodname} PostgreSQL connector captures changes in a partitioned table, the default behavior is that change event records are routed to a different topic for each partition. To emit records from all partitions to one topic, configure the `ByLogicalTableRouter` SMT. Because each key in a partitioned table is guaranteed to be unique, configure {link-prefix}:{link-topic-routing}#by-logical-table-router-key-enforce-uniqueness[`key.enforce.uniqueness=false`] so that the SMT does not add a key field to ensure unique keys. The addition of a key field is default behavior.
When the {prodname} PostgreSQL connector captures changes in a partitioned table, the default behavior is that change event records are routed to a different topic for each partition. To emit records from all partitions to one topic, configure the topic routing SMT. Because each key in a partitioned table is guaranteed to be unique, configure {link-prefix}:{link-topic-routing}#by-logical-table-router-key-enforce-uniqueness[`key.enforce.uniqueness=false`] so that the SMT does not add a key field to ensure unique keys. The addition of a key field is default behavior.
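A sketch of such a configuration might look like the following; the `Reroute` alias matches the examples later in this section, the partition table pattern is hypothetical, and the fully qualified class name of the SMT is an assumption:
[source]
----
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_part(.*)
transforms.Reroute.topic.replacement=$1customers_all_parts
transforms.Reroute.key.enforce.uniqueness=false
----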
ifdef::community[]
== Example
endif::community[]
ifdef::product[]
// Type: concept
[id="example-of-routing-records-for-multiple-tables-to-one-topic"]
== Example of routing records for multiple tables to one topic
endif::product[]
// ModuleID: example-of-routing-records-for-multiple-tables-to-one-topic
// Title: Example of routing records for multiple tables to one topic
== Example
To route change event records for multiple physical tables to the same topic, configure the `ByLogicalTableRouter` transformation in the Kafka Connect configuration for the {prodname} connector. Configuration of the `ByLogicalTableRouter` SMT requires you to specify regular expressions that determine:
To route change event records for multiple physical tables to the same topic, configure the topic routing transformation in the Kafka Connect configuration for the {prodname} connector. Configuration of the topic routing SMT requires you to specify regular expressions that determine:
* The tables for which to route records. These tables must all have the same schema.
* The destination topic name.
@ -95,21 +80,16 @@ In the example, the regular expression, `(.*)customers_shard(.*)` matches record
`topic.replacement`:: Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. In this example, records for the three sharded tables listed above would be routed to the `myserver.mydb.customers_all_shards` topic.
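One plausible sketch of this configuration, assuming the `Reroute` alias used in the examples later in this section and a replacement expression that produces the topic name shown above, is:
[source]
----
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
----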
ifdef::community[]
== Ensure unique key
endif::community[]
ifdef::product[]
// Type: procedure
[id="ensuring-unique-keys-across-records-routed-to-the-same-topic"]
== Ensuring unique keys across records routed to the same topic
endif::product[]
// ModuleID: ensuring-unique-keys-across-records-routed-to-the-same-topic
// Title: Ensuring unique keys across records routed to the same topic
== Ensure unique key
A {prodname} change event key uses the table columns that make up the table's primary key. To route records for multiple physical tables to one topic, the event key must be unique across all of those tables. However, it is possible for each physical table to have a primary key that is unique within only that table. For example, a row in the `myserver.mydb.customers_shard1` table might have the same key value as a row in the `myserver.mydb.customers_shard2` table.
To ensure that each event key is unique across the tables whose change event records go to the same topic, the `ByLogicalTableRouter` transformation inserts a field into change event keys. By default, the name of the inserted field is `+__dbz__physicalTableIdentifier+`. The value of the inserted field is the default destination topic name.
To ensure that each event key is unique across the tables whose change event records go to the same topic, the topic routing transformation inserts a field into change event keys. By default, the name of the inserted field is `+__dbz__physicalTableIdentifier+`. The value of the inserted field is the default destination topic name.
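For example, assuming a hypothetical single-column primary key named `id`, a sketch of a re-routed change event key might look like this:
[source,json,indent=0]
----
{
    "id": 1,
    "__dbz__physicalTableIdentifier": "myserver.mydb.customers_shard2"
}
----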
If you want to, you can configure the `ByLogicalTableRouter` transformation to insert a different field into the key. To do this, specify the `key.field.name` option and set it to a field name that does not clash with existing primary key field names. For example:
If you want to, you can configure the topic routing transformation to insert a different field into the key. To do this, specify the `key.field.name` option and set it to a field name that does not clash with existing primary key field names. For example:
[source]
----
@ -144,7 +124,7 @@ With this configuration, suppose that the default destination topic names are:
The transformation uses the values in the second captured group, the shard numbers, as the value of the key's new field. In this example, the inserted key field's values would be `1`, `2`, or `3`.
If your tables contain globally unique keys and you do not need to change the key structure, you can set the `key.enforce.uniqueness` property to `false`:
If your tables contain globally unique keys and you do not need to change the key structure, you can set the `key.enforce.uniqueness` option to `false`:
[source]
----
@ -155,18 +135,19 @@ transforms.Reroute.key.enforce.uniqueness=false
ifdef::community[]
[[configuration-options]]
== Configuration options
endif::community[]
ifdef::product[]
// Type: reference
[id="options-for-configuring-bylogicaltablerouter-transformation"]
== Options for configuring `ByLogicalTableRouter` transformation
endif::product[]
// ModuleID: options-for-configuring-topic-routing-transformation
// Title: Options for configuring topic routing transformation
== Configuration options
[cols="30%a,25%a,45%a"]
The following table describes topic routing SMT configuration options.
.Topic routing SMT configuration options
[cols="30%a,25%a,45%a",subs="+attributes"]
|===
|Property
|Option
|Default
|Description
@ -181,7 +162,7 @@ endif::product[]
|Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. This expression can refer to groups captured by the regular expression that you specify for `topic.regex`. To refer to a group, specify `$1`, `$2`, and so on.
[id="by-logical-table-router-key-enforce-uniqueness"]
|{link-prefix}:{link-topic-routing}#by-logical-table-router-key-enforce-uniqueness[`key.enforce.uniqueness`]
|{link-prefix}:{link-topic-routing}#by-logical-table-router-key-enforce-uniqueness[`key.enforce{zwsp}.uniqueness`]
|`true`
|Indicates whether to add a field to the record's change event key. Adding a key field ensures that each event key is unique across the tables whose change event records go to the same topic. This helps to prevent collisions of change events for records that have the same key but that originate from different source tables. +
+
@ -198,7 +179,7 @@ Specify `false` if you do not want the transformation to add a key field. For e
|Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters. For the SMT to apply this expression, `key.enforce.uniqueness` must be `true`, which is the default.
[id="by-logical-table-router-key-field-replacement"]
|{link-prefix}:{link-topic-routing}#by-logical-table-router-key-field-replacement[`key.field.replacement`]
|{link-prefix}:{link-topic-routing}#by-logical-table-router-key-field-replacement[`key.field{zwsp}.replacement`]
|
|Specifies a regular expression for determining the value of the inserted key field in terms of the groups captured by the expression specified for `key.field.regex`. For the SMT to apply this expression, `key.enforce.uniqueness` must be `true`, which is the default.