// Category: cdc-using
// Type: assembly

// :community:
:product:
ifdef::community[]
[id="topic-routing"]
= Topic Routing
include::../_attributes.adoc[]
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js

toc::[]
endif::community[]

ifdef::product[]
[id="routing-change-event-records-to-topics-that-you-specify"]
= Routing change event records to topics that you specify
endif::product[]

Before Kafka records that contain database change events reach the Kafka Connect converter, you can re-route them to topics that you choose. To do this, Debezium provides the `ByLogicalTableRouter` single message transformation (SMT). Configure this transformation in the Debezium connector's Kafka Connect `.properties` file. Configuration options enable you to specify the following:

* An expression for identifying the records to route
* The destination topic
* How to ensure a unique key among the records being routed to the topic

It is up to you to ensure that the transformation configuration provides the behavior that you want. Debezium does not validate the behavior that results from the way that you configure the transformation.

The `ByLogicalTableRouter` transformation is a link:https://kafka.apache.org/documentation/#connect_transforms[Kafka Connect SMT].

ifdef::product[]
The following topics provide details:

* xref:use-case-for-routing-records-to-topics-that-you-specify[]
* xref:example-of-routing-records-for-multiple-tables-to-one-topic[]
* xref:ensuring-unique-keys-across-records-routed-to-the-same-topic[]

endif::product[]

ifdef::community[]
= Use case
endif::community[]

ifdef::product[]
// Type: concept
[id="use-case-for-routing-records-to-topics-that-you-specify"]
== Use case for routing records to topics that you specify
endif::product[]

The default behavior is that a Debezium connector sends each change event record to a topic whose name is formed from the name of the database and the name of the table in which the change was made.
In other words, a topic receives records for one physical table. When you want a topic to receive records for more than one physical table, you must configure the Debezium connector to re-route the records to that topic.

A logical table is a common use case for routing records for multiple physical tables to one topic. In a logical table, there are multiple physical tables that all have the same schema. For example, sharded tables have the same schema. A logical table might consist of two or more sharded tables, for example, `db_shard1.my_table` and `db_shard2.my_table`. The tables are in different shards and are physically distinct but together they form a logical table. You can re-route change event records for tables in any of the shards to the same topic.

ifdef::community[]
= Example
endif::community[]

ifdef::product[]
// Type: concept
[id="example-of-routing-records-for-multiple-tables-to-one-topic"]
== Example of routing records for multiple tables to one topic
endif::product[]

To route change event records for multiple physical tables to the same topic, configure the `ByLogicalTableRouter` transformation in the Kafka Connect `.properties` file for the Debezium connector. Configuration of the `ByLogicalTableRouter` SMT requires you to specify regular expressions that identify:

* The tables for which to route records. These tables must all have the same schema.
* The destination topic name.

For example:

[source]
----
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
----

`topic.regex`:: Specifies a regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic.
+
In the example, the regular expression `(.*)customers_shard(.*)` matches records for changes to tables whose names include the `customers_shard` string.
This would re-route records for tables with the following names:
+
`myserver.mydb.customers_shard1` +
`myserver.mydb.customers_shard2` +
`myserver.mydb.customers_shard3`

`topic.replacement`:: Specifies a regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression. In this example, records for the three sharded tables listed above would be routed to the `myserver.mydb.customers_all_shards` topic.

ifdef::community[]
= Ensure unique key
endif::community[]

ifdef::product[]
// Type: procedure
[id="ensuring-unique-keys-across-records-routed-to-the-same-topic"]
== Ensuring unique keys across records routed to the same topic
endif::product[]

A Debezium change event key uses the table columns that make up the table's primary key. To route records for multiple physical tables to one topic, the event key must be unique across all of those tables. However, it is possible for each physical table to have a primary key that is unique within only that table. For example, a row in the `myserver.mydb.customers_shard1` table might have the same key value as a row in the `myserver.mydb.customers_shard2` table.

To ensure that each event key is unique across the tables whose change event records go to the same topic, the `ByLogicalTableRouter` transformation inserts a field into change event keys. By default, the name of the inserted field is `+__dbz__physicalTableIdentifier+`. The value of the inserted field is the default destination topic name.

You can configure the `ByLogicalTableRouter` transformation to insert a different field into the key. To do this, specify the `key.field.name` option and set it to a field name that does not clash with existing primary key field names.
For example:

[source]
----
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
transforms.Reroute.key.field.name=shard_id
----

This example adds the `shard_id` field to the key structure in routed records.

If you want to adjust the value of the key's new field, configure both of these options:

`key.field.regex`:: Specifies a regular expression that the transformation applies to the default destination topic name to capture one or more groups of characters.

`key.field.replacement`:: Specifies a regular expression for the value of the inserted key field in terms of those captured groups.

For example:

[source]
----
transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
transforms.Reroute.key.field.replacement=$2
----

With this configuration, suppose that the default destination topic names are:

`myserver.mydb.customers_shard1` +
`myserver.mydb.customers_shard2` +
`myserver.mydb.customers_shard3`

The transformation uses the values in the second captured group, the shard numbers, as the value of the key's new field. In this example, the inserted key field's values would be `1`, `2`, or `3`.
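
The following listing is a minimal sketch that combines the options described in this topic into one configuration, to show how they fit together. It reuses the `Reroute` alias, the regular expressions, and the `shard_id` field name from the previous examples. The `id` column that the comments mention is a hypothetical primary key column, assumed only for illustration. The comments describe the result that the preceding descriptions imply. They are not captured connector output.

[source]
----
# Route records from every customers_shard* topic to a single topic.
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards

# Name of the field that the transformation adds to each event key.
transforms.Reroute.key.field.name=shard_id

# Derive the value of that field from the default destination topic name.
# The second captured group is the shard number.
transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
transforms.Reroute.key.field.replacement=$2

# Expected effect for a record from myserver.mydb.customers_shard2,
# assuming a hypothetical primary key column named id:
#   destination topic: myserver.mydb.customers_all_shards
#   key fields:        id (unchanged), shard_id = 2
----

Deriving the key field value from the shard number keeps keys short. Setting only `key.field.name` would, per the description above, leave the full default destination topic name as the value.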