tet123/documentation/modules/ROOT/pages/connectors/vitess.adoc
2024-06-25 12:10:13 +02:00

// Category: debezium-using
// Type: assembly
[id="debezium-connector-for-vitess"]
= {prodname} connector for Vitess
:context: vitess
:mbean-name: {context}
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]
{prodname}'s Vitess connector captures row-level changes in the shards of a Vitess link:https://vitess.io/docs/concepts/keyspace/[keyspace].
For information about the Vitess versions that are compatible with this connector, see the link:https://debezium.io/releases/[{prodname} release overview].
When the connector first connects to a Vitess cluster, it takes a consistent snapshot of the keyspace.
After that snapshot is complete, the connector continuously captures row-level changes that are committed to a Vitess keyspace to insert, update, or delete database content.
The connector generates data change event records and streams them to Kafka topics.
For each table, the default behavior is that the connector streams all generated events to a separate Kafka topic for that table.
Applications and services can then consume data change event records from the resulting topic.
// Type: concept
// Title: Overview of {prodname} Vitess connector
// ModuleID: overview-of-debezium-vitess-connector
[[vitess-overview]]
== Overview
Vitess's link:https://vitess.io/docs/concepts/vstream/[VStream] feature was introduced in version 4.0. It is a change event subscription service that provides information equivalent to the MySQL binary logs of the underlying MySQL shards of the Vitess cluster. A user can subscribe to multiple shards in a keyspace, which makes VStream a convenient tool to feed downstream CDC processes.
To read and process database changes, the Vitess connector subscribes to link:https://vitess.io/docs/concepts/vtgate/[VTGate]'s VStream gRPC service. VTGate is a lightweight, stateless gRPC server, which is part of the Vitess cluster setup.
You can configure the connector to subscribe to either the `MASTER` nodes or the `REPLICA` nodes for change events.
The connector produces a change event for every row-level insert, update, and delete operation that it captures, and sends the change event records for each table to a separate Kafka topic. Client applications read the Kafka topics that correspond to the database tables of interest, and can react to every row-level event they receive from those topics.
The underlying MySQL implementation in Vitess purges binary logs after a configurable period of time.
Because the contents of the binlog might not be complete, the connector requires another mechanism to ensure that it captures the complete content of a particular database.
Thus, when the connector connects to a database for the first time, it performs a consistent snapshot of the database.
After the connector completes the snapshot, it continues streaming changes from the exact point at which the snapshot was made.
In this way, the connector starts with a consistent view of all of the data, and does not omit any changes that were made while the snapshot was being taken.
The connector is tolerant of failures.
As the connector reads changes and produces events, it records the Vitess global transaction ID (VGTID) position for each event.
If the connector stops for any reason (including communication failures, network problems, or crashes), after the connector restarts, it continues reading from VStream from the last change event entry that it stored.
This behavior does not apply to snapshots. If the connector stops during a snapshot, after a restart, the connector does not continue performing snapshots where it last left off.
We'll talk later about how the connector behaves xref:{link-vitess-connector}#vitess-when-things-go-wrong[when things go wrong].
// Type: assembly
// ModuleID: how-debezium-vitess-connectors-work
// Title: How {prodname} Vitess connectors work
[[how-the-vitess-connector-works]]
== How the connector works
To optimally configure and run a {prodname} Vitess connector, it is helpful to understand how the connector performs snapshots, streams change events, determines Kafka topic names, and uses metadata.
// Type: concept
// ModuleID: how-debezium-vitess-connectors-perform-snapshot
// Title: How {prodname} Vitess connectors perform a snapshot
[[vitess-snapshot]]
=== Snapshot
Typically, a MySQL server is not configured to retain the complete history of the database in the binary logs.
As a result, the connector is unable to read the entire history of the database from the binary logs.
For this reason, the first time that the connector starts, it performs an initial consistent snapshot of the database.
You can change this behavior by setting the xref:{link-vitess-connector}#vitess-property-snapshot-mode[`snapshot.mode` connector configuration property] to a value other than `initial`.
This snapshot feature is built on link:https://vitess.io/docs/design-docs/vreplication/vstream/vscopy/[VStream Copy] introduced in version 7.0.
[NOTE]
====
Automatic retry of a failed snapshot is expected to be available in a future release.
====
// Type: concept
// ModuleID: how-debezium-vitess-connectors-stream-change-event-records
// Title: How {prodname} Vitess connectors stream change event records
[[vitess-streaming-changes]]
=== Streaming changes
The Vitess connector spends all its time streaming changes from the VTGate's VStream gRPC service to which it is subscribed. The client receives changes from VStream as they are committed in the underlying MySQL server's binlog at certain positions, which are referred to as VGTID.
The VGTID in Vitess is the equivalent of the GTID in MySQL. It describes the position in the VStream at which a change event occurs. Typically, a VGTID contains multiple shard GTIDs. Each shard GTID is a tuple of `(Keyspace, Shard, GTID)` that describes the GTID position of a given shard.
When subscribing to a VStream service, the connector must provide a VGTID and a link:https://vitess.io/docs/concepts/tablet/#tablet-types[Tablet Type] (for example, `MASTER` or `REPLICA`). The VGTID describes the position from which VStream should start sending change events. The tablet type specifies which underlying MySQL instance (master or replica) in each shard the connector reads change events from.
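
The following Python sketch illustrates the `(Keyspace, Shard, GTID)` structure of a VGTID. It assumes the JSON array encoding that appears in the `source.vgtid` field of the change event examples in this document. The names `ShardGtid` and `parse_vgtid` are hypothetical and are not part of the connector.

```python
import json
from typing import List, NamedTuple


class ShardGtid(NamedTuple):
    """One (keyspace, shard, gtid) tuple of a VGTID (hypothetical helper type)."""
    keyspace: str
    shard: str
    gtid: str


def parse_vgtid(vgtid_json: str) -> List[ShardGtid]:
    """Decode a VGTID JSON string into one ShardGtid per shard."""
    return [
        ShardGtid(entry["keyspace"], entry["shard"], entry["gtid"])
        for entry in json.loads(vgtid_json)
    ]


# VGTID taken from the create event example in this document.
vgtid = (
    '[{"keyspace":"commerce","shard":"80-","gtid":"MariaDB/0-54610504-47"},'
    '{"keyspace":"commerce","shard":"-80","gtid":"MariaDB/0-1592148-45"}]'
)
positions = parse_vgtid(vgtid)

# Index the GTID position of each shard by shard name.
gtid_by_shard = {p.shard: p.gtid for p in positions}
```

A consumer could use a mapping such as `gtid_by_shard` to inspect the position of a single shard without handling the full VGTID string.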
The first time the connector connects to a Vitess cluster, it gets the current VGTID from a Vitess component called link:https://vitess.io/docs/concepts/vtctld/[VTCtld] and provides the current VGTID to VStream.
The {prodname} Vitess connector acts as a gRPC client of VStream. When the connector receives changes it transforms the events into {prodname} _create_, _update_, or _delete_ events that include the VGTID of the event. The Vitess connector forwards these change events in records to the Kafka Connect framework, which is running in the same process. The Kafka Connect process asynchronously writes the change event records in the same order in which they were generated to the appropriate Kafka topic.
Periodically, Kafka Connect records the most recent _offset_ in another Kafka topic. The offset indicates source-specific position information that {prodname} includes with each event. For the Vitess connector, the VGTID recorded in each change event is the offset.
When Kafka Connect gracefully shuts down, it stops the connectors, flushes all event records to Kafka, and records the last offset received from each connector. When Kafka Connect restarts, it reads the last recorded offset for each connector, and starts each connector at its last recorded offset. When the connector restarts, it sends a request to VStream to send the events starting just after that position.
// Type: concept
// ModuleID: default-names-of-kafka-topics-that-receive-debezium-vitess-change-event-records
// Title: Default names of Kafka topics that receive {prodname} Vitess change event records
[[vitess-topic-names]]
=== Topic names
The Vitess connector writes events for all insert, update, and delete operations on a single table to a single Kafka topic. By default, the Kafka topic name is _topicPrefix_._keyspaceName_._tableName_ where:
* _topicPrefix_ is the topic prefix as specified by the `topic.prefix` connector configuration property.
* _keyspaceName_ is the name of the keyspace (a.k.a. database) where the operation occurred.
* _tableName_ is the name of the database table in which the operation occurred.
For example, suppose that `fulfillment` is the logical server name in the configuration for a connector that is capturing changes in a Vitess installation that has a `commerce` keyspace that contains four tables: `products`, `products_on_hand`, `customers`, and `orders`. Regardless of how many shards the keyspace has, the connector would stream records to these four Kafka topics:
* `fulfillment.commerce.products`
* `fulfillment.commerce.products_on_hand`
* `fulfillment.commerce.customers`
* `fulfillment.commerce.orders`
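
As a minimal sketch of the default naming pattern, the following Python snippet derives the four topic names in the preceding example. The function `topic_name` is hypothetical. It covers only the default _topicPrefix_._keyspaceName_._tableName_ pattern and does not account for any custom topic routing.

```python
from typing import List


def topic_name(topic_prefix: str, keyspace: str, table: str) -> str:
    """Build the default topic name: topicPrefix.keyspaceName.tableName."""
    return f"{topic_prefix}.{keyspace}.{table}"


tables: List[str] = ["products", "products_on_hand", "customers", "orders"]

# The number of shards in the keyspace does not affect the result.
topics = [topic_name("fulfillment", "commerce", table) for table in tables]
```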
[[vitess-transaction-metadata]]
=== Transaction metadata
{prodname} can generate events that represent transaction boundaries and that enrich data change event messages.
[NOTE]
.Limits on when {prodname} receives transaction metadata
====
{prodname} registers and receives metadata only for transactions that occur after you deploy the connector.
Metadata for transactions that occur before you deploy the connector is not available.
====
{prodname} generates transaction boundary events for the `BEGIN` and `END` delimiters in every transaction.
Transaction boundary events contain the following fields:
`status`:: `BEGIN` or `END`.
`id`:: String representation of the unique transaction identifier.
`ts_ms`:: The time of a transaction boundary event (`BEGIN` or `END` event) at the data source.
If the data source does not provide {prodname} with the event time, then the field instead represents the time at which {prodname} processes the event. Note: `ts_ms` is expressed in milliseconds but has only seconds-level precision, because MySQL provides binlog timestamps only in seconds.
`event_count` (for `END` events):: Total number of events emitted by the transaction.
`data_collections` (for `END` events):: An array of pairs of `data_collection` and `event_count` elements that indicates the number of events that the connector emits for changes that originate from a data collection.
.Example
[source,json,indent=0,subs="+attributes"]
----
{
"status": "BEGIN",
"id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-37\"}]",
"ts_ms": 1486500577000,
"event_count": null,
"data_collections": null
}
{
"status": "END",
"id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-37\"}]",
"ts_ms": 1486500577000,
"event_count": 1,
"data_collections": [
{
"data_collection": "test_unsharded_keyspace.my_seq",
"event_count": 1
}
]
}
----
Unless overridden via the xref:vitess-property-topic-transaction[`topic.transaction`] option,
the connector emits transaction events to the xref:vitess-property-topic-prefix[`_<topic.prefix>_`]`.transaction` topic.
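
The following Python sketch shows one way that a consumer of the transaction topic might read the per-collection counts from an `END` event. It uses the `END` event from the preceding example as a plain dictionary. The function name `events_per_collection` is hypothetical.

```python
from typing import Any, Dict


def events_per_collection(boundary_event: Dict[str, Any]) -> Dict[str, int]:
    """Map each data collection to its event count for an END boundary event."""
    if boundary_event["status"] != "END":
        # BEGIN events carry null for event_count and data_collections.
        raise ValueError("per-collection counts are present only on END events")
    return {
        item["data_collection"]: item["event_count"]
        for item in boundary_event["data_collections"]
    }


# END event from the example above, written as a Python dictionary.
end_event = {
    "status": "END",
    "id": '[{"keyspace":"test_unsharded_keyspace","shard":"0",'
          '"gtid":"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-37"}]',
    "ts_ms": 1486500577000,
    "event_count": 1,
    "data_collections": [
        {"data_collection": "test_unsharded_keyspace.my_seq", "event_count": 1}
    ],
}

counts = events_per_collection(end_event)
# In this example, the per-collection counts add up to the event_count total.
total = sum(counts.values())
```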
.Change data event enrichment
When transaction metadata is enabled, the data message `Envelope` is enriched with a new `transaction` field.
This field provides information about every event in the form of a composite of fields:
* `id` - string representation of unique transaction identifier
* `total_order` - absolute position of the event among all events generated by the transaction
* `data_collection_order` - the per-data collection position of the event among all events that were emitted by the transaction
Following is an example of a message:
[source,json,indent=0,subs="+attributes"]
----
{
"before": null,
"after": {
"pk": "2",
"aa": "1"
},
"source": {
...
},
"op": "c",
"ts_ms": 1637988245467,
"ts_us": 1637988245467841,
"ts_ns": 1637988245467841698,
"transaction": {
"id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-68\"}]",
"total_order": 1,
"data_collection_order": 1
}
}
----
[[vitess-ordered-transaction-metadata]]
=== Ordered Transaction Metadata
You can configure {prodname} to include additional metadata in data change event records.
Such supplemental metadata can assist downstream consumers in processing messages in the correct order when repartitioning, or some other disruption, might otherwise lead to data being consumed out of order.
.Change Data Enrichment
To configure the connector to emit enriched data change event records, set the xref:vitess-property-transaction-metadata-factory[`transaction.metadata.factory`] property.
When this property is set to `VitessOrderedTransactionMetadataFactory`, the connector includes a `transaction` field in the message `Envelope`.
The `transaction` field adds metadata that provides information about the order in which transactions occur.
The following fields are added to every message:
`transaction_epoch`:: A non-decreasing value that represents the epoch that the transaction rank belongs to.
`transaction_rank`:: A non-decreasing value within an epoch that represents the order of the transaction.
A third field is also relevant to event ordering:
`total_order`:: Represents the absolute position of an event among all events generated by a transaction.
This field is included by default in the standard transaction metadata.
The following example illustrates how to use these fields to establish event order.
Suppose {prodname} emits change event records for two events that occur in the same shard and share the same primary key.
If the Kafka topic where these events are sent is repartitioned, then the consumer order of the two events cannot be trusted.
If {prodname} is configured to provide enriched transaction metadata, applications that consume from the topic can apply the following logic to determine which of the two events to apply (the newer event) and which to discard:
1. If the values for `transaction_epoch` are not equal, return the event with the higher `transaction_epoch` value. Otherwise, continue.
2. If the values for `transaction_rank` are not equal, return the event with the higher `transaction_rank` value. Otherwise, continue.
3. Return the event with a greater `total_order` value.
If the comparison reaches step 3, the two events have equal `transaction_epoch` and `transaction_rank` values, which means that they are part of the same transaction.
Because the `total_order` field represents the order of events within a transaction, the event with the greater value is the most recent event.
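
The three comparison steps can be sketched in Python as follows. This is an illustration only. It assumes that the consumer has already decoded each event payload into a dictionary and that the ordering fields hold numeric values. The function name `newer_event` and the sample payloads are hypothetical.

```python
from typing import Any, Dict

# Fields are compared in the order given by the three steps above.
ORDERING_FIELDS = ("transaction_epoch", "transaction_rank", "total_order")


def newer_event(first: Dict[str, Any], second: Dict[str, Any]) -> Dict[str, Any]:
    """Return the more recent of two change event payloads for the same key."""
    tx_first, tx_second = first["transaction"], second["transaction"]
    for field in ORDERING_FIELDS:
        if tx_first[field] != tx_second[field]:
            return first if tx_first[field] > tx_second[field] else second
    # All three values match: the payloads carry identical ordering metadata.
    return first


# Two events from the same transaction (same epoch and rank).
older = {
    "after": {"pk": "2", "aa": "1"},
    "transaction": {"transaction_epoch": 0, "transaction_rank": 68, "total_order": 1},
}
newer = {
    "after": {"pk": "2", "aa": "5"},
    "transaction": {"transaction_epoch": 0, "transaction_rank": 68, "total_order": 2},
}

# The result does not depend on the order in which the events were consumed.
winner = newer_event(newer, older)
```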
The following example shows a data change event with ordered transaction metadata:
[source,json,indent=0,subs="+attributes"]
----
{
"before": null,
"after": {
"pk": "2",
"aa": "1"
},
"source": {
...
},
"op": "c",
"ts_ms": 1637988245467,
"ts_us": 1637988245467841,
"ts_ns": 1637988245467841698,
"transaction": {
"id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-68\"}]",
"total_order": 1,
"data_collection_order": 1,
"transaction_rank": 68,
"transaction_epoch": 0
}
}
----
[[vitess-efficient-transaction-metadata]]
=== Efficient Transaction Metadata
If you enable the connector to provide transaction metadata, it generates significantly more data.
Not only does the connector send additional messages to the transaction topics, but messages that it sends to the data change topics are larger, because they include a transaction metadata block.
The added volume is due to the following factors:
* The VGTID is stored twice, once as `source.vgtid`, and then again as `transaction.id`.
In keyspaces that include many shards, these VGTIDs can be quite large.
* In a sharded environment, the VGTID typically contains the VGTID for every shard.
In keyspaces with many shards, the amount of data in the VGTID field can be quite large.
* The connector sends transaction topic messages for every transaction boundary event.
Typically, keyspaces that include many shards tend to generate a high number of transaction boundary events.
To enable the Vitess connector to encode transaction metadata without significantly increasing the volume of data produced, {prodname} provides several single message transformations (SMTs).
The following SMTs are designed to reduce the amount of data in events that the Vitess connector emits:
* xref:transformations/vitess-use-local-vgtid.adoc[Use Local VGTID]
* xref:transformations/vitess-remove-field.adoc[Remove Field]
* xref:transformations/vitess-filter-transaction-topic-records.adoc[Filter Transaction Topic Records]
The following example shows an excerpt from the configuration of a Vitess connector that uses the preceding transformations:
[source,json,indent=0]
----
{
[...]
"provide.transaction.metadata": "true",
"transaction.metadata.factory": "io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory",
"transforms": "filterTransactionTopicRecords,removeField,useLocalVgtid",
"transforms.filterTransactionTopicRecords.type": "io.debezium.connector.vitess.transforms.FilterTransactionTopicRecords",
"transforms.removeField.type": "io.debezium.connector.vitess.transforms.RemoveField",
"transforms.removeField.field_names": "transaction.id",
"transforms.useLocalVgtid.type": "io.debezium.connector.vitess.transforms.UseLocalVgtid",
[...]
}
----
// Type: assembly
// ModuleID: descriptions-of-debezium-vitess-connector-data-change-events
// Title: Descriptions of {prodname} Vitess connector data change events
[[vitess-events]]
== Data change events
The {prodname} Vitess connector generates a data change event for each row-level `INSERT`, `UPDATE`, and `DELETE` operation. Each event contains a key and a value. The structure of the key and the value depends on the table that was changed.
{prodname} and Kafka Connect are designed around _continuous streams of event messages_. However, the structure of these events may change over time, which can be difficult for consumers to handle. To address this, each event contains the schema for its content or, if you are using a schema registry, a schema ID that a consumer can use to obtain the schema from the registry. This makes each event self-contained.
The following skeleton JSON shows the basic four parts of a change event. However, how you configure the Kafka Connect converter that you choose to use in your application determines the representation of these four parts in change events. A `schema` field is in a change event only when you configure the converter to produce it. Likewise, the event key and event payload are in a change event only if you configure a converter to produce it. If you use the JSON converter and you configure it to produce all four basic change event parts, change events have this structure:
[source,json,indent=0]
----
{
"schema": { // <1>
...
},
"payload": { // <2>
...
},
"schema": { // <3>
...
},
"payload": { // <4>
...
}
}
----
.Overview of change event basic content
[cols="1,2,7",options="header"]
|===
|Item |Field name |Description
|1
|`schema`
|The first `schema` field is part of the event key. It specifies a Kafka Connect schema that describes what is in the event key's `payload` portion. In other words, the first `schema` field describes the structure of the primary key, or the first single-column unique key if the table does not have a primary key, for the table that was changed. Multi-column unique key is not supported. +
+
It is possible to override the table's primary key by setting the xref:vitess-property-message-key-columns[`message.key.columns` connector configuration property]. In this case, the first schema field describes the structure of the key identified by that property.
|2
|`payload`
|The first `payload` field is part of the event key. It has the structure described by the previous `schema` field and it contains the key for the row that was changed.
|3
|`schema`
|The second `schema` field is part of the event value. It specifies the Kafka Connect schema that describes what is in the event value's `payload` portion. In other words, the second `schema` describes the structure of the row that was changed. Typically, this schema contains nested schemas.
|4
|`payload`
|The second `payload` field is part of the event value. It has the structure described by the previous `schema` field and it contains the actual data for the row that was changed.
|===
By default, the connector streams change event records to xref:vitess-topic-names[topics with names that are the same as the event's originating table].
[NOTE]
====
Starting with Kafka 0.10, Kafka can optionally record the event key and value with the {link-kafka-docs}.html#upgrade_10_performance_impact[_timestamp_] at which the message was created (recorded by the producer) or written to the log by Kafka.
====
[WARNING]
====
The Vitess connector ensures that all Kafka Connect schema names adhere to the http://avro.apache.org/docs/current/spec.html#names[Avro schema name format]. This means that the logical server name must start with a Latin letter or an underscore, that is, a-z, A-Z, or \_. Each remaining character in the logical server name and each character in the schema and table names must be a Latin letter, a digit, or an underscore, that is, a-z, A-Z, 0-9, or \_. If there is an invalid character it is replaced with an underscore character.
This can lead to unexpected conflicts if the logical server name, a schema name, or a table name contains invalid characters, and the only characters that distinguish names from one another are invalid and thus replaced with underscores.
====
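
The following Python sketch illustrates how the replacement rule in the preceding warning can make two distinct names collide. It implements only the rule as stated in this document and is not the connector's actual name adjustment code. The function name `sanitize` and its `leading` parameter are hypothetical.

```python
import re


def sanitize(name: str, leading: bool = False) -> str:
    """Replace each invalid character in a name with an underscore.

    Set leading=True for the logical server name, whose first character
    must be a Latin letter or an underscore.
    """
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", name)
    if leading and cleaned[:1].isdigit():
        # A digit is not allowed in the first position, so it is replaced too.
        cleaned = "_" + cleaned[1:]
    return cleaned


# Two different table names that differ only in an invalid character...
first = sanitize("order-items")
second = sanitize("order.items")
# ...are both adjusted to "order_items", which produces a conflict.
collide = first == second
```

To avoid such conflicts, choose names that differ in at least one valid character.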
[IMPORTANT]
====
The connector currently does not support column names that begin with the `@` prefix. For example, `age` is a valid column name, and `@age` is not. The reason is that a bug in the Vitess vstreamer can cause it to send events with anonymized column names (for example, the column name `age` is anonymized to `@1`). There is no easy way to distinguish a legitimate column name that has the `@` prefix from a name that results from the Vitess bug. For more information, see link:https://vitess.slack.com/archives/C0PQY0PTK/p1606817216038500[this discussion].
====
// Type: concept
// ModuleID: about-keys-in-debezium-vitess-change-events
// Title: About keys in {prodname} Vitess change events
[[vitess-change-events-key]]
=== Change event keys
For a given table, the change event's key has a structure that contains a field for each column in the primary key of the table at the time the event was created.
Consider a `customers` table defined in the `commerce` keyspace and the example of a change event key for that table.
.Example table
[source,sql,indent=0]
----
CREATE TABLE customers (
id INT NOT NULL,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
PRIMARY KEY(id)
);
----
.Example change event key
If the `topic.prefix` connector configuration property has the value `Vitess_server`, every change event for the `customers` table while it has this definition has the same key structure, which in JSON looks like this:
[source,json,indent=0]
----
{
"schema": { // <1>
"type": "struct",
"name": "Vitess_server.commerce.customers.Key", // <2>
"optional": false, // <3>
"fields": [ // <4>
{
"name": "id",
"index": "0",
"schema": {
"type": "INT32",
"optional": "false"
}
}
]
},
"payload": { // <5>
"id": "1"
}
}
----
.Description of change event key
[cols="1,2,7",options="header"]
|===
|Item |Field name |Description
|1
|`schema`
|The schema portion of the key specifies a Kafka Connect schema that describes what is in the key's `payload` portion.
|2
|`Vitess_server.commerce.customers.Key`
a|Name of the schema that defines the structure of the key's payload. This schema describes the structure of the primary key for the table that was changed. Key schema names have the format _connector-name_._keyspace-name_._table-name_.`Key`. In this example: +
* `Vitess_server` is the name of the connector that generated this event. +
* `commerce` is the keyspace that contains the table that was changed. +
* `customers` is the table that was updated.
|3
|`optional`
|Indicates whether the event key must contain a value in its `payload` field. In this example, a value in the key's payload is required. A value in the key's payload field is optional when a table does not have a primary key.
|4
|`fields`
|Specifies each field that is expected in the `payload`, including each field's name, index, and schema.
|5
|`payload`
|Contains the key for the row for which this change event was generated. In this example, the key contains a single `id` field whose value is `1`.
|===
[NOTE]
====
Although the `column.exclude.list` and `column.include.list` connector configuration properties allow you to capture only a subset of table columns, all columns in a primary or unique key are always included in the event's key.
====
[WARNING]
====
If the table does not have a primary key, then the change event's key is null. The rows in a table without a primary key constraint cannot be uniquely identified.
====
// Type: concept
// ModuleID: about-values-in-debezium-vitess-change-events
// Title: About values in {prodname} Vitess change events
[[vitess-change-events-value]]
=== Change event values
The value in a change event is a bit more complicated than the key. Like the key, the value has a `schema` section and a `payload` section. The `schema` section contains the schema that describes the `Envelope` structure of the `payload` section, including its nested fields. Change events for operations that create, update or delete data all have a value payload with an envelope structure.
Consider the same sample table that was used to show an example of a change event key:
[source,sql,indent=0]
----
CREATE TABLE customers (
id INT NOT NULL,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
PRIMARY KEY(id)
);
----
The emitted events for `UPDATE` and `DELETE` operations contain the previous values of all columns in the table.
// Type: continue
[[vitess-create-events]]
=== _create_ events
The following example shows the value portion of a change event that the connector generates for an operation that creates data in the `customers` table:
[source,json,options="nowrap",indent=0,subs="+attributes"]
----
{
"schema": { // <1>
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "Vitess_server.commerce.customers.Value", // <2>
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "Vitess_server.commerce.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "int64",
"optional": false,
"field": "ts_us"
},
{
"type": "int64",
"optional": false,
"field": "ts_ns"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "keyspace"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": false,
"field": "shard"
},
{
"type": "string",
"optional": true,
"field": "vgtid"
}
],
"optional": false,
"name": "io.debezium.connector.vitess.Source", // <3>
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "int64",
"optional": true,
"field": "ts_us"
},
{
"type": "int64",
"optional": true,
"field": "ts_ns"
}
],
"optional": false,
"name": "Vitess_server.commerce.customers.Envelope" // <4>
},
"payload": { // <5>
"before": null, // <6>
"after": { // <7>
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { // <8>
"version": "{debezium-version}",
"connector": "vitess",
"name": "my_sharded_connector",
"ts_ms": 1559033904000,
"ts_us": 1559033904000000,
"ts_ns": 1559033904000000000,
"snapshot": "false",
"db": "",
"sequence": null,
"keyspace": "commerce",
"table": "customers",
"shard": "-80",
"vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-45\"}]"
},
"op": "c", // <9>
"ts_ms": 1559033904863, // <10>
"ts_us": 1559033904863497, // <10>
"ts_ns": 1559033904863497147 // <10>
}
}
----
.Descriptions of _create_ event value fields
[cols="1,2,7",options="header"]
|===
|Item |Field name |Description
|1
|`schema`
|The value's schema, which describes the structure of the value's payload. A change event's value schema is the same in every change event that the connector generates for a particular table.
|2
|`name`
a|In the `schema` section, each `name` field specifies the schema for a field in the value's payload. +
+
`Vitess_server.commerce.customers.Value` is the schema for the payload's `before` and `after` fields. This schema is specific to the `customers` table. +
+
Names of schemas for `before` and `after` fields are of the form `_logicalName_._keyspaceName_._tableName_.Value`, which ensures that the schema name is unique in the database. This means that when using the xref:{link-avro-serialization}[Avro converter], the resulting Avro schema for each table in each logical source has its own evolution and history.
|3
|`name`
a|`io.debezium.connector.vitess.Source` is the schema for the payload's `source` field. This schema is specific to the Vitess connector. The connector uses it for all events that it generates.
|4
|`name`
a|`Vitess_server.commerce.customers.Envelope` is the schema for the overall structure of the payload, where `Vitess_server` is the connector name, `commerce` is the keyspace, and `customers` is the table.
|5
|`payload`
|The value's actual data. This is the information that the change event is providing. +
+
It may appear that the JSON representations of the events are much larger than the rows they describe. This is because the JSON representation must include the schema and the payload portions of the message.
However, by using the xref:{link-avro-serialization}[Avro converter], you can significantly decrease the size of the messages that the connector streams to Kafka topics.
|6
|`before`
a|An optional field that specifies the state of the row before the event occurred. When the `op` field is `c` for create, as it is in this example, the `before` field is `null` since this change event is for new content.
|7
|`after`
|An optional field that specifies the state of the row after the event occurred. In this example, the `after` field contains the values of the new row's `id`, `first_name`, `last_name`, and `email` columns.
|8
|`source`
a|Mandatory field that describes the source metadata for the event. This field contains information that you can use to compare this event with other events, with regard to the origin of the events, the order in which the events occurred, and whether events were part of the same transaction. The source metadata includes:
* {prodname} version
* Connector type and name
* Database (also known as keyspace), table, and shard that contains the new row
* If the event was part of a snapshot (always `false`)
* VGTID of the operation
* Timestamp for when the change was made in the database
|9
|`op`
a|Mandatory string that describes the type of operation that caused the connector to generate the event. In this example, `c` indicates that the operation created a row. Valid values are:
* `c` = create
* `u` = update
* `d` = delete
|10
|`ts_ms`, `ts_us`, `ts_ns`
a|Optional field that displays the time at which the connector processed the event.
The time is based on the system clock in the JVM running the Kafka Connect task. +
+
In the `source` object, `ts_ms` indicates the time that the change was made in the database. By comparing the value for `payload.source.ts_ms` with the value for `payload.ts_ms`, you can determine the lag between the source database update and {prodname}. Note: `source.ts_ms` is expressed in milliseconds but has only seconds-level precision, because MySQL provides binlog timestamps only in seconds.
|===
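The lag comparison described for the `ts_ms` fields can be sketched as follows. This is a minimal illustration that assumes the event value has already been deserialized from JSON into a Python dictionary. The function name `replication_lag_ms` and the sample timestamps are hypothetical.

```python
def replication_lag_ms(event: dict) -> int:
    """Return payload.ts_ms minus payload.source.ts_ms, in milliseconds."""
    payload = event["payload"]
    return payload["ts_ms"] - payload["source"]["ts_ms"]


# Hypothetical event fragment; only the two timestamp fields matter here.
sample_event = {
    "payload": {
        "source": {"ts_ms": 1559033904000},  # time of the change in the database
        "ts_ms": 1559033904863,              # time the connector processed the event
    }
}

print(replication_lag_ms(sample_event))
```

Because `source.ts_ms` has only seconds-level precision, treat the result as accurate to about one second.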
// Type: continue
[[vitess-update-events]]
=== _update_ events
The value of a change event for an update in the sample `customers` table has the same schema as a _create_ event for that table. Likewise, the event value's payload has the same structure. However, the event value payload contains different values in an _update_ event. Here is an example of a change event value in an event that the connector generates for an update in the `customers` table:
[source,json,indent=0,options="nowrap",subs="+attributes"]
----
{
"schema": { ... },
"payload": {
"before": { // <1>
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": { // <2>
"id": 1,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { // <3>
"version": "{debezium-version}",
"connector": "vitess",
"name": "my_sharded_connector",
"ts_ms": 1559033904000,
"ts_us": 1559033904000000,
"ts_ns": 1559033904000000000,
"snapshot": "false",
"db": "",
"sequence": null,
"keyspace": "commerce",
"table": "customers",
"shard": "-80",
"vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-46\"}]"
},
"op": "u", // <4>
"ts_ms": 1465584025523, // <5>
"ts_us": 1465584025523763, // <5>
"ts_ns": 1465584025523763547 // <5>
}
}
----
.Descriptions of _update_ event value fields
[cols="1,2,7",options="header"]
|===
|Item |Field name |Description
|1
|`before`
|An optional field that contains all values of all columns that were in the row before the database commit.
|2
|`after`
|An optional field that specifies the state of the row after the event occurred. In this example, the `first_name` value is now `Anne Marie`.
|3
|`source`
a|Mandatory field that describes the source metadata for the event. The `source` field structure has the same fields as in a _create_ event, but some values are different. The source metadata includes:
* {prodname} version
* Connector type and name
* Database (also known as keyspace), table, and shard that contains the updated row
* If the event was part of a snapshot (always `false`)
* VGTID of the operation
* Timestamp for when the change was made in the database
|4
|`op`
a|Mandatory string that describes the type of operation. In an _update_ event value, the `op` field value is `u`, signifying that this row changed because of an update.
|5
|`ts_ms`, `ts_us`, `ts_ns`
a|Optional field that displays the time at which the connector processed the event.
The time is based on the system clock in the JVM running the Kafka Connect task. +
+
In the `source` object, `ts_ms` indicates the time that the change was made in the database. By comparing the value for `payload.source.ts_ms` with the value for `payload.ts_ms`, you can determine the lag between the source database update and {prodname}. Note: `source.ts_ms` is expressed in milliseconds but has only seconds-level precision, because MySQL provides binlog timestamps only in seconds.
|===
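Because an _update_ event carries both the `before` and `after` row states, a consumer can work out which columns changed. The following sketch assumes that the payload has been deserialized into a Python dictionary. The helper name `changed_columns` is hypothetical and is not part of {prodname}.

```python
def changed_columns(payload: dict) -> dict:
    """Map each changed column to a (before, after) tuple."""
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }


# Payload fragment that mirrors the update example for the customers table.
update_payload = {
    "before": {"id": 1, "first_name": "Anne", "last_name": "Kretchmar",
               "email": "annek@noanswer.org"},
    "after": {"id": 1, "first_name": "Anne Marie", "last_name": "Kretchmar",
              "email": "annek@noanswer.org"},
    "op": "u",
}

print(changed_columns(update_payload))
```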
[NOTE]
====
Updating the columns for a row's primary key changes the value of the row's key. When a key changes, {prodname} outputs _three_ events: a `DELETE` event and a xref:vitess-tombstone-events[tombstone event] with the old key for the row, followed by an event with the new key for the row. Details are in the next section.
====
[id="vitess-primary-key-updates"]
=== Primary key updates
An `UPDATE` operation that changes a row's primary key field(s) is known as a primary key change. For a primary key change, in place of an `UPDATE` event record, the connector emits a `DELETE` event record for the old key and a `CREATE` event record for the new (updated) key. These events have the usual structure and content, and in addition, each one has a message header related to the primary key change:
* The `DELETE` event record has `__debezium.newkey` as a message header. The value of this header is the new primary key for the updated row.
* The `CREATE` event record has `__debezium.oldkey` as a message header. The value of this header is the previous (old) primary key that the updated row had.
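A consumer can use these headers to tell a primary key change apart from an ordinary delete or create. The following sketch assumes that the consumer has already decoded the Kafka message headers into a dictionary keyed by header name. The function `classify_event` and the sample header values are hypothetical, and the way that the key is encoded in the header value depends on your converter.

```python
NEW_KEY_HEADER = "__debezium.newkey"
OLD_KEY_HEADER = "__debezium.oldkey"


def classify_event(op: str, headers: dict) -> str:
    """Classify an event by its op code and primary key change headers."""
    if op == "d" and NEW_KEY_HEADER in headers:
        return "delete caused by primary key change"
    if op == "c" and OLD_KEY_HEADER in headers:
        return "create caused by primary key change"
    return "regular event"


# Hypothetical decoded headers for a row whose id changed from 1 to 2.
print(classify_event("d", {NEW_KEY_HEADER: {"id": 2}}))
print(classify_event("c", {OLD_KEY_HEADER: {"id": 1}}))
print(classify_event("d", {}))
```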
[[vitess-delete-events]]
=== _delete_ events
The value in a _delete_ change event has the same `schema` portion as _create_ and _update_ events for the same table. The `payload` portion in a _delete_ event for the sample `customers` table looks like this:
[source,json,indent=0,subs="+attributes"]
----
{
"schema": { ... },
"payload": {
"before": { // <1>
"id": 1,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": null, // <2>
"source": { // <3>
"version": "{debezium-version}",
"connector": "vitess",
"name": "my_sharded_connector",
"ts_ms": 1559033904000,
"ts_us": 1559033904000000,
"ts_ns": 1559033904000000000,
"snapshot": "false",
"db": "",
"sequence": null,
"keyspace": "commerce",
"table": "customers",
"shard": "-80",
"vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-47\"}]"
},
"op": "d", // <4>
"ts_ms": 1465581902461, // <5>
"ts_us": 1465581902461324, // <5>
"ts_ns": 1465581902461324871 // <5>
}
}
----
.Descriptions of _delete_ event value fields
[cols="1,2,7",options="header"]
|===
|Item |Field name |Description
|1
|`before`
|Optional field that specifies the state of the row before the event occurred. In a _delete_ event value, the `before` field contains the values that were in the row before it was deleted with the database commit.
|2
|`after`
|Optional field that specifies the state of the row after the event occurred. In a _delete_ event value, the `after` field is `null`, signifying that the row no longer exists.
|3
|`source`
a|Mandatory field that describes the source metadata for the event. In a _delete_ event value, the `source` field structure is the same as for _create_ and _update_ events for the same table. Many `source` field values are also the same. In a _delete_ event value, the `ts_ms` and `vgtid` field values, as well as other values, might have changed. But the `source` field in a _delete_ event value provides the same metadata:
* {prodname} version
* Connector type and name
* Database (also known as keyspace), table, and shard that contains the deleted row
* If the event was part of a snapshot (always `false`)
* VGTID of the operation
* Timestamp for when the change was made in the database
|4
|`op`
a|Mandatory string that describes the type of operation. The `op` field value is `d`, signifying that this row was deleted.
|5
|`ts_ms`, `ts_us`, `ts_ns`
a|Optional field that displays the time at which the connector processed the event.
The time is based on the system clock in the JVM running the Kafka Connect task. +
+
In the `source` object, `ts_ms` indicates the time that the change was made in the database. By comparing the value for `payload.source.ts_ms` with the value for `payload.ts_ms`, you can determine the lag between the source database update and {prodname}. Note: `source.ts_ms` is expressed in milliseconds but has only seconds-level precision, because MySQL provides binlog timestamps only in seconds.
|===
A _delete_ change event record provides a consumer with the information it needs to process the removal of this row.
Vitess connector events are designed to work with link:{link-kafka-docs}#compaction[Kafka log compaction]. Log compaction enables removal of some older messages as long as at least the most recent message for every key is kept. This lets Kafka reclaim storage space while ensuring that the topic contains a complete data set and can be used for reloading key-based state.
// Type: continue
[[vitess-tombstone-events]]
.Tombstone events
When a row is deleted, the _delete_ event value still works with log compaction, because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that same key, the message value must be `null`. To make this possible, the Vitess connector follows a _delete_ event with a special _tombstone_ event that has the same key but a `null` value.
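The effect of the tombstone can be illustrated with a simplified model of log compaction. The `compact` function below is a hypothetical sketch, not Kafka's implementation. It keeps the latest value for each key and drops keys whose latest value is `None`. Real Kafka retains tombstones for a configurable period before it removes them.

```python
def compact(log):
    """Keep the latest value per key, then drop keys whose latest value is None."""
    latest = {}
    for key, value in log:
        latest[key] = value
    return {key: value for key, value in latest.items() if value is not None}


# Hypothetical topic contents as (key, value) pairs; values are abbreviated.
log = [
    (1, {"op": "c"}),
    (1, {"op": "u"}),
    (1, {"op": "d"}),
    (1, None),          # tombstone that follows the delete event
    (2, {"op": "c"}),
]

print(compact(log))                 # key 1 is removed entirely
print(compact(log[:3] + log[4:]))   # without the tombstone, the delete event for key 1 remains
```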
// Type: reference
// ModuleID: how-debezium-vitess-connectors-map-data-types
// Title: How {prodname} Vitess connectors map data types
[[vitess-data-types]]
== Data type mappings
The Vitess connector represents changes to rows with events that are structured like the table in which the row exists. The event contains a field for each column value. How that value is represented in the event depends on the Vitess data type of the column. This section describes these mappings.
If the default data type conversions do not meet your needs, you can {link-prefix}:{link-custom-converters}#custom-converters[create a custom converter] for the connector.
[id="vitess-basic-types"]
=== Basic types
The following table describes how the connector maps basic Vitess data types to a _literal type_ and a _semantic type_ in event fields.
* _literal type_ describes how the value is literally represented using Kafka Connect schema types: `INT8`, `INT16`, `INT32`, `INT64`, `FLOAT32`, `FLOAT64`, `BOOLEAN`, `STRING`, `BYTES`, `ARRAY`, `MAP`, and `STRUCT`.
* _semantic type_ describes how the Kafka Connect schema captures the _meaning_ of the field using the name of the Kafka Connect schema for the field.
.Mappings for Vitess basic data types
[cols="25%a,20%a,55%a",options="header"]
|===
|Vitess data type
|Literal type (schema type)
|Semantic type (schema name) and Notes
|`BOOLEAN, BOOL`
|`INT16`
a|_n/a_
|`BIT(1)`
|Not yet supported
a|_n/a_
|`BIT(>1)`
|Not yet supported
a|_n/a_
|`TINYINT`
|`INT16`
a|_n/a_
|`SMALLINT[(M)]`
|`INT16`
a|_n/a_
|`MEDIUMINT[(M)]`
|`INT32`
a|_n/a_
|`INT, INTEGER[(M)]`
|`INT32`
a|_n/a_
|`BIGINT[(M)]`
|`INT64`
a|_n/a_
|`REAL[(M,D)]`
|`FLOAT64`
a|_n/a_
|`FLOAT[(M,D)]`
|`FLOAT64`
a|_n/a_
|`DOUBLE[(M,D)]`
|`FLOAT64`
a|_n/a_
|`CHAR[(M)]`
|`STRING`
a|_n/a_
|`VARCHAR[(M)]`
|`STRING`
a|_n/a_
|`BINARY[(M)]`
|`BYTES`
a|_n/a_
|`VARBINARY[(M)]`
|`BYTES`
a|_n/a_
|`TINYBLOB`
|`BYTES`
a|_n/a_
|`TINYTEXT`
|`STRING`
a|_n/a_
|`BLOB`
|`BYTES`
a|_n/a_
|`TEXT`
|`STRING`
a|_n/a_
|`MEDIUMBLOB`
|`BYTES`
a|_n/a_
|`MEDIUMTEXT`
|`STRING`
a|_n/a_
|`LONGBLOB`
|`BYTES`
a|_n/a_
|`LONGTEXT`
|`STRING`
a|_n/a_
|`JSON`
|`STRING`
a|`io.debezium.data.Json` +
Contains the string representation of a `JSON` document, array, or scalar.
|`ENUM`
|`STRING`
a|`io.debezium.data.Enum` +
The `allowed` schema parameter contains the comma-separated list of allowed values.
|`SET`
|`STRING`
a|`io.debezium.data.EnumSet` +
The `allowed` schema parameter contains the comma-separated list of allowed values.
|`YEAR[(2\|4)]`
|`INT32`
a|`io.debezium.time.Year`
|`TIMESTAMP[(M)]`
|`STRING`
a|_n/a_ +
In `yyyy-MM-dd HH:mm:ss.SSS` format with microsecond precision based on UTC. MySQL allows `M` to be in the range of `0-6`.
|`NUMERIC[(M[,D])]`
|`STRING`
a|_n/a_
|`DECIMAL[(M[,D])]`
|`STRING`
a|_n/a_
|`GEOMETRY, +
LINESTRING, +
POLYGON, +
MULTIPOINT, +
MULTILINESTRING, +
MULTIPOLYGON, +
GEOMETRYCOLLECTION`
|Not yet supported
a|_n/a_
|===
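Because the connector emits `DECIMAL`, `NUMERIC`, and `JSON` columns as strings, a consumer typically parses them back into richer types. The following sketch assumes a hypothetical `after` payload with a `DECIMAL` column named `price` and a `JSON` column named `attributes`. Neither column is part of the sample `customers` table.

```python
import json
from decimal import Decimal

# Hypothetical "after" state of a row; both values arrive as strings.
after = {
    "id": 1,
    "price": "19.99",                    # DECIMAL column, literal type STRING
    "attributes": "{\"color\": \"red\"}",  # JSON column, literal type STRING
}

# Parse the string representations without losing decimal precision.
price = Decimal(after["price"])
attributes = json.loads(after["attributes"])

print(price, attributes["color"])
```

Using `Decimal` rather than `float` avoids the rounding that binary floating point introduces.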
[id="vitess-temporal-types"]
=== Temporal types
Excluding the `TIMESTAMP` data type, Vitess temporal types depend on the value of the xref:vitess-property-time-precision-mode[`time.precision.mode`] connector configuration property.
.Temporal values without time zones
The `DATETIME` type represents a local date and time such as "2018-01-13 09:48:27".
As you can see in the preceding example, this type does not include time zone information.
Columns of this type are converted into epoch milliseconds or microseconds, based on the column's precision, by using UTC.
The `TIMESTAMP` type represents a timestamp without time zone information.
When writing data, MySQL converts the `TIMESTAMP` type from the time zone of the server or session into UTC format.
When it reads values, the database converts from UTC format to the current time zone of the server or session.
For example:
* `DATETIME` with a value of `2018-06-20 06:37:03` becomes `1529476623000`.
* `TIMESTAMP` with a value of `2018-06-20 06:37:03` becomes `2018-06-20T13:37:03Z`.
Such columns are converted into an equivalent `io.debezium.time.ZonedTimestamp` in UTC, based on the time zone of the server or session.
By default, {prodname} queries the server for the time zone.
If this fails, you must explicitly specify the timezone by setting the `connectionTimeZone` option in the JDBC connection string.
For example, if the database's time zone (either globally, or as configured for the connector by means of the `connectionTimeZone` option) is "America/Los_Angeles", the `TIMESTAMP` value "2018-06-20 06:37:03" is represented by a `ZonedTimestamp` with the value "2018-06-20T13:37:03Z".
The time zone of the JVM that runs Kafka Connect and {prodname} does not affect these conversions.
For more information about properties that affect temporal values, see the xref:vitess-connector-properties[connector configuration properties].
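The two conversions in the preceding examples can be reproduced with the following sketch. It is an illustration of the arithmetic only, not the connector's code. The function names are hypothetical, and a fixed offset of UTC-7 stands in for "America/Los_Angeles", which is correct only while daylight saving time is in effect, as it is on the example date.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
FORMAT = "%Y-%m-%d %H:%M:%S"


def datetime_to_epoch_ms(text: str) -> int:
    """Interpret a DATETIME string as UTC and return epoch milliseconds."""
    value = datetime.strptime(text, FORMAT).replace(tzinfo=timezone.utc)
    return (value - EPOCH) // timedelta(milliseconds=1)


def timestamp_to_utc(text: str, server_zone: timezone) -> str:
    """Interpret a TIMESTAMP string in the server time zone and render it in UTC."""
    local = datetime.strptime(text, FORMAT).replace(tzinfo=server_zone)
    return local.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


# Assumption: fixed UTC-7 offset in place of America/Los_Angeles in June.
LOS_ANGELES_SUMMER = timezone(timedelta(hours=-7))

print(datetime_to_epoch_ms("2018-06-20 06:37:03"))
print(timestamp_to_utc("2018-06-20 06:37:03", LOS_ANGELES_SUMMER))
```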
time.precision.mode=adaptive_time_microseconds (default)::
The Vitess connector determines the literal type and semantic type based on the column's data type definition so that events represent exactly the values in the database.
All time fields are in microseconds.
Only positive `TIME` field values in the range of `00:00:00.000000` to `23:59:59.999999` can be captured correctly.
+
.Mappings when `time.precision.mode=adaptive_time_microseconds`
[cols="25%a,20%a,55%a",options="header",subs="+attributes"]
|===
|Vitess type |Literal type |Semantic type
|`DATE`
|`INT32`
a|`io.debezium.time.Date` +
Represents the number of days elapsed since the UNIX epoch.
|`TIME[(M)]`
|`INT64`
a|`io.debezium.time.MicroTime` +
Represents the time value in microseconds and does not include time zone information.
MySQL allows `M` to be in the range of `0-6`.
|`DATETIME, DATETIME(0), DATETIME(1), DATETIME(2), DATETIME(3)`
|`INT64`
a|`io.debezium.time.Timestamp` +
Represents the number of milliseconds elapsed since the UNIX epoch and does not include time zone information.
|`DATETIME(4), DATETIME(5), DATETIME(6)`
|`INT64`
a|`io.debezium.time.MicroTimestamp` +
Represents the number of microseconds elapsed since the UNIX epoch and does not include time zone information.
|===
time.precision.mode=connect::
The Vitess connector uses defined Kafka Connect logical types.
This approach is less precise than the default approach, and the events could be less precise if the database column has a _fractional second precision_ value that is greater than `3`.
The connector can process values that range from `00:00:00.000` to `23:59:59.999`.
Set `time.precision.mode=connect` only if you are certain that the `TIME` values in your tables never exceed the supported ranges.
The `connect` setting is expected to be removed in a future version of {prodname}.
+
.Mappings when `time.precision.mode=connect`
[cols="25%a,20%a,55%a",options="header",subs="+attributes"]
|===
|Vitess data type |Literal type |Semantic type
|`DATE`
|`INT32`
a|`org.apache.kafka.connect.data.Date` +
Represents the number of days elapsed since the UNIX epoch.
|`TIME[(M)]`
|`INT64`
a|`org.apache.kafka.connect.data.Time` +
Represents the time value in microseconds since midnight and does not include time zone information.
|`DATETIME[(M)]`
|`INT64`
a|`org.apache.kafka.connect.data.Timestamp` +
Represents the number of milliseconds elapsed since the UNIX epoch, and does not include time zone information.
|===
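On the consumer side, the integer values that are listed for the default `adaptive_time_microseconds` mode can be turned back into date and time objects. The following sketch shows one way to do that. The function names are hypothetical, and the results are naive objects because these types carry no time zone information.

```python
from datetime import date, datetime, time, timedelta

EPOCH_DATE = date(1970, 1, 1)
EPOCH_DATETIME = datetime(1970, 1, 1)


def decode_date(days: int) -> date:
    """io.debezium.time.Date: days since the UNIX epoch."""
    return EPOCH_DATE + timedelta(days=days)


def decode_micro_time(micros: int) -> time:
    """io.debezium.time.MicroTime: microseconds since midnight."""
    return (datetime.min + timedelta(microseconds=micros)).time()


def decode_timestamp(millis: int) -> datetime:
    """io.debezium.time.Timestamp: milliseconds since the UNIX epoch."""
    return EPOCH_DATETIME + timedelta(milliseconds=millis)


def decode_micro_timestamp(micros: int) -> datetime:
    """io.debezium.time.MicroTimestamp: microseconds since the UNIX epoch."""
    return EPOCH_DATETIME + timedelta(microseconds=micros)


print(decode_date(17702))
print(decode_micro_time(23823000000))
print(decode_timestamp(1529476623000))
print(decode_micro_timestamp(1529476623000123))
```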
// Type: assembly
// ModuleID: setting-up-vitess-to-run-a-debezium-connector
// Title: Setting up Vitess to run a {prodname} connector
[[setting-up-vitess]]
== Setting up Vitess
{prodname} does not require any specific configuration for use with Vitess. Install Vitess according to the standard instructions in the link:https://vitess.io/docs/get-started/local-docker/[Local Install via Docker] guide, or the link:https://vitess.io/docs/get-started/operator/[Vitess Operator for Kubernetes] guide.
.Checklist
* Make sure that the VTGate host and its gRPC port (default is 15991) are accessible from the machine where the Vitess connector is installed.
* Make sure that the VTCtld host and its gRPC port (default is 15999) are accessible from the machine where the Vitess connector is installed.
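One way to verify the checklist items is to attempt a TCP connection from the machine that runs the connector. The following sketch checks only that the port accepts connections. It does not speak gRPC, so a successful result does not prove that the service behind the port is healthy. The function name `is_reachable` is hypothetical.

```python
import socket


def is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
```

For example, `is_reachable("192.168.99.100", 15991)` checks the VTGate address that is used in the configuration example in this document, and `is_reachable("192.168.99.101", 15999)` checks the VTCtld address.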
// Type: procedure
// ModuleID: grpc-authentication
// Title: gRPC authentication for a {prodname} connector
[[grpc-authentication]]
=== gRPC authentication
Because the Vitess connector reads change events from the VTGate VStream gRPC server, it does not need to connect directly to MySQL instances. Therefore, no special database user or permissions are needed. Currently, the Vitess connector supports only unauthenticated access to the VTGate gRPC server.
// Type: assembly
// ModuleID: deploying-and-managing-debezium-vitess-connectors
// Title: Deploying and managing {prodname} Vitess connectors
[[vitess-deploying-a-connector]]
== Deployment
With link:https://zookeeper.apache.org[Zookeeper], link:http://kafka.apache.org/[Kafka], and {link-kafka-docs}.html#connect[Kafka Connect] installed, the remaining tasks to deploy a {prodname} Vitess connector are to download the link:https://repo1.maven.org/maven2/io/debezium/debezium-connector-vitess/{debezium-version}/debezium-connector-vitess-{debezium-version}-plugin.tar.gz[connector's plug-in archive], extract the JAR files into your Kafka Connect environment, and add the directory with the JAR files to {link-kafka-docs}/#connectconfigs[Kafka Connect's `plugin.path`]. You then need to restart your Kafka Connect process to pick up the new JAR files.
If you are working with immutable containers, see link:https://quay.io/organization/debezium[{prodname}'s Container images] for Zookeeper, Kafka and Kafka Connect with the Vitess connector already installed and ready to run. You can also xref:operations/openshift.adoc[run {prodname} on Kubernetes and OpenShift].
// Type: concept
// ModuleID:debezium-vitess-connector-configuration-example
// Title: {prodname} Vitess connector configuration example
[[vitess-example-configuration]]
=== Connector configuration example
Following is an example of the configuration for a Vitess connector that connects to a Vitess (VTGate's VStream) server on port 15991 at 192.168.99.100, whose logical name is `fullfillment`. It also connects to a VTCtld server on port 15999 at 192.168.99.101 to get the initial VGTID. Typically, you configure the {prodname} Vitess connector in a `.json` file using the configuration properties available for the connector.
You can choose to produce events for a subset of the schemas and tables. Optionally, ignore, mask, or truncate columns that are sensitive, too large, or not needed.
[source,json]
----
{
"name": "inventory-connector", // <1>
"config": {
"connector.class": "io.debezium.connector.vitess.VitessConnector", // <2>
"database.hostname": "192.168.99.100", // <3>
"database.port": "15991", // <4>
"database.user": "vitess", // <5>
"database.password": "vitess_password", // <6>
"vitess.keyspace": "commerce", // <7>
"vitess.tablet.type": "MASTER", // <8>
"vitess.vtctld.host": "192.168.99.101", // <9>
"vitess.vtctld.port": "15999", // <10>
"vitess.vtctld.user": "vitess", // <11>
"vitess.vtctld.password": "vitess_password", // <12>
"topic.prefix": "fullfillment", // <13>
"tasks.max": 1 // <14>
}
}
----
<1> The name of the connector when registered with a Kafka Connect service.
<2> The name of this Vitess connector class.
<3> The address of the Vitess (VTGate's VStream) server.
<4> The port number of the Vitess (VTGate's VStream) server.
<5> The username of the Vitess database server (VTGate gRPC).
<6> The password of the Vitess database server (VTGate gRPC).
<7> The name of the keyspace (also known as the database). Because no shard is specified, the connector reads change events from all shards in the keyspace.
<8> The type of MySQL instance (`MASTER` or `REPLICA`) to read change events from.
<9> The address of the VTCtld server.
<10> The port of the VTCtld server.
<11> The username of the VTCtld server (VTCtld gRPC).
<12> The password of the VTCtld server (VTCtld gRPC).
<13> The topic prefix for the Vitess cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
<14> Only one task should operate at any one time.
See the xref:vitess-connector-properties[complete list of Vitess connector properties] that can be specified in these configurations.
You can send this configuration with a `POST` command to a running Kafka Connect service. The service records the configuration and starts the connector task that connects to the Vitess database and streams change event records to Kafka topics.
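The `POST` request can be sketched with the Python standard library as follows. The Kafka Connect REST endpoint `http://localhost:8083` is an assumption about your deployment, the helper name `build_register_request` is hypothetical, and the configuration is abbreviated from the example above.

```python
import json
import urllib.request


def build_register_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build a POST request that registers a connector with Kafka Connect."""
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


# Abbreviated version of the connector configuration shown above.
connector = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.vitess.VitessConnector",
        "database.hostname": "192.168.99.100",
        "database.port": "15991",
        "vitess.keyspace": "commerce",
        "vitess.vtctld.host": "192.168.99.101",
        "vitess.vtctld.port": "15999",
        "topic.prefix": "fullfillment",
        "tasks.max": 1,
    },
}

# Assumption: Kafka Connect REST API listens on localhost:8083.
request = build_register_request("http://localhost:8083", connector)
print(request.get_method(), request.full_url)
```

To send the request, pass it to `urllib.request.urlopen(request)` and inspect the response status.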
// Type: concept
// ModuleID:debezium-vitess-connector-configuration-example-offset-storage-per-task
// Title: {prodname} Vitess connector configuration example-offset-storage-per-task
[[vitess-example-configuration-offset-storage-per-task]]
=== Connector configuration example for offset-storage-per-task mode
When a large Vitess installation requires more than one connector task to process the change logs, you can use the offset-storage-per-task feature to launch multiple connector tasks, each of which works on a subset of the Vitess shards. Each task persists its offsets (the VGTIDs of the shards that it tracks) in its own partition space in the Kafka offset topic.
The following example uses the same configuration for a Vitess connector that connects to a Vitess (VTGate's VStream) server, but adds three parameters to enable the offset-storage-per-task mode.
[source,json]
----
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.vitess.VitessConnector",
"database.hostname": "192.168.99.100",
"database.port": "15991",
"database.user": "vitess",
"database.password": "vitess_password",
"topic.prefix": "fullfillment",
"vitess.keyspace": "commerce",
"vitess.tablet.type": "MASTER",
"vitess.vtctld.host": "192.168.99.101",
"vitess.vtctld.port": "15999",
"vitess.vtctld.user": "vitess",
"vitess.vtctld.password": "vitess_password",
"vitess.offset.storage.per.task": true, // <1>
"vitess.offset.storage.task.key.gen": 1, // <2>
"vitess.prev.num.tasks": 1, // <3>
"tasks.max": 2 // <4>
}
}
----
<1> Enables the offset-storage-per-task feature.
<2> Sets the generation number of the current task parallelism to 1.
<3> Specifies that the previous generation of task parallelism used one task.
<4> Launches two tasks for the current task parallelism.
Shards are distributed to tasks by a simple round-robin algorithm. In this example, which launches two connector tasks, assume that the keyspace has four Vitess shards (`-40`, `40-80`, `80-c0`, `c0-`). Task0 works on shards `-40` and `80-c0`, and task1 works on shards `40-80` and `c0-`.
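The round-robin distribution can be sketched as follows. This is an illustration of the idea rather than the connector's code. The function name `assign_shards` is hypothetical, and the sketch assumes that shards are assigned in the order in which they are listed.

```python
def assign_shards(shards: list, num_tasks: int) -> dict:
    """Distribute shards to tasks in round-robin order."""
    assignment = {f"task{task_id}": [] for task_id in range(num_tasks)}
    for index, shard in enumerate(shards):
        assignment[f"task{index % num_tasks}"].append(shard)
    return assignment


shards = ["-40", "40-80", "80-c0", "c0-"]
print(assign_shards(shards, 2))
print(assign_shards(shards, 4))
```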
Three configuration parameters are needed to ensure that the offsets saved by each connector task do not collide, and to automatically migrate the offsets that were saved under the previous task parallelism. To avoid collisions on the partition keys in the Kafka offset topic, each connector task uses the partition naming scheme `taskId_numTasks_gen`. In the current example, which launches two tasks with generation number 1, task0 writes its offsets to the Kafka offset topic under the partition key `task0_2_1`, and task1 uses the partition key `task1_2_1`. The `gen` parameter distinguishes the partition keys of different generations, where each change in task parallelism starts a new generation.
When the task parallelism changes (for example, you want to launch four connector tasks instead of two to handle a higher volume of traffic from Vitess), specify `tasks.max=4`, `vitess.offset.storage.task.key.gen=2`, and `vitess.prev.num.tasks=2`. The offset partition keys for this generation are `task0_4_2`, `task1_4_2`, `task2_4_2`, and `task3_4_2`. After the connector restarts, it detects that no offsets are saved for the four current partition keys, and it automatically migrates the offsets that were saved under the previous generation's keys, `task0_2_1` and `task1_2_1`. In the current example of four Vitess shards (`-40`, `40-80`, `80-c0`, `c0-`), task0 works on shard `-40`, task1 on `40-80`, task2 on `80-c0`, and task3 on `c0-`. The offsets for those four shards from the previous generation (two tasks, each working on two shards) are automatically migrated to the new generation (four tasks, each working on one shard).
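The partition key naming scheme can be reproduced with a short sketch. The function name `offset_partition_keys` is hypothetical. It only formats the keys as described and does not read or write Kafka offsets.

```python
def offset_partition_keys(num_tasks: int, gen: int) -> list:
    """Return the offset partition keys for one task parallelism generation."""
    return [f"task{task_id}_{num_tasks}_{gen}" for task_id in range(num_tasks)]


# Previous generation: two tasks, generation 1.
print(offset_partition_keys(2, 1))
# Current generation: four tasks, generation 2.
print(offset_partition_keys(4, 2))
```

Because the number of tasks and the generation number are both part of each key, keys from different generations never overlap.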
Offsets that were saved in the Kafka offset topic before the offset-storage-per-task feature was enabled are treated as generation 0, and the connector performs a special offset lookup for them during migration. If you have been running the Vitess connector without the offset-storage-per-task feature and now want to turn it on, specify `vitess.offset.storage.task.key.gen=1` and `vitess.prev.num.tasks=1` so that the offsets can be migrated automatically.
The value of `vitess.prev.num.tasks` must match the actual number of tasks that were launched in the previous task parallelism generation. The number of connector tasks is usually the same as the `tasks.max` value that you specified. However, in the rare case that `tasks.max` is greater than the number of Vitess shards, the connector launches only as many tasks as there are shards. This case usually indicates a misconfiguration.
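The number of tasks that are actually launched, and therefore the value to record for `vitess.prev.num.tasks` at the next parallelism change, follows from the rule above. The helper name `effective_task_count` is hypothetical.

```python
def effective_task_count(tasks_max: int, num_shards: int) -> int:
    """Return the number of tasks launched for a given tasks.max and shard count."""
    return min(tasks_max, num_shards)


# Usual case: tasks.max does not exceed the number of shards.
print(effective_task_count(2, 4))
# Rare case: tasks.max exceeds the number of shards.
print(effective_task_count(8, 4))
```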
See the xref:vitess-connector-properties[complete list of Vitess connector properties] that can be specified in these configurations.
You can send this configuration with a `POST` command to a running Kafka Connect service. The service records the configuration and starts the connector task that connects to the Vitess database and streams change event records to Kafka topics.
// Type: procedure
// ModuleID: adding-debezium-vitess-connector-configuration-to-kafka-connect
// Title: Adding {prodname} Vitess connector configuration to Kafka Connect
[[vitess-adding-connector-configuration]]
=== Adding connector configuration
To start running a Vitess connector, create a connector configuration and add the configuration to your Kafka Connect cluster.
.Prerequisites
* The VTGate host and its gRPC port (default is 15991) are accessible from the machine where the Vitess connector is installed.
* The VTCtld host and its gRPC port (default is 15999) are accessible from the machine where the Vitess connector is installed.
* The Vitess connector is installed.
.Procedure
. Create a configuration for the Vitess connector.
. Use the link:{link-kafka-docs}/#connect_rest[Kafka Connect REST API] to add that connector configuration to your Kafka Connect cluster.
.Results
When the connector starts, it starts generating data change events for row-level operations and streaming change event records to Kafka topics.
// Type: assembly
// ModuleID: monitoring-debezium-vitess-connector-performance
// Title: Monitoring {prodname} Vitess connector performance
[[vitess-monitoring]]
=== Monitoring
In addition to the built-in support for JMX metrics that Zookeeper, Kafka, and Kafka Connect provide, the {prodname} Vitess connector provides one type of metrics:
* xref:vitess-streaming-metrics[Streaming metrics] provide information about connector operation when the connector is capturing changes and streaming change event records.
xref:{link-debezium-monitoring}#monitoring-debezium[{prodname} monitoring documentation] provides details for how to expose these metrics by using JMX.
// Type: concept
// ModuleID: monitoring-debezium-vitess-connectors-customized-mbean-names
// Title: Customized names for Vitess connector snapshot and streaming MBean objects
=== Customized MBean names
include::{partialsdir}/modules/all-connectors/frag-common-mbean-name.adoc[leveloffset=+1,tags=mbeans-shared]
// Type: reference
// ModuleID: monitoring-debezium-vitess-connector-record-streaming
// Title: Monitoring {prodname} Vitess connector record streaming
[[vitess-streaming-metrics]]
==== Streaming metrics
include::{partialsdir}/modules/all-connectors/frag-common-mbean-name.adoc[leveloffset=+1,tags=common-streaming]
[cols="45%a,25%a,30%a",options="header"]
|===
|Attributes |Type |Description
|[[connectors-strm-metric-millisecondssincelastevent]]<<connectors-strm-metric-millisecondssincelastevent, `+MilliSecondsSinceLastEvent+`>>
|`long`
|The number of milliseconds since the connector has read and processed the most recent event.
|[[connectors-strm-metric-totalnumberofeventsseen]]<<connectors-strm-metric-totalnumberofeventsseen, `+TotalNumberOfEventsSeen+`>>
|`long`
|The total number of events that this connector has seen since last started or reset.
|[[connectors-strm-metric-numberofeventsfiltered]]<<connectors-strm-metric-numberofeventsfiltered, `+NumberOfEventsFiltered+`>>
|`long`
|The number of events that have been filtered by include/exclude list filtering rules configured on the connector.
|[[connectors-strm-metric-queuetotalcapacity]]<<connectors-strm-metric-queuetotalcapacity, `+QueueTotalCapacity+`>>
|`int`
|The length of the queue used to pass events between the streamer and the main Kafka Connect loop.
|[[connectors-strm-metric-queueremainingcapacity]]<<connectors-strm-metric-queueremainingcapacity, `+QueueRemainingCapacity+`>>
|`int`
|The free capacity of the queue used to pass events between the streamer and the main Kafka Connect loop.
|[[connectors-strm-metric-connected]]<<connectors-strm-metric-connected, `+Connected+`>>
|`boolean`
|Flag that denotes whether the connector is currently connected to the database server.
|[[connectors-strm-metric-millisecondsbehindsource]]<<connectors-strm-metric-millisecondsbehindsource, `+MilliSecondsBehindSource+`>>
|`long`
|The number of milliseconds between the last change event's timestamp and the connector processing it.
The values will incorporate any differences between the clocks on the machines where the database server and the connector are running.
|[[connectors-strm-metric-numberofcommittedtransactions]]<<connectors-strm-metric-numberofcommittedtransactions, `+NumberOfCommittedTransactions+`>>
|`long`
|The number of processed transactions that were committed.
|[[connectors-strm-metric-maxqueuesizeinbytes]]<<connectors-strm-metric-maxqueuesizeinbytes, `+MaxQueueSizeInBytes+`>>
|`long`
|The maximum buffer of the queue in bytes used to pass events between the streamer and the main Kafka Connect loop.
|[[connectors-strm-metric-currentqueuesizeinbytes]]<<connectors-strm-metric-currentqueuesizeinbytes, `+CurrentQueueSizeInBytes+`>>
|`long`
|The current buffer of the queue in bytes used to pass events between the streamer and the main Kafka Connect loop.
|===
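The two queue capacity attributes can be combined into a utilization figure. The following sketch assumes that you have already read `QueueTotalCapacity` and `QueueRemainingCapacity` through JMX by some other means. The function name `queue_utilization` and the sample values are hypothetical.

```python
def queue_utilization(total_capacity: int, remaining_capacity: int) -> float:
    """Return the fraction of the queue that is currently in use."""
    if total_capacity <= 0:
        raise ValueError("total_capacity must be positive")
    return (total_capacity - remaining_capacity) / total_capacity


# Hypothetical readings of QueueTotalCapacity and QueueRemainingCapacity.
print(queue_utilization(8192, 2048))
```

A value that stays close to 1.0 suggests that the Kafka Connect loop consumes events more slowly than the streamer produces them.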
// Type: reference
// ModuleID: descriptions-of-debezium-vitess-connector-configuration-properties
// Title: Description of {prodname} Vitess connector configuration properties
[[vitess-connector-properties]]
=== Connector configuration properties
The {prodname} Vitess connector has many configuration properties that you can use to achieve the right connector behavior for your application. Many properties have default values. Information about the properties is organized as follows:
* xref:vitess-required-configuration-properties[Required configuration properties]
* xref:vitess-advanced-configuration-properties[Advanced configuration properties]
* xref:vitess-pass-through-properties[Pass-through configuration properties]
[id="vitess-required-configuration-properties"]
The following configuration properties are _required_ unless a default value is available.
.Required connector configuration properties
[cols="30%a,25%a,45%a",options="header"]
|===
|Property
|Default
|Description
|[[vitess-property-name]]<<vitess-property-name, `+name+`>>
|No default
|Unique name for the connector. Attempting to register again with the same name will fail. This property is required by all Kafka Connect connectors.
|[[vitess-property-connector-class]]<<vitess-property-connector-class, `+connector.class+`>>
|No default
|The name of the Java class for the connector. Always use a value of `io.debezium.connector.vitess.VitessConnector` for the Vitess connector.
|[[vitess-property-tasks-max]]<<vitess-property-tasks-max, `+tasks.max+`>>
|`1`
|The maximum number of tasks that should be created for this connector. The Vitess connector can use more than one task if you enable `vitess.offset.storage.per.task` mode.
|[[vitess-property-database-hostname]]<<vitess-property-database-hostname, `+database.hostname+`>>
|No default
|IP address or hostname of the Vitess database server (VTGate).
|[[vitess-property-database-port]]<<vitess-property-database-port, `+database.port+`>>
|`15991`
|Integer port number of the Vitess database server (VTGate).
|[[vitess-property-keyspace]]<<vitess-property-keyspace, `+vitess.keyspace+`>>
|No default
|The name of the keyspace from which to stream the changes.
|[[vitess-property-shard]]<<vitess-property-shard, `+vitess.shard+`>>
|_n/a_
|An optional name of the shard from which to stream changes. If this property is not set, the connector streams changes from the only shard of an unsharded keyspace, or from all shards of a sharded keyspace. We recommend that you leave this property unset so that the connector streams from all shards in the keyspace, because this mode has better support for reshard operations. If you set a value, for example, `-80`, the connector streams changes from the `-80` shard.
|[[vitess-property-gtid]]<<vitess-property-gtid, `+vitess.gtid+`>>
|`current`
|An optional GTID position for a shard to stream from. This has to be set together with `vitess.shard`. If not configured, the connector streams changes from the latest position for the given shard.
|[[vitess-property-stop-on-reshard]]<<vitess-property-stop-on-reshard, `+vitess.stop_on_reshard+`>>
|`false`
|Controls the Vitess `stop_on_reshard` flag. +
+
`true` - the stream is stopped after a reshard operation. +
+
`false` - the stream is automatically migrated to the new shards after a reshard operation. +
+
If set to `true`, you should also consider setting `vitess.gtid` in the configuration.
|[[vitess-property-database-user]]<<vitess-property-database-user, `+vitess.database.user+`>>
|_n/a_
|An optional username of the Vitess database server (VTGate). If not configured, unauthenticated VTGate gRPC is used.
|[[vitess-property-database-password]]<<vitess-property-database-password, `+vitess.database.password+`>>
|_n/a_
|An optional password of the Vitess database server (VTGate). If not configured, unauthenticated VTGate gRPC is used.
|[[vitess-property-tablet-type]]<<vitess-property-tablet-type, `+vitess.tablet.type+`>>
|`MASTER`
|The type of Tablet (hence MySQL) from which to stream the changes: +
+
`MASTER` represents streaming from the master MySQL instance. +
+
`REPLICA` represents streaming from the replica MySQL instance. +
+
`RDONLY` represents streaming from the read-only replica MySQL instance.
|[[vitess-property-topic-prefix]]<<vitess-property-topic-prefix, `+topic.prefix+`>>
|No default
|Topic prefix that provides a namespace for the particular Vitess database server or cluster in which {prodname} is capturing changes.
Use only alphanumeric characters, hyphens, dots, and underscores in the topic prefix.
The prefix should be unique across all other connectors, since it is used as a topic name prefix for all Kafka topics that receive records from this connector.
+
[WARNING]
====
Do not change the value of this property.
If you change the name value, after a restart, instead of continuing to emit events to the original topics, the connector emits subsequent events to topics whose names are based on the new value.
====
|[[vitess-property-table-include-list]]<<vitess-property-table-include-list, `+table.include.list+`>>
|No default
|An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you want to capture. Any table not included in `table.include.list` does not have its changes captured. Each identifier is of the form _keyspace_._tableName_. By default, the connector captures changes in every non-system table in each schema whose changes are being captured. Do not also set the `table.exclude.list` property.
|[[vitess-property-table-exclude.list]]<<vitess-property-table-exclude.list, `+table.exclude.list+`>>
|No default
|An optional, comma-separated list of regular expressions that match fully-qualified table identifiers for tables whose changes you *do not* want to capture. Any table not included in `table.exclude.list` has its changes captured. Each identifier is of the form _keyspace_._tableName_. Do not also set the `table.include.list` property.
|[[vitess-property-column-include-list]]<<vitess-property-column-include-list, `+column.include.list+`>>
|No default
|An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be included in change event record values. Fully-qualified names for columns are of the form _keyspace_._tableName_._columnName_. Do not also set the `column.exclude.list` property.
|[[vitess-property-column-exclude-list]]<<vitess-property-column-exclude-list, `+column.exclude.list+`>>
|No default
|An optional, comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event record values. Fully-qualified names for columns are of the form _keyspace_._tableName_._columnName_. Do not also set the `column.include.list` property.
|[[vitess-property-column-truncate-to-length-chars]]<<vitess-property-column-truncate-to-length-chars, `column.truncate.to._length_.chars`>>
|_n/a_
|An optional, comma-separated list of regular expressions that match the fully-qualified names of character-based columns.
Set this property if you want to truncate the data in a set of columns when it exceeds the number of characters specified by the _length_ in the property name.
Set `length` to a positive integer value, for example, `column.truncate.to.20.chars`.
The fully-qualified name of a column observes the following format: _keyspace_._tableName_._columnName_.
To match the name of a column, {prodname} applies the regular expression that you specify as an _anchored_ regular expression.
That is, the specified expression is matched against the entire name string of the column; the expression does not match substrings that might be present in a column name.
You can specify multiple properties with different lengths in a single configuration.
|[[vitess-property-tombstones-on-delete]]<<vitess-property-tombstones-on-delete, `+tombstones.on.delete+`>>
|`true`
|Controls whether a _delete_ event is followed by a tombstone event. +
+
`true` - a delete operation is represented by a _delete_ event and a subsequent tombstone event. +
+
`false` - only a _delete_ event is emitted. +
+
After a source record is deleted, emitting a tombstone event (the default behavior) allows Kafka to completely delete all events that pertain to the key of the deleted row in case {link-kafka-docs}/#compaction[log compaction] is enabled for the topic.
|[[vitess-property-offset-storage-per-task]]<<vitess-property-offset-storage-per-task, `+vitess.offset.storage.per.task+`>>
|`false`
|Specifies whether to turn on offset-storage-per-task mode, which launches multiple connector tasks and persists offsets partitioned by task. +
+
`true` - turn on offset-storage-per-task mode. +
+
`false` - do not use offset-storage-per-task mode. +
+
If you turn on offset-storage-per-task mode, you must also specify the `vitess.offset.storage.task.key.gen` and `vitess.prev.num.tasks` properties.
|[[vitess-property-offset-storage-task-key-gen]]<<vitess-property-offset-storage-task-key-gen, `+vitess.offset.storage.task.key.gen+`>>
|`-1`
|Specifies the task parallelism generation number when `vitess.offset.storage.per.task` is turned on. Increase the generation number whenever you change the connector task parallelism, that is, when you launch more or fewer connector tasks.
|[[vitess-property-prev-num-tasks]]<<vitess-property-prev-num-tasks, `+vitess.prev.num.tasks+`>>
|`-1`
|Specifies the number of connector tasks that were used in the previous generation of task parallelism when `vitess.offset.storage.per.task` is turned on.
|[[vitess-property-message-key-columns]]<<vitess-property-message-key-columns, `+message.key.columns+`>>
|_empty string_
|A semicolon-separated list of entries, each of which pairs a table with a regular expression that matches column names in that table. The connector maps values in matching columns to key fields in change event records that it sends to Kafka topics. This is useful when a table does not have a primary key, or when you want to order change event records in a Kafka topic according to a field that is not a primary key. +
+
Separate entries with semicolons. Insert a colon between the fully-qualified table name and its regular expression. The format is: +
+
_keyspace-name_._table-name_:_regexp_;... +
+
For example, +
+
`keyspaceA.table_a:regex_1;keyspaceA.table_b:regex_2;keyspaceA.table_c:regex_3` +
+
If `table_a` has an `id` column, and `regex_1` is `^i` (matches any column that starts with `i`), the connector maps the value in ``table_a``'s `id` column to a key field in change events that the connector sends to Kafka.
|[[vitess-property-schema-name-adjustment-mode]]<<vitess-property-schema-name-adjustment-mode,`+schema.name.adjustment.mode+`>>
|`none`
|Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings: +
* `none` does not apply any adjustment. +
* `avro` replaces the characters that cannot be used in the Avro type name with underscore. +
* `avro_unicode` replaces the underscore or characters that cannot be used in the Avro type name with the corresponding Unicode, such as `+_uxxxx+`. Note: `+_+` is an escape sequence, like the backslash in Java. +
|[[vitess-property-field-name-adjustment-mode]]<<vitess-property-field-name-adjustment-mode,`+field.name.adjustment.mode+`>>
|`none`
|Specifies how field names should be adjusted for compatibility with the message converter used by the connector. Possible settings: +
* `none` does not apply any adjustment. +
* `avro` replaces the characters that cannot be used in the Avro type name with underscore. +
* `avro_unicode` replaces the underscore or characters that cannot be used in the Avro type name with the corresponding Unicode, such as `+_uxxxx+`. Note: `+_+` is an escape sequence, like the backslash in Java. +
See xref:{link-avro-serialization}#avro-naming[Avro naming] for more details.
|[[vitess-property-snapshot-mode]]<<vitess-property-snapshot-mode,`+snapshot.mode+`>>
|`initial`
|Specifies the criteria for performing a snapshot when the connector starts.
Set the property to one of the following values:
`initial`::
When the connector starts, if it does not detect a value in its offsets topic, it performs a snapshot of the database.
`never`::
When the connector starts, it skips the snapshot process and immediately begins to stream change events for operations that the database records to the binary logs.
|[[vitess-property-time-precision-mode]]<<vitess-property-time-precision-mode, `+time.precision.mode+`>>
|`adaptive_time_microseconds`
|You can set the following options to determine how {prodname} represents the precision of time, date, and timestamp values: +
+
`adaptive_time_microseconds`::
(Default) Captures date, datetime, and timestamp values exactly as they exist in the database.
Values are represented with a precision in milliseconds, microseconds, or nanoseconds, depending on the database column type, with the exception of `TIME` type fields, which are always captured as microseconds.
`connect`::
Time and timestamp values are always represented in the default Kafka Connect formats for Time, Date, and Timestamp, which use millisecond precision regardless of the database columns' precision.
|[[vitess-property-bigint-unsigned-handling-mode]]<<vitess-property-bigint-unsigned-handling-mode,`+bigint.unsigned.handling.mode+`>>
|`string`
|Specifies how `BIGINT UNSIGNED` columns should be represented in change events. +
Set the property to one of the following values: +
+
`string`:: +
represents values using Java's `String`. +
+
`long`:: +
represents values using Java's `long`, which might not offer the required precision, but is far easier to use in consumers. +
+
`precise`:: +
represents values as precise (Java's `BigDecimal`) values. This representation is precise but difficult to use.
|===
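The required properties in the preceding table can be combined into a registration request such as the following. This is a minimal sketch rather than a definitive configuration: the connector name `inventory-connector`, the host name `vtgate.example.com`, the keyspace `commerce`, the table names, and the `fulfillment` prefix are hypothetical placeholders. The outer `name` and `config` fields follow the format that the Kafka Connect REST API accepts. Because `vitess.shard` is not set, the connector streams changes from all shards in the keyspace.

[source,json]
----
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.vitess.VitessConnector",
    "tasks.max": "1",
    "database.hostname": "vtgate.example.com",
    "database.port": "15991",
    "vitess.keyspace": "commerce",
    "topic.prefix": "fulfillment",
    "table.include.list": "commerce.customers,commerce.orders",
    "tombstones.on.delete": "true",
    "snapshot.mode": "initial"
  }
}
----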
[id="vitess-advanced-configuration-properties"]
The following _advanced_ configuration properties have defaults that work in most situations and therefore rarely need to be specified in the connector's configuration.
.Advanced connector configuration properties
[cols="30%a,28%a,42%a",options="header"]
|===
|Property
|Default
|Description
|[[vitess-property-converters]]<<vitess-property-converters, `converters`>>
|No default
|Enumerates a comma-separated list of the symbolic names of the {link-prefix}:{link-custom-converters}#custom-converters[custom converter] instances that the connector can use.
For example, +
`isbn`
You must set the `converters` property to enable the connector to use a custom converter.
For each converter that you configure for a connector, you must also add a `.type` property, which specifies the fully-qualified name of the class that implements the converter interface.
The `.type` property uses the following format: +
`_<converterSymbolicName>_.type` +
For example, +
`isbn.type: io.debezium.test.IsbnConverter`
If you want to further control the behavior of a configured converter, you can add one or more configuration parameters to pass values to the converter.
To associate any additional configuration parameter with a converter, prefix the parameter names with the symbolic name of the converter.
For example, +
`isbn.schema.name: io.debezium.vitess.type.Isbn`
|[[vitess-property-event-processing-failure-handling-mode]]<<vitess-property-event-processing-failure-handling-mode, `+event.processing.failure.handling.mode+`>>
|`fail`
| Specifies how the connector should react to exceptions during processing of events: +
+
`fail` propagates the exception, indicates the offset of the problematic event, and causes the connector to stop. +
+
`warn` logs the offset of the problematic event, skips that event, and continues processing. +
+
`skip` skips the problematic event and continues processing.
|[[vitess-property-max-queue-size]]<<vitess-property-max-queue-size, `+max.queue.size+`>>
|`20240`
|Positive integer value that specifies the maximum number of records that the blocking queue can hold.
When {prodname} reads events streamed from the database, it places the events in the blocking queue before it writes them to Kafka.
The blocking queue can provide backpressure for reading change events from the database
in cases where the connector ingests messages faster than it can write them to Kafka, or when Kafka becomes unavailable.
Events that are held in the queue are disregarded when the connector periodically records offsets.
Always set the value of `max.queue.size` to be larger than the value of xref:{context}-property-max-batch-size[`max.batch.size`].
|[[vitess-property-max-batch-size]]<<vitess-property-max-batch-size, `+max.batch.size+`>>
|`2048`
|Positive integer value that specifies the maximum size of each batch of events that the connector processes.
|[[vitess-property-max-queue-size-in-bytes]]<<vitess-property-max-queue-size-in-bytes, `+max.queue.size.in.bytes+`>>
|`0`
|A long integer value that specifies the maximum volume of the blocking queue in bytes.
By default, volume limits are not specified for the blocking queue.
To specify the number of bytes that the queue can consume, set this property to a positive long value. +
If xref:vitess-property-max-queue-size[`max.queue.size`] is also set, writing to the queue is blocked when the size of the queue reaches the limit specified by either property.
For example, if you set `max.queue.size=1000`, and `max.queue.size.in.bytes=5000`, writing to the queue is blocked after the queue contains 1000 records, or after the volume of the records in the queue reaches 5000 bytes.
|[[vitess-property-poll-interval-ms]]<<vitess-property-poll-interval-ms, `+poll.interval.ms+`>>
|`500`
|Positive integer value that specifies the number of milliseconds the connector should wait for new change events to appear before it starts processing a batch of events. Defaults to 500 milliseconds, or 0.5 second.
|[[vitess-property-skipped-operations]]<<vitess-property-skipped-operations, `+skipped.operations+`>>
|`t`
|A comma-separated list of operation types that will be skipped during streaming.
The operations include: `c` for inserts/create, `u` for updates, `d` for deletes, `t` for truncates, and `none` to not skip any operations.
By default, truncate operations are skipped (not emitted by this connector).
|[[vitess-property-provide-transaction-metadata]]<<vitess-property-provide-transaction-metadata, `provide.transaction.metadata`>>
|`false`
|Determines whether the connector generates events with transaction boundaries and enriches change event envelopes with transaction metadata. Specify `true` if you want the connector to do this. See xref:vitess-transaction-metadata[Transaction metadata] for details.
|[[vitess-property-transaction-metadata-factory]]<<vitess-property-transaction-metadata-factory, `transaction.metadata.factory`>>
|`io.debezium.pipeline.txmetadata.DefaultTransactionMetadataFactory`
|Determines the class that the connector uses to track transaction context and build the data structures and schemas to represent transactions.
`io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory` provides additional transaction metadata that can help consumers to interpret the correct order of two events, regardless of the order in which they are consumed.
For more information, see xref:vitess-ordered-transaction-metadata[Ordered transaction metadata].
|[[vitess-property-keepalive-interval-ms]]<<vitess-property-keepalive-interval-ms, `+vitess.keepalive.interval.ms+`>>
|`Long.MAX_VALUE`
|Controls the interval between periodic gRPC keepalive pings for VStream. Defaults to `Long.MAX_VALUE` (disabled).
|[[vitess-property-grpc-headers]]<<vitess-property-grpc-headers, `+vitess.grpc.headers+`>>
|No default
|Specify a comma-separated list of gRPC headers. Defaults to empty. The format is: +
+
_key1:value1,key2:value2_,... +
+
For example, +
+
`x-envoy-upstream-rq-timeout-ms:0,x-envoy-max-retries:2`
|[[vitess-property-grpc-max-inbound-message-size]]<<vitess-property-grpc-max-inbound-message-size, `+vitess.grpc.max_inbound_message_size+`>>
|No default
|Specifies the maximum size, in bytes, of a message that can be received on the channel. +
+
The default is 4 MiB.
|[[vitess-property-column-propagate-source-type]]<<vitess-property-column-propagate-source-type, `+column.propagate.source.type+`>>
|_n/a_
a|An optional, comma-separated list of regular expressions that match the fully-qualified names of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change event records. These schema parameters:
`pass:[_]pass:[_]debezium.source.column.type`
are used to propagate the original type name and length for variable-width types, respectively. This is useful to properly size corresponding columns in sink databases. Fully-qualified names for columns are of the following form:
_keyspaceName_._tableName_._columnName_
|[[vitess-property-datatype-propagate-source-type]]<<vitess-property-datatype-propagate-source-type, `+datatype.propagate.source.type+`>>
|_n/a_
a|An optional, comma-separated list of regular expressions that match the database-specific data type name of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change event records. These schema parameters:
`pass:[_]pass:[_]debezium.source.column.type`
are used to propagate the original type name and length for variable-width types, respectively. This is useful to properly size corresponding columns in sink databases. Fully-qualified names for columns are of the following form:
_keyspaceName_._tableName_._columnName_
See xref:vitess-data-types[how Vitess connectors map data types] for the list of Vitess-specific data type names.
|[[vitess-property-topic-naming-strategy]]<<vitess-property-topic-naming-strategy, `topic.naming.strategy`>>
|`io.debezium.schema.SchemaTopicNamingStrategy`
|The name of the `TopicNamingStrategy` class that the connector uses to determine the topic names for data change, schema change, transaction, and heartbeat events. Defaults to `SchemaTopicNamingStrategy`.
|[[vitess-property-topic-delimiter]]<<vitess-property-topic-delimiter, `topic.delimiter`>>
|`.`
|Specifies the delimiter for topic names. Defaults to `.`.
|[[vitess-property-topic-cache-size]]<<vitess-property-topic-cache-size, `topic.cache.size`>>
|`10000`
|The number of topic names that can be held in a bounded concurrent hash map. This cache helps to determine the topic name that corresponds to a given data collection.
|[[vitess-property-topic-transaction]]<<vitess-property-topic-transaction, `topic.transaction`>>
|`transaction`
|Controls the name of the topic to which the connector sends transaction metadata messages. The topic name has this pattern: +
+
_topic.prefix_._topic.transaction_ +
+
For example, if the topic prefix is `fulfillment`, the default topic name is `fulfillment.transaction`.
|[[vitess-property-custom-metric-tags]]<<vitess-property-custom-metric-tags, `custom.metric.tags`>>
|No default
|Accepts key-value pairs that customize the MBean object name. The pairs are appended to the end of the regular name. Each key represents a tag for the MBean object name, and the corresponding value is the value of that tag. For example: `k1=v1,k2=v2`.
|[[vitess-property-errors-max-retires]]<<vitess-property-errors-max-retires, `errors.max.retries`>>
|`-1`
|Specifies how the connector responds after an operation that results in a retriable error, such as a connection error. +
Set one of the following options:
`-1`:: No limit. The connector always restarts automatically, and retries the operation, regardless of the number of previous failures.
`0`:: Disabled. The connector fails immediately, and never retries the operation.
User intervention is required to restart the connector.
`> 0`:: The connector restarts automatically until it reaches the specified maximum number of retries.
After the next failure, the connector stops, and user intervention is required to restart it.
|===
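As an illustration of how the advanced properties in the preceding table fit together, the following fragment could be merged into the `config` object of a connector registration. It is a sketch under stated assumptions: the queue, batch, and poll values repeat the documented defaults, whereas `errors.max.retries` set to `3` and `provide.transaction.metadata` set to `true` are arbitrary illustrative choices. The `isbn` converter entries reuse the example names from the `converters` description and assume that such a converter class is available to the connector. Note that `max.queue.size` stays larger than `max.batch.size`, as the table requires.

[source,json]
----
{
  "max.queue.size": "20240",
  "max.batch.size": "2048",
  "max.queue.size.in.bytes": "0",
  "poll.interval.ms": "500",
  "skipped.operations": "t",
  "provide.transaction.metadata": "true",
  "errors.max.retries": "3",
  "converters": "isbn",
  "isbn.type": "io.debezium.test.IsbnConverter",
  "isbn.schema.name": "io.debezium.vitess.type.Isbn"
}
----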
[id="vitess-pass-through-properties"]
.Pass-through connector configuration properties
The connector also supports _pass-through_ configuration properties that are used when creating the Kafka producer and consumer.
Be sure to consult the {link-kafka-docs}.html[Kafka documentation] for all of the configuration properties for Kafka producers and consumers. The Vitess connector does use the {link-kafka-docs}.html#consumerconfigs[new consumer configuration properties].
// Type: assembly
// ModuleID: how-debezium-vitess-connectors-handle-faults-and-problems
// Title: How {prodname} Vitess connectors handle faults and problems
[[vitess-when-things-go-wrong]]
== Behavior when things go wrong
{prodname} is a distributed system that captures all changes in multiple upstream databases; it never misses or loses an event. When the system is operating normally or being managed carefully, {prodname} provides _exactly once_ delivery of every change event record.
If a fault does happen, the system does not lose any events. However, while it is recovering from the fault, it might repeat some change events. In these abnormal situations, {prodname}, like Kafka, provides _at least once_ delivery of change events.
The rest of this section describes how {prodname} handles various kinds of faults and problems.
[id="vitess-connector-configuration-and-startup-errors"]
=== Configuration and startup errors
In the following situations, the connector fails when trying to start, reports an error/exception in the log, and stops running:
* The connector's configuration is invalid.
* The connector cannot successfully connect to Vitess by using the specified connection parameters.
In these cases, the error message has details about the problem and possibly a suggested workaround. After you correct the configuration or address the Vitess problem, restart the connector.
[id="vitess-becomes-unavailable"]
=== Vitess becomes unavailable
When the connector is running, the Vitess server (VTGate) that it is connected to could become unavailable for any number of reasons. If this happens, the connector fails with an error and stops. When the server is available again, restart the connector.
The Vitess connector externally stores the last processed offset in the form of a Vitess VGTID. After a connector restarts and connects to a server instance, the connector communicates with the server to continue streaming from that particular offset.
[id="invalid-column-name-error"]
=== Invalid column name error
This error happens very rarely. If you receive an error with the message `Illegal prefix '@' for column: x, from schema: y, table: z`, and your table doesn't have such a column, the cause is a Vitess VStream link:https://vitess.slack.com/archives/C0PQY0PTK/p1606817216038500[bug] that is triggered by column renaming or a column type change. The error is transient. Restart the connector after a short backoff, and the error should resolve automatically.
[id="vitess-kafka-connect-process-stops-gracefully"]
=== Kafka Connect process stops gracefully
Suppose that Kafka Connect is being run in distributed mode and a Kafka Connect process is stopped gracefully. Prior to shutting down that process, Kafka Connect migrates the process's connector tasks to another Kafka Connect process in that group. The new connector tasks start processing exactly where the prior tasks stopped. There is a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.
[id="vitess-kafka-connect-process-crashes"]
=== Kafka Connect process crashes
If the Kafka Connect process stops unexpectedly, any connector tasks it was running terminate without recording their most recently processed offsets. When Kafka Connect is being run in distributed mode, Kafka Connect restarts those connector tasks on other processes. However, Vitess connectors resume from the last offset that was _recorded_ by the earlier processes. This means that the new replacement tasks might generate some of the same change events that were processed just prior to the crash. The number of duplicate events depends on the offset flush period and the volume of data changes just before the crash.
Because there is a chance that some events might be duplicated during a recovery from failure, consumers should always anticipate some duplicate events. {prodname} changes are idempotent, so a sequence of events always results in the same state.
In each change event record, {prodname} connectors insert source-specific information about the origin of the event, including the Vitess server's time of the event and the position in the binlog where the transaction changes were written. Consumers can keep track of this information, especially the VGTID, to determine whether an event is a duplicate.
[id="vitess-kafka-becomes-unavailable"]
=== Kafka becomes unavailable
As the connector generates change events, the Kafka Connect framework records those events in Kafka by using the Kafka producer API. Periodically, at a frequency that you specify in the Kafka Connect configuration, Kafka Connect records the latest offset that appears in those change events. If the Kafka brokers become unavailable, the Kafka Connect process that is running the connectors repeatedly tries to reconnect to the Kafka brokers. In other words, the connector tasks pause until a connection can be re-established, at which point the connectors resume exactly where they left off.
[id="vitess-connector-is-stopped-for-a-duration"]
=== Connector is stopped for a duration
If the connector is gracefully stopped, the database can continue to be used. Any changes are recorded in the Vitess binlog. When the connector restarts, it resumes streaming changes where it left off. That is, it generates change event records for all database changes that were made while the connector was stopped.
A properly configured Kafka cluster is able to handle massive throughput. Kafka Connect is written according to Kafka best practices, and given enough resources a Kafka Connect connector can also handle very large numbers of database change events. Because of this, after being stopped for a while, when a {prodname} connector restarts, it is very likely to catch up with the database changes that were made while it was stopped. How quickly this happens depends on the capabilities and performance of Kafka and the volume of changes being made to the data in Vitess.
[id="vitess-connector-failed-before-finishing-the-snapshot"]
=== Connector fails before finishing the snapshot
If a snapshot fails to complete, the connector does not automatically reattempt the snapshot.
If the connector is restarted with the previous offset, the connector skips the snapshot process and immediately begins to stream change events.
Consequently, to recover from the failure, link:https://debezium.io/documentation/faq/#how_to_remove_committed_offsets_for_a_connector/[remove the connector offsets manually] and start the connector.
[id="limitations-with-earlier-vitess-versions"]
=== Limitations with earlier Vitess versions
.Vitess 8.0.0
* Due to a minor Vitess padding issue (which is fixed in Vitess 9.0.0), decimal values with a precision that is greater than or equal to 13 have extra whitespace in front of the number. For example, if the column type is `decimal(13,4)` in the table definition, the value `-1.2300` becomes `"- 1.2300"`, and the value `1.2300` becomes `" 1.2300"`.
* Does not support the `JSON` column type.
* VStream 8.0.0 doesn't provide additional metadata of permitted values for `ENUM` columns.
Therefore, the connector does not support the `ENUM` column type.
The index number (1-based) is emitted instead of the enumeration value.
For example, `"3"` is emitted as the value instead of `"L"` if the `ENUM` definition is `enum('S','M','L')`.