DBZ-2021 Updates based on PR review comments

This commit is contained in:
TovaCohen 2020-05-14 15:20:22 -04:00 committed by Chris Cranford
parent 2f62f24e0d
commit 093f8103cb
3 changed files with 92 additions and 32 deletions

View File

@ -15,9 +15,11 @@ asciidoc:
modules: '../../modules'
mysql-version: '8.0'
strimzi-version: '0.13.0'
community:
community: true
link-prefix: 'xref'
link-avro-serialization: 'configuration/avro.adoc'
link-event-flattening: 'configuration/event-flattening.adoc'
link-topic-routing: 'configuration/topic-routing.adoc'
link-mysql-connector: 'connectors/mysql.adoc'
link-mongodb-connector: 'connectors/mongodb.adoc'
link-postgresql-connector: 'connectors/postgresql.adoc'

View File

@ -4,7 +4,6 @@
ifdef::community[]
[id="new-record-state-extraction"]
= New Record State Extraction
include::../_attributes.adoc[]
:toc:
:toc-placement: macro
:linkattrs:
@ -21,12 +20,12 @@ endif::community[]
ifdef::product[]
[id="extracting-source-record-after-state-from-debezium-change-events"]
= Extracting source record `after` state from Debezium change events
= Extracting source record `after` state from {prodname} change events
endif::product[]
A Debezium database change event has a complex structure that provides a wealth of information. Kafka records that convey Debezium change events contain all of this information.
A {prodname} database change event has a complex structure that provides a wealth of information. Kafka records that convey {prodname} change events contain all of this information.
However, parts of a Kafka ecosystem expect Kafka records that provide a flat structure of field names and values.
To provide this kind of record, Debezium provides the `ExtractNewRecordState` single message transformation (SMT). Configure this transformation when consumers need Kafka records that have a format that is simpler than Kafka records that contain Debezium change events.
To provide this kind of record, {prodname} provides the `ExtractNewRecordState` single message transformation (SMT). Configure this transformation when consumers need Kafka records that have a format that is simpler than Kafka records that contain {prodname} change events.
The `ExtractNewRecordState` transformation is a
link:https://kafka.apache.org/documentation/#connect_transforms[Kafka Connect SMT].
@ -44,14 +43,14 @@ The following topics provide details:
// Type: concept
[id="description-of-debezium-change-event-structure"]
== Description of Debezium change event structure
== Description of {prodname} change event structure
endif::product[]
ifdef::community[]
== Change event structure
endif::community[]
Debezium generates database change events that have a complex structure.
{prodname} generates database change events that have a complex structure.
Each event consists of three parts:
* Metadata, which includes but is not limited to:
@ -110,12 +109,12 @@ endif::community[]
ifdef::product[]
// Type: concept
[id="behavior-of-debezium-extractnewrecordstate-transformation"]
== Behavior of Debezium `ExtractNewRecordState` transformation
== Behavior of {prodname} `ExtractNewRecordState` transformation
endif::product[]
link:https://github.com/debezium/debezium/blob/master/debezium-core/src/main/java/io/debezium/transforms/ExtractNewRecordState.java[The `ExtractNewRecordState` SMT] extracts the `after` field from a Debezium change event in a Kafka record. The SMT replaces the original change event with only its `after` field to create a simple Kafka record.
link:https://github.com/debezium/debezium/blob/master/debezium-core/src/main/java/io/debezium/transforms/ExtractNewRecordState.java[The `ExtractNewRecordState` SMT] extracts the `after` field from a {prodname} change event in a Kafka record. The SMT replaces the original change event with only its `after` field to create a simple Kafka record.
You can configure the `ExtractNewRecordState` SMT for a Debezium connector, that is, for a source connector, or for a sink connector. The advantage of configuring `ExtractNewRecordState` for a sink connector is that records stored in Apache Kafka contain whole Debezium change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.
You can configure the `ExtractNewRecordState` SMT for a {prodname} connector or for a sink connector that consumes messages emitted by a {prodname} connector. The advantage of configuring `ExtractNewRecordState` for a sink connector is that records stored in Apache Kafka contain whole {prodname} change events. The decision to apply the SMT to a source or sink connector depends on your particular use case.
You can configure the transformation to do any of the following:
@ -123,7 +122,7 @@ You can configure the transformation to do any of the following:
* Keep Kafka records that contain change events for `DELETE` operations in the stream. The default behavior is that the SMT drops Kafka records for `DELETE` operation change events because most consumers cannot yet handle them.
A database `DELETE` operation causes Debezium to generate two Kafka records:
A database `DELETE` operation causes {prodname} to generate two Kafka records:
* A record that contains `"op": "d",` the `before` row data, and some other fields.
* A tombstone record that has the same key as the deleted row and a value of `null`. This record is a marker for Apache Kafka. It indicates that
@ -147,7 +146,7 @@ ifdef::product[]
== Configuration of `ExtractNewRecordState` transformation
endif::product[]
Configure the Debezium `ExtractNewRecordState` SMT in a Kafka Connect source or sink connector `.properties` file. To obtain the default behavior, specify something like the following:
Configure the {prodname} `ExtractNewRecordState` SMT in a Kafka Connect source or sink connector `.properties` file. To obtain the default behavior, specify something like the following:
[source]
----
@ -223,7 +222,7 @@ Also, simplified Kafka records would have a `__db` header.
In the simplified Kafka record, the SMT prefixes the metadata field names with a double underscore. When you specify a struct, the SMT also inserts an underscore between the struct name and the field name.
To add metadat to a simplified Kafka record that is for a `DELETE` operation, you must also configure `delete.handling.mode=rewrite`.
To add metadata to a simplified Kafka record that is for a `DELETE` operation, you must also configure `delete.handling.mode=rewrite`.
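For example, a minimal configuration sketch along the following lines (the transform alias `unwrap` and the chosen fields are illustrative, not part of this commit) adds the `source.ts_ms` field and a `db` header to each simplified record, and rewrites `DELETE` records so that the added metadata also appears on them:
[source]
----
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
# Added value field is prefixed with a double underscore: __source_ts_ms
transforms.unwrap.add.fields=source.ts_ms
# Added header is prefixed with a double underscore: __db
transforms.unwrap.add.headers=db
# Required so that metadata is also added to simplified DELETE records
transforms.unwrap.delete.handling.mode=rewrite
----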
ifdef::community[]
// Do not include deprecated content in downstream doc
@ -234,7 +233,7 @@ _The `operation.header` option is deprecated and scheduled for removal. Please u
When a Kafka record is flattened, the final result does not show whether it was an insert, update, or first read
(deletions can be detected via tombstones or rewrites, see link:#options-for-configuring-extractnewrecordstate-transformation[Configuration options]).
To solve this problem Debezium offers an option to propagate the original operation via a header added to the Kafka record.
To solve this problem {prodname} offers an option to propagate the original operation via a header added to the Kafka record.
To enable this feature, the option `operation.header` must be set to `true`.
[source]
@ -285,35 +284,39 @@ endif::product[]
The following table describes the options that you can specify for the `ExtractNewRecordState` SMT.
[cols="35%a,10%a,55%a",options="header"]
[cols="35%a,10%a,55%a"]
|===
|Property
|Default
|Description
|[[configuration-option-drop-tombstones]]<<configuration-option-drop-tombstones, `drop.tombstones`>>
[id="configuration-option-drop-tombstones"]
|{link-prefix}:{link-event-flattening}#configuration-option-drop-tombstones[`drop.tombstones`]
|`true`
|Debezium generates a tombstone record for each `DELETE` operation. The default behavior is that `ExtractNewRecordState` removes tombstone records from the stream. To keep tombstone records in the stream, specify `drop.tombstones=false`.
|{prodname} generates a tombstone record for each `DELETE` operation. The default behavior is that `ExtractNewRecordState` removes tombstone records from the stream. To keep tombstone records in the stream, specify `drop.tombstones=false`.
|[[configuration-option-delete-handling-mode]]<<configuration-option-delete-handling-mode, `delete.handling.mode`>>
[id="configuration-option-delete-handling-mode"]
|{link-prefix}:{link-event-flattening}#configuration-option-delete-handling-mode[`delete.handling.mode`]
|`drop`
|Debezium generates a change event record for each `DELETE` operation. The default behavior is that `ExtractNewRecordState` removes these records from the stream. To keep Kafka records for `DELETE` operations in the stream, set `delete.handling.mode` to `none` or `rewrite`. +
|{prodname} generates a change event record for each `DELETE` operation. The default behavior is that `ExtractNewRecordState` removes these records from the stream. To keep Kafka records for `DELETE` operations in the stream, set `delete.handling.mode` to `none` or `rewrite`. +
+
Specify `none` to keep the change event record in the stream. The record contains only `"value": "null"`. +
+
Specify `rewrite` to keep the change event record in the stream and edit the record to have a `value` field that contains the key/value pairs that were in the `before` field and also add `+__deleted: true+` to the `value`. This is another way to indicate that the record has been deleted. +
+
When you specify `rewrite`, the updated simplified records for `DELETE` operations might be all you need to track deleted records. You can consider accepting the default behavior of dropping the tombstone records that the Debezium connector creates.
When you specify `rewrite`, the updated simplified records for `DELETE` operations might be all you need to track deleted records. You can consider accepting the default behavior of dropping the tombstone records that the {prodname} connector creates.
|[[configuration-option-route-by-field]]<<configuration-option-route-by-field, `route.by.field`>>
[id="configuration-option-route-by-field"]
|{link-prefix}:{link-event-flattening}#configuration-option-route-by-field[`route.by.field`]
|
|To use row data to determine the topic to route the record to, set this option to an `after` field attribute. The SMT routes the record to the topic whose name matches the value of the specified `after` field attribute. For a `DELETE` operation, set this option to a `before` field attribute. +
+
For example, configuration of `route.by.field=destination` routes records to the topic whose name is the value of `after.destination`. The default behavior is that a Debezium connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. +
For example, configuration of `route.by.field=destination` routes records to the topic whose name is the value of `after.destination`. The default behavior is that a {prodname} connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. +
+
If you are configuring the `ExtractNewRecordState` SMT on a sink connector, setting this option might be useful when the destination topic name dictates the name of the database table that will be updated with the simplified change event record. If the topic name is not correct for your use case, you can configure `route.by.field` to re-route the event.
|[[configuration-option-add-fields]]<<configuration-option-add-fields, `add.fields`>>
[id="configuration-option-add-fields"]
|{link-prefix}:{link-event-flattening}#configuration-option-add-fields[`add.fields`]
|
|Set this option to a comma-separated list, with no spaces, of metadata fields to add to the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example `source.ts_ms`. +
+
@ -321,7 +324,8 @@ When the SMT adds metadata fields to the simplified record, it prefixes each met
+
If you specify a field that is not in the change event record, the SMT adds the field.
|[[configuration-option-add-headers]]<<configuration-option-add-headers, `add.headers`>>
[id="configuration-option-add-headers"]
|{link-prefix}:{link-event-flattening}#configuration-option-add-headers[`add.headers`]
|
|Set this option to a comma-separated list, with no spaces, of metadata fields to add to the header of the simplified Kafka record. When there are duplicate field names, to add metadata for one of those fields, specify the struct as well as the field, for example `source.ts_ms`. +
+
@ -331,14 +335,20 @@ If you specify a field that is not in the change event record, the SMT does not
ifdef::community[]
// Do not include deprecated content in downstream doc
|`operation.header` DEPRECATED
[id="configuration-option-operation.header"]
|{link-prefix}:{link-event-flattening}#configuration-option-operation-header[`operation.header`] +
DEPRECATED
|`false`
|_This option is deprecated and scheduled for removal. Please use add.headers instead. If both add.headers and operation.header are specified, the latter will be ignored._
The SMT adds the event operation (as obtained from the `op` field of the original record) as a Kafka record header.
// Do not include deprecated content in downstream doc
|`add.source.fields` DEPRECATED
[id="configuration-option-add-source-fields"]
|{link-prefix}:{link-event-flattening}#configuration-option-add-source-fields[`add.source.fields`] +
DEPRECATED
|
|_This option is deprecated and scheduled for removal. Please use add.fields instead. If both add.fields and add.source.fields are specified, the latter will be ignored._

View File

@ -4,7 +4,6 @@
ifdef::community[]
[id="topic-routing"]
= Topic Routing
include::../_attributes.adoc[]
:toc:
:toc-placement: macro
:linkattrs:
@ -20,13 +19,13 @@ ifdef::product[]
endif::product[]
Each Kafka record that contains a database change event has a default destination topic. If you need to, you can re-route records to topics that you specify before the records reach the Kafka Connect converter.
To do this, Debezium provides the `ByLogicalTableRouter` single message transformation (SMT). Configure this transformation in the Debezium connector's Kafka Connect `.properties` file. Configuration options enable you to specify the following:
To do this, {prodname} provides the `ByLogicalTableRouter` single message transformation (SMT). Configure this transformation in the {prodname} connector's Kafka Connect `.properties` file. Configuration options enable you to specify the following:
* An expression for identifying the records to re-route
* An expression that resolves to the destination topic
* How to ensure a unique key among the records being re-routed to the destination topic
It is up to you to ensure that the transformation configuration provides the behavior that you want. Debezium does not validate the behavior that results from your configuration of the transformation.
It is up to you to ensure that the transformation configuration provides the behavior that you want. {prodname} does not validate the behavior that results from your configuration of the transformation.
The `ByLogicalTableRouter` transformation is a
link:https://kafka.apache.org/documentation/#connect_transforms[Kafka Connect SMT].
@ -49,7 +48,7 @@ ifdef::product[]
== Use case for routing records to topics that you specify
endif::product[]
The default behavior is that a Debezium connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. In other words, a topic receives records for one physical table. When you want a topic to receive records for more than one physical table, you must configure the Debezium connector to re-route the records to that topic.
The default behavior is that a {prodname} connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made. In other words, a topic receives records for one physical table. When you want a topic to receive records for more than one physical table, you must configure the {prodname} connector to re-route the records to that topic.
A logical table is a common use case for routing records for multiple physical tables to one topic. In a logical table, there are multiple physical tables that all have the same schema. For example, sharded tables have the same schema. A logical table might consist of two or more sharded tables: `db_shard1.my_table` and `db_shard2.my_table`. The tables are in different shards and are physically distinct but together they form a logical table.
You can re-route change event records for tables in any of the shards to the same topic.
@ -64,7 +63,7 @@ ifdef::product[]
== Example of routing records for multiple tables to one topic
endif::product[]
To route change event records for multiple physical tables to the same topic, configure the `ByLogicalTableRouter` transformation in the Kafka Connect `.properties` file for the Debezium connector. Configuration of the `ByLogicalTableRouter` SMT requires you to specify regular expressions that determine:
To route change event records for multiple physical tables to the same topic, configure the `ByLogicalTableRouter` transformation in the Kafka Connect `.properties` file for the {prodname} connector. Configuration of the `ByLogicalTableRouter` SMT requires you to specify regular expressions that determine:
* The tables for which to route records. These tables must all have the same schema.
* The destination topic name.
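A minimal configuration sketch might look like the following; the transform alias `Reroute` and the regular expressions are illustrative placeholders rather than values taken from this commit:
[source]
----
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
# Match the default destination topic names of all sharded customers tables
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
# Route all matching records to one topic; $1 refers to the first captured group
transforms.Reroute.topic.replacement=$1customers_all_shards
----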
@ -99,7 +98,7 @@ ifdef::product[]
== Ensuring unique keys across records routed to the same topic
endif::product[]
A Debezium change event key uses the table columns that make up the table's primary key. To route records for multiple physical tables to one topic, the event key must be unique across all of those tables. However, it is possible for each physical table to have a primary key that is unique within only that table. For example, a row in the `myserver.mydb.customers_shard1` table might have the same key value as a row in the `myserver.mydb.customers_shard2` table.
A {prodname} change event key uses the table columns that make up the table's primary key. To route records for multiple physical tables to one topic, the event key must be unique across all of those tables. However, it is possible for each physical table to have a primary key that is unique within only that table. For example, a row in the `myserver.mydb.customers_shard1` table might have the same key value as a row in the `myserver.mydb.customers_shard2` table.
To ensure that each event key is unique across the tables whose change event records go to the same topic, the `ByLogicalTableRouter` transformation inserts a field into change event keys. By default, the name of the inserted field is `+__dbz__physicalTableIdentifier+`. The value of the inserted field is the default destination topic name.
@ -137,3 +136,52 @@ With this configuration, suppose that the default destination topic names are:
`myserver.mydb.customers_shard3`
The transformation uses the values in the second captured group, the shard numbers, as the value of the key's new field. In this example, the inserted key field's values would be `1`, `2`, or `3`.
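The configuration that the preceding example refers to is not visible in this diff; a sketch consistent with that description, using illustrative regular expressions, might look like this:
[source]
----
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
# Capture the shard number in the second group of the default topic name
transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
# Use only the captured shard number as the value of the inserted key field
transforms.Reroute.key.field.replacement=$2
----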
ifdef::community[]
[[configuration-options]]
== Configuration options
endif::community[]
ifdef::product[]
// Type: reference
[id="options-for-configuring-bylogicaltablerouter-transformation"]
== Options for configuring `ByLogicalTableRouter` transformation
endif::product[]
[cols="35%a,10%a,55%a"]
|===
|Property
|Default
|Description
[id="configuration-option-topic-regex"]
|{link-prefix}:{link-topic-routing}#configuration-option-topic-regex[`topic.regex`]
|
|Specifies a regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic.
[id="configuration-option-topic-replacement"]
|{link-prefix}:{link-topic-routing}#configuration-option-topic-replacement[`topic.replacement`]
|
|Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. This expression can refer to groups captured by the regular expression that you specify for `topic.regex`. To refer to a group, specify `$1`, `$2`, and so on.
[id="configuration-option-key-enforce-uniqueness"]
|{link-prefix}:{link-topic-routing}#configuration-option-key-enforce-uniqueness[`key.enforce.uniqueness`]
|`true`
|Indicates whether to add a field to the record's change event key. Adding a key field ensures that each event key is unique across the tables whose change event records go to the same topic. This helps to prevent collisions of change events for records that have the same key but that originate from different source tables. Specify `false` if you do not want the transformation to add a key field.
[id="configuration-option-key-field-name"]
|{link-prefix}:{link-topic-routing}#configuration-option-key-field-name[`key.field.name`]
|`+__dbz__physicalTableIdentifier+`
|Name of a field to be added to the change event key. The value of this field identifies the original table name. For the SMT to add this field, `key.enforce.uniqueness` must be `true`, which is the default.
[id="configuration-option-key-field-regex"]
|{link-prefix}:{link-topic-routing}#configuration-option-key-field-regex[`key.field.regex`]
|
|Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters. For the SMT to apply this expression, `key.enforce.uniqueness` must be `true`, which is the default.
[id="configuration-option-key-field-replacement"]
|{link-prefix}:{link-topic-routing}#configuration-option-key-field-replacement[`key.field.replacement`]
|
|Specifies a regular expression for determining the value of the inserted key field in terms of the groups captured by the expression specified for `key.field.regex`. For the SMT to apply this expression, `key.enforce.uniqueness` must be `true`, which is the default.
|===