:page-aliases: configuration/partition-routing.adoc
// Category: debezium-using
// Type: assembly
// ModuleID: routing-records-to-partitions-based-on-payload-fields
// Title: Routing records to partitions based on payload fields
[id="partition-routing"]
= Partition Routing

:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js

toc::[]

By default, when {prodname} detects a change in a data collection, the change event that it emits is sent to a topic that uses a single Apache Kafka partition.
As described in {link-prefix}:{link-topic-auto-creation}#customizing-debezium-automatically-created-topics[Customization of Kafka Connect automatic topic creation], you can customize the default configuration to route events to multiple partitions, based on a hash of the primary key.

However, in some cases, you might also want {prodname} to route events to a specific topic partition.
The partition routing SMT enables you to route events to specific destination partitions based on the values of one or more specified payload fields.
To calculate the destination partition, {prodname} uses a hash of the specified field values.

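The calculation described above can be pictured with a short sketch.
It is only an illustration, not the SMT's source code: the `PartitionSketch` class and the `partitionFor` method are hypothetical names, and the sketch assumes that the field values are hashed together with the standard Java `hashCode` and then reduced to a partition number with a non-negative modulo.

```java
import java.util.List;

// Hypothetical illustration; not the PartitionRouting source code.
class PartitionSketch {

    // Maps the selected field values to a partition in the range [0, partitionCount).
    static int partitionFor(List<Object> fieldValues, int partitionCount) {
        // List.hashCode() combines the hash codes of the elements in order.
        int hash = fieldValues.hashCode();
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(hash, partitionCount);
    }

    public static void main(String[] args) {
        int first = partitionFor(List.of("hammer"), 2);
        int second = partitionFor(List.of("hammer"), 2);
        // Equal field values always produce the same partition number.
        System.out.println(first == second); // prints true
    }
}
```

Because the result depends only on the field values and the partition count, records that carry the same values are always assigned the same partition number.
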
// Type: concept
// Title: Example: Basic configuration of the {prodname} partition routing SMT
// ModuleID: basic-configuration-of-the-debezium-partition-routing-smt
[[example-basic-partition-routing-configuration-example]]
== Example: Basic configuration

You configure the partition routing transformation in the {prodname} connector's Kafka Connect configuration.
The configuration specifies the following parameters:

`partition.payload.fields`:: Specifies the fields in the event payload that the SMT uses to calculate the destination partition.
You can use dot notation to specify nested payload fields.
`partition.topic.num`:: Specifies the number of partitions in the destination topic.
`partition.hash.function`:: Specifies the hash function that the SMT applies to the field values to determine the number of the destination partition.

By default, {prodname} routes all change event records for a configured data collection to a single Apache Kafka topic.
Connectors do not direct event records to specific partitions in the topic.

To configure a {prodname} connector to route events to a specific partition, configure the `PartitionRouting` SMT in the Kafka Connect configuration for the {prodname} connector.

For example, you might add the following configuration in your connector configuration.

[source]
----
...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...

topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...
----

Based on the preceding configuration, whenever the SMT receives a message that is bound for a topic with a name that begins with the prefix `fulfillment`, it redirects the message to a specific topic partition.

The SMT computes the target partition from a hash of the value of the `name` field in the message payload.
By specifying the `allTopic` predicate, the configuration selectively applies the SMT.
The `change` prefix is a special keyword that enables the SMT to automatically refer to elements in the payload that describe the `before` or `after` states of the data.
If a specified field is not present in the event message, the SMT ignores it.
If none of the fields exist in the message, then the transformation ignores the event message entirely, and delivers the original version of the message to the default destination topic.
The number of partitions specified by the `partition.topic.num` setting in the SMT configuration must match the number of partitions specified by the Kafka Connect configuration.
For example, in the preceding configuration example, the value specified by the Kafka Connect property `topic.creation.default.partitions` matches the `partition.topic.num` value in the SMT configuration.

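The field-selection rules above can be sketched as follows.
This is an illustration under stated assumptions rather than the SMT's actual code: the `FieldSelectionSketch` class and its methods are hypothetical, the event payload is modeled as nested maps, and `change` is assumed to resolve to the `after` state when one is present and to the `before` state otherwise (for example, for delete events).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration of the field-selection rules; not the PartitionRouting source code.
class FieldSelectionSketch {

    // Looks up one configured field, such as "change.name" or "after.name", in the payload.
    static Optional<Object> resolve(Map<String, ? extends Map<String, ?>> payload, String fieldPath) {
        String[] parts = fieldPath.split("\\.", 2);
        if (parts.length < 2) {
            return Optional.empty();
        }
        Map<String, ?> state;
        if (parts[0].equals("change")) {
            // Assumption: prefer the "after" state and fall back to "before" (for example, deletes).
            state = payload.get("after");
            if (state == null) {
                state = payload.get("before");
            }
        } else {
            state = payload.get(parts[0]);
        }
        if (state == null) {
            return Optional.empty();
        }
        Object value = state.get(parts[1]);
        return Optional.ofNullable(value);
    }

    // Collects the values of the configured fields, skipping fields that the event lacks.
    // An empty result means that the record would be passed through unchanged.
    static List<Object> collect(Map<String, ? extends Map<String, ?>> payload, List<String> fieldPaths) {
        List<Object> values = new ArrayList<>();
        for (String path : fieldPaths) {
            resolve(payload, path).ifPresent(values::add);
        }
        return values;
    }

    public static void main(String[] args) {
        // A delete event carries only the "before" state.
        Map<String, Map<String, String>> deleteEvent = Map.of("before", Map.of("name", "hammer"));
        System.out.println(collect(deleteEvent, List.of("change.name", "change.purchaser"))); // prints [hammer]
    }
}
```

In this sketch, a field that is missing from the event contributes nothing to the hash input, and an event that has none of the configured fields yields an empty list, which corresponds to the case in which the record is delivered unchanged.
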
Given this `Products` table

.Products table
[cols="25%a,25%a,25%a,25%a"]
|===
|id
|name
|description
|weight

|101
|scooter
|Small 2-wheel scooter
| 3.14

|102
|car battery
|12V car battery
| 8.1

|103
|12-pack drill bits
|12-pack of drill bits with sizes ranging from #40 to #3
| 0.8

|104
|hammer
|12oz carpenter's hammer
| 0.75

|105
|hammer
|14oz carpenter's hammer
| 0.875

|106
|hammer
|16oz carpenter's hammer
| 1.0

|107
|rocks
|box of assorted rocks
| 5.3

|108
|jacket
|water resistent black wind breaker
| 0.1

|109
|spare tire
|24 inch spare tire
| 22.2
|===

Based on the configuration, the SMT routes change events for the records in which the `name` field has the value `hammer` to the same partition.
That is, the items with `id` values `104`, `105`, and `106` are routed to the same partition.

// Type: concept
// Title: Example: Advanced configuration of the {prodname} partition routing SMT
// ModuleID: advanced-configuration-of-the-debezium-partition-routing-smt
[[example-advanced-partition-routing-configuration-example]]
== Example: Advanced configuration

Suppose that you want to route events from two data collections (`t1`, `t2`) to the same topic (for example, `my_topic`), and you want to partition events from data collection `t1` by using field `f1`,
and partition events from data collection `t2` by using field `f2`.

You could apply the following configuration:

[source]
----
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.f1,change.f2
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=myTopic

predicates=myTopic
predicates.myTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.myTopic.pattern=my_topic
----

Because the SMT skips fields that are not present in a record, events from `t1` are partitioned by `f1` and events from `t2` are partitioned by `f2`, assuming that `t1` records do not contain `f2` and `t2` records do not contain `f1`.
The preceding configuration does not specify how to re-route events so that they are sent to a specific destination topic.
For information about how to send events to topics other than their default destination topics, see the {link-prefix}:{link-topic-routing}[Topic Routing SMT].

// Type: concept
// Title: Migrating from the {prodname} ComputePartition SMT
// ModuleID: migrate-debezium-compute-partition-smt
[[migrate-debezium-compute-partition-smt]]
== Migrating from the {prodname} ComputePartition SMT

The {prodname} `ComputePartition` SMT has been discontinued.
The information in the following section describes how to migrate from the `ComputePartition` SMT to the new `PartitionRouting` SMT.

Assuming that the configuration sets the same number of partitions for all topics, replace the following `ComputePartition` configuration with the `PartitionRouting` SMT.
The following examples provide a comparison of the two configurations.

.Example: Legacy `ComputePartition` configuration
[source]
----
...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:2,inventory.orders:2
...
----

Replace the preceding `ComputePartition` configuration with the following `PartitionRouting` configuration.

.Example: `PartitionRouting` configuration that replaces the earlier `ComputePartition` configuration
[source]
----
...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...

topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name,change.purchaser
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...
----

If the SMT emits events to topics that do not share the same number of partitions, you must specify unique `partition.num.mappings` values for each topic.
For example, in the following legacy configuration, the topic for the `products` data collection is configured with 3 partitions, and the topic for the `orders` data collection is configured with 2 partitions:

.Example: Legacy `ComputePartition` configuration that sets unique partition values for different topics
[source]
----
...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:3,inventory.orders:2
...
----

Replace the preceding `ComputePartition` configuration with the following `PartitionRouting` configuration:

.`PartitionRouting` configuration that sets unique `partition.topic.num` values for different topics
[source]
----
...
topic.prefix=fulfillment

transforms=ProductsPartitionRouting,OrdersPartitionRouting
transforms.ProductsPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.ProductsPartitionRouting.partition.payload.fields=change.name
transforms.ProductsPartitionRouting.partition.topic.num=3
transforms.ProductsPartitionRouting.predicate=products

transforms.OrdersPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.OrdersPartitionRouting.partition.payload.fields=change.purchaser
transforms.OrdersPartitionRouting.partition.topic.num=2
transforms.OrdersPartitionRouting.predicate=orders

predicates=products,orders
predicates.products.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.products.pattern=fulfillment.inventory.products
predicates.orders.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.orders.pattern=fulfillment.inventory.orders
...
----

// Type: reference
// ModuleID: options-for-configuring-the-partition-routing-transformation
// Title: Options for configuring the partition routing transformation
[[partition-routing-configuration-options]]
== Configuration options

The following table lists the configuration options that you can set for the partition routing SMT.

.Partition routing SMT (`PartitionRouting`) configuration options
[cols="30%a,25%a,45%a"]
|===
|Property
|Default
|Description

|[[partition-routing-payload-fields]]<<partition-routing-payload-fields, `partition.payload.fields`>>
|
|Specifies the fields in the event payload that the SMT uses to calculate the target partition.
Use dot notation to specify nested payload fields.
To access fields related to data collections, you can use: `after`, `before`, or `change`.
The `change` field is a special field that results in the SMT automatically populating content in the `after` or `before` elements, depending on the type of operation.
If a specified field is not present in a record, the SMT skips it.
For example, `after.name,source.table,change.name`

|[[partition-routing-partition-topic-num]]<<partition-routing-partition-topic-num, `partition.topic.num`>>
|
|The number of partitions for the topic on which this SMT acts.
Use the `TopicNameMatches` predicate to filter records by topic.

|[[partition-routing-partition-hash-function]]<<partition-routing-partition-hash-function, `partition.hash.function`>>
| `java`
|Hash function that the SMT uses to compute the hash of the field values, which determines the number of the destination partition.
Possible values are: +
+
`java` - standard Java `Object::hashCode` function +
+
`murmur` - latest version of the http://en.wikipedia.org/wiki/MurmurHash[MurmurHash] function, MurmurHash3 +
+
This configuration is optional.
If you do not specify a value, or if you specify an invalid value, the SMT uses the default value.

|===