= Outbox Event Router
include::../_attributes.adoc[]
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]
[NOTE]
====
This single message transformation (SMT) is under active development right now, so the emitted message structure or other details may still change as development progresses.
====
[NOTE]
====
This SMT does *not* support the MongoDB connector.
====
The Outbox Event Router SMT provides out-of-the-box support for the Outbox Pattern.
For more information on why and how the Outbox Pattern is used, please refer to our blog post link:/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/[Reliable Microservices Data Exchange With the Outbox Pattern].
For more implementation details please refer to the {jira-url}/browse/DBZ-1169[Outbox SMT Design].
A working example can also be found at our https://github.com/debezium/debezium-examples[Examples Repository] in the https://github.com/debezium/debezium-examples/tree/master/outbox[`outbox` directory].
== Example
To understand the configuration and terms used in this SMT, consider the following expected outbox message:
[source,javascript,indent=0]
----
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=406c07f3-26f0-4eea-a50c-109940064b8f"
# Kafka Message Timestamp: 1556890294484
{
"{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}
----
This message is generated by transforming a raw Debezium change event, which looks like this:
[source,javascript,indent=0]
----
# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f"
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
  "before": null,
  "after": {
    "id": "406c07f3-26f0-4eea-a50c-109940064b8f",
    "aggregateid": "1",
    "aggregatetype": "Order",
    "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}",
    "timestamp": 1556890294344,
    "type": "OrderCreated"
  },
  "source": {
    "version": "0.9.3.Final",
    "connector": "postgresql",
    "name": "dbserver1-bare",
    "db": "orderdb",
    "ts_usec": 1556890294448870,
    "txId": 584,
    "lsn": 24064704,
    "schema": "inventory",
    "table": "outboxevent",
    "snapshot": false,
    "last_snapshot_record": null,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1556890294484
}
----
This result was achieved with the {link-prefix}:{link-outbox-event-router}#outbox-event-router-configuration-options[default configuration], which assumes a table structure and event routing based on aggregates. If you need custom behavior, the SMT is fully configurable; see the {link-prefix}:{link-outbox-event-router}#outbox-event-router-configuration-options[available configuration options].
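
The mapping from the raw change event to the routed message can be sketched in a few lines of Python. This is only an illustration of the default behavior described on this page, not the SMT's actual implementation: the `route_outbox_event` function and the dictionary layout are hypothetical, the payload is shortened, and the lowercasing of the aggregate type is an assumption inferred from the example (`Order` appears as `order` in the topic name).

[source,python]
----
import json


def route_outbox_event(change_event):
    """Illustrative only: mimic the default routing described on this page."""
    row = change_event["after"]
    return {
        # Assumption: the routed value is lowercased, as the example topic suggests.
        "topic": "outbox.event." + row["aggregatetype"].lower(),
        # aggregateid becomes the Kafka message key.
        "key": row["aggregateid"],
        # The event id travels as a message header.
        "headers": {"id": row["id"]},
        # Without table.field.event.timestamp, the Debezium timestamp is used.
        "timestamp": change_event["ts_ms"],
        # The payload column is passed on as the message value.
        "value": row["payload"],
    }


# Shortened version of the raw change event shown above.
raw_event = {
    "before": None,
    "after": {
        "id": "406c07f3-26f0-4eea-a50c-109940064b8f",
        "aggregateid": "1",
        "aggregatetype": "Order",
        "payload": json.dumps({"id": 1, "customerId": 123}),
        "timestamp": 1556890294344,
        "type": "OrderCreated",
    },
    "op": "c",
    "ts_ms": 1556890294484,
}

routed = route_outbox_event(raw_event)
print(routed["topic"], routed["key"], routed["headers"])
----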
== Configuration
[[outbox-event-router-configuration-options]]
=== Configuration options
[cols="30%a,10%a,10%a,50%a",options="header"]
|=======================
|Property
|Default
|Group
|Description
|[[outbox-event-router-property-table-field-event-id]]<<outbox-event-router-property-table-field-event-id, `table.field.event.id`>>
|`id`
|Table
|The column which contains the event ID within the outbox table
|[[outbox-event-router-property-table-field-event-key]]<<outbox-event-router-property-table-field-event-key, `table.field.event.key`>>
|`aggregateid`
|Table
|The column which contains the event key within the outbox table; when set, the value of this column is used as the Kafka message key
|[[outbox-event-router-property-table-field-event-timestamp]]<<outbox-event-router-property-table-field-event-timestamp, `table.field.event.timestamp`>>
|
|Table
|Optionally overrides the Kafka message timestamp with the value of the chosen column; otherwise the timestamp at which Debezium processed the event is used.
|[[outbox-event-router-property-table-field-event-payload]]<<outbox-event-router-property-table-field-event-payload, `table.field.event.payload`>>
|`payload`
|Table
|The column which contains the event payload within the outbox table
|[[outbox-event-router-property-table-field-event-payload-id]]<<outbox-event-router-property-table-field-event-payload-id, `table.field.event.payload.id`>>
|`aggregateid`
|Table
|The column which contains the payload ID within the outbox table
|[[outbox-event-router-property-table-fields-additional-placement]]<<outbox-event-router-property-table-fields-additional-placement, `table.fields.additional.placement`>>
|
|Table, Envelope
|Extra fields can be added to the event envelope or as message headers. The format is a comma-separated list of colon-delimited pairs, or triples when an alias is desired, e.g. `id:header,field_name:envelope:alias`. Placement options are `header` and `envelope`.
|[[outbox-event-router-property-table-field-event-schema-version]]<<outbox-event-router-property-table-field-event-schema-version, `table.field.event.schema.version`>>
|
|Table, Schema
|When set, the value of this column is used as the schema version, as described in the https://kafka.apache.org/20/javadoc/org/apache/kafka/connect/data/ConnectSchema.html#version--[Kafka Connect Schema] Javadoc
|[[outbox-event-router-property-route-by-field]]<<outbox-event-router-property-route-by-field, `route.by.field`>>
|`aggregatetype`
|Router
|The column which determines how the events will be routed; its value becomes part of the topic name
|[[outbox-event-router-property-route-topic-regex]]<<outbox-event-router-property-route-topic-regex, `route.topic.regex`>>
|`(?<routedByValue>.*)`
|Router
|The regex to use within the RegexRouter; the default named capture group lets the value of the routing field be substituted into the new topic name defined in `route.topic.replacement`
|[[outbox-event-router-property-route-topic-replacement]]<<outbox-event-router-property-route-topic-replacement, `route.topic.replacement`>>
|`outbox.event{zwsp}.pass:[${routedByValue}]`
|Router
|The name of the topic to which the events will be routed; the replacement `pass:[${routedByValue}]` is available, which holds the value of the column configured via `route.by.field`
|[[outbox-event-router-property-route-tombstone-on-empty-payload]]<<outbox-event-router-property-route-tombstone-on-empty-payload, `route.tombstone.on.empty.payload`>>
|`false`
|Router
|Whether or not an empty or `null` payload should cause a tombstone event.
|[[outbox-event-router-property-debezium-op-invalid-behavior]]<<outbox-event-router-property-debezium-op-invalid-behavior, `debezium.op.invalid.behavior`>>
|`warn`
|Debezium
|Debezium does not expect to see 'update' row events while monitoring the outbox table. If one occurs, this transform can log it as a warning, log it as an error, or stop the process. Options are `warn`, `error` and `fatal`
|=======================
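
How `route.topic.regex` and `route.topic.replacement` work together can be sketched in Python. This is a rough illustration, not the SMT's code: the `route_topic` helper is hypothetical, the Java-style named group and `${...}` reference are translated to Python's `re` syntax because the two dialects differ, and returning `None` for a non-matching value is a choice made for this sketch only.

[source,python]
----
import re


def route_topic(routed_by_value,
                regex=r"(?<routedByValue>.*)",
                replacement="outbox.event.${routedByValue}"):
    """Hypothetical helper: compute a topic name from the routing field value."""
    # Java writes named groups as (?<name>...), Python as (?P<name>...).
    py_regex = re.sub(r"\(\?<([A-Za-z][A-Za-z0-9]*)>", r"(?P<\1>", regex)
    # Java references a group as ${name}, Python as \g<name>.
    py_replacement = re.sub(r"\$\{([A-Za-z][A-Za-z0-9]*)\}", r"\\g<\1>", replacement)
    match = re.fullmatch(py_regex, routed_by_value)
    if match is None:
        return None  # sketch-only choice for values the regex does not match
    return match.expand(py_replacement)


default_topic = route_topic("order")
custom_topic = route_topic("order", replacement="${routedByValue}.events")
print(default_topic, custom_topic)
----

With the defaults, the whole value of the routing field is captured and appended to `outbox.event.`; changing only the replacement is enough to move the value to a different position in the topic name.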
=== Default table columns
[source]
----
Column        |          Type          | Modifiers
--------------+------------------------+-----------
id            | uuid                   | not null
aggregatetype | character varying(255) | not null
aggregateid   | character varying(255) | not null
type          | character varying(255) | not null
payload       | jsonb                  |
----
=== Default configuration values explained
After observing all those pieces we can see what the default configuration does:
[cols="30%a,70%a",options="header"]
|=======================
|Table Column
|Effect
|`id`
|The `id` shows up as a header in the Kafka message. It is the unique ID of the event and can be used, for instance, for consumer-side deduplication.
|`aggregatetype`
|The default field for routing; its value gets appended to the topic name (see the <<outbox-event-router-property-route-topic-replacement, `route.topic.replacement`>> configuration option)
|`aggregateid`
|Becomes the Kafka message key, which is important for keeping ordering within Kafka partitions
|`payload`
|The JSON representation of the event itself. It either becomes part of the message as `payload` or, if other metadata including `eventType` is delivered as headers, it becomes the message value itself without being wrapped in an envelope
|=======================
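
Because the event `id` is delivered as a message header, a consumer can use it to skip events it has already handled. The following is a minimal sketch of that idea, assuming the headers arrive as a plain dictionary; the `DeduplicatingHandler` class is hypothetical, and the in-memory set stands in for whatever durable store a real consumer would use.

[source,python]
----
class DeduplicatingHandler:
    """Hypothetical consumer-side wrapper that handles each event id once."""

    def __init__(self, handler):
        self._handler = handler
        # In-memory only; a real consumer would persist the seen ids.
        self._seen_ids = set()

    def on_message(self, headers, value):
        event_id = headers["id"]
        if event_id in self._seen_ids:
            return False  # duplicate delivery, skipped
        self._handler(value)
        self._seen_ids.add(event_id)
        return True


handled = []
consumer = DeduplicatingHandler(handled.append)
event_id = "406c07f3-26f0-4eea-a50c-109940064b8f"
first = consumer.on_message({"id": event_id}, '{"id": 1}')
second = consumer.on_message({"id": event_id}, '{"id": 1}')  # redelivery
print(len(handled))  # prints 1
----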
=== Basic configuration
[source]
----
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
----
=== Using Avro as the payload format
The outbox routing SMT supports arbitrary payload formats, as the payload column value is passed on transparently.
As an alternative to working with JSON as shown above it's therefore also possible to use Avro.
This can be beneficial for the purposes of message format governance and making sure outbox event schemas evolve in a backwards-compatible way.
How a source application produces Avro messages as an outbox event payload is out of the scope of this documentation.
One possibility could be to leverage the `KafkaAvroSerializer` class and use it to serialize `GenericRecord` instances.
In order to ensure that the Kafka message value is the exact Avro binary data,
apply the following configuration to the connector:
[source]
----
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.ByteBufferConverter
----
By default, the `payload` column value (the Avro data) is the sole message value.
Using `ByteBufferConverter` as the value converter propagates that value as-is into the Kafka message value.
[[emitting-messages-with-additional-fields]]
=== Emitting messages with additional fields
The outbox table may contain additional columns that need to be included in the emitted Kafka message.
Suppose the table contains a `type` column that indicates the type of a given event (e.g. for events of the "purchase order" aggregate type, there might be event types like "order created", "order shipped", etc.).
To emit the `type` column as an `eventType` header in the Kafka message, use:
[source]
----
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=type:header:eventType
----
To emit the `type` column as an `eventType` field in the Kafka message envelope, use:
[source]
----
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=type:envelope:eventType
----
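
The format of `table.fields.additional.placement` can be illustrated with a small parser. This is a sketch written for this page, not the SMT's parsing code: the `parse_additional_placement` function is hypothetical, and falling back to the column name when no alias is given is an assumption.

[source,python]
----
def parse_additional_placement(config_value):
    """Hypothetical parser returning (column, placement, alias) tuples."""
    placements = []
    for entry in config_value.split(","):
        parts = [part.strip() for part in entry.split(":")]
        if len(parts) not in (2, 3):
            raise ValueError("expected column:placement[:alias], got " + repr(entry))
        column, placement = parts[0], parts[1]
        if placement not in ("header", "envelope"):
            raise ValueError("unknown placement " + repr(placement))
        # Assumption: without an alias, the column name is used as-is.
        alias = parts[2] if len(parts) == 3 else column
        placements.append((column, placement, alias))
    return placements


parsed = parse_additional_placement("id:header,field_name:envelope:alias")
print(parsed)
# prints [('id', 'header', 'id'), ('field_name', 'envelope', 'alias')]
----

Read this way, `type:header:eventType` selects the `type` column, places it in the headers, and names it `eventType`.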