DBZ-6482 Included review comments

Co-authored-by: roldanbob <broldan@redhat.com>
Jiri Pechanec 2023-10-11 07:59:42 +02:00
parent e3be69da6c
commit 201d870a09
2 changed files with 70 additions and 61 deletions


@ -43,7 +43,7 @@ The following SMTs are provided by {prodname}:
|Converts {prodname} and Kafka Connect timestamp fields in event records to a specified timezone.
|xref:transformations/timescaledb.adoc[TimescaleDB Integration]
|Routes and enriches messages from TimescaleDB captured by {prodname} PostgreSQL connector.
|Routes and enriches messages that the {prodname} PostgreSQL connector captures from a TimescaleDB database.
|===


@ -13,53 +13,57 @@ toc::[]
link:https://github.com/timescale/timescaledb[TimescaleDB] is an open-source database designed to make SQL scalable for time-series data.
It is based on the PostgreSQL database and is implemented as an extension of it.
Debezium can capture the changes made in TimescaleDB's data.
For that purpose the regular PostgreSQL link:/documentation/reference/connectors/postgresql[connector] is used to read the raw data from the database and the `io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb` transformation is used to process the raw data, perfrom logical routing and add releavant metadata.
The {prodname} PostgreSQL connector can capture data changes from TimescaleDB.
The standard link:/documentation/reference/connectors/postgresql[PostgreSQL connector] reads the raw data from the database.
You can then use the `io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb` transformation to process the raw data, perform logical routing, and add relevant metadata.
== Instalation
== Installation
. Install TimescaleDB following the link:https://docs.timescale.com/[official documentation]
. Install PostgresSQL connector using Debezium link:/documentation/reference/install[installation guide]
. Configure TimescaleDB and deploy the connector
. Install TimescaleDB as described in the link:https://docs.timescale.com/[TimescaleDB documentation].
. Install the {prodname} PostgreSQL connector according to the instructions in the link:/documentation/reference/install[{prodname} installation guide].
. Configure TimescaleDB, and deploy the connector.
== How it works
There are three major functions in TimescaleDB that are interesting from data capture point of view:
{prodname} can capture events from the following TimescaleDB functions:
* Hypertables
* Continuous aggregates
* Compression
All three are internally dependent and built upon PostgreSQL basic functionality - storing data in tables.
Debezium supports all three to a different level.
These three functions are internally dependent.
Each of the functions is built on the PostgreSQL functionality for storing data in tables.
{prodname} supports all three functions to differing degrees.
The SMT needs access to TimescaleDB metadata.
It does not have an acess to the database configuration at the connector level so it is necessary to repeat it at the transformation level.
Because the SMT cannot access the database configuration at the connector level, you must explicitly define configuration metadata for the transformation.
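For example, connection options that you set at the connector level might be repeated with the transformation's prefix, as in the following hypothetical excerpt (placeholder values; a complete configuration example appears later in this page):
[source,json]
----
// Hypothetical excerpt: the SMT repeats the connector's database.* options
// under its own transforms.<name>. prefix.
"transforms.timescaledb.database.hostname": "192.168.99.100",
"transforms.timescaledb.database.port": "5432",
"transforms.timescaledb.database.user": "postgres",
"transforms.timescaledb.database.password": "postgres",
"transforms.timescaledb.database.dbname": "postgres"
----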
=== Hypertables
A hypertable is a logical table that is used to store time-series data.
The data are chunked (partitioned) according to a defined time-bound column.
TimescaleDB creates one or more phisical tables in its internal schema where each table represents a single chunk.
The connector captures changes in the distinct chunk tables and by default would stream them to separate topics, each corresonding to a single chunk.
The transformation reassembles the distinct streams into a single one:
Data is chunked (partitioned) according to a defined time-bound column.
TimescaleDB creates one or more physical tables in its internal schema, with each table representing a single chunk.
By default, the connector captures changes from each chunk table, and streams the changes to the individual topics that correspond to each chunk.
The TimescaleDB transformation reassembles data from the separate topics, and then streams the reassembled data to a single topic.
* The transformation has an access to TimescaleDB metadata to obtain chunk/hypertable mapping
* The transformation re-routes the captured event from chunk-specific topic to a single logical one named accroding to pattern `<prefix>.<hypertable-schema-name>.<hypertable-name>`
* The transformation adds a headers to the event
** `__debezium_timescaledb_chunk_table` - the name of the physical table storing the event data
** `__debezium_timescaledb_chunk_schema` - the name of the schema to which the physical table belongs
* The transformation has access to TimescaleDB metadata to obtain chunk/hypertable mapping.
* The transformation reroutes the captured events from their chunk-specific topics to a single logical topic that is named according to the following pattern: `_<prefix>_._<hypertable-schema-name>_._<hypertable-name>_`.
* The transformation adds the following headers to the event:
`__debezium_timescaledb_chunk_table`:: The name of the physical table that stores the event data.
`__debezium_timescaledb_chunk_schema`:: The name of the schema that the physical table belongs to.
.Example: Streaming of hypertable
.Example: Streaming data from a hypertable
A hypertable `conditions` is created in the `public` schema.
The following example shows a SQL command for creating a `conditions` hypertable in the `public` schema:
[source]
----
CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL);
SELECT create_hypertable('conditions', 'time');
----
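For example, after you insert a row into the hypertable (the following values are hypothetical), TimescaleDB stores the row in an internal chunk table, and the connector captures the change from that chunk table:
[source,sql]
----
-- Hypothetical sample row; TimescaleDB stores it in an internal chunk
-- table such as _timescaledb_internal._hyper_1_1_chunk.
INSERT INTO conditions (time, location, temperature, humidity)
VALUES (now(), 'office', 21.5, 50.0);
----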
The captured changes will be routed into the topic named `timescaledb.public.conditions` and the messages will be enriched with headers like
The TimescaleDB SMT routes the change events that are captured from the hypertable to a topic named `timescaledb.public.conditions`.
The transformation enriches the event messages with metadata headers, as shown in the following example:
[source]
----
__debezium_timescaledb_chunk_table: _hyper_1_1_chunk
@ -68,24 +72,24 @@ __debezium_timescaledb_chunk_schema: _timescaledb_internal
=== Continuous aggregates
Continuous aggregates provides automatic statistical calculations over data stored in hyperables.
Continuous aggregates provide automatic statistical calculations over data that is stored in hypertables.
The aggregate view is backed by its own hypertable, which in turn is backed by a set of PostgreSQL tables.
The aggregates are recalculated automatically or manually.
When recalculated the new values are stored in the hypertable and can be captured and streamed.
The aggregates are again streamed into different topics based on the chunk in which they are stored.
The transformation reassembles the distinct streams into a single one:
The aggregates can be recalculated either automatically or manually.
After an aggregate is recalculated, the new values are stored in the hypertable, from which they can be captured and streamed.
Data from the aggregates is streamed to different topics, based on the chunk in which it is stored.
The TimescaleDB transformation reassembles data that was streamed to different topics and routes it to a single topic.
* The transformation has an access to TimescaleDB metadata to obtain chunk/hypertable and hypertable/aggregate mapping
* The transformation re-routes the captured event from chunk-specific topic to a single logical one named accroding to pattern `<prefix>.<aggregate-schema-name>.<aggregate-name>`
* The transformation adds a headers to the event
** `__debezium_timescaledb_hypertable_table` - the name of the hypertable storing the continuous aggregate
** `__debezium_timescaledb_hypertable_schema` - the name of the schema in which the hypertable belongs
** `__debezium_timescaledb_chunk_table` - the name of the physical table storing the continuous aggregate
** `__debezium_timescaledb_chunk_schema` - the name of the schema to which the physical table belongs
* The transformation has access to TimescaleDB metadata to obtain mappings between chunks and hypertables, and between hypertables and aggregates.
* The transformation reroutes the captured events from their chunk-specific topics to a single logical topic that is named according to the following pattern: `_<prefix>_._<aggregate-schema-name>_._<aggregate-name>_`.
* The transformation adds the following headers to the event:
`__debezium_timescaledb_hypertable_table`:: The name of the hypertable that stores the continuous aggregate.
`__debezium_timescaledb_hypertable_schema`:: The name of the schema that the hypertable belongs to.
`__debezium_timescaledb_chunk_table`:: The name of the physical table that stores the continuous aggregate.
`__debezium_timescaledb_chunk_schema`:: The name of the schema that the physical table belongs to.
.Example: Streaming of continuous aggregate
.Example: Streaming data from a continuous aggregate
A continuous aggregate `conditions_summary` is created in the `public` schema.
The following example shows a SQL command for creating a continuous aggregate `conditions_summary` in the `public` schema:
[source]
----
CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) AS
@ -99,7 +103,9 @@ CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) AS
GROUP BY location, bucket;
----
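For example, the following sketch uses the TimescaleDB `refresh_continuous_aggregate` procedure to manually recalculate the aggregate so that new values become available for capture:
[source,sql]
----
-- Manually refresh the aggregate over an open-ended time window;
-- the recalculated rows are stored in the backing hypertable.
CALL refresh_continuous_aggregate('conditions_summary', NULL, NULL);
----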
The captured changes will be routed into the topic named `timescaledb.public.conditions_summary` and the messages will be enriched with headers like
The TimescaleDB SMT routes the change events captured in the aggregate to a topic with the name `timescaledb.public.conditions_summary`.
The transformation enriches the event messages with metadata headers, as shown in the following example:
[source]
----
__debezium_timescaledb_chunk_table: _hyper_2_2_chunk
@ -110,14 +116,15 @@ __debezium_timescaledb_hypertable_schema: _timescaledb_internal
=== Compression
In case of compression there is no specific functionality.
The compressed chunks are just forwarded downstream in the pipeline for further processing if needed.
Usually the messages with compressed chunks are dropped and not processed later in the pipeline.
The TimescaleDB SMT does not apply any special processing to compression functions.
Compressed chunks are forwarded unchanged to the next downstream job in the pipeline for further processing as needed.
Typically, messages with compressed chunks are dropped, and are not processed by subsequent jobs in the pipeline.
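If you choose to drop such messages, one possible approach is a sketch like the following, which combines the standard Kafka Connect `Filter` transformation with a `TopicNameMatches` predicate; the topic pattern is an assumption that depends on how compressed chunk tables are named in your environment:
[source,json]
----
"transforms": "timescaledb,dropCompressed",
"transforms.dropCompressed.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.dropCompressed.predicate": "isCompressedChunk",
"predicates": "isCompressedChunk",
"predicates.isCompressedChunk.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isCompressedChunk.pattern": ".*compress.*" // Assumed pattern; adjust to your chunk naming.
----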
== TimescaleDB configuration
Debezium captures TimescaleDB/PostgreSQL changes via replication slots.
There are multiple implementations of message format used to store data in the slot but it is recommended to use link:/reference/connectors/postgresql.html#postgresql-pgoutput[pgoutput] decoder as it is by-default installed in a TimescaleDB instance.
{prodname} uses replication slots to capture changes from TimescaleDB and PostgreSQL.
Replication slots can store data in multiple message formats.
Typically, it is best to configure {prodname} to use the link:/reference/connectors/postgresql.html#postgresql-pgoutput[pgoutput] decoder, which is installed by default in TimescaleDB instances, to read from the slot.
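In the connector configuration, you select this decoder by setting the `plugin.name` option, as shown in the connector configuration example later in this page:
[source,json]
----
"plugin.name": "pgoutput",
----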
To configure the replication slot, specify the following in the `postgresql.conf` file:
@ -128,19 +135,19 @@ wal_level = logical // <1>
----
<1> Instructs the server to use logical decoding with the write-ahead log.
To configure tables for replication it is necessary to create a publication:
To configure tables for replication, you must create a publication, as shown in the following example:
[source,sql]
----
CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update')
----
Publication can be created either globally as in the example above or per-table.
Given that TimescaleDB creates tables on the fly it is strongly recommended to use global setting.
You can create publications globally, as in the preceding example, or create separate publications for each table.
Because TimescaleDB creates tables automatically, as needed, the use of a global publication is strongly recommended.
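For comparison, the following sketch shows a per-table publication; because TimescaleDB continually creates new chunk tables that such a publication would not include, this approach is impractical here:
[source,sql]
----
-- Per-table alternative to the global publication above (illustrative only);
-- newly created chunk tables are not added to the publication automatically.
CREATE PUBLICATION dbz_publication FOR TABLE public.conditions WITH (publish = 'insert, update');
----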
== Connector configuration
The connector itself is configured in the same way as a plain PostgreSQL connector.
To make the connector TimescaleDB-aware the SMT needs to be enabled via configuration options
Configure the connector in the same way that you would configure a plain PostgreSQL connector.
To enable the connector to correctly process events from TimescaleDB, add the following options to the connector configuration:
[source,json]
----
"transforms": "timescaledb",
@ -154,11 +161,11 @@ To make the connector TimescaleDB-aware the SMT needs to be enabled via configur
=== Connector configuration example
Following is an example of the configuration for a PostgreSQL connector that connects to a TimescaleDB server on port 5432 at 192.168.99.100, whose logical name is `dbserver1`.
The following example shows the configuration for setting up a PostgreSQL connector to connect to a TimescaleDB server with the logical name `dbserver1` on port 5432 at 192.168.99.100.
Typically, you configure the {prodname} PostgreSQL connector in a JSON file by setting the configuration properties available for the connector.
You can choose to produce events for a subset of the schemas and tables in a database.
Optionally, you can ignore, mask, or truncate columns that contain sensitive data, are larger than a specified size, or that you do not need.
Optionally, you can ignore, mask, or truncate columns that contain sensitive data, that exceed a specified size, or that you do not need.
[source,json]
----
@ -191,12 +198,14 @@ Optionally, you can ignore, mask, or truncate columns that contain sensitive dat
<5> The name of the TimescaleDB user.
<6> The password for the TimescaleDB user.
<7> The name of the TimescaleDB database to connect to.
<8> The topic prefix for the TimescaleDB server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
<9> Indicates the usage of `pgoutput` logical decoding plug-in.
<10> A list of all schemas containing TimescaleDB physicial tables.
<11> Enable the SMT to process raw TimescaleDB events.
<12> Enable the SMT to process raw TimescaleDB events.
<13> etc. Provide TimescaleDB connection information for SMT. The values must be the same as in case of items `3` - `7`.
<8> The topic prefix for the TimescaleDB server or cluster.
This prefix forms a namespace, and is used in the names of all Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema, when the Avro converter is used.
<9> Indicates use of the `pgoutput` logical decoding plug-in.
<10> A list of all schemas that contain TimescaleDB physical tables.
<11> Enables the SMT to process raw TimescaleDB events.
<12> Enables the SMT to process raw TimescaleDB events.
<13> Provides TimescaleDB connection information for the SMT.
The values must match the values that you specify in items `3` - `7`.
== Configuration options
@ -228,16 +237,16 @@ The following table lists the configuration options that you can set for the Tim
|[[timescaledb-property-database-dbname]]<<timescaledb-property-database-dbname, `+database.dbname+`>>
|No default
|The name of the TimescaleDB database from which to stream the changes.
|The name of the TimescaleDB database from which to stream changes.
|[[timescaledb-property-schema-list]]<<timescaledb-property-schema-list, `+schema.list+`>>
|`_timescaledb_internal`
|Comma-separated list schema names that contain TimescaleDB raw (internal) data tables.
The SMT will process only those changes that originates in one of the schemas in the list.
|Comma-separated list of schema names that contain TimescaleDB raw (internal) data tables.
The SMT processes only those changes that originate in one of the schemas in the list.
|[[timescaledb-property-target-topic-prefix]]<<timescaledb-property-target-topic-prefix, `+target.topic.prefix+`>>
|`timescaledb`
|The namespace (prefix) of topics where TimescaleDB events will be routed.
The SMT will route the messages into topics named `<prefix>.<schema>.<hypertable/aggregate>`.
|The namespace (prefix) of topics to which TimescaleDB events are routed.
The SMT routes messages into topics named `_<prefix>_._<schema>_._<hypertable|aggregate>_`.
|===
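In the connector configuration, you set the options in the preceding table with the transformation's prefix; the following hypothetical excerpt shows the documented default values, assuming the transform name `timescaledb`:
[source,json]
----
// Hypothetical excerpt; the values shown are the documented defaults.
"transforms.timescaledb.schema.list": "_timescaledb_internal",
"transforms.timescaledb.target.topic.prefix": "timescaledb"
----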