DBZ-2021 Completed revision of doc for topic routing and event flattening

TovaCohen 2020-05-12 16:49:56 -04:00 committed by Chris Cranford
parent d28a934635
commit c9c9be51e8
2 changed files with 46 additions and 31 deletions


@@ -1,7 +1,5 @@
// Category: cdc-using
// Type: assembly
// :community:
:product:
ifdef::community[]
[id="new-record-state-extraction"]
@@ -119,24 +117,25 @@ link:https://github.com/debezium/debezium/blob/master/debezium-core/src/main/jav
You can configure the `ExtractNewRecordState` SMT for a Debezium connector (that is, a source connector) or for a sink connector. The advantage of configuring `ExtractNewRecordState` for a sink connector is that records stored in Apache Kafka contain whole Debezium change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.
You can configure the transformation to do either or both of the following:
You can configure the transformation to do any of the following:
* Add metadata from the change event to the simplified Kafka record or header.
* Add metadata from the change event to the simplified Kafka record or record header. The default behavior is that the SMT does not add metadata to the simplified Kafka record.
* Pass Kafka records that contain change events for `DELETE` operations to consumers.
* Keep Kafka records that contain change events for `DELETE` operations in the stream. The default behavior is that the SMT drops Kafka records for `DELETE` operation change events because most consumers cannot yet handle them.
The default behavior is that the transformation drops Kafka records for `DELETE` operation change events because most consumers cannot yet handle them. A database `DELETE` operation causes Debezium to generate two Kafka records:
A database `DELETE` operation causes Debezium to generate two Kafka records:
* A record that contains `"op": "d",` and the `before` row data.
* A record that contains `"op": "d",` the `before` row data, and some other fields.
* A tombstone record that has the same key as the deleted row and a value of `null`. This record is a marker for Apache Kafka. It indicates that
link:https://kafka.apache.org/documentation/#compaction[log compaction] can remove all records that have this key.
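For example, deleting a row whose primary key is `2` might produce a pair of records like the following sketch (the `pk` and `cola` fields follow the example that appears later in this document; metadata fields are omitted):
----
# Change event record that contains the "before" row data
"key": { "pk": 2 }
"value": {
    "before": { "pk": 2, "cola": "abc" },
    "after": null,
    "op": "d"
}

# Tombstone record: same key as the deleted row, null value
"key": { "pk": 2 }
"value": null
----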
Instead of dropping the record that contains the `before` row data, you can configure the `ExtractNewRecordState` SMT to do one of the following:
* Pass the record as is.
* Convert the record into another tombstone record and pass it on.
* Keep the record in the stream and edit it to have only the `"value": "null"` field.
* Keep the record in the stream and edit it to have a `value` field that contains the key/value pairs that were in the `before` field with an added `"__deleted": "true"` entry.
Similarly, instead of dropping the tombstone record, you can configure the `ExtractNewRecordState` SMT to pass the tombstone record along.
Similarly, instead of dropping the tombstone record, you can configure the `ExtractNewRecordState` SMT to keep the tombstone record in the stream.
ifdef::community[]
== Configuration
@@ -158,7 +157,7 @@ transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
As in any Kafka Connect connector `.properties` file, you can set `transforms=` to multiple comma-separated SMT aliases in the order in which you want Kafka Connect to apply the SMTs.
To pass tombstone records for `DELETE` operations and also add metadata to the simplified Kafka record, specify something like this:
The following example sets several `ExtractNewRecordState` options:
[source]
----
@@ -171,7 +170,15 @@ transforms.unwrap.add.fields=table,lsn
`drop.tombstones=false`:: Keeps tombstone records for `DELETE` operations in the event stream.
`delete.handling.mode=rewrite`:: Adds metadata for `DELETE` operations to the simplified Kafka record.
`delete.handling.mode=rewrite`:: For `DELETE` operations, edits the Kafka record by flattening the `value` field that was in the change event. The `value` field directly contains the key/value pairs that were in the `before` field. The SMT adds `__deleted` and sets it to `true`, for example:
+
----
"value": {
"pk": 2,
"cola": null,
"__deleted": "true"
}
----
`add.fields=table,lsn`:: Adds change event metadata for the `table` and `lsn` fields to the simplified Kafka record.
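+
For example, the simplified record's `value` might then look like the following sketch (the row data and the `lsn` number are hypothetical):
+
----
"value": {
    "pk": 2,
    "cola": "abc",
    "__table": "my_table",
    "__lsn": 358141
}
----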
@@ -216,7 +223,7 @@ Also, simplified Kafka records would have a `__db` header.
In the simplified Kafka record, the SMT prefixes the metadata field names with a double underscore. When you specify a struct, the SMT also inserts an underscore between the struct name and the field name.
To add metadata from a `DELETE` change event, you must also configure the `delete.handling.mode` option with a setting of `rewrite`.
To add metadata to a simplified Kafka record that is for a `DELETE` operation, you must also configure `delete.handling.mode=rewrite`.
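The following sketch combines several metadata options in a connector `.properties` file (the `unwrap` alias follows the earlier example; `source.ts_ms` shows a struct specification, and `route.by.field` is described in the options table below):

[source]
----
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.add.fields=table,source.ts_ms
transforms.unwrap.add.headers=db
transforms.unwrap.route.by.field=destination
----

With these settings, a simplified record would contain `__table` and `__source_ts_ms` fields, carry a `__db` header, and be routed to the topic named by its `after.destination` value.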
ifdef::community[]
// Do not include deprecated content in downstream doc
@@ -292,25 +299,35 @@ The following table describes the options that you can specify for the `ExtractN
|`drop`
|Debezium generates a change event record for each `DELETE` operation. The default behavior is that `ExtractNewRecordState` removes these records from the stream. To keep Kafka records for `DELETE` operations in the stream, set `delete.handling.mode` to `none` or `rewrite`. +
+
Specify `none` to keep this change event record in the stream as it is. +
Specify `none` to keep the change event record in the stream. The record contains only `"value": "null"`. +
+
Specify `rewrite` to keep this change event record in the stream and also add a `__deleted` field set to `true` to the change event record.
Specify `rewrite` to keep the change event record in the stream and edit the record to have a `value` field that contains the key/value pairs that were in the `before` field and also add `+__deleted: true+` to the `value`. This is another way to indicate that the record has been deleted. +
+
When you specify `rewrite`, the updated simplified records for `DELETE` operations might be all you need to track deleted records. You can consider accepting the default behavior of dropping the tombstone records that the Debezium connector creates.
|[[configuration-option-route-by-field]]<<configuration-option-route-by-field, `route.by.field`>>
|
|The column that determines how the events will be routed. The value is a topic name obtained from the `after` state. For `DELETE` operations, the value is obtained from the `before` record state.
|To use row data to determine the topic to route the record to, set this option to an `after` field attribute. The SMT routes the record to the topic whose name matches the value of the specified `after` field attribute. For a `DELETE` operation, set this option to a `before` field attribute. +
+
For example, configuration of `route.by.field=destination` routes records to the topic whose name is the value of `after.destination`. The default behavior is that a Debezium connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. +
+
If you are configuring the `ExtractNewRecordState` SMT on a sink connector, setting this option might be useful when the destination topic name dictates the name of the database table that will be updated with the simplified change event record. If the topic name is not correct for your use case, you can configure `route.by.field` to re-route the event.
|[[configuration-option-add-fields]]<<configuration-option-add-fields, `add.fields`>>
|
|Set this option to a comma-separated list, with no spaces, of metadata fields to add to the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example `source.ts_ms`. +
+
When the SMT adds metadata fields to the simplified record, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name.
When the SMT adds metadata fields to the simplified record, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name. +
+
If you specify a field that is not in the change event record, the SMT adds the field.
|[[configuration-option-add-headers]]<<configuration-option-add-headers, `add.headers`>>
|
|Set this option to a comma-separated list, with no spaces, of metadata fields to add to the header of the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example `source.ts_ms`. +
+
When the SMT adds metadata fields to the simplified record's header, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name.
When the SMT adds metadata fields to the simplified record's header, it prefixes each metadata field name with a double underscore. For a struct specification, the SMT also inserts an underscore between the struct name and the field name. +
+
If you specify a field that is not in the change event record, the SMT does not add the field to the header.
ifdef::community[]
// Do not include deprecated content in downstream doc


@@ -1,7 +1,5 @@
// Category: cdc-using
// Type: assembly
// :community:
:product:
ifdef::community[]
[id="topic-routing"]
@@ -21,14 +19,14 @@ ifdef::product[]
= Routing change event records to topics that you specify
endif::product[]
Before Kafka records that contain database change events reach the Kafka Connect converter, you can re-route them to topics that you choose.
Each Kafka record that contains a database change event has a default destination topic. If you need to, you can re-route records to topics that you specify before the records reach the Kafka Connect converter.
To do this, Debezium provides the `ByLogicalTableRouter` single message transformation (SMT). Configure this transformation in the Debezium connector's Kafka Connect `.properties` file. Configuration options enable you to specify the following:
* An expression for identifying the records to route
* The destination topic
* How to ensure a unique key among the records being routed to the topic
* An expression for identifying the records to re-route
* An expression that resolves to the destination topic
* How to ensure a unique key among the records being re-routed to the destination topic
It is up to you to ensure that the transformation configuration provides the behavior that you want. Debezium does not validate the behavior that results from the way that you configure the transformation.
It is up to you to ensure that the transformation configuration provides the behavior that you want. Debezium does not validate the behavior that results from your configuration of the transformation.
The `ByLogicalTableRouter` transformation is a
link:https://kafka.apache.org/documentation/#connect_transforms[Kafka Connect SMT].
@@ -42,7 +40,7 @@ The following topics provide details:
endif::product[]
ifdef::community[]
= Use case
== Use case
endif::community[]
ifdef::product[]
@@ -57,7 +55,7 @@ A logical table is a common use case for routing records for multiple physical t
You can re-route change event records for tables in any of the shards to the same topic.
ifdef::community[]
= Example
== Example
endif::community[]
ifdef::product[]
@@ -66,7 +64,7 @@ ifdef::product[]
== Example of routing records for multiple tables to one topic
endif::product[]
To route change event records for multiple physical tables to the same topic, configure the `ByLogicalTableRouter` transformation in the Kafka Connect `.properties` file for the Debezium connector. Configuration of the `ByLogicalTableRouter` SMT requires you to specify regular expressions that identify:
To route change event records for multiple physical tables to the same topic, configure the `ByLogicalTableRouter` transformation in the Kafka Connect `.properties` file for the Debezium connector. Configuration of the `ByLogicalTableRouter` SMT requires you to specify regular expressions that determine:
* The tables for which to route records. These tables must all have the same schema.
* The destination topic name.
@@ -92,7 +90,7 @@ In the example, the regular expression `(.*)customers_shard(.*)` matches record
`topic.replacement`:: Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. In this example, records for the three sharded tables listed above would be routed to the `myserver.mydb.customers_all_shards` topic.
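Putting these two options together, a configuration for this example might look like the following sketch (the `Reroute` alias matches the key-field example later in this document; the `$1` in the replacement is an assumption that preserves the `myserver.mydb.` prefix of the matched topic names):

[source]
----
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
----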
ifdef::community[]
= Ensure unique key
== Ensure unique key
endif::community[]
ifdef::product[]
@@ -118,11 +116,11 @@ transforms.Reroute.key.field.name=shard_id
This example adds the `shard_id` field to the key structure in routed records.
If you want to adjust the value of key's new field, configure both of these options:
If you want to adjust the value of the key's new field, configure both of these options:
`key.field.regex`:: Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters.
`key.field.replacement`:: Specifies a regular expression for the value of the inserted key field in terms of those captured groups.
`key.field.replacement`:: Specifies a regular expression for determining the value of the inserted key field in terms of those captured groups.
For example:
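In this sketch, the capture groups and the `$2` replacement are illustrative assumptions based on the sharded topic names used earlier:

[source]
----
transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
transforms.Reroute.key.field.replacement=$2
----

With this configuration, a record whose default destination topic is `myserver.mydb.customers_shard2` would get a `shard_id` key field with the value `2`.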