tet123/documentation/modules/ROOT/pages/connectors/postgresql.adoc
2019-11-09 08:52:59 +01:00

1746 lines
99 KiB
Plaintext

= Debezium Connector for PostgreSQL
include::../_attributes.adoc[]
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]
Debezium's PostgreSQL Connector can monitor and record the row-level changes in the schemas of a PostgreSQL database.
The first time it connects to a PostgreSQL server/cluster, it reads a consistent snapshot of all of the schemas. When that snapshot is complete, the connector continuously streams the changes that were committed to PostgreSQL 9.6 or later and generates corresponding insert, update and delete events. All of the events for each table are recorded in a separate Kafka topic, where they can be easily consumed by applications and services.
The connector has been tested and works with Postgres 9.6, 10, 11 and 12.
[[overview]]
== Overview
PostgreSQL's https://www.postgresql.org/docs/current/static/logicaldecoding-explanation.html[_logical decoding_] feature was first introduced in version 9.4 and is a mechanism which allows the extraction of the changes which were committed to the transaction log and the processing of these changes in a user-friendly manner via the help of an https://www.postgresql.org/docs/current/static/logicaldecoding-output-plugin.html[_output plugin_]. This output plugin must be installed prior to running the PostgreSQL server and enabled together with a replication slot in order for clients to be able to consume the changes.
Debezium's PostgreSQL connector contains two different parts which work together in order to be able to read and process server changes:
* a logical decoding output plugin which has to be installed and configured in the PostgreSQL server, one of
** https://github.com/debezium/postgres-decoderbufs[decoderbufs] (maintained by the Debezium community, based on ProtoBuf)
** https://github.com/eulerto/wal2json[wal2json] (maintained by the wal2json community, based on JSON)
** pgoutput, the standard logical decoding plug-in in PostgreSQL 10+ (maintained by the Postgres community, used by Postgres itself for https://www.postgresql.org/docs/current/logical-replication-architecture.html[logical replication]);
this plug-in is always present, meaning that no additional libraries must be installed,
and the Debezium connector will interpret the raw replication event stream into change events directly.
* Java code (the actual Kafka Connect connector) which reads the changes produced by the chosen plugin, using PostgreSQL's https://www.postgresql.org/docs/current/static/logicaldecoding-walsender.html[_streaming replication protocol_], via the PostgreSQL https://github.com/pgjdbc/pgjdbc[_JDBC driver_]
The connector then produces a _change event_ for every row-level insert, update, and delete operation that was received, recording all the change events for each table in a separate Kafka topic. Your client applications read the Kafka topics that correspond to the database tables they're interested in following, and react to every row-level event it sees in those topics.
PostgreSQL normally purges WAL segments after some period of time. This means that the connector won't have the complete history of all changes that have been made to the database. Therefore, when the PostgreSQL connector first connects to a particular PostgreSQL database, it starts by performing a _consistent snapshot_ of each of the database schemas. After the connector completes the snapshot, it continues streaming changes from the exact point at which the snapshot was made. This way, we start with a consistent view of all of the data, yet continue reading without having lost any of the changes made while the snapshot was taking place.
The connector is also tolerant of failures. As the connector reads changes and produces events, it records the position in the write-ahead log with each event. If the connector stops for any reason (including communication failures, network problems, or crashes), upon restart it simply continues reading the WAL where it last left off. This includes snapshots: if the snapshot was not completed when the connector is stopped, upon restart it will begin a new snapshot.
[[limitations]]
[IMPORTANT]
====
The connector's functionality relies on PostgreSQL's logical decoding feature.
Please be aware of the following limitations which are also reflected by the connector:
. Logical Decoding does not support DDL changes: this means that the connector is unable to report DDL change events back to consumers.
. Logical Decoding replication slots are only supported on `primary` servers: this means that when there is a cluster of PostgreSQL servers, the connector can only run on the active `primary` server. It cannot run on `hot` or `warm` standby replicas. If the `primary` server fails or is demoted, the connector will stop. Once the `primary` has recovered the connector can simply be restarted. If a different PostgreSQL server has been promoted to `primary`, the connector configuration must be adjusted before the connector is restarted. Make sure you read more about how the connector behaves link:#when-things-go-wrong[when things go wrong].
====
[IMPORTANT]
====
Debezium currently supports only database with UTF-8 character encoding.
With a single byte character encoding it is not possible to correctly process strings containing extended ASCII code characters.
====
[[setting-up-PostgreSQL]]
== Setting up PostgreSQL
Before using the Debezium PostgreSQL connector to monitor the changes committed on a PostgreSQL server,
first decide which logical decoder method you intend to use.
If you plan *not* to use the native pgoutput logical replication stream support,
then you will need to install the logical decoding plugin into the PostgreSQL server.
Afterward enable a replication slot, and configure a user with sufficient privileges to perform the replication.
Note that *if your database is hosted by a service* such as https://www.heroku.com/postgres[Heroku Postgres] you may be unable to install the plugin.
If so, and if you're using PostgreSQL 10+, you can use the pgoutput decoder support to monitor your database.
If that isn't an option, you'll be unable to monitor your database with Debezium.
[[amazon-rds]]
=== PostgreSQL on Amazon RDS
It is possible to monitor PostgreSQL database running in https://aws.amazon.com/rds/[Amazon RDS]. To get it running you must fulfill the following conditions
* The instance parameter `rds.logical_replication` is set to `1`.
* Verify that `wal_level` parameter is set to `logical` by running the query `SHOW wal_level` as DB master user; this might not be the case in multi-zone replication setups.
You cannot set this option manually, it's (https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_WorkingWithParamGroups.html[automatically changed]) when the `rds.logical_replication` is set to `1`.
If the `wal_level` is not `logical` after the change above, it's probably because the instance has to be restarted due to the parameter group change, it'll happen accordingly to your maintenance window or can be done manually.
* Set `plugin.name` Debezium parameter to `wal2json`. You can skip this on PostgreSQL 10+ if you wish to use pgoutput logical replication stream support.
* Use database master account for replication as RDS currently does not support setting of `REPLICATION` privilege for another account.
[IMPORTANT]
====
You should make sure to use the latest versions of Postgres 9.6, 10 or 11 on Amazon RDS.
Otherwise, older versions of the wal2json plug-in may be installed
(see https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_PostgreSQL.html[the official documentation] for the exact wal2json versions installed on Amazon RDS).
In that case, replication messages received from the database may not carry complete information about type constraints like length or scale or `NULL`/`NOT NULL`,
which in turn might cause creation of messages with an inconsistent schema for a short period of time in case of changes to a column's definition.
As of January 2019, the following Postgres versions on RDS come with an up-to-date version of wal2json and thus should be used:
* Postgres 9.6: 9.6.10 and newer
* Postgres 10: 10.5 and newer
* Postgres 11: any version
====
[[output-plugin]]
=== Installing the Logical Decoding Output Plug-in
[TIP]
====
Also see xref:postgres-plugins.adoc[Logical Decoding Output Plug-in Installation for PostgreSQL] for more detailed instructions of setting up and testing logical decoding plug-ins.
====
[NOTE]
====
As of Debezium 0.10, the connector supports PostgreSQL 10+ logical replication streaming using _pgoutput_.
This means that a logical decoding output plug-in is no longer necessary and changes can be emitted directly from the replication stream by the connector.
====
As of PostgreSQL 9.4, the only way to read changes to the write-ahead-log is to first install a logical decoding output plugin. Plugins are written in C, compiled, and installed on the machine which runs the PostgreSQL server. Plugins use a number of PostgreSQL specific APIs, as described by the https://www.postgresql.org/docs/current/static/logicaldecoding-output-plugin.html[_PostgreSQL documentation_].
Debezium's PostgreSQL connector works with one of Debezium's supported logical decoding plugin to encode the changes in either https://github.com/google/protobuf[_Protobuf format_] or http://www.json.org/[_JSON_] format.
See the documentation of your chosen plugin (https://github.com/debezium/postgres-decoderbufs/blob/master/README.md[_protobuf_], https://github.com/eulerto/wal2json/blob/master/README.md[_wal2json_]) to learn more about the plugin's requirements, limitations, and how to compile it.
For simplicity, Debezium also provides a Docker image based on a vanilla PostgreSQL server image on top of which it compiles and installs the plugins. We recommend https://github.com/debezium/docker-images/tree/master/postgres/9.6[_using this image_] as an example of the detailed steps required for the installation.
[WARNING]
====
The Debezium logical decoding plugins have only been installed and tested on _Linux_ machines.
For Windows and other operating systems it may require different installation steps.
====
[[discrepance-between-plugins]]
==== Differences Between Plug-ins
The plugins' behaviour is not completely same for all cases.
So far these differences have been identified:
* wal2json plug-in is not able to process quoted identifiers (https://github.com/eulerto/wal2json/issues/35[issue])
* wal2json plug-in does not emit events for tables without primary keys
* wal2json plug-in does not support special values (`NaN` or `infinity`) for floating point types
* wal2json should be used with setting the `schema.refresh.mode` connector option to `columns_diff_exclude_unchanged_toast`;
otherwise, when receiving a change event for a row containing an unchanged TOAST column, no field for that column will be contained in the emitted change event's `after` structure.
This is because wal2json's messages won't contain a field for such column.
The requirement for adding this is tracked under the wal2json https://github.com/eulerto/wal2json/issues/98[issue 98].
See the documentation of `columns_diff_exclude_unchanged_toast` further below for implications of using it.
* pgoutput plug-in does not emit events for tables without primary keys
All up-to-date differences are tracked in a test suite https://github.com/debezium/debezium/blob/master/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/DecoderDifferences.java[Java class].
[[server-configuration]]
==== Configuring the PostgreSQL Server
If you are using one of the supported link:#output-plugin[logical decoding plug-ins] (i.e. not pgoutput) and it has been installed,
configure the server to load the plugin at startup:
*postgresql.conf*
[source]
----
# MODULES
shared_preload_libraries = 'decoderbufs,wal2json' //<1>
----
<1> tells the server that it should load at startup the `decoderbufs` and `wal2json` logical decoding plugins (the names of the plugins are set in https://github.com/debezium/postgres-decoderbufs/blob/v0.3.0/Makefile[_Protobuf_] and https://github.com/eulerto/wal2json/blob/master/Makefile[_wal2json_] Makefiles)
Next is to configure the replication slot regardless of the decoder being used:
*postgresql.conf*
[source]
----
# REPLICATION
wal_level = logical //<1>
max_wal_senders = 1 //<2>
max_replication_slots = 1 //<3>
----
<1> tells the server that it should use logical decoding with the write-ahead log
<2> tells the server that it should use a maximum of `1` separate processes for processing WAL changes
<3> tells the server that it should allow a maximum of `1` replication slots to be created for streaming WAL changes
Debezium uses PostgreSQL's logical decoding, which uses replication slots.
Replication slots are guaranteed to retain all WAL required for Debezium even during Debezium outages.
It is important for this reason to closely monitor replication slots to avoid too much disk consumption and other conditions that can happen such as catalog bloat if a replication slot stays unused for too long.
For more information please see the official Postgres docs on https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS[this subject].
[TIP]
====
We strongly recommend reading and understanding https://www.postgresql.org/docs/current/static/wal-configuration.html[the official documentation] regarding the mechanics and configuration of the PostgreSQL write-ahead log.
====
[[PostgreSQL-permissions]]
=== Setting up Permissions
Replication can only be performed by a database user that has appropriate permissions and only for a configured number of hosts.
In order to give a user replication permissions, define a PostgreSQL role that has _at least_ the `REPLICATION` and `LOGIN` permissions. For example:
[source]
----
CREATE ROLE name REPLICATION LOGIN;
----
[TIP]
====
Superusers have by default both of the above roles.
====
Finally, configure the PostgreSQL server to allow replication to take place between the server machine and the host on which the Debezium PostgreSQL connector is running:
*pg_hba.conf*
[source]
----
local replication <youruser> trust //<1>
host replication <youruser> 127.0.0.1/32 trust //<2>
host replication <youruser> ::1/128 trust //<3>
----
<1> tells the server to allow replication for `<youruser>` locally (i.e. on the server machine)
<2> tells the server to allow `<youruser>` on `localhost` to receive replication changes using `IPV4`
<3> tells the server to allow `<youruser>` on `localhost` to receive replication changes using `IPV6`
[TIP]
====
See https://www.postgresql.org/docs/current/static/datatype-net-types.html[_the PostgreSQL documentation_] for more information on network masks.
====
[[supported-PostgreSQL-topologies]]
== Supported PostgreSQL Topologies
The PostgreSQL connector can be used with a standalone PostgreSQL server or with a cluster of PostgreSQL servers.
As mentioned link:#limitations[in the beginning], PostgreSQL (for all versions <= 12) only supports logical replication slots on `primary` servers. This means that a replica in a PostgreSQL cluster cannot be configured for logical replication, and consequently that the Debezium PostgreSQL Connector can only connect and communicate with the primary server. Should this server fail, the connector will stop. When the cluster is repaired, if the original primary server is once again promoted to `primary`, the connector can simply be restarted. However, if a different PostgreSQL server _with the plugin and proper configuration_ is promoted to `primary`, the connector configuration must be changed to point to the new `primary` server and then can be restarted.
[[wal-disk-space]]
== WAL Disk Space Consumption
In certain cases, it is possible that PostgreSQL disk space consumed by WAL files either experiences spikes or increases out of usual proportions.
There are three potential reasons that explain the situation:
* Debezium regularly confirms LSN of processed events to the database.
This is visible as `confirmed_flush_lsn` in the `pg_replication_slots` slots table.
The database is responsible for reclaiming the disk space and the WAL size can be calculated from `restart_lsn` of the same table.
So if the `confirmed_flush_lsn` is regularly increasing and `restart_lsn` lags then the database does need to reclaim the space.
Disk space is usually reclaimed in batch blocks so this is expected behaviour and no action on a user's side is necessary.
* There are many updates in a monitored database but only a minuscule amount relates to the monitored table(s) and/or schema(s).
This situation can be easily solved by enabling periodic heartbeat events using `heartbeat.interval.ms` configuration option.
* The PostgreSQL instance contains multiple databases where one of them is a high-traffic database.
Debezium monitors another database that is low-traffic in comparison to the other one.
Debezium then cannot confirm the LSN as replication slots work per-database and Debezium is not invoked.
As WAL is shared by all databases it tends to grow until an event is emitted by the database monitored by Debezium.
To overcome the third cause it is necessary to
* enable periodic heartbeat record generation using the `heartbeat.interval.ms` configuration option
* regularly emit change events from the database tracked by Debezium
** In the case of `wal2json` decoder plugin, it is sufficient to generate empty events.
This can be achieved for example by truncating an empty temporary table.
** For other decoder plug-ins, it is recommended to create a supplementary table that is not monitored by Debezium.
A separate process would then periodically update the table (either inserting a new event or updating the same row all over).
PostgreSQL then will invoke Debezium which will confirm the lastest LSN and allow the database to reclaim the WAL space.
[[how-the-postgresql-connector-works]]
== How the PostgreSQL connector works
[[snapshots]]
=== Snapshots
Most PostgreSQL servers are configured to not retain the complete history of the database in the WAL segments, so the PostgreSQL connector would be unable to see the entire history of the database by simply reading the WAL. So, by default the connector will upon first startup perform an initial _consistent snapshot_ of the database. Each snapshot consists of the following steps (when using the builtin snapshot modes, *custom* snapshot modes may override this):
1. Start a transaction with a https://www.postgresql.org/docs/current/static/sql-set-transaction.html[SERIALIZABLE, READ ONLY, DEFERRABLE] isolation level to ensure that all subsequent reads within this transaction are done against a single consistent version of the data. Any changes to the data due to subsequent `INSERT`, `UPDATE`, and `DELETE` operations by other clients will not be visible to this transaction.
2. Obtain a `ACCESS SHARE MODE` lock on each of the monitored tables to ensure that no structural changes can occur to any of the tables while the snapshot is taking place. Note that these locks do not prevent table `INSERTS`, `UPDATES` and `DELETES` from taking place during the operation. _This step is omitted when using the exported snapshot mode to allow for a lock-free snapshots_.
3. Read the current position in the server's transaction log.
4. Scan all of the database tables and schemas, and generate a `READ` event for each row and write that event to the appropriate table-specific Kafka topic.
5. Commit the transaction.
6. Record the successful completion of the snapshot in the connector offsets.
If the connector fails, is rebalanced, or stops after Step 1 begins but before Step 6 completes, upon restart the connector will begin a new snapshot. Once the connector does complete its initial snapshot, the PostgreSQL connector then continues streaming from the position read during step 3, ensuring that it does not miss any updates. If the connector stops again for any reason, upon restart it will simply continue streaming changes from where it previously left off.
A second snapshot mode allows the connector to perform snapshots *always*. This behavior tells the connector to _always_ perform a snapshot when it starts up, and after the snapshot completes to continue streaming changes from step 3 in the above sequence. This mode can be used in cases when it's known that some WAL segments have been deleted and are no longer available, or in case of a cluster failure after a new primary has been promoted so that the connector doesn't miss out on any potential changes that could've taken place after the new primary had been promoted but before the connector was restarted on the new primary.
The third snapshot mode instructs the connector to *never* performs snapshots. When a new connector is configured this way, if will either continue streaming changes from a previous stored offset or it will start from the point in time when the PostgreSQL logical replication slot was first created on the server. Note that this mode is useful only when you know all data of interest is still reflected in the WAL.
The fourth snapshot mode, *initial only*, will perform a database snapshot and then stop before streaming any other changes. If the connector had started but did not complete a snapshot before stopping, the connector will restart the snapshot process and stop once the snapshot completes.
The fifth snapshot mode, *exported*, will perform a database snapshot based on the point in time when the replication slot was created. This mode is an excellent way to perform a snapshot in a lock-free way.
The final snapshot mode, *custom*, allows the user to inject their own implementation of the `io.debezium.connector.postgresql.spi.Snapshotter` interface via the `snapshot.custom.class` configuration property, with the class on the classpath of your Kafka Connect cluster (or included in the JAR if using the `EmbeddedEngine`). For more details, see the link:#custom-snapshot[Custom Snapshot] section.
[[custom-snapshot]]
=== Custom Snapshotter SPI
For more advanced usages, the user can provide an implementation of the `io.debezium.connector.postgresql.spi.Snapshotter` interface. This interfaces allows control of most of the aspects of how snapshots operate, such as whether to take a snapshot or not and the way the options used to open the snapshot transaction or take locks.
The full API of the interface can be seen here:
[source,java,indent=0]
----
**
* This interface is used to determine details about the snapshot process:
*
* Namely:
* - Should a snapshot occur at all
* - Should streaming occur
* - What queries should be used to snapshot
*
* While many default snapshot modes are provided with debezium (see documentation for details)
* a custom implementation of this interface can be provided by the implementor which
* can provide more advanced functionality, such as partial snapshots
*
* Implementor's must return true for either {@link #shouldSnapshot()} or {@link #shouldStream()}
* or true for both.
*/
@Incubating
public interface Snapshotter {
void init(PostgresConnectorConfig config, OffsetState sourceInfo,
SlotState slotState);
/**
* @return true if the snapshotter should take a snapshot
*/
boolean shouldSnapshot();
/**
* @return true if the snapshotter should stream after taking a snapshot
*/
boolean shouldStream();
/**
* @return true if when creating a slot, a snapshot should be exported, which
* can be used as an alternative to taking a lock
*/
default boolean exportSnapshot() {
return false;
}
/**
* Generate a valid postgres query string for the specified table, or an empty {@link Optional}
* to skip snapshotting this table (but that table will still be streamed from)
*
* @param tableId the table to generate a query for
* @return a valid query string, or none to skip snapshotting this table
*/
Optional<String> buildSnapshotQuery(TableId tableId);
/**
* Return a new string that set up the transaction for snapshotting
*
* @param newSlotInfo if a new slow was created for snapshotting, this contains information from
* the `create_replication_slot` command
*/
default String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo) {
// we're using the same isolation level that pg_backup uses
return "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;";
}
/**
* Returns a SQL statement for locking the given tables during snapshotting, if required by the specific snapshotter
* implementation.
*/
default Optional<String> snapshotTableLockingStatement(Duration lockTimeout, Set<TableId> tableIds) {
String lineSeparator = System.lineSeparator();
StringBuilder statements = new StringBuilder();
statements.append("SET lock_timeout = ").append(lockTimeout.toMillis()).append(";").append(lineSeparator);
// we're locking in ACCESS SHARE MODE to avoid concurrent schema changes while we're taking the snapshot
// this does not prevent writes to the table, but prevents changes to the table's schema....
// DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted
tableIds.forEach(tableId -> statements.append("LOCK TABLE ")
.append(tableId.toDoubleQuotedString())
.append(" IN ACCESS SHARE MODE;")
.append(lineSeparator));
return Optional.of(statements.toString());
}
}
----
All of the builtin snapshot modes are implemented in terms of this interface as well.
[[streaming-changes]]
=== Streaming Changes
The PostgreSQL connector will typically spend the vast majority of its time streaming changes from the PostgreSQL server to which it is connected. This mechanism relies on https://www.postgresql.org/docs/current/static/protocol-replication.html[_PostgreSQL's replication protocol_] where the client can receive changes from the server as they are committed in the server's transaction log at certain positions (also known as `Log Sequence Numbers` or in short LSNs).
Whenever the server commits a transaction, a separate server process invokes a callback function from the link:#output-plugin[logical decoding plugin]. This function processes the changes from the transaction, converts them to a specific format (Protobuf or JSON in the case of Debezium plugin) and writes them on an output stream which can then be consumed by clients.
The PostgreSQL connector acts as a PostgreSQL client, and when it receives these changes it transforms the events into Debezium _create_, _update_, or _delete_ events that include the LSN position of the event. The PostgreSQL connector forwards these change events to the Kafka Connect framework (running in the same process), which then asynchronously writes them in the same order to the appropriate Kafka topic. Kafka Connect uses the term _offset_ for the source-specific position information that Debezium includes with each event, and Kafka Connect periodically records the most recent offset in another Kafka topic.
When Kafka Connect gracefully shuts down, it stops the connectors, flushes all events to Kafka, and records the last offset received from each connector. Upon restart, Kafka Connect reads the last recorded offset for each connector, and starts the connector from that point. The PostgreSQL connector uses the LSN recorded in each change event as the offset, so that upon restart the connector requests the PostgreSQL server send it the events starting just after that position.
[NOTE]
====
The PostgreSQL connector retrieves the schema information as part of the events sent by the logical decoder plug-in.
The only exception is the information about which columns compose the primary key, as this information is obtained from the JDBC metadata (side channel).
If the primary key definition of a table changes (by adding, removing or renaming PK columns),
then there exists a slight risk of an unfortunate timing when the primary key information from JDBC
will not be synchronized with the change data in the logical decoding event and a small amount of messages will be created with an inconsistent key structure.
If this happens then a restart of the connector and a reprocessing of the messages will fix the issue.
To prevent the issue completely it is recommended to synchronize updates to the primary key structure with Debezium roughly using following sequence of operations:
* Put the database or an application into a read-only mode
* Let Debezium process all remaining events
* Stop Debezium
* Update the primary key definition
* Put the database or the application into read/write state and start Debezium again
====
[[pgoutput]]
=== PostgreSQL 10+ Logical Decoding Support (pgoutput)
As of PostgreSQL 10+, a new logical replication stream mode was introduced, called _pgoutput_. This logical replication stream mode is natively supported by PostgreSQL,
which means that this connector can consume that replication stream
without the need for additional plug-ins being installed.
This is particularly valuable for environments where installation of plug-ins isn't supported or allowed.
See link:#setting-up-PostgreSQL[Setting up PostgreSQL] for more details.
[[topic-names]]
=== Topics Names
The PostgreSQL connector writes events for all insert, update, and delete operations on a single table to a single Kafka topic. By default, the Kafka topic name is _serverName_._schemaName_._tableName_ where _serverName_ is the logical name of the connector as specified with the `database.server.name` configuration property, _schemaName_ is the name of the database schema where the operation occurred, and _tableName_ is the name of the database table on which the operation occurred.
For example, consider a PostgreSQL installation with a `postgres` database and an `inventory` schema that contains four tables: `products`, `products_on_hand`, `customers`, and `orders`. If the connector monitoring this database were given a logical server name of `fulfillment`, then the connector would produce events on these four Kafka topics:
* `fulfillment.inventory.products`
* `fulfillment.inventory.products_on_hand`
* `fulfillment.inventory.customers`
* `fulfillment.inventory.orders`
If on the other hand the tables were not part of a specific schema but rather created in the default `public` PostgreSQL schema, then the name of the Kafka topics would be:
* `fulfillment.public.products`
* `fulfillment.public.products_on_hand`
* `fulfillment.public.customers`
* `fulfillment.public.orders`
[[meta-info]]
=== Meta Information
Each `record` produced by the PostgreSQL connector has, in addition to the link:#events[_database event_], some meta-information about where the event occurred on the server, the name of the source partition and the name of the Kafka topic and partition where the event should be placed:
[source,json,indent=0]
----
"sourcePartition": {
"server": "fulfillment"
},
"sourceOffset": {
"lsn": "24023128",
"txId": "555",
"ts_usec": "1482918357011699"
},
"kafkaPartition": null
----
The PostgreSQL connector uses only 1 Kafka Connect _partition_ and it places the generated events into 1 Kafka partition. Therefore, the name of the `sourcePartition` will always default to the name of the `database.server.name` configuration property, while the `kafkaPartition` has the value `null` which means that the connector does not use a specific Kafka partition.
The `sourceOffset` portion of the message contains information about the location of the server where the event occurred:
* `lsn` represents the PostgreSQL https://www.postgresql.org/docs/current/static/datatype-pg-lsn.html[_log sequence number_] or `offset` in the transaction log
* `txId` represents the identifier of the server transaction which caused the event
* `ts_usec` represents the number of microseconds since Unix Epoch as the server time at which the transaction was committed
[[events]]
=== Events
All data change events produced by the PostgreSQL connector have a key and a value, although the structure of the key and value depend on the table from which the change events originated (see link:#topic-names[Topic names]).
[NOTE]
====
Starting with Kafka 0.10, Kafka can optionally record with the message key and value the http://kafka.apache.org/documentation.html#upgrade_10_performance_impact[_timestamp_] at which the message was created (recorded by the producer) or written to the log by Kafka.
====
[WARNING]
====
The Debezium PostgreSQL connector ensures that all Kafka Connect _schema names_ are http://avro.apache.org/docs/current/spec.html#names[valid Avro schema names]. This means that the logical server name must start with Latin letters or an underscore (e.g., [a-z,A-Z,\_]), and the remaining characters in the logical server name and all characters in the schema and table names must be Latin letters, digits, or an underscore (e.g., [a-z,A-Z,0-9,\_]). If not, then all invalid characters will automatically be replaced with an underscore character.
This can lead to unexpected conflicts when the logical server name, schema names, and table names contain other characters, and the only distinguishing characters between table full names are invalid and thus replaced with underscores.
====
Debezium and Kafka Connect are designed around _continuous streams of event messages_, and the structure of these events may change over time. This could be difficult for consumers to deal with, so to make it easy Kafka Connect makes each event self-contained. Every message key and value has two parts: a _schema_ and _payload_. The schema describes the structure of the payload, while the payload contains the actual data.
[[change-events-key]]
==== Change Event's Key
For a given table, the change event's key will have a structure that contains a field for each column in the primary key (or unique key constraint with `REPLICA IDENTITY` set to `FULL` or `USING INDEX` on the table) of the table at the time the event was created.
Consider a `customers` table defined in the `public` database schema:
[source,sql,indent=0]
----
CREATE TABLE customers (
id SERIAL,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
PRIMARY KEY(id)
);
----
If the `database.server.name` configuration property has the value `PostgreSQL_server`, every change event for the `customers` table while it has this definition will feature the same key structure, which in JSON looks like this:
[source,json,indent=0]
----
{
"schema": {
"type": "struct",
"name": "PostgreSQL_server.public.customers.Key",
"optional": false,
"fields": [
{
"name": "id",
"index": "0",
"schema": {
"type": "INT32",
"optional": "false"
}
}
]
},
"payload": {
"id": "1"
},
}
----
The `schema` portion of the key contains a Kafka Connect schema describing what is in the key portion, and in our case that means that the `payload` value is not optional, is a structure defined by a schema named `PostgreSQL_server.public.customers.Key`, and has one required field named `id` of type `int32`. If we look at the value of the key's `payload` field, we'll see that it is indeed a structure (which in JSON is just an object) with a single `id` field, whose value is `1`.
Therefore, we interpret this key as describing the row in the `public.customers` table (output from the connector named `PostgreSQL_server`) whose `id` primary key column had a value of `1`.
[NOTE]
====
Although the `column.blacklist` configuration property allows you to remove columns from the event values, all columns in a primary or unique key are always included in the event's key.
====
[WARNING]
====
If the table does not have a primary or unique key, then the change event's key will be null. This makes sense since the rows in a table without a primary or unique key constraint cannot be uniquely identified.
====
[[change-events-value]]
==== Change Event's Value
The value of the change event message is a bit more complicated. Like the message key, it has a _schema_ section and _payload_ section. The payload section of every change event value produced by the PostgreSQL connector has an _envelope_ structure with the following fields:
* `op` is a mandatory field that contains a string value describing the type of operation. Values for the PostgreSQL connector are `c` for create (or insert), `u` for update, `d` for delete, and `r` for read (in the case of a snapshot).
* `before` is an optional field that if present contains the state of the row _before_ the event occurred. The structure will be described by the `PostgreSQL_server.public.customers.Value` Kafka Connect schema, which the `PostgreSQL_server` connector uses for all rows in the `public.customers` table.
[WARNING]
====
Whether or not this field is available is highly dependent on the link:#replica-identity[_REPLICA IDENTITY_] setting for each table
====
* `after` is an optional field that if present contains the state of the row _after_ the event occurred. The structure is described by the same `PostgreSQL_server.public.customers.Value` Kafka Connect schema used in `before`.
* `source` is a mandatory field that contains a structure describing the source metadata for the event, which in the case of PostgreSQL contains several fields: the Debezium version, the connector name, the name of the affected database, schema and table, whether the event is part of an ongoing snapshot or not and the same fields from the record's link:#meta-info[_meta information_] section
* `ts_ms` is optional and if present contains the time (using the system clock in the JVM running the Kafka Connect task) at which the connector processed the event.
And of course, the _schema_ portion of the event message's value contains a schema that describes this envelope structure and the nested fields within it.
[[replica-identity]]
===== Replica Identity
https://www.postgresql.org/docs/current/static/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY[REPLICA IDENTITY] is a PostgreSQL specific table-level setting which determines the amount of information that is available to `logical decoding` in case of `UPDATE` and `DELETE` events. More specifically, this controls what (if any) information is available regarding the previous values of the table columns involved, whenever one of the aforementioned events occur.
There are 4 possible values for `REPLICA IDENTITY`:
* DEFAULT - `UPDATE` and `DELETE` events will only contain the previous values for the primary key columns of a table
* NOTHING - `UPDATE` and `DELETE` events will not contain any information about the previous value on any of the table columns
* FULL - `UPDATE` and `DELETE` events will contain the previous values of all the table's columns
* INDEX `index name` - `UPDATE` and `DELETE` events will contain the previous values of the columns contained in the index definition named `index name`
[[create-events]]
===== Create Events
Let's look at what a _create_ event value might look like for our `customers` table:
[source,json,indent=0,subs="attributes"]
----
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "PostgreSQL_server.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "PostgreSQL_server.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "PostgreSQL_server.inventory.customers.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "{debezium-version}",
"connector": "postgresql",
"name": "PostgreSQL_server",
"ts_ms": 1559033904863,
"snapshot": true,
"db": "postgres",
"schema": "public",
"table": "customers",
"txId": 555,
"lsn": 24023128,
"xmin": null
},
"op": "c",
"ts_ms": 1559033904863
}
}
----
If we look at the `schema` portion of this event's _value_, we can see the schema for the _envelope_, the schema for the `source` structure (which is specific to the PostgreSQL connector and reused across all events), and the table-specific schemas for the `before` and `after` fields.
[TIP]
====
The names of the schemas for the `before` and `after` fields are of the form _logicalName_._schemaName_._tableName_.Value, and thus are entirely independent from all other schemas for all other tables. This means that when using the link:/docs/faq/#avro-converter[Avro Converter], the resulting Avro schemas for _each table_ in each _logical source_ have their own evolution and history.
====
If we look at the `payload` portion of this event's _value_, we can see the information in the event, namely that it is describing that the row was created (since `op=c`), and that the `after` field value contains the values of the new inserted row's' `id`, `first_name`, `last_name`, and `email` columns.
[TIP]
====
It may appear that the JSON representations of the events are much larger than the rows they describe. This is true, because the JSON representation must include the _schema_ and the _payload_ portions of the message. It is possible and even recommended to use the link:/docs/faq/#avro-converter[Avro Converter] to dramatically decrease the size of the actual messages written to the Kafka topics.
====
[[update-events]]
===== Update Events
The value of an _update_ change event on this table will actually have the exact same _schema_, and its payload will be structured the same but will hold different values. Here's an example:
[source,json,indent=0,subs="attributes"]
----
{
"schema": { ... },
"payload": {
"before": {
"id": 1
},
"after": {
"id": 1,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "{debezium-version}",
"connector": "postgresql",
"name": "PostgreSQL_server",
"ts_ms": 1559033904863,
"snapshot": null,
"db": "postgres",
"schema": "public",
"table": "customers",
"txId": 556,
"lsn": 24023128,
"xmin": null
},
"op": "u",
"ts_ms": 1465584025523
}
}
----
When we compare this to the value in the _insert_ event, we see a couple of differences in the `payload` section:
* The `op` field value is now `u`, signifying that this row changed because of an update
* The `before` field now has the state of the row with the values before the database commit, but only for the primary key column `id`. This is because the link:#replica-identity[_REPLICA IDENTITY_] which is by default `DEFAULT`.
[TIP]
====
Should we want to see the previous values of all the columns for the row, we would have to change the `customers` table first by running
`ALTER TABLE customers REPLICA IDENTITY FULL`
====
* The `after` field now has the updated state of the row, and here was can see that the `first_name` value is now `Anne Marie`.
* The `source` field structure has the same fields as before, but the values are different since this event is from a different position in the WAL.
* The `ts_ms` shows the timestamp that Debezium processed this event.
There are several things we can learn by just looking at this `payload` section. We can compare the `before` and `after` structures to determine what actually changed in this row because of the commit. The `source` structure tells us information about PostgreSQL's record of this change (providing traceability), but more importantly this has information we can compare to other events in this and other topics to know whether this event occurred before, after, or as part of the same PostgreSQL commit as other events.
[NOTE]
====
When the columns for a row's primary/unique key are updated, the value of the row's key has changed so Debezium will output _three_ events: a `DELETE` event and link:#tombstone-events[tombstone event] with the old key for the row, followed by an `INSERT` event with the new key for the row.
====
[[delete-events]]
===== Delete Events
So far we've seen samples of _create_ and _update_ events. Now, let's look at the value of a _delete_ event for the same table. Once again, the `schema` portion of the value will be exactly the same as with the _create_ and _update_ events:
[source,json,indent=0,subs="attributes"]
----
{
"schema": { ... },
"payload": {
"before": {
"id": 1
},
"after": null,
"source": {
"version": "{debezium-version}",
"connector": "postgresql",
"name": "PostgreSQL_server",
"ts_ms": 1559033904863,
"snapshot": null,
"db": "postgres",
"schema": "public",
"table": "customers",
"txId": 556,
"lsn": 46523128,
"xmin": null
},
"op": "d",
"ts_ms": 1465581902461
}
}
----
If we look at the `payload` portion, we see a number of differences compared with the _create_ or _update_ event payloads:
* The `op` field value is now `d`, signifying that this row was deleted
* The `before` field now has the state of the row that was deleted with the database commit. Again this only contains the primary key column due to the link:#replica-identity[_REPLICA IDENTITY_] setting
* The `after` field is null, signifying that the row no longer exists
* The `source` field structure has many of the same values as before, except the `ts_usec`, `lsn` and `txId` fields have changed
* The `ts_ms` shows the timestamp that Debezium processed this event.
This event gives a consumer all kinds of information that it can use to process the removal of this row.
[WARNING]
====
Please pay attention to the tables without PK, any delete messages from such table with REPLICA IDENTITY DEFAULT will have no `before` part (because they have no PK which is the only field for the default identity level) and therefore will be skipped as totally empty.
To be able to process messages from tables without PK set REPLICA IDENTITY to FULL level.
====
The PostgreSQL connector's events are designed to work with https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction[Kafka log compaction], which allows for the removal of some older messages as long as at least the most recent message for every key is kept. This allows Kafka to reclaim storage space while ensuring the topic contains a complete dataset and can be used for reloading key-based state.
[[tombstone-events]]
When a row is deleted, the _delete_ event value listed above still works with log compaction, since Kafka can still remove all earlier messages with that same key. But only if the message value is `null` will Kafka know that it can remove _all messages_ with that same key. To make this possible, Debezium's PostgreSQL connector always follows the _delete_ event with a special _tombstone_ event that has the same key but `null` value.
[[data-types]]
=== Data Types
As described above, the PostgreSQL connector represents the changes to rows with events that are structured like the table in which the row exist. The event contains a field for each column value, and how that value is represented in the event depends on the PostgreSQL data type of the column. This section describes this mapping.
The following table describes how the connector maps each of the PostgreSQL data types to a _literal type_ and _semantic type_ within the events' fields.
Here, the _literal type_ describes how the value is literally represented using Kafka Connect schema types, namely `INT8`, `INT16`, `INT32`, `INT64`, `FLOAT32`, `FLOAT64`, `BOOLEAN`, `STRING`, `BYTES`, `ARRAY`, `MAP`, and `STRUCT`.
The _semantic type_ describes how the Kafka Connect schema captures the _meaning_ of the field using the name of the Kafka Connect schema for the field.
[cols="20%a,15%a,30%a,35%a",width=150,options="header,footer",role="table table-bordered table-striped"]
|=======================
|PostgreSQL Data Type
|Literal type (schema type)
|Semantic type (schema name)
|Notes
|`BOOLEAN`
|`BOOLEAN`
|n/a
|
|`BIT(1)`
|`BOOLEAN`
|n/a
|
|`BIT( > 1)`, `BIT VARYING[(M)]`
|`BYTES`
|`io.debezium.data.Bits`
|The `length` schema parameter contains an integer representing the number of bits. The resulting `byte[]` will contain the bits in little-endian form and will be sized to contain at least the specified number of bits (e.g., `numBytes = n/8 + (n%8== 0 ? 0 : 1)` where `n` is the number of bits).
|`SMALLINT`, `SMALLSERIAL`
|`INT16`
|n/a
|
|`INTEGER`, `SERIAL`
|`INT32`
|n/a
|
|`BIGINT`, `BIGSERIAL`
|`INT64`
|n/a
|
|`REAL`
|`FLOAT32`
|n/a
|
|`DOUBLE PRECISION`
|`FLOAT64`
|n/a
|
|`CHAR[(M)]`
|`STRING`
|n/a
|
|`VARCHAR[(M)]`
|`STRING`
|n/a
|
|`CHARACTER[(M)]`
|`STRING`
|n/a
|
|`CHARACTER VARYING[(M)]`
|`STRING`
|n/a
|
|`TIMESTAMPTZ`, `TIMESTAMP WITH TIME ZONE`
|`STRING`
|`io.debezium.time.ZonedTimestamp`
| A string representation of a timestamp with timezone information, where the timezone is GMT
|`TIMETZ`, `TIME WITH TIME ZONE`
|`STRING`
|`io.debezium.time.ZonedTime`
| A string representation of a time value with timezone information, where the timezone is GMT
|`INTERVAL [P]`
|`INT64`
|`io.debezium.time.MicroDuration` +
(default)
|The approximate number of microseconds for a time interval using the `365.25 / 12.0` formula for days per month average
|`INTERVAL [P]`
|`String`
|`io.debezium.time.Interval` +
(when `interval.handling.mode` is set to `string`)
|The string representation of the interval value that follows pattern `P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S`, e.g. `P1Y2M3DT4H5M6.78S`
|`BYTEA`
|`BYTES`
|n/a
|
|`JSON`, `JSONB`
|`STRING`
|`io.debezium.data.Json`
|Contains the string representation of a JSON document, array, or scalar.
|`XML`
|`STRING`
|`io.debezium.data.Xml`
|Contains the string representation of an XML document
|`UUID`
|`STRING`
|`io.debezium.data.Uuid`
|Contains the string representation of a PostgreSQL UUID value
|`POINT`
|`STRUCT`
|`io.debezium.data.geometry.Point`
|Contains a structure with 2 `FLOAT64` fields - `(x,y)` - each representing the coordinates of a geometric point
|`LTREE`
|`STRING`
|`io.debezium.data.Ltree`
|Contains the string representation of a PostgreSQL LTREE value
|`CITEXT`
|`STRING`
|n/a
|
|`INET`
|`STRING`
|n/a
|
|`INT4RANGE`
|`STRING`
|n/a
|Range of integer
|`INT8RANGE`
|`STRING`
|n/a
|Range of bigint
|`NUMRANGE`
|`STRING`
|n/a
|Range of numeric
|`TSRANGE`
|`STRING`
|n/a
|Contains the string representation of timestamp range without time zone.
|`TSTZRANGE`
|`STRING`
|n/a
|Contains the string representation of a timestamp range with (local system) time zone.
|`DATERANGE`
|`STRING`
|n/a
|Contains the string representation of a date range. It always has an exclusive upper-bound.
|=======================
Other data type mappings are described in the following sections.
[[temporal-values]]
==== Temporal Values
Other than PostgreSQL's `TIMESTAMPTZ` and `TIMETZ` data types (which contain time zone information), the other temporal types depend on the value of the `time.precision.mode` configuration property. When the `time.precision.mode` configuration property is set to `adaptive` (the default), then the connector will determine the literal type and semantic type for the temporal types based on the column's data type definition so that events _exactly_ represent the values in the database:
[cols="20%a,15%a,30%a,35%a",width=150,options="header,footer",role="table table-bordered table-striped"]
|=======================
|PostgreSQL Data Type
|Literal type (schema type)
|Semantic type (schema name)
|Notes
|`DATE`
|`INT32`
|`io.debezium.time.Date`
| Represents the number of days since epoch.
|`TIME(1)`, `TIME(2)`, `TIME(3)`
|`INT32`
|`io.debezium.time.Time`
| Represents the number of milliseconds past midnight, and does not include timezone information.
|`TIME(4)`, `TIME(5)`, `TIME(6)`
|`INT64`
|`io.debezium.time.MicroTime`
| Represents the number of microseconds past midnight, and does not include timezone information.
|`TIMESTAMP(1)`, `TIMESTAMP(2)`, `TIMESTAMP(3)`
|`INT64`
|`io.debezium.time.Timestamp`
| Represents the number of milliseconds past epoch, and does not include timezone information.
|`TIMESTAMP(4)`, `TIMESTAMP(5)`, `TIMESTAMP(6)`, `TIMESTAMP`
|`INT64`
|`io.debezium.time.MicroTimestamp`
| Represents the number of microseconds past epoch, and does not include timezone information.
|=======================
When the `time.precision.mode` configuration property is set to `adaptive_time_microseconds`, then the connector will determine the literal type and semantic type for the temporal types based on the column's data type definition so that events _exactly_ represent the values in the database, except that all TIME fields will be captured as microseconds:
[cols="20%a,15%a,30%a,35%a",width=150,options="header,footer",role="table table-bordered table-striped"]
|=======================
|PostgreSQL Data Type
|Literal type (schema type)
|Semantic type (schema name)
|Notes
|`DATE`
|`INT32`
|`io.debezium.time.Date`
| Represents the number of days since epoch.
|`TIME([P])`
|`INT64`
|`io.debezium.time.MicroTime`
| Represents the time value in microseconds and does not include timezone information. PostgreSQL allows precision `P` to be in the range 0-6 to store up to microsecond precision.
|`TIMESTAMP(1)` , `TIMESTAMP(2)`, `TIMESTAMP(3)`
|`INT64`
|`io.debezium.time.Timestamp`
| Represents the number of milliseconds past epoch, and does not include timezone information.
|`TIMESTAMP(4)` , `TIMESTAMP(5)`, `TIMESTAMP(6)`, `TIMESTAMP`
|`INT64`
|`io.debezium.time.MicroTimestamp`
| Represents the number of microseconds past epoch, and does not include timezone information.
|=======================
When the `time.precision.mode` configuration property is set to `connect`, then the connector will use the predefined Kafka Connect logical types. This may be useful when consumers only know about the built-in Kafka Connect logical types and are unable to handle variable-precision time values. On the other hand, since PostgreSQL supports microsecond precision, the events generated by a connector with the `connect` time precision mode will *result in a loss of precision* when the database column has a _fractional second precision_ value greater than 3:
[cols="20%a,15%a,30%a,35%a",width=150,options="header,footer",role="table table-bordered table-striped"]
|=======================
|PostgreSQL Data Type
|Literal type (schema type)
|Semantic type (schema name)
|Notes
|`DATE`
|`INT32`
|`org.apache.kafka.connect.data.Date`
| Represents the number of days since epoch.
|`TIME([P])`
|`INT64`
|`org.apache.kafka.connect.data.Time`
| Represents the number of milliseconds since midnight, and does not include timezone information. PostgreSQL allows `P` to be in the range 0-6 to store up to microsecond precision, though this mode results in a loss of precision when `P` > 3.
|`TIMESTAMP([P])`
|`INT64`
|`org.apache.kafka.connect.data.Timestamp`
| Represents the number of milliseconds since epoch, and does not include timezone information. PostgreSQL allows `P` to be in the range 0-6 to store up to microsecond precision, though this mode results in a loss of precision when `P` > 3.
|=======================
[[timestamp-values]]
===== `TIMESTAMP` values
The `TIMESTAMP` type represents a timestamp without time zone information.
Such columns are converted into an equivalent Kafka Connect value based on UTC.
So for instance the `TIMESTAMP` value "2018-06-20 15:13:16.945104" will be represented by a `io.debezium.time.MicroTimestamp` with the value "1529507596945104"
(assuming `time.precision.mode` is not set to `connect`).
Note that the timezone of the JVM running Kafka Connect and Debezium does not affect this conversion.
[[decimal-values]]
==== Decimal Values
When `decimal.handling.mode` configuration property is set to `precise`, then the connector will use the predefined Kafka Connect `org.apache.kafka.connect.data.Decimal` logical type for all `DECIMAL` and `NUMERIC` columns. This is the default mode.
[cols="15%a,15%a,35%a,35%a",width=100,options="header,footer",role="table table-bordered table-striped"]
|=======================
|PostgreSQL Data Type
|Literal type (schema type)
|Semantic type (schema name)
|Notes
|`NUMERIC[(M[,D])]`
|`BYTES`
|`org.apache.kafka.connect.data.Decimal`
|The `scaled` schema parameter contains an integer representing how many digits the decimal point was shifted.
|`DECIMAL[(M[,D])]`
|`BYTES`
|`org.apache.kafka.connect.data.Decimal`
|The `scaled` schema parameter contains an integer representing how many digits the decimal point was shifted.
|=======================
There is an exception to this rule.
When the `NUMERIC` or `DECIMAL` types are used without any scale constraints then it means that the values coming from the database have a different (variable) scale for each value.
In this case a type `io.debezium.data.VariableScaleDecimal` is used and it contains both value and scale of the transferred value.
[cols="15%a,15%a,35%a,35%a",width=100,options="header,footer",role="table table-bordered table-striped"]
|=======================
|PostgreSQL Data Type
|Literal type (schema type)
|Semantic type (schema name)
|Notes
|`NUMERIC`
|`STRUCT`
|`io.debezium.data.VariableScaleDecimal`
|Contains a structure with two fields: `scale` of type `INT32` that contains the scale of the transferred value and `value` of type `BYTES` containing the original value in an unscaled form.
|`DECIMAL`
|`STRUCT`
|`io.debezium.data.VariableScaleDecimal`
|Contains a structure with two fields: `scale` of type `INT32` that contains the scale of the transferred value and `value` of type `BYTES` containing the original value in an unscaled form.
|=======================
However, when `decimal.handling.mode` configuration property is set to `double`, then the connector will represent all `DECIMAL` and `NUMERIC` values as Java double values and encodes them as follows:
[cols="15%a,15%a,35%a,35%a",width=100,options="header,footer",role="table table-bordered table-striped"]
|=======================
|PostgreSQL Data Type
|Literal type (schema type)
|Semantic type (schema name)
|Notes
|`NUMERIC[(M[,D])]`
|`FLOAT64`
|
|
|`DECIMAL[(M[,D])]`
|`FLOAT64`
|
|
|=======================
The last option for `decimal.handling.mode` configuration property is `string`. In this case the connector will represent all `DECIMAL` and `NUMERIC` values as their formatted string representation and encodes them as follows:
[cols="15%a,15%a,35%a,35%a",width=100,options="header,footer",role="table table-bordered table-striped"]
|=======================
|PostgreSQL Data Type
|Literal type (schema type)
|Semantic type (schema name)
|Notes
|`NUMERIC[(M[,D])]`
|`STRING`
|
|
|`DECIMAL[(M[,D])]`
|`STRING`
|
|
|=======================
PostgreSQL supports `NaN` (not a number) special value to be stored in the `DECIMAL`/`NUMERIC` values. Only `string` and `double` modes are able to handle such values encoding them as either `Double.NaN` or string constant `NAN`.
[[hstore-values]]
==== HStore Values
When `hstore.handling.mode` configuration property is set to `map`, then the connector will use the `java.util.Map<String,String>` logical type, `MAP` schema type for all `HSTORE` columns. This is the default mode.
[cols="15%a,15%a,35%a,35%a",width=100,options="header,footer",role="table table-bordered table-striped"]
|=======================
|PostgreSQL Data Type
|Literal type (schema type)
|Semantic type (schema name)
|Notes
|`HSTORE`
|`MAP`
|
|
|=======================
However, when `hstore.handling.mode` configuration property is set to `json`, then the connector will represent all `HSTORE` values as JSON String values and encodes them as follows:
[cols="15%a,15%a,35%a,35%a",width=100,options="header,footer",role="table table-bordered table-striped"]
|=======================
|PostgreSQL Data Type
|Literal type (schema type)
|Semantic type (schema name)
|Notes
|`HSTORE`
|`STRING`
|`io.debezium.data.Json`
| Example: Output representation is "{"key":"val"}"
|=======================
[[postgis-types]]
[[network-address-types]]
==== Network Address Types
PostgreSQL also have data types that can store IPv4, IPv6, and MAC addresses. It is better to use these instead of plain text types to store network addresses, because these types offer input error checking and specialized operators and functions.
[cols="15%a,15%a,35%a,35%a",width=100,options="header,footer",role="table table-bordered table-striped"]
|=======================
|PostgreSQL Data Type
|Literal type (schema type)
|Semantic type (schema name)
|Notes
|`INET`
|`STRING`
|
|IPv4 and IPv6 networks
|`CIDR`
|`STRING`
|
|IPv4 and IPv6 hosts and networks
|`MACADDR`
|`STRING`
|
|MAC addresses
|`MACADDR8`
|`STRING`
|
|MAC addresses in EUI-64 format
|=======================
==== PostGIS Types
The PostgreSQL connector also has full support for all of the http://postgis.net[PostGIS data types]
[cols="20%a,15%a,30%a,35%a",width=150,options="header,footer",role="table table-bordered table-striped"]
|=======================
|PostGIS Data Type
|Literal type (schema type)
|Semantic type (schema name)
|Notes
|`GEOMETRY` +
(planar)
|`STRUCT`
|`io.debezium.data.geometry.Geometry`
|Contains a structure with 2 fields +
* `srid (INT32)` - Spatial Reference System Identifier defining what type of geometry object is stored in the structure
* `wkb (BYTES)` - a binary representation of the geometry object encoded in the Well-Known-Binary format.
Please see http://www.opengeospatial.org/standards/sfa[Open Geospatial Consortium Simple Features Access specification] for the format details.
|`GEOGRAPHY` +
(spherical)
|`STRUCT`
|`io.debezium.data.geometry.Geography`
|Contains a structure with 2 fields +
* `srid (INT32)` - Spatial Reference System Identifier defining what type of geography object is stored in the structure
* `wkb (BYTES)` - a binary representation of the geometry object encoded in the Well-Known-Binary format.
Please see http://www.opengeospatial.org/standards/sfa[Open Geospatial Consortium Simple Features Access specification] for the format details.
|=======================
[[toasted-values]]
==== Toasted values
PostgreSQL has a hard limit on the page size.
This means that values larger than ca. 8 KB need to be stored using https://www.postgresql.org/docs/current/storage-toast.html[TOAST storage].
This impacts replication messages coming from database, as the values that were stored using the TOAST mechanism and have not been changed are not included in the message, unless they are part of the table's replica identity.
There is no safe way for Debezium to read the missing value out-of-bands directly from database, as this would lead into race conditions potentially.
Debezium thus follows these rules to handle the toasted values:
* tables with `REPLICA IDENTITY FULL`: TOAST column values are part of the `before` and `after` blocks of change events as any other column
* tables with `REPLICA IDENTITY DEFAULT`: when receiving an `UPDATE` event from the database,
any unchanged TOAST column value which is not part of the replica identity will not be part of that event;
similarly, when receiving a `DELETE` event, any such TOAST column will not be part of the `before` block.
As Debezium cannot safely provide the column value in this case, it returns a placeholder value defined in configuration option `toasted.value.placeholder`.
[IMPORTANT]
====
There is a specific problem related to Amazon RDS instances.
`wal2json` plugin has evolved over the time and there were releases that provided out-of-band toasted values.
Amazon supports different versions of the plugin for different PostgreSQL versions.
Please consult Amazon's https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/CHAP_PostgreSQL.html[documentation] to obtain version to version mapping.
For consistent toasted values handling we recommend to
* use `pgoutput` plugin for PostgreSQL 10+ instances
* set `include-unchanged-toast=0` for older versions of `wal2json` plugin using `slot.stream.params` configuration option
====
[[fault-tolerance]]
[[when-things-go-wrong]]
=== When Things Go Wrong
Debezium is a distributed system that captures all changes in multiple upstream databases, and will never miss or lose an event. Of course, when the system is operating nominally or being administered carefully, then Debezium provides _exactly once_ delivery of every change event. However, if a fault does happen then the system will still not lose any events, although while it is recovering from the fault it may repeat some change events. Thus, in these abnormal situations Debezium (like Kafka) provides _at least once_ delivery of change events.
The rest of this section describes how Debezium handles various kinds of faults and problems.
==== Configuration and Startup Errors
The connector will fail upon startup, report an error/exception in the log, and stop running when the connector's configuration is invalid, when the connector cannot successfully connect to PostgreSQL using the specified connectivity parameters, or when the connector is restarting from a previously-recorded position in the PostgreSQL WAL (via the LSN value) and PostgreSQL no longer has that history available.
In these cases, the error will have more details about the problem and possibly a suggested work around. The connector can be restarted when the configuration has been corrected or the PostgreSQL problem has been addressed.
==== PostgreSQL Becomes Unavailable
Once the connector is running, if the PostgreSQL server it has been connected to becomes unavailable for any reason, the connector will fail with an error and the connector will stop. Simply restart the connector when the server is available.
The PostgreSQL connector stores externally the last processed offset (in the form of a PostgreSQL `log sequence number` value). Once a connector is restarted and connects to a server instance, it will ask the server to continue streaming from that particular offset. This offset will always remain available so long as the Debezium replication slot remains intact. Never drop a replication slot on the primary or you will lose data. See the next section for failure cases when a slot has been removed.
==== Cluster Failures
As of `12`, PostgreSQL allows logical replication slots _only on primary servers_, which means that a PostgreSQL connector can only be pointed to the active `primary` of a database cluster. If this machine goes down, only after a new `primary` has been promoted (with the link:#output-plugin[logical decoding plugin] installed) can the connector be restarted and pointed to the new server.
There are some really important caveats to failovers, and you should pause Debezium until you can verify that you have a replication slot intact which has not lost data. After a failover, you will lose data unless your administration of failovers includes a process to recreate the Debezium replication slot before the application is allowed to write to the *new* primary. You also may need to verify in a failover situation that Debezium was able to read all changes in the slot **before the old primary failed**.
One reliable method of recovering and verifying any lost changes (yet administratively difficult) is to recover a backup of your failed primary to the point immediately before it failed, which would allow you to inspect the replication slot for any unconsumed changes. In any case, it is crucial that you recreate the replication slot on the new primary prior to allowing writes to it.
[NOTE]
====
There are discussions in the PostgreSQL community around a feature called `failover slots` which would help mitigate this problem, but as of `12` they have not been implemented yet. However, there is active development for Postgres 13 to support logical decoding on standbys, which is a major requirement to make failover possible. You can find more about this on the community thread:
https://www.postgresql.org/message-id/CAJ3gD9fE=0w50sRagcs+jrktBXuJAWGZQdSTMa57CCY+Dh-xbg@mail.gmail.com
You can find out more about the concept of failover slots here http://blog.2ndquadrant.com/failover-slots-postgresql[this blog post].
====
==== Kafka Connect Process Stops Gracefully
If Kafka Connect is being run in distributed mode, and a Kafka Connect process is stopped gracefully, then prior to shutdown of that processes Kafka Connect will migrate all of the process' connector tasks to another Kafka Connect process in that group, and the new connector tasks will pick up exactly where the prior tasks left off. There will be a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.
==== Kafka Connect Process Crashes
If the Kafka Connector process stops unexpectedly, then any connector tasks it was running will obviously terminate without recording their most recently-processed offsets. When Kafka Connect is being run in distributed mode, it will restart those connector tasks on other processes. However, the PostgreSQL connectors will resume from the last offset _recorded_ by the earlier processes, which means that the new replacement tasks may generate some of the same change events that were processed just prior to the crash. The number of duplicate events will depend on the offset flush period and the volume of data changes just before the crash.
[TIP]
====
Because there is a chance that some events may be duplicated during a recovery from failure, consumers should always anticipate some events may be duplicated. Debezium change are idempotent, so a sequence of events always results in the same state.
Debezium also includes with each change event message the source-specific information about the origin of the event, including the PostgreSQL server's time of the event, the id of the server transaction and the position in the write-ahead log where the transaction changes were written. Consumers can keep track of this information (especially the LSN position) to know whether they have already seen a particular event.
====
==== Kafka Becomes Unavailable
As the connector generates change events, the Kafka Connect framework records those events in Kafka using the Kafka producer API. Kafka Connect will also periodically record the latest offset that appears in those change events, at a frequency you've specified in the Kafka Connect worker configuration. If the Kafka brokers become unavailable, the Kafka Connect worker process running the connectors will simply repeatedly attempt to reconnect to the Kafka brokers. In other words, the connector tasks will simply pause until a connection can be re-established, at which point the connectors will resume exactly where they left off.
==== Connector Is Stopped for a Duration
If the connector is gracefully stopped, the database can continue to be used and any new changes will be recorded in the PostgreSQL WAL. When the connector is restarted, it will resume streaming changes where it last left off, recording change events for all of the changes that were made while the connector was stopped.
A properly configured Kafka cluster is able to handle https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines[massive throughput]. Kafka Connect is written with Kafka best practices, and given enough resources will also be able to handle very large numbers of database change events. Because of this, when a connector has been restarted after a while, it is very likely to catch up with the database, though how quickly will depend upon the capabilities and performance of Kafka and the volume of changes being made to the data in PostgreSQL.
[[configuration]]
[[deploying-a-connector]]
== Deploying a Connector
If you've already installed https://zookeeper.apache.org[Zookeeper], http://kafka.apache.org/[Kafka], and http://kafka.apache.org/documentation.html#connect[Kafka Connect], then using Debezium's PostgreSQL connector is easy. Simply download the https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/{debezium-version}/debezium-connector-postgres-{debezium-version}-plugin.tar.gz[connector's plugin archive], extract the JARs into your Kafka Connect environment, and add the directory with the JARs to http://docs.confluent.io/{confluent-platform-version}/connect/userguide.html#installing-plugins[Kafka Connect's classpath]. Restart your Kafka Connect process to pick up the new JARs.
If immutable containers are your thing, then check out https://hub.docker.com/r/debezium/[Debezium's Docker images] for Zookeeper, Kafka, PostgreSQL and Kafka Connect with the PostgreSQL connector already pre-installed and ready to go. You can even link:/blog/2016/05/31/Debezium-on-Kubernetes/[run Debezium on Kubernetes and OpenShift].
To use the connector to produce change events for a particular PostgreSQL server or cluster:
. install the link:#output-plugin[logical decoding plugin]
. configure the link:#server-configuration[PostgreSQL server] to support logical replication
. create a link:#example-configuration[configuration file for the PostgreSQL Connector] and use the https://docs.confluent.io/{confluent-platform-version}/connect/restapi.html[Kafka Connect REST API] to add that connector to your Kafka Connect cluster.
When the connector starts, it will grab a consistent snapshot of the databases in your PostgreSQL server and start streaming changes, producing events for every inserted, updated, and deleted row. You can also choose to produce events for a subset of the schemas and tables. Optionally ignore, mask, or truncate columns that are sensitive, too large, or not needed.
[[example]]
[[example-configuration]]
=== Example Configuration
Using the PostgreSQL connector is straightforward. Here is an example of the configuration for a PostgreSQL connector that monitors a PostgreSQL server at port 5432 on 192.168.99.100, which we logically name `fullfillment`:
[source,json]
----
{
"name": "inventory-connector", // <1>
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector", // <2>
"database.hostname": "192.168.99.100", // <3>
"database.port": "5432", // <4>
"database.user": "postgres", // <5>
"database.password": "postgres", // <6>
"database.dbname" : "postgres", // <7>
"database.server.name": "fullfillment", // <8>
"table.whitelist": "public.inventory" // <9>
}
}
----
<1> The name of our connector when we register it with a Kafka Connect service.
<2> The name of this PostgreSQL connector class.
<3> The address of the PostgreSQL server.
<4> The port number of the PostgreSQL server.
<5> The name of the PostgreSQL user that has the link:#PostgreSQL-permissions[required privileges].
<6> The password for the PostgreSQL user that has the link:#PostgreSQL-permissions[required privileges].
<7> The name of the PostgreSQL database to connect to
<8> The logical name of the PostgreSQL server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the link:#avro-converter[Avro Connector] is used.
<9> A list of all tables hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the schemas and tables to include or exclude from monitoring.
See the link:#connector-properties[complete list of connector properties] that can be specified in these configurations.
This configuration can be sent via POST to a running Kafka Connect service, which will then record the configuration and start up the one connector task that will connect to the PostgreSQL database and record events to Kafka topics.
[[connector-properties]]
=== Connector Properties
The following configuration properties are _required_ unless a default value is available.
[cols="35%a,10%a,55%a",options="header,footer",role="table table-bordered table-striped"]
|=======================
|Property
|Default
|Description
|`name`
|
|Unique name for the connector. Attempting to register again with the same name will fail. (This property is required by all Kafka Connect connectors.)
|`connector.class`
|
|The name of the Java class for the connector. Always use a value of `io.debezium.connector.postgresql.PostgresConnector` for the PostgreSQL connector.
|`tasks.max`
|`1`
|The maximum number of tasks that should be created for this connector. The PostgreSQL connector always uses a single task and therefore does not use this value, so the default is always acceptable.
|`plugin.name`
|`decoderbufs`
|The name of the Postgres link:#output-plugin[logical decoding plugin] installed on the server. Supported values are `decoderbufs`, `wal2json`, `wal2json_rds`, `wal2json_streaming` (added in 0.8.0.Beta1), `wal2json_rds_streaming` (added in 0.8.0.Beta1) and `pgoutput` (added in 0.10.0.Beta3)
When the processed transactions are very large it is possible that the `JSON` batch event with all changes in the transaction will not fit into the hard-coded memory buffer of size 1 GB.
In such cases it is possible to switch to so-called *streaming* mode when every change in transactions is sent as a separate message from PostgreSQL into Debezium.
|`slot.name`
|`debezium`
|The name of the Postgres logical decoding slot created for streaming changes from a plugin and database instance. Values must conform to link:https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION[Postgres replication slot naming rules] which state: _"Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character."_
|`slot.drop.on.stop`
|`false`
|Whether or not to drop the logical replication slot when the connector finishes orderly. Should only be set to `true` in testing or development environments. Dropping the slot allows WAL segments to be discarded by the database, so it may happen that after a restart the connector cannot resume from the WAL position where it left off before.
|`publication.name`
|`dbz_publication`
|The name of the PostgreSQL publication created created for streaming changes when using `pgoutput`.
This publication is created at start-up if it does not already exist to include _all tables_.
Debezium will then use its own white-/blacklist filtering capabilities to limit change events to the specific tables of interest if configured.
Note the connector user must have superuser permissions in order to create this publication,
so it is usually preferable to create the publication upfront.
If the publication already exists (either for all tables or configured with a subset of tables),
Debezium will instead use the publication as defined.
|`database.hostname`
|
|IP address or hostname of the PostgreSQL database server.
|`database.port`
|`5432`
|Integer port number of the PostgreSQL database server.
|`database.user`
|
|Name of the PostgreSQL database to use when connecting to the PostgreSQL database server.
|`database.password`
|
|Password to use when connecting to the PostgreSQL database server.
|`database.dbname`
|
|The name of the PostgreSQL database from which to stream the changes
|`database.server.name`
|
|Logical name that identifies and provides a namespace for the particular PostgreSQL database server/cluster being monitored. The logical name should be unique across all other connectors, since it is used as a prefix for all Kafka topic names coming from this connector.
|`schema.whitelist`
|
|An optional comma-separated list of regular expressions that match schema names to be monitored; any schema name not included in the whitelist will be excluded from monitoring. By default all non-system schemas will be monitored. May not be used with `schema.blacklist`.
|`schema.blacklist`
|
|An optional comma-separated list of regular expressions that match schema names to be excluded from monitoring; any schema name not included in the blacklist will be monitored, with the exception of system schemas. May not be used with `schema.whitelist`.
|`table.whitelist`
|
|An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be monitored; any table not included in the whitelist will be excluded from monitoring. Each identifier is of the form _schemaName_._tableName_. By default the connector will monitor every non-system table in each monitored schema. May not be used with `table.blacklist`.
|`table.blacklist`
|
|An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be excluded from monitoring; any table not included in the blacklist will be monitored. Each identifier is of the form _schemaName_._tableName_. May not be used with `table.whitelist`.
|`column.blacklist`
|
|An optional comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns are of the form _schemaName_._tableName_._columnName_
|`time.precision.mode`
|`adaptive`
| Time, date, and timestamps can be represented with different kinds of precision, including: `adaptive` (the default) captures the time and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column's type; `adaptive_time_microseconds` captures the date, datetime and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database column's type, with the exception of TIME type fields, which are always captured as microseconds; or `connect` always represents time and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp, which uses millisecond precision regardless of the database columns' precision. See link:#temporal-values[temporal values].
|`decimal.handling.mode`
|`precise`
| Specifies how the connector should handle values for `DECIMAL` and `NUMERIC` columns: `precise` (the default) represents them precisely using `java.math.BigDecimal` values represented in change events in a binary form; or `double` represents them using `double` values, which may result in a loss of precision but will be far easier to use. `string` option encodes values as formatted string which is easy to consume but a semantic information about the real type is lost. See <<decimal-values>>.
|`hstore.handling.mode`
|`map`
| Specifies how the connector should handle values for `hstore` columns: `map` (the default) represents using `MAP`; or `json` represents them using `json string`.`json` option encodes values as formatted string such as `{"key" : "val"}`. See <<hstore-values>>.
|`interval.handling.mode`
|`numeric`
| Specifies how the connector should handle values for `interval` columns: `numeric` (the default) represents interval using approximate number of microseconds; `string` represents them exactly, using the string pattern representation `P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S`, e.g. `P1Y2M3DT4H5M6.78S`. See <<data-types>>.
|`database.sslmode`
|`disable`
|Whether to use an encrypted connection to the PostgreSQL server. Options include: *disable* (the default) to use an unencrypted connection ; *require* to use a secure (encrypted) connection, and fail if one cannot be established; *verify-ca* like `require` but additionally verify the server TLS certificate against the configured Certificate Authority (CA) certificates, or fail if no valid matching CA certificates are found; *verify-full* like `verify-ca` but additionally verify that the server certificate matches the host to which the connection is attempted. See https://www.postgresql.org/docs/current/static/libpq-connect.html[the PostgreSQL documentation] for more information.
|`database.sslcert`
|
|The path to the file containing the SSL Certificate for the client. See https://www.postgresql.org/docs/current/static/libpq-connect.html[the PostgreSQL documentation] for more information.
|`database.sslkey`
|
|The path to the file containing the SSL private key of the client. See https://www.postgresql.org/docs/current/static/libpq-connect.html[the PostgreSQL documentation] for more information.
|`database.sslpassword`
|
|The password to access the client private key from the file specified by `database.sslkey`. See https://www.postgresql.org/docs/current/static/libpq-connect.html[the PostgreSQL documentation] for more information.
|`database.sslrootcert`
|
|The path to the file containing the root certificate(s) against which the server is validated. See https://www.postgresql.org/docs/current/static/libpq-connect.html[the PostgreSQL documentation] for more information.
|`database.tcpKeepAlive`
|
|Enable TCP keep-alive probe to verify that database connection is still alive. (enabled by default). See https://www.postgresql.org/docs/current/static/libpq-connect.html[the PostgreSQL documentation] for more information.
|`tombstones.on.delete`
|`true`
| Controls whether a tombstone event should be generated after a delete event. +
When `true` the delete operations are represented by a delete event and a subsequent tombstone event. When `false` only a delete event is sent. +
Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record got deleted.
|`column.propagate.source.type` 0.8.0 and later
|_n/a_
|An optional comma-separated list of regular expressions that match the fully-qualified names of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change messages.
The schema parameters `pass:[_]pass:[_]debezium.source.column.type`, `pass:[_]pass:[_]debezium.source.column.length` and `pass:[_]pass:[_]debezium.source.column.scale` will be used to propagate the original type name and length (for variable-width types), respectively.
Useful to properly size corresponding columns in sink databases.
Fully-qualified names for columns are of the form _databaseName_._tableName_._columnName_, or _databaseName_._schemaName_._tableName_._columnName_.
|`message.key.columns`
|_empty string_
| A semi-colon list of regular expressions that match fully-qualified tables and columns to map a primary key. +
Each item (regular expression) must match the fully-qualified `<fully-qualified table>:<a comma-separated list of columns>` representing the custom key. +
Fully-qualified tables could be defined as `DB_NAME.TABLE_NAME` or `SCHEMA_NAME.TABLE_NAME`, depending on the specific connector.
|=======================
The following _advanced_ configuration properties have good defaults that will work in most situations and therefore rarely need to be specified in the connector's configuration.
[cols="35%a,10%a,55%a",width=100,options="header,footer",role="table table-bordered table-striped"]
|=======================
|Property
|Default
|Description
|`snapshot.mode`
|`initial`
|Specifies the criteria for running a snapshot upon startup of the connector. The default is *initial*, and specifies the connector can run a snapshot only when no offsets have been recorded for the logical server name. The *always* option specifies that the connector run a snapshot each time on startup. The *never* option specifies that the connect should never use snapshots and that upon first startup with a logical server name the connector should read from either from where it last left off (last LSN position) or start from the beginning from the point of the view of the logical replication slot. The *initial_only* option specifies that the connector should only take an initial snapshot and then stop, without processing any subsequent changes. The *exported* option specifies that the database snapshot will be based on the point in time when the replication slot was created and is an excellent way to perform the snapshot in a lock-free way. Finally, if set to *custom* then the user must also set `snapshot.custom.class` which is a custom implementation of the `io.debezium.connector.postgresql.spi.Snapshotter` interface. See link:#snapshots[snapshots].
|`snapshot.custom.class`
|
| A full java class name that must be an implementation of the `io.debezium.connector.postgresql.spi.Snapshotter` interface. Only used when `snapshot.mode` is *custom*
|`snapshot.lock.timeout.ms`
|`10000`
|Positive integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If table locks cannot be acquired in this time interval, the snapshot will fail See link:#snapshots[snapshots]
|`snapshot.select.statement.overrides`
|
|Controls which rows from tables will be included in snapshot. +
This property contains a comma-separated list of fully-qualified tables _(DB_NAME.TABLE_NAME)_. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id `snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]`. The value of those properties is the SELECT statement to use when retrieving data from the specific table during snapshotting. _A possible use case for large append-only tables is setting a specific point where to start (resume) snapshotting, in case a previous snapshotting was interrupted._ +
*Note*: This setting has impact on snapshots only. Events generated by logical decoder are not affected by it at all.
|`max.queue.size`
|`20240`
|Positive integer value that specifies the maximum size of the blocking queue into which change events received via streaming replication are placed before they are written to Kafka. This queue can provide backpressure when, for example, writes to Kafka are slower or if Kafka is not available.
|`max.batch.size`
|`10240`
|Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector.
|`poll.interval.ms`
|`1000`
|Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 1000 milliseconds, or 1 second.
|`include.unknown.datatypes`
|`false`
|When Debezium meets a field whose data type is unknown, then by default the field is omitted from the change event and a warning is logged.
In some cases it may be preferable though to include the field and send it downstream to clients in the opaque binary representation so the clients will decode it themselves.
Set to `false` to filter unknown data out of events and `true` to keep them in binary format.
_Note: The clients risk backward compatibility issues. Not only may the database specific binary representation change between releases, but also when the datatype is supported by Debezium eventually, it will be sent downstream in a logical type, requiring adjustments by consumers. In general, when encountering unsupported data types, please file a feature request so that support can be added._
|`database.initial.statements`
|
|A semicolon separated list of SQL statements to be executed when a JDBC connection (not the transaction log reading connection) to the database is established.
Use doubled semicolon (';;') to use a semicolon as a character and not as a delimiter.
_Note: The connector may establish JDBC connections at its own discretion, so this should typically be used for configuration of session parameters only, but not for executing DML statements._
|`heartbeat.interval.ms`
|`0`
|Controls how frequently heartbeat messages are sent. +
This property contains an interval in milli-seconds that defines how frequently the connector sends messages into a heartbeat topic.
This can be used to monitor whether the connector is still receiving change events from the database.
You also should leverage heartbeat messages in cases where only records in non-captured tables are changed for a longer period of time.
In such situation the connector would proceed to read the log from the database but never emit any change messages into Kafka,
which in turn means that no offset updates will be committed to Kafka.
This will cause the WAL files to be retained by the database longer than needed
(as the connector actually has processed them already but never got a chance to flush the latest retrieved LSN to the database)
and also may result in more change events to be re-sent after a connector restart.
Set this parameter to `0` to not send heartbeat messages at all. +
Disabled by default.
|`heartbeat.topics.prefix`
|`__debezium-heartbeat`
|Controls the naming of the topic to which heartbeat messages are sent. +
The topic is named according to the pattern `<heartbeat.topics.prefix>.<server.name>`.
|`schema.refresh.mode`
|`columns_diff`
|Specify the conditions that trigger a refresh of the in-memory schema for a table.
`columns_diff` (the default) is the safest mode, ensuring the in-memory schema stays in-sync with the database table's schema at all times.
`columns_diff_exclude_unchanged_toast` instructs the connector to refresh the in-memory schema cache if there is a discrepancy between it
and the schema derived from the incoming message, unless unchanged TOASTable data fully accounts for the discrepancy.
This setting can improve connector performance significantly if there are frequently-updated tables that
have TOASTed data that are rarely part of these updates. However, it is possible for the in-memory schema to
become outdated if TOASTable columns are dropped from the table.
|`snapshot.delay.ms`
|
|An interval in milli-seconds that the connector should wait before taking a snapshot after starting up; +
Can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which may cause re-balancing of connectors.
|`snapshot.fetch.size`
|`10240`
|Specifies the maximum number of rows that should be read in one go from each table while taking a snapshot.
The connector will read the table contents in multiple batches of this size. Defaults to 10240.
|`slot.stream.params`
|
|Optional list of parameters to be passed to the configured logical decoding plug-in;
can for instance be used to enable server-side table filtering when using the wal2json plug-in.
Allowed values depend on the chosen plug-in are separated by semicolon,
e.g. `add-tables=public.table,public.table2;include-lsn=true`.
|`sanitize.field.names`
|`true` when connector configuration explicitly specifies the `key.converter` or `value.converter` parameters to use Avro, otherwise defaults to `false`.
|Whether field names will be sanitized to adhere to Avro naming requirements.
See xref:configuration/avro.adoc#names[Avro naming] for more details.
|`slot.max.retries`
|6
|How many times to retry connecting to a replication slot when an attempt fails.
|`slot.retry.delay.ms` +
|10000 (10 seconds)
|The number of milli-seconds to wait between retry attempts when the connector fails to connect to a replication slot.
|`toasted.value.placeholder`
|`__debezium_unavailable_value`
|Specify the constant that will be provided by Debezium to indicate that the original value is a toasted value not provided by the database.
If starts with `hex:` prefix it is expected that the rest of the string repesents hexadecimally encoded octets.
See link:#toasted-values[section] with additional details.
|=======================
The connector also supports _pass-through_ configuration properties that are used when creating the Kafka producer and consumer.
Be sure to consult the http://kafka.apache.org/documentation.html[Kafka documentation] for all of the configuration properties for Kafka producers and consumers. (The PostgreSQL connector does use the http://kafka.apache.org/documentation.html#newconsumerconfigs[new consumer].)