:page-aliases: configuration/timescaledb.adoc
[id="timescaledb-integration"]
= TimescaleDB Integration

:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js

toc::[]

link:https://github.com/timescale/timescaledb[TimescaleDB] is an open-source database designed to make SQL scalable for time-series data.
It is based on PostgreSQL and implemented as a PostgreSQL extension.

Debezium can capture the changes made to data in TimescaleDB.
For that purpose, the regular PostgreSQL link:/documentation/reference/connectors/postgresql[connector] reads the raw data from the database, and the `io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb` transformation processes the raw data, performs logical routing, and adds relevant metadata.

== Installation

. Install TimescaleDB by following the link:https://docs.timescale.com/[official documentation].
. Install the PostgreSQL connector by using the Debezium link:/documentation/reference/install[installation guide].
. Configure TimescaleDB and deploy the connector.

== How it works

Three major TimescaleDB features are of interest from the data capture point of view:

* Hypertables
* Continuous aggregates
* Compression

All three depend internally on basic PostgreSQL functionality: storing data in tables.
Debezium supports each of them to a different degree.

The SMT needs access to TimescaleDB metadata.
It does not have access to the database configuration at the connector level, so the connection settings must be repeated at the transformation level.

=== Hypertables

A hypertable is a logical table that is used to store time-series data.
The data is chunked (partitioned) according to a defined time-bound column.
TimescaleDB creates one or more physical tables in its internal schema, where each table represents a single chunk.
By default, the connector captures changes in the distinct chunk tables and streams them to separate topics, each corresponding to a single chunk.
The transformation reassembles the distinct streams into a single one:

* The transformation has access to TimescaleDB metadata to obtain the chunk/hypertable mapping.
* The transformation re-routes the captured event from the chunk-specific topic to a single logical topic named according to the pattern `<prefix>.<schema>.<hypertable>`, where `<prefix>` is the value of the `target.topic.prefix` option.
* The transformation adds headers to the event:
** `__debezium_timescaledb_chunk_table` - the name of the physical table storing the event data
** `__debezium_timescaledb_chunk_schema` - the name of the schema to which the physical table belongs

.Example: Streaming of hypertable
A hypertable `conditions` is created in the `public` schema.

[source,sql]
----
CREATE TABLE conditions (
  time TIMESTAMPTZ NOT NULL,
  location TEXT NOT NULL,
  temperature DOUBLE PRECISION NULL,
  humidity DOUBLE PRECISION NULL
);
SELECT create_hypertable('conditions', 'time');
----

The captured changes are routed to the topic named `timescaledb.public.conditions`, and the messages are enriched with headers such as:

[source]
----
__debezium_timescaledb_chunk_table: _hyper_1_1_chunk
__debezium_timescaledb_chunk_schema: _timescaledb_internal
----

=== Continuous aggregates

Continuous aggregates provide automatic statistical calculations over data stored in hypertables.
The aggregate view is backed by its own hypertable, which in turn is backed by a set of PostgreSQL tables.
The aggregates are recalculated automatically or manually.
When they are recalculated, the new values are stored in the hypertable and can be captured and streamed.
As with hypertables, the aggregates are streamed into different topics based on the chunk in which they are stored.
The transformation reassembles the distinct streams into a single one:

* The transformation has access to TimescaleDB metadata to obtain the chunk/hypertable and hypertable/aggregate mappings.
* The transformation re-routes the captured event from the chunk-specific topic to a single logical topic named according to the pattern `<prefix>.<schema>.<aggregate>`, where `<prefix>` is the value of the `target.topic.prefix` option.
* The transformation adds headers to the event:
** `__debezium_timescaledb_hypertable_table` - the name of the hypertable storing the continuous aggregate
** `__debezium_timescaledb_hypertable_schema` - the name of the schema to which the hypertable belongs
** `__debezium_timescaledb_chunk_table` - the name of the physical table storing the continuous aggregate
** `__debezium_timescaledb_chunk_schema` - the name of the schema to which the physical table belongs

.Example: Streaming of continuous aggregate
A continuous aggregate `conditions_summary` is created in the `public` schema.

[source,sql]
----
CREATE MATERIALIZED VIEW conditions_summary
WITH (timescaledb.continuous) AS
SELECT
  location,
  time_bucket(INTERVAL '1 hour', time) AS bucket,
  AVG(temperature),
  MAX(temperature),
  MIN(temperature)
FROM conditions
GROUP BY location, bucket;
----

The captured changes are routed to the topic named `timescaledb.public.conditions_summary`, and the messages are enriched with headers such as:

[source]
----
__debezium_timescaledb_chunk_table: _hyper_2_2_chunk
__debezium_timescaledb_chunk_schema: _timescaledb_internal
__debezium_timescaledb_hypertable_table: _materialized_hypertable_2
__debezium_timescaledb_hypertable_schema: _timescaledb_internal
----

=== Compression

No specific functionality is provided for compression.
Compressed chunks are forwarded downstream in the pipeline for further processing if needed.
Usually, messages with compressed chunks are dropped and not processed later in the pipeline.

== TimescaleDB configuration

Debezium captures TimescaleDB/PostgreSQL changes via replication slots.
Several message formats can be used to store data in the slot, but the link:/reference/connectors/postgresql.html#postgresql-pgoutput[pgoutput] decoder is recommended because it is installed by default in a TimescaleDB instance.

To configure the replication slot, specify the following in the `postgresql.conf` file:

[source,properties]
----
# REPLICATION
wal_level = logical             // <1>
----
<1> Instructs the server to use logical decoding with the write-ahead log.

To configure tables for replication, create a publication:

[source,sql]
----
CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update');
----

A publication can be created either globally, as in the example above, or per table.
Because TimescaleDB creates tables on the fly, using the global setting is strongly recommended.

== Connector configuration

The connector itself is configured in the same way as a plain PostgreSQL connector.
To make the connector TimescaleDB-aware, enable the SMT by using the following configuration options:

[source,json]
----
"transforms": "timescaledb",
"transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb",
"transforms.timescaledb.database.hostname": "timescaledb",
"transforms.timescaledb.database.port": "...",
"transforms.timescaledb.database.user": "...",
"transforms.timescaledb.database.password": "...",
"transforms.timescaledb.database.dbname": "..."
----

=== Connector configuration example

The following example shows the configuration for a PostgreSQL connector that connects to a TimescaleDB server on port 5432 at 192.168.99.100, whose logical name is `dbserver1`.
Typically, you configure the {prodname} PostgreSQL connector in a JSON file by setting the configuration properties available for the connector.

You can choose to produce events for a subset of the schemas and tables in a database.
Optionally, you can ignore, mask, or truncate columns that contain sensitive data, are larger than a specified size, or that you do not need.

[source,json]
----
{
  "name": "timescaledb-connector",  // <1>
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", // <2>
    "database.hostname": "192.168.99.100", // <3>
    "database.port": "5432", // <4>
    "database.user": "postgres", // <5>
    "database.password": "postgres", // <6>
    "database.dbname": "postgres", // <7>
    "topic.prefix": "dbserver1", // <8>
    "plugin.name": "pgoutput", // <9>
    "schema.include.list": "_timescaledb_internal", // <10>
    "transforms": "timescaledb", // <11>
    "transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb", // <12>
    "transforms.timescaledb.database.hostname": "timescaledb", // <13>
    "transforms.timescaledb.database.port": "5432", // <14>
    "transforms.timescaledb.database.user": "postgres", // <15>
    "transforms.timescaledb.database.password": "postgres", // <16>
    "transforms.timescaledb.database.dbname": "postgres" // <17>
  }
}
----
<1> The name of the connector when registered with a Kafka Connect service.
<2> The name of this PostgreSQL connector class.
<3> The address of the TimescaleDB server.
<4> The port number of the TimescaleDB server.
<5> The name of the TimescaleDB user.
<6> The password for the TimescaleDB user.
<7> The name of the TimescaleDB database to connect to.
<8> The topic prefix for the TimescaleDB server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
<9> Indicates the usage of the `pgoutput` logical decoding plug-in.
<10> A list of all schemas containing TimescaleDB physical tables.
<11> Enable the SMT to process raw TimescaleDB events.
<12> The class that implements the SMT.
<13> The address of the TimescaleDB server, as used by the SMT.
<14> The port number of the TimescaleDB server, as used by the SMT.
<15> The name of the TimescaleDB user, as used by the SMT.
<16> The password for the TimescaleDB user, as used by the SMT.
<17> The name of the TimescaleDB database that the SMT connects to.

Items `13` - `17` provide TimescaleDB connection information for the SMT.
The values must refer to the same database as items `3` - `7`.

== Configuration options

The following table lists the configuration options that you can set for the TimescaleDB integration SMT.

.TimescaleDB integration SMT (`TimescaleDb`) configuration options
[cols="30%a,25%a,45%a"]
|===
|Property
|Default
|Description

|[[timescaledb-property-database-hostname]]<<timescaledb-property-database-hostname, `+database.hostname+`>>
|No default
|IP address or hostname of the TimescaleDB database server.

|[[timescaledb-property-database-port]]<<timescaledb-property-database-port, `+database.port+`>>
|`5432`
|Integer port number of the TimescaleDB database server.

|[[timescaledb-property-database-user]]<<timescaledb-property-database-user, `+database.user+`>>
|No default
|Name of the TimescaleDB database user for connecting to the TimescaleDB database server.

|[[timescaledb-property-database-password]]<<timescaledb-property-database-password, `+database.password+`>>
|No default
|Password to use when connecting to the TimescaleDB database server.

|[[timescaledb-property-database-dbname]]<<timescaledb-property-database-dbname, `+database.dbname+`>>
|No default
|The name of the TimescaleDB database from which to stream the changes.

|[[timescaledb-property-schema-list]]<<timescaledb-property-schema-list, `+schema.list+`>>
|`_timescaledb_internal`
|Comma-separated list of schema names that contain TimescaleDB raw (internal) data tables.
The SMT processes only changes that originate in one of the schemas in the list.

|[[timescaledb-property-target-topic-prefix]]<<timescaledb-property-target-topic-prefix, `+target.topic.prefix+`>>
|`timescaledb`
|The namespace (prefix) of topics where TimescaleDB events will be routed.
The SMT routes the messages into topics named `<prefix>.<schema>.<hypertable\|aggregate>`.

|===
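
.Illustration: how the routing options interact
The effect of the `schema.list` and `target.topic.prefix` options on routing can be illustrated with a small, self-contained sketch.
The following Python code is not the Debezium implementation: the real SMT is a Java class that reads its mappings from TimescaleDB metadata.
The `Event` class, the `route` function, and the hard-coded mapping dictionaries are hypothetical stand-ins whose values mirror the `conditions` and `conditions_summary` examples in this document.
The raw topic name in the usage lines is likewise only illustrative.

```python
from dataclasses import dataclass, field

# Hypothetical stand-ins for TimescaleDB metadata; the real SMT queries the
# database instead. Keys and values are (schema, table) pairs.
CHUNK_TO_HYPERTABLE = {
    ("_timescaledb_internal", "_hyper_1_1_chunk"): ("public", "conditions"),
    ("_timescaledb_internal", "_hyper_2_2_chunk"): (
        "_timescaledb_internal",
        "_materialized_hypertable_2",
    ),
}
HYPERTABLE_TO_AGGREGATE = {
    ("_timescaledb_internal", "_materialized_hypertable_2"): (
        "public",
        "conditions_summary",
    ),
}


@dataclass
class Event:
    """A simplified change event: topic, source schema/table, and headers."""
    topic: str
    schema: str
    table: str
    headers: dict = field(default_factory=dict)


def route(event, schema_list=("_timescaledb_internal",), prefix="timescaledb"):
    """Re-route a chunk event to its logical topic; return others unchanged."""
    if event.schema not in schema_list:
        return event  # not a TimescaleDB internal schema: leave untouched
    hypertable = CHUNK_TO_HYPERTABLE.get((event.schema, event.table))
    if hypertable is None:
        return event  # no mapping known in this sketch: leave untouched

    headers = dict(event.headers)
    headers["__debezium_timescaledb_chunk_table"] = event.table
    headers["__debezium_timescaledb_chunk_schema"] = event.schema

    target = hypertable
    aggregate = HYPERTABLE_TO_AGGREGATE.get(hypertable)
    if aggregate is not None:
        # The chunk belongs to a hypertable that backs a continuous aggregate.
        headers["__debezium_timescaledb_hypertable_table"] = hypertable[1]
        headers["__debezium_timescaledb_hypertable_schema"] = hypertable[0]
        target = aggregate

    return Event(f"{prefix}.{target[0]}.{target[1]}", event.schema, event.table, headers)


raw = Event(
    "dbserver1._timescaledb_internal._hyper_1_1_chunk",
    "_timescaledb_internal",
    "_hyper_1_1_chunk",
)
print(route(raw).topic)  # timescaledb.public.conditions
```

In this sketch, calling `route(raw, prefix="metrics")` would place the same event in `metrics.public.conditions`, while an event whose schema is not in `schema_list` keeps its original topic and headers.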