// Category: debezium-using
// Type: assembly
// ModuleID: configuring-debezium-connectors-to-use-avro-serialization
// Title: Configuring {prodname} connectors to use Avro serialization
[id="avro-serialization"]
= Avro Serialization
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js

toc::[]

A {prodname} connector works in the Kafka Connect framework to capture each row-level change in a database by generating a change event record.
For each change event record, the {prodname} connector completes the following actions:

. Applies configured transformations.
. Serializes the record key and value into a binary form by using the configured link:https://kafka.apache.org/documentation/#connect_running[Kafka Connect converters].
. Writes the record to the correct Kafka topic.

You can specify converters for each individual {prodname} connector instance.
Kafka Connect provides a JSON converter that serializes the record keys and values into JSON documents.
By default, the JSON converter includes the record's message schema, which makes each record very verbose.
The {link-prefix}:{link-tutorial}[{name-tutorial}] shows what the records look like when both payload and schemas are included.
If you want records to be serialized with JSON, consider setting the following connector configuration properties to `false`:

* `key.converter.schemas.enable`
* `value.converter.schemas.enable`

Setting these properties to `false` excludes the verbose schema information from each record.
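
For example, a minimal sketch of connector configuration properties that keep JSON serialization but omit the embedded schemas; the converter class is the standard Kafka Connect JSON converter:

[source,options="nowrap"]
----
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
----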

Alternatively, you can serialize the record keys and values by using https://avro.apache.org/[Apache Avro].
The Avro binary format is compact and efficient.
Avro schemas make it possible to ensure that each record has the correct structure.
Avro's schema evolution mechanism enables schemas to change over time.
This is essential for {prodname} connectors, which dynamically generate each record's schema to match the structure of the database table that was changed.
Over time, change event records written to the same Kafka topic might have different versions of the same schema.
Avro serialization makes it easier for the consumers of change event records to adapt to a changing record schema.
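
For example, when a column is added to a captured table, the connector generates a new schema version that adds a corresponding optional field with a default value, so consumers that were written against the previous version can continue to read records.
The following is a hypothetical value schema after such a change; the table and field names are illustrative only:

[source,json]
----
{
  "type": "record",
  "name": "Value",
  "namespace": "dbserver1.inventory.customers",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "first_name", "type": "string" },
    { "name": "nickname", "type": ["null", "string"], "default": null }
  ]
}
----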

ifdef::community[]
To use Apache Avro serialization, you must deploy a schema registry that manages Avro message schemas and their versions.
Available options include the {registry-name-full} as well as the Confluent Schema Registry.
Both are described here.
endif::community[]

ifdef::product[]
To use Apache Avro serialization, you must deploy a schema registry that manages Avro message schemas and their versions.
For information about setting up this registry, see the documentation for link:{LinkServiceRegistryInstall}[{NameServiceRegistryInstall}].
endif::product[]

[id="apicurio-registry"]
== {registry-name-full}

// Type: concept
// Title: About the {registry}
[id="about-the-registry"]
=== About the {registry-name-full}

ifdef::community[]
The link:https://github.com/Apicurio/apicurio-registry[{registry}] open-source project provides several components that work with Avro:
endif::community[]

ifdef::product[]
link:{LinkServiceRegistryUser}[{registry-name-full}] provides the following components that work with Avro:
endif::product[]

* An Avro converter that you can specify in {prodname} connector configurations.
This converter maps Kafka Connect schemas to Avro schemas.
The converter then uses the Avro schemas to serialize the record keys and values into Avro's compact binary form.

* An API and schema registry that tracks:
+
** Avro schemas that are used in Kafka topics.
** Where the Avro converter sends the generated Avro schemas.
+
Because the Avro schemas are stored in this registry, each record needs to contain only a tiny _schema identifier_.
This makes each record even smaller.
For an I/O bound system like Kafka, this means more total throughput for producers and consumers.

* Avro _Serdes_ (serializers and deserializers) for Kafka producers and consumers.
Kafka consumer applications that you write to consume change event records can use Avro Serdes to deserialize the change event records.
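
For example, a minimal sketch of Kafka consumer properties that use the {registry} Avro deserializer; the class and property names below come from the {registry} Avro Serde module, and the registry URL is an assumption that you must adjust for your environment:

[source,options="nowrap"]
----
key.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer
value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer
apicurio.registry.url=http://apicurio:8080/apis/registry/v2
----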

To use the {registry} with {prodname}, add {registry} converters and their dependencies to the Kafka Connect container image that you are using for running a {prodname} connector.

[NOTE]
====
The {registry} project also provides a JSON converter.
This converter combines the advantage of less verbose messages with human-readable JSON.
Messages do not contain the schema information themselves, but only a schema ID.
====
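
For example, a minimal sketch that uses the {registry} JSON converter for record values; `ExtJsonConverter` is the JSON converter class in the {registry} utils-converter module, and you should verify the class name against your {registry} version:

[source,options="nowrap"]
----
value.converter=io.apicurio.registry.utils.converter.ExtJsonConverter
value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
value.converter.apicurio.registry.auto-register=true
----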

[NOTE]
====
To use the converters that {registry} provides, you must configure the `apicurio.registry.url` property.
====

// Type: concept
// Title: Overview of deploying a {prodname} connector that uses Avro serialization
[id="overview-of-deploying-a-debezium-connector-that-uses-avro-serialization"]
=== {registry} deployment overview

To deploy a {prodname} connector that uses Avro serialization, you must complete three main tasks:

ifdef::community[]
. Deploy an link:https://github.com/Apicurio/apicurio-registry[{registry-name-full}] instance.
endif::community[]
ifdef::product[]
. Deploy a {registry-name-full} instance by following the instructions in link:{LinkServiceRegistryInstall}[{NameServiceRegistryInstall}].
endif::product[]

ifdef::community[]
. Install the Avro converter from the link:https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/{apicurio-version}/apicurio-registry-distro-connect-converter-{apicurio-version}.tar.gz[installation package] into a plug-in directory.
If you are using the link:https://quay.io/repository/debezium/connect[Debezium Connect container image], it is not necessary to install the package.
For more information, see <<deploying-with-debezium-containers>>.
endif::community[]
ifdef::product[]
. Install the Avro converter by downloading the {prodname} link:https://access.redhat.com/jbossnetwork/restricted/listSoftware.html?product=red.hat.integration&downloadType=distributions[Service Registry Kafka Connect] zip file and extracting it into the {prodname} connector's directory.
endif::product[]

. Configure a {prodname} connector instance to use Avro serialization by setting configuration properties as follows:
+
[source,options="nowrap"]
----
key.converter=io.apicurio.registry.utils.converter.AvroConverter
key.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
key.converter.apicurio.registry.auto-register=true
key.converter.apicurio.registry.find-latest=true
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
value.converter.apicurio.registry.auto-register=true
value.converter.apicurio.registry.find-latest=true
schema.name.adjustment.mode=avro
----

Internally, Kafka Connect always uses JSON key/value converters for storing configuration and offsets.

// Type: procedure
// Title: Deploying connectors that use Avro in {prodname} containers
// ModuleID: deploying-connectors-that-use-avro-in-debezium-containers
[id="deploying-with-debezium-containers"]
=== Deploying {registry} with {prodname} containers

ifdef::community[]
In your environment, you might want to use a provided {prodname} container image to deploy {prodname} connectors that use Avro serialization.
In the following procedure, you enable the Apicurio converters on the {prodname} Kafka Connect container image, and configure a {prodname} connector to use the Avro converter.
endif::community[]
ifdef::product[]
In your environment, you might want to use a provided {prodname} container to deploy {prodname} connectors that use Avro serialization.
Complete the following procedure to build a custom Kafka Connect container image for {prodname}, and configure the {prodname} connector to use the Avro converter.
endif::product[]

.Prerequisites

* You have Docker installed and sufficient rights to create and manage containers.
* You downloaded the {prodname} connector plug-in(s) that you want to deploy with Avro serialization.

.Procedure

ifdef::community[]
. Deploy an instance of {registry}.
+
The following example uses a non-production, in-memory {registry} instance:
+
[source,subs="attributes+"]
----
docker run -it --rm --name apicurio \
  -p 8080:8080 apicurio/apicurio-registry-mem:{apicurio-version}
----

. Run the {prodname} container image for Kafka Connect, and configure it to provide the Avro converter by setting the `ENABLE_APICURIO_CONVERTERS=true` environment variable:
+
[source,subs="attributes+"]
----
docker run -it --rm --name connect \
  --link zookeeper:zookeeper \
  --link kafka:kafka \
  --link mysql:mysql \
  --link apicurio:apicurio \
  -e ENABLE_APICURIO_CONVERTERS=true \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
  -e VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
  -e CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
  -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \
  -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \
  -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \
  -e CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
  -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \
  -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \
  -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \
  -e CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE=avro \
  -p 8083:8083 debezium/connect:{debezium-docker-label}
----
endif::community[]

ifdef::product[]
. Deploy an instance of {registry}. See link:{LinkServiceRegistryInstall}[{NameServiceRegistryInstall}], which provides instructions for:
+
* Installing {registry}
* Installing AMQ Streams
* Setting up AMQ Streams storage

. Extract the {prodname} connector archives to create a directory structure for the connector plug-ins.
If you downloaded and extracted the archives for multiple {prodname} connectors, the resulting directory structure looks like the one in the following example:
+
[subs=+macros]
----
pass:quotes[*tree ./my-plugins/*]
./my-plugins/
├── debezium-connector-mongodb
│   ├── ...
├── debezium-connector-mysql
│   ├── ...
├── debezium-connector-postgres
│   ├── ...
└── debezium-connector-sqlserver
    ├── ...
----

. Add the Avro converter to the directory that contains the {prodname} connector that you want to configure to use Avro serialization:

.. Go to the link:{LinkDebeziumDownloads}[{NameDebeziumDownloads}] and download the {registry} Kafka Connect zip file.
.. Extract the archive into the desired {prodname} connector directory.
+
To configure more than one type of {prodname} connector to use Avro serialization, extract the archive into the directory for each relevant connector type.
Although extracting the archive to each directory duplicates the files, by doing so you remove the possibility of conflicting dependencies.

. Create and publish a custom image for running {prodname} connectors that are configured to use the Avro converter:

.. Create a new `Dockerfile` by using `{DockerKafkaConnect}` as the base image.
In the following example, replace _my-plugins_ with the name of your plug-ins directory:
+
[subs="+attributes,+macros"]
----
FROM {DockerKafkaConnect}
USER root:root
pass:quotes[COPY _./my-plugins/_ /opt/kafka/plugins/]
USER 1001
----
+
Before Kafka Connect starts running the connector, Kafka Connect loads any third-party plug-ins that are in the `/opt/kafka/plugins` directory.

.. Build the container image. For example, if you saved the `Dockerfile` that you created in the previous step in the current directory, and you want to tag the image as `debezium-container-with-avro`, then you would run the following command:
+
`docker build -t debezium-container-with-avro:latest .`

.. Push your custom image to your container registry, for example:
+
`docker push _<myregistry.io>_/debezium-container-with-avro:latest`

.. Point to the new container image. Do one of the following:
+
* Edit the `KafkaConnect.spec.image` property of the `KafkaConnect` custom resource.
If set, this property overrides the `STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE` variable in the Cluster Operator.
For example:
+
[source,yaml,subs=attributes+]
----
apiVersion: {KafkaConnectApiVersion}
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  #...
  image: debezium-container-with-avro
----
+
* In the `install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml` file, edit the `STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE` variable to point to the new container image, and then reinstall the Cluster Operator.
If you edit this file, you must apply it to your OpenShift cluster.

. Deploy each {prodname} connector that is configured to use the Avro converter.
For each {prodname} connector:

.. Create a {prodname} connector instance. The following `inventory-connector.yaml` file example creates a `KafkaConnector` custom resource that defines a MySQL connector instance that is configured to use the Avro converter:
+
[source,yaml,options="nowrap"]
----
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: mysql
    database.port: 3306
    database.user: debezium
    database.password: dbz
    database.server.id: 184054
    topic.prefix: dbserver1
    database.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.inventory
    schema.name.adjustment.mode: avro
    key.converter: io.apicurio.registry.utils.converter.AvroConverter
    key.converter.apicurio.registry.url: http://apicurio:8080/apis/registry/v2
    key.converter.apicurio.registry.auto-register: "true"
    key.converter.apicurio.registry.find-latest: "true"
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url: http://apicurio:8080/apis/registry/v2
    value.converter.apicurio.registry.auto-register: "true"
    value.converter.apicurio.registry.find-latest: "true"
----

.. Apply the connector instance, for example:
+
`oc apply -f inventory-connector.yaml`
+
This registers `inventory-connector` and the connector starts to run against the `inventory` database.

. Verify that the connector was created and has started to track changes in the specified database.
You can verify the connector instance by watching the Kafka Connect log output as, for example, `inventory-connector` starts.

.. Display the Kafka Connect log output:
+
[source,shell,options="nowrap"]
----
oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
----

.. Review the log output to verify that the initial snapshot has been executed.
You should see something like the following lines:
+
[source,shell,options="nowrap"]
----
...
2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
----
+
Taking the snapshot involves a number of steps:
+
[source,shell,options="nowrap"]
----
...
2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL primary server (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:30,843 INFO using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
...
----
+
After completing the snapshot, {prodname} begins tracking changes in, for example, the `inventory` database's `binlog` for change events:
+
[source,shell,options="nowrap"]
----
...
2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0]
2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0]
2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306]
Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5)
2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306]
...
----
endif::product[]

ifdef::community[]
[id="confluent-schema-registry"]
== Confluent Schema Registry

There is an alternative https://github.com/confluentinc/schema-registry[schema registry] implementation provided by Confluent.

[id="overview-of-deploying-confluent-schema-registry"]
=== Confluent Schema Registry deployment overview

For information about installing the standalone Confluent Schema Registry, see the https://docs.confluent.io/platform/current/installation/installing_cp/overview.html[Confluent Platform deployment documentation].

As an alternative, you can https://docs.confluent.io/platform/current/installation/docker/installation.html[install] the standalone Confluent Schema Registry as a container.

[id="deploying-confluent-schema-registry-with-debezium-containers"]
=== Deploying Confluent Schema Registry with {prodname} containers

Beginning with {prodname} 2.0.0, Confluent Schema Registry support is not included in the {prodname} containers.
To enable the Confluent Schema Registry for a {prodname} container, install the following Confluent Avro converter JAR files into the Connect plug-in directory:

* `kafka-connect-avro-converter`
* `kafka-connect-avro-data`
* `kafka-avro-serializer`
* `kafka-schema-serializer`
* `kafka-schema-registry-client`
* `common-config`
* `common-utils`

You can download the preceding files from the https://packages.confluent.io/maven/[Confluent Maven repository].
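
For example, a minimal shell sketch that fetches these JAR files with `curl`; the Confluent version number and the plug-in directory path are assumptions that you must adjust for your environment:

[source,shell]
----
# Assumed values; change them to match your environment.
CONFLUENT_VERSION=7.4.0
PLUGIN_DIR=/kafka/connect/debezium-connector-mysql

# Each artifact follows the standard Maven repository layout under io/confluent.
for artifact in kafka-connect-avro-converter kafka-connect-avro-data kafka-avro-serializer \
    kafka-schema-serializer kafka-schema-registry-client common-config common-utils; do
  curl -fLo "$PLUGIN_DIR/$artifact-$CONFLUENT_VERSION.jar" \
    "https://packages.confluent.io/maven/io/confluent/$artifact/$CONFLUENT_VERSION/$artifact-$CONFLUENT_VERSION.jar"
done
----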

The configuration for the Confluent Schema Registry is slightly different from the {registry} configuration.

. In your {prodname} connector configuration, specify the following properties:
+
[source]
----
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
----

. Deploy an instance of the Confluent Schema Registry:
+
[source]
----
docker run -it --rm --name schema-registry \
  --link zookeeper \
  -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 \
  -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
  -e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \
  -p 8081:8081 confluentinc/cp-schema-registry
----

. Run a Kafka Connect image configured to use Avro:
+
[source,subs="attributes+"]
----
docker run -it --rm --name connect \
  --link zookeeper:zookeeper \
  --link kafka:kafka \
  --link mysql:mysql \
  --link schema-registry:schema-registry \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \
  -e VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
  -p 8083:8083 debezium/connect:{debezium-docker-label}
----

. Run a console consumer that reads new Avro messages from the `db.myschema.mytable` topic and decodes them to JSON:
+
[source,subs="attributes+"]
----
docker run -it --rm --name avro-consumer \
  --link zookeeper:zookeeper \
  --link kafka:kafka \
  --link mysql:mysql \
  --link schema-registry:schema-registry \
  debezium/connect:{debezium-docker-label} \
  /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --property print.key=true \
    --formatter io.confluent.kafka.formatter.AvroMessageFormatter \
    --property schema.registry.url=http://schema-registry:8081 \
    --topic db.myschema.mytable
----
endif::community[]

// Type: concept
// Title: About Avro name requirements
// ModuleID: about-avro-name-requirements
[[avro-naming]]
== Naming

As stated in the Avro link:https://avro.apache.org/docs/current/spec.html#names[documentation], names must adhere to the following rules:

* Start with `[A-Za-z_]`
* Subsequently contain only `[A-Za-z0-9_]` characters

{prodname} uses the column's name as the basis for the corresponding Avro field.
This can lead to problems during serialization if the column name does not also adhere to the Avro naming rules.
Each {prodname} connector provides a configuration property, `field.name.adjustment.mode`, that you can set to `avro` if you have columns that do not adhere to the Avro rules for names.
Setting `field.name.adjustment.mode` to `avro` allows serialization of non-conformant fields without having to actually modify your schema.
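
For example, with the `avro` adjustment mode, a hypothetical column named `order-total` can be serialized as the Avro field `order_total`, because the adjuster replaces characters that are not valid in Avro names with underscores.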

ifdef::community[]
== Getting More Information

link:/blog/2016/09/19/Serializing-Debezium-events-with-Avro/[This post] from the {prodname} blog describes the concepts of serializers, converters, and other components, and discusses the advantages of using Avro.
Some Kafka Connect converter details have slightly changed since that post was written.

For a complete example of using Avro as the message format for {prodname} change data events, see https://github.com/debezium/debezium-examples/tree/main/tutorial#using-mysql-and-the-avro-message-format[MySQL and the Avro message format].
endif::community[]