// Category: cdc-using
// Type: assembly
// ModuleID: configuring-debezium-connectors-to-use-avro-serialization
// Title: Configuring {prodname} connectors to use Avro serialization
[id="avro-serialization"]
= Avro Serialization
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js

toc::[]
{prodname} connectors work with the Kafka Connect framework to capture changes in databases and generate change event records. The Kafka Connect workers then apply any configured transformations to each of the messages generated by the connector, serialize each message key and value into a binary form by using the configured link:https://kafka.apache.org/documentation/#connect_running[_converters_], and write each message into the correct Kafka topic.
You can specify converters in several ways:

* In the Kafka Connect worker configuration.
+
In this case, the same converters are used for all connectors that are deployed to that worker's cluster.
* For an individual connector.

Kafka Connect comes with a _JSON converter_ that serializes message keys and values into JSON documents. You can configure the JSON converter to include or exclude the message schema by specifying the `key.converter.schemas.enable` and `value.converter.schemas.enable` properties. The {prodname} {link-prefix}:{link-tutorial}[tutorial] shows what the messages look like when both payload and schemas are included.
Including schemas causes the messages to be very verbose. If you want your messages serialized with JSON, consider setting these properties to `false` to exclude the verbose schema information.
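
For example, a worker that is configured to use JSON without embedded schemas might contain the following properties (`org.apache.kafka.connect.json.JsonConverter` is the standard Kafka Connect JSON converter):

[source,options="nowrap"]
----
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
----
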
Alternatively, you can serialize the message keys and values by using link:https://avro.apache.org/[Apache Avro].
The Avro binary format is compact and efficient, and Avro schemas make it possible to ensure that the messages have the correct structure. Avro's schema evolution mechanism makes it possible to evolve the schemas over time, which is essential for {prodname} connectors that dynamically generate the message schemas to match the structure of the database tables. Over time, the change events captured by {prodname} connectors and written by Kafka Connect into a topic may have different versions of the same schema. Avro serialization makes it easier for consumers to adapt to the changing schema.
To use Apache Avro as the message format, you must deploy a schema registry that manages Avro message schemas and their versions.

ifdef::product[]
[IMPORTANT]
====
Using Avro to serialize message keys and values is a Technology Preview feature. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about support scope, see link:https://access.redhat.com/support/offerings/techpreview/[Technology Preview Features Support Scope].
====
endif::product[]
// Type: concept
// Title: About the {registry}
[id="about-the-registry"]
== About the {registry-name-full}
ifdef::community[]
The link:https://github.com/Apicurio/apicurio-registry[{registry}] open-source project provides several components that work with Avro:
endif::community[]
ifdef::product[]
{LinkServiceRegistryGetStart}[{registry-name-full}] provides several components that work with Avro:
endif::product[]

* An Avro converter that you can configure in Kafka Connect workers. This converter maps Kafka Connect schemas to Avro schemas. The converter then uses the Avro schemas to serialize the message keys and values into Avro's compact binary form.

* An API/Schema registry that tracks:
+
** Avro schemas that are used in Kafka topics
** Where the Avro converter sends the generated Avro schemas
+
Because the Avro schemas are stored in this registry, each message needs to include only a tiny _schema identifier_. This makes each message even smaller. For an I/O bound system like Kafka, this means more total throughput for producers and consumers.

* Avro _Serdes_ (serializers and deserializers) for Kafka producers and consumers.
Any Kafka consumer application that you write to consume change event records can use the Avro Serdes to deserialize the change event records; a minimal consumer sketch follows this list.

To use the {registry} with {prodname}, you must add the {registry} converters and their dependencies to the Kafka Connect container image that you use for running {prodname}.
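
For example, a Java consumer application might configure the {registry} Avro deserializer as follows. This is a minimal sketch that assumes the Apicurio Registry 1.x serde artifact (`apicurio-registry-utils-serde`) and the Kafka clients library are on the classpath; verify the class and property names against the {registry} version that you deploy:

[source,java,options="nowrap"]
----
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AvroChangeEventConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");
        props.put("group.id", "avro-change-event-consumer");
        // Avro deserializers provided by the Apicurio Registry serde artifact (assumed 1.x class name).
        // They look up the writer schema in the registry by the identifier embedded in each message.
        props.put("key.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
        props.put("value.deserializer", "io.apicurio.registry.utils.serde.AvroKafkaDeserializer");
        // Registry REST API endpoint; the host name is an example.
        props.put("apicurio.registry.url", "http://apicurio:8080/api");

        try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props)) {
            // Example change event topic; adjust to your connector's server, schema, and table names.
            consumer.subscribe(Collections.singletonList("dbserver1.inventory.customers"));
            while (true) {
                ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<Object, Object> record : records) {
                    System.out.println(record.key() + " -> " + record.value());
                }
            }
        }
    }
}
----
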
[NOTE]
====
The {registry} project also provides a JSON converter. This converter combines the advantage of less verbose messages with human-readable JSON. Messages do not contain the schema information themselves, but only a schema ID.
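
A minimal sketch of the corresponding worker configuration follows; the `ExtJsonConverter` class name comes from the {registry} converter artifact, so verify it against the version that you use:

[source,options="nowrap"]
----
key.converter=io.apicurio.registry.utils.converter.ExtJsonConverter
key.converter.apicurio.registry.url=http://apicurio:8080/api
value.converter=io.apicurio.registry.utils.converter.ExtJsonConverter
value.converter.apicurio.registry.url=http://apicurio:8080/api
----
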
====
ifdef::community[]
Another option is using the <<confluent-schema-registry,Confluent schema registry>>, which is described later.
endif::community[]
// Type: concept
// Title: Overview of deploying a {prodname} connector that uses Avro serialization
[id="overview-of-deploying-a-debezium-connector-that-uses-avro-serialization"]
== Overview
Deploying a {prodname} connector that uses Avro serialization involves three main tasks:

ifdef::community[]
. Deploy an link:https://github.com/Apicurio/apicurio-registry[{registry-name-full}] instance.
endif::community[]
ifdef::product[]
. Deploy a link:{LinkServiceRegistryGetStart}[{registry-name-full}] instance by following the instructions in {NameServiceRegistryGetStart}.
endif::product[]
ifdef::community[]
. Install the Avro converter from link:https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/{apicurio-version}/apicurio-registry-distro-connect-converter-{apicurio-version}-converter.tar.gz[the installation package] into Kafka Connect's _libs_ directory or directly into a plug-in directory.
endif::community[]
ifdef::product[]
. Install the Avro converter by downloading the {prodname} link:https://access.redhat.com/jbossnetwork/restricted/listSoftware.html?product=red.hat.integration&downloadType=distributions[Service Registry Kafka Connect] zip file and extracting it into the {prodname} connector's directory.
endif::product[]
. Configure a Kafka Connect instance with the following property settings:
+
[source,options="nowrap"]
----
key.converter=io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.url=http://apicurio:8080/api
key.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url=http://apicurio:8080/api
value.converter.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
----
+
Alternatively, you can configure specific connectors to use the Avro converter, as shown in the sketch that follows this list.

Internally, Kafka Connect always uses JSON key/value converters for storing configuration and offsets.
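
For example, when you register a connector through the Kafka Connect REST API, you can include the converter properties in the connector configuration itself so that only that connector uses Avro. The following request body is a minimal sketch; the connector name, database connection settings, and registry host are placeholders:

[source,json,options="nowrap"]
----
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "key.converter.apicurio.registry.url": "http://apicurio:8080/api",
    "key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "http://apicurio:8080/api",
    "value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy"
  }
}
----
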
// Type: procedure
// Title: Deploying connectors that use Avro in {prodname} containers
// ModuleID: deploying-connectors-that-use-avro-in-debezium-containers
== Deploying with {prodname} containers
In your environment, you might want to use a pre-built {prodname} container image to deploy {prodname} connectors that use Avro serialization. Follow the procedure in this section to build a custom Kafka Connect container image for {prodname} that uses the Avro converter.

.Prerequisites
* You have the required permissions on a Kafka cluster.
* You downloaded the {prodname} connector plug-in that you want to deploy with Avro serialization.

.Procedure
. Deploy an instance of {registry}.
+
ifdef::community[]
The following example uses a non-production, in-memory, {registry} instance:
+
[source,subs="attributes"]
----
docker run -it --rm --name apicurio \
  -p 8080:8080 apicurio/apicurio-registry-mem:{apicurio-version}
----
endif::community[]
ifdef::product[]
.. Choose the OpenShift project in which you want to deploy the {prodname} connector. In the following command, `$NAMESPACE` represents your project.
.. Deploy the latest published {registry} operator by running the following command:
+
`curl -sSL https://raw.githubusercontent.com/apicurio/apicurio-registry-operator/master/docs/resources/install.yaml | sed "s/{NAMESPACE}/$NAMESPACE/g" | oc apply -f -`
+
This deploys the latest development version of the {registry} operator from the `master` branch. To deploy other versions, specify a different branch or tag, or edit the operator image reference in the file.
+
.. Create a new {registry} deployment by specifying the in-memory persistence option in one of the example custom resources, for example:
+
`oc create -f https://raw.githubusercontent.com/apicurio/apicurio-registry-operator/master/docs/resources/example-cr/in-memory.yaml`
+
The in-memory deployment is not suitable for production. Use the Apache Kafka persistence option for production. For more information, see link:{LinkServiceRegistryGetStart}[{NameServiceRegistryGetStart}].
endif::product[]
. Build a {prodname} container image that includes the Avro converter:
+
ifdef::community[]
.. Copy link:https://github.com/debezium/debezium-examples/blob/master/tutorial/debezium-with-apicurio/Dockerfile[`Dockerfile`] to a convenient location. This file has the following content:
+
[listing,options="nowrap"]
----
ARG DEBEZIUM_VERSION
FROM debezium/connect:$DEBEZIUM_VERSION
ENV KAFKA_CONNECT_DEBEZIUM_DIR=$KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-mysql
ENV APICURIO_VERSION=1.1.2.Final
RUN cd $KAFKA_CONNECT_DEBEZIUM_DIR &&\
  curl https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/$APICURIO_VERSION/apicurio-registry-distro-connect-converter-$APICURIO_VERSION-converter.tar.gz | tar xzv
----
.. Run the following command:
+
[source,subs="attributes"]
----
docker build --build-arg DEBEZIUM_VERSION={debezium-docker-label} -t debezium/connect-apicurio:{debezium-docker-label} .
----
endif::community[]
ifdef::product[]
.. Download the link:https://access.redhat.com/jbossnetwork/restricted/listSoftware.html?product=red.hat.integration&downloadType=distributions[{registry} Kafka Connect] zip file.
.. Extract the content into the directory that contains the {prodname} connector that you are configuring to use Avro serialization.
.. Create a custom image for Kafka Connect. See link:{LinkCDCInstallOpenShift}[{NameCDCInstallOpenShift}] for an example of how to do this. Start with the `Dockerfile` in that example. Then add the {registry} converters to the connector directories.
endif::product[]
. Run the newly built Kafka Connect image, configuring it so that it uses the Avro converter (or configure specific connectors to do so):
+
ifdef::community[]
[source,subs="attributes"]
----
docker run -it --rm --name connect \
  --link zookeeper:zookeeper \
  --link kafka:kafka \
  --link mysql:mysql \
  --link apicurio:apicurio \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
  -e VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
  -e CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
  -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/api \
  -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_GLOBAL-ID=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy \
  -e CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
  -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/api \
  -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_GLOBAL-ID=io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy \
  -p 8083:8083 debezium/connect-apicurio:{debezium-docker-label}
----
endif::community[]
ifdef::product[]
.. Follow the steps in the link:{LinkCDCGettingStarted}#deploying-kafka-connect[example of deploying Kafka Connect] in {NameCDCGettingStarted}.
.. Open the `examples/kafka-connect/kafka-connect-s2i-single-node-kafka.yaml` file that you used to deploy Kafka Connect.
+
Before you can create the connector instance,
you must first enable connector resources in the `KafkaConnectS2I` Custom Resource (CR).
.. In the `metadata.annotations` section, enable Kafka Connect to use connector resources.
+
.kafka-connect-s2i-single-node-kafka.yaml
[source,yaml,options="nowrap"]
----
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnectS2I
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  ...
----
.. Apply the updated `kafka-connect-s2i-single-node-kafka.yaml` file to update the `KafkaConnectS2I` CR:
+
`oc apply -f kafka-connect-s2i-single-node-kafka.yaml`
.. In the `KafkaConnector` CR that defines the connector instance, add the properties that are required by the Avro converter. The CR looks like this:
+
.inventory-connector.yaml
[source,yaml,options="nowrap"]
----
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql
    database.port: 3306
    database.user: debezium
    database.password: dbz
    database.server.id: 184054
    database.server.name: dbserver1
    database.whitelist: inventory
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
    key.converter: io.apicurio.registry.utils.converter.AvroConverter
    key.converter.apicurio.registry.url: http://apicurio:8080/api
    key.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url: http://apicurio:8080/api
    value.converter.apicurio.registry.global-id: io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy
----
.. Apply the connector instance:
+
`$ oc apply -f inventory-connector.yaml`
+
This registers `inventory-connector` and the connector starts to run against the `inventory` database.
.. To verify that the connector was created and has started to monitor the database, follow the steps at the end of the example procedure in link:{LinkCDCGettingStarted}#creating-connector-monitor-inventory-database[{NameCDCGettingStarted}].
endif::product[]
// Type: concept
// Title: About Avro name requirements
// ModuleID: about-avro-name-requirements
[[avro-naming]]
== Naming
As stated in the Avro link:https://avro.apache.org/docs/current/spec.html#names[documentation], names must adhere to the following rules:

* Start with `[A-Za-z_]`
* Subsequently contain only `[A-Za-z0-9_]` characters

{prodname} uses a column's name as the basis for the name of the corresponding Avro field.
This can lead to problems during serialization if the column name does not also adhere to the Avro naming rules.
Each {prodname} connector provides a configuration property, `sanitize.field.names`, that you can set to `true` if you have columns that do not adhere to the Avro rules for names. Setting `sanitize.field.names` to `true` allows serialization of non-conformant fields without having to actually modify your schema.
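
For example, a hypothetical column named `order-total` contains a character that is not valid in an Avro name; with sanitization enabled in the connector configuration, {prodname} adjusts the field to a conformant name (typically by replacing the invalid characters, for example `order_total`) before the Avro schema is generated. A minimal configuration excerpt:

[source,options="nowrap"]
----
# connector configuration excerpt (other required connector properties omitted)
sanitize.field.names=true
----
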
ifdef::community[]
[id="confluent-schema-registry"]
== Confluent Schema Registry
There is an alternative https://github.com/confluentinc/schema-registry[schema registry] implementation provided by Confluent.
The configuration is slightly different. Use the following converter properties:

[source]
----
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
----
An instance of the Confluent Schema Registry can be deployed like so:

[source]
----
docker run -it --rm --name schema-registry \
  --link zookeeper \
  -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 \
  -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
  -e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \
  -p 8181:8181 confluentinc/cp-schema-registry
----
Run a Kafka Connect image configured to use Avro:

[source,subs="attributes"]
----
docker run -it --rm --name connect \
  --link zookeeper:zookeeper \
  --link kafka:kafka \
  --link mysql:mysql \
  --link schema-registry:schema-registry \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \
  -e VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
  -p 8083:8083 debezium/connect:{debezium-docker-label}
----
Run a console consumer that reads new Avro messages from the `db.myschema.mytable` topic and decodes them to JSON:

[source,subs="attributes"]
----
docker run -it --rm --name avro-consumer \
  --link zookeeper:zookeeper \
  --link kafka:kafka \
  --link mysql:mysql \
  --link schema-registry:schema-registry \
  debezium/connect:{debezium-docker-label} \
  /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --property print.key=true \
    --formatter io.confluent.kafka.formatter.AvroMessageFormatter \
    --property schema.registry.url=http://schema-registry:8081 \
    --topic db.myschema.mytable
----
== Getting More Information
link:/blog/2016/09/19/Serializing-Debezium-events-with-Avro/[This post] from the {prodname} blog
describes the concepts of serializers, converters, and so on, and discusses the advantages of using Avro.
Note that some details around Kafka Connect converters have changed slightly since that post was written.
For a complete example of using Avro as the message format for {prodname} data change events,
please see the https://github.com/debezium/debezium-examples/tree/master/tutorial#using-mysql-and-the-avro-message-format[MySQL and the Avro message format] tutorial example.
endif::community[]