// Category: debezium-using
|
|
// Type: assembly
|
|
// ModuleID: routing-change-event-records-to-topics-according-to-event-content
|
|
// Title: Routing change event records to topics according to event content
|
|
[id="content-based-routing"]
|
|
= Content-based routing
|
|
ifdef::community[]
|
|
:toc:
|
|
:toc-placement: macro
|
|
:linkattrs:
|
|
:icons: font
|
|
:source-highlighter: highlight.js
|
|
|
|
toc::[]
|
|
endif::community[]
|
|
By default, {prodname} streams all of the change events that it reads from a table to a single static topic.
|
|
However, in some situations you might want to reroute selected events to other topics, based on the event content.
|
|
The process of routing messages based on their content is described in the https://www.enterpriseintegrationpatterns.com/patterns/messaging/ContentBasedRouter.html[Content-based routing] messaging pattern.
|
|
To apply this pattern in {prodname}, you use the content-based routing SMT to write expressions that are evaluated for each event.
|
|
Depending on how an event is evaluated, the SMT either routes the event message to the original destination topic, or reroutes it to the topic that you specify in the expression.
|
|
|
|
Kafka Connect provides its own link:https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect[Single Message Transforms] (SMT) to encode routing logic.
|
|
However, the following drawbacks apply to using the generic SMT:
|
|
|
|
* It is necessary to compile the transformation up front and deploy it to Kafka Connect.
|
|
* Every change needs code recompilation and redeployment, leading to inflexible operations.
|
|
|
|
The content-based routing SMT supports scripting languages that integrate with https://jcp.org/en/jsr/detail?id=223[JSR 223] (Scripting for the Java(TM) Platform).
|
|
|
|
// Type: concept
|
|
// ModuleID: example-debezium-basic-content-based-routing-configuration
|
|
// Title: Example: {prodname} basic content-based routing configuration
|
|
[[example-basic-content-based-routing-configuration]]
|
|
== Example: Basic configuration
|
|
|
|
To configure a {prodname} connector to route change event records based on the event content, you configure the `ContentBasedRouter` SMT in the Kafka Connect configuration for the connector.
|
|
|
|
Configuration of the content-based routing SMT requires you to specify an expression that defines the routing criteria.
|
|
The expression defines a pattern for evaluating event records.
|
|
It also specifies the name of a destination topic where events that match the pattern are routed.
|
|
The pattern that you specify might designate an event type, such as a table insert, update, or delete operation.
|
|
You might also define a pattern that matches a value in a specific column or row.
|
|
|
|
For example, to reroute all update (`u`) records to an `updates` topic, you might add the following configuration to your connector configuration:
|
|
|
|
[source]
|
|
----
|
|
...
|
|
transforms=route
|
|
transforms.route.type=io.debezium.transforms.ContentBasedRouter
|
|
transforms.route.language=jsr223.groovy
|
|
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
|
|
...
|
|
----
|
|
|
|
The preceding example specifies the use of the `Groovy` expression language.
|
|
|
|
Records that do not match the pattern are routed to the default topic.
|
|
|
|
[IMPORTANT]
|
|
====
|
|
Although the {prodname} package includes the JSR 223 API, which provides support for scripting languages that integrate with JSR 223, by default, {prodname} cannot interpret scripts from these languages.
|
|
To use an expression language, you must download the JSR-223 script engine JAR for the language, and add the JAR to your classpath, along with any other JAR files used by the language implementation.
|
|
For Groovy 3, you can download the JAR from https://groovy-lang.org/. For GraalVM JavaScript, the JAR is available at https://github.com/graalvm/graaljs.
|
|
|
|
Typically, you add the language JAR to the directory that contains the {prodname} connector JAR files, which is the directory that is referenced in the `plugin.path` configuration property that is set for the Kafka Connect plugin.
|
|
If you install the script engine JAR to a different directory, you must add that directory to the list in the `plugin.path` property.
|
|
====
|
|
|
|
// Type: concept
|
|
// ModuleID: variables-for-use-in-debezium-content-based-routing-expressions
|
|
// Title: Variables for use in {prodname} content-based routing expressions
|
|
== Variables for use in content-based routing expressions
|
|
|
|
{prodname} binds certain variables into the evaluation context for the SMT.
|
|
When you create expressions to specify conditions to control the routing destination,
|
|
the SMT can look up and interpret the values of these variables to evaluate conditions in an expression.
|
|
|
|
The following table lists the variables that {prodname} binds into the evaluation context for the content-based routing SMT:
|
|
|
|
.Content-based routing expression variables
|
|
[options="header"]
|
|
|=======================
|
|
|Name |Description |Type
|
|
|`key` |The key of the message. |`org.apache.kafka.connect.data.Struct`
|
|
|`value` |The value of the message. |`org.apache.kafka.connect.data.Struct`
|
|
|`keySchema` |Schema of the message key.|`org.apache.kafka.connect.data.Schema`
|
|
|`valueSchema`|Schema of the message value.| `org.apache.kafka.connect.data.Schema`
|
|
|`topic`|Name of the target topic.| String
|
|
|`headers`
|
|
a|A Java map of message headers. The key field is the header name.
|
|
The `headers` variable exposes the following properties:
|
|
|
|
* `value` (of type `Object`)
|
|
|
|
* `schema` (of type `org.apache.kafka.connect.data.Schema`)
|
|
|
|
| `java.util.Map<String, io.debezium.transforms.scripting.RecordHeader>`
|
|
|=======================
|
|
|
|
An expression can invoke arbitrary methods on its variables.
|
|
Expressions should resolve to a `String` value or to `null`.
When an expression returns a non-null value, the SMT reroutes the message to the topic with that name.
When an expression returns `null`, the SMT routes the message to the default topic.
|
|
|
|
Expressions should not result in any side effects. That is, they should not modify any of the variables that are passed to them.
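
As a rough illustration of how an expression can combine several of the bound variables, the following standalone JavaScript sketch evaluates a routing condition against mock bindings.
The topic names, the record contents, and the `topicExpression` function are hypothetical and exist only for this example; plain objects stand in for the Kafka Connect `Struct` instances that the SMT actually binds.

[source,javascript]
----
// Mock bindings that stand in for the variables that the SMT provides.
// In a real connector, `key` and `value` are Kafka Connect Struct instances.
const bindings = {
  key: { id: 1004 },
  value: { op: 'd', before: { id: 1004, last_name: 'Kretchmar' }, after: null },
  topic: 'server1.inventory.customers', // hypothetical topic name
};

// Hypothetical routing expression, wrapped in a function so that it can run
// outside of Kafka Connect. Delete events from a customers topic are sent to
// a hypothetical 'customer-deletes' topic. Any other event returns null,
// which means that it stays on the default topic.
function topicExpression({ key, value, topic }) {
  return value.op === 'd' && topic.endsWith('.customers')
    ? 'customer-deletes'
    : null;
}

// The function only reads its arguments and never modifies them.
const rerouted = topicExpression(bindings);
const unchanged = topicExpression({ ...bindings, value: { op: 'c' } });
console.log(rerouted, unchanged);
----

In a connector configuration, only the body of the condition would be set as the value of `topic.expression`; the wrapper function and the mock data are scaffolding for the sketch.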
|
|
|
|
// Type: reference
|
|
// ModuleID: configuration-of-content-based-routing-conditions-for-other-scripting-languages
|
|
// Title: Configuration of content-based routing conditions for other scripting languages
|
|
== Language specifics
|
|
|
|
The way that you express content-based routing conditions depends on the scripting language that you use.
|
|
For example, as shown in {link-prefix}:{link-content-based-routing}#example-basic-content-based-routing-configuration[], when you use `Groovy` as the expression language,
|
|
the following expression reroutes all update (`u`) records to the `updates` topic, while routing other records to the default topic:
|
|
|
|
[source,groovy]
|
|
----
|
|
value.op == 'u' ? 'updates' : null
|
|
----
|
|
|
|
Other languages use different methods to express the same condition.
|
|
|
|
[TIP]
|
|
====
|
|
The {prodname} MongoDB connector emits the `after` and `patch` fields as serialized JSON documents rather than as structures.
|
|
To use the `ContentBasedRouter` SMT with the MongoDB connector, you must first unwind the fields by applying the {link-prefix}:{link-mongodb-event-flattening}[`ExtractNewDocumentState`] SMT.
|
|
|
|
You could also take the approach of using a JSON parser within the expression.
|
|
For example, if you use Groovy as the expression language, add the `groovy-json` artifact to the classpath, and then use a condition such as `(new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar'` in the routing expression.
|
|
====
|
|
|
|
.JavaScript
|
|
When you use JavaScript as the expression language, you can call the `Struct#get()` method to specify the content-based routing condition, as in the following example:
|
|
|
|
[source,javascript]
|
|
----
|
|
value.get('op') == 'u' ? 'updates' : null
|
|
----
|
|
|
|
.JavaScript with Graal.js
|
|
When you create content-based routing conditions by using JavaScript with Graal.js, you use an approach that is similar to the one that you use with Groovy.
|
|
For example:
|
|
|
|
[source,javascript]
|
|
----
|
|
value.op == 'u' ? 'updates' : null
|
|
----
|
|
|
|
|
|
// Type: reference
|
|
// ModuleID: options-for-configuring-the-content-based-routing-transformation
|
|
// Title: Options for configuring the content-based routing transformation
|
|
[[content-based-router-configuration-options]]
|
|
== Configuration options
|
|
[cols="30%a,25%a,45%a"]
|
|
|===
|
|
|Property
|
|
|Default
|
|
|Description
|
|
|
|
|[[content-based-router-topic-regex]]<<content-based-router-topic-regex, `topic.regex`>>
|
|
|
|
|
|An optional regular expression that evaluates the name of the destination topic for an event to determine whether to apply the condition logic.
|
|
If the name of the destination topic matches the value in `topic.regex`, the transformation applies the condition logic before it passes the event to the topic.
|
|
If the name of the topic does not match the value in `topic.regex`, the SMT passes the event to the topic unmodified.
|
|
|
|
|[[content-based-router-language]]<<content-based-router-language, `language`>>
|
|
|
|
|
|The language in which the expression is written. Must begin with `jsr223.`, for example, `jsr223.groovy` or `jsr223.graal.js`. {prodname} supports bootstrapping through the https://jcp.org/en/jsr/detail?id=223[JSR 223 API ("Scripting for the Java (TM) Platform")] only.
|
|
|
|
|[[content-based-router-topic-expression]]<<content-based-router-topic-expression, `topic.expression`>>
|
|
|
|
|
|The expression to be evaluated for every message. Must evaluate to a `String` value. A non-null result reroutes the message to the topic with that name, and a `null` result routes the message to the default topic.
|
|
|
|
|[[content-based-router-null-handling-mode]]<<content-based-router-null-handling-mode, `null.handling.mode`>>
|
|
|`keep`
|
|
a|Specifies how the transformation handles `null` (tombstone) messages. You can specify one of the following options:
|
|
|
|
`keep`:: (Default) Pass the messages through.
|
|
`drop`:: Remove the messages completely.
|
|
`evaluate`:: Apply the condition logic to the messages.
|
|
|===
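
The way that these options interact can be modeled with a short standalone JavaScript sketch.
This is an illustrative model of the behavior described in the table, not the actual implementation of the SMT.
The `resolveTopic` function, the record shape, and the topic names are hypothetical, and the order in which the tombstone check and the `topic.regex` check are applied is an assumption.

[source,javascript]
----
// Returns the topic that a record ends up on, or null if the record is dropped.
function resolveTopic(record, options) {
  const { topicRegex = null, nullHandlingMode = 'keep', topicExpression } = options;

  // Tombstone handling, modeled on null.handling.mode.
  if (record.value === null) {
    if (nullHandlingMode === 'drop') return null;          // record is removed
    if (nullHandlingMode === 'keep') return record.topic;  // record passes through
    // 'evaluate': fall through and apply the condition logic.
  }

  // topic.regex: records on non-matching topics pass through unmodified.
  if (topicRegex !== null && !topicRegex.test(record.topic)) {
    return record.topic;
  }

  // topic.expression: a non-null result is the new topic name,
  // and null keeps the record on its default topic.
  const result = topicExpression(record);
  return result != null ? result : record.topic;
}

// Hypothetical configuration: reroute updates from inventory topics only.
const options = {
  topicRegex: /^server1\.inventory\..*$/,
  nullHandlingMode: 'keep',
  topicExpression: ({ value }) =>
    value !== null && value.op === 'u' ? 'updates' : null,
};

const update = { topic: 'server1.inventory.customers', value: { op: 'u' } };
const create = { topic: 'server1.inventory.customers', value: { op: 'c' } };
const other = { topic: 'server1.sales.orders', value: { op: 'u' } };
const tombstone = { topic: 'server1.inventory.customers', value: null };

console.log(resolveTopic(update, options));
console.log(resolveTopic(create, options));
console.log(resolveTopic(other, options));
console.log(resolveTopic(tombstone, options));
----

The example regular expression is anchored at both ends so that the sketch does not depend on whether the topic name must match in full or only in part.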
|