[id="debezium-connector-for-cassandra"]
= {prodname} Connector for Cassandra
:context: cassandra
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]
The Cassandra connector can monitor a Cassandra cluster and record all row-level changes. The connector must be deployed locally on each node in the Cassandra cluster. The first time the connector connects to a Cassandra node, it performs a snapshot of all CDC-enabled tables in all keyspaces. The connector also reads the changes that are written to the Cassandra commit logs and generates corresponding insert, update, and delete events. All events for each table are recorded in a separate Kafka topic, where they can be consumed easily by applications and services.
For information about the Cassandra versions that are compatible with this connector, see the link:https://debezium.io/releases/[{prodname} release overview].
[[cassandra-overview]]
== Overview
Cassandra is an open-source NoSQL database.
Similar to most databases, the write path of Cassandra starts with the immediate logging of a change into its commit log.
The commit log resides locally on each node, recording every write made to that node.
Cassandra 3.0 introduced a http://cassandra.apache.org/doc/3.11.3/operating/cdc.html[change data capture (CDC) feature].
CDC can be enabled at the table level by setting the table property `cdc=true`, after which any commit log containing data for a CDC-enabled table is moved to the CDC directory specified in `cassandra.yaml` when it is discarded.
The Cassandra connector resides on each Cassandra node and monitors the `cdc_raw` directory for changes.
It processes all local commit log segments as they are detected, produces a change event for every row-level insert, update, and delete operation in the commit log, publishes all change events for each table to a separate Kafka topic, and finally deletes the commit log from the `cdc_raw` directory.
This last step is important because once CDC is enabled, Cassandra itself cannot purge the commit logs. If the space defined by `cdc_total_space_in_mb` fills up, writes to CDC-enabled tables will be rejected.
The connector is tolerant of failures.
As the connector reads commit logs and produces events, it records each commit log segment's filename and position along with each event.
If the connector stops for any reason (including communication failures, network problems, or crashes), upon restart it simply continues reading the commit log where it last left off.
This includes snapshots: if the snapshot was not completed when the connector is stopped, upon restart it will begin a new snapshot.
We'll talk later about how the connector behaves xref:{link-cassandra-connector}#cassandra-when-things-go-wrong[when things go wrong].
[NOTE]
====
Cassandra is different from the other {prodname} connectors since it is not implemented on top of the Kafka Connect framework. Instead, it is a single JVM process that is intended to reside on each Cassandra node and publish events to Kafka via a Kafka producer.
====
[WARNING]
====
The following features are currently not supported by the Cassandra connector. Changes resulting from any of these features are ignored:
* TTL on collection-type columns
* Range deletes
* Static columns
* Triggers
* Materialized views
* Secondary indices
* Light-weight transactions
====
[[setting-up-cassandra]]
== Setting Up Cassandra
Before the {prodname} Cassandra connector can be used to monitor the changes in a Cassandra cluster, CDC must be enabled on the node level and table level.
[[enabling-cdc-on-node]]
=== Enabling CDC on Node
To enable CDC, update the following CDC config in `cassandra.yaml`:
[source,yaml]
----
cdc_enabled: true
----
Additional CDC configurations have the following default values:
[source,yaml]
----
cdc_raw_directory: $CASSANDRA_HOME/data/cdc_raw
cdc_total_space_in_mb: 4096
cdc_free_space_check_interval_ms: 250
----
* `cdc_enabled` enables or disables CDC operations node-wide
* `cdc_raw_directory` determines the destination for commit log segments to be moved after all corresponding memtables are flushed.
* `cdc_total_space_in_mb` is the maximum capacity allocated to store commit log segments; it defaults to the minimum of 4096 MB and 1/8 of the volume space.
* `cdc_free_space_check_interval_ms` is the frequency with which the space taken up by `cdc_raw_directory` is recalculated, to prevent burning CPU cycles unnecessarily when at capacity.
[[enabling-cdc-on-table]]
=== Enabling CDC on Table
Once CDC is enabled on the Cassandra node, each table must also be explicitly enabled for CDC via the `CREATE TABLE` or `ALTER TABLE` command. For example:
[source,sql]
----
CREATE TABLE foo (a int, b text, PRIMARY KEY(a)) WITH cdc=true;
ALTER TABLE foo WITH cdc=true;
----
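To confirm that the flag took effect, you can query the table metadata; the following check is a minimal sketch, assuming a Cassandra version (3.8 or later) in which `system_schema.tables` exposes a `cdc` column, and an example keyspace named `inventory`:
[source,sql,indent=0]
----
-- Returns true in the cdc column when CDC is enabled for the table
-- (the 'inventory' keyspace is only an assumed example)
SELECT keyspace_name, table_name, cdc
FROM system_schema.tables
WHERE keyspace_name = 'inventory' AND table_name = 'foo';
----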
[[how-the-cassandra-connector-works]]
== How the Cassandra Connector Works
This section goes into detail about how the Cassandra connector performs snapshots, transforms commit log events into {prodname} change events, handles commit log life cycle, records events into Kafka, manages schema evolution, and behaves when things go wrong.
[[cassandra-snapshots]]
=== Snapshots
When the Cassandra connector first starts on a Cassandra node, it will by default perform an initial snapshot of the cluster.
This is the default mode, since most of the time CDC is enabled on non-empty tables and commit logs do not contain the complete history.
The snapshot reader issues a SELECT statement to query all the columns in a table. Cassandra allows the consistency level to be set either globally or at the statement level.
For snapshotting, the consistency level is set at the statement level to `ALL` by default to provide the highest consistency.
This implies that if one node goes down during the snapshot, the snapshot cannot continue and a re-snapshot is required once the node has been brought back online.
You can lower the consistency level of the snapshot in order to increase availability, provided that you understand the trade-off with consistency.
[NOTE]
====
In Cassandra 3.X, it is not possible to read strictly from the local Cassandra node.
Starting in Cassandra 4.0, a `NODE_LOCAL` consistency level is available.
This allows the Cassandra connector to read only from the node it resides on (which is consistent with the way commit logs are processed).
====
Unlike relational databases, there is no read lock applied during a snapshot, so writes to Cassandra are not blocked during that snapshot.
If the queried data has been modified by another client during the snapshot, those changes may be reflected in the snapshot result set.
If the connector fails or stops before the snapshot is completed, the connector will begin a new snapshot upon restart.
In the default snapshot mode (`initial`), once the connector completes its initial snapshot, it will no longer perform any additional snapshots.
The only exception would be during a connector restart: if CDC is enabled on a table, and then the connector is restarted, that table would be snapshotted.
The second snapshot mode (`always`) allows the connector to perform snapshots whenever necessary.
It checks periodically for newly CDC-enabled tables and snapshots these tables as soon as they are detected.
The third snapshot mode (`never`) ensures the connector never performs snapshots.
When a new connector is configured this way, it will only read the commit logs in the CDC directory.
This is not the default behavior because starting a new connector in this mode (without a snapshot) requires the commit logs to contain the entire history of all CDC-enabled tables, which is often not the case.
Another use case for this mode is when one connector is already performing snapshots; snapshots can be disabled on the others to avoid duplicate work.
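The snapshot behavior is configured in the connector's `.properties` file; the following is a minimal sketch that uses the two snapshot-related properties documented in the connector properties table below (the chosen values are only examples):
[source,properties,indent=0]
----
# Take an initial snapshot on first startup; other options are ALWAYS and NEVER
snapshot.mode=INITIAL
# Lower the snapshot read consistency from the default ALL to trade consistency for availability
snapshot.consistency=QUORUM
----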
[[reading-the-commitlog]]
=== Reading the Commit Log
The Cassandra connector will typically spend the vast majority of its time reading local commit logs on the Cassandra node.
In Cassandra 4.0, an index file is updated on every commit log segment fsync to reflect the latest offset.
This eliminates the processing delay of the CDC feature in Cassandra 3.X, and can be enabled in the Cassandra 4 {prodname} connector by setting the configuration property `commit.log.real.time.processing.enabled` to `true`.
The frequency at which the index file is polled is determined by `commit.log.marked.complete.poll.interval.ms`.
Commit logs' binary data are deserialized with Cassandra's CommitLogReader and CommitLogReadHandler.
Each deserialized object is called a `mutation` in Cassandra. A `mutation` contains one or more change events.
As the Cassandra connector reads the commit log, it transforms the log events into {prodname} _create_, _update_, or _delete_ events that include the position in the commit log where the event was found.
The Cassandra connector encodes these change events with Kafka Connect converters and publishes them to the appropriate Kafka topics.
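For Cassandra 4, the near-real-time processing described above is switched on in the connector's `.properties` file; a minimal sketch using the two properties mentioned in this section (the poll interval shown is the documented default):
[source,properties,indent=0]
----
# Cassandra 4 only: process commit log segments incrementally as the index file is updated
commit.log.real.time.processing.enabled=true
# How often (in milliseconds) the commit log index file is polled for new offsets
commit.log.marked.complete.poll.interval.ms=10000
----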
[[limitations-of-commit-logs]]
=== Limitations of Commit Logs
Cassandra's commit logs come with a set of limitations, which are critical for interpreting CDC events correctly:
* Commit log segments only arrive in the `cdc_raw` directory after they are full and have been flushed/discarded.
This implies there is a delay between when an event is logged and when the event is captured.
* Commit logs on an individual Cassandra node do not reflect all writes to the cluster; they only reflect the writes stored on that node.
This is why it is necessary to monitor changes on all nodes in a Cassandra cluster.
However, due to the replication factor, this also implies that downstream consumers of these events must handle deduplication.
* Writes to an individual Cassandra node are logged as they arrive. However, these events may arrive in a different order from the one in which they were issued.
Downstream consumers of these events must understand and implement logic similar to Cassandra's read path to get the correct output.
* Schema changes of tables are not recorded in commit logs; only data changes are recorded.
Therefore, changes in schema are detected on a best-effort basis.
To avoid data loss, it is recommended to pause writes to the table during a schema change.
* Cassandra does not perform read-before-write. As a result, commit logs do not record the value of every column in the changed row; they only record the values of columns that have been modified (except for partition key columns, which are always recorded because they are required in Cassandra DML commands).
* Due to the nature of CQL, _insert_ DMLs can result in a row insertion or update; _update_ DMLs can result in a row insertion, update, or deletion; _delete_ DMLs can result in a row update or deletion.
Since queries are not recorded in commit logs, the CDC event type is classified based on the effect on the row in a relational-database sense.
**TODO**: is there a way to determine event type which corresponds to the actual Cassandra DML statement? and if so, is that preferred over the semantic of these events?
[[managing-commitlog-lifecycle]]
=== Managing Commit Log Lifecycle
By default, the Cassandra connector deletes commit logs that have been processed.
It is not recommended to start the connector while deletion of commit logs is disabled, as this could exhaust disk storage and prevent further writes to the Cassandra cluster.
To manage the commit logs in a custom manner (for example, to upload them to a cloud provider), the CommitLogTransfer interface can be implemented and configured via the xref:cassandra-property-commit-log-transfer-class[`commit.log.transfer.class`] property.
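For reference, the lifecycle-related settings live in the connector's `.properties` file; the following sketch uses properties documented in the connector properties table below, and the custom transfer class name is a hypothetical placeholder:
[source,properties,indent=0]
----
# Directory that processed commit logs are relocated to from the cdc_raw directory
commit.log.relocation.dir=/var/lib/debezium/relocation/
# Post-process relocated commit logs; the default BlackHoleCommitLogTransfer simply deletes them
commit.log.post.processing.enabled=true
# Hypothetical custom CommitLogTransfer implementation, e.g. one that uploads commit logs to cloud storage
commit.log.transfer.class=com.example.CloudStorageCommitLogTransfer
----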
[[cassandra-topic-names]]
=== Topic names
The Cassandra connector writes events for all insert, update, and delete operations on a single table to a single Kafka topic.
The names of the Kafka topics always take the following form:
`_clusterName_._keyspaceName_._tableName_`
where _clusterName_ is the logical name of the connector as specified with the xref:cassandra-property-topic-prefix[`topic.prefix`] configuration property, _keyspaceName_ is the name of the keyspace where the operation occurred, and _tableName_ is the name of the table on which the operation occurred.
For example, consider a Cassandra installation with an `inventory` keyspace that contains four tables: `products`, `products_on_hand`, `customers`, and `orders`.
If the connector monitoring this database were given a logical server name of `fulfillment`, then the connector would produce events on these four Kafka topics:
* `fulfillment.inventory.products`
* `fulfillment.inventory.products_on_hand`
* `fulfillment.inventory.customers`
* `fulfillment.inventory.orders`
**TODO**: for topic name, is _clusterName_._keyspaceName_._tableName_ okay? or should it be _connectorName_._keyspaceName_._tableName_ or _connectorName_._clusterName_._keyspaceName_._tableName_?
[[cassandra-schema-evolution]]
=== Schema Evolution
DDLs are not recorded in commit logs.
When the schema of a table changes, the change is issued from one of the Cassandra nodes and propagated to the other nodes via the gossip protocol.
Schema changes in Cassandra are detected by a SchemaChangeListener implementation with a latency of less than one second, which then updates the schema instance loaded from Cassandra as well as the Kafka key and value schemas cached for each table.
Note that with the current schema evolution approach, the Cassandra connector cannot provide accurate data change information for a short period of time in the following cases:
. If CDC gets disabled for a table, data changes which happened before CDC got disabled will be skipped.
. If a column is removed from a table, data changes involving this column before it is removed cannot be deserialized correctly and will be skipped (see the example below).
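For example, the second case can be triggered by a simple column drop; any not-yet-processed commit log entries that still carry the dropped column are skipped:
[source,sql,indent=0]
----
-- Changes written to the email column but not yet processed by the connector
-- cannot be deserialized after this statement and will be skipped.
ALTER TABLE customers DROP email;
----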
[[cassandra-events]]
=== Events
All data change events produced by the Cassandra connector have a key and a value, although the structure of the key and value depends on the table from which the change events originated (see xref:{link-cassandra-connector}#cassandra-topic-names[Topic Names]).
[[cassandra-change-events-key]]
==== Change Event's Key
For a given table, the change event's key will have a structure that contains a field for each column in the primary key of the table at the time the event was created.
Consider an `inventory` database with a `customers` table defined as:
[source,sql,indent=0]
----
CREATE TABLE customers (
    id bigint,
    registration_date timestamp,
    first_name text,
    last_name text,
    email text,
    PRIMARY KEY (id, registration_date)
);
----
Every change event for the `customers` table while it has this definition will feature the same key schema, which in JSON representation looks like this:
[source,json,indent=0]
----
{
"type": "record",
"name": "cassandra-cluster-1.inventory.customers.Key",
"namespace": "io.debezium.connector.cassandra",
"fields": [
{
"name": "id",
"type": "long"
},
{
"name": "registration_date",
"type": "long",
"logicalType": "timestamp-millis"
}
]
}
----
For id = 1001 and registration_date = 1562202942545, the key payload in JSON representation would look like this:
[source,json,indent=0]
----
{
"id": 1001,
"registration_date": 1562202942545
}
----
[NOTE]
====
Although the `field.exclude.list` configuration property allows you to remove columns from the event values, all columns in a primary key are always included in the event's key.
====
[[cassandra-change-events-value]]
==== Change event's value
The value of the change event message is a bit more complicated. Every change event value produced by the Cassandra connector has an envelope structure with the following fields:
`op`:: A mandatory field that contains a string value describing the type of operation. Values for the Cassandra connector are `i` for insert, `u` for update, and `d` for delete.
`after`:: An optional field that if present contains the state of the row _after_ the event occurred.
The structure will be described by the `cassandra-cluster-1.inventory.customers.Value` Kafka Connect schema, which represents the cluster, keyspace, and table that the event refers to.
`source`:: A mandatory field that contains a structure describing the source metadata for the event, which in the case of Cassandra contains the following fields:
* {prodname} version.
* Connector name.
* Cassandra cluster name.
* Name of the commit log file where the event was recorded, the position in that commit log file where the event appeared, whether this event was part of a snapshot, name of the affected keyspace and table, and the maximum timestamp of the partition update in microseconds.
`ts_ms`:: (Optional) If present, contains the time at which the connector processed the event, based on the system clock of the JVM that runs the Cassandra connector.
[NOTE]
====
Because Cassandra does not perform a read-before-write, the Cassandra commit log does not record the value of a row before a change is applied.
As a result, Cassandra change event records do not include a `before` field.
====
The following is a JSON representation of a value schema for a _create_ event for our `customers` table:
[source,json,indent=0]
----
{
"type": "record",
"name": "cassandra-cluster-1.inventory.customers.Envelope",
"namespace": "io.debezium.connector.cassandra",
"fields": [
{
"name": "op",
"type": "string"
},
{
"name": "ts_ms",
"type": "long",
"logicalType": "timestamp-millis"
},
{
"name": "after",
"type": "record",
"fields": [
{
"name": "id",
"type": [
"null",
{
"name": "id",
"type": "record",
"fields": [
{
"name":"value",
"type": "string"
},
{
"name":"deletion_ts",
"type": ["null", "long"],
"default" : "null"
},
{
"name":"set",
"type": "boolean"
}
]
}
]
},
{
"name": "registration_date",
"type": [
"null",
{
"name": "registration_date",
"type": "record",
"fields": [
{
"name":"value",
"type": "long",
"logical_type": "timestamp-millis"
},
{
"name":"deletion_ts",
"type": ["null", "long"],
"default" : "null"
},
{
"name":"set",
"type": "boolean"
}
]
}
]
},
{
"name": "first_name",
"type": [
"null",
{
"name": "first_name",
"type": "record",
"fields": [
{
"name":"value",
"type": "string"
},
{
"name":"deletion_ts",
"type": ["null", "long"],
"default" : "null"
},
{
"name":"set",
"type": "boolean"
}
]
}
]
},
{
"name": "last_name",
"type": [
"null",
{
"name": "last_name",
"type": "record",
"fields": [
{
"name":"value",
"type": "string"
},
{
"name":"deletion_ts",
"type": ["null", "long"],
"default" : "null"
},
{
"name":"set",
"type": "boolean"
}
]
}
]
},
{
"name": "last_name",
"type": [
"null",
{
"name": "email",
"type": "record",
"fields": [
{
"name":"value",
"type": "string"
},
{
"name":"deletion_ts",
"type": ["null", "long"],
"default" : "null"
},
{
"name":"set",
"type": "boolean"
}
]
}
]
}
]
},
{
"name": "source",
"type": "record",
"fields": [
{
"name": "version",
"type": "string"
},
{
"name": "connector",
"type": "string"
},
{
"name": "cluster",
"type": "string"
},
{
"name": "snapshot",
"type": "boolean"
},
{
"name": "keyspace",
"type": "string"
},
{
"name": "table",
"type": "string"
},
{
"name": "file",
"type": "string"
},
{
"name": "position",
"type": "int"
},
{
"name": "ts_ms",
"type": "long",
"logicalType": "timestamp-micros"
}
]
}
]
}
----
**TODO**: verify max timestamp != deletion timestamp in case of deletion DDLs
Given the following `insert` DML:
[source,sql,indent=0]
----
INSERT INTO customers (
    id,
    registration_date,
    first_name,
    last_name,
    email)
VALUES (
    1001,
    toTimestamp(now()),
    'Anne',
    'Kretchmar',
    'annek@noanswer.org'
);
----
The value payload in JSON representation would look like this:
[source,json,indent=0,subs="attributes"]
----
{
"op": "c",
"ts_ms": 1562202942832,
"after": {
"id": {
"value": 1001,
"deletion_ts": null,
"set": true
},
"registration_date": {
"value": 1562202942545,
"deletion_ts": null,
"set": true
},
"first_name": {
"value": "Anne",
"deletion_ts": null,
"set": true
},
"last_name": {
"value": "Kretchmar",
"deletion_ts": null,
"set": true
},
"email": {
"value": "annek@noanswer.org",
"deletion_ts": null,
"set": true
}
},
"source": {
"version": "{debezium-version}",
"connector": "cassandra",
"cluster": "cassandra-cluster-1",
"snapshot": false,
"keyspace": "inventory",
"table": "customers",
"file": "commitlog-6-123456.log",
"pos": 54,
"ts_ms": 1562202942666382
}
}
----
Given the following `update` DML:
[source,sql,indent=0]
----
UPDATE customers
SET email = 'annek_new@noanswer.org'
WHERE id = 1001 AND registration_date = 1562202942545;
----
The value payload in JSON representation would look like this:
[source,json,indent=0,subs="attributes"]
----
{
"op": "u",
"ts_ms": 1562202942912,
"after": {
"id": {
"value": 1001,
"deletion_ts": null,
"set": true
},
"registration_date": {
"value": 1562202942545,
"deletion_ts": null,
"set": true
},
"first_name": null,
"last_name": null,
"email": {
"value": "annek_new@noanswer.org",
"deletion_ts": null,
"set": true
}
},
"source": {
"version": "{debezium-version}",
"connector": "cassandra",
"cluster": "cassandra-cluster-1",
"snapshot": false,
"keyspace": "inventory",
"table": "customers",
"file": "commitlog-6-123456.log",
"pos": 102,
"ts_ms": 1562202942666490
}
}
----
When we compare this to the value in the _insert_ event, we see a couple of differences:
* The `op` field value is now `u`, signifying that this row changed because of an update.
* The `after` field now has the updated state of the row, and here we can see that the email value is now `annek_new@noanswer.org`.
Notice that `first_name` and `last_name` are null because these fields did not change during this update.
However, `id` and `registration_date` are still included, because they are part of the table's primary key.
* The `source` field structure has the same fields as before, but the values are different since this event is from a different position in the commit log.
* The `ts_ms` field shows the timestamp, in milliseconds, at which the connector processed this event.
Finally, given the following `delete` DML:
[source,sql]
----
DELETE FROM customers
WHERE id = 1001 AND registration_date = 1562202942545;
----
The value payload in JSON representation would look like this:
[source,json,indent=0,subs="attributes"]
----
{
"op": "d",
"ts_ms": 1562202942912,
"after": {
"id": {
"value": 1001,
"deletion_ts": 1562202972545,
"set": true
},
"registration_date": {
"value": 1562202942545,
"deletion_ts": 1562202972545,
"set": true
},
"first_name": null,
"last_name": null,
"email": null
},
"source": {
"version": "{debezium-version}",
"connector": "cassandra",
"cluster": "cassandra-cluster-1",
"snapshot": false,
"keyspace": "inventory",
"table": "customers",
"file": "commitlog-6-123456.log",
"pos": 102,
"ts_ms": 1562202942666490
}
}
----
When we compare this to the values in the _insert_ and _update_ events, we see a couple of differences:
* The `op` field value is now `d`, signifying that this row changed because of a deletion.
* The `after` field only contains values for `id` and `registration_date` because this is a deletion by primary key.
* The `source` field structure has the same fields as before, but the values are different since this event is from a different position in the commit log.
* The `ts_ms` field shows the timestamp, in milliseconds, at which the connector processed this event.
**TODO**: given TTL is not currently support, would it be better to remove delete_ts? would it also be okay to derive whether a field is set or not by looking at the each column to see if it is null?
**TODO**: discuss tombstone events in Cassandra connector
[[cassandra-data-types]]
=== Data Types
As described above, the Cassandra connector represents the changes to rows with events that are structured like the table in which the row exists.
The event contains a field for each column value, and how that value is represented in the event depends on the Cassandra data type of the column.
This section describes this mapping.
The following table describes how the connector maps each of the Cassandra data types to a Kafka Connect data type.
[cols="30%a, 30%a, 40%a"]
|===
|Cassandra Data Type
|Literal Type (Schema Type)
|Semantic Type (Schema Name)
|`ascii`
|`string`
|n/a
|`bigint`
|`int64`
|n/a
|`blob`
|`bytes`
|n/a
|`boolean`
|`boolean`
|n/a
|`counter`
|`int64`
|n/a
|`date`
|`int32`
|`io.debezium.time.Date`
|`decimal`
|`float64`
|n/a
|`double`
|`float64`
|n/a
|`float`
|`float32`
|n/a
|`frozen`
|`bytes`
|n/a
|`inet`
|`string`
|n/a
|`int`
|`int32`
|n/a
|`list`
|`array`
|n/a
|`map`
|`map`
|n/a
|`set`
|`array`
|n/a
|`smallint`
|`int16`
|n/a
|`text`
|`string`
|n/a
|`time`
|`int64`
|n/a
|`timestamp`
|`int64`
|`io.debezium.time.Timestamp`
|`timeuuid`
|`string`
|`io.debezium.data.Uuid`
|`tinyint`
|`int8`
|n/a
|`tuple`
|`map`
|n/a
|`uuid`
|`string`
|`io.debezium.data.Uuid`
|`varchar`
|`string`
|n/a
|`varint`
|`int64`
|n/a
|`duration`
|`int64`
|`io.debezium.time.NanoDuration` (an approximate representation of the duration value in nano-seconds)
|===
**TODO**: add logical types
=== Arbitrary-precision integer types
The Cassandra connector handles `varint` values according to the setting of the xref:{link-cassandra-connector}#cassandra-property-varint-handling-mode[`varint.handling.mode` connector configuration property].
varint.handling.mode=long::
+
.Mapping when `varint.handling.mode=long`
[cols="30%a,30%a,40%a",options="header",subs="+attributes"]
|===
|Cassandra type |Literal type |Semantic type
|`varint`
|`INT64`
a|_n/a_
|===
varint.handling.mode=precise::
+
.Mappings when `varint.handling.mode=precise`
[cols="30%a,15%a,55%a",options="header",subs="+attributes"]
|===
|Cassandra type |Literal type |Semantic type
|`varint`
|`BYTES`
a|`org.apache.kafka.connect.data.Decimal` +
The `scale` schema parameter is set to zero.
|===
varint.handling.mode=string::
+
.Mapping when `varint.handling.mode=string`
[cols="30%a,30%a,40%a",options="header",subs="+attributes"]
|===
|Cassandra type |Literal type |Semantic type
|`varint`
|`STRING`
a|_n/a_
|===
=== Decimal types
The Cassandra connector handles `decimal` values according to the setting of the xref:{link-cassandra-connector}#cassandra-property-decimal-handling-mode[`decimal.handling.mode` connector configuration property].
decimal.handling.mode=double::
+
.Mapping when `decimal.handling.mode=double`
[cols="30%a,30%a,40%a",options="header",subs="+attributes"]
|===
|Cassandra type |Literal type |Semantic type
|`decimal`
|`FLOAT64`
a|_n/a_
|===
decimal.handling.mode=precise::
+
.Mappings when `decimal.handling.mode=precise`
[cols="30%a,15%a,55%a",options="header",subs="+attributes"]
|===
|Cassandra type |Literal type |Semantic type
|`decimal`
|`STRUCT`
a|`io.debezium.data.VariableScaleDecimal` +
Contains a structure with two fields: `scale` of type `INT32` that contains the scale of the transferred value and `value` of type `BYTES` containing the original value in an unscaled form.
|===
decimal.handling.mode=string::
+
.Mapping when `decimal.handling.mode=string`
[cols="30%a,30%a,40%a",options="header",subs="+attributes"]
|===
|Cassandra type |Literal type |Semantic type
|`decimal`
|`STRING`
a|_n/a_
|===
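Both handling modes are set in the connector's `.properties` file; the following minimal sketch combines the two (the chosen modes are only examples):
[source,properties,indent=0]
----
# Encode varint columns as Kafka Connect Decimal values (BYTES with scale 0)
varint.handling.mode=precise
# Encode decimal columns as formatted strings
decimal.handling.mode=string
----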
If the default data type conversions do not meet your needs, you can {link-prefix}:{link-custom-converters}#custom-converters[create a custom converter] for the connector.
[[cassandra-when-things-go-wrong]]
=== When Things Go Wrong
==== Configuration And Startup Errors
The Cassandra connector will fail upon startup, report an error or exception in the log, and stop running if the configuration is invalid or if the connector cannot successfully connect to Cassandra using the specified connectivity parameters.
In this case, the error will have more details about the problem and possibly suggest a workaround.
The connector can be restarted when the configuration has been corrected.
==== Cassandra Becomes Unavailable
Once the connector is running, if the Cassandra node becomes unavailable for any reason, the connector will fail and stop. In this case, restart the connector once the server becomes available.
If this happens during a snapshot, the connector re-bootstraps the entire table from the beginning.
==== Cassandra Connector Stops Gracefully
If the Cassandra connector is gracefully shut down, prior to stopping the process it will make sure to flush all events in the ChangeEventQueue to Kafka.
The Cassandra connector keeps track of the filename and offset each time a streamed record is sent to Kafka.
So when the connector is restarted, it will resume from where it left off.
It does this by searching for the oldest commit log in the directory and processing that commit log, skipping the already-read records, until it finds the most recent record that hasn't been processed.
If the Cassandra connector is stopped during a snapshot, it will pick up from that table, but it will re-bootstrap the entire table.
==== Cassandra Connector Crashes
If the Cassandra connector crashes unexpectedly, it would likely have terminated without recording the most recently processed offset.
In this case, when the connector is restarted, it will resume from the most recent recorded offset.
This means duplicates are likely (which is acceptable, since downstream consumers already have to handle the duplicates that result from the replication factor).
Note that since the offset is only updated when a record has been successfully sent to Kafka, it is okay to lose the un-emitted data in the ChangeEventQueue during a crash, as these events will be recreated.
==== Kafka Becomes Unavailable
As the connector generates change events, it publishes those events to Kafka using the Kafka producer API.
If a Kafka broker becomes unavailable (the producer encounters a TimeoutException), the Cassandra connector will repeatedly attempt to reconnect to the broker once per second until it succeeds.
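The producer's retry behavior can also be tuned through the pass-through `kafka.producer.*` properties; the following sketch mirrors the values used in the example configuration later in this page:
[source,properties,indent=0]
----
# Standard Kafka producer settings, passed through to the producer by the connector
kafka.producer.retries=3
kafka.producer.retry.backoff.ms=1000
----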
==== Cassandra Connector Is Stopped for a Duration
Depending on the write load of a table, if the Cassandra connector is stopped for a long time, it risks hitting the `cdc_total_space_in_mb` capacity.
Once this upper limit is reached, Cassandra will stop accepting writes to CDC-enabled tables, which means it is important to monitor this space while running the Cassandra connector.
In the worst-case scenario, if this happens, complete the following steps:
. Turn off the Cassandra connector.
. Disable CDC for the table so it stops generating additional writes.
Because the commit logs are not filtered, writes to other CDC-enabled tables on the same node could still affect the commit log file generation.
. Remove the recorded offset from the offset file.
. After the capacity is increased or the directory used space is under control, restart the connector so that it re-bootstraps the table.
==== Cassandra Table CDC is Enabled, Then Temporarily Disabled, And Then Enabled Again
If CDC is temporarily disabled on a Cassandra table and then re-enabled after some time, the table must be re-bootstrapped.
To re-bootstrap an individual table, you can manually remove the recorded offset line corresponding to that table from the snapshot_offset.properties file.
[[cassandra-deploying-a-connector]]
== Deploying A Connector
The Cassandra connector should be deployed on each Cassandra node in a Cassandra cluster.
The Cassandra connector JAR file takes a CDC configuration (`.properties`) file.
See the xref:{link-cassandra-connector}#cassandra-example-configuration[example configuration] for reference.
[[cassandra-example-configuration]]
=== Example configuration
The following represents an example .properties configuration file for running and testing the Cassandra Connector locally:
[source,properties,indent=0]
----
connector.name=test_connector
commit.log.relocation.dir=/Users/test_user/debezium-connector-cassandra/test_dir/relocation/
http.port=8000
cassandra.config=/usr/local/etc/cassandra/cassandra.yaml
cassandra.hosts=127.0.0.1
cassandra.port=9042
kafka.producer.bootstrap.servers=127.0.0.1:9092
kafka.producer.retries=3
kafka.producer.retry.backoff.ms=1000
topic.prefix=test_prefix
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
offset.backing.store.dir=/Users/test_user/debezium-connector-cassandra/test_dir/
snapshot.consistency=ONE
snapshot.mode=ALWAYS
latest.commit.log.only=true
----
[[cassandra-monitoring]]
=== Monitoring
The Cassandra connector has built-in support for JMX metrics.
The Cassandra driver also publishes a number of metrics about the driver's activities that can be monitored through JMX.
The connector has two types of metrics.
Snapshot metrics help you monitor the snapshot activity and are available when the connector is performing a snapshot.
Commit log metrics help you monitor the progress and activity while the connector reads the Cassandra commit logs.
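Because the connector runs as a standalone JVM process, its metrics are exposed through the standard JVM JMX mechanism; the following sketch lists standard JVM system properties (passed with `-D` on the agent's command line; the port is only an assumed example) that make the metrics reachable from a remote JMX client such as JConsole:
[source,properties,indent=0]
----
# Standard JVM JMX options, not connector-specific; 7199 is an assumed example port
com.sun.management.jmxremote=true
com.sun.management.jmxremote.port=7199
com.sun.management.jmxremote.authenticate=false
com.sun.management.jmxremote.ssl=false
----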
[[cassandra-snapshot-metrics]]
==== Snapshot Metrics
[cols="30%a,10%a,60%a"]
|===
|Attribute Name
|Type
|Description
|`total-table-count`
|`int`
|The total number of tables that are being included in the snapshot.
|`remaining-table-count`
|`int`
|The number of tables that the snapshot has yet to copy.
|`snapshot-running`
|`boolean`
|Whether the snapshot was started.
|`snapshot-aborted`
|`boolean`
|Whether the snapshot was aborted.
|`snapshot-completed`
|`boolean`
|Whether the snapshot completed.
|`snapshot-duration-in-seconds`
|`long`
|The total number of seconds that the snapshot has taken so far, even if not complete.
|`rows-scanned`
|`Map<String, Long>`
|Map containing the number of rows scanned for each table in the snapshot. Tables are incrementally added to the Map during processing. Updates every 10,000 rows scanned and upon completing a table.
|===
[[cassandra-commitlog-metrics]]
==== Commitlog Metrics
[cols="30%a,10%a,60%a"]
|===
|Attribute Name
|Type
|Description
|`commitlog-filename`
|`string`
|The name of the commit log file that the connector has most recently read.
|`commitlog-position`
|`long`
|The most recent position (in bytes) within the commit log that the connector has read.
|`number-of-processed-mutations`
|`long`
|The number of mutations that have been processed.
|`number-of-unrecoverable-errors`
|`long`
|The number of unrecoverable errors while processing commit logs.
|===
[[cassandra-connector-properties]]
=== Connector properties
[cols="30%a,25%a,45%a"]
|===
|Property
|Default
|Description
|[[cassandra-property-snapshot-mode]]<<cassandra-property-snapshot-mode, `snapshot.mode`>>
|`INITIAL`
|Specifies the criteria for running a snapshot (e.g. initial sync) upon startup of the Cassandra connector agent.
Must be one of `INITIAL`, `ALWAYS`, or `NEVER`. The default snapshot mode is `INITIAL`.
|[[cassandra-property-snapshot-consistency]]<<cassandra-property-snapshot-consistency, `snapshot.consistency`>>
|`ALL`
|Specifies the consistency level used for the snapshot query.
|[[cassandra-property-http-port]]<<cassandra-property-http-port, `http.port`>>
|`8000`
|The port used by the HTTP server for ping, health check, and build info
|[[cassandra-property-cassandra-config]]<<cassandra-property-cassandra-config, `cassandra.config`>>
|No default
|The absolute path of the YAML config file used by a Cassandra node.
|[[cassandra-property-cassandra-hosts]]<<cassandra-property-cassandra-hosts, `cassandra.hosts`>>
|`localhost`
|One or more addresses of Cassandra nodes that the driver uses to discover the cluster topology, separated by commas.
|[[cassandra-property-cassandra-port]]<<cassandra-property-cassandra-port, `cassandra.port`>>
|`9042`
|The port used to connect to Cassandra host(s).
|[[cassandra-property-cassandra-username]]<<cassandra-property-cassandra-username, `cassandra.username`>>
|No default
|The username used when connecting to Cassandra hosts.
|[[cassandra-property-cassandra-password]]<<cassandra-property-cassandra-password, `cassandra.password`>>
|No default
|The password used when connecting to Cassandra hosts.
|[[cassandra-property-cassandra-ssl-enabled]]<<cassandra-property-cassandra-ssl-enabled, `cassandra.ssl.enabled`>>
|`false`
|If set to true, the Cassandra connector agent will use SSL to connect to the Cassandra node.
|[[cassandra-property-cassandra-ssl-config-path]]<<cassandra-property-cassandra-ssl-config-path, `cassandra.ssl.config.path`>>
|No default
|The path of the SSL config file required for the storage node. An example config file can be found at the bottom of this page.
|[[cassandra-property-commit-log-real-time-processing-enabled]]<<cassandra-property-commit-log-real-time-processing-enabled, `commit.log.real.time.processing.enabled`>>
|`false`
|Only applicable to Cassandra 4. If set to true, the Cassandra connector agent will read commit logs incrementally by watching for updates in commit log index files and stream data in real time, at the frequency determined by xref:cassandra-property-commit-log-marked-complete-poll-interval-ms[`commit.log.marked.complete.poll.interval.ms`].
If set to false, the Cassandra 4 connector waits for commit log files to be marked complete before processing them.
|[[cassandra-property-commit-log-marked-complete-poll-interval-ms]]<<cassandra-property-commit-log-marked-complete-poll-interval-ms, `commit.log.marked.complete.poll.interval.ms`>>
|10000
|Only applicable to Cassandra 4 and when real-time streaming is enabled by xref:cassandra-property-commit-log-real-time-processing-enabled[`commit.log.real.time.processing.enabled`].
This config determines the frequency at which the commit log index file is polled for updates to the offset value.
|[[cassandra-property-commit-log-relocation-dir]]<<cassandra-property-commit-log-relocation-dir, `commit.log.relocation.dir`>>
|No default
|The local directory to which commit logs are relocated from the `cdc_raw` directory once processed.
|[[cassandra-property-commit-log-post-processing-enabled]]<<cassandra-property-commit-log-post-processing-enabled, `commit.log.post.processing.enabled`>>
|`true`
|Determines whether or not the CommitLogPostProcessor should run to move processed commit logs out of the relocation directory.
If disabled, commit logs are not moved out of the relocation directory.
|[[cassandra-property-commit-log-relocation-dir-poll-interval-ms]]<<cassandra-property-commit-log-relocation-dir-poll-interval-ms, `commit.log.relocation.dir.poll.interval.ms`>>
|10000
|The amount of time the CommitLogPostProcessor should wait before re-fetching all processed commit logs in the relocation directory.
|[[cassandra-property-commit-log-transfer-class]]<<cassandra-property-commit-log-transfer-class, `commit.log.transfer.class`>>
|`io.debezium.connector.cassandra.BlackHoleCommitLogTransfer`
|The class used by the CommitLogPostProcessor to move processed commit logs out of the relocation directory.
The built-in transfer class is `BlackHoleCommitLogTransfer`, which simply removes all processed commit logs from the relocation directory.
Users can implement their own customized commit log transfer class if needed.
|[[cassandra-property-commit-log-error-reprocessing-enabled]]<<cassandra-property-commit-log-error-reprocessing-enabled, `commit.log.error.reprocessing.enabled`>>
|false
|Determines whether or not the CommitLogProcessor should re-process error commit logs.
|[[cassandra-property-converters]]<<cassandra-property-converters, `converters`>>
|No default
|Enumerates a comma-separated list of the symbolic names of the {link-prefix}:{link-custom-converters}#custom-converters[custom converter] instances that the connector can use.
For example, +
`isbn`
You must set the `converters` property to enable the connector to use a custom converter.
For each converter that you configure for a connector, you must also add a `.type` property, which specifies the fully-qualified name of the class that implements the converter interface.
The `.type` property uses the following format: +
`_<converterSymbolicName>_.type` +
For example, +
isbn.type: io.debezium.test.IsbnConverter
If you want to further control the behavior of a configured converter, you can add one or more configuration parameters to pass values to the converter.
To associate any additional configuration parameter with a converter, prefix the parameter names with the symbolic name of the converter.
For example, +
isbn.schema.name: io.debezium.cassandra.type.Isbn
|[[cassandra-property-offset-backing-store-dir]]<<cassandra-property-offset-backing-store-dir, `offset.backing.store.dir`>>
|No default
|The directory to store offset tracking files.
|[[cassandra-property-offset-flush-interval-ms]]<<cassandra-property-offset-flush-interval-ms, `offset.flush.interval.ms`>>
|`0`
|The minimum amount of time to wait before committing the offset. The default value of 0 implies
the offset will be flushed every time.
|[[cassandra-property-max-offset-flush-size]]<<cassandra-property-max-offset-flush-size, `max.offset.flush.size`>>
|`100`
|The maximum number of records that can be processed before it is required to flush offsets to disk. This config is effective only if `offset.flush.interval.ms` != 0.
|[[cassandra-property-max-queue-size]]<<cassandra-property-max-queue-size, `max.queue.size`>>
|`8192`
|Positive integer value that specifies the maximum size of the blocking queue into which change events read from the commit log are placed before they are converted to Kafka Connect structs and written to Kafka.
This queue can provide back pressure to the commit log reader when, for example, writes to Kafka are slower or if Kafka is not available.
Events that appear in the queue are not included in the offsets periodically recorded by this connector.
Defaults to 8192, and should always be larger than the maximum batch size specified in the xref:cassandra-property-max-batch-size[`max.batch.size`] property.
|[[cassandra-property-max-batch-size]]<<cassandra-property-max-batch-size, `max.batch.size`>>
|`2048`
|The maximum number of change events to dequeue each time.
|[[cassandra-property-max-queue-size-in-bytes]]<<cassandra-property-max-queue-size-in-bytes, `max.queue.size.in.bytes`>>
|`0`
|A long integer value that specifies the maximum volume of the blocking queue in bytes.
By default, volume limits are not specified for the blocking queue.
To specify the number of bytes that the queue can consume, set this property to a positive long value. +
If xref:cassandra-property-max-queue-size[`max.queue.size`] is also set, writing to the queue is blocked when the size of the queue reaches the limit specified by either property.
For example, if you set `max.queue.size=1000`, and `max.queue.size.in.bytes=5000`, writing to the queue is blocked after the queue contains 1000 records, or after the volume of the records in the queue reaches 5000 bytes.
|[[cassandra-property-poll-interval-ms]]<<cassandra-property-poll-interval-ms, `poll.interval.ms`>>
|`1000`
|Positive integer value that specifies the number of milliseconds the commit log processor should wait during each iteration for new change events to appear in the queue. Defaults to 1000 milliseconds, or 1 second.
|[[cassandra-property-schema-refresh-interval-ms]]<<cassandra-property-schema-refresh-interval-ms, `schema.refresh.interval.ms`>>
|`10000`
|Positive integer value that specifies the number of milliseconds the schema processor should wait before
refreshing the cached Cassandra table schemas.
|[[cassandra-property-cdc-dir-poll-interval-ms]]<<cassandra-property-cdc-dir-poll-interval-ms, `cdc.dir.poll.interval.ms`>>
|`10000`
|The maximum amount of time to wait on each poll before reattempting.
|[[cassandra-property-snapshot-scan-interval-ms]]<<cassandra-property-snapshot-scan-interval-ms, `snapshot.scan.interval.ms`>>
|`10000`
|Positive integer value that specifies the number of milliseconds the snapshot processor should wait before re-scanning tables to look for new CDC-enabled tables.
Defaults to 10000 milliseconds, or 10 seconds.
|[[cassandra-property-tombstones-on-delete]]<<cassandra-property-tombstones-on-delete, `tombstones.on.delete`>>
|`false`
|Whether deletion events should be followed by a tombstone event (`true`) or not (`false`).
It's important to note that in Cassandra, two events with the same key may be updating different columns of a given table.
This could potentially result in records being lost during compaction if they have not yet been consumed.
In other words, do NOT set this to `true` if you have Kafka log compaction turned on.
|[[cassandra-property-field-exclude-list]]<<cassandra-property-field-exclude-list, `field.exclude.list`>>
|No default
|A comma-separated list of fully-qualified names of fields that should be excluded from change event message values.
Fully-qualified names for fields are of the form `<keyspace_name>.<field_name>.<nested_field_name>`.
|[[cassandra-property-num-of-change-event-queues]]<<cassandra-property-num-of-change-event-queues, `num.of.change.event.queues`>>
|`1`
|The number of change event queues and queue processors. Defaults to 1.
|[[cassandra-property-skipped-operations]]<<cassandra-property-skipped-operations, `+skipped.operations+`>>
|`t`
|A comma-separated list of operation types that will be skipped during streaming.
The operations include: `c` for inserts/create, `u` for updates, `d` for deletes, `t` for truncates, and `none` to not skip any operations.
By default, truncate operations are skipped (not emitted by this connector).
|[[cassandra-property-topic-naming-strategy]]<<cassandra-property-topic-naming-strategy, `topic.naming.strategy`>>
|`io.debezium.schema.SchemaTopicNamingStrategy`
|The name of the TopicNamingStrategy class that should be used to determine the topic name for data change, schema change, transaction, heartbeat event etc., defaults to `SchemaTopicNamingStrategy`.
|[[cassandra-property-topic-delimiter]]<<cassandra-property-topic-delimiter, `topic.delimiter`>>
|`.`
|Specifies the delimiter for topic names; defaults to `.`.
|[[cassandra-property-topic-prefix]]<<cassandra-property-topic-prefix, `topic.prefix`>>
|No default
|The name of the prefix to be used for all topics. +
+
[WARNING]
====
Do not change the value of this property.
If you change the name value, after a restart, instead of continuing to emit events to the original topics, the connector emits subsequent events to topics whose names are based on the new value.
The connector is also unable to recover its database schema history topic.
====
|[[cassandra-property-topic-cache-size]]<<cassandra-property-topic-cache-size, `topic.cache.size`>>
|`10000`
|The size of the bounded concurrent hash map used for holding topic names.
This cache helps to determine the topic name corresponding to a given data collection.
|[[cassandra-property-topic-heartbeat-prefix]]<<cassandra-property-topic-heartbeat-prefix, `+topic.heartbeat.prefix+`>>
|`__debezium-heartbeat`
|Controls the name of the topic to which the connector sends heartbeat messages.
The topic name has the following pattern: +
+
_topic.heartbeat.prefix_._topic.prefix_ +
+
For example, if the database server name or topic prefix is `fulfillment`, the default topic name is `__debezium-heartbeat.fulfillment`.
|[[cassandra-property-varint-handling-mode]]<<cassandra-property-varint-handling-mode, `varint.handling.mode`>>
|`long`
|Specifies how `varint` columns should be represented in change events. Possible settings are: +
+
`long` (the default) represents values by using Java's `long`, which might not offer the precision but which is easy to use in consumers. +
+
`precise` uses `java.math.BigDecimal` to represent values, which are encoded in the change events by using a binary representation and Kafka Connect's `org.apache.kafka.connect.data.Decimal` type. +
+
`string` encodes values as formatted strings, which is easy to consume.
|[[cassandra-property-decimal-handling-mode]]<<cassandra-property-decimal-handling-mode, `decimal.handling.mode`>>
|`double`
|Specifies how `decimal` columns should be represented in change events. Possible settings are: +
+
`double` (the default) represents values by using Java's `double`, which might not offer the precision but which is easy to use in consumers. +
+
`precise` uses `java.math.BigDecimal` to represent values, which are encoded in the change events by using a binary representation and {prodname}'s `io.debezium.data.VariableScaleDecimal` type. +
+
`string` encodes values as formatted strings, which is easy to consume.
|[[cassandra-property-schema-name-adjustment-mode]]<<cassandra-property-schema-name-adjustment-mode,`+schema.name.adjustment.mode+`>>
|none
|Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings: +
* `none` does not apply any adjustment. +
* `avro` replaces the characters that cannot be used in the Avro type name with underscore. +
* `avro_unicode` replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java +
|[[cassandra-property-field-name-adjustment-mode]]<<cassandra-property-field-name-adjustment-mode,`+field.name.adjustment.mode+`>>
|none
|Specifies how field names should be adjusted for compatibility with the message converter used by the connector. Possible settings: +
* `none` does not apply any adjustment. +
* `avro` replaces the characters that cannot be used in the Avro type name with underscore. +
* `avro_unicode` replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java +
See xref:{link-avro-serialization}#avro-naming[Avro naming] for more details.
|[[cassandra-property-custom-metric-tags]]<<cassandra-property-custom-metric-tags, `custom.metric.tags`>>
|`No default`
|Accepts key-value pairs that customize the MBean object name; the tags are appended to the end of the regular name. Each key represents a tag for the MBean object name, and the corresponding value is the value of that tag. For example: `k1=v1,k2=v2`.
|[[cassandra-property-errors-max-retires]]<<cassandra-property-errors-max-retires, `errors.max.retries`>>
|`-1`
|The maximum number of retries on retriable errors (e.g. connection errors) before failing (-1 = no limit, 0 = disabled, > 0 = num of retries).
|===
If the Cassandra connector agent uses SSL to connect to the Cassandra node, an SSL config file is required.
The following example shows how to write the SSL config file:
[source,properties,indent=0]
----
keyStore.location=/var/private/ssl/cassandra.keystore.jks
keyStore.password=cassandra
keyStore.type=JKS
trustStore.location=/var/private/ssl/cassandra.truststore.jks
trustStore.password=cassandra
trustStore.type=JKS
keyManager.algorithm=SunX509
trustManager.algorithm=SunX509
cipherSuites=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
----
[NOTE]
====
The `cipherSuites` field is not mandatory; it simply allows you to add one (or more) ciphers that are not present.
The default value of `trustStore.type` and `keyStore.type` is JKS.
The default value of `keyManager.algorithm` and `trustManager.algorithm` is SunX509.
====
The connector also supports pass-through configuration properties that are used when creating the Kafka producer.
Specifically, all connector configuration properties that begin with the `kafka.producer.` prefix are used (without the prefix) when creating the Kafka producer that writes events to Kafka.
For example, the following connector configuration properties can be used to {link-kafka-docs}.html#security_configclients[secure connections to the Kafka broker]:
[source,properties,indent=0]
----
kafka.producer.security.protocol=SSL
kafka.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
kafka.producer.ssl.keystore.password=test1234
kafka.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
kafka.producer.ssl.truststore.password=test1234
kafka.producer.ssl.key.password=test1234
kafka.consumer.security.protocol=SSL
kafka.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
kafka.consumer.ssl.keystore.password=test1234
kafka.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
kafka.consumer.ssl.truststore.password=test1234
kafka.consumer.ssl.key.password=test1234
----
Be sure to consult the {link-kafka-docs}.html[Kafka documentation] for all of the configuration properties for Kafka producers.
The connector supports the following Kafka Connect converters for key/value serialization:
[source,indent=0]
----
io.confluent.connect.avro.AvroConverter
org.apache.kafka.connect.storage.StringConverter
org.apache.kafka.connect.json.JsonConverter
com.blueapron.connect.protobuf.ProtobufConverter
----