tet123/documentation/modules/ROOT/pages/connectors/spanner.adoc

1204 lines
57 KiB
Plaintext
Raw Normal View History

// Category: debezium-using
// Type: assembly
[id="debezium-connector-for-spanner"]
= {prodname} connector for Cloud Spanner
:context: Cloud Spanner
:mbean-name: {context}
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]
{prodname}'s Cloud Spanner connector consumes and outputs Cloud Spanner change streams data into a Kafka topic.
A Cloud Spanner link:https://cloud.google.com/spanner/docs/change-streams[change stream] watches and streams out a Cloud Spanner database's data changes - inserts, updates, deletes - in near-real-time. The Cloud Spanner connector abstracts away the details of querying the Cloud Spanner change streams. With this connector, you don't have to manage the change streams partition lifecycle, which is necessary when you link:https://cloud.devsite.corp.google.com/spanner/docs/change-streams/details#query[use the Spanner API directly].
The connector does not support the snapshot feature at the moment. The first time it connects to a Cloud Spanner database, it will start streaming changes from either the user-provided start timestamp or the current time, if the user did not provide a start timestamp.
// Type: concept
// Title: Overview of {prodname} Cloud Spanner connector
// ModuleID: overview-of-debezium-spanner-connector
[[spanner-overview]]
== Overview
To read and process database changes, the Debezium connector queries from a change stream.
The connector produces a change event for every row-level insert, update, and delete operation that was captured and sends change event records for each table in a separate Kafka topic. Client applications read the Kafka topics that correspond to the database tables of interest, and can react to every row-level event they receive from those topics.
The connector is tolerant of failures. As the connector reads changes and produces events, it records the last commit timestamp processed for each link:https://cloud.google.com/spanner/docs/change-streams/details[change stream partition]. If the connector stops for any reason (including communication failures, network problems, or crashes), upon restart the connector continues streaming records where it last left off.
// Type: assembly
// ModuleID: how-debezium-spanner-connectors-work
// Title: How {prodname} Cloud Spanner connectors work
[[how-the-spanner-connector-works]]
== How the connector works
To optimally configure and run a {prodname} Spanner connector, it is helpful to understand how the connector streams change events, determines Kafka topic names, and uses metadata.
// Type: concept
// ModuleID: how-debezium-spanner-connectors-stream-change-event-records
// Title: How {prodname} Cloud Spanner connectors stream change event records
[[spanner-streaming-changes]]
=== Streaming changes
The {prodname} Spanner connector spends all its time streaming changes from the change stream to which it is subscribed. When a change occurs on a table, Spanner writes a corresponding change stream record in the database, synchronously in the same transaction as the data change. To enable change streams writes and reads to scale, Spanner splits and merges the internal change stream storage along with the database data. To support reading change stream records in near real-time as database writes scale, the Spanner API is designed for a change steram to be queried concurrently using the change stream partitions. See about Cloud Spanner change stream's link:https://cloud.google.com/spanner/docs/change-streams/details[partitioning model].
The connector offers an abstraction over the Spanner API for querying change streams. With this connector, you don't have to manage the change stream partition lifecycle. The connector provides you with a stream of data change records so that you are free to focus more on application logic and less on specific API details and dynamic change stream partitioning.
When subscribing to a change stream, the connector needs to provide the the project ID, Spanner instance ID, Spanner database ID, as well as the change stream name. Optionally, the user can also provide a start timestamp and an end timestamp. See the xref:spanner-connector-properties[this section] for a more detailed list of the connector configuration properties.
When the connector receives changes it transforms the events into {prodname} _create_, _update_, or _delete_ events. The connector forwards these change events in records to the Kafka Connect framework, which is running in the same process. The Kafka Connect process asynchronously writes the change event records in the same order in which they were generated to the appropriate Kafka topic.
Periodically, Kafka Connect records the most recent _offset_ in another Kafka topic. The offset indicates source-specific position information that {prodname} includes with each event. For the Cloud Spanner connector, the last processed commit timestamps for the change stream partitions is the offset.
When Kafka Connect gracefully shuts down, it stops the connectors, flushes all event records to Kafka, and records the last offset received from each connector. When Kafka Connect restarts, it reads the last recorded offset for each connector, and starts each connector at its last recorded offset.
The Spanner connector also records metadata information in the following metadata topics during streaming. It is not advisable to modify the content or the configuration of these topics:
* `_consumer_offsets`: A topic automatically created by Kafka. Stores consumer offsets for consumers created in the Kafka connector
* `_kafka-connect-offsets`: A topic automatically created by Kafka Connect. Stores the connector offsets.
* `_sync_topic_spanner_connector_connectorname`: A topic automatically created by the connector. Stores metadata regarding change stream partitions.
* `_rebalancing_topic_spanner_connector_connectorname`: A topic automatically created by the connector. Used to determine connector task aliveness.
* `_debezium-heartbeat.connectorname`: A topic used to process Spanner change stream heartbeats.
// Type: concept
// ModuleID: default-names-of-kafka-topics-that-receive-debezium-spanner-change-event-records
// Title: Default names of Kafka topics that receive {prodname} Cloud Spanner change event records
[[spanner-topic-names]]
=== Topic names
The Spanner connector writes events for all insert, update, and delete operations on a single table to a single Kafka topic. By default, the Kafka topic name is _topicPrefix_._connectorName_._tableName_ where:
* _topicPrefix_ is the topic prefix as specified by the `topic.prefix` connector configuration property.
* _connectorName_ is the user-specified name of the connector.
* _tableName_ is the name of the database table in which the operation occurred.
For example, suppose that `spanner` is the logical name in the configuration for a connector that is capturing changes from a Cloud Spanner change stream that tracks changes in a database that contains four tables: `table1`, `table2`, `table3`, and `table4`. The connector would stream records to these four Kafka topics:
* `spanner.table1`
* `spanner.table2`
* `spanner.table3`
* `spanner.table4`
// Type: assembly
// ModuleID: descriptions-of-debezium-spanner-connector-data-change-events
// Title: Descriptions of {prodname} Spanner connector data change events
[[spanner-events]]
== Data change events
The {prodname} Spanner connector generates a data change event for each row-level `INSERT`, `UPDATE`, and `DELETE` operation. Each event contains a key and a value. The structure of the key and the value depends on the table that was changed.
{prodname} and Kafka Connect are designed around _continuous streams of event messages_. However, the structure of these events may change over time, which can be difficult for consumers to handle. To address this, each event contains the schema for its content or, if you are using a schema registry, a schema ID that a consumer can use to obtain the schema from the registry. This makes each event self-contained. The schema for the key will never change. Please note that the schema for the value is an amalgamation of all the columns in the table that the change stream has tracked since the start time of the connector.
The following skeleton JSON shows the basic four parts of a change event. However, how you configure the Kafka Connect converter that you choose to use in your application determines the representation of these four parts in change events. A `schema` field is in a change event only when you configure the converter to produce it. Likewise, the event key and event payload are in a change event only if you configure a converter to produce it. If you use the JSON converter and you configure it to produce all four basic change event parts, change events have this structure:
[source,json,index=0]
----
{
"schema": { // <1>
...
},
"payload": { // <2>
...
},
"schema": { // <3>
...
},
"payload": { // <4>
...
},
}
----
.Overview of change event basic content
[cols="1,2,7",options="header"]
|===
|Item |Field name |Description
|1
|`schema`
|The first `schema` field is part of the event key. It specifies a Kafka Connect schema that describes what is in the event key's `payload` portion. In other words, the first `schema` field describes the structure of the primary key.
|2
|`payload`
|The first `payload` field is part of the event key. It has the structure described by the previous `schema` field and it contains the key for the row that was changed.
|3
|`schema`
|The second `schema` field is part of the event value. It specifies the Kafka Connect schema that describes what is in the event value's `payload` portion. In other words, the second `schema` describes the structure of the row that was changed. Typically, this schema contains nested schemas.
|4
|`payload`
|The second `payload` field is part of the event value. It has the structure described by the previous `schema` field and it contains the actual data for the row that was changed.
|===
The default behavior is that the connector streams change event records to xref:spanner-topic-names[topics with names that are the same as the event's originating table].
[NOTE]
====
Starting with Kafka 0.10, Kafka can optionally record the event key and value with the {link-kafka-docs}.html#upgrade_10_performance_impact[_timestamp_] at which the message was created (recorded by the producer) or written to the log by Kafka.
====
// Type: concept
// ModuleID: about-keys-in-debezium-spanner-change-events
// Title: About keys in {prodname} Spanner change events
[[spanner-change-events-key]]
=== Change event keys
For a given table, the change event's key has a structure that contains a field for each column in the primary key of the table at the time the event was created. All key columns will be marked as non-optional.
Consider a `users` table defined in the `business` database and the example of a change event key for that table.
.Example table
[source,sql,indent=0]
----
CREATE TABLE Users (
id INT64 NOT NULL,
username STRING(MAX) NOT NULL,
password STRING(MAX) NOT NULL,
email STRING(MAX) NOT NULL)
PRIMARY KEY (id);
----
.Example change event key
Every change event for the `users` table while it has this definition has the same key structure, which in JSON looks like this:
[source,json,indent=0]
----
{
"schema": { // <1>
"type": "struct",
"name": "Users.Key", // <2>
"optional": false, // <3>
"fields": [ // <4>
{
"type": "int64",
"optional": "false",
"field": "false"
}
]
},
"payload": { // <5>
"id": "1"
},
}
----
.Description of change event key
[cols="1,2,7",options="header"]
|===
|Item |Field name |Description
|1
|`schema`
|The schema portion of the key specifies a Kafka Connect schema that describes what is in the key's `payload` portion.
|2
|`Users.Key`
|Name of the schema that defines the structure of the key's payload. This schema describes the structure of the primary key for the table that was changed.
|3
|`optional`
|Indicates whether the event key must contain a value in its `payload` field. Primary key columns are always required.
|4
|`fields`
|Specifies each field that is expected in the `payload`, including each field's name, type, and whether it is optional.
|5
|`payload`
|Contains the key for the row for which this change event was generated. In this example, the key, contains a single `id` field whose value is `1`.
|===
// Type: concept
// ModuleID: about-values-in-debezium-spanner-change-events
// Title: About values in {prodname} Spanner change events
[[spanner-change-events-value]]
=== Change event values
Consider the same sample table that was used to show an example of a change event key:
[source,sql,indent=0]
----
CREATE TABLE Users (
id INT64 NOT NULL,
username STRING(MAX) NOT NULL,
password STRING(MAX) NOT NULL,
email STRING(MAX) NOT NULL)
PRIMARY KEY (id);
----
// Type: continue
[[spanner-create-events]]
=== _create_ events
The following example shows the value portion of a change event that the connector generates for an operation that creates data in the `Users` table. Note that even if a column is marked as non-optional in Spanner, it will be marked as optional in the schema. Only primary key columns are marked as non-optional.
[source,json,options="nowrap",indent=0,subs="+attributes"]
----
{
"schema": { // <1>
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "first_name"
},
{
"type": "string",
"optional": true,
"field": "last_name"
},
{
"type": "string",
"optional": true,
"field": "email"
}
],
"optional": true,
"name": "Users.Value", // <2>
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "Users.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "project_id"
},
{
"type": "string",
"optional": false,
"field": "instance_id"
},
{
"type": "string",
"optional": false,
"field": "database_id"
},
{
"type": "string",
"optional": false,
"field": "change_stream_name"
},
{
"type": "string",
"optional": true,
"field": "table"
}
{
"type": "string",
"optional": true,
"field": "server_transaction_id"
}
{
"type": "int64",
"optional": true,
"field": "low_watermark"
}
{
"type": "int64",
"optional": true,
"field": "read_at_timestamp"
}
{
"type": "int64",
"optional": true,
"field": "number_of_records_in_transaction"
}
{
"type": "string",
"optional": true,
"field": "transaction_tag"
}
{
"type": "boolean",
"optional": true,
"field": "system_transaction"
}
{
"type": "string",
"optional": true,
"field": "value_capture_type"
}
{
"type": "string",
"optional": true,
"field": "partition_token"
}
{
"type": "int32",
"optional": true,
"field": "mod_number"
}
{
"type": "boolean",
"optional": true,
"field": "is_last_record_in_transaction_in_partition"
}
{
"type": "int64",
"optional": true,
"field": "number_of_partitions_in_transaction"
}
],
"optional": false,
"name": "io.debezium.connector.spanner.Source", // <3>
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "connector_name.Users.Envelope" // <4>
},
"payload": { // <5>
"before": null, // <6>
"after": { // <7>
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { // <8>
"version": "{debezium-version}",
"connector": "spanner",
"name": "spanner_connector",
"ts_ms": 1670955531785,
"snapshot": "false",
"db": "database",
"sequence": "1",
"project_id": "project",
"instance_id": "instance",
"database_id": "database",
"change_stream_name": "change_stream",
"table": "Users",
"server_transaction_id": "transaction_id",
"low_watermark": 1670955471635,
"read_at_timestamp": 1670955531791,
"number_records_in_transaction": 2,
"transaction_tag": "",
"system_transaction": false,
"value_capture_type": "OLD_AND_NEW_VALUES",
"partition_token": "partition_token",
"mod_number": 0,
"is_last_record_in_transaction_in_partition": true,
"number_of_partitions_in_transaction": 1
},
"op": "c", // <9>
"ts_ms": 1559033904863 // <10>
}
}
----
.Descriptions of _create_ event value fields
[cols="1,2,7",options="header"]
|===
|Item |Field name |Description
|1
|`schema`
|The value's schema, which describes the structure of the value's payload. A change event's value schema is the same in every change event that the connector generates for a particular table.
|2
|`name`
a|In the `schema` section, each `name` field specifies the schema for a field in the value's payload.
|3
|`name`
a|`io.debezium.connector.spanner.Source` is the schema for the payload's `source` field. This schema is specific to the Spanner connector. The connector uses it for all events that it generates.
|4
|`name`
a|`connector_name.Users.Envelope` is the schema for the overall structure of the payload, where `connector_name` is the connector name, and `customers` is the table.
|5
|`payload`
|The value's actual data. This is the information that the change event is providing.
|6
|`before`
a|An optional field that specifies the state of the row before the event occurred. When the `op` field is `c` for create, as it is in this example, the `before` field is `null` since this change event is for new content.
|7
|`after`
|An optional field that specifies the state of the row after the event occurred. In this example, the `after` field contains the values of the new row's `id`, `first_name`, `last_name`, and `email` columns.
|8
|`source`
a|Mandatory field that describes the source metadata for the event. This field contains information that you can use to compare this event with other events, with regard to the origin of the events, the order in which the events occurred, and whether events were part of the same transaction. The source metadata includes:
* {prodname} version
* Connector type and name
* Database and table that contains the new row
* If the event was part of a snapshot
* The record sequence number for the data change event in the transaction
* The project ID
* The instance ID
* The database ID
* The change stream name
* The transaction ID
* The low watermark, which represents the timestamp at which all records with commit timestamp less than the low watermark timestamp have already been streamed out by the connector
* The commit timestamp for when the change was made in the database
* The timestamp at which the connnector processed the change
* The number of records in the originatig transaction
* The transaction tag
* Whether or not the transaction was a system transaction
* The value capture type
* The originating partition token used to query this change event
* The mod number in the original data change event received from Spanner
* Whether or not the data change event was the last record in the transaction in the partition
* The total number of change stream partitions in the transaction
|9
|`op`
a|Mandatory string that describes the type of operation that caused the connector to generate the event. In this example, `c` indicates that the operation created a row. Valid values are:
* `c` = create
* `u` = update
* `d` = delete
|10
|`ts_ms`
a|Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. +
+
In the `source` object, `ts_ms` indicates the time that the change was made in the database. By comparing the value for `payload.source.ts_ms` with the value for `payload.ts_ms`, you can determine the lag between the source database update and {prodname}.
|===
// Type: continue
[[spanner-update-events]]
=== _update_ events
The value of a change event for an update in the sample `Users` table has the same schema as a _create_ event for that table. Likewise, the event value's payload has the same structure. However, the event value payload contains different values in an _update_ event. Here is an example of a change event value in an event that the connector generates for an update in the `Users` table:
[source,json,indent=0,options="nowrap",subs="+attributes"]
----
{
"schema": { ... },
"payload": {
"before": { // <1>
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": { // <2>
"id": 1,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { // <3>
"version": "{debezium-version}",
"connector": "spanner",
"name": "spanner_connector",
"ts_ms": 1670955531785,
"snapshot": "false",
"db": "database",
"sequence": "1",
"project_id": "project",
"instance_id": "instance",
"database_id": "database",
"change_stream_name": "change_stream",
"table": "Users",
"server_transaction_id": "transaction_id",
"low_watermark": 1670955471635,
"read_at_timestamp": 1670955531791,
"number_records_in_transaction": 2,
"transaction_tag": "",
"system_transaction": false,
"value_capture_type": "OLD_AND_NEW_VALUES",
"partition_token": "partition_token",
"mod_number": 0,
"is_last_record_in_transaction_in_partition": true,
"number_of_partitions_in_transaction": 1
},
"op": "u", // <4>
"ts_ms": 1465584025523 // <5>
}
}
----
.Descriptions of _update_ event value fields
[cols="1,2,7",options="header"]
|===
|Item |Field name |Description
|1
|`before`
|An optional field that contains all values of all columns that were in the row before the database commit.
|2
|`after`
|An optional field that specifies the state of the row after the event occurred. In this example, the `first_name` value is now `Anne Marie`.
|3
|`source`
a|Mandatory field that describes the source metadata for the event. The `source` field structure has the same fields as in a _create_ event, but some values are different. The source metadata includes:
* {prodname} version
* Connector type and name
* Database (a.k.a keyspace) and table that contains the new row
* If the event was part of a snapshot
* The record sequence number for the data change event in the transaction
* The project ID
* The instance ID
* The database ID
* The change stream name
* The transaction ID
* The low watermark, which represents the timestamp at which all records with commit timestamp less than the low watermark timestamp have already been streamed out by the connector
* The commit timestamp for when the change was made in the database
* The timestamp at which the connnector processed the change
* The number of records in the originatig transaction
* The transaction tag
* Whether or not the transaction was a system transaction
* The value capture type
* The originating partition token used to query this change event
* The mod number in the original data change event received from Spanner
* Whether or not the data change event was the last record in the transaction in the partition
* The total number of change stream partitions in the transaction
|4
|`op`
a|Mandatory string that describes the type of operation. In an _update_ event value, the `op` field value is `u`, signifying that this row changed because of an update.
|5
|`ts_ms`
a|Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. +
+
In the `source` object, `ts_ms` indicates the time that the change was made in the database. By comparing the value for `payload.source.ts_ms` with the value for `payload.ts_ms`, you can determine the lag between the source database update and {prodname}.
|===
[[spanner-delete-events]]
=== _delete_ events
The value in a _delete_ change event has the same `schema` portion as _create_ and _update_ events for the same table. The `payload` portion in a _delete_ event for the sample `Users` table looks like this:
[source,json,indent=0,subs="+attributes"]
----
{
"schema": { ... },
"payload": {
"before": { // <1>
"id": 1,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": null, // <2>
"source": { // <3>
"version": "{debezium-version}",
"connector": "spanner",
"name": "spanner_connector",
"ts_ms": 1670955531785,
"snapshot": "false",
"db": "database",
"sequence": "1",
"project_id": "project",
"instance_id": "instance",
"database_id": "database",
"change_stream_name": "change_stream",
"table": "Users",
"server_transaction_id": "transaction_id",
"low_watermark": 1670955471635,
"read_at_timestamp": 1670955531791,
"number_records_in_transaction": 2,
"transaction_tag": "",
"system_transaction": false,
"value_capture_type": "OLD_AND_NEW_VALUES",
"partition_token": "partition_token",
"mod_number": 0,
"is_last_record_in_transaction_in_partition": true,
"number_of_partitions_in_transaction": 1
},
"op": "d", // <4>
"ts_ms": 1465581902461 // <5>
}
}
----
.Descriptions of _delete_ event value fields
[cols="1,2,7",options="header"]
|===
|Item |Field name |Description
|1
|`before`
|Optional field that specifies the state of the row before the event occurred. In a _delete_ event value, the `before` field contains the values that were in the row before it was deleted with the database commit.
|2
|`after`
|Optional field that specifies the state of the row after the event occurred. In a _delete_ event value, the `after` field is `null`, signifying that the row no longer exists.
|3
|`source`
a|Mandatory field that describes the source metadata for the event. In a _delete_ event value, the `source` field structure is the same as for _create_ and _update_ events for the same table. Many `source` field values are also the same. In a _delete_ event value, the `ts_ms` and `lsn` field values, as well as other values, might have changed. But the `source` field in a _delete_ event value provides the same metadata:
* {prodname} version
* Connector type and name
* Database (a.k.a keyspace) and table that contains the new row
* If the event was part of a snapshot
* The record sequence number for the data change event in the transaction
* The project ID
* The instance ID
* The database ID
* The change stream name
* The transaction ID
* The low watermark, which represents the timestamp at which all records with commit timestamp less than the low watermark timestamp have already been streamed out by the connector
* The commit timestamp for when the change was made in the database
* The timestamp at which the connnector processed the change
* The number of records in the originatig transaction
* The transaction tag
* Whether or not the transaction was a system transaction
* The value capture type
* The originating partition token used to query this change event
* The mod number in the original data change event received from Spanner
* Whether or not the data change event was the last record in the transaction in the partition
* The total number of change stream partitions in the transaction
|4
|`op`
a|Mandatory string that describes the type of operation. The `op` field value is `d`, signifying that this row was deleted.
|5
|`ts_ms`
a|Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task. +
+
In the `source` object, `ts_ms` indicates the time that the change was made in the database. By comparing the value for `payload.source.ts_ms` with the value for `payload.ts_ms`, you can determine the lag between the source database update and {prodname}.
|===
A _delete_ change event record provides a consumer with the information it needs to process the removal of this row.
Spanner connector events are designed to work with link:{link-kafka-docs}#compaction[Kafka log compaction]. Log compaction enables removal of some older messages as long as at least the most recent message for every key is kept. This lets Kafka reclaim storage space while ensuring that the topic contains a complete data set and can be used for reloading key-based state. Note that if the low watermark is enabled, compaction should not be enabled.
// Type: continue
[[spanner-tombstone-events]]
.Tombstone events
When a row is deleted, the _delete_ event value still works with log compaction, because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that same key, the message value must be `null`. To make this possible, the connector follows a _delete_ event with a special _tombstone_ event that has the same key but a `null` value.
// Type: reference
// ModuleID: how-debezium-spanner-connectors-map-data-types
// Title: How {prodname} Spanner connectors map data types
[[spanner-data-types]]
== Data type mappings
The Cloud Spanner connector represents changes to rows with events that are structured like the table in which the row exists. The event contains a field for each column value. How that value is represented in the event depends on the Spanner data type of the column. This section describes these mappings.
[id="spanner-types"]
=== Spanner types
.Mappings for Spanner data types
[cols="25%a,20%a",options="header"]
|===
|Spanner data type
|Literal type (schema type)
|`BOOLEAN`
|`BOOLEAN`
|`INT64`
|`INT64`
|`ARRAY`
|`ARRAY`
|`BYTES`
|`BYTES`
|`STRING`
|`STRING`
|`FLOAT`
|`DOUBLE`
|`NUMERIC`
|`STRING`
|`TIMESTAMP`
|`STRING`
|`NUMERIC`
|`STRING`
|===
// Type: assembly
// ModuleID: running-debezium-spanner-connector
// Title: Running a {prodname} Cloud Spanner connector
[[setting-up-spanner]]
== Setting up Spanner
.Checklist
* Make sure to provide a project ID, Spanner instance ID, Spanner database ID, and change stream name. See link:https://cloud.google.com/spanner/docs/change-streams/manage[documentation] on how to create change streams.
* Make sure to create and configure a GCP service account with the proper credentials. The service account key will need to be provided in the connector configuration. See more link:https://cloud.google.com/iam/docs/service-accounts[information] about service accounts here.
See the following section on how to configure a {prodname} Spanner connector.
// Type: assembly
// ModuleID: deploying-and-managing-debezium-spanner-connectors
// Title: Deploying and managing {prodname} Spanner connectors
[[spanner-deploying-a-connector]]
== Deployment
With link:https://zookeeper.apache.org[Zookeeper], link:http://kafka.apache.org/[Kafka], and {link-kafka-docs}.html#connect[Kafka Connect] installed, the remaining tasks to deploy a {prodname} Cloud Spanner connector are to download the link:https://repo1.maven.org/maven2/io/debezium/debezium-connector-spanner/{debezium-version}/debezium-connector-spanner-{debezium-version}-plugin.tar.gz[connector's plug-in archive], extract the JAR files into your Kafka Connect environment, and add the directory with the JAR files to {link-kafka-docs}/#connectconfigs[Kafka Connect's `plugin.path`]. You then need to restart your Kafka Connect process to pick up the new JAR files.
If you are working with immutable containers, see link:https://quay.io/organization/debezium[{prodname}'s Container images] for Zookeeper, Kafka and Kafka Connect with the Cloud Spanner connector already installed and ready to run. You can also xref:operations/openshift.adoc[run {prodname} on Kubernetes and OpenShift].
// Type: concept
// ModuleID:debezium-spanner-connector-configuration-example
// Title: {prodname} Cloud Spanner connector configuration example
[[spanner-example-configuration]]
=== Connector configuration example
Following is an example of the configuration for a Spanner connector that connects to a change stream called `changeStreamAll` in the database `Database` in instance `Instance` and project `Project`.
You can choose to produce events for a subset of the schemas and tables. Optionally, ignore, mask, or truncate columns that are sensitive, too large, or not needed.
[source,json]
----
{
"name": "spanner-connector", // <1>
"config": {
"connector.class": "io.debezium.connector.spanner.SpannerConnector", <2>
"gcp.spanner.change.stream": "changeStreamAll", <3>
"gcp.spanner.project.id": "Project", <4>
"gcp.spanner.instance.id": "Instance", <5>
"gcp.spanner.database.id": "Database", <6>
"gcp.spanner.credentials.json": <key.json>, <7>
"tasks.max": 1 // <8>
}
}
----
<1> The name of the connector when registered with a Kafka Connect service.
<2> The name of this Cloud Spanner connector class.
<3> The change stream name.
<4> The GCP Project ID.
<5> The Cloud Spanner Instance ID.
<6> The Cloud Spanner Database ID.
<7> The GCP service account key JSON.
<8> The maximum number of tasks.
See the xref:spanner-connector-properties[complete list of Spanner connector properties] that can be specified in these configurations.
// Type: procedure
// ModuleID: adding-debezium-spanner-connector-configuration-to-kafka-connect
// Title: Adding {prodname} Cloud Spanner connector configuration to Kafka Connect
[[spanner-adding-connector-configuration]]
=== Adding connector configuration
To start running a Spanner connector, create a connector configuration and add the configuration to your Kafka Connect cluster.
.Prerequisites
* The Cloud Spanner change stream is created and available.
* The Cloud Spanner connector is installed.
.Procedure
. Create a configuration for the Cloud Spanner connector.
. Use the link:{link-kafka-docs}/#connect_rest[Kafka Connect REST API] to add that connector configuration to your Kafka Connect cluster.
.Results
When the connector starts, it starts generating data change events for row-level operations and streaming change event records to Kafka topics.
// Type: assembly
// ModuleID: monitoring-debezium-spanner-connector-performance
// Title: Monitoring {prodname} Cloud Spanner connector performance
[[spanner-monitoring]]
=== Monitoring
The {prodname} Spanner connector provides only one type of metric in addition to the built-in support for JMX metrics that Zookeeper, Kafka, and Kafka Connect provide.
* xref:spanner-streaming-metrics[Streaming metrics] provide information about connector operations when the connector is capturing changes and streaming change event records.
xref:{link-debezium-monitoring}#monitoring-debezium[{prodname} monitoring documentation] provides details for how to expose these metrics by using JMX.
// Type: reference
// ModuleID: monitoring-debezium-spanner-connector-record-streaming
// Title: Monitoring {prodname} Cloud Spanner connector record streaming
[[spanner-streaming-metrics]]
==== Streaming metrics
include::{partialsdir}/modules/all-connectors/frag-common-mbean-name.adoc[leveloffset=+1,tags=common-streaming]
[cols="45%a,25%a,30%a",options="header"]
|===
|Attributes |Type |Description
|[[connectors-strm-metric-millisecondssincelastevent]]<<connectors-strm-metric-millisecondssincelastevent, `+MilliSecondsSinceLastEvent+`>>
|`long`
|The number of milliseconds since the connector has read and processed the most recent event.
|[[connectors-strm-metric-totalnumberofeventsseen]]<<connectors-strm-metric-totalnumberofeventsseen, `+TotalNumberOfEventsSeen+`>>
|`long`
|The total number of events that this connector has seen since last started or reset.
|[[connectors-strm-metric-numberofeventsfiltered]]<<connectors-strm-metric-numberofeventsfiltered, `+NumberOfEventsFiltered+`>>
|`long`
|The number of events that have been filtered by include/exclude list filtering rules configured on the connector.
|[[connectors-strm-metric-queuetotalcapacity]]<<connectors-strm-metric-queuetotalcapacity, `+QueueTotalCapacity+`>>
|`int`
|The length the queue used to pass events between the streamer and the main Kafka Connect loop.
|[[connectors-strm-metric-queueremainingcapacity]]<<connectors-strm-metric-queueremainingcapacity, `+QueueRemainingCapacity+`>>
|`int`
|The free capacity of the queue used to pass events between the streamer and the main Kafka Connect loop.
|[[connectors-strm-metric-connected]]<<connectors-strm-metric-connected, `+Connected+`>>
|`boolean`
|Flag that denotes whether the connector is currently connected to the database server.
|[[connectors-strm-metric-millisecondsbehindsource]]<<connectors-strm-metric-millisecondsbehindsource, `+MilliSecondsBehindSource+`>>
|`long`
|The number of milliseconds between the last change event's timestamp and the connector processing it.
The values will incoporate any differences between the clocks on the machines where the database server and the connector are running.
|[[connectors-strm-metric-numberofcommittedtransactions]]<<connectors-strm-metric-numberofcommittedtransactions, `+NumberOfCommittedTransactions+`>>
|`long`
|The number of processed transactions that were committed.
|[[connectors-strm-metric-maxqueuesizeinbytes]]<<connectors-strm-metric-maxqueuesizeinbytes, `+MaxQueueSizeInBytes+`>>
|`long`
|The maximum buffer of the queue in bytes used to pass events between the streamer and the main Kafka Connect loop.
|[[connectors-strm-metric-currentqueuesizeinbytes]]<<connectors-strm-metric-currentqueuesizeinbytes, `+CurrentQueueSizeInBytes+`>>
|`long`
|The current buffer of the queue in bytes used to pass events between the streamer and the main Kafka Connect loop.
|[[connectors-strm-metric-lowwatermark]]<<connectors-strm-metric-lowwatermark, `+LowWatermark+`>>
|`long`
|The current low watermark of the connector task. The low watermark describes the time T at which the connector is guaranteed to have streamed out all events with timestamp < T.
|[[connectors-strm-metric-latencylowwatermarklag]]<<connectors-strm-metric-latencylowwatermarklag, `+LatencyLowWatermarkLag+`>>
|`long`
|The lag of the low watermark behind the current time in milliseconds. P50, P95, P99, Average, Min, Max metrics are provided.
|[[connectors-strm-metric-latencyspanner]]<<connectors-strm-metric-latencyspanner, `+LatencySpanner+`>>
|`long`
|The Spanner commit timestamp to connector read latency. P50, P95, P99, Average, Min, Max metrics are provided.
|[[connectors-strm-metric-latencyreadtoemit]]<<connectors-strm-metric-latencyreadtoemit, `+LatencyReadToEmit+`>>
|`long`
|The Spanner read timestamp to connector emit latency. P50, P95, P99, Average, Min, Max metrics are provided.
|[[connectors-strm-metric-latencycommittoemit]]<<connectors-strm-metric-latencycommittoemit, `+LatencyCommitToEmit+`>>
|`long`
|The Spanner commit timestamp to connector emit latency. P50, P95, P99, Average, Min, Max metrics are provided.
|[[connectors-strm-metric-latencycommittopublish]]<<connectors-strm-metric-latencycommittopublish, `+LatencyCommitToPublish+`>>
|`long`
|The Spanner commit timestamp to Kafka publish timestamp latency. P50, P95, P99, Average, Min, Max metrics are provided.
|[[connectors-strm-metric-latencyemittopublish]]<<connectors-strm-metric-latencyemittopublish, `+LatencyEmitToPublish+`>>
|`long`
|The connector emit timestamp to Kafka publish timestamp latency. P50, P95, P99, Average, Min, Max metrics are provided.
|[[connectors-strm-metric-spannereventqueuecapacity]]<<connectors-strm-metric-spannereventqueuecapacity, `+SpannerEventQueueCapacity+`>>
|`long`
|The total Spanner event queue capacity.
|[[connectors-strm-metric-remainingspannereventqueuecapacity]]<<connectors-strm-metric-remainingspannereventqueuecapacity, `+RemainingSpannerEventQueueCapacity+`>>
|`long`
|The remaining Spanner event queue capacity.
|[[connectors-strm-metric-taskstatechangeeventqueuecapacity]]<<connectors-strm-metric-taskstatechangeeventqueuecapacity, `+TaskStateChangeEventQueueCapacity+`>>
|`long`
|The total task state change event queue capacity.
|[[connectors-strm-metric-remainingtaskstatechangeeventqueuecapacity]]<<connectors-strm-metric-remainingtaskstatechangeeventqueuecapacity, `+RemainingTaskStateChangeEventQueueCapacity+`>>
|`long`
|The remaining task state change event queue capacity.
|[[connectors-strm-metric-numberofchangestreampartitionsdetected]]<<connectors-strm-metric-numberofchangestreampartitionsdetected, `+NumberOfChangeStreamPartitionsDetected+`>>
|`long`
|The total number of partitions detected by the current task.
|[[connectors-strm-metric-numberofchangestreamqueriesissued]]<<connectors-strm-metric-numberofchangestreamqueriesissued, `+NumberOfChangeStreamQueriesIssued+`>>
|`long`
|The total number of change stream queries issued by the current task.
|[[connectors-strm-metric-numberofactivechangestreamqueries]]<<connectors-strm-metric-numberofactivechangestreamqueries, `+NumberOfActiveChangeStreamQueries+`>>
|`long`
|The active number of change stream queries detected by the current task.
|===
// Type: reference
// ModuleID: descriptions-of-debezium-spanner-connector-configuration-properties
// Title: Description of {prodname} Cloud Spanner connector configuration properties
[[spanner-connector-properties]]
=== Connector configuration properties
The {prodname} Spanner connector has many configuration properties that you can use to achieve the right connector behavior for your application. Many properties have default values. Information about the properties is organized as follows:
* xref:spanner-required-configuration-properties[Required configuration properties]
* xref:spanner-advanced-configuration-properties[Advanced configuration properties]
* xref:spanner-pass-through-properties[Pass-through configuration properties]
[id="spanner-required-configuration-properties"]
The following configuration properties are _required_ unless a default value is available.
.Required connector configuration properties
[cols="30%a,25%a,45%a",options="header"]
|===
|Property
|Default
|Description
|[[spanner-property-name]]<<spanner-property-name, `+name+`>>
|No default
|Unique name for the connector. Attempting to register again with the same name will fail. This property is required by all Kafka Connect connectors.
|[[spanner-property-connector-class]]<<spanner-property-connector-class, `+connector.class+`>>
|No default
|The name of the Java class for the connector. Always use a value of `io.debezium.connector.spanner.SpannerConnector` for the Cloud Spanner connector.
|[[spanner-property-tasks-max]]<<spanner-property-tasks-max, `+tasks.max+`>>
|`1`
|The maximum number of tasks that should be created for this connector. The Cloud Spanner connector can use more than 1 tasks if you enable offset.storage.per.task mode.
|[[spanner-property-project]]<<spanner-property-project, `+gcp.spanner.project.id+`>>
|No default
|The GCP project ID
2022-12-20 10:11:01 +01:00
|[[spanner-property-instance]]<<spanner-property-instance, `+gcp.spanner.instance.id+`>>
|No default
|The Cloud Spanner instance ID
|[[spanner-property-database]]<<spanner-property-database, `+gcp.spanner.database.id+`>>
|No default
|The Cloud Spanner database ID
2022-12-20 10:11:01 +01:00
|[[spanner-property-changestream]]<<spanner-property-changestream, `+gcp.spanner.change.stream+`>>
|No default
|The Cloud Spanner change stream
|[[spanner-property-credentials]]<<spanner-property-credentials, `+gcp.spanner.credentials.path+`>>
|No default
|The GCP service account key JSON.
2022-12-20 10:11:01 +01:00
|===
[id="spanner-advanced-configuration-properties"]
The following _advanced_ configuration properties have defaults that work in most situations and therefore rarely need to be specified in the connector's configuration.
.Advanced connector configuration properties
[cols="30%a,28%a,42%a",options="header"]
|===
|Property
|Default
|Description
|[[spanner-property-lowwatermark]]<<spanner-property-lowwatermark, `+gcp.spanner.low-watermark.enabled+`>>
|`false`
|Whether or not the low watermark is enabled for the connector.
|[[spanner-property-lowwatermarkupdateperiod]]<<spanner-property-lowwatermarkupdateperiod, `+gcp.spanner.low-watermark.update-period.ms+`>>
|`1000 ms`
|The interval at which the low watermark is updated.
|[[spanner-property-heartbeatinterval]]<<spanner-property-heartbeatinterval, `+heartbeat.interval.ms+`>>
|`300000`
|The Spanner heartbeat interval.
|[[spanner-property-starttime]]<<spanner-property-starttime, `+gcp.spanner.start.time+`>>
|`current time`
|The connector start time.
|[[spanner-property-endtime]]<<spanner-property-endtime, `+gcp.spanner.end.time+`>>
|`indefinite end time`
|The connector end time.
|[[spanner-property-eventqueuecapacity]]<<spanner-property-eventqueuecapacity, `+gcp.spanner.stream.event.queue.capacity+`>>
|`10000`
|The Spanner event queue capacity
|[[spanner-property-taskstatechangequeuecapacity]]<<spanner-property-taskstatechangequeuecapacity, `+connector.spanner.task.state.change.event.queue.capacity+`>>
|`1000`
|The task state change event queue capacity
|[[spanner-property-missedheartbeatsmax]]<<spanner-property-missedheartbeatsmax, `+connector.spanner.max.missed.heartbeats+`>>
|`5000`
|The maximum number of missed heartbeats for a change stream query before an exception is thrown
|[[spanner-property-scalermonitorenabled]]<<spanner-property-scalermonitorenabled, `+scaler.monitor.enabled+`>>
|`false`
|Whether or not task autoscaling is enabled
|[[spanner-property-taskdesiredpartitions]]<<spanner-property-taskdesiredpartitions, `+tasks.desired.partitions+`>>
|`2`
|The desired number of change stream partitions per task.
|===
[id="spanner-pass-through-properties"]
.Pass-through connector configuration properties
The connector also supports _pass-through_ configuration properties that are used when creating the Kafka producer and consumer.
Be sure to consult the {link-kafka-docs}.html[Kafka documentation] for all of the configuration properties for Kafka producers and consumers. The Spanner connector does use the {link-kafka-docs}.html#consumerconfigs[new consumer configuration properties].
// Type: assembly
// ModuleID: how-debezium-spanner-connectors-handle-faults-and-problems
// Title: How {prodname} Cloud Spanner connectors handle faults and problems
[[spanner-when-things-go-wrong]]
== Behavior when things go wrong
{prodname} is a distributed system that captures all changes in multiple upstream databases; it never misses or loses an event. When the system is operating normally or being managed carefully then {prodname} provides _exactly once_ delivery of every change event record.
If a fault does happen then the system does not lose any events. However, while it is recovering from the fault, it might repeat some change events. In these abnormal situations, {prodname}, like Kafka, provides _at least once_ delivery of change events.
The rest of this section describes how {prodname} handles various kinds of faults and problems.
[id="spanner-connector-configuration-and-startup-errors"]
=== Configuration and startup errors
In the following situations, the connector fails when trying to start, reports an error/exception in the log, and stops running:
* The connector's configuration is invalid.
* The connector cannot successfully connect to Spanner by using the specified connection parameters.
In these cases, the error message has details about the problem and possibly a suggested workaround. After you correct the configuration or address the Cloud Spanner problem, restart the connector.
[id="spanner-becomes-unavailable"]
=== Spanner becomes unavailable
When the connector is running, Spanner could become unavailable for any number of reasons. The connector will continue running and will be able to stream events once Cloud Spanner becomes available again.
[id="spanner-kafka-connect-process-stops-gracefully"]
=== Kafka Connect process stops gracefully
Suppose that Kafka Connect is being run in distributed mode and a Kafka Connect process is stopped gracefully. Prior to shutting down that process, Kafka Connect migrates the process's connector tasks to another Kafka Connect process in that group. The new connector tasks start processing exactly where the prior tasks stopped. There is a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.
[id="spanner-kafka-connect-process-crashes"]
=== Kafka Connect process crashes
If the Kafka Connector process stops unexpectedly, any connector tasks it was running terminate without recording their most recently processed offsets. When Kafka Connect is being run in distributed mode, Kafka Connect restarts those connector tasks on other processes. However, Spanner connectors resume from the last offset that was _recorded_ by the earlier processes. This means that the new replacement tasks might generate some of the same change events that were processed just prior to the crash. The number of duplicate events depends on the offset flush period and the volume of data changes just before the crash.
Because there is a chance that some events might be duplicated during a recovery from failure, consumers should always anticipate some duplicate events.
In each change event record, {prodname} connectors insert source-specific information about the origin of the event, such as the originating partition token, the commit timestamp, the transaction ID, the record sequence, and the mod number. Consumers could use these identifiers for deduplication.
[id="spanner-kafka-becomes-unavailable"]
=== Kafka becomes unavailable
As the connector generates change events, the Kafka Connect framework records those events in Kafka by using the Kafka producer API. Periodically, at a frequency that you specify in the Kafka Connect configuration, Kafka Connect records the latest offset that appears in those change events. If the Kafka brokers become unavailable, the Kafka Connect process that is running the connectors repeatedly tries to reconnect to the Kafka brokers. In other words, the connector tasks pause until a connection can be re-established, at which point the connectors resume exactly where they left off.
[id="spanner-connector-is-stopped-for-a-duration"]
=== Connector is stopped for a duration
If the connector is gracefully stopped, the database can continue to be used. When the connector restarts, it resumes streaming changes where it left off. That is, it generates change event records for all database changes that were made while the connector was stopped.
A properly configured Kafka cluster is able to handle massive throughput. Kafka Connect is written according to Kafka best practices, and given enough resources a Kafka Connect connector can also handle very large numbers of database change events. Because of this, after being stopped for a while, when a {prodname} connector restarts, it is very likely to catch up with the database changes that were made while it was stopped. How quickly this happens depends on the capabilities and performance of Kafka and the volume of changes being made to the data in Spanner.
Note that the current connector can only be streamed back in time by ~one hour. The Kafka connector reads the information schema at the Kafka connector's start timestamp to retrieve schema information. By default, Spanner cannot read the information schema at read timestamps before the link:https://cloud.google.com/spanner/docs/reference/rest/v1/projects.instances.databases?_gl=1*1wl08s9*_ga*MTYxNjk4NDM4OS4xNjY5MDY0ODQ5*_ga_WH2QY8WWF5*MTY3MTEzNTk3Ni4yMS4wLjE2NzExMzU5NzYuMC4wLjA.#Database.FIELDS.version_retention_period[version retention period], which defaults to one hour. If you wants to start the connector from earlier than one hour into the past, you will need to increase the database's version retention period.
// Type: assembly
// ModuleID: debezium-spanner-limitations
// Title: Current Limitations
[[spanner-limitations]]
== Limitations
* The connector does not support streaming link:https://debezium.io/blog/2021/10/07/incremental-snapshots/[snapshot events].
* If watermarking is enabled in the connector, you cannot configure link:https://debezium.io/documentation/reference/stable/transformations/topic-routing.html[Debezium topic
routing transformations].
2022-12-20 10:11:01 +01:00
* This connector currently does not support the link:https://cloud.google.com/spanner/docs/postgresql-interface/[PostgreSQL interface for Cloud Spanner].