From e95d5467b07ec5aac15499a8288b82d2e4df2f75 Mon Sep 17 00:00:00 2001 From: Tova Cohen Date: Thu, 17 Sep 2020 16:27:35 -0400 Subject: [PATCH] DBZ-2554 Edit topic routing/event flattening SMTs format/style/annotations/conditions --- .../pages/configuration/event-flattening.adoc | 109 +++++++----------- .../pages/configuration/topic-routing.adoc | 77 +++++-------- 2 files changed, 72 insertions(+), 114 deletions(-) diff --git a/documentation/modules/ROOT/pages/configuration/event-flattening.adoc b/documentation/modules/ROOT/pages/configuration/event-flattening.adoc index c0fdc67da..716a39b31 100644 --- a/documentation/modules/ROOT/pages/configuration/event-flattening.adoc +++ b/documentation/modules/ROOT/pages/configuration/event-flattening.adoc @@ -1,9 +1,10 @@ // Category: debezium-using // Type: assembly - -ifdef::community[] +// ModuleID: extracting-source-record-after-state-from-debezium-change-events +// Title: Extracting source record `after` state from {prodname} change events [id="new-record-state-extraction"] = New Record State Extraction + :toc: :toc-placement: macro :linkattrs: @@ -12,43 +13,36 @@ ifdef::community[] toc::[] +ifdef::community[] [NOTE] ==== This single message transformation (SMT) is supported for only the SQL database connectors. For the MongoDB connector, see the {link-prefix}:{link-mongodb-event-flattening}[documentation for the MongoDB equivalent to this SMT]. ==== endif::community[] -ifdef::product[] -[id="extracting-source-record-after-state-from-debezium-change-events"] -= Extracting source record `after` state from {prodname} change events -endif::product[] - A {prodname} data change event has a complex structure that provides a wealth of information. Kafka records that convey {prodname} change events contain all of this information. However, parts of a Kafka ecosystem might expect Kafka records that provide a flat structure of field names and values. -To provide this kind of record, {prodname} provides the `ExtractNewRecordState` single message transformation (SMT). Configure this transformation when consumers need Kafka records that have a format that is simpler than Kafka records that contain {prodname} change events. +To provide this kind of record, {prodname} provides the event flattening single message transformation (SMT). Configure this transformation when consumers need Kafka records that have a format that is simpler than Kafka records that contain {prodname} change events. -The `ExtractNewRecordState` transformation is a +The event flattening transformation is a link:https://kafka.apache.org/documentation/#connect_transforms[Kafka Connect SMT]. ifdef::product[] -The transformation is available to only SQL database connectors. +This transformation is available to only SQL database connectors. 
The following topics provide details:

* xref:description-of-debezium-change-event-structure[]
-* xref:behavior-of-debezium-extractnewrecordstate-transformation[]
-* xref:configuration-of-extractnewrecordstate-transformation[]
-* xref:example-of-adding-metadata-to-the-kafka-record[]
-* xref:options-for-configuring-extractnewrecordstate-transformation[]
-
-// Type: concept
-[id="description-of-debezium-change-event-structure"]
-== Description of {prodname} change event structure
+* xref:behavior-of-debezium-event-flattening-transformation[]
+* xref:configuration-of-debezium-event-flattening-transformation[]
+* xref:example-of-adding-debezium-metadata-to-the-kafka-record[]
+* xref:options-for-configuring-debezium-event-flattening-transformation[]
endif::product[]

-ifdef::community[]
+// Type: concept
+// ModuleID: description-of-debezium-change-event-structure
+// Title: Description of {prodname} change event structure
== Change event structure
-endif::community[]

{prodname} generates data change events that have a complex structure.
Each event consists of three parts:

* Row data before the change
* Row data after the change

-For example, the structure of an `UPDATE` change event looks like this:
+For example, part of the structure of an `UPDATE` change event looks like this:

[source,json,indent=0]
----

}
----

However, other connectors or other parts of the Kafka ecosystem usually expect the data in a simple format like this:

}
----

-To provide the needed Kafka record format for consumers, configure the `ExtractNewRecordState` SMT.
+To provide the needed Kafka record format for consumers, configure the event flattening SMT.

-ifdef::community[]
-== Behavior
-endif::community[]
-
-ifdef::product[]
// Type: concept
-[id="behavior-of-debezium-extractnewrecordstate-transformation"]
-== Behavior of {prodname} `ExtractNewRecordState` transformation
-endif::product[]
+// ModuleID: behavior-of-debezium-event-flattening-transformation
+// Title: Behavior of {prodname} event flattening transformation
+== Behavior

-The `ExtractNewRecordState` SMT extracts the `after` field from a {prodname} change event in a Kafka record. The SMT replaces the original change event with only its `after` field to create a simple Kafka record.
+The event flattening SMT extracts the `after` field from a {prodname} change event in a Kafka record. The SMT replaces the original change event with only its `after` field to create a simple Kafka record.

-You can configure the `ExtractNewRecordState` SMT for a {prodname} connector or for a sink connector that consumes messages emitted by a {prodname} connector. The advantage of configuring `ExtractNewRecordState` for a sink connector is that records stored in Apache Kafka contain whole {prodname} change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.
+You can configure the event flattening SMT for a {prodname} connector or for a sink connector that consumes messages emitted by a {prodname} connector. The advantage of configuring event flattening for a sink connector is that records stored in Apache Kafka contain whole {prodname} change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.

You can configure the transformation to do any of the following:

A database `DELETE` operation causes {prodname} to generate two Kafka records:

* A tombstone record that has the same key as the deleted row and a value of `null`. 
This record is a marker for Apache Kafka. It indicates that link:https://kafka.apache.org/documentation/#compaction[log compaction] can remove all records that have this key.

-Instead of dropping the record that contains the `before` row data, you can configure the `ExtractNewRecordData` SMT to do one of the following:
+Instead of dropping the record that contains the `before` row data, you can configure the event flattening SMT to do one of the following:

* Keep the record in the stream and edit it to have only the `"value": "null"` field.
* Keep the record in the stream and edit it to have a `value` field that contains the key/value pairs that were in the `before` field with an added `"__deleted": "true"` entry.

-Similary, instead of dropping the tombstone record, you can configure the `ExtractNewRecordData` SMT to keep the tombstone record in the stream.
+Similarly, instead of dropping the tombstone record, you can configure the event flattening SMT to keep the tombstone record in the stream.

-ifdef::community[]
-== Configuration
-endif::community[]
-
-ifdef::product[]
// Type: concept
-[id="configuration-of-extractnewrecordstate-transformation"]
-== Configuration of `ExtractNewRecordState` transformation
-endif::product[]
+// ModuleID: configuration-of-debezium-event-flattening-transformation
+// Title: Configuration of {prodname} event flattening transformation
+== Configuration

-Configure the {prodname} `ExtractNewRecordState` SMT in a Kafka Connect source or sink connector by adding the SMT configuration details to your connector's configuration. To obtain the default behavior, in a `.properties` file, you would specify something like the following:
+Configure the {prodname} event flattening SMT in a Kafka Connect source or sink connector by adding the SMT configuration details to your connector's configuration. To obtain the default behavior, in a `.properties` file, you would specify something like the following:

[source]
----
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
----

As for any Kafka Connect connector configuration, you can set `transforms=` to multiple, comma-separated, SMT aliases in the order in which you want Kafka Connect to apply the SMTs.

-The following `.properties` example sets several `ExtractNewRecordState` options:
+The following `.properties` example sets several event flattening SMT options:

[source]
----
transforms.unwrap.add.fields=table,lsn
----

`add.fields=table,lsn`:: Adds change event metadata for the `table` and `lsn` fields to the simplified Kafka record.
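+
+You can also apply the SMT in the configuration of a sink connector that consumes the topics that a {prodname} connector populates. The following `.properties` sketch shows one way that this might look; the sink connector class, name, and topic are illustrative assumptions, not values that this SMT requires:
+
+[source]
+----
+# Hypothetical sink connector; any Kafka Connect sink accepts SMTs in the same way.
+name=jdbc-sink
+connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
+topics=customers
+# Flatten each change event before the sink connector processes it.
+transforms=unwrap
+transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
+----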

-ifdef::community[]
-== Adding metadata
-endif::community[]
-
-ifdef::product[]
// Type: concept
-[id="example-of-adding-metadata-to-the-kafka-record"]
-== Example of adding metadata to the Kafka record
-endif::product[]
+// ModuleID: example-of-adding-debezium-metadata-to-the-kafka-record
+// Title: Example of adding {prodname} metadata to the Kafka record
+== Adding metadata

-The `ExtractNewRecordState` SMT can add original, change event metadata to the simplified Kafka record. For example, you might want the simplified record's header or value to contain any of the following:
+The event flattening SMT can add original change event metadata to the simplified Kafka record. For example, you might want the simplified record's header or value to contain any of the following:

* The type of operation that made the change
* The name of the database or table that was changed

To add metadata to a simplified Kafka record that is for a `DELETE` operation, you must also configure `delete.handling.mode=rewrite`.
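+
+For example, the following `.properties` sketch adds the operation type and table name to the header of each simplified record, and rewrites records for `DELETE` operations so that they stay in the stream and can carry that metadata. The choice of metadata fields here is illustrative:
+
+[source]
+----
+transforms=unwrap
+transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
+# Add the op and table change event metadata fields to each record header.
+transforms.unwrap.add.headers=op,table
+# Keep records for DELETE operations so that metadata can be attached to them.
+transforms.unwrap.delete.handling.mode=rewrite
+----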

ifdef::community[]
[id="configuration-options"]
-== Configuration options
-
endif::community[]

-ifdef::product[]
// Type: reference
-[id="options-for-configuring-extractnewrecordstate-transformation"]
-== Options for configuring `ExtractNewRecordState` transformation
-endif::product[]
+// ModuleID: options-for-configuring-debezium-event-flattening-transformation
+// Title: Options for configuring {prodname} event flattening transformation
+== Configuration options

-The following table describes the options that you can specify for the `ExtractNewRecordState` SMT.
+The following table describes the options that you can specify to configure the event flattening SMT.

-[cols="30%a,25%a,45%a"]
+.Descriptions of event flattening SMT configuration options
+[cols="30%a,25%a,45%a",subs="+attributes"]
|===
-|Property
+|Option
|Default
|Description

[id="extract-new-record-state-drop-tombstones"]
|{link-prefix}:{link-event-flattening}#extract-new-record-state-drop-tombstones[`drop.tombstones`]
|`true`
-|{prodname} generates a tombstone record for each `DELETE` operation. The default behavior is that `ExtractNewRecordState` removes tombstone records from the stream. To keep tombstone records in the stream, specify `drop.tombstones=false`.
+|{prodname} generates a tombstone record for each `DELETE` operation. The default behavior is that the event flattening SMT removes tombstone records from the stream. To keep tombstone records in the stream, specify `drop.tombstones=false`.

[id="extract-new-record-state-delete-handling-mode"]
-|{link-prefix}:{link-event-flattening}#extract-new-record-state-delete-handling-mode[`delete.handling.mode`]
+|{link-prefix}:{link-event-flattening}#extract-new-record-state-delete-handling-mode[`delete.handling{zwsp}.mode`]
|`drop`
-|{prodname} generates a change event record for each `DELETE` operation. The default behavior is that `ExtractNewRecordState` removes these records from the stream. To keep Kafka records for `DELETE` operations in the stream, set `delete.handling.mode` to `none` or `rewrite`. +
+|{prodname} generates a change event record for each `DELETE` operation. The default behavior is that the event flattening SMT removes these records from the stream. To keep Kafka records for `DELETE` operations in the stream, set `delete.handling.mode` to `none` or `rewrite`. +
+
Specify `none` to keep the change event record in the stream. The record contains only `"value": "null"`. +
+
When you specify `rewrite`, the updated simplified records for `DELETE` operations might be all that you need to track deleted records. +
+
For example, configuration of `route.by.field=destination` routes records to the topic whose name is the value of `after.destination`. The default behavior is that a {prodname} connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. +
+
-If you are configuring the `ExtractNewRecordState` SMT on a sink connector, setting this option might be useful when the destination topic name dictates the name of the database table that will be updated with the simplified change event record. If the topic name is not correct for your use case, you can configure `route.by.field` to re-route the event.
+If you are configuring the event flattening SMT on a sink connector, setting this option might be useful when the destination topic name dictates the name of the database table that will be updated with the simplified change event record. If the topic name is not correct for your use case, you can configure `route.by.field` to re-route the event.

[id="extract-new-record-state-add-fields-prefix"]
|{link-prefix}:{link-event-flattening}#extract-new-record-state-add-fields-prefix[`add.fields.prefix`]
diff --git a/documentation/modules/ROOT/pages/configuration/topic-routing.adoc b/documentation/modules/ROOT/pages/configuration/topic-routing.adoc
index c4c5f7086..a18d7422b 100644
--- a/documentation/modules/ROOT/pages/configuration/topic-routing.adoc
+++ b/documentation/modules/ROOT/pages/configuration/topic-routing.adoc
@@ -1,7 +1,8 @@
// Category: debezium-using
// Type: assembly
+// ModuleID: routing-debezium-event-records-to-topics-that-you-specify
+// Title: Routing {prodname} event records to topics that you specify

-ifdef::community[]
[id="topic-routing"]
= Topic Routing
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js

toc::[]
-endif::community[]
-
-ifdef::product[]
-[id="routing-change-event-records-to-topics-that-you-specify"]
-= Routing change event records to topics that you specify
-endif::product[]

Each Kafka record that contains a data change event has a default destination topic. If you need to, you can re-route records to topics that you specify before the records reach the Kafka Connect converter.
-To do this, {prodname} provides the `ByLogicalTableRouter` single message transformation (SMT). Configure this transformation in the {prodname} connector's Kafka Connect configuration. Configuration options enable you to specify the following:
+To do this, {prodname} provides the topic routing single message transformation (SMT). Configure this transformation in the {prodname} connector's Kafka Connect configuration. Configuration options enable you to specify the following:

* An expression for identifying the records to re-route
* An expression that resolves to the destination topic

It is up to you to ensure that the transformation configuration provides the behavior that you want. {prodname} does not validate the behavior that results from your configuration of the transformation.

-The `ByLogicalTableRouter` transformation is a
+The topic routing transformation is a
ifdef::product[] @@ -36,18 +31,13 @@ The following topics provide details: * xref:use-case-for-routing-records-to-topics-that-you-specify[] * xref:example-of-routing-records-for-multiple-tables-to-one-topic[] * xref:ensuring-unique-keys-across-records-routed-to-the-same-topic[] -* xref:options-for-configuring-bylogicaltablerouter-transformation[] +* xref:options-for-configuring-topic-routing-transformation[] endif::product[] -ifdef::community[] -== Use case -endif::community[] - -ifdef::product[] // Type: concept -[id="use-case-for-routing-records-to-topics-that-you-specify"] -== Use case for routing records to topics that you specify -endif::product[] +// ModuleID: use-case-for-routing-records-to-topics-that-you-specify +// Title: Use case for routing records to topics that you specify +== Use case The default behavior is that a {prodname} connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. In other words, a topic receives records for one physical table. When you want a topic to receive records for more than one physical table, you must configure the {prodname} connector to re-route the records to that topic. @@ -58,19 +48,14 @@ You can re-route change event records for tables in any of the shards to the sam .Partitioned PostgreSQL tables -When the {prodname} PostgreSQL connector captures changes in a partitioned table, the default behavior is that change event records are routed to a different topic for each partition. To emit records from all partitions to one topic, configure the `ByLogicalTableRouter` SMT. Because each key in a partitioned table is guaranteed to be unique, configure {link-prefix}:{link-topic-routing}#by-logical-table-router-key-enforce-uniqueness[`key.enforce.uniqueness=false`] so that the SMT does not add a key field to ensure unique keys. The addition of a key field is default behavior. +When the {prodname} PostgreSQL connector captures changes in a partitioned table, the default behavior is that change event records are routed to a different topic for each partition. To emit records from all partitions to one topic, configure the topic routing SMT. Because each key in a partitioned table is guaranteed to be unique, configure {link-prefix}:{link-topic-routing}#by-logical-table-router-key-enforce-uniqueness[`key.enforce.uniqueness=false`] so that the SMT does not add a key field to ensure unique keys. The addition of a key field is default behavior. -ifdef::community[] -== Example -endif::community[] - -ifdef::product[] // Type: concept -[id="example-of-routing-records-for-multiple-tables-to-one-topic"] -== Example of routing records for multiple tables to one topic -endif::product[] +// ModuleID: example-of-routing-records-for-multiple-tables-to-one-topic +// Title: Example of routing records for multiple tables to one topic +== Example -To route change event records for multiple physical tables to the same topic, configure the `ByLogicalTableRouter` transformation in the Kafka Connect configuration for the {prodname} connector. Configuration of the `ByLogicalTableRouter` SMT requires you to specify regular expressions that determine: +To route change event records for multiple physical tables to the same topic, configure the topic routing transformation in the Kafka Connect configuration for the {prodname} connector. Configuration of the topic routing SMT requires you to specify regular expressions that determine: * The tables for which to route records. 

-ifdef::community[]
-== Example
-endif::community[]
-
-ifdef::product[]
// Type: concept
-[id="example-of-routing-records-for-multiple-tables-to-one-topic"]
-== Example of routing records for multiple tables to one topic
-endif::product[]
+// ModuleID: example-of-routing-records-for-multiple-tables-to-one-topic
+// Title: Example of routing records for multiple tables to one topic
+== Example

-To route change event records for multiple physical tables to the same topic, configure the `ByLogicalTableRouter` transformation in the Kafka Connect configuration for the {prodname} connector. Configuration of the `ByLogicalTableRouter` SMT requires you to specify regular expressions that determine:
+To route change event records for multiple physical tables to the same topic, configure the topic routing transformation in the Kafka Connect configuration for the {prodname} connector. Configuration of the topic routing SMT requires you to specify regular expressions that determine:

* The tables for which to route records. These tables must all have the same schema.
* The destination topic name.

In the example, the regular expression, `(.*)customers_shard(.*)` matches records for topics whose names include the `customers_shard` string followed by a shard identifier.

`topic.replacement`:: Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. In this example, records for the three sharded tables listed above would be routed to the `myserver.mydb.customers_all_shards` topic.

-ifdef::community[]
-== Ensure unique key
-endif::community[]
-
-ifdef::product[]
// Type: procedure
-[id="ensuring-unique-keys-across-records-routed-to-the-same-topic"]
-== Ensuring unique keys across records routed to the same topic
-endif::product[]
+// ModuleID: ensuring-unique-keys-across-records-routed-to-the-same-topic
+// Title: Ensuring unique keys across records routed to the same topic
+== Ensure unique key

A {prodname} change event key uses the table columns that make up the table's primary key. To route records for multiple physical tables to one topic, the event key must be unique across all of those tables. However, it is possible for each physical table to have a primary key that is unique within only that table. For example, a row in the `myserver.mydb.customers_shard1` table might have the same key value as a row in the `myserver.mydb.customers_shard2` table.

-To ensure that each event key is unique across the tables whose change event records go to the same topic, the `ByLogicalTableRouter` transformation inserts a field into change event keys. By default, the name of the inserted field is `+__dbz__physicalTableIdentifier+`. The value of the inserted field is the default destination topic name.
+To ensure that each event key is unique across the tables whose change event records go to the same topic, the topic routing transformation inserts a field into change event keys. By default, the name of the inserted field is `+__dbz__physicalTableIdentifier+`. The value of the inserted field is the default destination topic name.

-If you want to, you can configure the `ByLogicalTableRouter` transformation to insert a different field into the key. To do this, specify the `key.field.name` option and set it to a field name that does not clash with existing primary key field names. For example:
+You can configure the topic routing transformation to insert a different field into the key. To do this, specify the `key.field.name` option and set it to a field name that does not clash with existing primary key field names. For example:

[source]
----

With this configuration, suppose that the default destination topic names are:


The transformation uses the values in the second captured group, the shard numbers, as the value of the key's new field. In this example, the inserted key field's values would be `1`, `2`, or `3`.
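+
+The following `.properties` sketch pulls the preceding settings together for the sharded tables example. The regular expressions repeat the values shown earlier in this section; the key field name `shard_id` is an illustrative choice, not a required value:
+
+[source]
+----
+transforms=Reroute
+transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
+# Route records for all customers_shard* tables to one topic.
+transforms.Reroute.topic.regex=(.*)customers_shard(.*)
+transforms.Reroute.topic.replacement=$1customers_all_shards
+# Insert the shard number into each event key as a field named shard_id.
+transforms.Reroute.key.field.name=shard_id
+transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
+transforms.Reroute.key.field.replacement=$2
+----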
-If your tables contain globally unique keys and you do not need to change the key structure, you can set the `key.enforce.uniqueness` property to `false`: +If your tables contain globally unique keys and you do not need to change the key structure, you can set the `key.enforce.uniqueness` option to `false`: [source] ---- @@ -155,18 +135,19 @@ transforms.Reroute.key.enforce.uniqueness=false ifdef::community[] [[configuration-options]] -== Configuration options endif::community[] -ifdef::product[] // Type: reference -[id="options-for-configuring-bylogicaltablerouter-transformation"] -== Options for configuring `ByLogicalTableRouter` transformation -endif::product[] +// ModuleID: options-for-configuring-topic-routing-transformation +// Title: Options for configuring topic routing transformation +== Configuration options -[cols="30%a,25%a,45%a"] +The following table describes topic routing SMT configuration options. + +.Topic routing SMT configuration options +[cols="30%a,25%a,45%a",subs="+attributes"] |=== -|Property +|Option |Default |Description @@ -181,7 +162,7 @@ endif::product[] |Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. This expression can refer to groups captured by the regular expression that you specify for `topic.regex`. To refer to a group, specify `$1`, `$2`, and so on. [id="by-logical-table-router-key-enforce-uniqueness"] -|{link-prefix}:{link-topic-routing}#by-logical-table-router-key-enforce-uniqueness[`key.enforce.uniqueness`] +|{link-prefix}:{link-topic-routing}#by-logical-table-router-key-enforce-uniqueness[`key.enforce{zwsp}.uniqueness`] |`true` |Indicates whether to add a field to the record's change event key. Adding a key field ensures that each event key is unique across the tables whose change event records go to the same topic. This helps to prevent collisions of change events for records that have the same key but that originate from different source tables. + + @@ -198,7 +179,7 @@ Specify `false` if you do not want the transformation to add a key field. For e |Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters. For the SMT to apply this expression, `key.enforce.uniqueness` must be `true`, which is the default. [id="by-logical-table-router-key-field-replacement"] -|{link-prefix}:{link-topic-routing}#by-logical-table-router-key-field-replacement[`key.field.replacement`] +|{link-prefix}:{link-topic-routing}#by-logical-table-router-key-field-replacement[`key.field{zwsp}.replacement`] | |Specifies a regular expression for determining the value of the inserted key field in terms of the groups captured by the expression specified for `key.field.regex`. For the SMT to apply this expression, `key.enforce.uniqueness` must be `true`, which is the default.