= MongoDB New Document State Extraction
include::../_attributes.adoc[]
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js

toc::[]

[NOTE]
====
This single message transformation (SMT) is under active development, so the structure of the emitted message or other details may still change as development progresses.
Please see below for a description of the known limitations of this transformation.
====

[NOTE]
====
This SMT is supported only for the MongoDB connector.
See xref:configuration/event-flattening.adoc[here] for the relational database equivalent to this SMT.
====

The Debezium MongoDB connector emits data change events in the form of a complex message structure.
The message consists of two parts:

* operation and metadata
* for inserts, the whole document after the insert has been executed; for updates, a patch element describing the altered fields

The `after` and `patch` elements are Strings containing JSON representations of the inserted/altered data.
For example, the general message structure for an insert event looks like this:

[source,json,indent=0]
----
{
  "op": "c",
  "after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue2\"}",
  "source": { ... }
}
----

More details about the message structure are provided in xref:connectors/mongodb.adoc[the documentation] of the MongoDB connector.

While this structure is a good fit to represent changes to MongoDB's schemaless collections,
it is not understood by existing sink connectors such as the Confluent JDBC sink connector.

Therefore, Debezium provides {link-kafka-docs}/#connect_transforms[a single message transformation] (SMT)
which converts the `after`/`patch` information from the MongoDB CDC events into a structure suitable for consumption by existing sink connectors.
To do so, the SMT parses the JSON strings and reconstructs properly typed Kafka Connect records
(comprising the correct message payload and schema) from them,
which can then be consumed by connectors such as the JDBC sink connector.

Using JSON as a visualization of the emitted record structure, the event from above would look like this:

[source,json,indent=0]
----
{
  "field1" : "newvalue1",
  "field2" : "newvalue2"
}
----

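The extraction step can be illustrated with a minimal Python sketch. It is not the SMT's implementation: the function name `extract_new_document_state` is hypothetical, and the sketch only parses the payload, whereas the SMT also derives a Kafka Connect schema for the record.

```python
import json


def extract_new_document_state(event):
    """Return the document carried in the event's `after` string as a dict.

    Hypothetical helper for illustration only; returns None when the
    event has no `after` element (for example, an update with a patch).
    """
    after = event.get("after")
    if after is None:
        return None
    return json.loads(after)


# An insert event whose `after` element is a JSON string.
event = {
    "op": "c",
    "after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue2\"}",
    "source": {},
}
record = extract_new_document_state(event)
```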
The SMT should be applied on a sink connector.

== Configuration

The configuration is part of the sink connector configuration and is expressed as a set of properties:

[source]
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=drop
transforms.unwrap.operation.header=true
----

=== Array encoding

The SMT converts MongoDB arrays into arrays as defined by the Kafka Connect (or Apache Avro) schema.
The problem is that such arrays must contain elements of the same type.
MongoDB allows the user to store elements of heterogeneous types in the same array.
To work around this impedance mismatch, the array can be encoded in two different ways using the `array.encoding` configuration option.

[source]
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.array.encoding=<array|document>
----

Value `array` (the default) encodes arrays as the array datatype.
It is the user's responsibility to ensure that all elements of a given array instance are of the same type.
This option is restrictive but allows easy processing of arrays by downstream clients.

Value `document` converts the array into a *struct* of *structs*, similar to what is done by http://bsonspec.org/[BSON serialization].
The main *struct* contains fields named `_0`, `_1`, `_2` etc., where the name represents the index of the element in the array.
Every element is then passed as the value for the given field.

Consider an example source MongoDB document containing an array with heterogeneous types:

[source,json,indent=0]
----
{
    "_id": 1,
    "a1": [
        {
            "a": 1,
            "b": "none"
        },
        {
            "a": "c",
            "d": "something"
        }
    ]
}
----

With the `document` encoding, this document will be encoded as:

[source,json,indent=0]
----
{
    "_id": 1,
    "a1": {
        "_0": {
            "a": 1,
            "b": "none"
        },
        "_1": {
            "a": "c",
            "d": "something"
        }
    }
}
----

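The `document` encoding shown above can be sketched in a few lines of Python. This is only an illustration of the naming rule (`_<index>`), not the SMT's code: the function name `encode_array_as_document` is hypothetical, and applying the rule recursively to nested values is an assumption of this sketch.

```python
def encode_array_as_document(value):
    """Replace every list with a dict whose keys are `_0`, `_1`, ...

    Hypothetical helper; recursion into nested dicts and lists is an
    assumption made for this illustration.
    """
    if isinstance(value, list):
        return {f"_{i}": encode_array_as_document(v) for i, v in enumerate(value)}
    if isinstance(value, dict):
        return {k: encode_array_as_document(v) for k, v in value.items()}
    return value


source = {
    "_id": 1,
    "a1": [
        {"a": 1, "b": "none"},
        {"a": "c", "d": "something"},
    ],
}
encoded = encode_array_as_document(source)
```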
This option allows you to process arbitrary arrays, but the consumer needs to know how to handle them properly.

_Note: The underscore in index names is present because Avro encoding requires field names not to start with a digit._

=== Nested structure flattening

When a MongoDB document contains a nested document (structure), it is faithfully encoded as a nested structure field.
If the sink connector supports only flat structures, the nested structure can be flattened into a flat one with consistent field naming.
To enable this feature, the option `flatten.struct` must be set to `true`.

[source]
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.flatten.struct=<true|false>
transforms.unwrap.flatten.struct.delimiter=<string>
----

The resulting flat document consists of fields whose names are created by joining the name of the parent field and the names of the fields in the nested document.
These elements are separated by the string defined by the option `flatten.struct.delimiter`, which defaults to the _underscore_.

Consider an example source MongoDB document with a field containing a nested document:

[source,json,indent=0]
----
{
    "_id": 1,
    "a": {
        "b": 1,
        "c": "none"
    },
    "d": 100
}
----

Such a document will be encoded as:

[source,json,indent=0]
----
{
    "_id": 1,
    "a_b": 1,
    "a_c": "none",
    "d": 100
}
----

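The naming rule for flattened fields can be sketched as follows. This is a minimal illustration rather than the SMT's implementation; the function name `flatten_struct` and its parameters are hypothetical, with `delimiter` standing in for the `flatten.struct.delimiter` option.

```python
def flatten_struct(document, delimiter="_", prefix=""):
    """Flatten nested dicts by joining parent and child names.

    Hypothetical helper: `delimiter` mirrors `flatten.struct.delimiter`,
    `prefix` carries the joined names of the enclosing fields.
    """
    flat = {}
    for name, value in document.items():
        key = f"{prefix}{delimiter}{name}" if prefix else name
        if isinstance(value, dict):
            # Descend into the nested document, extending the prefix.
            flat.update(flatten_struct(value, delimiter, key))
        else:
            flat[key] = value
    return flat


source = {"_id": 1, "a": {"b": 1, "c": "none"}, "d": 100}
flattened = flatten_struct(source)
```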
This option allows you to convert a hierarchical document into a flat structure suitable for table-like storage.

=== MongoDB `$unset` handling

MongoDB supports `$unset` operations, which remove a field from a document.
Because the collections are schemaless, it is hard to tell consumers and sinks that a field is now missing.
The approach Debezium takes is to set the removed field to `null`.

Given the operation:

[source,json,indent=0]
----
{
    "after":null,
    "patch":"{\"$unset\" : {\"a\" : true}}"
}
----

The final encoding will look like:

[source,json,indent=0]
----
{
    "id": 1,
    "a": null
}
----

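The mapping from an `$unset` patch to null-valued fields can be sketched like this. It is an illustration under assumptions, not the SMT's code: the function name `apply_unset` is hypothetical, and passing the document id as a separate argument is a simplification of this sketch.

```python
import json


def apply_unset(patch_json, document_id):
    """Build a flattened record in which every `$unset` field is None.

    Hypothetical helper; `document_id` is supplied by the caller because
    the patch string itself does not contain the id.
    """
    patch = json.loads(patch_json)
    record = {"id": document_id}
    for field in patch.get("$unset", {}):
        record[field] = None
    return record


event = {"after": None, "patch": "{\"$unset\" : {\"a\" : true}}"}
record = apply_unset(event["patch"], document_id=1)
```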
Note that other MongoDB operations might cause an `$unset` internally; `$rename` is one example.

=== Determine original operation

When a message is flattened, the final result does not show whether it was an insert, an update or a first read. (Deletions can be detected via tombstones or rewrites, see {link-prefix}:{link-mongodb-event-flattening}#mongodb-extract-new-record-state-configuration-options[Configuration options].)

To solve this problem, Debezium offers an option to propagate the original operation via a header added to the message.
To enable this feature, the option `operation.header` must be set to `true`.

[source]
----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.operation.header=true
----

The possible values are the ones from the `op` field of xref:connectors/mongodb.adoc#mongodb-change-events-value[MongoDB connector change events].

=== Adding source metadata fields

The SMT can optionally add metadata fields from the original change event's `source` structure to the final flattened record (prefixed with "__").
This functionality can be used to add things like the collection from the change event, or connector-specific fields like the replica set name.
For more information on what's available in the source structure see xref:connectors/mongodb.adoc[the documentation] for the MongoDB connector.

For example, the configuration

----
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.source.fields=rs,collection
----

will add

----
{ "__rs" : "rs0", "__collection" : "my-collection", ... }
----

to the final flattened record.

For `DELETE` events, this option is only supported when the `delete.handling.mode` option is set to "rewrite".

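The effect of `add.source.fields` described in this section can be sketched as a simple copy of selected `source` entries under a `__` prefix. The function name `add_source_fields` is hypothetical and the sketch ignores schemas and delete handling.

```python
def add_source_fields(record, source, field_names, prefix="__"):
    """Return a copy of `record` with the chosen `source` fields added.

    Hypothetical helper: `field_names` plays the role of the
    comma-separated `add.source.fields` value, already split into a list.
    """
    enriched = dict(record)
    for name in field_names:
        enriched[prefix + name] = source[name]
    return enriched


source = {"rs": "rs0", "collection": "my-collection"}
record = add_source_fields({"field1": "newvalue1"}, source, ["rs", "collection"])
```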
[[mongodb-extract-new-record-state-configuration-options]]
== Configuration options
[cols="35%a,10%a,55%a"]
|===
|Property |Default |Description

|[[mongodb-extract-new-record-state-array-encoding]]<<mongodb-extract-new-record-state-array-encoding, `array.encoding`>>
|`array`
|The SMT converts MongoDB arrays into arrays as defined by the Kafka Connect (or Apache Avro) schema.

|[[mongodb-extract-new-record-state-flatten-struct]]<<mongodb-extract-new-record-state-flatten-struct, `flatten.struct`>>
|`false`
|The SMT flattens structs by concatenating the fields into plain properties, using a configurable delimiter.

|[[mongodb-extract-new-record-state-flatten-struct-delimiter]]<<mongodb-extract-new-record-state-flatten-struct-delimiter, `flatten.struct.delimiter`>>
|`_`
|Delimiter to concatenate between field names from the input record when generating field names for the output record. Only applies when `flatten.struct` is set to `true`.

ifdef::community[]
|[[mongodb-extract-new-record-state-operation-header]]<<mongodb-extract-new-record-state-operation-header, `operation.header`>>
|`false`
|The SMT adds the xref:connectors/mongodb.adoc#mongodb-change-events-value[event operation] as a message header. +
This is deprecated as of Debezium 1.2, please use <<mongodb-extract-new-record-state-add-headers, `add.headers`>> and <<mongodb-extract-new-record-state-add-fields, `add.fields`>> instead.
endif::community[]

|[[mongodb-extract-new-record-state-drop-tombstones]]<<mongodb-extract-new-record-state-drop-tombstones, `drop.tombstones`>>
|`true`
|The SMT removes the tombstone generated by Debezium from the stream.

|[[mongodb-extract-new-record-state-delete-handling-mode]]<<mongodb-extract-new-record-state-delete-handling-mode, `delete.handling.mode`>>
|`drop`
|The SMT can `drop`, `rewrite` or pass delete records (`none`). The `rewrite` mode will add a `__deleted` field set to `true` or `false` depending on the represented operation.

ifdef::community[]
|[[mongodb-extract-new-record-state-add-source-fields]]<<mongodb-extract-new-record-state-add-source-fields, `add.source.fields`>> +
|
|Fields from the change event's `source` structure to add as metadata (prefixed with "__") to the flattened record. +
This is deprecated as of Debezium 1.2, please use <<mongodb-extract-new-record-state-add-headers, `add.headers`>> and <<mongodb-extract-new-record-state-add-fields, `add.fields`>> instead.
endif::community[]

|[[mongodb-extract-new-record-state-add-headers]]<<mongodb-extract-new-record-state-add-headers, `add.headers`>>
|
|Specify a list of metadata fields to add to the header of the flattened message.
In case of duplicate field names (e.g. "ts_ms" exists twice), the struct should be specified to get the correct field (e.g. "source.ts_ms").
The fields will be prefixed with "__" or "__<struct>__", depending on the specification of the struct.
Please use a comma-separated list without spaces.

|[[mongodb-extract-new-record-state-add-fields]]<<mongodb-extract-new-record-state-add-fields, `add.fields`>>
|
|Specify a list of metadata fields to add to the flattened message.
In case of duplicate field names (e.g. "ts_ms" exists twice), the struct should be specified to get the correct field (e.g. "source.ts_ms").
The fields will be prefixed with "__" or "__<struct>__", depending on the specification of the struct.
Please use a comma-separated list without spaces.

|[[mongodb-extract-new-record-state-sanitize-field-names]]<<mongodb-extract-new-record-state-sanitize-field-names, `sanitize.field.names`>>
|`false`
|Whether field names will be sanitized to adhere to Avro naming requirements.
See {link-prefix}:{link-avro-serialization}#avro-naming[Avro naming] for more details.
|===

== Known limitations

* Feeding data changes from a schemaless store such as MongoDB to strictly schema-based datastores such as a relational database can, by definition, work within certain limits only.
Specifically, all fields of documents within one collection with the same name must be of the same type. Otherwise, no consistent column definition can be derived in the target database.
* Arrays will be restored correctly in the emitted Kafka Connect record, but they are not supported by sink connectors that expect a "flat" message structure.