tet123/documentation/modules/ROOT/pages/transformations/outbox-event-router.adoc
2021-08-26 09:42:49 +02:00

:page-aliases: configuration/outbox-event-router.adoc
// Category: debezium-using
// Type: assembly
// ModuleID: configuring-debezium-connectors-to-use-the-outbox-pattern
// Title: Configuring {prodname} connectors to use the outbox pattern
[id="outbox-event-router"]
= Outbox Event Router
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]
The outbox pattern is a way to safely and reliably exchange data between multiple (micro)services. An outbox pattern implementation avoids inconsistencies between a service's internal state (as typically persisted in its database) and the state in events consumed by services that need the same data.
To implement the outbox pattern in a {prodname} application, configure a {prodname} connector to:
* Capture changes in an outbox table
* Apply the {prodname} outbox event router single message transformation (SMT)
A {prodname} connector that is configured to apply the outbox SMT should capture changes that occur in an outbox table only.
For more information, see {link-prefix}:{link-outbox-event-router}#options-for-applying-the-transformation-selectively[Options for applying the transformation selectively].
A connector can capture changes in more than one outbox table only if each outbox table has the same structure.
See link:https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/[Reliable Microservices Data Exchange With the Outbox Pattern] to learn about why the outbox pattern is useful and how it works.
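
As a sketch of the two configuration steps listed above, the following connector properties restrict capture to a single outbox table and apply the SMT. The PostgreSQL connector class and the `inventory.outboxevent` table name are assumptions that mirror the example message later in this document; connection settings are omitted.

[source]
----
# Sketch only: connection properties (hostname, credentials, and so on) are omitted
connector.class=io.debezium.connector.postgresql.PostgresConnector
# Capture changes from the outbox table only (assumed schema and table name)
table.include.list=inventory.outboxevent
# Apply the outbox event router SMT
transforms=outbox
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
----
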
ifdef::community[]
For an example that you can run, see the link:https://github.com/debezium/debezium-examples/tree/master/outbox[outbox pattern demo], which is in the {prodname} examples repository. It includes an example of how to configure a {prodname} connector to run the outbox event router SMT.
endif::community[]
[NOTE]
====
The outbox event router SMT does *not* support the MongoDB connector.
====
ifdef::product[]
The following topics provide details:
* xref:example-of-a-debezium-outbox-message[]
* xref:outbox-table-structure-expected-by-debezium-outbox-event-router-smt[]
* xref:basic-debezium-outbox-event-router-smt-configuration[]
* xref:using-avro-as-the-payload-format-in-debezium-outbox-messages[]
* xref:emitting-additional-fields-in-debezium-outbox-messages[]
* xref:options-for-configuring-outbox-event-router-transformation[]
endif::product[]
// Type: concept
// ModuleID: example-of-a-debezium-outbox-message
// Title: Example of a {prodname} outbox message
[[example-outbox-message]]
== Example outbox message
To understand how to configure the {prodname} outbox event router SMT, consider the following example of a {prodname} outbox message:
[source,javascript,indent=0]
----
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
"{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}
----
A {prodname} connector that is configured to apply the outbox event router SMT generates the above message by transforming a {prodname} raw message like this:
[source,javascript,indent=0,subs="attributes"]
----
# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f"
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
  "before": null,
  "after": {
    "id": "406c07f3-26f0-4eea-a50c-109940064b8f",
    "aggregateid": "1",
    "aggregatetype": "Order",
    "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}",
    "timestamp": 1556890294344,
    "type": "OrderCreated"
  },
  "source": {
    "version": "{debezium-version}",
    "connector": "postgresql",
    "name": "dbserver1-bare",
    "db": "orderdb",
    "ts_usec": 1556890294448870,
    "txId": 584,
    "lsn": 24064704,
    "schema": "inventory",
    "table": "outboxevent",
    "snapshot": false,
    "last_snapshot_record": null,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1556890294484
}
----
This example of a {prodname} outbox message is based on the {link-prefix}:{link-outbox-event-router}#outbox-event-router-configuration-options[default outbox event router configuration], which assumes an outbox table structure and event routing based on aggregates. To customize behavior, the outbox event router SMT provides numerous {link-prefix}:{link-outbox-event-router}#outbox-event-router-configuration-options[configuration options].
// Type: concept
// Title: Outbox table structure expected by {prodname} outbox event router SMT
// ModuleID: outbox-table-structure-expected-by-debezium-outbox-event-router-smt
[[basic-outbox-table]]
== Basic outbox table
The default outbox event router SMT configuration assumes that your outbox table has the following columns:
[source]
----
Column        | Type                   | Modifiers
--------------+------------------------+-----------
id            | uuid                   | not null
aggregatetype | character varying(255) | not null
aggregateid   | character varying(255) | not null
type          | character varying(255) | not null
payload       | jsonb                  |
----
.Descriptions of expected outbox table columns
[cols="30%a,70%a",options="header"]
|===
|Column
|Effect
|`id`
|Contains the unique ID of the event. In an outbox message, this value is a header. You can use this ID, for example, to remove duplicate messages. +
+
To obtain the unique ID of the event from a different outbox table column, set the {link-prefix}:{link-outbox-event-router}#outbox-event-router-property-table-field-event-id[`table.field.event.id` SMT option] in the connector configuration.
|[[route-by-field-example]]`aggregatetype`
|Contains a value that the SMT appends to the name of the topic to which the connector emits an outbox message. The default behavior is that this value replaces the default `pass:[${routedByValue}]` variable in the {link-prefix}:{link-outbox-event-router}#outbox-event-router-property-route-topic-replacement[`route.topic.replacement`] SMT option. +
+
For example, in a default configuration, the {link-prefix}:{link-outbox-event-router}#outbox-event-router-property-route-by-field[`route.by.field`] SMT option is set to `aggregatetype` and the xref:outbox-event-router-property-route-topic-replacement[`route.topic.replacement`] SMT option is set to `outbox.event.pass:[${routedByValue}]`.
Suppose that your application adds two records to the outbox table. In the first record, the value in the `aggregatetype` column is `customers`.
In the second record, the value in the `aggregatetype` column is `orders`.
The connector emits the first record to the `outbox.event.customers` topic.
The connector emits the second record to the `outbox.event.orders` topic. +
+
To obtain this value from a different outbox table column, set the {link-prefix}:{link-outbox-event-router}#outbox-event-router-property-route-by-field[`route.by.field` SMT option] in the connector configuration.
|`aggregateid`
|Contains the event key, which provides an ID for the payload.
The SMT uses this value as the key in the emitted outbox message.
Because Kafka assigns messages that have the same key to the same partition, this preserves the order of the events that belong to one aggregate. +
+
To obtain the event key from a different outbox table column, set the {link-prefix}:{link-outbox-event-router}#outbox-event-router-property-table-field-event-key[`table.field.event.key` SMT option] in the connector configuration.
|`payload`
|A representation of the outbox change event.
The default structure is JSON.
By default, the Kafka message value consists solely of the `payload` value.
However, if the outbox event is configured to include additional fields, the Kafka message value contains an envelope that encapsulates both the payload and the additional fields, with each field represented separately.
For more information, see xref:emitting-messages-with-additional-fields[Emitting messages with additional fields]. +
+
To obtain the event payload from a different outbox table column, set the {link-prefix}:{link-outbox-event-router}#outbox-event-router-property-table-field-event-payload[`table.field.event.payload` SMT option] in the connector configuration.
|Additional custom columns
|Any additional columns from the outbox table can be {link-prefix}:{link-outbox-event-router}#emitting-messages-with-additional-fields[added to outbox events] either within the payload section or as a message header. +
+
For example, an `eventType` column could convey a user-defined value that helps to categorize or organize events.
|===
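
If your outbox table uses different column names, the options mentioned in the table above can map them. The following is a minimal sketch; the column names `event_id`, `order_id`, `body`, and `event_category` are hypothetical and stand in for the columns of your own table.

[source]
----
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
# Hypothetical column names; replace them with the columns in your outbox table
transforms.outbox.table.field.event.id=event_id
transforms.outbox.table.field.event.key=order_id
transforms.outbox.table.field.event.payload=body
transforms.outbox.route.by.field=event_category
----

With these settings, and the default `route.topic.replacement`, a record whose `event_category` value is `orders` would be emitted to the `outbox.event.orders` topic.
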
// Type: concept
// Title: Basic {prodname} outbox event router SMT configuration
// ModuleID: basic-debezium-outbox-event-router-smt-configuration
[[basic-outbox-configuration]]
== Basic configuration
To configure a {prodname} connector to support the outbox pattern, configure the `outbox.EventRouter` SMT. For example, the basic configuration in a `.properties` file looks like this:
[source]
----
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
----
// Type: concept
// Title: Options for applying the Outbox event router transformation selectively
// ModuleID: options-for-applying-the-outbox-event-router-transformation-selectively
[id="options-for-applying-the-transformation-selectively"]
== Options for applying the transformation selectively
In addition to the change event messages that a {prodname} connector emits when a database change occurs, the connector also emits other types of messages, including heartbeat messages, and metadata messages about schema changes and transactions.
Because the structure of these other messages differs from the structure of the change event messages that the SMT is designed to process, it's best to configure the connector to selectively apply the SMT, so that it processes only the intended data change messages.
You can use one of the following methods to configure the connector to apply the SMT selectively:
* {link-prefix}:{link-smt-predicates}#applying-transformation-selectively[Configure an SMT predicate for the transformation].
* Use the xref:outbox-event-router-property-route-topic-regex[`route.topic.regex`] configuration option for the SMT.
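
A minimal sketch of the predicate approach follows. It assumes Kafka Connect 2.6 or later, which provides the `TopicNameMatches` predicate, and an outbox table whose change events arrive on a topic name that ends with `.outboxevent`. The predicate name `isOutboxTable` and the pattern are illustrative.

[source]
----
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
# Apply the SMT only to records for which the predicate is true
transforms.outbox.predicate=isOutboxTable
predicates=isOutboxTable
predicates.isOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
# Assumed topic naming; [.] is used instead of \. because a backslash is an escape character in .properties files
predicates.isOutboxTable.pattern=.*[.]outboxevent
----

Heartbeat, schema change, and transaction metadata messages use other topic names, so under these assumptions they bypass the SMT.
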
ifdef::community[]
// Type: concept
// Title: Using Avro as the payload format in {prodname} outbox messages
// ModuleID: using-avro-as-the-payload-format-in-debezium-outbox-messages
[[avro-as-payload-format]]
== Using Avro as the payload format
The outbox event router SMT supports arbitrary payload formats. The `payload` column value in an outbox table is passed on transparently. An alternative to working with JSON is to use Avro.
This can be beneficial for message format governance and for ensuring that outbox event schemas evolve in a backwards-compatible way.
How a source application produces Avro formatted content for outbox message payloads is out of the scope of this documentation.
One possibility is to leverage the `KafkaAvroSerializer` class to serialize `GenericRecord` instances.
To ensure that the Kafka message value is the exact Avro binary data,
apply the following configuration to the connector:
[source]
----
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.ByteBufferConverter
----
By default, the `payload` column value (the Avro data) is the only message value.
Configuration of `ByteBufferConverter` as the value converter propagates the `payload` column value as-is into the Kafka message value.
The {prodname} connectors may be configured to emit heartbeat, transaction metadata, or schema change events (support varies by connector).
The `ByteBufferConverter` cannot serialize these events, so you must provide additional configuration that tells the converter how to serialize them.
As an example, the following configuration illustrates using the Apache Kafka `JsonConverter` with no schemas:
[source]
----
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.ByteBufferConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false
----
The delegate `Converter` implementation is specified by the `delegate.converter.type` option.
If the delegate converter needs extra configuration options, you can specify them as well, as the preceding example does with `schemas.enable=false` to disable schemas.
endif::community[]
// Type: concept
// Title: Emitting additional fields in {prodname} outbox messages
// ModuleID: emitting-additional-fields-in-debezium-outbox-messages
[[emitting-messages-with-additional-fields]]
== Emitting messages with additional fields
Your outbox table might contain columns whose values you want to add to the emitted outbox messages. For example, consider an outbox table that has a value of `purchase-order` in the `aggregatetype` column and another column, `eventType`, whose possible values are `order-created` and `order-shipped`.
To emit the `eventType` column value in the outbox message header, configure the SMT like this:
[source]
----
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:header:type
----
In this setting, `eventType` is the column name, `header` is the placement, and `type` is an optional alias, so the value is emitted in a header named `type`.
To emit the `eventType` column value in the outbox message envelope, configure the SMT like this:
[source]
----
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:envelope:type
----
// Type: reference
// ModuleID: options-for-configuring-outbox-event-router-transformation
// Title: Options for configuring outbox event router transformation
[[outbox-event-router-configuration-options]]
== Configuration options
The following table describes the options that you can specify for the outbox event router SMT. In the table, the *Group* column indicates a configuration option classification for Kafka.
.Descriptions of outbox event router SMT configuration options
[cols="30%a,20%a,10%a,40%a",options="header"]
|===
|Option
|Default
|Group
|Description
|[[outbox-event-router-property-table-field-event-id]]<<outbox-event-router-property-table-field-event-id, `table.field.event.id`>>
|`id`
|Table
|Specifies the outbox table column that contains the unique event ID.
|[[outbox-event-router-property-table-field-event-key]]<<outbox-event-router-property-table-field-event-key, `table.field.event.key`>>
|`aggregateid`
|Table
|Specifies the outbox table column that contains the event key. When this column contains a value, the SMT uses that value as the key in the emitted outbox message. This is important for maintaining correct order in Kafka partitions.
|[[outbox-event-router-property-table-field-event-timestamp]]<<outbox-event-router-property-table-field-event-timestamp, `table.field.event.timestamp`>>
|
|Table
|By default, the timestamp in the emitted outbox message is the {prodname} event timestamp. To use a different timestamp in outbox messages, set this option to an outbox table column that contains the timestamp that you want to be in emitted outbox messages.
|[[outbox-event-router-property-table-field-event-payload]]<<outbox-event-router-property-table-field-event-payload, `table.field.event.payload`>>
|`payload`
|Table
|Specifies the outbox table column that contains the event payload.
|[[outbox-event-router-property-table-field-event-payload-id]]<<outbox-event-router-property-table-field-event-payload-id, `table.field.event.payload.id`>>
|`aggregateid`
|Table
|Specifies the outbox table column that contains the payload ID.
|[[outbox-event-router-property-table-fields-additional-placement]]<<outbox-event-router-property-table-fields-additional-placement, `table.fields.additional.placement`>>
|
|Table, Envelope
a|Specifies one or more outbox table columns that you want to add to outbox message headers or envelopes. Specify a comma-separated list of pairs. In each pair, specify the name of a column and whether you want the value to be in the header or the envelope. Separate the values in the pair with a colon, for example:

`id:header,my-field:envelope`

To specify an alias for the column, specify a trio with the alias as the third value, for example:

`id:header,my-field:envelope:my-alias`

The second value is the placement and it must always be `header` or `envelope`.

Configuration examples are in {link-prefix}:{link-outbox-event-router}#emitting-messages-with-additional-fields[emitting additional fields in {prodname} outbox messages].
|[[outbox-event-router-property-table-field-event-schema-version]]<<outbox-event-router-property-table-field-event-schema-version, `table.field.event.schema.version`>>
|
|Table, Schema
|When set, this value is used as the schema version as described in the link:https://kafka.apache.org/20/javadoc/org/apache/kafka/connect/data/ConnectSchema.html#version--[Kafka Connect Schema] Javadoc.
|[[outbox-event-router-property-route-by-field]]<<outbox-event-router-property-route-by-field, `route.by.field`>>
|`aggregatetype`
|Router
|Specifies the name of a column in the outbox table. The default behavior is that the value in this column becomes a part of the name of the topic to which the connector emits the outbox messages. An example is in the {link-prefix}:{link-outbox-event-router}#route-by-field-example[description of the expected outbox table].
|[[outbox-event-router-property-route-topic-regex]]<<outbox-event-router-property-route-topic-regex, `route.topic.regex`>>
|`(?<routedByValue>.*)`
|Router
|Specifies a regular expression that the outbox SMT applies in the RegexRouter to outbox table records. This regular expression is part of the setting of the xref:outbox-event-router-property-route-topic-replacement[`route.topic.replacement`] SMT option. +
+
The default behavior is that the SMT replaces the default `pass:[${routedByValue}]` variable in the setting of the `route.topic.replacement` SMT option with the value of the column that the xref:outbox-event-router-property-route-by-field[`route.by.field`] outbox SMT option specifies.
|[[outbox-event-router-property-route-topic-replacement]]<<outbox-event-router-property-route-topic-replacement, `route.topic.replacement`>>
|`outbox.event{zwsp}.pass:[${routedByValue}]`
|Router
a|Specifies the name of the topic to which the connector emits outbox messages.
The default topic name is `outbox.event.` followed by the `aggregatetype` column value in the outbox table record. For example, if the `aggregatetype` value is `customers`, the topic name is `outbox.event.customers`. +
+
To change the topic name, you can: +
* Set the xref:outbox-event-router-property-route-by-field[`route.by.field`] option to a different column.
* Set the xref:outbox-event-router-property-route-topic-regex[`route.topic.regex`] option to a different regular expression.
|[[outbox-event-router-property-route-tombstone-on-empty-payload]]<<outbox-event-router-property-route-tombstone-on-empty-payload, `route.tombstone.on.empty.payload`>>
|`false`
|Router
|Indicates whether an empty or `null` payload causes the connector to emit a tombstone event.
|[[outbox-event-router-property-debezium-op-invalid-behavior]]<<outbox-event-router-property-debezium-op-invalid-behavior, `debezium.op.invalid.behavior`>>
|`warn`
|{prodname}
a|Determines the behavior of the SMT when there is an `UPDATE` operation on the outbox table. Possible settings are:

* `warn` - The SMT logs a warning and continues to the next outbox table record.
* `error` - The SMT logs an error and continues to the next outbox table record.
* `fatal` - The SMT logs an error and the connector stops processing.

All changes in an outbox table are expected to be `INSERT` operations. That is, an outbox table functions as a queue; updates to records in an outbox table are not allowed.
The SMT automatically filters out `DELETE` operations on an outbox table.
ifdef::community[]
|[[outbox-event-router-property-tracing-span-context-field]]<<outbox-event-router-property-tracing-span-context-field, `tracing.span.context.field`>>
|`tracingspancontext`
|Tracing
|The name of the field containing tracing span context.
|[[outbox-event-router-property-tracing-operation-name]]<<outbox-event-router-property-tracing-operation-name, `tracing.operation.name`>>
|`debezium-read`
|Tracing
|The operation name representing the Debezium processing span.
|[[outbox-event-router-property-tracing-with-context-field-only]]<<outbox-event-router-property-tracing-with-context-field-only, `tracing.with.context.field.only`>>
|`false`
|Tracing
|When `true`, only events that have a serialized context field are traced.
endif::community[]
|===
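
The following sketch combines several of the options described in the table above. The values are illustrative. In particular, it assumes that the outbox table has a `timestamp` column, as in the raw message shown earlier in this document.

[source]
----
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
# Use the outbox table's own timestamp column (assumed to exist) for the message timestamp
transforms.outbox.table.field.event.timestamp=timestamp
# Route to <aggregatetype>.events instead of outbox.event.<aggregatetype>
transforms.outbox.route.topic.replacement=${routedByValue}.events
# Emit a tombstone event when the payload is empty or null
transforms.outbox.route.tombstone.on.empty.payload=true
# Stop the connector if an UPDATE on the outbox table is encountered
transforms.outbox.debezium.op.invalid.behavior=fatal
----

Options that are not set keep the defaults listed in the table, so this configuration still reads the event ID from `id` and the event key from `aggregateid`.
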
ifdef::community[]
== Distributed tracing
The outbox event router SMT supports distributed tracing.
See link:/documentation/reference/integrations/tracing[tracing documentation] for more details.
endif::community[]