// Category: debezium-using
// Type: assembly
// ModuleID: configuring-debezium-connectors-to-use-avro-serialization
// Title: Configuring {prodname} connectors to use Avro serialization
[id="avro-serialization"]
= Avro Serialization
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]
A {prodname} connector works in the Kafka Connect framework to capture each row-level change in a database by generating a change event record.
For each change event record, the {prodname} connector completes the following actions:
. Applies configured transformations.
. Serializes the record key and value into a binary form by using the configured link:https://kafka.apache.org/documentation/#connect_running[Kafka Connect converters].
. Writes the record to the correct Kafka topic.
You can specify converters for each individual {prodname} connector instance.
Kafka Connect provides a JSON converter that serializes the record keys and values into JSON documents.
The default behavior is that the JSON converter includes the record's message schema, which makes each record very verbose.
The {link-prefix}:{link-tutorial}[{name-tutorial}] shows what the records look like when both payload and schemas are included.
If you want records to be serialized with JSON, consider setting the following connector configuration properties to `false`:
* `key.converter.schemas.enable`
* `value.converter.schemas.enable`
Setting these properties to `false` excludes the verbose schema information from each record.
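For example, a minimal sketch of connector configuration properties that retain JSON serialization but omit the schema from each record might look like the following (the converter class shown is the standard Kafka Connect JSON converter):

[source,options="nowrap"]
----
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
----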
Alternatively, you can serialize the record keys and values by using https://avro.apache.org/[Apache Avro].
The Avro binary format is compact and efficient.
Avro schemas make it possible to ensure that each record has the correct structure.
Avro's schema evolution mechanism enables schemas to evolve.
This is essential for {prodname} connectors, which dynamically generate each record's schema to match the structure of the database table that was changed.
Over time, change event records written to the same Kafka topic might have different versions of the same schema.
Avro serialization makes it easier for the consumers of change event records to adapt to a changing record schema.
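For example, if a new column is added to a captured table, the connector can register a new version of the value schema that adds a matching field.
Giving the new field a default value keeps the new schema compatible with records that were written under the previous version.
The following fragment is a simplified, hypothetical Avro schema for such an updated version; real {prodname} schemas contain additional envelope fields:

[source,json]
----
{
  "type": "record",
  "name": "Value",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "quantity", "type": "int", "default": 0}
  ]
}
----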
ifdef::community[]
To use Apache Avro serialization, you must deploy a schema registry that manages Avro message schemas and their versions.
Available options include the {registry-name-full} as well as the Confluent Schema Registry. Both are described here.
endif::community[]
ifdef::product[]
To use Apache Avro serialization, you must deploy a schema registry that manages Avro message schemas and their versions.
For information about setting up this registry, see the documentation for link:{LinkServiceRegistryInstall}[{NameServiceRegistryInstall}].
endif::product[]
// Type: concept
// Title: About the {registry}
[id="about-the-registry"]
== About the {registry-name-full}
ifdef::community[]
The link:https://github.com/Apicurio/apicurio-registry[{registry}] open-source project provides several components that work with Avro:
endif::community[]
ifdef::product[]
link:{LinkServiceRegistryUser}[{registry-name-full}] provides the following components that work with Avro:
endif::product[]
* An Avro converter that you can specify in {prodname} connector configurations.
This converter maps Kafka Connect schemas to Avro schemas.
The converter then uses the Avro schemas to serialize the record keys and values into Avro's compact binary form.
* An API and schema registry that tracks:
+
** Avro schemas that are used in Kafka topics.
** Where the Avro converter sends the generated Avro schemas.
+
Because the Avro schemas are stored in this registry, each record needs to contain only a tiny _schema identifier_.
This makes each record even smaller. For an I/O-bound system like Kafka, smaller records mean greater total throughput for producers and consumers.
* Avro _Serdes_ (serializers and deserializers) for Kafka producers and consumers.
Kafka consumer applications that you write to consume change event records can use Avro Serdes to deserialize the change event records.
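For example, a consumer application might configure the {registry} Avro deserializers through properties such as the following.
This is a sketch only; the deserializer class and property names depend on the {registry} version and serde artifacts that you use:

[source,options="nowrap"]
----
key.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer
value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer
apicurio.registry.url=http://apicurio:8080/apis/registry/v2
----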
To use the {registry} with {prodname}, add {registry} converters and their dependencies to the Kafka Connect container image that you are using for running a {prodname} connector.
[NOTE]
====
The {registry} project also provides a JSON converter.
This converter combines the advantage of less verbose messages with human-readable JSON.
Messages do not contain the schema information themselves, but only a schema ID.
====
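For example, to use this JSON converter instead of the Avro converter, you could set properties such as the following.
This is a sketch; verify the converter class name against the {registry} version that you deploy:

[source,options="nowrap"]
----
key.converter=io.apicurio.registry.utils.converter.ExtJsonConverter
key.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
value.converter=io.apicurio.registry.utils.converter.ExtJsonConverter
value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
----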
[NOTE]
====
To use converters that are provided by {registry}, you must specify the `apicurio.registry.url` property.
====
// Type: concept
// Title: Overview of deploying a {prodname} connector that uses Avro serialization
[id="overview-of-deploying-a-debezium-connector-that-uses-avro-serialization"]
== Deployment overview
To deploy a {prodname} connector that uses Avro serialization, you must complete three main tasks:
ifdef::community[]
. Deploy an link:https://github.com/Apicurio/apicurio-registry[{registry-name-full}] instance.
endif::community[]
ifdef::product[]
. Deploy a {registry-name-full} instance by following the instructions in link:{LinkServiceRegistryInstall}[{NameServiceRegistryInstall}].
endif::product[]
ifdef::community[]
. Install the Avro converter from link:https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/{apicurio-version}/apicurio-registry-distro-connect-converter-{apicurio-version}-converter.tar.gz[the installation package] into a plug-in directory. This step is not needed if you use the link:https://hub.docker.com/r/debezium/connect[Debezium Connect container image]; for details, see <<deploying-with-debezium-containers>>.
endif::community[]
ifdef::product[]
. Install the Avro converter by downloading the {prodname} link:https://access.redhat.com/jbossnetwork/restricted/listSoftware.html?product=red.hat.integration&downloadType=distributions[Service Registry Kafka Connect] zip file and extracting it into the {prodname} connector's directory.
endif::product[]
. Configure a {prodname} connector instance to use Avro serialization by setting configuration properties as follows:
+
[source,options="nowrap"]
----
key.converter=io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
key.converter.apicurio.registry.auto-register=true
key.converter.apicurio.registry.find-latest=true
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
value.converter.apicurio.registry.auto-register=true
value.converter.apicurio.registry.find-latest=true
----
Internally, Kafka Connect always uses JSON key/value converters for storing configuration and offsets.
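If you register connectors through the Kafka Connect REST API, the same converter settings go into the `config` object of the connector creation request.
The following `curl` call is a minimal sketch that assumes a Connect REST endpoint at `localhost:8083` and a MySQL connector; the database connection properties that a real connector requires are omitted:

[source,shell,options="nowrap"]
----
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "key.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
    "key.converter.apicurio.registry.auto-register": "true",
    "key.converter.apicurio.registry.find-latest": "true",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "http://apicurio:8080/apis/registry/v2",
    "value.converter.apicurio.registry.auto-register": "true",
    "value.converter.apicurio.registry.find-latest": "true"
  }
}'
----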
// Type: procedure
// Title: Deploying connectors that use Avro in {prodname} containers
// ModuleID: deploying-connectors-that-use-avro-in-debezium-containers
[id="deploying-with-debezium-containers"]
== Deploying with {prodname} containers
ifdef::community[]
In your environment, you might want to use a provided {prodname} container image to deploy {prodname} connectors that use Avro serialization.
In the following procedure, you enable the Apicurio converters in the {prodname} Kafka Connect container image, and configure a {prodname} connector to use the Avro converter.
endif::community[]
ifdef::product[]
In your environment, you might want to use a provided {prodname} container to deploy {prodname} connectors that use Avro serialization.
Complete the following procedure to build a custom Kafka Connect container image for {prodname}, and configure the {prodname} connector to use the Avro converter.
endif::product[]
.Prerequisites
* You have Docker installed and sufficient rights to create and manage containers.
* You downloaded the {prodname} connector plug-ins that you want to deploy with Avro serialization.
.Procedure
ifdef::community[]
. Deploy an instance of {registry}.
+
The following example uses a non-production, in-memory, {registry} instance:
+
[source,subs="attributes+"]
----
docker run -it --rm --name apicurio \
-p 8080:8080 apicurio/apicurio-registry-mem:{apicurio-version}
----
. Run the {prodname} container image for Kafka Connect, configuring it to provide the Avro converter by setting the `ENABLE_APICURIO_CONVERTERS=true` environment variable:
+
[source,subs="attributes+"]
----
docker run -it --rm --name connect \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link mysql:mysql \
--link apicurio:apicurio \
-e ENABLE_APICURIO_CONVERTERS=true \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
-e VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
-e CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
-e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \
-e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \
-e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \
-e CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
-e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \
-e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \
-e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \
-p 8083:8083 debezium/connect:{debezium-docker-label}
----
endif::community[]
ifdef::product[]
. Deploy an instance of {registry}. See link:{LinkServiceRegistryInstall}[{NameServiceRegistryInstall}], which provides instructions for:
+
* Installing {registry}
* Installing AMQ Streams
* Setting up AMQ Streams storage
. Extract the {prodname} connector archives to create a directory structure for the connector plug-ins.
If you downloaded and extracted the archives for multiple {prodname} connectors, the resulting directory structure looks like the one in the following example:
+
[subs=+macros]
----
pass:quotes[*tree ./my-plugins/*]
./my-plugins/
├── debezium-connector-mongodb
│   ├── ...
├── debezium-connector-mysql
│   ├── ...
├── debezium-connector-postgres
│   ├── ...
└── debezium-connector-sqlserver
    ├── ...
----
. Add the Avro converter to the directory that contains the {prodname} connector that you want to configure to use Avro serialization:
.. Go to the link:{DebeziumDownload}[download site] and download the {registry} Kafka Connect zip file.
.. Extract the archive into the desired {prodname} connector directory.
+
To configure more than one type of {prodname} connector to use Avro serialization, extract the archive into the directory for each relevant connector type.
Although extracting the archive to each directory duplicates the files, by doing so you remove the possibility of conflicting dependencies.
. Create and publish a custom image for running {prodname} connectors that are configured to use the Avro converter:
.. Create a new `Dockerfile` by using `{DockerKafkaConnect}` as the base image.
In the following example, replace _my-plugins_ with the name of your plug-ins directory:
+
[subs=+macros]
----
FROM registry.redhat.io/amq7/amq-streams-kafka-25:1.5.0
USER root:root
pass:quotes[COPY _./my-plugins/_ /opt/kafka/plugins/]
USER 1001
----
+
Before Kafka Connect starts running the connector, Kafka Connect loads any third-party plug-ins that are in the `/opt/kafka/plugins` directory.
.. Build the container image. For example, to build an image that is tagged as `debezium-container-with-avro:latest`, run the following command from the directory that contains the `Dockerfile`:
+
`docker build -t debezium-container-with-avro:latest .`
.. Push your custom image to your container registry, for example:
+
`docker push _<myregistry.io>_/debezium-container-with-avro:latest`
.. Point to the new container image. Do one of the following:
+
* Edit the `KafkaConnect.spec.image` property of the `KafkaConnect` custom resource.
If set, this property overrides the `STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE` variable in the Cluster Operator.
For example:
+
[source,yaml,subs=attributes+]
----
apiVersion: {KafkaConnectApiVersion}
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  #...
  image: debezium-container-with-avro
----
+
* In the `install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml` file, edit the `STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE` variable to point to the new container image, and then reinstall the Cluster Operator. If you edit this file, you must apply it to your OpenShift cluster.
. Deploy each {prodname} connector that is configured to use the Avro converter.
For each {prodname} connector:
.. Create a {prodname} connector instance. The following example `inventory-connector.yaml` file creates a `KafkaConnector` custom resource that defines a MySQL connector instance that is configured to use the Avro converter:
+
[source,yaml,options="nowrap"]
----
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql
    database.port: 3306
    database.user: debezium
    database.password: dbz
    database.server.id: 184054
    database.server.name: dbserver1
    database.include.list: inventory
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
    key.converter: io.apicurio.registry.utils.converter.AvroConverter
    key.converter.apicurio.registry.url: http://apicurio:8080/apis/registry/v2
    key.converter.apicurio.registry.auto-register: true
    key.converter.apicurio.registry.find-latest: true
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url: http://apicurio:8080/apis/registry/v2
    value.converter.apicurio.registry.auto-register: true
    value.converter.apicurio.registry.find-latest: true
----
.. Apply the connector instance, for example:
+
`oc apply -f inventory-connector.yaml`
+
This registers `inventory-connector` and the connector starts to run against the `inventory` database.
. Verify that the connector was created and has started to track changes in the specified database.
You can verify the connector instance by watching the Kafka Connect log output as, for example, `inventory-connector` starts.
.. Display the Kafka Connect log output:
+
[source,shell,options="nowrap"]
----
oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
----
.. Review the log output to verify that the initial snapshot has been executed.
You should see something like the following lines:
+
[source,shell,options="nowrap"]
----
...
2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
----
+
Taking the snapshot involves a number of steps:
+
[source,shell,options="nowrap"]
----
...
2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL primary server (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,843 INFO using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
----
+
After completing the snapshot, {prodname} begins tracking changes; in this example, it reads the MySQL `binlog` to generate change events for the `inventory` database:
+
[source,shell,options="nowrap"]
----
...
2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0]
2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0]
2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306]
Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5)
2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306]
...
----
endif::product[]
ifdef::community[]
[id="confluent-schema-registry"]
== Confluent Schema Registry
Confluent provides an alternative https://github.com/confluentinc/schema-registry[schema registry] implementation.
Its configuration differs slightly from the {registry} configuration.
. In your {prodname} connector configuration, specify the following properties:
+
[source]
----
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
----
. Deploy an instance of the Confluent Schema Registry:
+
[source]
----
docker run -it --rm --name schema-registry \
--link zookeeper \
-e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 \
-e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
-e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \
-p 8081:8081 confluentinc/cp-schema-registry
----
. Run a Kafka Connect image configured to use Avro:
+
[source,subs="attributes+"]
----
docker run -it --rm --name connect \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link mysql:mysql \
--link schema-registry:schema-registry \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \
-e VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
-e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
-e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
-p 8083:8083 debezium/connect:{debezium-docker-label}
----
. Run a console consumer that reads new Avro messages from the `db.myschema.mytable` topic and decodes them to JSON:
+
[source,subs="attributes+"]
----
docker run -it --rm --name avro-consumer \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link mysql:mysql \
--link schema-registry:schema-registry \
debezium/connect:{debezium-docker-label} \
/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--property print.key=true \
--formatter io.confluent.kafka.formatter.AvroMessageFormatter \
--property schema.registry.url=http://schema-registry:8081 \
--topic db.myschema.mytable
----
endif::community[]
// Type: concept
// Title: About Avro name requirements
// ModuleID: about-avro-name-requirements
[[avro-naming]]
== Naming
As stated in the Avro link:https://avro.apache.org/docs/current/spec.html#names[documentation], names must adhere to the following rules:
* Start with `[A-Za-z_]`
* Subsequently contain only `[A-Za-z0-9_]` characters
{prodname} uses the column's name as the basis for the corresponding Avro field.
This can lead to problems during serialization if the column name does not also adhere to the Avro naming rules.
Each {prodname} connector provides a configuration property, `sanitize.field.names`, that you can set to `true` if you have columns that do not adhere to the Avro rules for names.
Setting `sanitize.field.names` to `true` allows serialization of non-conformant fields without having to actually modify your schema.
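For example, a hypothetical column named `order-total` violates the rules because it contains a hyphen.
With sanitization enabled, the connector emits a conforming field name in its place (non-conforming characters are typically replaced with underscores), so the record can still be serialized with Avro:

[source,options="nowrap"]
----
# With this setting enabled, a column named "order-total" (hypothetical example)
# is emitted as an Avro-safe field name such as "order_total".
sanitize.field.names=true
----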
ifdef::community[]
== Getting More Information
link:/blog/2016/09/19/Serializing-Debezium-events-with-Avro/[This post] from the {prodname} blog
describes the concepts of serializers, converters, and other components, and discusses the advantages of using Avro.
Some Kafka Connect converter details have slightly changed since that post was written.
For a complete example of using Avro as the message format for {prodname} change data events,
see https://github.com/debezium/debezium-examples/tree/master/tutorial#using-mysql-and-the-avro-message-format[MySQL and the Avro message format].
endif::community[]