// Category: debezium-using
// Type: assembly
// ModuleID: emitting-debezium-change-event-records-in-cloudevents-format
// Title: Emitting {prodname} change event records in CloudEvents format
[id="exporting-cloud-events"]
= Exporting CloudEvents

:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js

toc::[]

link:https://cloudevents.io/[CloudEvents] is a specification for describing event data in a common way.
Its aim is to provide interoperability across services, platforms, and systems.
{prodname} enables you to configure a Db2,
ifdef::community[]
Informix,
endif::community[]
MongoDB, MySQL, Oracle, PostgreSQL, or SQL Server connector to emit change event records that conform to the CloudEvents specification.

ifdef::community[]
[NOTE]
====
Support for CloudEvents is in an incubating state.
This means that exact semantics, configuration options, and other details may change in future revisions based on feedback.
Please let us know your specific requirements, or if you encounter any problems while using this feature.
====
endif::community[]

ifdef::product[]
[IMPORTANT]
====
Emitting change event records in CloudEvents format is a Technology Preview feature.
Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments.
This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process.
For more information about support scope, see link:https://access.redhat.com/support/offerings/techpreview/[Technology Preview Features Support Scope].
====
endif::product[]

The CloudEvents specification defines:

* A set of standardized event attributes
* Rules for defining custom attributes
* Encoding rules for mapping event formats to serialized representations such as JSON or Apache Avro
* Protocol bindings for transport layers such as Apache Kafka, HTTP, or AMQP

To configure a {prodname} connector to emit change event records that conform to the CloudEvents specification, {prodname} provides the `io.debezium.converters.CloudEventsConverter`, which is a Kafka Connect message converter.

Currently, only structured mapping mode can be used.
The CloudEvents change event envelope can be JSON or Avro, and you can use JSON or Avro as the `data` format for each envelope type.
ifdef::community[]
It is expected that a future {prodname} release will support binary mapping mode.
endif::community[]

ifdef::product[]
Information about emitting change events in CloudEvents format is organized as follows:

* xref:example-debezium-change-event-records-in-cloudevents-format[]
* xref:example-of-configuring-debezium-cloudevents-converter[]
* xref:debezium-cloudevents-converter-configuration-options[]
endif::product[]

For information about using Avro, see:

* {link-prefix}:{link-avro-serialization}#avro-serialization[Avro serialization]
* link:https://github.com/Apicurio/apicurio-registry[Apicurio Registry]

// Type: concept
// ModuleID: example-debezium-change-event-records-in-cloudevents-format
// Title: Example {prodname} change event records in CloudEvents format
== Example event format

The following example shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like.
In this example, the PostgreSQL connector is configured to use JSON as the CloudEvents format envelope and also as the `data` format.

[source,json,indent=0,subs="+attributes"]
----
{
  "id" : "name:test_server;lsn:29274832;txId:565", <1>
  "source" : "/debezium/postgresql/test_server", <2>
  "specversion" : "1.0", <3>
  "type" : "io.debezium.postgresql.datachangeevent", <4>
  "time" : "2020-01-13T13:55:39.738Z", <5>
  "datacontenttype" : "application/json", <6>
  "iodebeziumop" : "r", <7>
  "iodebeziumversion" : "{debezium-version}", <8>
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578923739738",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumlsn" : "29274832",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "565", <9>
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : { <10>
    "before" : null,
    "after" : {
      "pk" : 1,
      "name" : "Bob"
    }
  }
}
----

.Descriptions of fields in a CloudEvents change event record
[cols="1,7",options="header",subs="+attributes"]
|===
|Item |Description

|1
|Unique ID that the connector generates for the change event based on the change event's content.

|2
|The source of the event, which is the logical name of the database as specified by the `topic.prefix` property in the connector's configuration.

|3
|The CloudEvents specification version.

|4
a|Connector type that generated the change event.
The format of this field is `io.debezium._CONNECTOR_TYPE_.datachangeevent`.
Valid values for `_CONNECTOR_TYPE_` are `db2`,
ifdef::community[]
`informix`,
endif::community[]
`mongodb`, `mysql`, `oracle`, `postgresql`, or `sqlserver`.

|5
|Time of the change in the source database.

|6
|Describes the content type of the `data` attribute.
Possible values are `application/json`, as in this example, or `application/avro`.

|7
|An operation identifier.
Possible values are `r` for read, `c` for create, `u` for update, or `d` for delete.

|8
|All `source` attributes that are known from {prodname} change events are mapped to CloudEvents extension attributes by using the `iodebezium` prefix for the attribute name.

|9
|When enabled in the connector, each `transaction` attribute that is known from {prodname} change events is mapped to a CloudEvents extension attribute by using the `iodebeziumtx` prefix for the attribute name.

|10
|The actual data change.
Depending on the operation and the connector, the data might contain `before`, `after`, or `patch` fields.
|===

The following example also shows what a CloudEvents change event record emitted by a PostgreSQL connector looks like.
In this example, the PostgreSQL connector is again configured to use JSON as the CloudEvents format envelope, but this time the connector is configured to use Avro for the `data` format.
[source,json,indent=0,subs="+attributes"]
----
{
  "id" : "name:test_server;lsn:33227720;txId:578",
  "source" : "/debezium/postgresql/test_server",
  "specversion" : "1.0",
  "type" : "io.debezium.postgresql.datachangeevent",
  "time" : "2020-01-13T14:04:18.597Z",
  "datacontenttype" : "application/avro", <1>
  "dataschema" : "http://my-registry/schemas/ids/1", <2>
  "iodebeziumop" : "r",
  "iodebeziumversion" : "{debezium-version}",
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578924258597",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumtxId" : "578",
  "iodebeziumlsn" : "33227720",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "578",
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : "AAAAAAEAAgICAg==" <3>
}
----

.Descriptions of fields in a CloudEvents event record for a connector that uses Avro to format data
[cols="1,7",options="header",subs="+attributes"]
|===
|Item |Description

|1
|Indicates that the `data` attribute contains Avro binary data.

|2
|URI of the schema to which the Avro data adheres.

|3
|The `data` attribute contains base64-encoded Avro binary data.
|===

It is also possible to use Avro for the envelope as well as the `data` attribute, as shown in the sketch at the end of the next section.

// Type: concept
// ModuleID: example-of-configuring-debezium-cloudevents-converter
// Title: Example of configuring {prodname} CloudEvents converter
== Example configuration

Configure `io.debezium.converters.CloudEventsConverter` in your {prodname} connector configuration.
The following example shows how to configure the CloudEvents converter to emit change event records that have the following characteristics:

* Use JSON as the envelope.
* Use the schema registry at `\http://my-registry/schemas/ids/1` to serialize the `data` attribute as binary Avro data.

[source,json,indent=0,subs="+attributes"]
----
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json", <1>
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...
----
<1> Specifying the `serializer.type` is optional, because `json` is the default.

The CloudEvents converter converts Kafka record values.
In the same connector configuration, you can specify `key.converter` if you want to operate on record keys.
For example, you might specify `StringConverter`, `LongConverter`, `JsonConverter`, or `AvroConverter`.
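You can also serialize the envelope itself as Avro.
The following sketch shows one possible configuration that uses Avro for both the envelope and the `data` attribute; the registry URL is a placeholder for the address of your own schema registry.

[source,json,indent=0,subs="+attributes"]
----
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type": "avro", <1>
"value.converter.data.serializer.type": "avro",
"value.converter.avro.schema.registry.url": "http://my-registry" <2>
...
----
<1> Emits the CloudEvents envelope as Avro instead of the default JSON.
<2> Placeholder registry URL. The `avro.` prefix is removed before the remaining option name is passed to the underlying Avro converter.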
// Type: concept
// ModuleID: configuration-of-sources-of-metadata-and-some-cloudevents-fields
// Title: Configuration of sources of metadata and some CloudEvents fields
[[configuration-of-sources-of-metadata-and-some-cloudevents-fields]]
== Configuration of sources of metadata and some CloudEvents fields

By default, the `metadata.source` property consists of four parts, as seen in the following example:

[source,json,indent=0,subs="+attributes"]
----
"value,id:generate,type:generate,dataSchemaName:generate"
----

The first part specifies the source for retrieving a record's metadata; the permitted values are `value` and `header`.
The next parts specify how the converter populates values for the following metadata fields:

* `id`
* `type`
* `dataSchemaName` (the name under which the schema is registered in the Schema Registry)

The converter can use one of the following methods to populate each field:

`generate`:: The converter generates a value for the field.
`header`:: The converter obtains the value for the field from a message header.

=== Obtaining record metadata

To construct a CloudEvent, the converter requires source, operation, and transaction metadata.
Generally, the converter can retrieve the metadata from a record's value.
But in some cases, before the converter receives a record, the record might be processed in such a way that metadata is not present in its value, for example, after the record is processed by the Outbox Event Router SMT.
To preserve the required metadata, you can use the following approach to pass the metadata in the record headers.

.Procedure
1. Implement a mechanism for recording the metadata in the record's headers before the record reaches the converter, for example, by using the `HeaderFrom` SMT.
2. Set the value of the converter's `metadata.source` property to `header`.

The following example shows the configuration for a connector that uses the Outbox Event Router SMT, and the `HeaderFrom` SMT:

[source,json,indent=0,subs="+attributes"]
----
...
"tombstones.on.delete": false,
"transforms": "addMetadataHeaders,outbox",
"transforms.addMetadataHeaders.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.addMetadataHeaders.fields": "source,op,transaction",
"transforms.addMetadataHeaders.headers": "source,op,transaction",
"transforms.addMetadataHeaders.operation": "copy",
"transforms.addMetadataHeaders.predicate": "isHeartbeat",
"transforms.addMetadataHeaders.negate": true,
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.expand.json.payload": true,
"transforms.outbox.table.fields.additional.placement": "type:header",
"predicates": "isHeartbeat",
"predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHeartbeat.pattern": "__debezium-heartbeat.*",
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.metadata.source": "header",
"header.converter": "org.apache.kafka.connect.json.JsonConverter",
"header.converter.schemas.enable": true
...
----

NOTE: To use the `HeaderFrom` transformation, it might be necessary to filter tombstone and heartbeat messages.

The `header` value of the `metadata.source` property is a global setting.
As a result, even if you omit parts of the property's value, such as the `id` and `type` sources, the converter retrieves values for the omitted parts from headers.

=== Obtaining CloudEvent metadata

By default, the CloudEvents converter automatically generates values for the `id` and `type` fields of a CloudEvent, and generates the schema name for its `data` field.
You can customize the way that the converter populates these fields by changing the defaults and specifying the fields' values in the appropriate headers.
For example:

[source,json,indent=0,subs="+attributes"]
----
"value.converter.metadata.source": "value,id:header,type:header,dataSchemaName:header"
----

With the preceding configuration in effect, you could configure upstream functions to add `id` and `type` headers with the values that you want to pass to the CloudEvents converter.
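For example, one way to populate the `type` header is with the `InsertHeader` SMT that ships with Apache Kafka (3.0 and later).
The following sketch sets a static `type` value while `id` continues to be generated; a static literal suits `type`, but `id` typically requires a unique value for each event.
The event type shown is a placeholder.

[source,json,indent=0,subs="+attributes"]
----
...
"transforms": "addTypeHeader",
"transforms.addTypeHeader.type": "org.apache.kafka.connect.transforms.InsertHeader",
"transforms.addTypeHeader.header": "type", <1>
"transforms.addTypeHeader.value.literal": "com.example.order.changed", <2>
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.metadata.source": "value,id:generate,type:header,dataSchemaName:generate"
...
----
<1> Name of the header from which the converter reads the `type` value.
<2> Placeholder event type; substitute a value that is meaningful in your environment.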
If you want to provide a value only for the `id` header, use:

[source,json,indent=0,subs="+attributes"]
----
"value.converter.metadata.source": "value,id:header,type:generate,dataSchemaName:generate"
----

To configure the converter to obtain `id`, `type`, and `dataSchemaName` metadata from headers, use the following short syntax:

[source,json,indent=0,subs="+attributes"]
----
"value.converter.metadata.source": "header"
----

To enable the converter to retrieve the data schema name from a header field, you must set xref:cloud-events-converter-schema-data-name-source-header-enable[`schema.data.name.source.header.enable`] to `true`.

// Type: reference
// ModuleID: debezium-cloudevents-converter-configuration-options
// Title: {prodname} CloudEvents converter configuration options
[[cloud-events-converter-configuration-options]]
== Configuration options

When you configure a {prodname} connector to use the CloudEvents converter, you can specify the following options.

.Descriptions of CloudEvents converter configuration options
[cols="30%a,25%a,45%a",subs="+attributes"]
|===
|Option |Default |Description

|[[cloud-events-converter-serializer-type]]xref:cloud-events-converter-serializer-type[`serializer.type`]
|`json`
|The encoding type to use for the CloudEvents envelope structure.
The value can be `json` or `avro`.

|[[cloud-events-converter-data-serializer-type]]xref:cloud-events-converter-data-serializer-type[`data.serializer.type`]
|`json`
|The encoding type to use for the `data` attribute.
The value can be `json` or `avro`.

|[[cloud-events-converter-json]]xref:cloud-events-converter-json[`json. \...`]
|N/A
|Any configuration options to be passed through to the underlying converter when using JSON.
The `json.` prefix is removed.

|[[cloud-events-converter-avro]]xref:cloud-events-converter-avro[`avro. \...`]
|N/A
|Any configuration options to be passed through to the underlying converter when using Avro.
The `avro.` prefix is removed.
For example, for Avro `data`, you would specify the `avro.schema.registry.url` option.

|[[cloud-events-converter-schema-name-adjustment-mode]]xref:cloud-events-converter-schema-name-adjustment-mode[`schema.name.adjustment.mode`]
|none
|Specifies how schema names should be adjusted for compatibility with the message converter used by the connector.
The value can be `none` or `avro`.

|[[cloud-events-converter-schema-cloudevents-name]]xref:cloud-events-converter-schema-cloudevents-name[`schema.cloudevents.name`]
|none
|Specifies the CloudEvents schema name under which the schema is registered in a Schema Registry.
The setting is ignored when `serializer.type` is `json`, in which case a record's value is schemaless.
If this property is not specified, the default algorithm is used to generate the schema name: `pass:[${serverName}.${databaseName}].CloudEvents.Envelope`.

|[[cloud-events-converter-schema-data-name-source-header-enable]]xref:cloud-events-converter-schema-data-name-source-header-enable[`schema.data.name.source.header.enable`]
|false
|Specifies whether the converter can retrieve the schema name of the CloudEvents `data` field from a header.
The schema name is obtained from the `dataSchemaName` parameter that is specified in the xref:cloud-events-converter-metadata-source[`metadata.source`] property.

|[[cloud-events-converter-extension-attributes-enable]]xref:cloud-events-converter-extension-attributes-enable[`extension.attributes.enable`]
|`true`
|Specifies whether the converter includes extension attributes when it generates a CloudEvent.
The value can be `true` or `false`.

|[[cloud-events-converter-metadata-source]]xref:cloud-events-converter-metadata-source[`metadata.source`]
|`value,id:generate,type:generate,dataSchemaName:generate`
|A comma-separated list that specifies the sources from which the converter retrieves a record's metadata (source, operation, and transaction values), the CloudEvent `id` and `type` fields, and the `dataSchemaName` parameter, which specifies the name under which the `data` schema is registered in a Schema Registry.

The first element in the list is a global setting that specifies the source of the metadata.
The source of metadata can be `value` or `header`.

The global setting is followed by a set of pairs.
The first element in each pair specifies the name of a CloudEvent field (`id` or `type`), or the name of a data schema (`dataSchemaName`).
The second element in the pair specifies how the converter populates the value of the field.
Valid values are `generate` or `header`.
Separate the values in each pair with a colon, for example:

`value,id:header,type:generate,dataSchemaName:header`

For configuration examples, see xref:configuration-of-sources-of-metadata-and-some-cloudevents-fields[Configuration of sources of metadata and some CloudEvents fields].
|===
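To illustrate how several of these options combine, the following sketch configures Avro for both the envelope and the `data` attribute, registers the envelope under an explicit schema name, and disables extension attributes.
The registry URL and schema name are placeholders that you would adapt to your environment.

[source,json,indent=0,subs="+attributes"]
----
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type": "avro",
"value.converter.data.serializer.type": "avro",
"value.converter.avro.schema.registry.url": "http://my-registry", <1>
"value.converter.schema.name.adjustment.mode": "avro", <2>
"value.converter.schema.cloudevents.name": "com.example.CloudEventsEnvelope", <3>
"value.converter.extension.attributes.enable": false <4>
...
----
<1> Placeholder registry URL, passed through to the underlying Avro converter after the `avro.` prefix is removed.
<2> Adjusts schema names for compatibility with Avro naming rules.
<3> Placeholder envelope schema name; the setting takes effect only because `serializer.type` is `avro`.
<4> Omits the `iodebezium...` extension attributes from the emitted CloudEvents.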