= Debezium Event Deserialization
include::../_attributes.adoc[]
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js

toc::[]

Debezium generates data change events in the form of a complex message structure.
This message is later serialized by a Kafka Connect converter, and it is the responsibility of the consumer to deserialize it into a logical message.
For this purpose, Kafka provides the so-called https://kafka.apache.org/10/documentation/streams/developer-guide/datatypes.html[SerDes].

Debezium provides SerDes to simplify deserialization for the consumer, whether it is a Kafka Streams pipeline or a plain Kafka consumer.

== JSON SerDe

The JSON SerDe deserializes a JSON-encoded message and transforms it into a Java class.
Internally, this is achieved using https://github.com/FasterXML/jackson-databind/wiki[Jackson Databind].
The consumer creates a serde instance using:

[source,java,indent=0]
----
final Serde<MyType> serde = Serdes.payloadJson(MyType.class);
----

The consumer will then receive the logical Java type `MyType`, whose fields are initialized from the JSON message.
This applies to both keys and values.
It is also possible to use a plain Java type such as `Integer`, for example when the key consists of a single `INT` column.

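As an illustration, a target type for the `field1`/`field2` rows used in the change-event examples below might look as follows. The class and field names are assumptions for this sketch, not part of the Debezium API; Jackson Databind binds public fields (or setters) by matching the JSON property names:

```java
public class Main {
    // Hypothetical target type; Jackson initializes the public fields
    // whose names match the JSON property names.
    public static class MyType {
        public String field1;
        public String field2;

        @Override
        public String toString() {
            return "MyType(" + field1 + ", " + field2 + ")";
        }
    }

    public static void main(String[] args) {
        // The serde would populate this from the JSON message; the fields are
        // set manually here only to show the shape of the type.
        MyType t = new MyType();
        t.field1 = "newvalue1";
        t.field2 = "newvalue2";
        System.out.println(t);
    }
}
```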

When the JSON converter is used by Kafka Connect, it provides two modes of operation: with or without schema.

When the schema is enabled, the message looks like:

[source,json,indent=0]
----
{
    "schema": {...},
    "payload": {
        "op": "u",
        "source": {
            ...
        },
        "ts_ms" : "...",
        "before" : {
            "field1" : "oldvalue1",
            "field2" : "oldvalue2"
        },
        "after" : {
            "field1" : "newvalue1",
            "field2" : "newvalue2"
        }
    }
}
----

When it is not, the structure looks like:

[source,json,indent=0]
----
{
    "op": "u",
    "source": {
        ...
    },
    "ts_ms" : "...",
    "before" : {
        "field1" : "oldvalue1",
        "field2" : "oldvalue2"
    },
    "after" : {
        "field1" : "newvalue1",
        "field2" : "newvalue2"
    }
}
----

The deserializer behaviour is driven by the `from.field` configuration option and follows these rules:

* if the message contains a schema, then only the `payload` part is used
* if the key is deserialized, then the key fields are mapped into the target class
* if the value is deserialized and contains an envelope, then
** if `from.field` is not set, the complete envelope is deserialized into the target type
** otherwise only the content of the configured field is deserialized and mapped into the target type, effectively flattening the message
* if the value is deserialized and already contains a flattened message (e.g. one produced by the `ExtractNewRecordState` transformation), then the flattened record is mapped into the target logical type

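The rules above can be sketched in plain Java on an already-parsed message, represented here as nested `Map`s standing in for the JSON tree. The method name and the `Map` representation are assumptions for illustration only; the real serde operates on the JSON bytes via Jackson:

```java
import java.util.Map;

public class Main {
    // Sketch of the from.field selection rules on a parsed message tree.
    @SuppressWarnings("unchecked")
    static Map<String, Object> select(Map<String, Object> message, String fromField) {
        // Rule: if the message contains a schema, only the payload part is used.
        if (message.containsKey("schema") && message.containsKey("payload")) {
            message = (Map<String, Object>) message.get("payload");
        }
        // Rule: without from.field the complete envelope is deserialized.
        if (fromField == null) {
            return message;
        }
        // Rule: with from.field only the content of the configured field is
        // mapped, effectively flattening the message.
        return (Map<String, Object>) message.get(fromField);
    }

    public static void main(String[] args) {
        Map<String, Object> envelope = Map.of(
                "op", "u",
                "before", Map.of("field1", "oldvalue1"),
                "after", Map.of("field1", "newvalue1"));
        Map<String, Object> withSchema = Map.of(
                "schema", Map.of(), "payload", envelope);

        System.out.println(select(withSchema, "after"));     // only the new row state
        System.out.println(select(envelope, null).keySet()); // the whole envelope
    }
}
```

The real deserializer performs the same selection before binding the remaining JSON to the target class.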
[[configuration_options]]
=== Configuration options

[cols="35%a,10%a,55%a",width=100,options="header,footer",role="table table-bordered table-striped"]
|=======================
|Property
|Default
|Description

|`from.field`
|`N/A`
|Empty if the message with the full envelope should be deserialized; `before`/`after` if only the data values before or after the change are required.
|=======================