= Debezium Event Deserialization
include::../_attributes.adoc[]
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]

[NOTE]
====
This feature is currently in an incubating state, i.e. the exact semantics, configuration options, etc. may change in future revisions based on the feedback we receive. Please let us know if you encounter any problems while using these SerDes.
====

Debezium generates data change events in the form of a complex message structure.
The configured Kafka Connect converter then serializes this message, and it is the responsibility of the consumer to deserialize it into a logical message.
For this purpose, Kafka uses so-called https://kafka.apache.org/documentation/streams/developer-guide/datatypes.html[SerDes].

Debezium provides SerDes (`io.debezium.serde.DebeziumSerdes`) to simplify deserialization for the consumer, whether it is a Kafka Streams pipeline or a plain Kafka consumer.

== JSON SerDe
The JSON SerDe deserializes JSON-encoded change events and transforms them into a Java class.
Internally, this is achieved using https://github.com/FasterXML/jackson-databind/wiki[Jackson Databind].

The consumer creates a SerDe instance as follows:

[source,java,indent=0]
----
final Serde<MyType> serde = DebeziumSerdes.payloadJson(MyType.class);
----

The consumer then receives the logical Java type `MyType`, whose fields are initialized from the JSON message.
This applies to both keys and values.
It is also possible to use plain Java types such as `Integer`, for example when the key consists of a single `INT` field.
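
As an illustration of such a target type, the sketch below shows what `MyType` could look like for the `field1`/`field2` messages used in the examples on this page. The class body is an assumption made for illustration and is not part of Debezium; it relies only on the general expectation that a data-binding library such as Jackson can populate a class through a no-argument constructor and public setters.

```java
// Hypothetical target type; the field names mirror the example messages on this page.
class MyType {

    private String field1;
    private String field2;

    // A no-argument constructor lets a data-binding library instantiate the class.
    MyType() {
    }

    public String getField1() {
        return field1;
    }

    public void setField1(String field1) {
        this.field1 = field1;
    }

    public String getField2() {
        return field2;
    }

    public void setField2(String field2) {
        this.field2 = field2;
    }

    @Override
    public String toString() {
        return "MyType{field1=" + field1 + ", field2=" + field2 + "}";
    }
}
```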

When Kafka Connect uses the JSON converter, it generally provides two modes of operation: with or without a schema.
If the schema is used, the message looks like this:

[source,json,indent=0]
----
{
    "schema": {...},
    "payload": {
        "op": "u",
        "source": {
            ...
        },
        "ts_ms" : "...",
        "before" : {
            "field1" : "oldvalue1",
            "field2" : "oldvalue2"
        },
        "after" : {
            "field1" : "newvalue1",
            "field2" : "newvalue2"
        }
    }
}
----

Without a schema, the structure looks more like this:

[source,json,indent=0]
----
{
    "op": "u",
    "source": {
        ...
    },
    "ts_ms" : "...",
    "before" : {
        "field1" : "oldvalue1",
        "field2" : "oldvalue2"
    },
    "after" : {
        "field1" : "newvalue1",
        "field2" : "newvalue2"
    }
}
----

The deserializer behaviour is driven by the `from.field` configuration option and follows these rules:

* if a message contains a schema, then use `payload` only
* if the key is deserialized, then map the key field(s) into the target class
* if the value is deserialized and contains the Debezium event envelope, then:
** if `from.field` is not set, then deserialize the complete envelope into the target type
** otherwise deserialize and map only the content of the configured field into the target type, thus effectively flattening the message
* if the value is deserialized and already contains a flattened message (i.e. when using the SMT for xref:configuration/event-flattening.adoc[Event Flattening]), then map the flattened record into the target logical type
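
The rules above can be modelled with a small, JDK-only sketch that picks the part of a parsed message to be mapped into the target type. This is not Debezium's implementation: the `EnvelopeSelector` class, the `select` method, and the way an envelope is recognized (by the presence of an `op` field) are assumptions made for illustration.

```java
import java.util.Map;

// Illustrative model of the selection rules; not Debezium's actual implementation.
class EnvelopeSelector {

    /**
     * Returns the part of a parsed JSON message that would be mapped to the target type.
     *
     * @param message   the parsed JSON message as nested maps
     * @param isKey     true when a record key is being deserialized
     * @param fromField the value of from.field, or null when it is not set
     */
    @SuppressWarnings("unchecked")
    static Map<String, Object> select(Map<String, Object> message, boolean isKey, String fromField) {
        Map<String, Object> body = message;
        // With a schema present, only the payload is used.
        if (message.containsKey("schema") && message.containsKey("payload")) {
            body = (Map<String, Object>) message.get("payload");
        }
        // Key fields are mapped as they are.
        if (isKey) {
            return body;
        }
        // Assumption of this sketch: an envelope is recognized by its "op" field.
        boolean isEnvelope = body.containsKey("op");
        if (isEnvelope && fromField != null) {
            // Only the configured field, e.g. "after", is mapped (flattening).
            return (Map<String, Object>) body.get(fromField);
        }
        // Complete envelope (from.field not set) or an already flattened record.
        return body;
    }

    public static void main(String[] args) {
        Map<String, Object> after = Map.of("field1", "newvalue1", "field2", "newvalue2");
        Map<String, Object> envelope = Map.of("op", "u", "after", after);
        // Prints only the "after" state, then the field names of the complete envelope.
        System.out.println(select(envelope, false, "after"));
        System.out.println(select(envelope, false, null).keySet());
    }
}
```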

[[configuration_options]]
=== Configuration options

[cols="35%a,10%a,55%a",width=100,options="header,footer",role="table table-bordered table-striped"]
|=======================
|Property
|Default
|Description
|`from.field`
|`N/A`
|Leave empty to deserialize the message with its full envelope; set to `before` or `after` if only the data values before or after the change are required.
|`unknown.properties.ignored`
|`false`
|Determines whether an unknown property encountered during deserialization is silently ignored or causes a runtime exception to be thrown.
|=======================
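
As a sketch of how these options might be supplied, the example below builds a configuration map using only the JDK. The `SerdeConfigExample` class and its method are hypothetical. Passing the map through Kafka's `Serde#configure(Map, boolean)` method, and supplying the boolean option as a `Boolean` rather than a string, are assumptions about how the SerDe is set up in a given application.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that collects the options from the table above.
class SerdeConfigExample {

    // Builds settings for a value SerDe that maps only the "after" state
    // and tolerates JSON properties unknown to the target type.
    static Map<String, Object> valueSerdeConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put("from.field", "after");
        config.put("unknown.properties.ignored", true);
        return config;
    }

    public static void main(String[] args) {
        Map<String, Object> config = valueSerdeConfig();
        // With Kafka and Debezium on the classpath, the map would typically be applied
        // through Kafka's Serde#configure(Map, boolean), where false marks a value SerDe:
        //   serde.configure(config, false);
        System.out.println(config);
    }
}
```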