. Serializes the record key and value into a binary form by using the configured link:https://kafka.apache.org/documentation/#connect_running[Kafka Connect converters].
Alternatively, you can serialize the record keys and values by using https://avro.apache.org/[Apache Avro].
The Avro binary format is compact and efficient.
Avro schemas make it possible to ensure that each record has the correct structure.
Avro's schema evolution mechanism enables a schema to change over time while remaining compatible with records that were written under an earlier version.
This is essential for {prodname} connectors, which dynamically generate each record's schema to match the structure of the database table that was changed.
Over time, change event records written to the same Kafka topic might have different versions of the same schema.
Avro serialization makes it easier for the consumers of change event records to adapt to a changing record schema.
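For example, when a column is added to a captured table, the connector can emit a new schema version that adds a matching field; declaring a default for that field keeps the new schema able to read records written with the old one. The following is a minimal sketch of such an evolved value schema (the record and field names are illustrative, not the exact structure that {prodname} generates):

[source,json]
----
{
  "type": "record",
  "name": "Value",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "quantity", "type": "int", "default": 0}
  ]
}
----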
To use Apache Avro serialization, you must deploy a schema registry that manages Avro message schemas and their versions. For information about setting up this registry, see the documentation for {LinkServiceRegistryGetStart}[{registry-name-full}].
To use the {registry} with {prodname}, add {registry} converters and their dependencies to the Kafka Connect container image that you are using for running a {prodname} connector.
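In the connector configuration, you then select the converters and point them at the registry through the key and value converter properties. The following fragment is a sketch: the registry URL is a placeholder, and depending on the {registry} version you might need additional `apicurio.registry.*` options (for example, an ID or auto-register strategy).

[source,options="nowrap"]
----
key.converter=io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.url=http://apicurio:8080/api
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url=http://apicurio:8080/api
----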
. Install the Avro converter into the {prodname} connector's plug-in directory. You can obtain it either from link:https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/{apicurio-version}/apicurio-registry-distro-connect-converter-{apicurio-version}-converter.tar.gz[the installation package] on Maven Central, or by downloading the {prodname} link:https://access.redhat.com/jbossnetwork/restricted/listSoftware.html?product=red.hat.integration&downloadType=distributions[Service Registry Kafka Connect] zip file and extracting it into the connector's directory. This step is not needed when you use the link:https://hub.docker.com/r/debezium/connect[Debezium Connect container image]; for details, see <<deploying-with-debezium-containers>>.
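+
A sketch of the extraction step for the community package, assuming the MySQL connector's plug-in directory; the archive version and the target path are placeholders for your environment:
+
[source,shell,options="nowrap"]
----
tar -xzf apicurio-registry-distro-connect-converter-<apicurio-version>-converter.tar.gz \
  -C /kafka/connect/debezium-connector-mysql
----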
In your environment, you might want to use a provided {prodname} container image to deploy {prodname} connectors that use Avro serialization.
Complete the following procedure to build a custom Kafka Connect container image for {prodname} with the Apicurio converters enabled, and to configure the {prodname} connector to use the Avro converter.
. Run the {prodname} container image for Kafka Connect, configuring it to provide the Avro converter by setting the `ENABLE_APICURIO_CONVERTERS=true` environment variable:
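+
The following command is a minimal sketch; the topic names, the `--link` to the Kafka container, and the image tag are placeholders for your environment:
+
[source,shell,options="nowrap"]
----
docker run -it --rm --name connect \
  -p 8083:8083 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  -e ENABLE_APICURIO_CONVERTERS=true \
  --link kafka:kafka \
  debezium/connect:latest
----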
. Deploy an instance of {registry}. For instructions, see link:{LinkServiceRegistryGetStart}#installing-registry-operatorhub[{NameServiceRegistryGetStart}, Installing Service Registry from the OpenShift OperatorHub].
. Extract the {prodname} connector archives to create a directory structure for the connector plug-ins.
If you downloaded and extracted the archives for multiple {prodname} connectors, the resulting directory structure looks like the one in the following example:
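This sketch assumes that the MySQL, PostgreSQL, and SQL Server connector archives were extracted into a `./my-plugins/` directory; the directory and connector names are placeholders:

[source,shell,options="nowrap"]
----
tree ./my-plugins/
./my-plugins/
├── debezium-connector-mysql
├── debezium-connector-postgres
└── debezium-connector-sqlserver
----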
To configure more than one type of {prodname} connector to use Avro serialization, extract the archive into the directory for each relevant connector type.
Although extracting the archive into each connector's directory duplicates the files, doing so removes the possibility of conflicting dependencies.
.. Build the docker container image. For example, if you saved the Dockerfile that you created in the previous step as `debezium-container-with-avro`, then you would run the following command:
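+
A sketch, assuming that the Dockerfile is in the current working directory:
+
[source,shell,options="nowrap"]
----
docker build -t debezium-container-with-avro:latest .
----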
* In the `install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml` file, edit the `STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE` variable to point to the new container image, and then reinstall the Cluster Operator by applying the edited file to your OpenShift cluster.
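+
A sketch of the relevant fragment of that file; the image reference is a placeholder for your registry and tag:
+
[source,yaml,options="nowrap"]
----
env:
  - name: STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE
    value: my-registry/debezium-container-with-avro:latest
----
+
You can then reapply the file, for example with `oc apply -f install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml`.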
.. Create a {prodname} connector instance. The following example `inventory-connector.yaml` file creates a `KafkaConnector` custom resource that defines a MySQL connector instance configured to use the Avro converter:
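+
The following sketch shows the general shape of such a file. The hostnames, password, and registry URL are placeholders, and the `apiVersion` and some property names (for example, the database history and `apicurio.registry.*` options) depend on the Strimzi/AMQ Streams, {prodname}, and {registry} versions in use:
+
[source,yaml,options="nowrap"]
----
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: inventory-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql
    database.port: 3306
    database.user: debezium
    database.password: dbz
    database.server.id: 184054
    database.server.name: dbserver1
    database.include.list: inventory
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
    key.converter: io.apicurio.registry.utils.converter.AvroConverter
    key.converter.apicurio.registry.url: http://apicurio:8080/api
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url: http://apicurio:8080/api
----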
+
After the connector starts, the log output shows that the snapshot has begun:
+
[source,shell,options="nowrap"]
----
2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
----
+
Taking the snapshot involves a number of steps:
+
[source,shell,options="nowrap"]
----
...
2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL primary server (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,843 INFO using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0]
2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0]
2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306]
Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5)
2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306]
----
Each {prodname} connector provides a configuration property, `sanitize.field.names`, that you can set to `true` if you have columns that do not adhere to Avro naming rules.
Setting `sanitize.field.names` to `true` allows serialization of the non-conformant fields without requiring you to modify your source schema.
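For example, enabling sanitization is a single additional property in the connector configuration:

[source,options="nowrap"]
----
sanitize.field.names=true
----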