tet123/documentation/modules/ROOT/pages/transformations/applying-transformations-selectively.adoc

196 lines
11 KiB
Plaintext

:page-aliases: configuration/applying-transformations-selectively.adoc
// Category: debezium-using
// Type: assembly
// ModuleID: applying-transformations-selectively-with-smt-predicates
// Title: Applying transformations selectively with SMT predicates
[id="applying-transformations-selectively"]
= Applying transformations selectively
When you configure a single message transformation (SMT) for a connector, you can define a predicate for the transformation.
The predicate specifies how to apply the transformation conditionally to a subset of the messages that the connector processes.
You can assign predicates to transformations that you configure for source connectors, such as {prodname}, or to sink connectors.
// Type: concept
// ModuleID: about-smt-predicates
// Title: About SMT predicates
== SMT predicates
{prodname} provides several single message transformations (SMTs) that you can use to modify event records before Kafka Connect saves the records to Kafka topics.
By default, when you configure one of these SMTs for a {prodname} connector, Kafka Connect applies that transformation to every record that the connector emits.
However, there might be instances in which you want to apply a transformation selectively, so that it modifies only that subset of change event messages that share a common characteristic.
For example, for a {prodname} connector, you might want to run the transformation only on event messages from a specific table or that include a specific header key.
In environments that run Apache Kafka 2.6 or greater, you can append a predicate statement to a transformation to instruct Kafka Connect to apply the SMT only to certain records.
In the predicate, you specify a condition that Kafka Connect uses to evaluate each message that it processes.
When a {prodname} connector emits a change event message, Kafka Connect checks the message against the configured predicate condition.
If the condition is true for the event message, Kafka Connect applies the transformation, and then writes the message to a Kafka topic.
Messages that do not match the condition are sent to Kafka unmodified.
The situation is similar for predicates that you define for a sink connector SMT.
The connector reads messages from a Kafka topic and Kafka Connect evaluates the messages against the predicate condition.
If a message matches the condition, Kafka Connect applies the transformation and then passes the messages to the sink connector.
After you define a predicate, you can reuse it and apply it to multiple transforms.
Predicates also include a `negate` option that you can use to invert a predicate so that the predicate condition is applied only to records that do _not_ match the condition that is defined in the predicate statement.
You can use the `negate` option to pair the predicate with other transforms that are based on negating the condition.
.Predicate elements
Predicates include the following elements:
* `predicates` prefix
* Alias (for example, `isOutboxTable`)
* Type (for example, `org.apache.kafka.connect.transforms.predicates.TopicNameMatches`).
Kafka Connect provides a set of default predicate types, which you can supplement by defining your own custom predicates.
* Condition statement and any additional configuration properties, depending on the type of predicate (for example, a regex naming pattern)
.Default predicate types
The following predicate types are available by default:
HasHeaderKey:: Specifies a key name in the header in the event message that you want Kafka Connect to evaluate.
The predicate evaluates to true for any records that include a header key that has the specified name.
[[record-is-tombstone]]
RecordIsTombstone:: Matches Kafka _tombstone_ records.
The predicate evaluates to `true` for any record that has a `null` value.
Use this predicate in combination with a filter SMT to remove tombstone records.
This predicate has no configuration parameters.
+
A tombstone in Kafka is a record that has a key with a 0-byte, `null` payload.
When a {prodname} connector processes a delete operation in the source database, the connector emits two change events for the delete operation:
* A delete operation (`"op" : "d"`) event that provides the previous value of the database record.
* A tombstone event that has the same key, but a `null` value.
+
The tombstone represents a delete marker for the row.
When link:{link-kafka-docs}#compaction[log compaction] is enabled for Kafka, during compaction Kafka removes all events that share the same key as the tombstone.
Log compaction occurs periodically, with the compaction interval controlled by the link:{link-kafka-docs}#topicconfigs_delete.retention.ms[`delete.retention.ms`] setting for the topic.
+
Although it is possible to xref:ignoring-tombstone-events[configure {prodname} so that it does not emit tombstone events], it's best to permit {prodname} to emit tombstones to maintain the expected behavior during log compaction.
Suppressing tombstones prevents Kafka from removing records for a deleted key during log compaction.
If your environment includes sink connectors that cannot process tombstones, you can configure the sink connector to use an SMT with the `RecordIsTombstone` predicate to filter out the tombstone records.
TopicNameMatches:: A regular expression that specifies the name of a topic that you want Kafka Connect to match.
The predicate is true for connector records in which the topic name matches the specified regular expression.
Use this predicate to apply an SMT to records based on the name of the source table.
.Additional resources
* link:https://cwiki.apache.org/confluence/display/KAFKA/KIP-585%3A+Filter+and+Conditional+SMTs[KIP-585: Filter and Conditional SMTs]
* link:{link-kafka-docs}/#connect_predicates[Apache Kafka documentation for Kafka Connect predicates]
// Type: procedure
[id="defining-smt-predicates"]
== Defining SMT predicates
ifdef::product[]
By default, Kafka Connect applies each single message transformation in the {prodname} connector configuration to every change event record that it receives from {prodname}.
Beginning with Apache Kafka 2.6, you can define an SMT predicate for a transformation in the connector configuration that controls how Kafka Connect applies the transformation.
The predicate statement defines the conditions under which Kafka Connect applies the transformation to event records emitted by {prodname}.
Kafka Connect evaluates the predicate statement and then applies the SMT selectively to the subset of records that match the condition that is defined in the predicate.
endif::product[]
Configuring Kafka Connect predicates is similar to configuring transforms.
You specify a predicate alias, associate the alias with a transform, and then define the type and configuration for the predicate.
.Prerequisites
* The {prodname} environment runs Apache Kafka 2.6 or greater.
* An SMT is configured for the {prodname} connector.
.Procedure
. In the {prodname} connector configuration, specify a predicate alias for the `predicates` parameter, for example, `IsOutboxTable`.
. Associate the predicate alias with the transform that you want to apply conditionally, by appending the predicate alias to the transform alias in the connector configuration:
+
[options="nowrap" subs="+quotes"]
----
transforms._<TRANSFORM_ALIAS>_.predicate=_<PREDICATE_ALIAS>_
----
+
For example:
+
[source,options="nowrap" subs="+quotes"]
----
transforms.outbox.predicate=IsOutboxTable
----
. Configure the predicate by specifying its type and providing values for configuration parameters.
.. For the type, specify one of the following default types that are available in Kafka Connect:
+
* HasHeaderKey
* RecordIsTombstone
* TopicNameMatches
+
For example:
+
[source,options="nowrap" subs="+quotes"]
----
predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
----
.. For the TopicNameMatch or `HasHeaderKey` predicates, specify a regular expression for the topic or header name that you want to match.
+
For example:
+
[source]
----
predicates.IsOutboxTable.pattern=outbox.event.*
----
. If you want to negate a condition, append the `negate` keyword to the transform alias and set it to `true`.
+
For example:
+
[source,options="nowrap" subs="+quotes"]
----
transforms.outbox.negate=true
----
+
The preceding property inverts the set of records that the predicate matches, so that Kafka Connect applies the transform to any record that does not match the condition specified in the predicate.
.Example: TopicNameMatch predicate for the outbox event router transformation
The following example shows a {prodname} connector configuration that applies the outbox event router transformation only to messages that {prodname} emits to the Kafka `outbox.event.order` topic.
Because the `TopicNameMatch` predicate evaluates to _true_ only for messages from the outbox table (`outbox.event.*`), the transformation is not applied to messages that originate from other tables in the database.
[source]
----
transforms=outbox
transforms.outbox.predicate=IsOutboxTable
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
predicates=IsOutboxTable
predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsOutboxTable.pattern=outbox.event.*
----
// Type: procedure
[id="ignoring-tombstone-events"]
== Ignoring tombstone events
You can control whether {prodname} emits tombstone events, and how long Kafka retains them.
Depending on your data pipeline, you might want to set the `tombstones.on.delete` property for a connector so that {prodname} does not emit tombstone events.
Whether you enable {prodname} to emit tombstones depends on how topics are consumed in your environment and by the characteristics of the sink consumer.
Some sink connectors rely on tombstone events to remove records from downstream data stores.
In cases where sink connectors rely on tombstone records to indicate when to delete records in downstream data stores, configure {prodname} to emit them.
When you configure {prodname} to generate tombstones, further configuration is required to ensure that sink connectors receive the tombstone events.
The retention policy for a topic must be set so that the connector has time to read event messages before Kafka removes them during log compaction.
The length of time that a topic retains tombstones before compaction is controlled by the link:{link-kafka-docs}#topicconfigs_delete.retention.ms[`delete.retention.ms`] property for the topic.
By default, the `tombstones.on.delete` property for a connector is set to `true` so that the connector generates a tombstone after each delete event.
If you set the property to `false` to prevent {prodname} from saving tombstone records to Kafka topics, the absence of tombstone records might lead to unintended consequences.
Kafka relies on tombstone during log compaction to remove records that are related to a deleted key.
If you need to support sink connectors or downstream Kafka consumers that cannot process records with null values, rather than preventing {prodname} from emitting tombstones,
consider configuring an SMT for the connector with a predicate that uses the xref:record-is-tombstone[`RecordIsTombstone`] predicate type to remove tombstone messages before consumers read them.
.Procedure
* To prevent {prodname} from emitting tombstone events for deleted database records, set the connector option `tombstones.on.delete` to `false`.
+
For example:
+
[source]
----
“tombstones.on.delete”: “false”
----