The process of routing messages based on their content is described in the https://www.enterpriseintegrationpatterns.com/patterns/messaging/ContentBasedRouter.html[Content-based routing] messaging pattern.
To apply this pattern in {prodname}, you use the content-based routing link:https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect[single message transform] (SMT) to write expressions that are evaluated for each event.
Depending on how an event is evaluated, the SMT either routes the event message to the original destination topic, or reroutes it to the topic that you specify in the expression.
The content-based routing SMT is under active development. The structure of the emitted message or other details might change as development progresses.
====
endif::community[]
ifdef::product[]
[IMPORTANT]
====
The {prodname} content-based routing SMT is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see link:https://access.redhat.com/support/offerings/techpreview/[Technology Preview Features Support Scope].
The content-based routing SMT supports scripting languages that integrate with https://jcp.org/en/jsr/detail?id=223[JSR 223] (Scripting for the Java(TM) Platform).
The JSR 223 implementation for GraalVM JavaScript is available at https://github.com/graalvm/graaljs.
After you obtain the script engine files, you add them to your {prodname} connector plug-in directories, along with any other JAR files used by the language implementation.
endif::community[]
ifdef::product[]
Depending on the method that you use to deploy {prodname}, you can automatically download the required artifacts from Maven Central,
or you can manually download the artifacts, and then add them to your {prodname} connector plug-in directories, along with any other JAR files used by the language implementation.
If you deploy the {prodname} connector by building a custom Kafka Connect container image from a Dockerfile, you must explicitly add the SMT artifact to your Kafka Connect environment before you can use the content-based routing SMT.
When you use {StreamsName} to deploy the connector, it can download the required artifacts automatically based on configuration parameters that you specify in the Kafka Connect custom resource.
endif::product[]
ifdef::community[]
To use the content-based routing SMT with a {prodname} connector plug-in, you must explicitly add the SMT artifact to your Kafka Connect environment.
IMPORTANT: After the routing SMT is present in a Kafka Connect instance, any user who is allowed to add a connector to the instance can run scripting expressions.
To ensure that scripting expressions can be run only by authorized users, be sure to secure the Kafka Connect instance and its configuration interface before you add the routing SMT.
With https://zookeeper.apache.org[ZooKeeper], http://kafka.apache.org/[Kafka], {link-kafka-docs}.html#connect[Kafka Connect], and one or more {prodname} connectors installed, the remaining tasks to install the content-based routing SMT are:
. Download the link:https://repo1.maven.org/maven2/io/debezium/debezium-scripting/{debezium-version}/debezium-scripting-{debezium-version}.tar.gz[scripting SMT archive].
. Extract the contents of the archive into the {prodname} plug-in directories of your Kafka Connect environment.
. From a browser, open the link:{LinkDebeziumDownloads}[{NameDebeziumDownloads}], and download the {prodname} scripting SMT archive (`debezium-scripting-{debezium-version}.tar.gz`).
To configure a {prodname} connector to route change event records based on the event content, you configure the `ContentBasedRouter` SMT in the Kafka Connect configuration for the connector.
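For example, the following minimal sketch reroutes update events to a separate topic.
It assumes that the Groovy script engine is installed; the alias `route` and the topic name `updates` are hypothetical and chosen only for illustration.

[source,properties]
----
# Register the transformation under the hypothetical alias "route".
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
# Expression language; must begin with "jsr223.".
transforms.route.language=jsr223.groovy
# Reroute update events (op == 'u') to the hypothetical "updates" topic.
# A null result leaves the event on its default topic.
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
----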
The preceding example shows a simple SMT configuration that is designed to process only DML events, which contain an `op` field.
Other types of messages that a connector might emit (heartbeat messages, tombstone messages, or metadata messages about transactions or schema changes) do not contain this field.
To avoid processing failures, you can define xref:options-for-applying-the-transformation-selectively[an SMT predicate statement that selectively applies the transformation] to specific events only.
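As an illustration, the following sketch pairs the transformation with a Kafka Connect predicate so that the SMT skips heartbeat messages.
It assumes the `TopicNameMatches` predicate that Kafka Connect provides and heartbeat topics that use the `__debezium-heartbeat` prefix; the aliases `route` and `isHeartbeat` and the topic name `updates` are hypothetical. Adjust the pattern to match the topics in your environment.

[source,properties]
----
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
# Apply the SMT only when the predicate does NOT match (negate=true).
transforms.route.predicate=isHeartbeat
transforms.route.negate=true

# Hypothetical predicate alias; matches topics by name.
predicates=isHeartbeat
predicates.isHeartbeat.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
# Assumed heartbeat topic naming; change this if your connector uses another prefix.
predicates.isHeartbeat.pattern=__debezium-heartbeat.*
----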
== Options for applying the transformation selectively
In addition to the change event messages that a {prodname} connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions.
Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it's best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages.
You can use one of the following methods to configure the connector to apply the SMT selectively:
For example, as shown in the xref:{link-content-based-routing}#example-basic-content-based-routing-configuration[basic configuration example], when you use `Groovy` as the expression language,
For example, if you use Groovy as the expression language, add the `groovy-json` artifact to the classpath, and then add an expression such as `(new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar'`.
When you use JavaScript as the expression language, you can call the `Struct#get()` method to specify the content-based routing condition, as in the following example:
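A minimal sketch of such a configuration follows.
It assumes that the GraalVM JavaScript engine is installed; the alias `route` and the topic name `updates` are hypothetical.

[source,properties]
----
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.graal.js
# Struct#get() reads the "op" field of the event value.
# Update events go to the hypothetical "updates" topic; all others keep their default topic.
transforms.route.topic.expression=value.get('op') == 'u' ? 'updates' : null
----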
If the name of the destination topic matches the value in `topic.regex`, the transformation applies the condition logic before it passes the event to the topic.
If the name of the topic does not match the value in `topic.regex`, the SMT passes the event to the topic unmodified.
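The following sketch shows how `topic.regex` might be combined with a routing expression.
The topic naming scheme `dbserver1.inventory.<table>`, the alias `route`, and the topic name `updates` are assumptions made for illustration only.

[source,properties]
----
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
# Evaluate the expression only for events whose destination topic matches this pattern.
# [.] matches a literal dot without requiring backslash escapes in a properties file.
transforms.route.topic.regex=dbserver1[.]inventory[.].*
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
----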
|The language in which the expression is written. Must begin with `jsr223.`, for example, `jsr223.groovy`, or `jsr223.graal.js`. {prodname} supports bootstrapping through the https://jcp.org/en/jsr/detail?id=223[JSR 223 API ("Scripting for the Java (TM) Platform")] only.
|The expression to be evaluated for every message. Must evaluate to a `String` value. A non-null result reroutes the message to the topic that the result names; a `null` result routes the message to the default topic.