tet123/documentation/modules/ROOT/pages/configuration/mongodb-event-flattening.adoc

313 lines
12 KiB
Plaintext

[id="mongodb-new-document-state-extraction"]
= MongoDB New Document State Extraction
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]
[NOTE]
====
This single message transformation (SMT) is under active development right now, so the emitted message structure or other details may still change as development progresses.
Please see below for a descriptions of known limitations of this transformation.
====
[NOTE]
====
This SMT is supported only for the MongoDB connector.
See {link-prefix}:{link-event-flattening}[Extracting source record `after` state from {prodname} change events] for the relational database equivalent to this SMT.
====
The Debezium MongoDB connector generates the data in a form of a complex message structure.
The message consists of two parts:
* operation and metadata
* for inserts, the whole data after the insert has been executed; for updates a patch element describing the altered fields
The `after` and `patch` elements are Strings containing JSON representations of the inserted/altered data.
E.g. the general message structure for a insert event looks like this:
[source,json,indent=0]
----
{
"op": "r",
"after": "{\"field1\":newvalue1,\"field2\":\"newvalue1\"}",
"source": { ... }
}
----
More details about the message structure are provided in {link-prefix}:{link-mongodb-connector}[the documentation] of the MongoDB connector.
While this structure is a good fit to represent changes to MongoDB's schemaless collections,
it's not understood by existing sink connectors as for instance the Confluent JDBC sink connector.
Therefore Debezium provides a {link-kafka-docs}/#connect_transforms[a single message transformation] (SMT)
which converts the `after`/`patch` information from the MongoDB CDC events into a structure suitable for consumption by existing sink connectors.
To do so, the SMT parses the JSON strings and reconstructs properly typed Kafka Connect
(comprising the correct message payload and schema) records from that,
which then can be consumed by connectors such as the JDBC sink connector.
Using JSON as visualization of the emitted record structure, the event from above would like this:
[source,json,indent=0]
----
{
"field1" : "newvalue1",
"field2" : "newvalue2"
}
----
The SMT should be applied on a sink connector.
== Configuration
The configuration is a part of sink task connector and is expressed in a set of properties:
[source]
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=drop
transforms.unwrap.operation.header=true
----
=== Array encoding
The SMT converts MongoDB arrays into arrays as defined by Apache Connect (or Apache Avro) schema.
The problem is that such arrays must contains elements of the same time.
MongoDB allows the user to store elements of heterogeneous types into the same array.
To bypass this impedance mismatch it is possible to encode the array in two different ways using `array.encoding` configuration option.
[source]
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.array.encoding=<array|document>
----
Value `array` (the default) will encode arrays as the array datatype.
It is user's responsibility to ensure that all elements for a given array instance are of the same time.
This option is a restricting one but offers easy processing of arrays by downstream clients.
Value `document` will convert the array into a *struct* of *structs* in the similar way as done by http://bsonspec.org/[BSON serialization].
The main *struct* contains fields named `_0`, `_1`, `_2` etc. where the name represents the index of the element in the array.
Every element is then passed as the value for the give field.
Let's suppose an example source MongoDB document with array with heterogeneous types
[source,json,indent=0]
----
{
"_id": 1,
"a1": [
{
"a": 1,
"b": "none"
},
{
"a": "c",
"d": "something"
}
]
}
----
This document will be encoded as
[source,json,indent=0]
----
{
"_id": 1,
"a1": {
"_0": {
"a": 1,
"b": "none"
},
"_1": {
"a": "c",
"d": "something"
}
}
}
----
This option allows you to process arbitrary arrays but the consumer need to know how to properly handle them.
_Note: The underscore in index names is present because Avro encoding requires field names not to start with digit._
=== Nested structure flattening
When a MongoDB document contains a nested document (structure) it is faithfully encoded as a nested structure field.
If the sink connector does support only flat structure it is possible to flatten the internal structure into a flat one with a consistent field naming.
To enable this feature the option `flatten.struct` must be set to `true`.
[source]
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.flatten.struct=<true|false>
transforms.unwrap.flatten.struct.delimiter=<string>
----
The resulting flat document will consist of fields whose names are created by joining the name of the parent field and the name of the fields in the nested document.
Those elements are separated with string defined by an option `struct.delimiter` by default set to the _underscore_.
Let's suppose an example source MongoDB document with a field with a nested document
[source,json,indent=0]
----
{
"_id": 1,
"a": {
"b": 1,
"c": "none"
},
"d": 100
}
----
Such document will be encoded as
[source,json,indent=0]
----
{
"_id": 1,
"a_c": 1,
"a_d": "none",
"d": 100
}
----
This option allows you to convert a hierarchical document into a flat structure suitable for a table-like storage.
=== MongoDB `$unset` handling
MongoDB allows you to make `$unset` operations which allows you to remove a certain field from a Document, and since the collections are schemaless it becomes hard to find a way to tell the consumers/sinkers which a field is now missing, the approach debezium uses is to set the desired to remove field to null value.
Given the operation
[source,json,indent=0]
----
{
"after":null,
"patch":"{\"$unset\" : {\"a\" : true}}"
}
----
The final encoding will look like
[source,json,indent=0]
----
{
"id": 1,
"a": null
}
----
Note that other mongo operations might cause an `$unset` internally, `$rename` is one example.
=== Determine original operation
When a message is flattened the final result won't show whether it was an insert, update or first read (Deletions can be detected via tombstones or rewrites, see {link-prefix}:{link-mongodb-event-flattening}#mongodb-extract-new-record-state-configuration-options[Configuration options]).
To solve this problem Debezium offers an option to propagate the original operation via a header added to the message.
To enable this feature the option `operation.header` must be set to `true`.
[source]
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.operation.header=true
----
The possible values are the ones from the `op` field of {link-prefix}:{link-mongodb-connector}#mongodb-change-events-value[MongoDB connector change events].
=== Adding source metadata fields
The SMT can optionally add metadata fields from the original change event's `source` structure to the final flattened record (prefixed with "__").
This functionality can be used to add things like the collection from the change event, or connector-specific fields like the replica set name.
For more information on what's available in the source structure see {link-prefix}:{link-mongodb-connector}[the documentation] for the MongoDB connector.
For example, the configuration
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.source.fields=rs,collection
----
will add
----
{ "__rs" : "rs0", "__collection" : "my-collection", ... }
----
to the final flattened record.
For `DELETE` events, this option is only supported when the `delete.handling.mode` option is set to "rewrite".
[[mongodb-extract-new-record-state-configuration-options]]
== Configuration options
[cols="35%a,10%a,55%a"]
|===
|Property |Default |Description
|[[mongodb-extract-new-record-state-array-encoding]]<<mongodb-extract-new-record-state-array-encoding, `array.encoding`>>
|`array`
|The SMT converts MongoDB arrays into arrays as defined by Apache Connect (or Apache Avro) schema.
|[[mongodb-extract-new-record-state-flatten-struct]]<<mongodb-extract-new-record-state-flatten-struct, `flatten.struct`>>
|`false`
|The SMT flattens structs by concatenating the fields into plain properties, using a configurable delimiter.
|[[mongodb-extract-new-record-state-flatten-struct-delimiter]]<<mongodb-extract-new-record-state-flatten-struct-delimiter, `flatten.struct.delimiter`>>
|`_`
|Delimiter to concat between field names from the input record when generating field names for the output record. Only applies when `flatten.struct` is set to `true`
ifdef::community[]
|[[mongodb-extract-new-record-state-operation-header]]<<mongodb-extract-new-record-state-operation-header, `operation.header`>>
|`false`
|The SMT adds the {link-prefix}:{link-mongodb-connector}#mongodb-change-events-value[event operation] as a message header. +
This is deprecated as of Debezium 1.2, please use <<mongodb-extract-new-record-state-add-headers, `add.headers`>> and <<mongodb-extract-new-record-state-add-fields, `add.fields`>> instead.
endif::community[]
|[[mongodb-extract-new-record-state-drop-tombstones]]<<mongodb-extract-new-record-state-drop-tombstones, `drop.tombstones`>>
|`true`
|The SMT removes the tombstone generated by Debezium from the stream.
|[[mongodb-extract-new-record-state-delete-handling-mode]]<<mongodb-extract-new-record-state-delete-handling-mode, `delete.handling.mode`>>
|`drop`
|The SMT can `drop`, `rewrite` or pass delete records (`none`). The `rewrite` mode will add a `__deleted` field set to `true` or `false` depending on the represented operation.
ifdef::community[]
|[[mongodb-extract-new-record-state-add-source-fields]]<<mongodb-extract-new-record-state-add-source-fields, `add.source.fields`>> +
|
|Fields from the change event's `source` structure to add as metadata (prefixed with "__") to the flattened record. +
This is deprecated as of Debezium 1.2, please use <<mongodb-extract-new-record-state-add-headers, `add.headers`>> and <<mongodb-extract-new-record-state-add-fields, `add.fields`>> instead.
endif::community[]
|[[mongodb-extract-new-record-state-add-headers]]<<mongodb-extract-new-record-state-add-headers, `add.headers`>>
|
|Specify a list of metadata fields to add to header of the flattened message.
In case of duplicate field names (e.g. "ts_ms" exists twice), the struct should be specified to get the correct field (e.g. "source.ts_ms").
The fields will be prefixed with "__" or "__<struct>__", depending on the specification of the struct.
Please use a comma separated list without spaces.
|[[mongodb-extract-new-record-state-add-fields]]<<mongodb-extract-new-record-state-add-fields, `add.fields`>>
|
|Specify a list of metadata fields to add to the flattened message.
In case of duplicate field names (e.g. "ts_ms" exists twice), the struct should be specified to get the correct field (e.g. "source.ts_ms").
The fields will be prefixed with "__" or "__<struct>__", depending on the specification of the struct.
Please use a comma separated list without spaces.
|[[mongodb-extract-new-record-state-sanitize-field-names]]<<mongodb-extract-new-record-state-sanitize-field-names, `sanitize.field.names`>>
|`false`
|Whether field names will be sanitized to adhere to Avro naming requirements.
See {link-prefix}:{link-avro-serialization}#avro-naming[Avro naming] for more details.
|===
== Known limitations
* Feeding data changes from a schemaless store such as MongoDB to strictly schema-based datastores such as a relational database can by definition work within certain limits only.
Specifically, all fields of documents within one collection with the same name must be of the same type. Otherwise, no consistent column definition can be derived in the target database.
* Arrays will be restored in the emitted Kafka Connect record correctly, but they are not supported by sink connector just expecting a "flat" message structure.