// Category: cdc-using
// Type: assembly
// ModuleID: using-avro-to-serialize-message-keys-and-values
// Title: Using Avro to serialize message keys and values
[id="avro-serialization"]
= Avro Serialization
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js

toc::[]

{prodname} connectors work with the Kafka Connect framework to capture changes in databases and generate change event records.
The Kafka Connect workers then apply any configured transformations to each of the messages generated by the connector,
serialize each message key and value into a binary form by using the configured link:https://kafka.apache.org/documentation/#connect_running[_converters_],
and write each message into the correct Kafka topic.

You can specify converters in several ways:

* In the Kafka Connect worker configuration.
+
In this case, the same converters are used for all connectors that are deployed to that worker's cluster.

* For an individual connector.
+
In this case, the converters apply to only that connector.

Kafka Connect comes with a _JSON converter_ that serializes message keys and values into JSON documents. You can configure the JSON converter to include or exclude the message schema by specifying the `key.converter.schemas.enable` and `value.converter.schemas.enable` properties.
The {prodname} {link-prefix}:{link-tutorial}[tutorial] shows what the messages look like when both payload and schemas are included.
Including schemas makes the messages very verbose.
If you want your messages serialized with JSON, consider setting these properties to `false` to exclude the verbose schema information.
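
For example, a Kafka Connect worker configuration that keeps JSON serialization but omits the schemas could contain the following settings. This is a minimal sketch; it uses the standard Kafka Connect JSON converter class:

[source,options="nowrap"]
----
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
----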

Alternatively, you can serialize the message keys and values by using link:https://avro.apache.org/[Apache Avro].
The Avro binary format is compact and efficient, and Avro schemas make it possible to ensure that the messages have the correct structure.
Avro's schema evolution mechanism makes it possible to evolve the schemas over time, which is essential for {prodname} connectors that dynamically generate the message schemas to match the structure of the database tables.
Over time, the change events captured by {prodname} connectors and written by Kafka Connect into a topic may have different versions of the same schema.
Avro serialization makes it easier for consumers to adapt to the changing schema.
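
For example, if a new, optional column is added to a table, a {prodname} connector can begin emitting records whose value schema contains an additional field with a default value; Avro's schema resolution rules then let consumers that still hold the previous schema continue to read the new records. The following is a hypothetical fragment of such an evolved schema; the record, namespace, and field names are illustrative:

[source,json]
----
{
  "type": "record",
  "name": "Value",
  "namespace": "dbserver1.inventory.customers",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "email", "type": ["null", "string"], "default": null }
  ]
}
----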
ifdef::product[]
[IMPORTANT]
====
Using Avro to serialize message keys and values is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see link:https://access.redhat.com/support/offerings/techpreview/[Technology Preview Features Support Scope].
====
endif::product[]
// Type: concept
// Title: About the Apicurio API and schema registry
[id="about-the-apicurio-api-and-schema-registry"]
== Apicurio API and Schema Registry

The link:https://github.com/Apicurio/apicurio-registry[Apicurio Registry] open-source project provides several components that work with Avro:

* An Avro converter that you can configure in Kafka Connect workers. This converter maps Kafka Connect schemas to Avro schemas. The converter then uses the Avro schemas to serialize the message keys and values into Avro's compact binary form.
* An API/Schema registry that tracks:
+
** Avro schemas that are used in Kafka topics
** Where the Avro converter sends the generated Avro schemas
+
Because the Avro schemas are stored in this registry, each message needs to include only a tiny _schema identifier_.
This makes each message even smaller.
For an I/O-bound system like Kafka, this means greater total throughput for producers and consumers.
* Avro _Serdes_ (serializers and deserializers) for Kafka producers and consumers.
Any Kafka consumer application that you write to consume change event records can use the Avro Serdes to deserialize the change event records, as in the sketch that follows this list.
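
The following is a minimal sketch of such a consumer application. It assumes that the Apicurio Registry Serde artifact (`apicurio-registry-utils-serde`) and the Kafka client libraries are on the classpath; the broker address, registry URL, and topic name are illustrative placeholders:

[source,java]
----
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ChangeEventConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "change-event-consumer");
        // The Apicurio deserializer reads the schema identifier that is embedded
        // in each message and fetches the matching Avro schema from the registry.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
        props.put("apicurio.registry.url", "http://apicurio:8080/api");

        try (KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("dbserver1.inventory.customers")); // example topic name
            while (true) {
                for (ConsumerRecord<GenericRecord, GenericRecord> record : consumer.poll(Duration.ofSeconds(1))) {
                    System.out.println(record.key() + " -> " + record.value());
                }
            }
        }
    }
}
----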
To use the Apicurio registry with {prodname}, you must add Apicurio Registry converters and their dependencies to the Kafka Connect container image that you are using for running {prodname}.

[NOTE]
====
The Apicurio Registry project also provides a JSON converter that can be used with the Apicurio registry. This combines the advantage of less verbose messages with human-readable JSON. Messages do not contain the schema information themselves, but only a schema ID.
====
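
A worker that uses this converter could be configured as follows. This is a sketch; it assumes the `ExtJsonConverter` class name from the Apicurio Registry converter module:

[source,options="nowrap"]
----
key.converter=io.apicurio.registry.utils.converter.ExtJsonConverter
key.converter.apicurio.registry.url=http://apicurio:8080/api
value.converter=io.apicurio.registry.utils.converter.ExtJsonConverter
value.converter.apicurio.registry.url=http://apicurio:8080/api
----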
ifdef::community[]

Another option is to use the Confluent Schema Registry, which is described further below.

endif::community[]
// Type: concept
// Title: Example of implementing Avro serialization
[id="example-of-implementing-avro-serialization"]
== Implementation
To use Avro to serialize message keys and values, you must perform the following tasks:

ifdef::community[]
* Deploy an link:https://github.com/Apicurio/apicurio-registry[Apicurio API/Schema Registry] instance.
endif::community[]
ifdef::product[]
* Deploy an link:{LinkServiceRegistryGetStart}[Apicurio API/Schema Registry] instance by following the instructions in {NameServiceRegistryGetStart}.
endif::product[]
* Install the Avro converter from link:https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/{apicurio-version}/apicurio-registry-distro-connect-converter-{apicurio-version}-converter.tar.gz[the installation package] into Kafka Connect's _libs_ directory or directly into a plug-in directory.
* Configure a Kafka Connect instance with the following property settings:
+
[source,options="nowrap"]
----
key.converter=io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.url=http://apicurio:8080/api
key.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url=http://apicurio:8080/api
value.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
----

Internally, Kafka Connect always uses JSON key/value converters for storing configuration and offsets.
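
Alternatively, you can set the converters in the configuration of an individual connector when you register it with Kafka Connect. The following `curl` command is a sketch of such a registration; the connector name is hypothetical, and the usual connection properties (database host, credentials, and so on) are omitted for brevity:

[source,options="nowrap"]
----
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "key.converter.apicurio.registry.url": "http://apicurio:8080/api",
    "key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "http://apicurio:8080/api",
    "value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy"
  }
}'
----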
// Type: procedure
// Title: Building Debezium container images with Avro converter
// ModuleID: building-debezium-container-images-with-avro-converter
== Building {prodname} container images
To implement Avro serialization of message keys and values, you must build a {prodname} container image that includes the Avro converter.

.Procedure
. Deploy an Apicurio Registry instance.
+
ifdef::community[]
The following example uses a non-production in-memory instance:
+
[source]
[subs="attributes"]
----
docker run -it --rm --name apicurio \
  -p 8080:8080 apicurio/apicurio-registry-mem:{apicurio-version}
----
endif::community[]
ifdef::product[]
endif::product[]
. Build a {prodname} image with the Avro converter from this link:https://github.com/debezium/debezium-examples/blob/master/tutorial/debezium-with-apicurio/Dockerfile[Dockerfile]:
+
[source]
[subs="attributes"]
----
docker build --build-arg DEBEZIUM_VERSION={debezium-docker-label} -t debezium/connect-apicurio:{debezium-docker-label} .
----
. Run a Kafka Connect image that is configured to use Avro:
+
[source]
[subs="attributes"]
----
docker run -it --rm --name connect \
  --link zookeeper:zookeeper \
  --link kafka:kafka \
  --link mysql:mysql \
  --link apicurio:apicurio \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
  -e VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
  -e CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
  -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/api \
  -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_GLOBAL-ID=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy \
  -e CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
  -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/api \
  -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_GLOBAL-ID=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy \
  -p 8083:8083 debezium/connect-apicurio:{debezium-docker-label}
----
// Type: concept
// Title: About Avro name requirements
// ModuleID: about-avro-name-requirements
[[avro-naming]]
== Naming
As stated in the Avro link:https://avro.apache.org/docs/current/spec.html#names[documentation], names must adhere to the following rules:

* Start with `[A-Za-z_]`
* Subsequently contain only `[A-Za-z0-9_]` characters

{prodname} uses a column's name as the basis for the name of the corresponding Avro field.
This can lead to problems during serialization if the column name does not also adhere to the Avro naming rules.
Each {prodname} connector provides a configuration property, `sanitize.field.names`, that you can set to `true` if you have columns that do not adhere to the Avro rules for names.
Setting `sanitize.field.names` to `true` allows serialization of non-conformant fields without having to actually modify your schema.
For example, with sanitization enabled, a hypothetical column named `order-total`, which contains a character that is invalid in Avro names, could be serialized as `order_total`.
ifdef::community[]
== Confluent Schema Registry
Confluent provides an alternative link:https://github.com/confluentinc/schema-registry[schema registry] implementation.
Its configuration is slightly different.
Here are the properties to use:

[source]
----
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
----

An instance of the Confluent Schema Registry can be deployed as follows:

[source]
----
docker run -it --rm --name schema-registry \
  --link zookeeper \
  -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 \
  -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
  -e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \
  -p 8181:8181 confluentinc/cp-schema-registry
----
Run a Kafka Connect image configured to use Avro:

[source]
[subs="attributes"]
----
docker run -it --rm --name connect \
  --link zookeeper:zookeeper \
  --link kafka:kafka \
  --link mysql:mysql \
  --link schema-registry:schema-registry \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \
  -e VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
  -p 8083:8083 debezium/connect:{debezium-docker-label}
----
Run a console consumer that reads new Avro messages from the `db.myschema.mytable` topic and decodes them to JSON:

[source]
[subs="attributes"]
----
docker run -it --rm --name avro-consumer \
  --link zookeeper:zookeeper \
  --link kafka:kafka \
  --link mysql:mysql \
  --link schema-registry:schema-registry \
  debezium/connect:{debezium-docker-label} \
  /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --property print.key=true \
    --formatter io.confluent.kafka.formatter.AvroMessageFormatter \
    --property schema.registry.url=http://schema-registry:8081 \
    --topic db.myschema.mytable
----
endif::community[]

== Getting More Information

link:/blog/2016/09/19/Serializing-Debezium-events-with-Avro/[This post] from the {prodname} blog
describes the concepts of serializers, converters, and so on, and discusses the advantages of using Avro.
Note that some details around Kafka Connect converters have changed slightly since that post was written.
For a complete example of using Avro as the message format for {prodname} data change events,
see the link:https://github.com/debezium/debezium-examples/tree/master/tutorial#using-mysql-and-the-avro-message-format[MySQL and the Avro message format] tutorial example.