updates to the connector doc files

Signed-off-by: prmellor <pmellor@redhat.com>
This commit is contained in:
prmellor 2019-12-20 17:35:50 +00:00 committed by Gunnar Morling
parent cd99ce4038
commit 092804b7ea
4 changed files with 719 additions and 410 deletions

View File

@ -7,6 +7,7 @@ ifndef::site-gen-antora[]
:partialsdir: {moduledir}/pages/_partials
endif::[]
:ProductName: Debezium
:debezium-version: 1.0.0.Final
:debezium-dev-version: 1.1
:debezium-kafka-version: 2.4.0

View File

@ -1,4 +1,6 @@
= Debezium Connector for MongoDB
= {ProductName} Connector for MongoDB
ifndef::cdc-product[]
include::../_attributes.adoc[]
:toc:
:toc-placement: macro
@ -7,44 +9,86 @@ include::../_attributes.adoc[]
:source-highlighter: highlight.js
toc::[]
endif::cdc-product[]
Debezium's MongoDB Connector can monitor a https://docs.mongodb.com/manual/tutorial/deploy-replica-set/[MongoDB replica set] or a https://docs.mongodb.com/manual/core/sharded-cluster-components/[MongoDB sharded cluster] for document changes in databases and collections, recording those changes as events in Kafka topics. The connector automatically handles the https://docs.mongodb.com/manual/tutorial/add-shards-to-shard-cluster/[addition] or https://docs.mongodb.com/manual/tutorial/remove-shards-from-cluster/[removal] of shards in a sharded cluster, changes in membership of each replica set, https://docs.mongodb.com/manual/core/replica-set-elections/[elections] within each replica set, and awaiting the resolution of communications problems.
ifdef::cdc-product[]
[IMPORTANT]
====
Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process.
For more information about support scope, see link:https://access.redhat.com/support/offerings/techpreview/[Technology Preview Features Support Scope^].
====
endif::cdc-product[]
[[overview]]
{ProductName}'s MongoDB connector tracks a MongoDB replica set or a MongoDB sharded cluster for document changes in databases and collections, recording those changes as events in Kafka topics.
The connector automatically handles the addition or removal of shards in a sharded cluster, changes in membership of each replica set, elections within each replica set, and awaiting the resolution of communications problems.
[[mongodb-overview]]
== Overview
MongoDB's https://docs.mongodb.com/manual/replication/[replication mechanism] provides redundancy and high availability, and is the preferred way to run MongoDB in production. A MongoDB _replica set_ consists of a set of servers that all have copies of the same data, and replication ensures that all changes made by clients to documents on the replica set's _primary_ are correctly applied to the other replica set's servers, called _secondaries_. MongoDB replication works by having the primary record the changes in its _oplog_ (or operation log), and then each of the secondaries reads the primary's oplog and applies in order all of the operations to their own documents. When a new server is added to a replica set, that server first performs an https://docs.mongodb.com/manual/core/replica-set-sync/[initial sync] of all of the databases and collections on the primary, and then reads the primary's oplog to apply all changes that might have been made since it began the initial sync. This new server becomes a secondary (and able to handle queries) when it catches up to the tail of the primary's oplog.
MongoDB's replication mechanism provides redundancy and high availability, and is the preferred way to run MongoDB in production.
MongoDB connector captures the changes in a replica set or sharded cluster.
The Debezium MongoDB connector uses this same replication mechanism, though it does not actually become a member of the replica set. Just like MongoDB secondaries, however, the connector always reads the oplog of the replica set's primary. And, when the connector sees a replica set for the first time, it looks at the oplog to get the last recorded transaction and then performs an _intial sync_ of the primary's databases and collections. When all the data is copied, the connector then starts reading the oplog from the position it read earlier. Operations in the MongoDB oplog are https://docs.mongodb.com/manual/core/replica-set-oplog/[idempotent], so no matter how many times the operations are applied, they result in the same end state.
A MongoDB _replica set_ consists of a set of servers that all have copies of the same data, and replication ensures that all changes made by clients to documents on the replica set's _primary_ are correctly applied to the other replica set's servers, called _secondaries_.
MongoDB replication works by having the primary record the changes in its _oplog_ (or operation log), and then each of the secondaries reads the primary's oplog and applies in order all of the operations to their own documents.
When a new server is added to a replica set, that server first performs an https://docs.mongodb.com/manual/core/replica-set-sync/[initial sync] of all of the databases and collections on the primary, and then reads the primary's oplog to apply all changes that might have been made since it began the initial sync.
This new server becomes a secondary (and able to handle queries) when it catches up to the tail of the primary's oplog.
As the MongoDB connector processes the oplog, it periodically records the position in the oplog where the event originated. When the MongoDB connector stops, it records the last oplog position that it processed, so that upon restart it simply begins reading the oplog from that position. In other words, the connector can be stopped, upgraded or maintained, and restarted some time later, and it will pick up exactly where it left off without losing a single event. Of course, MongoDB's oplogs are usually capped at a maximum size, which means that the connector should not be stopped for too long, or else some of the operations in the oplog might be purged before the connector has a change to read them. In this case, upon restart the connector will detect the missing oplog operations, perform an initial sync, and then proceed to tail the oplog.
The MongoDB connector uses this same replication mechanism, though it does not actually become a member of the replica set.
Just like MongoDB secondaries, however, the connector always reads the oplog of the replica set's primary.
And, when the connector sees a replica set for the first time, it looks at the oplog to get the last recorded transaction and then performs an _intial sync_ of the primary's databases and collections.
When all the data is copied, the connector then starts reading the oplog from the position it read earlier. Operations in the MongoDB oplog are https://docs.mongodb.com/manual/core/replica-set-oplog/[idempotent], so no matter how many times the operations are applied, they result in the same end state.
The MongoDB connector is also quite tolerant of changes in membership and leadership of the replica sets, of additions or removals of shards within a sharded cluster, and network problems that might cause communication failures. The connector always uses the replica set's primary node to tail the oplog, so when the replica set undergoes an election and a different node becomes primary, the connector will immediately stop tailing the oplog, connect to the new primary, and start tailing the oplog using the new primary node. Likewise, if connector experiences any problems communicating with the replica set primary, it will try to reconnect (using exponential backoff so as to not overwhelm the network or replica set) and continue tailing the oplog from where it last left off. In this way the connector is able to dynamically adjust to changes in replica set membership and to automatically handle communication failures.
As the MongoDB connector processes the oplog, it periodically records the position in the oplog where the event originated.
When the MongoDB connector stops, it records the last oplog position that it processed, so that upon restart it simply begins reading the oplog from that position.
In other words, the connector can be stopped, upgraded or maintained, and restarted some time later, and it will pick up exactly where it left off without losing a single event.
Of course, MongoDB's oplogs are usually capped at a maximum size, which means that the connector should not be stopped for too long, or else some of the operations in the oplog might be purged before the connector has a chance to read them.
In this case, upon restart the connector will detect the missing oplog operations, perform an initial sync, and then proceed to tail the oplog.
The MongoDB connector is also quite tolerant of changes in membership and leadership of the replica sets, of additions or removals of shards within a sharded cluster, and network problems that might cause communication failures.
The connector always uses the replica set's primary node to tail the oplog, so when the replica set undergoes an election and a different node becomes primary, the connector will immediately stop tailing the oplog, connect to the new primary, and start tailing the oplog using the new primary node.
Likewise, if connector experiences any problems communicating with the replica set primary, it will try to reconnect (using exponential backoff so as to not overwhelm the network or replica set) and continue tailing the oplog from where it last left off.
In this way the connector is able to dynamically adjust to changes in replica set membership and to automatically handle communication failures.
.Additional resources
* https://docs.mongodb.com/manual/replication/[Replication mechanism]
* link:https://docs.mongodb.com/manual/tutorial/deploy-replica-set/[Replica set]
* link:https://docs.mongodb.com/manual/core/replica-set-elections/[Replica set elections]
* link:https://docs.mongodb.com/manual/core/sharded-cluster-components/[Sharded cluster]
* link:https://docs.mongodb.com/manual/tutorial/add-shards-to-shard-cluster/[Shard addition]
* link:https://docs.mongodb.com/manual/tutorial/remove-shards-from-cluster/[Shard removal]
[[setting-up-mongodb]]
== Setting up MongoDB
The Debezium MongoDB connector uses MongoDB's oplog to capture the changes, so the connector works only with MongoDB replica sets or with sharded clusters where each shard is a separate replica set. See the MongoDB documentation for setting up a https://docs.mongodb.com/manual/replication/[replica set] or https://docs.mongodb.com/manual/sharding/[sharded cluster]. Also, be sure to understand how to enable https://docs.mongodb.com/manual/tutorial/deploy-replica-set-with-keyfile-access-control/#deploy-repl-set-with-auth[access control and authentication] with replica sets.
The MongoDB connector uses MongoDB's oplog to capture the changes, so the connector works only with MongoDB replica sets or with sharded clusters where each shard is a separate replica set.
See the MongoDB documentation for setting up a https://docs.mongodb.com/manual/replication/[replica set] or https://docs.mongodb.com/manual/sharding/[sharded cluster].
Also, be sure to understand how to enable https://docs.mongodb.com/manual/tutorial/deploy-replica-set-with-keyfile-access-control/#deploy-repl-set-with-auth[access control and authentication] with replica sets.
You must also have a MongoDB user that has the appropriate roles to read the `admin` database where the oplog can be read. Additionally, the user must also be able to read the `config` database in the configuration server of a sharded cluster and must have `listDatabases` privilege action.
[[supported-mongodb-topologies]]
== Supported MongoDB topologies
The Debezium MongoDB connector can be used with a variety of MongoDB topologies.
The MongoDB connector can be used with a variety of MongoDB topologies.
[[mongodb-replicaset]]
=== MongoDB replica set
The Debezium MongoDB connector can capture changes from a single https://docs.mongodb.com/manual/replication/[MongoDB replica set]. Although production replica sets should have https://docs.mongodb.com/manual/core/replica-set-architecture-three-members/[at least three members], the connector actually doesn't care how many members are in the replica set.
The MongoDB connector can capture changes from a single https://docs.mongodb.com/manual/replication/[MongoDB replica set].
Production replica sets require a minimum of https://docs.mongodb.com/manual/core/replica-set-architecture-three-members/[at least three members].
To use the Debezium MongoDB connector with a replica set, simply provide the addresses of one or more replica set servers as _seed addresses_ via the connector's `mongodb.hosts` property. The connector will use these seeds to connect to the replica set, and then once connected will get from the replica set the complete set of members and which member is primary. The connector will start a task to connect to the primary and capture the changes from the primary's oplog. When the replica set elects a new primary, the task will automatically switch over to the new primary.
To use the MongoDB connector with a replica set, provide the addresses of one or more replica set servers as _seed addresses_ through the connector's `mongodb.hosts` property.
The connector will use these seeds to connect to the replica set, and then once connected will get from the replica set the complete set of members and which member is primary.
The connector will start a task to connect to the primary and capture the changes from the primary's oplog.
When the replica set elects a new primary, the task will automatically switch over to the new primary.
[TIP]
[NOTE]
====
When MongoDB is fronted by a proxy (such as with Docker on OS X or Windows), then when a client connects to the replica set and discover the members, the MongoDB client will exclude the proxy as a valid member and will attempt and fail to connect directly to the members rather than go through the proxy.
When MongoDB is fronted by a proxy (such as with Docker on OS X or Windows), then when a client connects to the replica set and discovers the members, the MongoDB client will exclude the proxy as a valid member and will attempt and fail to connect directly to the members rather than go through the proxy.
In these cases, set the connector's optional `mongodb.members.auto.discover` configuration property to `false` to instruct the connector to forgo membership discovery and instead simply use the first seed address (specified via the `mongodb.hosts` property) as the primary node. This may work, but still make cause issues when election occurs.
In such a case, set the connector's optional `mongodb.members.auto.discover` configuration property to `false` to instruct the connector to forgo membership discovery and instead simply use the first seed address (specified via the `mongodb.hosts` property) as the primary node.
This may work, but still make cause issues when election occurs.
====
[[mongodb-sharded-cluster]]
@ -52,20 +96,21 @@ In these cases, set the connector's optional `mongodb.members.auto.discover` con
A https://docs.mongodb.com/manual/sharding/[MongoDB sharded cluster] consists of:
* one or more _shards_, each deployed as a replica set;
* a separate replica set that acts as the cluster's _configuration server_; and
* one or more _routers_ (also called `mongos`) to which clients connect and that routes requests to the appropriate shards
* One or more _shards_, each deployed as a replica set;
* A separate replica set that acts as the cluster's _configuration server_
* One or more _routers_ (also called `mongos`) to which clients connect and that routes requests to the appropriate shards
To use the Debezium MongoDB connector with a sharded cluster, configure the connector with the host addresses of the _configuration server_ replica set. When the connector connects to this replica set, it discovers that it is acting as the configuration server for a sharded cluster, discovers the information about each replica set used as a shard in the cluster, and will then start up a separate task to capture the changes from each replica set. If new shards are added to the cluster or existing shards removed, the connector will automatically adjust its tasks accordingly.
To use the MongoDB connector with a sharded cluster, configure the connector with the host addresses of the _configuration server_ replica set. When the connector connects to this replica set, it discovers that it is acting as the configuration server for a sharded cluster, discovers the information about each replica set used as a shard in the cluster, and will then start up a separate task to capture the changes from each replica set. If new shards are added to the cluster or existing shards removed, the connector will automatically adjust its tasks accordingly.
[[mongodb-standalone-server]]
=== MongoDB standalone server
The Debezium MongoDB connector is not capable of monitoring the changes of a standalone MongoDB server, since standalone servers do not have an oplog. The connector will work if the standalone server is converted to a replica set with one member.
The MongoDB connector is not capable of monitoring the changes of a standalone MongoDB server, since standalone servers do not have an oplog.
The connector will work if the standalone server is converted to a replica set with one member.
[TIP]
[NOTE]
====
MongoDB https://docs.mongodb.com/manual/core/replica-set-architectures/[does not recommend] running a standalone server in production anyway.
MongoDB https://docs.mongodb.com/manual/core/replica-set-architectures/[does not recommend] running a standalone server in production.
====
@ -73,36 +118,42 @@ MongoDB https://docs.mongodb.com/manual/core/replica-set-architectures/[does not
[[how-the-mongodb-connector-works]]
== How the MongoDB connector works
This section goes into detail about how the MongoDB connector captures the changes in a replica set or sharded cluster.
When a MongoDB connector is configured and deployed, it starts by connecting to the MongoDB servers at the seed addresses, and determines the details about each of the available replica sets.
Since each replica set has its own independent oplog, the connector will try to use a separate task for each replica set.
The connector can limit the maximum number of tasks it will use, and if not enough tasks are available the connector will assign multiple replica sets to each task, although the task will still use a separate thread for each replica set.
[[tasks]]
=== Tasks
When a MongoDB connector is configured and deployed, it starts by connecting to the MongoDB servers at the seed addresses, and determines the details about each of the available replica sets. Since each replica set has its own independent oplog, the connector will try to use a separate task for each replica set. The connector can limit the maximum number of tasks it will use, and if not enough tasks are available the connector will assign multiple replica sets to each task, although the task will still use a separate thread for each replica set.
[TIP]
[NOTE]
====
When running the connector against a sharded cluster, use a value of `tasks.max` that is greater than the number of replica sets. This will allow the connector to create one task for each replica set, and will let Kafka Connect coordinate, distribute, and manage the tasks across all of the available worker processes.
When running the connector against a sharded cluster, use a value of `tasks.max` that is greater than the number of replica sets.
This will allow the connector to create one task for each replica set, and will let Kafka Connect coordinate, distribute, and manage the tasks across all of the available worker processes.
====
[[logical-name]]
[[logical-connector-name]]
=== Logical connector name
One of the connector configuration properties is `mongodb.name`, and it serves as a _logical name_ for the MongoDB replica set or sharded cluster. The connector uses the logical name in a number of ways: as the prefix for all topic names, and as a unique identifier when recording the oplog position of each replica set.
The connector configuration property `mongodb.name` serves as a _logical name_ for the MongoDB replica set or sharded cluster.
The connector uses the logical name in a number of ways: as the prefix for all topic names, and as a unique identifier when recording the oplog position of each replica set.
You should give each MongoDB connector a unique logical name that meaningfully describes the source MongoDB system. We recommend logical names begin with an alphabetic or underscore character, and remaining characters that are alphanumeric or underscore.
You should give each MongoDB connector a unique logical name that meaningfully describes the source MongoDB system.
We recommend logical names begin with an alphabetic or underscore character, and remaining characters that are alphanumeric or underscore.
[[initial-sync]]
=== Initial sync
When a task starts up using a replica set, it uses the connector's logical name and the replica set name to find an _offset_ that describes the position in the replica sets oplog where the connector previously stopped reading. If an offset can be found and it is still in the oplog, then the task immediately proceeds with link:#tailing-the-oplog[tailing the oplog], starting at the recorded offset position.
When a task starts up using a replica set, it uses the connector's logical name and the replica set name to find an _offset_ that describes the position in the replica sets oplog where the connector previously stopped reading.
If an offset can be found and it is still in the oplog, then the task immediately proceeds with link:#tailing-the-oplog[tailing the oplog], starting at the recorded offset position.
However, if no offset is found or if the oplog no longer contains that position, the task must first obtain the current state of the replica set contents by performing an _initial sync_. This process starts by recording the current position of the oplog and recording that as the offset (along with a flag that denotes an initial sync has been started). The task will then proceed to copy each collection, spawning as many threads as possible (up to the value of the `initial.sync.max.threads` configuration property) to perform this work in parallel. The connector will record a separate _read event_ for each document it sees, and that read event will contain the object's identifier, the complete state of the object, and _source_ information about the MongoDB replica set where the object was found. The source information will also include a flag that denotes the event was produced during an initial sync.
However, if no offset is found or if the oplog no longer contains that position, the task must first obtain the current state of the replica set contents by performing an _initial sync_.
This process starts by recording the current position of the oplog and recording that as the offset (along with a flag that denotes an initial sync has been started).
The task will then proceed to copy each collection, spawning as many threads as possible (up to the value of the `initial.sync.max.threads` configuration property) to perform this work in parallel.
The connector will record a separate _read event_ for each document it sees, and that read event will contain the object's identifier, the complete state of the object, and _source_ information about the MongoDB replica set where the object was found.
The source information will also include a flag that denotes the event was produced during an initial sync.
This initial sync will continue until it has copied all collections that match the connector's filters. If the connector is stopped before the tasks' initial syncs are completed, upon restart the connector will begin the initial sync again.
This initial sync will continue until it has copied all collections that match the connector's filters.
If the connector is stopped before the tasks' initial syncs are completed, upon restart the connector begins the initial sync again.
[TIP]
[NOTE]
====
Try to avoid task reassignment and reconfiguration while the connector is performing an intial sync of any replica sets. The connector does log messages with the progress of the initial sync. For utmost control, run a separate cluster of Kafka Connect for each connector.
====
@ -110,9 +161,11 @@ Try to avoid task reassignment and reconfiguration while the connector is perfor
[[tailing-the-oplog]]
=== Tailing the oplog
Once the connector task for a replica set has an offset, it uses the offset to determine the position in the oplog where it should start reading. The task will then connect to the replica set's primary node and start reading the oplog from that position, processing all of the create, insert, and delete operations and converting them into Debezium link:#events[change events]. Each change event includes the position in the oplog where the operation was found, and the connector periodically records this as its most recent offset. (The interval at which the offset is recorded is governed by the `offset.flush.interval.ms` http://docs.confluent.io/3.0.0/connect/userguide.html#configuring-workers[Kafka Connect worker configuration property].)
Once the connector task for a replica set has an offset, it uses the offset to determine the position in the oplog where it should start reading.
The task will then connect to the replica set's primary node and start reading the oplog from that position, processing all of the create, insert, and delete operations and converting them into {ProductName} link:#mongodb-events[change events]. Each change event includes the position in the oplog where the operation was found, and the connector periodically records this as its most recent offset. (The interval at which the offset is recorded is governed by the `offset.flush.interval.ms` http://docs.confluent.io/3.0.0/connect/userguide.html#configuring-workers[Kafka Connect worker configuration property].)
When the connector is stopped gracefully, the last offset processed is recorded so that, upon restart, the connector will continue exactly where it left off. If the connector's tasks terminate unexpectedly, however, then the tasks may have processed and generated events after it last records the offset but before the last offset is recorded; upon restart, the connector will begin at the last _recorded_ offset, possibly generating some the same events that were previously generated just prior to the crash.
When the connector is stopped gracefully, the last offset processed is recorded so that, upon restart, the connector will continue exactly where it left off.
If the connector's tasks terminate unexpectedly, however, then the tasks may have processed and generated events after it last records the offset but before the last offset is recorded; upon restart, the connector begins at the last _recorded_ offset, possibly generating some the same events that were previously generated just prior to the crash.
[NOTE]
====
@ -123,47 +176,54 @@ As mentioned above, the connector tasks always use the replica set's primary nod
The bottom line is that the MongoDB connector will continue running under most situations, though communication problems may cause the connector to wait until the problems are resolved.
[[topic-names]]
[[mongodb-topic-names]]
=== Topics names
The MongoDB connector writes events for all insert, update, and delete operations to documents in each collection to a single Kafka topic. The name of the Kafka topics always takes the form _logicalName_._databaseName_._collectionName_, where _logicalName_ is the link:logical-name[logical name] of the connector as specified with the `mongodb.name` configuration property, _databaseName_ is the name of the database where the operation occurred, and _collectionName_ is the name of the MongoDB collection in which the affected document existed.
The MongoDB connector writes events for all insert, update, and delete operations to documents in each collection to a single Kafka topic.
The name of the Kafka topics always takes the form _logicalName_._databaseName_._collectionName_, where _logicalName_ is the link:logical-name[logical name] of the connector as specified with the `mongodb.name` configuration property, _databaseName_ is the name of the database where the operation occurred, and _collectionName_ is the name of the MongoDB collection in which the affected document existed.
For example, consider a MongoDB replica set with an `inventory` database that contains four collections: `products`, `products_on_hand`, `customers`, and `orders`. If the connector monitoring this database were given a logical name of `fulfillment`, then the connector would produce events on these four Kafka topics:
For example, consider a MongoDB replica set with an `inventory` database that contains four collections: `products`, `products_on_hand`, `customers`, and `orders`.
If the connector monitoring this database were given a logical name of `fulfillment`, then the connector would produce events on these four Kafka topics:
* `fulfillment.inventory.products`
* `fulfillment.inventory.products_on_hand`
* `fulfillment.inventory.customers`
* `fulfillment.inventory.orders`
Notice that the topic names do not incorporate the replica set name or shard name. As a result, all changes to a sharded collection (where each shard contains a subset of the collection's documents) all go to the same Kafka topic.
Notice that the topic names do not incorporate the replica set name or shard name.
As a result, all changes to a sharded collection (where each shard contains a subset of the collection's documents) all go to the same Kafka topic.
You can set up Kafka to http://kafka.apache.org/documentation.html#basic_ops_add_topic[auto-create] the topics as they are needed. If not, then you must use Kafka administration tools to create the topics before starting the connector.
You can set up Kafka to http://kafka.apache.org/documentation.html#basic_ops_add_topic[auto-create] the topics as they are needed.
If not, then you must use Kafka administration tools to create the topics before starting the connector.
[[partitions]]
=== Partitions
The MongoDB connector does not make any explicit determination of the topic partitions for events. Instead, it allows Kafka to determine the partition based upon the key. You can change Kafka's partitioning logic by defining in the Kafka Connect worker configuration the name of the `Partitioner` implementation.
The MongoDB connector does not make any explicit determination of the topic partitions for events.
Instead, it allows Kafka to determine the partition based upon the key.
You can change Kafka's partitioning logic by defining in the Kafka Connect worker configuration the name of the `Partitioner` implementation.
Be aware that Kafka only maintains total order for events written to a single topic _partition_. Partitioning the events by key does mean that all events with the same key will always go to the same partition, ensuring that all events for a specific document are always totally ordered.
Be aware that Kafka only maintains total order for events written to a single topic _partition_.
Partitioning the events by key does mean that all events with the same key will always go to the same partition, ensuring that all events for a specific document are always totally ordered.
[[events]]
[[mongodb-events]]
=== Events
All data change events produced by the MongoDB connector have a key and a value. The rest of this section outlines the structure of these keys and values.
All data change events produced by the MongoDB connector have a key and a value.
[NOTE]
====
Starting with Kafka 0.10, Kafka can optionally record with the message key and value the http://kafka.apache.org/documentation.html#upgrade_10_performance_impact[_timestamp_] at which the message was created (recorded by the producer) or written to the log by Kafka.
====
Debezium and Kafka Connect are designed around _continuous streams of event messages_, and the structure of these events could potentially change over time if the source of those events changed in structure or if the connector is improved or changed. This could be difficult for consumers to deal with, so to make it very easy Kafka Connect makes each event self-contained. Every message key and value has two parts: a _schema_ and _payload_. The schema describes the structure of the payload, while the payload contains the actual data.
{ProductName} and Kafka Connect are designed around _continuous streams of event messages_, and the structure of these events could potentially change over time if the source of those events changed in structure or if the connector is improved or changed.
This could be difficult for consumers to deal with, so to make it very easy Kafka Connect makes each event self-contained. Every message key and value has two parts: a _schema_ and _payload_. The schema describes the structure of the payload, while the payload contains the actual data.
[[change-events-key]]
[[mongodb-change-events-key]]
==== Change event's key
For a given collection, the change event's key will be a structure that contains a single `id` field. Its value will be the document's identifier represented as string which is derived from the https://docs.mongodb.com/manual/reference/mongodb-extended-json/[MongoDB extended JSON serialization in strict mode]. Consider a connector with a logical name of `fulfillment`, a replica set containing an `inventory` database with a `customers` collection containing documents such as:
For a given collection, the change event's key contains a single `id` field.
Its value is the document's identifier represented as string which is derived from the https://docs.mongodb.com/manual/reference/mongodb-extended-json/[MongoDB extended JSON serialization in strict mode]. Consider a connector with a logical name of `fulfillment`, a replica set containing an `inventory` database with a `customers` collection containing documents such as:
[source,json,indent=0]
----
@ -214,21 +274,25 @@ different types will get encoded as the event key's payload:
|Binary |BinData("a2Fma2E=",0)|`{ "id" : "{\"$binary\" : \"a2Fma2E=\", \"$type\" : \"00\"}" }`
|==========================================
ifndef::cdc-product[]
[WARNING]
====
As of Debezium 0.3.0, the Debezium MongoDB connector ensures that all Kafka Connect _schema names_ are http://avro.apache.org/docs/current/spec.html#names[valid Avro schema names]. This means that the logical server name must start with Latin letters or an underscore (e.g., [a-z,A-Z,\_]), and the remaining characters in the logical server name and all characters in the database and collections names must be Latin letters, digits, or an underscore (e.g., [a-z,A-Z,0-9,\_]). If not, then all invalid characters will automatically be replaced with an underscore character.
As of Debezium 0.3.0, the MongoDB connector ensures that all Kafka Connect _schema names_ are http://avro.apache.org/docs/current/spec.html#names[valid Avro schema names]. This means that the logical server name must start with Latin letters or an underscore (e.g., [a-z,A-Z,\_]), and the remaining characters in the logical server name and all characters in the database and collections names must be Latin letters, digits, or an underscore (e.g., [a-z,A-Z,0-9,\_]). If not, then all invalid characters will automatically be replaced with an underscore character.
This can lead to unexpected conflicts in schemas names when the logical server name, database names, and table names contain other characters, and the only distinguishing characters between table full names are invalid and thus replaced with underscores. The connector attempts to produce an exception in this such cases, but only when the conflicts exist between schemas used within a single connector.
====
endif::cdc-product[]
[[change-events-value]]
[[mongodb-change-events-value]]
==== Change event's value
The value of the change event message is a bit more complicated. Like the key message, it has a _schema_ section and _payload_ section. The payload section of every change event value produced by the MongoDB connector has an _envelope_ structure with the following fields:
The value of the change event message is a bit more complicated.
Like the key message, it has a _schema_ section and _payload_ section.
The payload section of every change event value produced by the MongoDB connector has an _envelope_ structure with the following fields:
* `op` is a mandatory field that contains a string value describing the type of operation. Values for the MongoDB connector are `c` for create (or insert), `u` for update, `d` for delete, and `r` for read (in the case of a initial sync).
* `after` is an optional field that if present contains the state of the document _after_ the event occurred. MongoDB's oplog entries only contain the full state of a document for _create_ events, so these are the only events that contain an _after_ field.
* `source` is a mandatory field that contains a structure describing the source metadata for the event, which in the case of MongoDB contains several fields: the Debezium version, the logical name, the replica set's name, the namespace of the collection, the MongoDB timestamp (and ordinal of the event within the timestamp) at which the event occurred, the identifier of the MongoDB operation (e.g., the `h` field in the oplog event), and the initial sync flag if the event resulted during an intial sync.
* `source` is a mandatory field that contains a structure describing the source metadata for the event, which in the case of MongoDB contains several fields: the {ProductName} version, the logical name, the replica set's name, the namespace of the collection, the MongoDB timestamp (and ordinal of the event within the timestamp) at which the event occurred, the identifier of the MongoDB operation (e.g., the `h` field in the oplog event), and the initial sync flag if the event resulted during an intial sync.
* `ts_ms` is optional and if present contains the time (using the system clock in the JVM running the Kafka Connect task) at which the connector processed the event.
And of course, the _schema_ portion of the event message's value contains a schema that describes this envelope structure and the nested fields within it.
@ -353,12 +417,15 @@ If we look at the `schema` portion of this event's _value_, we can see the schem
If we look at the `payload` portion of this event's _value_, we can see the information in the event, namely that it is describing that the document was read as part of an initial sync (since `op=r` and `initsync=true`), and that the `after` field value contains the JSON string representation of the document.
[TIP]
[NOTE]
====
It may appear that the JSON representations of the events are much larger than the rows they describe. This is true, because the JSON representation must include the _schema_ and the _payload_ portions of the message. It is possible and even recommended to use the link:/docs/faq/#avro-converter[Avro Converter] to dramatically decrease the size of the actual messages written to the Kafka topics.
It may appear that the JSON representations of the events are much larger than the rows they describe. This is true, because the JSON representation must include the _schema_ and the _payload_ portions of the message.
ifndef::cdc-product[]
It is possible and even recommended to use the link:/docs/faq/#avro-converter[Avro Converter] to dramatically decrease the size of the actual messages written to the Kafka topics.
endif::cdc-product[]
====
The value of an _update_ change event on this collection will actually have the exact same _schema_, and its payload will be structured the same but will hold different values. Specifically, an update event will not have an `after` value and will instead have a `patch` string containing the JSON representation of the idempotent update operation. Here's an example:
The value of an _update_ change event on this collection will actually have the exact same _schema_, and its payload is structured the same but will hold different values. Specifically, an update event will not have an `after` value and will instead have a `patch` string containing the JSON representation of the idempotent update operation. Here's an example:
[source,json,indent=0,subs="attributes"]
----
@ -389,24 +456,29 @@ When we compare this to the value in the _insert_ event, we see a couple of diff
* The `op` field value is now `u`, signifying that this document changed because of an update
* The `patch` field appears and has the stringified JSON representation of the actual MongoDB idempotent change to the document, which in this example involves setting the `first_name` field to a new value
* The `after` field no longer appears
* The `source` field structure has the same fields as before, but the values are different since this event is from a different position in the oplog.
* The `ts_ms` shows the timestamp that Debezium processed this event.
* The `source` field structure has the same fields as before, but the values are different since this event is from a different position in the oplog
* The `ts_ms` shows the timestamp that {ProductName} processed this event
[WARNING]
====
The content of the `patch` field is provided by MongoDB itself and its exact format depends on the version.
ifndef::cdc-product[]
You can thus expect that the messages will not be same for MongoDB 3.4 and 3.6 and you should be careful
while upgrading the MongoDB instance to a new version.
All examples in this document were obtained from MongoDB 3.4 and might differ if you use a different one.
endif::cdc-product[]
====
[NOTE]
====
Once again, update events in MongoDB's oplog don't have the _before_ or _after_ states of the changed document, so there's no way for the Debezium connector to provide this information. However, because _create_ or _read_ events _do_ contain the starting state, downstream consumers of the stream can actually fully-reconstruct the state by keeping the latest state for each document and applying each event to that state. Debezium connector's are not able to keep such state, so it is not able to do this.
Update events in MongoDB's oplog don't have the _before_ or _after_ states of the changed document, so there's no way for the connector to provide this information.
However, because _create_ or _read_ events _do_ contain the starting state, downstream consumers of the stream can actually fully-reconstruct the state by keeping the latest state for each document and applying each event to that state. {ProductName} connector's are not able to keep such state, so it is not able to do this.
====
So far we've seen samples of _create_/_read_ and _update_ events. Now, let's look at the value of a _delete_ event for the same table. The value of an _delete_ event on this collection will also have the exact same _schema_, and its payload will be structured the same but will hold different values. In particular, a delete event will not have an `after` value or a `patch` value:
So far we've seen samples of _create_/_read_ and _update_ events. Now, let's look at the value of a _delete_ event for the same table.
The value of an _delete_ event on this collection will also have the exact same _schema_, and its payload is structured the same but will hold different values.
In particular, a delete event will not have an `after` value or a `patch` value:
[source,json,indent=0,subs="attributes"]
----
@ -436,124 +508,73 @@ When we compare this to the value in the other events, we see a couple of differ
* The `op` field value is now `d`, signifying that this document was deleted
* The `patch` field does not appear
* The `after` field does not appear
* The `source` field structure has the same fields as before, but the values are different since this event is from a different position in the oplog.
* The `ts_ms` shows the timestamp that Debezium processed this event.
* The `source` field structure has the same fields as before, but the values are different since this event is from a different position in the oplog
* The `ts_ms` shows the timestamp that {ProductName} processed this event
The Debezium MongoDB connector actually provides one other kind of event. Each _delete_ event will be followed by a _tombstone_ event that has the same key but a `null` value, giving Kafka enough information to know that its https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction[Kafka log compaction] mechanism can remove _all_ messages with that key.
The MongoDB connector actually provides one other kind of event. Each _delete_ event is followed by a _tombstone_ event that has the same key but a `null` value, giving Kafka enough information to know that its https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction[Kafka log compaction] mechanism can remove _all_ messages with that key.
[NOTE]
====
All MongoDB connector events are designed to work with https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction[Kafka log compaction], which allows for the removal of older messages as long as at least the most recent message for every key is kept. This is how Kafka can reclaim storage space while ensuring the topic contains a complete dataset and can be used for reloading key-based state.
All MongoDB connector events for a uniquely identified document will have exactly the same key, signalling to Kafka that only the latest event be kept. And, a tombstone event informs Kafka that _all_ messages with that same key can be removed.
All MongoDB connector events for a uniquely identified document will have exactly the same key, signaling to Kafka that only the latest event be kept.
And, a tombstone event informs Kafka that _all_ messages with that same key can be removed.
====
[[fault-tolerance]]
[[when-things-go-wrong]]
=== When things go wrong
Debezium is a distributed system that captures all changes in multiple upstream databases, and will never miss or lose an event. Of course, when the system is operating nominally or being administered carefully, then Debezium provides _exactly once_ delivery of every change event. However, if a fault does happen then the system will still not lose any events, although while it is recovering from the fault it may repeat some change events. Thus, in these abnormal situations Debezium (like Kafka) provides _at least once_ delivery of change events.
The rest of this section describes how Debezium handles various kinds of faults and problems.
==== Configuration and startup errors
The connector will fail upon startup, report an error/exception in the log, and stop running when the connector's configuration is invalid, or when the connector repeatedly fails to connect to MongoDB using the specified connectivity parameters. Reconnection is done using exponential backoff, and the maximum number of attempts is configurable.
In these cases, the error will have more details about the problem and possibly a suggested work around. The connector can be restarted when the configuration has been corrected or the MongoDB problem has been addressed.
==== MongoDB becomes unavailable
Once the connector is running, if the primary node of any of the MongoDB replica sets become unavailable or unreachable, the connector will repeatedly attempt to reconnect to the primary node, using exponential backoff to prevent saturating the network or servers. If the primary remains unavailable after the configurable number of connection attempts, the connector will fail.
The attempts to reconnect are controlled by three properties:
* `connect.backoff.initial.delay.ms` - The delay before attempting to reconnect for the first time, with a default of 1 second (1000 milliseconds).
* `connect.backoff.max.delay.ms` - The maximum delay before attempting to reconnect, with a default of 120 seconds (120,000 milliseconds).
* `connect.max.attempts` - The maximum number of attempts before an error is produced, with a default of 16.
Each delay is double that of the prior delay, up to the maximum delay. Given the default values, the following table shows the delay for each failed connection attempt and the total accumulated time before failure.
[width=75,cols="30%a,30%a,40%a",options="header,footer",role="table table-bordered table-striped"]
|=======================
|Reconnection attempt number
|Delay before attempt, in seconds
|Total delay before attempt, in minutes and seconds
|1 |1 |00:01
|2 |2 |00:03
|3 |4 |00:07
|4 |8 |00:15
|5 |16 |00:31
|6 |32 |01:03
|7 |64 |02:07
|8 |120|04:07
|9 |120|06:07
|10 |120|08:07
|11 |120|10:07
|12 |120|12:07
|13 |120|14:07
|14 |120|16:07
|15 |120|18:07
|16 |120|20:07
|=======================
==== Kafka Connect process stops gracefully
If Kafka Connect is being run in distributed mode, and a Kafka Connect process is stopped gracefully, then prior to shutdown of that processes Kafka Connect will migrate all of the process' connector tasks to another Kafka Connect process in that group, and the new connector tasks will pick up exactly where the prior tasks left off. There will be a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.
If the group contains only one process and that process is stopped gracefully, then Kafka Connect will stop the connector and record the last offset for each replica set. Upon restart, the replica set tasks will continue exactly where they left off.
==== Kafka Connect process crashes
If the Kafka Connector process stops unexpectedly, then any connector tasks it was running will obviously terminate without recording their most recently-processed offsets. When Kafka Connect is being run in distributed mode, it will restart those connector tasks on other processes. However, the MongoDB connectors will resume from the last offset _recorded_ by the earlier processes, which means that the new replacement tasks may generate some of the same change events that were processed just prior to the crash. The number of duplicate events will depend on the offset flush period and the volume of data changes just before the crash.
[TIP]
====
Because there is a chance that some events may be duplicated during a recovery from failure, consumers should always anticipate some events may be duplicated. Debezium change are idempotent, so a sequence of events always results in the same state.
Debezium also includes with each change event message the source-specific information about the origin of the event, including the MongoDB event's unique transaction identifier (`h`) and timestamp (`sec` and `ord`). Consumers can keep track of other of these values to know whether it has already seen a particular event.
====
==== Kafka becomes unavailable
As the connector generates change events, the Kafka Connect framework records those events in Kafka using the Kafka producer API. Kafka Connect will also periodically record the latest offset that appears in those change events, at a frequency you've specified in the Kafka Connect worker configuration. If the Kafka brokers become unavailable, the Kafka Connect worker process running the connectors will simply repeatedly attempt to reconnect to the Kafka brokers. In other words, the connector tasks will simply pause until a connection can be reestablished, at which point the connectors will resume exactly where they left off.
==== Connector is stopped for a duration
If the connector is gracefully stopped, the replica sets can continue to be used and any new changes will be recorded in MongoDB's oplog. When the connector is restarted, it will resume reading the oplog for each replica set where it last left off, recording change events for all of the changes that were made while the connector was stopped. If the connector is stopped long enough such that MongoDB purges from its oplog some operations that the connector has not read, then upon startup the connector will perform an initial sync.
A properly configured Kafka cluster is able to https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines[massive throughput]. Kafka Connect is written with Kafka best practices, and given enough resources will also be able to handle very large numbers of database change events. Because of this, when a connector has been restarted after a while, it is very likely to catch up with the database, though how quickly will depend upon the capabilities and performance of Kafka and the volume of changes being made to the data in MongoDB.
[NOTE]
====
If the connector remains stopped for long enough, MongoDB might purge older oplog files and the connector's last position may be lost. In this case, when the connector configured with _initial_ snapshot mode (the default) is finally restarted, the MongoDB server will no longer have the starting point and the connector will fail with an error.
====
==== MongoDB loses writes
It is possible for MongoDB to lose commits in specific failure situations. For example, if the primary applies a change and records it in its oplog before it then crashes unexpectedly, the secondary nodes may not have had a chance to read those changes from the primary's oplog before the primary crashed. If one such secondary is then elected as primary, it's oplog is missing the last changes that the old primary had recorded and no longer has those changes.
In these cases where MongoDB loses changes recorded in a primary's oplog, it is possible that the MongoDB connector may or may not capture these lost changes. At this time, there is no way to prevent this side effect of MongoDB.
[[configuration]]
[[deploying-a-connector]]
== Deploying a connector
[[mongodb-deploying-a-connector]]
== Deploying the MongoDB connector
ifndef::cdc-product[]
If you've already installed https://zookeeper.apache.org[Zookeeper], http://kafka.apache.org/[Kafka], and http://kafka.apache.org/documentation.html#connect[Kafka Connect], then using Debezium's MongoDB connector is easy. Simply download the https://repo1.maven.org/maven2/io/debezium/debezium-connector-mongodb/0.3.0/debezium-connector-mongodb-0.3.0-plugin.tar.gz[connector's plugin archive], extract the JARs into your Kafka Connect environment, and add the directory with the JARs to http://docs.confluent.io/3.0.0/connect/userguide.html#installing-connector-plugins[Kafka Connect's classpath]. Restart your Kafka Connect process to pick up the new JARs.
endif::cdc-product[]
If immutable containers are your thing, then check out https://hub.docker.com/r/debezium/[Debezium's Docker images] for Zookeeper, Kafka, and Kafka Connect with the MongoDB connector already pre-installed and ready to go. Our xref:tutorial.adoc[tutorial] even walks you through using these images, and this is a great way to learn what Debezium is all about. You can even link:/blog/2016/05/31/Debezium-on-Kubernetes/[run Debezium on Kubernetes and OpenShift].
ifdef::cdc-product[]
Installing the MongoDB connector is a simple process whereby you only need to download the JAR, extract it to your Kafka Connect environment, and ensure the plugin's parent directory is specified in your Kafka Connect environment.
To use the connector to produce change events for a particular MongoDB replica set or sharded cluster, simply create a link:#configuration[configuration file for the MongoDB Connector] and use the link:http://docs.confluent.io/3.0.0/connect/userguide.html#rest-interface[Kafka Connect REST API] to add that connector to your Kafka Connect cluster. When the connector starts, it will perform an initial sync of the collections in your MongoDB replica set(s) and start reading the replica sets' oplogs, producing events for every inserted, updated, and deleted row. Optionally filter out collections that are not needed.
.Prerequisites
* You have link:https://zookeeper.apache.org/[Zookeeper], link:http://kafka.apache.org/[Kafka], and link:http://kafka.apache.org/documentation.html#connect[Kafka Connect] installed.
* You have MongoDB installed and setup.
.Procedure
. Visit link:https://access.redhat.com/jbossnetwork/restricted/listSoftware.html?downloadType=distributions&product=jboss.amq.streams[Product Downloads^] on the Red Hat Customer Portal and download the MongoDB connector.
. Extract the files into your Kafka Connect environment.
. Add the plugin's parent directory to your Kafka Connect plugin path:
+
[source]
----
plugin.path=/kafka/connect
----
NOTE: The above example assumes you have extracted the {prodname} MongoDB connector to the `/kafka/connect/{prodname}-connector-mongodb` path.
[start=4]
. Restart your Kafka Connect process. This ensures the new JARs are picked up.
.Additional resources
For more information on the deployment process, and deploying connectors with AMQ Streams, refer to the {ProductName} installation guides.
* link:https://access.redhat.com/documentation/en-us/red_hat_integration/2019-12/html-single/installing_change_data_capture_on_openshift[Installing {ProductName} on OpenShift]
* link:https://access.redhat.com/documentation/en-us/red_hat_integration/2019-12/html-single/installing_change_data_capture_on_rhel[Installing {ProductName} on RHEL]
endif::cdc-product[]
ifndef::cdc-product[]
If immutable containers are your thing, then check out https://hub.docker.com/r/debezium/[Debezium's Docker images] for Zookeeper, Kafka, and Kafka Connect with the MongoDB connector already pre-installed and ready to go.
Our xref:tutorial.adoc[tutorial] even walks you through using these images, and this is a great way to learn what Debezium is all about. You can even link:/blog/2016/05/31/Debezium-on-Kubernetes/[run Debezium on Kubernetes and OpenShift].
endif::cdc-product[]
[[example]]
[[example-configuration]]
[[mongodb-example-configuration]]
=== Example configuration
Using the MongoDB connector is straightforward. Here is an example of the configuration for a MongoDB connector that monitors a MongoDB replica set `rs0` at port 27017 on 192.168.99.100, which we logically name `fullfillment`:
To use the connector to produce change events for a particular MongoDB replica set or sharded cluster, create a configuration file in JSON.
When the connector starts, it will perform an initial sync of the collections in your MongoDB replica sets and start reading the replica sets' oplogs, producing events for every inserted, updated, and deleted row.
Optionally filter out collections that are not needed.
Here is an example of the configuration for a MongoDB connector that monitors a MongoDB replica set `rs0` at port 27017 on 192.168.99.100, which we logically name `fullfillment`:
[source,json]
----
@ -570,15 +591,15 @@ Using the MongoDB connector is straightforward. Here is an example of the config
<1> The name of our connector when we register it with a Kafka Connect service.
<2> The name of the MongoDB connector class.
<3> The host addresses to use to connect to the MongoDB replica set
<4> The _logical name_ of the MongoDB replica set, which forms a namespace for generated events and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the link:#avro-converter[Avro Connector] is used.
<4> The _logical name_ of the MongoDB replica set, which forms a namespace for generated events and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used.
<5> A list of regular expressions that match the collection namespaces (e.g., <dbName>.<collectionName>) of all collections to be monitored. This is optional.
See the link:#connector-properties[complete list of connector properties] that can be specified in these configurations.
See the link:#mongodb-connector-properties[complete list of connector properties] that can be specified in these configurations.
This configuration can be sent via POST to a running Kafka Connect service, which will then record the configuration and start up the one connector task that will connect to the MongoDB replica set or sharded cluster, assign tasks for each replica set, perform an initial sync if necessary, read the oplog, and record events to Kafka topics.
[[connector-properties]]
[[mongodb-connector-properties]]
=== Connector properties
The following configuration properties are _required_ unless a default value is available.
@ -603,7 +624,7 @@ The following configuration properties are _required_ unless a default value is
|`mongodb.name`
|
|A unique name that identifies the connector and/or MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster.
|A unique name that identifies the connector and/or MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one {ProductName} connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster.
|`mongodb.user`
|
@ -627,19 +648,20 @@ The following configuration properties are _required_ unless a default value is
|`database.whitelist`
|_empty string_
|An optional comma-separated list of regular expressions that match database names to be monitored; any database name not included in the whitelist will be excluded from monitoring. By default all databases will be monitored. May not be used with `database.blacklist`.
|An optional comma-separated list of regular expressions that match database names to be monitored; any database name not included in the whitelist is excluded from monitoring. By default all databases is monitored.
May not be used with `database.blacklist`.
|`database.blacklist`
|_empty string_
|An optional comma-separated list of regular expressions that match database names to be excluded from monitoring; any database name not included in the blacklist will be monitored. May not be used with `database.whitelist`.
|An optional comma-separated list of regular expressions that match database names to be excluded from monitoring; any database name not included in the blacklist is monitored. May not be used with `database.whitelist`.
|`collection.whitelist`
|_empty string_
|An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be monitored; any collection not included in the whitelist will be excluded from monitoring. Each identifier is of the form _databaseName_._collectionName_. By default the connector will monitor all collections except those in the `local` and `admin` databases. May not be used with `collection.blacklist`.
|An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be monitored; any collection not included in the whitelist is excluded from monitoring. Each identifier is of the form _databaseName_._collectionName_. By default the connector will monitor all collections except those in the `local` and `admin` databases. May not be used with `collection.blacklist`.
|`collection.blacklist`
|_empty string_
|An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring; any collection not included in the blacklist will be monitored. Each identifier is of the form _databaseName_._collectionName_. May not be used with `collection.whitelist`.
|An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring; any collection not included in the blacklist is monitored. Each identifier is of the form _databaseName_._collectionName_. May not be used with `collection.whitelist`.
|`snapshot.mode`
|`initial`
@ -719,11 +741,11 @@ The following _advanced_ configuration properties have good defaults that will w
|`source.struct.version`
|v2
|Schema version for the `source` block in CDC events; Debezium 0.10 introduced a few breaking +
|Schema version for the `source` block in CDC events. Debezium 0.10 introduced a few breaking +
changes to the structure of the `source` block in order to unify the exposed structure across
all the connectors. +
By setting this option to `v1` the structure used in earlier versions can be produced.
Note that this setting is not recommended and is planned for removal in a future Debezium version.
Note that this setting is not recommended and is planned for removal in a future {ProductName} version.
|`heartbeat.interval.ms`
|`0`
@ -732,7 +754,7 @@ This property contains an interval in milli-seconds that defines how frequently
This can be used to monitor whether the connector is still receiving change events from the database.
You also should leverage heartbeat messages in cases where only records in non-captured collections are changed for a longer period of time.
In such situation the connector would proceed to read the oplog from the database but never emit any change messages into Kafka,
which in turn means that no offset updates will be committed to Kafka.
which in turn means that no offset updates are committed to Kafka.
This will cause the oplog files to be rotated out but connector will not notice it so on restart some events are no longer available which leads to the need of re-execution of the initial snapshot.
Set this parameter to `0` to not send heartbeat messages at all. +
@ -745,7 +767,104 @@ The topic is named according to the pattern `<heartbeat.topics.prefix>.<server.n
|`sanitize.field.names`
|`true` when connector configuration explicitly specifies the `key.converter` or `value.converter` parameters to use Avro, otherwise defaults to `false`.
|Whether field names will be sanitized to adhere to Avro naming requirements.
|Whether field names are sanitized to adhere to Avro naming requirements.
ifndef::cdc-product[]
See xref:configuration/avro.adoc#names[Avro naming] for more details.
endif::cdc-product[]
|=======================
[[fault-tolerance]]
[[mongodb-when-things-go-wrong]]
== MongoDB connector common issues
{ProductName} is a distributed system that captures all changes in multiple upstream databases, and will never miss or lose an event. Of course, when the system is operating nominally or being administered carefully, then {ProductName} provides _exactly once_ delivery of every change event. However, if a fault does happen then the system will still not lose any events, although while it is recovering from the fault it may repeat some change events. Thus, in these abnormal situations {ProductName} (like Kafka) provides _at least once_ delivery of change events.
The rest of this section describes how {ProductName} handles various kinds of faults and problems.
=== Configuration and startup errors
The connector will fail upon startup, report an error/exception in the log, and stop running when the connector's configuration is invalid, or when the connector repeatedly fails to connect to MongoDB using the specified connectivity parameters. Reconnection is done using exponential backoff, and the maximum number of attempts is configurable.
In these cases, the error will have more details about the problem and possibly a suggested work around. The connector can be restarted when the configuration has been corrected or the MongoDB problem has been addressed.
=== MongoDB becomes unavailable
Once the connector is running, if the primary node of any of the MongoDB replica sets become unavailable or unreachable, the connector will repeatedly attempt to reconnect to the primary node, using exponential backoff to prevent saturating the network or servers. If the primary remains unavailable after the configurable number of connection attempts, the connector will fail.
The attempts to reconnect are controlled by three properties:
* `connect.backoff.initial.delay.ms` - The delay before attempting to reconnect for the first time, with a default of 1 second (1000 milliseconds).
* `connect.backoff.max.delay.ms` - The maximum delay before attempting to reconnect, with a default of 120 seconds (120,000 milliseconds).
* `connect.max.attempts` - The maximum number of attempts before an error is produced, with a default of 16.
Each delay is double that of the prior delay, up to the maximum delay. Given the default values, the following table shows the delay for each failed connection attempt and the total accumulated time before failure.
[width=75,cols="30%a,30%a,40%a",options="header,footer",role="table table-bordered table-striped"]
|=======================
|Reconnection attempt number
|Delay before attempt, in seconds
|Total delay before attempt, in minutes and seconds
|1 |1 |00:01
|2 |2 |00:03
|3 |4 |00:07
|4 |8 |00:15
|5 |16 |00:31
|6 |32 |01:03
|7 |64 |02:07
|8 |120|04:07
|9 |120|06:07
|10 |120|08:07
|11 |120|10:07
|12 |120|12:07
|13 |120|14:07
|14 |120|16:07
|15 |120|18:07
|16 |120|20:07
|=======================
=== Kafka Connect process stops gracefully
If Kafka Connect is being run in distributed mode, and a Kafka Connect process is stopped gracefully, then prior to shutdown of that processes Kafka Connect will migrate all of the process' connector tasks to another Kafka Connect process in that group, and the new connector tasks will pick up exactly where the prior tasks left off.
There is a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.
If the group contains only one process and that process is stopped gracefully, then Kafka Connect will stop the connector and record the last offset for each replica set. Upon restart, the replica set tasks will continue exactly where they left off.
=== Kafka Connect process crashes
If the Kafka Connector process stops unexpectedly, then any connector tasks it was running will terminate without recording their most recently-processed offsets.
When Kafka Connect is being run in distributed mode, it will restart those connector tasks on other processes.
However, the MongoDB connectors will resume from the last offset _recorded_ by the earlier processes, which means that the new replacement tasks may generate some of the same change events that were processed just prior to the crash.
The number of duplicate events depends on the offset flush period and the volume of data changes just before the crash.
[NOTE]
====
Because there is a chance that some events may be duplicated during a recovery from failure, consumers should always anticipate some events may be duplicated. {ProductName} changes are idempotent, so a sequence of events always results in the same state.
{ProductName} also includes with each change event message the source-specific information about the origin of the event, including the MongoDB event's unique transaction identifier (`h`) and timestamp (`sec` and `ord`). Consumers can keep track of other of these values to know whether it has already seen a particular event.
====
=== Kafka becomes unavailable
As the connector generates change events, the Kafka Connect framework records those events in Kafka using the Kafka producer API. Kafka Connect will also periodically record the latest offset that appears in those change events, at a frequency you've specified in the Kafka Connect worker configuration. If the Kafka brokers become unavailable, the Kafka Connect worker process running the connectors will simply repeatedly attempt to reconnect to the Kafka brokers. In other words, the connector tasks will simply pause until a connection can be reestablished, at which point the connectors will resume exactly where they left off.
=== Connector is stopped for a duration
If the connector is gracefully stopped, the replica sets can continue to be used and any new changes are recorded in MongoDB's oplog.
When the connector is restarted, it will resume reading the oplog for each replica set where it last left off, recording change events for all of the changes that were made while the connector was stopped. If the connector is stopped long enough such that MongoDB purges from its oplog some operations that the connector has not read, then upon startup the connector will perform an initial sync.
A properly configured Kafka cluster is capable of https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines[massive throughput].
Kafka Connect is written with Kafka best practices, and given enough resources will also be able to handle very large numbers of database change events. Because of this, when a connector has been restarted after a while, it is very likely to catch up with the database, though how quickly will depend upon the capabilities and performance of Kafka and the volume of changes being made to the data in MongoDB.
[NOTE]
====
If the connector remains stopped for long enough, MongoDB might purge older oplog files and the connector's last position may be lost.
In this case, when the connector configured with _initial_ snapshot mode (the default) is finally restarted, the MongoDB server will no longer have the starting point and the connector will fail with an error.
====
=== MongoDB loses writes
It is possible for MongoDB to lose commits in specific failure situations. For example, if the primary applies a change and records it in its oplog before it then crashes unexpectedly, the secondary nodes may not have had a chance to read those changes from the primary's oplog before the primary crashed. If one such secondary is then elected as primary, it's oplog is missing the last changes that the old primary had recorded and no longer has those changes.
In these cases where MongoDB loses changes recorded in a primary's oplog, it is possible that the MongoDB connector may or may not capture these lost changes. At this time, there is no way to prevent this side effect of MongoDB.

View File

@ -1,4 +1,8 @@
= Debezium Connector for PostgreSQL
[id="debezium-connector-for-postgresql"]
= {ProductName} Connector for PostgreSQL
ifndef::cdc-product[]
include::../_attributes.adoc[]
:toc:
:toc-placement: macro
@ -7,20 +11,37 @@ include::../_attributes.adoc[]
:source-highlighter: highlight.js
toc::[]
endif::cdc-product[]
Debezium's PostgreSQL Connector can monitor and record the row-level changes in the schemas of a PostgreSQL database.
ifdef::cdc-product[]
[IMPORTANT]
====
Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process.
For more information about support scope, see link:https://access.redhat.com/support/offerings/techpreview/[Technology Preview Features Support Scope^].
====
endif::cdc-product[]
{ProductName}'s PostgreSQL Connector can monitor and record row-level changes in the schemas of a PostgreSQL database.
The first time it connects to a PostgreSQL server/cluster, it reads a consistent snapshot of all of the schemas. When that snapshot is complete, the connector continuously streams the changes that were committed to PostgreSQL 9.6 or later and generates corresponding insert, update and delete events. All of the events for each table are recorded in a separate Kafka topic, where they can be easily consumed by applications and services.
ifndef::cdc-product[]
The connector has been tested and works with Postgres 9.6, 10, 11 and 12.
endif::cdc-product[]
[[overview]]
== Overview
PostgreSQL's https://www.postgresql.org/docs/current/static/logicaldecoding-explanation.html[_logical decoding_] feature was first introduced in version 9.4 and is a mechanism which allows the extraction of the changes which were committed to the transaction log and the processing of these changes in a user-friendly manner via the help of an https://www.postgresql.org/docs/current/static/logicaldecoding-output-plugin.html[_output plugin_]. This output plugin must be installed prior to running the PostgreSQL server and enabled together with a replication slot in order for clients to be able to consume the changes.
Debezium's PostgreSQL connector contains two different parts which work together in order to be able to read and process server changes:
PostgreSQL connector contains two different parts which work together in order to be able to read and process server changes:
ifdef::cdc-product[]
* A logical decoding output plugin, which has to be installed and configured in the PostgreSQL server.
* Java code (the actual Kafka Connect connector) which reads the changes produced by the plugin, using PostgreSQL's https://www.postgresql.org/docs/current/static/logicaldecoding-walsender.html[_streaming replication protocol_], via the PostgreSQL https://github.com/pgjdbc/pgjdbc[_JDBC driver_]
endif::cdc-product[]
ifndef::cdc-product[]
* a logical decoding output plugin which has to be installed and configured in the PostgreSQL server, one of
** https://github.com/debezium/postgres-decoderbufs[decoderbufs] (maintained by the Debezium community, based on ProtoBuf)
** https://github.com/eulerto/wal2json[wal2json] (maintained by the wal2json community, based on JSON)
@ -28,6 +49,7 @@ Debezium's PostgreSQL connector contains two different parts which work together
this plug-in is always present, meaning that no additional libraries must be installed,
and the Debezium connector will interpret the raw replication event stream into change events directly.
* Java code (the actual Kafka Connect connector) which reads the changes produced by the chosen plugin, using PostgreSQL's https://www.postgresql.org/docs/current/static/logicaldecoding-walsender.html[_streaming replication protocol_], via the PostgreSQL https://github.com/pgjdbc/pgjdbc[_JDBC driver_]
endif::cdc-product[]
The connector then produces a _change event_ for every row-level insert, update, and delete operation that was received, recording all the change events for each table in a separate Kafka topic. Your client applications read the Kafka topics that correspond to the database tables they're interested in following, and react to every row-level event it sees in those topics.
@ -35,6 +57,16 @@ PostgreSQL normally purges WAL segments after some period of time. This means th
The connector is also tolerant of failures. As the connector reads changes and produces events, it records the position in the write-ahead log with each event. If the connector stops for any reason (including communication failures, network problems, or crashes), upon restart it simply continues reading the WAL where it last left off. This includes snapshots: if the snapshot was not completed when the connector is stopped, upon restart it will begin a new snapshot.
ifdef::cdc-product[]
[[output-plugin]]
=== Logical decoding output plugin
The `pgoutput` logical decoder is the only supported logical decoder in the Tecnhology Preview release of {ProductName}.
`pgoutput`, the standard logical decoding plug-in in PostgreSQL 10+, is maintained by the Postgres community, and is also used by Postgres for https://www.postgresql.org/docs/current/logical-replication-architecture.html[logical replication].
The `pgoutput` plug-in is always present, meaning that no additional libraries must be installed,
and the connector will interpret the raw replication event stream into change events directly.
endif::cdc-product[]
[[limitations]]
[IMPORTANT]
====
@ -47,22 +79,56 @@ Please be aware of the following limitations which are also reflected by the con
[IMPORTANT]
====
Debezium currently supports only database with UTF-8 character encoding.
{ProductName} currently supports only databases with UTF-8 character encoding.
With a single byte character encoding it is not possible to correctly process strings containing extended ASCII code characters.
====
[[setting-up-PostgreSQL]]
== Setting up PostgreSQL
Before using the Debezium PostgreSQL connector to monitor the changes committed on a PostgreSQL server,
ifdef::cdc-product[]
The Technology Preview release of {ProductName} only supports the native pgoutput logical replication stream.
To set up PostgreSQL using pgoutput, you will need to enable a replication slot, and configure a user with sufficient privileges to perform the replication.
=== Configuring the replication slot
PostgreSQL's logical decoding uses replication slots.
First, you configure the replication slot:
.postgresql.conf
[source]
----
wal_level=logical
max_wal_senders=1
max_replication_slots=1
----
* `wal_level` tells the server to use logical decoding with the write-ahead log
* `max_wal_senders` tells the server to use a maximum of 1 separate processes for processing WAL changes
* `max_replication_slots` tells the server to allow a maximum of 1 replication slots to be created for streaming WAL changes
Replication slots are guaranteed to retain all WAL required for {ProductName} even during {ProductName} outages.
It is important for this reason to closely monitor replication slots to avoid too much disk consumption and other conditions that can happen such as catalog bloat if a replication slot stays unused for too long.
For more information, refer to the https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS[the Postgres documentation].
[NOTE]
====
We recommend reading and understanding the https://www.postgresql.org/docs/current/static/wal-configuration.html[WAL configuration documentation] regarding the mechanics and configuration of the PostgreSQL write-ahead log.
====
endif::cdc-product[]
ifndef::cdc-product[]
Before using the PostgreSQL connector to monitor the changes committed on a PostgreSQL server,
first decide which logical decoder method you intend to use.
If you plan *not* to use the native pgoutput logical replication stream support,
then you will need to install the logical decoding plugin into the PostgreSQL server.
Afterward enable a replication slot, and configure a user with sufficient privileges to perform the replication.
Note that *if your database is hosted by a service* such as https://www.heroku.com/postgres[Heroku Postgres] you may be unable to install the plugin.
Note that if your database is hosted by a service such as https://www.heroku.com/postgres[Heroku Postgres] you may be unable to install the plugin.
If so, and if you're using PostgreSQL 10+, you can use the pgoutput decoder support to monitor your database.
If that isn't an option, you'll be unable to monitor your database with Debezium.
If that isn't an option, you'll be unable to monitor your database with {ProductName}.
[[amazon-rds]]
=== PostgreSQL on Amazon RDS
@ -73,7 +139,7 @@ It is possible to monitor PostgreSQL database running in https://aws.amazon.com/
* Verify that `wal_level` parameter is set to `logical` by running the query `SHOW wal_level` as DB master user; this might not be the case in multi-zone replication setups.
You cannot set this option manually, it's (https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_WorkingWithParamGroups.html[automatically changed]) when the `rds.logical_replication` is set to `1`.
If the `wal_level` is not `logical` after the change above, it's probably because the instance has to be restarted due to the parameter group change, it'll happen accordingly to your maintenance window or can be done manually.
* Set `plugin.name` Debezium parameter to `wal2json`. You can skip this on PostgreSQL 10+ if you wish to use pgoutput logical replication stream support.
* Set `plugin.name` {ProductName} parameter to `wal2json`. You can skip this on PostgreSQL 10+ if you wish to use pgoutput logical replication stream support.
* Use database master account for replication as RDS currently does not support setting of `REPLICATION` privilege for another account.
[IMPORTANT]
@ -91,6 +157,7 @@ As of January 2019, the following Postgres versions on RDS come with an up-to-da
* Postgres 11: any version
====
[[output-plugin]]
=== Installing the Logical Decoding Output Plug-in
@ -107,7 +174,7 @@ This means that a logical decoding output plug-in is no longer necessary and cha
As of PostgreSQL 9.4, the only way to read changes to the write-ahead-log is to first install a logical decoding output plugin. Plugins are written in C, compiled, and installed on the machine which runs the PostgreSQL server. Plugins use a number of PostgreSQL specific APIs, as described by the https://www.postgresql.org/docs/current/static/logicaldecoding-output-plugin.html[_PostgreSQL documentation_].
Debezium's PostgreSQL connector works with one of Debezium's supported logical decoding plugin to encode the changes in either https://github.com/google/protobuf[_Protobuf format_] or http://www.json.org/[_JSON_] format.
PostgreSQL connector works with one of Debezium's supported logical decoding plugin to encode the changes in either https://github.com/google/protobuf[_Protobuf format_] or http://www.json.org/[_JSON_] format.
See the documentation of your chosen plugin (https://github.com/debezium/postgres-decoderbufs/blob/master/README.md[_protobuf_], https://github.com/eulerto/wal2json/blob/master/README.md[_wal2json_]) to learn more about the plugin's requirements, limitations, and how to compile it.
For simplicity, Debezium also provides a Docker image based on a vanilla PostgreSQL server image on top of which it compiles and installs the plugins. We recommend https://github.com/debezium/docker-images/tree/master/postgres/9.6[_using this image_] as an example of the detailed steps required for the installation.
@ -130,14 +197,17 @@ So far these differences have been identified:
* wal2json should be used with setting the `schema.refresh.mode` connector option to `columns_diff_exclude_unchanged_toast`;
otherwise, when receiving a change event for a row containing an unchanged TOAST column, no field for that column will be contained in the emitted change event's `after` structure.
This is because wal2json's messages won't contain a field for such column.
The requirement for adding this is tracked under the wal2json https://github.com/eulerto/wal2json/issues/98[issue 98].
See the documentation of `columns_diff_exclude_unchanged_toast` further below for implications of using it.
* pgoutput plug-in does not emit events for tables without primary keys
All up-to-date differences are tracked in a test suite https://github.com/debezium/debezium/blob/master/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/DecoderDifferences.java[Java class].
[[server-configuration]]
==== Configuring the PostgreSQL Server
=== Configuring the PostgreSQL Server
If you are using one of the supported link:#output-plugin[logical decoding plug-ins] (i.e. not pgoutput) and it has been installed,
configure the server to load the plugin at startup:
@ -177,10 +247,13 @@ Otherwise, its default value will be applied which adds a latency of about 200 m
====
We strongly recommend reading and understanding https://www.postgresql.org/docs/current/static/wal-configuration.html[the official documentation] regarding the mechanics and configuration of the PostgreSQL write-ahead log.
====
endif::cdc-product[]
[[PostgreSQL-permissions]]
=== Setting up Permissions
Next, configure a database user who can perform replications.
Replication can only be performed by a database user that has appropriate permissions and only for a configured number of hosts.
In order to give a user replication permissions, define a PostgreSQL role that has _at least_ the `REPLICATION` and `LOGIN` permissions. For example:
@ -190,42 +263,44 @@ In order to give a user replication permissions, define a PostgreSQL role that h
CREATE ROLE name REPLICATION LOGIN;
----
[TIP]
[NOTE]
====
Superusers have by default both of the above roles.
====
Finally, configure the PostgreSQL server to allow replication to take place between the server machine and the host on which the Debezium PostgreSQL connector is running:
Finally, configure the PostgreSQL server to allow replication to take place between the server machine and the host on which the PostgreSQL connector is running:
*pg_hba.conf*
.pg_hba.conf
[source]
----
local replication <youruser> trust //<1>
host replication <youruser> 127.0.0.1/32 trust //<2>
host replication <youruser> ::1/128 trust //<3>
----
<1> tells the server to allow replication for `<youruser>` locally (i.e. on the server machine)
<2> tells the server to allow `<youruser>` on `localhost` to receive replication changes using `IPV4`
<3> tells the server to allow `<youruser>` on `localhost` to receive replication changes using `IPV6`
<1> Tells the server to allow replication for `<youruser>` locally (i.e. on the server machine)
<2> Tells the server to allow `<youruser>` on `localhost` to receive replication changes using `IPV4`
<3> Tells the server to allow `<youruser>` on `localhost` to receive replication changes using `IPV6`
[TIP]
[NOTE]
====
See https://www.postgresql.org/docs/current/static/datatype-net-types.html[_the PostgreSQL documentation_] for more information on network masks.
====
ifndef::cdc-product[]
[[supported-PostgreSQL-topologies]]
== Supported PostgreSQL Topologies
The PostgreSQL connector can be used with a standalone PostgreSQL server or with a cluster of PostgreSQL servers.
As mentioned link:#limitations[in the beginning], PostgreSQL (for all versions <= 12) only supports logical replication slots on `primary` servers. This means that a replica in a PostgreSQL cluster cannot be configured for logical replication, and consequently that the Debezium PostgreSQL Connector can only connect and communicate with the primary server. Should this server fail, the connector will stop. When the cluster is repaired, if the original primary server is once again promoted to `primary`, the connector can simply be restarted. However, if a different PostgreSQL server _with the plugin and proper configuration_ is promoted to `primary`, the connector configuration must be changed to point to the new `primary` server and then can be restarted.
endif::cdc-product[]
[[wal-disk-space]]
== WAL Disk Space Consumption
=== WAL Disk Space Consumption
In certain cases, it is possible that PostgreSQL disk space consumed by WAL files either experiences spikes or increases out of usual proportions.
There are three potential reasons that explain the situation:
* Debezium regularly confirms LSN of processed events to the database.
* {ProductName} regularly confirms LSN of processed events to the database.
This is visible as `confirmed_flush_lsn` in the `pg_replication_slots` slots table.
The database is responsible for reclaiming the disk space and the WAL size can be calculated from `restart_lsn` of the same table.
So if the `confirmed_flush_lsn` is regularly increasing and `restart_lsn` lags then the database does need to reclaim the space.
@ -233,25 +308,28 @@ Disk space is usually reclaimed in batch blocks so this is expected behaviour an
* There are many updates in a monitored database but only a minuscule amount relates to the monitored table(s) and/or schema(s).
This situation can be easily solved by enabling periodic heartbeat events using `heartbeat.interval.ms` configuration option.
* The PostgreSQL instance contains multiple databases where one of them is a high-traffic database.
Debezium monitors another database that is low-traffic in comparison to the other one.
Debezium then cannot confirm the LSN as replication slots work per-database and Debezium is not invoked.
As WAL is shared by all databases it tends to grow until an event is emitted by the database monitored by Debezium.
{ProductName} monitors another database that is low-traffic in comparison to the other one.
{ProductName} then cannot confirm the LSN as replication slots work per-database and {ProductName} is not invoked.
As WAL is shared by all databases it tends to grow until an event is emitted by the database monitored by {ProductName}.
To overcome the third cause it is necessary to
* enable periodic heartbeat record generation using the `heartbeat.interval.ms` configuration option
* regularly emit change events from the database tracked by Debezium
* regularly emit change events from the database tracked by {ProductName}
ifndef::cdc-product[]
** In the case of `wal2json` decoder plugin, it is sufficient to generate empty events.
This can be achieved for example by truncating an empty temporary table.
** For other decoder plug-ins, it is recommended to create a supplementary table that is not monitored by Debezium.
A separate process would then periodically update the table (either inserting a new event or updating the same row all over).
PostgreSQL then will invoke Debezium which will confirm the lastest LSN and allow the database to reclaim the WAL space.
endif::cdc-product[]
A separate process would then periodically update the table (either inserting a new event or updating the same row all over).
PostgreSQL then will invoke {ProductName} which will confirm the latest LSN and allow the database to reclaim the WAL space.
[[how-the-postgresql-connector-works]]
== How the PostgreSQL connector works
=== How the PostgreSQL connector works
[[snapshots]]
=== Snapshots
==== Snapshots
Most PostgreSQL servers are configured to not retain the complete history of the database in the WAL segments, so the PostgreSQL connector would be unable to see the entire history of the database by simply reading the WAL. So, by default the connector will upon first startup perform an initial _consistent snapshot_ of the database. Each snapshot consists of the following steps (when using the builtin snapshot modes, *custom* snapshot modes may override this):
@ -272,8 +350,10 @@ The fourth snapshot mode, *initial only*, will perform a database snapshot and t
The fifth snapshot mode, *exported*, will perform a database snapshot based on the point in time when the replication slot was created. This mode is an excellent way to perform a snapshot in a lock-free way.
ifndef::cdc-product[]
The final snapshot mode, *custom*, allows the user to inject their own implementation of the `io.debezium.connector.postgresql.spi.Snapshotter` interface via the `snapshot.custom.class` configuration property, with the class on the classpath of your Kafka Connect cluster (or included in the JAR if using the `EmbeddedEngine`). For more details, see the link:#custom-snapshot[Custom Snapshot] section.
[[custom-snapshot]]
=== Custom Snapshotter SPI
@ -281,7 +361,7 @@ For more advanced usages, the user can provide an implementation of the `io.debe
The full API of the interface can be seen here:
[source,java,indent=0]
[source,java,indent=0,subs="+attributes"]
----
**
* This interface is used to determine details about the snapshot process:
@ -291,7 +371,7 @@ The full API of the interface can be seen here:
* - Should streaming occur
* - What queries should be used to snapshot
*
* While many default snapshot modes are provided with debezium (see documentation for details)
* While many default snapshot modes are provided with {ProductName}
* a custom implementation of this interface can be provided by the implementor which
* can provide more advanced functionality, such as partial snapshots
*
@ -362,15 +442,16 @@ public interface Snapshotter {
----
All of the builtin snapshot modes are implemented in terms of this interface as well.
endif::cdc-product[]
[[streaming-changes]]
=== Streaming Changes
==== Streaming Changes
The PostgreSQL connector will typically spend the vast majority of its time streaming changes from the PostgreSQL server to which it is connected. This mechanism relies on https://www.postgresql.org/docs/current/static/protocol-replication.html[_PostgreSQL's replication protocol_] where the client can receive changes from the server as they are committed in the server's transaction log at certain positions (also known as `Log Sequence Numbers` or in short LSNs).
Whenever the server commits a transaction, a separate server process invokes a callback function from the link:#output-plugin[logical decoding plugin]. This function processes the changes from the transaction, converts them to a specific format (Protobuf or JSON in the case of Debezium plugin) and writes them on an output stream which can then be consumed by clients.
Whenever the server commits a transaction, a separate server process invokes a callback function from the link:#output-plugin[logical decoding plugin]. This function processes the changes from the transaction, converts them to a specific format (Protobuf or JSON in the case of {ProductName} plugin) and writes them on an output stream which can then be consumed by clients.
The PostgreSQL connector acts as a PostgreSQL client, and when it receives these changes it transforms the events into Debezium _create_, _update_, or _delete_ events that include the LSN position of the event. The PostgreSQL connector forwards these change events to the Kafka Connect framework (running in the same process), which then asynchronously writes them in the same order to the appropriate Kafka topic. Kafka Connect uses the term _offset_ for the source-specific position information that Debezium includes with each event, and Kafka Connect periodically records the most recent offset in another Kafka topic.
The PostgreSQL connector acts as a PostgreSQL client, and when it receives these changes it transforms the events into {ProductName} _create_, _update_, or _delete_ events that include the LSN position of the event. The PostgreSQL connector forwards these change events to the Kafka Connect framework (running in the same process), which then asynchronously writes them in the same order to the appropriate Kafka topic. Kafka Connect uses the term _offset_ for the source-specific position information that {ProductName} includes with each event, and Kafka Connect periodically records the most recent offset in another Kafka topic.
When Kafka Connect gracefully shuts down, it stops the connectors, flushes all events to Kafka, and records the last offset received from each connector. Upon restart, Kafka Connect reads the last recorded offset for each connector, and starts the connector from that point. The PostgreSQL connector uses the LSN recorded in each change event as the offset, so that upon restart the connector requests the PostgreSQL server send it the events starting just after that position.
@ -382,17 +463,17 @@ If the primary key definition of a table changes (by adding, removing or renamin
then there exists a slight risk of an unfortunate timing when the primary key information from JDBC
will not be synchronized with the change data in the logical decoding event and a small amount of messages will be created with an inconsistent key structure.
If this happens then a restart of the connector and a reprocessing of the messages will fix the issue.
To prevent the issue completely it is recommended to synchronize updates to the primary key structure with Debezium roughly using following sequence of operations:
To prevent the issue completely it is recommended to synchronize updates to the primary key structure with {ProductName} roughly using following sequence of operations:
* Put the database or an application into a read-only mode
* Let Debezium process all remaining events
* Stop Debezium
* Let {ProductName} process all remaining events
* Stop {ProductName}
* Update the primary key definition
* Put the database or the application into read/write state and start Debezium again
* Put the database or the application into read/write state and start {ProductName} again
====
[[pgoutput]]
=== PostgreSQL 10+ Logical Decoding Support (pgoutput)
==== PostgreSQL 10+ Logical Decoding Support (pgoutput)
As of PostgreSQL 10+, a new logical replication stream mode was introduced, called _pgoutput_. This logical replication stream mode is natively supported by PostgreSQL,
which means that this connector can consume that replication stream
@ -402,7 +483,7 @@ This is particularly valuable for environments where installation of plug-ins is
See link:#setting-up-PostgreSQL[Setting up PostgreSQL] for more details.
[[topic-names]]
=== Topics Names
==== Topics Names
The PostgreSQL connector writes events for all insert, update, and delete operations on a single table to a single Kafka topic. By default, the Kafka topic name is _serverName_._schemaName_._tableName_ where _serverName_ is the logical name of the connector as specified with the `database.server.name` configuration property, _schemaName_ is the name of the database schema where the operation occurred, and _tableName_ is the name of the database table on which the operation occurred.
@ -421,7 +502,7 @@ If on the other hand the tables were not part of a specific schema but rather cr
* `fulfillment.public.orders`
[[meta-info]]
=== Meta Information
==== Meta Information
Each `record` produced by the PostgreSQL connector has, in addition to the link:#events[_database event_], some meta-information about where the event occurred on the server, the name of the source partition and the name of the Kafka topic and partition where the event should be placed:
@ -447,7 +528,7 @@ The `sourceOffset` portion of the message contains information about the locatio
* `ts_ms` represents the number of microseconds since Unix Epoch as the server time at which the transaction was committed
[[events]]
=== Events
==== Events
All data change events produced by the PostgreSQL connector have a key and a value, although the structure of the key and value depend on the table from which the change events originated (see link:#topic-names[Topic names]).
@ -458,15 +539,15 @@ Starting with Kafka 0.10, Kafka can optionally record with the message key and v
[WARNING]
====
The Debezium PostgreSQL connector ensures that all Kafka Connect _schema names_ are http://avro.apache.org/docs/current/spec.html#names[valid Avro schema names]. This means that the logical server name must start with Latin letters or an underscore (e.g., [a-z,A-Z,\_]), and the remaining characters in the logical server name and all characters in the schema and table names must be Latin letters, digits, or an underscore (e.g., [a-z,A-Z,0-9,\_]). If not, then all invalid characters will automatically be replaced with an underscore character.
The PostgreSQL connector ensures that all Kafka Connect _schema names_ are http://avro.apache.org/docs/current/spec.html#names[valid Avro schema names]. This means that the logical server name must start with Latin letters or an underscore (e.g., [a-z,A-Z,\_]), and the remaining characters in the logical server name and all characters in the schema and table names must be Latin letters, digits, or an underscore (e.g., [a-z,A-Z,0-9,\_]). If not, then all invalid characters will automatically be replaced with an underscore character.
This can lead to unexpected conflicts when the logical server name, schema names, and table names contain other characters, and the only distinguishing characters between table full names are invalid and thus replaced with underscores.
====
Debezium and Kafka Connect are designed around _continuous streams of event messages_, and the structure of these events may change over time. This could be difficult for consumers to deal with, so to make it easy Kafka Connect makes each event self-contained. Every message key and value has two parts: a _schema_ and _payload_. The schema describes the structure of the payload, while the payload contains the actual data.
{ProductName} and Kafka Connect are designed around _continuous streams of event messages_, and the structure of these events may change over time. This could be difficult for consumers to deal with, so to make it easy Kafka Connect makes each event self-contained. Every message key and value has two parts: a _schema_ and _payload_. The schema describes the structure of the payload, while the payload contains the actual data.
[[change-events-key]]
==== Change Event's Key
===== Change Event's Key
For a given table, the change event's key will have a structure that contains a field for each column in the primary key (or unique key constraint with `REPLICA IDENTITY` set to `FULL` or `USING INDEX` on the table) of the table at the time the event was created.
@ -524,7 +605,7 @@ If the table does not have a primary or unique key, then the change event's key
====
[[change-events-value]]
==== Change Event's Value
===== Change Event's Value
The value of the change event message is a bit more complicated. Like the message key, it has a _schema_ section and _payload_ section. The payload section of every change event value produced by the PostgreSQL connector has an _envelope_ structure with the following fields:
@ -537,7 +618,7 @@ Whether or not this field is available is highly dependent on the link:#replica-
====
* `after` is an optional field that if present contains the state of the row _after_ the event occurred. The structure is described by the same `PostgreSQL_server.public.customers.Value` Kafka Connect schema used in `before`.
* `source` is a mandatory field that contains a structure describing the source metadata for the event, which in the case of PostgreSQL contains several fields: the Debezium version, the connector name, the name of the affected database, schema and table, whether the event is part of an ongoing snapshot or not and the same fields from the record's link:#meta-info[_meta information_] section
* `source` is a mandatory field that contains a structure describing the source metadata for the event, which in the case of PostgreSQL contains several fields: the {ProductName} version, the connector name, the name of the affected database, schema and table, whether the event is part of an ongoing snapshot or not and the same fields from the record's link:#meta-info[_meta information_] section
* `ts_ms` is optional and if present contains the time (using the system clock in the JVM running the Kafka Connect task) at which the connector processed the event.
And of course, the _schema_ portion of the event message's value contains a schema that describes this envelope structure and the nested fields within it.
@ -728,16 +809,20 @@ Let's look at what a _create_ event value might look like for our `customers` ta
If we look at the `schema` portion of this event's _value_, we can see the schema for the _envelope_, the schema for the `source` structure (which is specific to the PostgreSQL connector and reused across all events), and the table-specific schemas for the `before` and `after` fields.
[TIP]
[NOTE]
====
The names of the schemas for the `before` and `after` fields are of the form _logicalName_._schemaName_._tableName_.Value, and thus are entirely independent from all other schemas for all other tables. This means that when using the link:/docs/faq/#avro-converter[Avro Converter], the resulting Avro schemas for _each table_ in each _logical source_ have their own evolution and history.
The names of the schemas for the `before` and `after` fields are of the form _logicalName_._schemaName_._tableName_.Value, and thus are entirely independent from all other schemas for all other tables.
This means that when using the Avro Converter, the resulting Avro schemas for _each table_ in each _logical source_ have their own evolution and history.
====
If we look at the `payload` portion of this event's _value_, we can see the information in the event, namely that it is describing that the row was created (since `op=c`), and that the `after` field value contains the values of the new inserted row's' `id`, `first_name`, `last_name`, and `email` columns.
[TIP]
[NOTE]
====
It may appear that the JSON representations of the events are much larger than the rows they describe. This is true, because the JSON representation must include the _schema_ and the _payload_ portions of the message. It is possible and even recommended to use the link:/docs/faq/#avro-converter[Avro Converter] to dramatically decrease the size of the actual messages written to the Kafka topics.
It may appear that the JSON representations of the events are much larger than the rows they describe. This is true, because the JSON representation must include the _schema_ and the _payload_ portions of the message.
It is possible and even recommended to use the Avro Converter to dramatically decrease the size of the actual messages written to the Kafka topics.
====
[[update-events]]
@ -782,21 +867,20 @@ When we compare this to the value in the _insert_ event, we see a couple of diff
* The `op` field value is now `u`, signifying that this row changed because of an update
* The `before` field now has the state of the row with the values before the database commit, but only for the primary key column `id`. This is because the link:#replica-identity[_REPLICA IDENTITY_] which is by default `DEFAULT`.
[TIP]
[NOTE]
====
Should we want to see the previous values of all the columns for the row, we would have to change the `customers` table first by running
`ALTER TABLE customers REPLICA IDENTITY FULL`
Should we want to see the previous values of all the columns for the row, we would have to change the `customers` table first by running `ALTER TABLE customers REPLICA IDENTITY FULL`
====
* The `after` field now has the updated state of the row, and here was can see that the `first_name` value is now `Anne Marie`.
* The `source` field structure has the same fields as before, but the values are different since this event is from a different position in the WAL.
* The `ts_ms` shows the timestamp that Debezium processed this event.
* The `ts_ms` shows the timestamp that {ProductName} processed this event.
There are several things we can learn by just looking at this `payload` section. We can compare the `before` and `after` structures to determine what actually changed in this row because of the commit. The `source` structure tells us information about PostgreSQL's record of this change (providing traceability), but more importantly this has information we can compare to other events in this and other topics to know whether this event occurred before, after, or as part of the same PostgreSQL commit as other events.
[NOTE]
====
When the columns for a row's primary/unique key are updated, the value of the row's key has changed so Debezium will output _three_ events: a `DELETE` event and link:#tombstone-events[tombstone event] with the old key for the row, followed by an `INSERT` event with the new key for the row.
When the columns for a row's primary/unique key are updated, the value of the row's key has changed so {ProductName} will output _three_ events: a `DELETE` event and link:#tombstone-events[tombstone event] with the old key for the row, followed by an `INSERT` event with the new key for the row.
====
[[delete-events]]
@ -837,7 +921,7 @@ If we look at the `payload` portion, we see a number of differences compared wit
* The `before` field now has the state of the row that was deleted with the database commit. Again this only contains the primary key column due to the link:#replica-identity[_REPLICA IDENTITY_] setting
* The `after` field is null, signifying that the row no longer exists
* The `source` field structure has many of the same values as before, except the `ts_ms`, `lsn` and `txId` fields have changed
* The `ts_ms` shows the timestamp that Debezium processed this event.
* The `ts_ms` shows the timestamp that {ProductName} processed this event.
This event gives a consumer all kinds of information that it can use to process the removal of this row.
@ -850,10 +934,10 @@ To be able to process messages from tables without PK set REPLICA IDENTITY to FU
The PostgreSQL connector's events are designed to work with https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction[Kafka log compaction], which allows for the removal of some older messages as long as at least the most recent message for every key is kept. This allows Kafka to reclaim storage space while ensuring the topic contains a complete dataset and can be used for reloading key-based state.
[[tombstone-events]]
When a row is deleted, the _delete_ event value listed above still works with log compaction, since Kafka can still remove all earlier messages with that same key. But only if the message value is `null` will Kafka know that it can remove _all messages_ with that same key. To make this possible, Debezium's PostgreSQL connector always follows the _delete_ event with a special _tombstone_ event that has the same key but `null` value.
When a row is deleted, the _delete_ event value listed above still works with log compaction, since Kafka can still remove all earlier messages with that same key. But only if the message value is `null` will Kafka know that it can remove _all messages_ with that same key. To make this possible, the PostgreSQL connector always follows the _delete_ event with a special _tombstone_ event that has the same key but `null` value.
[[data-types]]
=== Data Types
==== Data Types
As described above, the PostgreSQL connector represents the changes to rows with events that are structured like the table in which the row exist. The event contains a field for each column value, and how that value is represented in the event depends on the PostgreSQL data type of the column. This section describes this mapping.
@ -1032,7 +1116,7 @@ The _semantic type_ describes how the Kafka Connect schema captures the _meaning
Other data type mappings are described in the following sections.
[[temporal-values]]
==== Temporal Values
===== Temporal Values
Other than PostgreSQL's `TIMESTAMPTZ` and `TIMETZ` data types (which contain time zone information), the other temporal types depend on the value of the `time.precision.mode` configuration property. When the `time.precision.mode` configuration property is set to `adaptive` (the default), then the connector will determine the literal type and semantic type for the temporal types based on the column's data type definition so that events _exactly_ represent the values in the database:
@ -1135,10 +1219,10 @@ Such columns are converted into an equivalent Kafka Connect value based on UTC.
So for instance the `TIMESTAMP` value "2018-06-20 15:13:16.945104" will be represented by a `io.debezium.time.MicroTimestamp` with the value "1529507596945104"
(assuming `time.precision.mode` is not set to `connect`).
Note that the timezone of the JVM running Kafka Connect and Debezium does not affect this conversion.
Note that the timezone of the JVM running Kafka Connect and {ProductName} does not affect this conversion.
[[decimal-values]]
==== Decimal Values
===== Decimal Values
When `decimal.handling.mode` configuration property is set to `precise`, then the connector will use the predefined Kafka Connect `org.apache.kafka.connect.data.Decimal` logical type for all `DECIMAL` and `NUMERIC` columns. This is the default mode.
@ -1229,7 +1313,7 @@ The last option for `decimal.handling.mode` configuration property is `string`.
PostgreSQL supports `NaN` (not a number) special value to be stored in the `DECIMAL`/`NUMERIC` values. Only `string` and `double` modes are able to handle such values encoding them as either `Double.NaN` or string constant `NAN`.
[[hstore-values]]
==== HStore Values
===== HStore Values
When `hstore.handling.mode` configuration property is set to `map`, then the connector will use the `java.util.Map<String,String>` logical type, `MAP` schema type for all `HSTORE` columns. This is the default mode.
@ -1281,7 +1365,7 @@ When a column is defined using a domain type that extends another domain type th
[[postgis-types]]
[[network-address-types]]
==== Network Address Types
===== Network Address Types
PostgreSQL also have data types that can store IPv4, IPv6, and MAC addresses. It is better to use these instead of plain text types to store network addresses, because these types offer input error checking and specialized operators and functions.
@ -1313,7 +1397,7 @@ PostgreSQL also have data types that can store IPv4, IPv6, and MAC addresses. It
|MAC addresses in EUI-64 format
|=======================
==== PostGIS Types
===== PostGIS Types
The PostgreSQL connector also has full support for all of the http://postgis.net[PostGIS data types]
@ -1346,18 +1430,18 @@ Please see http://www.opengeospatial.org/standards/sfa[Open Geospatial Consortiu
|=======================
[[toasted-values]]
==== Toasted values
===== Toasted values
PostgreSQL has a hard limit on the page size.
This means that values larger than ca. 8 KB need to be stored using https://www.postgresql.org/docs/current/storage-toast.html[TOAST storage].
This impacts replication messages coming from database, as the values that were stored using the TOAST mechanism and have not been changed are not included in the message, unless they are part of the table's replica identity.
There is no safe way for Debezium to read the missing value out-of-bands directly from database, as this would lead into race conditions potentially.
Debezium thus follows these rules to handle the toasted values:
There is no safe way for {ProductName} to read the missing value out-of-bands directly from database, as this would lead into race conditions potentially.
{ProductName} thus follows these rules to handle the toasted values:
* tables with `REPLICA IDENTITY FULL`: TOAST column values are part of the `before` and `after` blocks of change events as any other column
* tables with `REPLICA IDENTITY DEFAULT`: when receiving an `UPDATE` event from the database,
any unchanged TOAST column value which is not part of the replica identity will not be part of that event;
similarly, when receiving a `DELETE` event, any such TOAST column will not be part of the `before` block.
As Debezium cannot safely provide the column value in this case, it returns a placeholder value defined in configuration option `toasted.value.placeholder`.
As {ProductName} cannot safely provide the column value in this case, it returns a placeholder value defined in configuration option `toasted.value.placeholder`.
[IMPORTANT]
====
@ -1371,90 +1455,63 @@ For consistent toasted values handling we recommend to
* set `include-unchanged-toast=0` for older versions of `wal2json` plugin using `slot.stream.params` configuration option
====
[[fault-tolerance]]
[[when-things-go-wrong]]
=== When Things Go Wrong
Debezium is a distributed system that captures all changes in multiple upstream databases, and will never miss or lose an event. Of course, when the system is operating nominally or being administered carefully, then Debezium provides _exactly once_ delivery of every change event. However, if a fault does happen then the system will still not lose any events, although while it is recovering from the fault it may repeat some change events. Thus, in these abnormal situations Debezium (like Kafka) provides _at least once_ delivery of change events.
The rest of this section describes how Debezium handles various kinds of faults and problems.
==== Configuration and Startup Errors
The connector will fail upon startup, report an error/exception in the log, and stop running when the connector's configuration is invalid, when the connector cannot successfully connect to PostgreSQL using the specified connectivity parameters, or when the connector is restarting from a previously-recorded position in the PostgreSQL WAL (via the LSN value) and PostgreSQL no longer has that history available.
In these cases, the error will have more details about the problem and possibly a suggested work around. The connector can be restarted when the configuration has been corrected or the PostgreSQL problem has been addressed.
==== PostgreSQL Becomes Unavailable
Once the connector is running, if the PostgreSQL server it has been connected to becomes unavailable for any reason, the connector will fail with an error and the connector will stop. Simply restart the connector when the server is available.
The PostgreSQL connector stores externally the last processed offset (in the form of a PostgreSQL `log sequence number` value). Once a connector is restarted and connects to a server instance, it will ask the server to continue streaming from that particular offset. This offset will always remain available so long as the Debezium replication slot remains intact. Never drop a replication slot on the primary or you will lose data. See the next section for failure cases when a slot has been removed.
==== Cluster Failures
As of `12`, PostgreSQL allows logical replication slots _only on primary servers_, which means that a PostgreSQL connector can only be pointed to the active primary of a database cluster.
Also replication slots themselves are not propagated to replicas.
If the primary node goes down, only after a new primary has been promoted (with the link:#output-plugin[logical decoding plugin] installed) and a replication slot has been created there, the connector can be restarted and pointed to the new server.
There are some really important caveats to failovers, and you should pause Debezium until you can verify that you have a replication slot intact which has not lost data. After a failover, you will miss change events unless your administration of failovers includes a process to recreate the Debezium replication slot before the application is allowed to write to the *new* primary. You also may need to verify in a failover situation that Debezium was able to read all changes in the slot **before the old primary failed**.
One reliable method of recovering and verifying any lost changes (yet administratively difficult) is to recover a backup of your failed primary to the point immediately before it failed, which would allow you to inspect the replication slot for any unconsumed changes. In any case, it is crucial that you recreate the replication slot on the new primary prior to allowing writes to it.
[NOTE]
====
There are discussions in the PostgreSQL community around a feature called `failover slots` which would help mitigate this problem, but as of `12` they have not been implemented yet. However, there is active development for Postgres 13 to support logical decoding on standbys, which is a major requirement to make failover possible. You can find more about this on the
link:++https://www.postgresql.org/message-id/CAJ3gD9fE=0w50sRagcs+jrktBXuJAWGZQdSTMa57CCY+Dh-xbg@mail.gmail.com++[community thread].
You can find out more about the concept of failover slots here http://blog.2ndquadrant.com/failover-slots-postgresql[this blog post].
====
==== Kafka Connect Process Stops Gracefully
If Kafka Connect is being run in distributed mode, and a Kafka Connect process is stopped gracefully, then prior to shutdown of that processes Kafka Connect will migrate all of the process' connector tasks to another Kafka Connect process in that group, and the new connector tasks will pick up exactly where the prior tasks left off. There will be a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.
==== Kafka Connect Process Crashes
If the Kafka Connector process stops unexpectedly, then any connector tasks it was running will obviously terminate without recording their most recently-processed offsets. When Kafka Connect is being run in distributed mode, it will restart those connector tasks on other processes. However, the PostgreSQL connectors will resume from the last offset _recorded_ by the earlier processes, which means that the new replacement tasks may generate some of the same change events that were processed just prior to the crash. The number of duplicate events will depend on the offset flush period and the volume of data changes just before the crash.
[TIP]
====
Because there is a chance that some events may be duplicated during a recovery from failure, consumers should always anticipate some events may be duplicated. Debezium change are idempotent, so a sequence of events always results in the same state.
Debezium also includes with each change event message the source-specific information about the origin of the event, including the PostgreSQL server's time of the event, the id of the server transaction and the position in the write-ahead log where the transaction changes were written. Consumers can keep track of this information (especially the LSN position) to know whether they have already seen a particular event.
====
==== Kafka Becomes Unavailable
As the connector generates change events, the Kafka Connect framework records those events in Kafka using the Kafka producer API. Kafka Connect will also periodically record the latest offset that appears in those change events, at a frequency you've specified in the Kafka Connect worker configuration. If the Kafka brokers become unavailable, the Kafka Connect worker process running the connectors will simply repeatedly attempt to reconnect to the Kafka brokers. In other words, the connector tasks will simply pause until a connection can be re-established, at which point the connectors will resume exactly where they left off.
==== Connector Is Stopped for a Duration
If the connector is gracefully stopped, the database can continue to be used and any new changes will be recorded in the PostgreSQL WAL. When the connector is restarted, it will resume streaming changes where it last left off, recording change events for all of the changes that were made while the connector was stopped.
A properly configured Kafka cluster is able to handle https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines[massive throughput]. Kafka Connect is written with Kafka best practices, and given enough resources will also be able to handle very large numbers of database change events. Because of this, when a connector has been restarted after a while, it is very likely to catch up with the database, though how quickly will depend upon the capabilities and performance of Kafka and the volume of changes being made to the data in PostgreSQL.
[[configuration]]
[[deploying-a-connector]]
== Deploying a Connector
== Deploying the PostgreSQL Connector
ifndef::cdc-product[]
If you've already installed https://zookeeper.apache.org[Zookeeper], http://kafka.apache.org/[Kafka], and http://kafka.apache.org/documentation.html#connect[Kafka Connect], then using Debezium's PostgreSQL connector is easy. Simply download the https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/{debezium-version}/debezium-connector-postgres-{debezium-version}-plugin.tar.gz[connector's plugin archive], extract the JARs into your Kafka Connect environment, and add the directory with the JARs to http://docs.confluent.io/{confluent-platform-version}/connect/userguide.html#installing-plugins[Kafka Connect's classpath]. Restart your Kafka Connect process to pick up the new JARs.
endif::cdc-product[]
If immutable containers are your thing, then check out https://hub.docker.com/r/debezium/[Debezium's Docker images] for Zookeeper, Kafka, PostgreSQL and Kafka Connect with the PostgreSQL connector already pre-installed and ready to go. You can even link:/blog/2016/05/31/Debezium-on-Kubernetes/[run Debezium on Kubernetes and OpenShift].
ifdef::cdc-product[]
Installing the PostgreSQL connector is a simple process whereby you only need to download the JAR, extract it to your Kafka Connect environment, and ensure the plugin's parent directory is specified in your Kafka Connect environment.
To use the connector to produce change events for a particular PostgreSQL server or cluster:
.Prerequisites
. install the link:#output-plugin[logical decoding plugin]
. configure the link:#server-configuration[PostgreSQL server] to support logical replication
. create a link:#example-configuration[configuration file for the PostgreSQL Connector] and use the https://docs.confluent.io/{confluent-platform-version}/connect/restapi.html[Kafka Connect REST API] to add that connector to your Kafka Connect cluster.
* You have link:https://zookeeper.apache.org/[Zookeeper], link:http://kafka.apache.org/[Kafka], and link:http://kafka.apache.org/documentation.html#connect[Kafka Connect] installed.
* You have PostgreSQL installed and setup.
When the connector starts, it will grab a consistent snapshot of the databases in your PostgreSQL server and start streaming changes, producing events for every inserted, updated, and deleted row. You can also choose to produce events for a subset of the schemas and tables. Optionally ignore, mask, or truncate columns that are sensitive, too large, or not needed.
.Procedure
. Visit link:https://access.redhat.com/jbossnetwork/restricted/listSoftware.html?downloadType=distributions&product=jboss.amq.streams[Product Downloads^] on the Red Hat Customer Portal and download the PostgreSQL connector.
. Extract the files into your Kafka Connect environment.
. Add the plugin's parent directory to your Kafka Connect plugin path:
+
[source]
----
plugin.path=/kafka/connect
----
NOTE: The above example assumes you have extracted the {prodname} PostgreSQL connector to the `/kafka/connect/{prodname}-connector-postgresql` path.
[start=4]
. Restart your Kafka Connect process. This ensures the new JARs are picked up.
.Additional resources
For more information on the deployment process, and deploying connectors with AMQ Streams, refer to the {ProductName} installation guides.
* link:https://access.redhat.com/documentation/en-us/red_hat_integration/2019-12/html-single/installing_change_data_capture_on_openshift[Installing {ProductName} on OpenShift]
* link:https://access.redhat.com/documentation/en-us/red_hat_integration/2019-12/html-single/installing_change_data_capture_on_rhel[Installing {ProductName} on RHEL]
endif::cdc-product[]
ifndef::cdc-product[]
If immutable containers are your thing, then check out https://hub.docker.com/r/debezium/[{ProductName}'s Docker images] for Zookeeper, Kafka, PostgreSQL and Kafka Connect with the PostgreSQL connector already pre-installed and ready to go. You can even link:/blog/2016/05/31/Debezium-on-Kubernetes/[run Debezium on Kubernetes and OpenShift].
endif::cdc-product[]
[[example]]
[[example-configuration]]
=== Example Configuration
Using the PostgreSQL connector is straightforward. Here is an example of the configuration for a PostgreSQL connector that monitors a PostgreSQL server at port 5432 on 192.168.99.100, which we logically name `fullfillment`:
To use the connector to produce change events for a particular PostgreSQL server or cluster:
. Install the link:#output-plugin[logical decoding plugin]
. Configure the link:#server-configuration[PostgreSQL server] to support logical replication
. Create a configuration file for the PostgreSQL connector in JSON.
When the connector starts, it will grab a consistent snapshot of the databases in your PostgreSQL server and start streaming changes, producing events for every inserted, updated, and deleted row. You can also choose to produce events for a subset of the schemas and tables.
Optionally ignore, mask, or truncate columns that are sensitive, too large, or not needed.
Here is an example of the configuration for a PostgreSQL connector that monitors a PostgreSQL server at port 5432 on 192.168.99.100, which we logically name `fullfillment`:
[source,json]
----
@ -1480,14 +1537,13 @@ Using the PostgreSQL connector is straightforward. Here is an example of the con
<5> The name of the PostgreSQL user that has the link:#PostgreSQL-permissions[required privileges].
<6> The password for the PostgreSQL user that has the link:#PostgreSQL-permissions[required privileges].
<7> The name of the PostgreSQL database to connect to
<8> The logical name of the PostgreSQL server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the link:#avro-converter[Avro Connector] is used.
<8> The logical name of the PostgreSQL server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used.
<9> A list of all tables hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the schemas and tables to include or exclude from monitoring.
See the link:#connector-properties[complete list of connector properties] that can be specified in these configurations.
This configuration can be sent via POST to a running Kafka Connect service, which will then record the configuration and start up the one connector task that will connect to the PostgreSQL database and record events to Kafka topics.
[[connector-properties]]
=== Connector Properties
@ -1513,10 +1569,16 @@ The following configuration properties are _required_ unless a default value is
|`plugin.name`
|`decoderbufs`
|The name of the Postgres link:#output-plugin[logical decoding plugin] installed on the server. Supported values are `decoderbufs`, `wal2json`, `wal2json_rds`, `wal2json_streaming` (added in 0.8.0.Beta1), `wal2json_rds_streaming` (added in 0.8.0.Beta1) and `pgoutput` (added in 0.10.0.Beta3)
|The name of the Postgres link:#output-plugin[logical decoding plugin] installed on the server.
ifdef::cdc-product[]
The only supported value is `pgoutput`.
endif::cdc-product[]
ifndef::cdc-product[]
Supported values are `decoderbufs`, `wal2json`, `wal2json_rds`, `wal2json_streaming` (added in 0.8.0.Beta1), `wal2json_rds_streaming` (added in 0.8.0.Beta1) and `pgoutput` (added in 0.10.0.Beta3)
endif::cdc-product[]
When the processed transactions are very large it is possible that the `JSON` batch event with all changes in the transaction will not fit into the hard-coded memory buffer of size 1 GB.
In such cases it is possible to switch to so-called *streaming* mode when every change in transactions is sent as a separate message from PostgreSQL into Debezium.
In such cases it is possible to switch to so-called *streaming* mode when every change in transactions is sent as a separate message from PostgreSQL into {ProductName}.
|`slot.name`
|`debezium`
@ -1531,12 +1593,12 @@ In such cases it is possible to switch to so-called *streaming* mode when every
|The name of the PostgreSQL publication created created for streaming changes when using `pgoutput`.
This publication is created at start-up if it does not already exist to include _all tables_.
Debezium will then use its own white-/blacklist filtering capabilities to limit change events to the specific tables of interest if configured.
{ProductName} will then use its own white-/blacklist filtering capabilities to limit change events to the specific tables of interest if configured.
Note the connector user must have superuser permissions in order to create this publication,
so it is usually preferable to create the publication upfront.
If the publication already exists (either for all tables or configured with a subset of tables),
Debezium will instead use the publication as defined.
{ProductName} will instead use the publication as defined.
|`database.hostname`
|
|IP address or hostname of the PostgreSQL database server.
@ -1657,21 +1719,22 @@ The following _advanced_ configuration properties have good defaults that will w
|`snapshot.mode`
|`initial`
|Specifies the criteria for running a snapshot upon startup of the connector. The default is *initial*, and specifies the connector can run a snapshot only when no offsets have been recorded for the logical server name. The *always* option specifies that the connector run a snapshot each time on startup. The *never* option specifies that the connect should never use snapshots and that upon first startup with a logical server name the connector should read from either from where it last left off (last LSN position) or start from the beginning from the point of the view of the logical replication slot. The *initial_only* option specifies that the connector should only take an initial snapshot and then stop, without processing any subsequent changes. The *exported* option specifies that the database snapshot will be based on the point in time when the replication slot was created and is an excellent way to perform the snapshot in a lock-free way. Finally, if set to *custom* then the user must also set `snapshot.custom.class` which is a custom implementation of the `io.debezium.connector.postgresql.spi.Snapshotter` interface. See link:#snapshots[snapshots].
|Specifies the criteria for running a snapshot upon startup of the connector. The default is *initial*, and specifies the connector can run a snapshot only when no offsets have been recorded for the logical server name. The *always* option specifies that the connector run a snapshot each time on startup. The *never* option specifies that the connect should never use snapshots and that upon first startup with a logical server name the connector should read from either from where it last left off (last LSN position) or start from the beginning from the point of the view of the logical replication slot. The *initial_only* option specifies that the connector should only take an initial snapshot and then stop, without processing any subsequent changes. The *exported* option specifies that the database snapshot will be based on the point in time when the replication slot was created and is an excellent way to perform the snapshot in a lock-free way.
ifndef::cdc-product[]
Finally, if set to *custom* then the user must also set `snapshot.custom.class` which is a custom implementation of the `io.debezium.connector.postgresql.spi.Snapshotter` interface. See link:#snapshots[snapshots].
|`snapshot.custom.class`
|
| A full java class name that must be an implementation of the `io.debezium.connector.postgresql.spi.Snapshotter` interface. Only used when `snapshot.mode` is *custom*
| A full java class name that must be an implementation of the `io.debezium.connector.postgresql.spi.Snapshotter` interface. Only used when `snapshot.mode` is *custom
endif::cdc-product[]
|`snapshot.lock.timeout.ms`
|`10000`
|Positive integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If table locks cannot be acquired in this time interval, the snapshot will fail. See link:#snapshots[snapshots]
|Positive integer value that specifies the maximum amount of time (in milliseconds) to wait to obtain table locks when performing a snapshot. If table locks cannot be acquired in this time interval, the snapshot will fail See link:#snapshots[snapshots]
|`snapshot.select.statement.overrides`
|
|Controls which rows from tables will be included in snapshot. +
This property contains a comma-separated list of fully-qualified tables _(DB_NAME.TABLE_NAME)_. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id `snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]`. The value of those properties is the SELECT statement to use when retrieving data from the specific table during snapshotting. _A possible use case for large append-only tables is setting a specific point where to start (resume) snapshotting, in case a previous snapshotting was interrupted._ +
*Note*: This setting has impact on snapshots only. Events generated by logical decoder are not affected by it at all.
NOTE: This setting has impact on snapshots only. Events generated by logical decoder are not affected by it at all.
|`max.queue.size`
|`20240`
@ -1687,18 +1750,18 @@ This property contains a comma-separated list of fully-qualified tables _(DB_NAM
|`include.unknown.datatypes`
|`false`
|When Debezium meets a field whose data type is unknown, then by default the field is omitted from the change event and a warning is logged.
|When {ProductName} meets a field whose data type is unknown, then by default the field is omitted from the change event and a warning is logged.
In some cases it may be preferable though to include the field and send it downstream to clients in the opaque binary representation so the clients will decode it themselves.
Set to `false` to filter unknown data out of events and `true` to keep them in binary format.
_Note: The clients risk backward compatibility issues. Not only may the database specific binary representation change between releases, but also when the datatype is supported by Debezium eventually, it will be sent downstream in a logical type, requiring adjustments by consumers. In general, when encountering unsupported data types, please file a feature request so that support can be added._
NOTE: The clients risk backward compatibility issues. Not only may the database specific binary representation change between releases, but also when the datatype is supported by {ProductName} eventually, it will be sent downstream in a logical type, requiring adjustments by consumers. In general, when encountering unsupported data types, please file a feature request so that support can be added.
|`database.initial.statements`
|
|A semicolon separated list of SQL statements to be executed when a JDBC connection (not the transaction log reading connection) to the database is established.
Use doubled semicolon (';;') to use a semicolon as a character and not as a delimiter.
_Note: The connector may establish JDBC connections at its own discretion, so this should typically be used for configuration of session parameters only, but not for executing DML statements._
NOTE: The connector may establish JDBC connections at its own discretion, so this should typically be used for configuration of session parameters only, but not for executing DML statements.
|`heartbeat.interval.ms`
|`0`
@ -1744,15 +1807,17 @@ The connector will read the table contents in multiple batches of this size. Def
|`slot.stream.params`
|
|Optional list of parameters to be passed to the configured logical decoding plug-in;
can for instance be used to enable server-side table filtering when using the wal2json plug-in.
Allowed values depend on the chosen plug-in are separated by semicolon,
e.g. `add-tables=public.table,public.table2;include-lsn=true`.
|Optional list of parameters to be passed to the configured logical decoding plug-in.
ifndef::cdc-product[]
Can for instance be used to enable server-side table filtering when using the wal2json plug-in.
Allowed values depend on the chosen plug-in are separated by semicolon.
endif::cdc-product[]
For example, `add-tables=public.table,public.table2;include-lsn=true`.
|`sanitize.field.names`
|`true` when connector configuration explicitly specifies the `key.converter` or `value.converter` parameters to use Avro, otherwise defaults to `false`.
|Whether field names will be sanitized to adhere to Avro naming requirements.
See xref:configuration/avro.adoc#names[Avro naming] for more details.
See xref:https://debezium.io/documentation/reference/1.0/configuration/avro.html#names[Avro naming] for more details.
|`slot.max.retries`
|6
@ -1764,7 +1829,7 @@ See xref:configuration/avro.adoc#names[Avro naming] for more details.
|`toasted.value.placeholder`
|`__debezium_unavailable_value`
|Specify the constant that will be provided by Debezium to indicate that the original value is a toasted value not provided by the database.
|Specify the constant that will be provided by {ProductName} to indicate that the original value is a toasted value not provided by the database.
If starts with `hex:` prefix it is expected that the rest of the string repesents hexadecimally encoded octets.
See link:#toasted-values[section] with additional details.
@ -1773,3 +1838,67 @@ See link:#toasted-values[section] with additional details.
The connector also supports _pass-through_ configuration properties that are used when creating the Kafka producer and consumer.
Be sure to consult the http://kafka.apache.org/documentation.html[Kafka documentation] for all of the configuration properties for Kafka producers and consumers. (The PostgreSQL connector does use the http://kafka.apache.org/documentation.html#newconsumerconfigs[new consumer].)
[[when-things-go-wrong]]
== PostgreSQL common issues
{ProductName} is a distributed system that captures all changes in multiple upstream databases, and will never miss or lose an event. Of course, when the system is operating nominally or being administered carefully, then {ProductName} provides _exactly once_ delivery of every change event. However, if a fault does happen then the system will still not lose any events, although while it is recovering from the fault it may repeat some change events. Thus, in these abnormal situations {ProductName}, like Kafka, provides _at least once_ delivery of change events.
The rest of this section describes how {ProductName} handles various kinds of faults and problems.
=== Configuration and Startup Errors
The connector will fail upon startup, report an error/exception in the log, and stop running when the connector's configuration is invalid, when the connector cannot successfully connect to PostgreSQL using the specified connectivity parameters, or when the connector is restarting from a previously-recorded position in the PostgreSQL WAL (via the LSN value) and PostgreSQL no longer has that history available.
In these cases, the error will have more details about the problem and possibly a suggested work around. The connector can be restarted when the configuration has been corrected or the PostgreSQL problem has been addressed.
=== PostgreSQL Becomes Unavailable
Once the connector is running, if the PostgreSQL server it has been connected to becomes unavailable for any reason, the connector will fail with an error and the connector will stop. Simply restart the connector when the server is available.
The PostgreSQL connector stores externally the last processed offset (in the form of a PostgreSQL `log sequence number` value). Once a connector is restarted and connects to a server instance, it will ask the server to continue streaming from that particular offset. This offset will always remain available so long as the {ProductName} replication slot remains intact. Never drop a replication slot on the primary or you will lose data. See the next section for failure cases when a slot has been removed.
=== Cluster Failures
As of `12`, PostgreSQL allows logical replication slots _only on primary servers_, which means that a PostgreSQL connector can only be pointed to the active primary of a database cluster.
Also replication slots themselves are not propagated to replicas.
If the primary node goes down, only after a new primary has been promoted (with the link:#output-plugin[logical decoding plugin] installed) and a replication slot has been created there, the connector can be restarted and pointed to the new server.
There are some really important caveats to failovers, and you should pause Debezium until you can verify that you have a replication slot intact which has not lost data. After a failover, you will miss change events unless your administration of failovers includes a process to recreate the Debezium replication slot before the application is allowed to write to the *new* primary. You also may need to verify in a failover situation that Debezium was able to read all changes in the slot **before the old primary failed**.
One reliable method of recovering and verifying any lost changes (yet administratively difficult) is to recover a backup of your failed primary to the point immediately before it failed, which would allow you to inspect the replication slot for any unconsumed changes. In any case, it is crucial that you recreate the replication slot on the new primary prior to allowing writes to it.
ifndef::cdc-product[]
[NOTE]
====
There are discussions in the PostgreSQL community around a feature called `failover slots` which would help mitigate this problem, but as of `12` they have not been implemented yet. However, there is active development for Postgres 13 to support logical decoding on standbys, which is a major requirement to make failover possible. You can find more about this on the
link:++https://www.postgresql.org/message-id/CAJ3gD9fE=0w50sRagcs+jrktBXuJAWGZQdSTMa57CCY+Dh-xbg@mail.gmail.com++[community thread].
You can find out more about the concept of failover slots here http://blog.2ndquadrant.com/failover-slots-postgresql[this blog post].
====
endif::cdc-product[]
=== Kafka Connect Process Stops Gracefully
If Kafka Connect is being run in distributed mode, and a Kafka Connect process is stopped gracefully, then prior to shutdown of that processes Kafka Connect will migrate all of the process' connector tasks to another Kafka Connect process in that group, and the new connector tasks will pick up exactly where the prior tasks left off. There will be a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.
=== Kafka Connect Process Crashes
If the Kafka Connector process stops unexpectedly, then any connector tasks it was running will obviously terminate without recording their most recently-processed offsets. When Kafka Connect is being run in distributed mode, it will restart those connector tasks on other processes. However, the PostgreSQL connectors will resume from the last offset _recorded_ by the earlier processes, which means that the new replacement tasks may generate some of the same change events that were processed just prior to the crash. The number of duplicate events will depend on the offset flush period and the volume of data changes just before the crash.
[NOTE]
====
Because there is a chance that some events may be duplicated during a recovery from failure, consumers should always anticipate some events may be duplicated. {ProductName} changes are idempotent, so a sequence of events always results in the same state.
{ProductName} also includes with each change event message the source-specific information about the origin of the event, including the PostgreSQL server's time of the event, the id of the server transaction and the position in the write-ahead log where the transaction changes were written. Consumers can keep track of this information (especially the LSN position) to know whether they have already seen a particular event.
====
=== Kafka Becomes Unavailable
As the connector generates change events, the Kafka Connect framework records those events in Kafka using the Kafka producer API. Kafka Connect will also periodically record the latest offset that appears in those change events, at a frequency you've specified in the Kafka Connect worker configuration. If the Kafka brokers become unavailable, the Kafka Connect worker process running the connectors will simply repeatedly attempt to reconnect to the Kafka brokers. In other words, the connector tasks will simply pause until a connection can be re-established, at which point the connectors will resume exactly where they left off.
=== Connector Is Stopped for a Duration
If the connector is gracefully stopped, the database can continue to be used and any new changes will be recorded in the PostgreSQL WAL. When the connector is restarted, it will resume streaming changes where it last left off, recording change events for all of the changes that were made while the connector was stopped.
A properly configured Kafka cluster is able to handle https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines[massive throughput]. Kafka Connect is written with Kafka best practices, and given enough resources will also be able to handle very large numbers of database change events. Because of this, when a connector has been restarted after a while, it is very likely to catch up with the database, though how quickly will depend upon the capabilities and performance of Kafka and the volume of changes being made to the data in PostgreSQL.

View File

@ -1,4 +1,6 @@
= Debezium Connector for SQL Server
= {ProductName} Connector for SQL Server
ifndef::cdc-product[]
include::../_attributes.adoc[]
:toc:
:toc-placement: macro
@ -12,22 +14,33 @@ Want to help us further hone and improve it? link:/docs/contribute/[Learn how].
====
toc::[]
endif::cdc-product[]
Debezium's SQL Server Connector can monitor and record the row-level changes in the schemas of a SQL Server database.
ifdef::cdc-product[]
[IMPORTANT]
====
Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete; therefore, Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process.
For more information about support scope, see link:https://access.redhat.com/support/offerings/techpreview/[Technology Preview Features Support Scope^].
====
endif::cdc-product[]
{ProductName}'s SQL Server Connector can monitor and record the row-level changes in the schemas of a SQL Server database.
ifndef::cdc-product[]
This connector was added in Debezium 0.9.0.
endif::cdc-product[]
The first time it connects to a SQL Server database/cluster, it reads a consistent snapshot of all of the schemas.
When that snapshot is complete, the connector continuously streams the changes that were committed to SQL Server and generates corresponding insert, update and delete events.
All of the events for each table are recorded in a separate Kafka topic, where they can be easily consumed by applications and services.
[[overview]]
[[sqlserver-overview]]
== Overview
The functionality of the connector is based upon https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/about-change-data-capture-sql-server?view=sql-server-2017[change data capture] feature provided by SQL Server Standard (https://blogs.msdn.microsoft.com/sqlreleaseservices/sql-server-2016-service-pack-1-sp1-released/[since SQL Server 2016 SP1]) or Enterprise edition.
Using this mechanism a SQL Server capture process monitors all databases and tables the user is interested in and stores the changes into specifically created _CDC_ tables that have stored procedure facade.
The connector has been tested with SQL Server 2017, but community members have reportedly used it successfully with earlier versions up to 2014, too (as long as the CDC feature is provided).
The database operator must https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server?view=sql-server-2017[enable] _CDC_ for the table(s) that should be captured by the Debezium connector.
The database operator must https://docs.microsoft.com/en-us/sql/relational-databases/track-changes/enable-and-disable-change-data-capture-sql-server?view=sql-server-2017[enable] _CDC_ for the table(s) that should be captured by the connector.
The connector then produces a _change event_ for every row-level insert, update, and delete operation that was published via the _CDC API_, recording all the change events for each table in a separate Kafka topic.
The client applications read the Kafka topics that correspond to the database tables they're interested in following, and react to every row-level event it sees in those topics.
@ -40,12 +53,12 @@ This way, we start with a consistent view of all of the data, yet continue readi
The connector is also tolerant of failures.
As the connector reads changes and produces events, it records the position in the database log (_LSN / Log Sequence Number_), that is associated with _CDC_ record, with each event.
If the connector stops for any reason (including communication failures, network problems, or crashes), upon restart it simply continues reading the _CDC_ tables where it last left off.
This includes snapshots: if the snapshot was not completed when the connector is stopped, upon restart it will begin a new snapshot.
This includes snapshots: if the snapshot was not completed when the connector is stopped, upon restart it begins a new snapshot.
[[setting-up-sqlserver]]
== Setting up SQL Server
Before using the Debezium SQL Server connector to monitor the changes committed on SQL Server, first enable _CDC_ on a monitored database.
Before using the SQL Server connector to monitor the changes committed on SQL Server, first enable _CDC_ on a monitored database.
Please bear in mind that _CDC_ cannot be enabled for `master` database.
[source,sql]
----
@ -94,6 +107,7 @@ If the result is empty then please make sure that the user has privileges to acc
The SQL Server plug-in has not been tested with SQL Server on Azure.
We welcome any feedback from a user to try the plug-in with database in managed environment.
ifndef::cdc-product[]
[[always-on-replica]]
=== SQL Server Always On
@ -108,6 +122,7 @@ When Debezium detects this configuration option then it will:
** set `snapshot.isolation.mode` to `snapshot` as this is the only one transaction isolation mode supported by raed-only replicas
** commit the (read-only) transaction in every execution of the streaming query loop, as this is necessary to get the latest view on CDC data
endif::cdc-product[]
[[how-the-connector-works]]
== How the SQL Server connector works
@ -115,7 +130,7 @@ When Debezium detects this configuration option then it will:
=== Snapshots
SQL Server CDC is not designed to store the complete history of database changes.
It is thus necessary that Debezium establishes the baseline of current database content and streams it to the Kafka.
It is thus necessary that {ProductName} establishes the baseline of current database content and streams it to the Kafka.
This is achieved via a process called snapshotting.
By default (snapshotting mode *initial*) the connector will upon the first startup perform an initial _consistent snapshot_ of the database
@ -140,7 +155,7 @@ Then the connector identifies a change table for each of the source tables and e
1. For each change table read all changes that were created between last stored maximum LSN and current maximum LSN
2. Order the read changes incrementally according to commit LSN and change LSN.
This assures that the changes are replayed by Debezium in the same order as were made to the database.
This ensures that the changes are replayed by {ProductName} in the same order as were made to the database.
3. Pass commit and change LSNs as offsets to Kafka Connect.
4. Store the maximum LSN and repeat the loop.
@ -161,7 +176,7 @@ For example, consider a SQL Server installation with an `inventory` database tha
=== Schema change topic
The user-facing schema change topic is not implemented yet (see {jira-url}/browse/DBZ-753[DBZ-753]).
The user-facing schema change topic is not implemented yet (see https://issues.jboss.org/browse/DBZ-753[DBZ-753]).
=== Events
@ -169,7 +184,7 @@ All data change events produced by the SQL Server connector have a key and a val
[WARNING]
====
The Debezium SQL Server connector ensures that all Kafka Connect _schema names_ are http://avro.apache.org/docs/current/spec.html#names[valid Avro schema names].
The SQL Server connector ensures that all Kafka Connect _schema names_ are http://avro.apache.org/docs/current/spec.html#names[valid Avro schema names].
This means that the logical server name must start with Latin letters or an underscore (e.g., [a-z,A-Z,\_]),
and the remaining characters in the logical server name and all characters in the schema and table names must be Latin letters, digits, or an underscore (e.g., [a-z,A-Z,0-9,\_]).
If not, then all invalid characters will automatically be replaced with an underscore character.
@ -177,7 +192,7 @@ If not, then all invalid characters will automatically be replaced with an under
This can lead to unexpected conflicts when the logical server name, schema names, and table names contain other characters, and the only distinguishing characters between table full names are invalid and thus replaced with underscores.
====
Debezium and Kafka Connect are designed around _continuous streams of event messages_, and the structure of these events may change over time.
{ProductName} and Kafka Connect are designed around _continuous streams of event messages_, and the structure of these events may change over time.
This could be difficult for consumers to deal with, so to make it easy Kafka Connect makes each event self-contained.
Every message key and value has two parts: a _schema_ and _payload_.
The schema describes the structure of the payload, while the payload contains the actual data.
@ -228,7 +243,7 @@ If we look at the value of the key's `payload` field, we'll see that it is indee
Therefore, we interpret this key as describing the row in the `dbo.customers` table (output from the connector named `server1`) whose `id` primary key column had a value of `1004`.
////
ifndef::cdc-product[]
[NOTE]
====
Although the `column.blacklist` configuration property allows you to remove columns from the event values, all columns in a primary or unique key are always included in the event's key.
@ -238,7 +253,7 @@ Although the `column.blacklist` configuration property allows you to remove colu
====
If the table does not have a primary or unique key, then the change event's key will be null. This makes sense since the rows in a table without a primary or unique key constraint cannot be uniquely identified.
====
////
endif::cdc-product[]
[[change-event-values]]
==== Change Event Values
@ -247,10 +262,10 @@ Like the message key, the value of a change event message has a _schema_ section
The payload section of every change event value produced by the SQL Server connector has an _envelope_ structure with the following fields:
* `op` is a mandatory field that contains a string value describing the type of operation. Values for the SQL Server connector are `c` for create (or insert), `u` for update, `d` for delete, and `r` for read (in the case of a snapshot).
* `before` is an optional field that if present contains the state of the row _before_ the event occurred. The structure will be described by the `server1.dbo.customers.Value` Kafka Connect schema, which the `server1` connector uses for all rows in the `dbo.customers` table.
* `before` is an optional field that if present contains the state of the row _before_ the event occurred. The structure is described by the `server1.dbo.customers.Value` Kafka Connect schema, which the `server1` connector uses for all rows in the `dbo.customers` table.
* `after` is an optional field that if present contains the state of the row _after_ the event occurred. The structure is described by the same `server1.dbo.customers.Value` Kafka Connect schema used in `before`.
* `source` is a mandatory field that contains a structure describing the source metadata for the event, which in the case of SQL Server contains these fields: the Debezium version, the connector name, whether the event is part of an ongoing snapshot or not, the commit LSN (not while snapshotting), the LSN of the change, database, schema and table where the change happened, and a timestamp representing the point in time when the record was changed in the source database (during snapshotting, it'll be the point in time of snapshotting).
* `source` is a mandatory field that contains a structure describing the source metadata for the event, which in the case of SQL Server contains these fields: the {ProductName} version, the connector name, whether the event is part of an ongoing snapshot or not, the commit LSN (not while snapshotting), the LSN of the change, database, schema and table where the change happened, and a timestamp representing the point in time when the record was changed in the source database (during snapshotting, it'll be the point in time of snapshotting).
+
Also a field `event_serial_no` is present during streaming.
This is used to differentiate among events that have the same commit and change LSN.
@ -258,14 +273,14 @@ There are mostly two situations when you can see it present with value different
+
** update events will have the value set to `2`, this is because the update generates two events in the CDC change table of SQL Server (https://docs.microsoft.com/en-us/sql/relational-databases/system-tables/cdc-capture-instance-ct-transact-sql?view=sql-server-2017[source documentation]).
The first one contains the old values and the second one contains new values.
So the first one is dropped and the values from it are used with the second one to create the Debezium change event.
So the first one is dropped and the values from it are used with the second one to create the {ProductName} change event.
** when a primary key is updated, then SQL Server emits two records - `delete` to remove the record with the old primary key value and `insert` to create the record with the new primary key.
Both operations share the same commit and change LSN and their event numbers are `1` and `2`.
* `ts_ms` is optional and if present contains the time (using the system clock in the JVM running the Kafka Connect task) at which the connector processed the event.
And of course, the _schema_ portion of the event message's value contains a schema that describes this envelope structure and the nested fields within it.
[[create-events]]
[[sqlserver-create-events]]
===== Create events
Let's look at what a _create_ event value might look like for our `customers` table:
@ -439,24 +454,24 @@ Let's look at what a _create_ event value might look like for our `customers` ta
If we look at the `schema` portion of this event's _value_, we can see the schema for the _envelope_, the schema for the `source` structure (which is specific to the SQL Server connector and reused across all events), and the table-specific schemas for the `before` and `after` fields.
[TIP]
[NOTE]
====
The names of the schemas for the `before` and `after` fields are of the form _logicalName_._schemaName_._tableName_.Value, and thus are entirely independent from all other schemas for all other tables.
This means that when using the link:/docs/faq/#avro-converter[Avro Converter], the resulting Avro schems for _each table_ in each _logical source_ have their own evolution and history.
This means that when using the Avro Converter, the resulting Avro schemas for _each table_ in each _logical source_ have their own evolution and history.
====
If we look at the `payload` portion of this event's _value_, we can see the information in the event, namely that it is describing that the row was created (since `op=c`), and that the `after` field value contains the values of the new inserted row's' `id`, `first_name`, `last_name`, and `email` columns.
[TIP]
[NOTE]
====
It may appear that the JSON representations of the events are much larger than the rows they describe.
This is true, because the JSON representation must include the _schema_ and the _payload_ portions of the message.
It is possible and even recommended to use the link:/docs/faq/#avro-converter[Avro Converter] to dramatically decrease the size of the actual messages written to the Kafka topics.
It is possible and even recommended to use the to dramatically decrease the size of the actual messages written to the Kafka topics.
====
[[update-events]]
[[sqlserver-update-events]]
===== Update events
The value of an _update_ change event on this table will actually have the exact same _schema_, and its payload will be structured the same but will hold different values.
The value of an _update_ change event on this table will actually have the exact same _schema_, and its payload is structured the same but will hold different values.
Here's an example:
[source,json,indent=0,subs="attributes"]
@ -504,21 +519,21 @@ When we compare this to the value in the _insert_ event, we see a couple of diff
* The `event_serial_no` field has value `2`.
That is due to the update event composed of two events behind the scenes and we are exposing only the second one.
If you are interested in details please check the https://docs.microsoft.com/en-us/sql/relational-databases/system-tables/cdc-capture-instance-ct-transact-sql?view=sql-server-2017[source documentation] and refer to the field `$operation`.
* The `ts_ms` shows the timestamp that Debezium processed this event.
* The `ts_ms` shows the timestamp that {ProductName} processed this event.
There are several things we can learn by just looking at this `payload` section. We can compare the `before` and `after` structures to determine what actually changed in this row because of the commit.
The `source` structure tells us information about SQL Server's record of this change (providing traceability), but more importantly this has information we can compare to other events in this and other topics to know whether this event occurred before, after, or as part of the same SQL Server commit as other events.
[NOTE]
====
When the columns for a row's primary/unique key are updated, the value of the row's key has changed so Debezium will output _three_ events: a `DELETE` event and a link:#tombstone-events[tombstone event] with the old key for the row, followed by an `INSERT` event with the new key for the row.
When the columns for a row's primary/unique key are updated, the value of the row's key has changed so {ProductName} will output _three_ events: a `DELETE` event and a link:#tombstone-events[tombstone event] with the old key for the row, followed by an `INSERT` event with the new key for the row.
====
[[delete-events]]
[[sqlserver-delete-events]]
===== Delete events
So far we've seen samples of _create_ and _update_ events.
Now, let's look at the value of a _delete_ event for the same table. Once again, the `schema` portion of the value will be exactly the same as with the _create_ and _update_ events:
Now, let's look at the value of a _delete_ event for the same table. Once again, the `schema` portion of the value is exactly the same as with the _create_ and _update_ events:
[source,json,indent=0,subs="attributes"]
----
@ -558,7 +573,7 @@ If we look at the `payload` portion, we see a number of differences compared wit
* The `before` field now has the state of the row that was deleted with the database commit.
* The `after` field is null, signifying that the row no longer exists
* The `source` field structure has many of the same values as before, except the `ts_ms`, `commit_lsn` and `change_lsn` fields have changed
* The `ts_ms` shows the timestamp that Debezium processed this event.
* The `ts_ms` shows the timestamp that {ProductName} processed this event.
This event gives a consumer all kinds of information that it can use to process the removal of this row.
@ -566,26 +581,26 @@ The SQL Server connector's events are designed to work with https://cwiki.apache
which allows for the removal of some older messages as long as at least the most recent message for every key is kept.
This allows Kafka to reclaim storage space while ensuring the topic contains a complete dataset and can be used for reloading key-based state.
[[tombstone-events]]
[[sqlserver-tombstone-events]]
When a row is deleted, the _delete_ event value listed above still works with log compaction, since Kafka can still remove all earlier messages with that same key.
But only if the message value is `null` will Kafka know that it can remove _all messages_ with that same key.
To make this possible, Debezium's SQL Server connector always follows the _delete_ event with a special _tombstone_ event that has the same key but `null` value.
To make this possible, the SQL Server connector always follows the _delete_ event with a special _tombstone_ event that has the same key but `null` value.
[[schema-evolution]]
=== Database schema evolution
Debezium is able to capture schema changes over time.
Due to the way CDC is implemented in SQL Server, it is necessary to work in co-operation with a database operator in order to ensure the Debezium connector continues to produce data change events when the schema is updated.
{ProductName} is able to capture schema changes over time.
Due to the way CDC is implemented in SQL Server, it is necessary to work in co-operation with a database operator in order to ensure the connector continues to produce data change events when the schema is updated.
As was already mentioned before, Debezium uses SQL Server's change data capture functionality.
As was already mentioned before, {ProductName} uses SQL Server's change data capture functionality.
This means that SQL Server creates a capture table that contains all changes executed on the source table.
Unfortunately, the capture table is static and needs to be updated when the source table structure changes.
This update is not done by the Debezium connector itself but must be executed by an operator with elevated privileges.
This update is not done by the connector itself but must be executed by an operator with elevated privileges.
There are generally two procedures how to execute the schema change:
* cold - this is executed when Debezium is stopped
* hot - executed while Debezium is running
* cold - this is executed when {ProductName} is stopped
* hot - executed while {ProductName} is running
Both approaches have their own advantages and disadvantages.
@ -604,7 +619,7 @@ One such exception identified is renaming a column or changing its type, SQL Ser
[NOTE]
====
Although not required by SQL Server's CDC mechanism itself, a new capture instance must be created when altering a column from `NULL` to `NOT NULL` or vice versa.
This is required so that the Debezium SQL Server connector can pick up that changed information.
This is required so that the SQL Server connector can pick up that changed information.
Otherwise, emitted change events will have the `optional` value for the corresponding field (`true` or `false`) set to match the original value.
====
@ -614,13 +629,13 @@ This is the safest procedure but might not be feasible for applications with hig
The operator should follow this sequence of steps
1. Suspend the application that generates the database records
2. Wait for Debezium to stream all unstreamed changes
3. Stop Debezium connector
2. Wait for {ProductName} to stream all unstreamed changes
3. Stop the connector
4. Apply all changes to the source table schema
5. Create a new capture table for the update source table using `sys.sp_cdc_enable_table` procedure with a unique value for parameter `@capture_instance`
6. Resume the application
7. Start Debezium connector
8. When Debezium starts streaming from the new capture table it is possible to drop the old one using `sys.sp_cdc_disable_table` stored procedure with parameter `@capture_instance` set to the old capture instance name
7. Start the connector
8. When {ProductName} starts streaming from the new capture table it is possible to drop the old one using `sys.sp_cdc_disable_table` stored procedure with parameter `@capture_instance` set to the old capture instance name
==== Hot schema update
@ -629,17 +644,20 @@ The procedure itself is also much simpler than in case of cold schema update
1. Apply all changes to the source table schema
2. Create a new capture table for the update source table using `sys.sp_cdc_enable_table` procedure with a unique value for parameter `@capture_instance`
3. When Debezium starts streaming from the new capture table it is possible to drop the old one using `sys.sp_cdc_disable_table` stored procedure with parameter `@capture_instance` set to the old capture instance name
3. When {ProductName} starts streaming from the new capture table it is possible to drop the old one using `sys.sp_cdc_disable_table` stored procedure with parameter `@capture_instance` set to the old capture instance name
The hot schema update has one drawback.
There is a period of time between the database schema update and creating the new capture instance.
All changes that will arrive during this period will be captured by the old instance with the old structure.
All changes that will arrive during this period are captured by the old instance with the old structure.
For instance this means that in case of a newly added column any change event produced during this time will not yet contain a field for that new column.
If your application does not tolerate such a transition period we recommend to follow the cold schema update.
==== Example
ifndef::cdc-product[]
Let's deploy the SQL Server based https://github.com/debezium/debezium-examples/tree/master/tutorial#using-sql-server[Debezium tutorial] to demonstrate the hot schema update.
A column `phone_number` will be added to the `customers` table.
endif::cdc-product[]
In this example, a column `phone_number` is added to the `customers` table.
[source,shell]
----
@ -669,7 +687,7 @@ connect_1 | 2019-01-17 10:11:14,924 INFO || Schema will be changed for Cha
connect_1 | 2019-01-17 10:11:33,719 INFO || Migrating schema to ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]
```
Eventually, there will be a new field in the schema and value of the messages written to the Kafka topic.
Eventually, there is a new field in the schema and value of the messages written to the Kafka topic.
[source,json]
----
...
@ -695,7 +713,7 @@ EXEC sys.sp_cdc_disable_table @source_schema = 'dbo', @source_name = 'dbo_custom
GO
----
[[data-types]]
[[sqlserver-data-types]]
=== Data types
As described above, the SQL Server connector represents the changes to rows with events that are structured like the table in which the row exist.
@ -791,12 +809,14 @@ The _semantic type_ describes how the Kafka Connect schema captures the _meaning
Other data type mappings are described in the following sections.
If present, a column's default value will be propagated to the corresponding field's Kafka Connect schema.
If present, a column's default value is propagated to the corresponding field's Kafka Connect schema.
Change messages will contain the field's default value
(unless an explicit column value had been given), so there should rarely be the need to obtain the default value from the schema.
ifndef::cdc-product[]
Passing the default value helps though with satisfying the compatibility rules when xref:configuration/avro.advoc[using Avro] as serialization format together with the Confluent schema registry.
endif::cdc-product[]
[[temporal-values]]
[[sqlserver-temporal-values]]
==== Temporal values
Other than SQL Server's `DATETIMEOFFSET` data type (which contain time zone information), the other temporal types depend on the value of the `time.precision.mode` configuration property. When the `time.precision.mode` configuration property is set to `adaptive` (the default), then the connector will determine the literal type and semantic type for the temporal types based on the column's data type definition so that events _exactly_ represent the values in the database:
@ -891,14 +911,14 @@ When the `time.precision.mode` configuration property is set to `connect`, then
|=======================
[[timestamp-values]]
[[sqlserver-timestamp-values]]
===== Timestamp values
The `DATETIME`, `SMALLDATETIME` and `DATETIME2` types represent a timestamp without time zone information.
Such columns are converted into an equivalent Kafka Connect value based on UTC.
So for instance the `DATETIME2` value "2018-06-20 15:13:16.945104" will be represented by a `io.debezium.time.MicroTimestamp` with the value "1529507596945104".
So for instance the `DATETIME2` value "2018-06-20 15:13:16.945104" is represented by a `io.debezium.time.MicroTimestamp` with the value "1529507596945104".
Note that the timezone of the JVM running Kafka Connect and Debezium does not affect this conversion.
Note that the timezone of the JVM running Kafka Connect and {ProductName} does not affect this conversion.
==== Decimal values
@ -935,30 +955,66 @@ The `connect.decimal.precision` schema parameter contains an integer representin
|=======================
[[deploying-a-connector]]
== Deploying a connector
[[sqlserver-deploying-a-connector]]
== Deploying the SQL Server connector
ifndef::cdc-product[]
If you've already installed https://zookeeper.apache.org[Zookeeper], http://kafka.apache.org/[Kafka], and http://kafka.apache.org/documentation.html#connect[Kafka Connect], then using Debezium's SQL Server` connector is easy.
Simply download the https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/0.9.0.Alpha1/debezium-connector-sqlserver-0.9.0.Alpha1-plugin.tar.gz[connector's plugin archive], extract the JARs into your Kafka Connect environment, and add the directory with the JARs to http://docs.confluent.io/{confluent-platform-version}/connect/userguide.html#installing-plugins[Kafka Connect's classpath].
Restart your Kafka Connect process to pick up the new JARs.
endif::cdc-product[]
ifdef::cdc-product[]
Installing the SQL Server connector is a simple process whereby you only need to download the JAR, extract it to your Kafka Connect environment, and ensure the plugin's parent directory is specified in your Kafka Connect environment.
.Prerequisites
* You have link:https://zookeeper.apache.org/[Zookeeper], link:http://kafka.apache.org/[Kafka], and link:http://kafka.apache.org/documentation.html#connect[Kafka Connect] installed.
* You have SQL Server installed and setup.
.Procedure
. Visit link:https://access.redhat.com/jbossnetwork/restricted/listSoftware.html?downloadType=distributions&product=jboss.amq.streams[Product Downloads^] on the Red Hat Customer Portal and download the SQL Server connector.
. Extract the files into your Kafka Connect environment.
. Add the plugin's parent directory to your Kafka Connect plugin path:
+
[source]
----
plugin.path=/kafka/connect
----
NOTE: The above example assumes you have extracted the {prodname} SQL Server connector to the `/kafka/connect/{prodname}-connector-sqlserver` path.
[start=4]
. Restart your Kafka Connect process. This ensures the new JARs are picked up.
.Additional resources
For more information on the deployment process, and deploying connectors with AMQ Streams, refer to the {ProductName} installation guides.
* link:https://access.redhat.com/documentation/en-us/red_hat_integration/2019-12/html-single/installing_change_data_capture_on_openshift[Installing {ProductName} on OpenShift]
* link:https://access.redhat.com/documentation/en-us/red_hat_integration/2019-12/html-single/installing_change_data_capture_on_rhel[Installing {ProductName} on RHEL]
endif::cdc-product[]
ifndef::cdc-product[]
If immutable containers are your thing, then check out https://hub.docker.com/r/debezium/[Debezium's Docker images] for Zookeeper, Kafka and Kafka Connect with the SQL Server connector already pre-installed and ready to go.
You can even link:/docs/openshift/[run Debezium on OpenShift].
endif::cdc-product[]
[[example]]
[[sqlserver-example-configuration]]
=== Example configuration
To use the connector to produce change events for a particular SQL Server database or cluster:
. enable the link:#setting-up-sqlserver[CDC on SQL Server] to publish the _CDC_ events in the database
. create a link:#example-configuration[configuration file for the SQL Server Connector] and use the https://docs.confluent.io/{confluent-platform-version}/connect/restapi.html[Kafka Connect REST API] to add that connector to your Kafka Connect cluster.
. Enable the link:#setting-up-sqlserver[CDC on SQL Server] to publish the _CDC_ events in the database
. Create a configuration file for the SQL Server connector in JSON.
When the connector starts, it will grab a consistent snapshot of the schemas in your SQL Server database and start streaming changes, producing events for every inserted, updated, and deleted row.
You can also choose to produce events for a subset of the schemas and tables.
Optionally ignore, mask, or truncate columns that are sensitive, too large, or not needed.
[[example]]
[[example-configuration]]
=== Example configuration
Using the SQL Server connector is straightforward. Here is an example of the configuration for a connector instance that monitors a SQL Server server at port 3306 on 192.168.99.100, which we logically name `fullfillment`:
Here is an example of the configuration for a connector instance that monitors a SQL Server server at port 3306 on 192.168.99.100, which we logically name `fullfillment`:
[source,json]
----
@ -985,8 +1041,8 @@ Using the SQL Server connector is straightforward. Here is an example of the con
<5> The name of the SQL Server user
<6> The password for the SQL Server user
<7> The name of the database to capture changes from
<8> The logical name of the SQL Server instance/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the xref:configuration/avro.adoc[Avro Connector] is used.
<9> A list of all tables whose changes Debezium should capture
<8> The logical name of the SQL Server instance/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used.
<9> A list of all tables whose changes {ProductName} should capture
<10> The list of Kafka brokers that this connector will use to write and recover DDL statements to the database history topic.
<11> The name of the database history topic where the connector will write and recover DDL statements. This topic is for internal use only and should not be used by consumers.
@ -998,7 +1054,7 @@ This configuration can be sent via POST to a running Kafka Connect service, whic
[[monitoring]]
=== Monitoring
Kafka, Zookeeper, and Kafka Connect all have xref:operations/monitoring.adoc[built-in support] for JMX metrics. The SQL Server connector also publishes a number of metrics about the connector's activities that can be monitored through JMX. The connector has two types of metrics. Snapshot metrics help you monitor the snapshot activity and are available when the connector is performing a snapshot. Streaming metrics help you monitor the progress and activity while the connector reads CDC table data.
Kafka, Zookeeper, and Kafka Connect all have built-in support for JMX metrics. The SQL Server connector also publishes a number of metrics about the connector's activities that can be monitored through JMX. The connector has two types of metrics. Snapshot metrics help you monitor the snapshot activity and are available when the connector is performing a snapshot. Streaming metrics help you monitor the progress and activity while the connector reads CDC table data.
[[monitoring-snapshots]]
[[snapshot-metrics]]
@ -1117,7 +1173,7 @@ Kafka, Zookeeper, and Kafka Connect all have xref:operations/monitoring.adoc[bui
|`MilliSecondsBehindSource`
|`long`
|The number of milliseconds between the last change event's timestamp and the connector processing it. The values will incorporate any differences between the clocks on the machines where the database server and the Debezium connector are running.
|The number of milliseconds between the last change event's timestamp and the connector processing it. The values will incorporate any differences between the clocks on the machines where the database server and the connector are running.
|`NumberOfCommittedTransactions`
|`long`
@ -1181,7 +1237,7 @@ Kafka, Zookeeper, and Kafka Connect all have xref:operations/monitoring.adoc[bui
[[connector-properties]]
[[sqlserver-connector-properties]]
=== Connector properties
The following configuration properties are _required_ unless a default value is available.
@ -1234,15 +1290,17 @@ The following configuration properties are _required_ unless a default value is
|`database.history{zwsp}.kafka.bootstrap.servers`
|
|A list of host/port pairs that the connector will use for establishing an initial connection to the Kafka cluster. This connection will be used for retrieving database schema history previously stored by the connector, and for writing each DDL statement read from the source database. This should point to the same Kafka cluster used by the Kafka Connect process.
|A list of host/port pairs that the connector will use for establishing an initial connection to the Kafka cluster.
This connection is used for retrieving database schema history previously stored by the connector, and for writing each DDL statement read from the source database. This should point to the same Kafka cluster used by the Kafka Connect process.
|`table.whitelist`
|
|An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be monitored; any table not included in the whitelist will be excluded from monitoring. Each identifier is of the form _schemaName_._tableName_. By default the connector will monitor every non-system table in each monitored schema. May not be used with `table.blacklist`.
|An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be monitored; any table not included in the whitelist is excluded from monitoring. Each identifier is of the form _schemaName_._tableName_. By default the connector will monitor every non-system table in each monitored schema. May not be used with `table.blacklist`.
|`table.blacklist`
|
|An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be excluded from monitoring; any table not included in the blacklist will be monitored. Each identifier is of the form _schemaName_._tableName_. May not be used with `table.whitelist`.
|An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be excluded from monitoring; any table not included in the blacklist is monitored.
Each identifier is of the form _schemaName_._tableName_. May not be used with `table.whitelist`.
|`column.blacklist`
|_empty string_
@ -1263,7 +1321,7 @@ Emitting the tombstone event (the default behavior) allows Kafka to completely d
|`column.propagate.source.type`
|_n/a_
|An optional comma-separated list of regular expressions that match the fully-qualified names of columns whose original type and length should be added as a parameter to the corresponding field schemas in the emitted change messages.
The schema parameters `pass:[_]pass:[_]debezium.source.column.type`, `pass:[_]pass:[_]debezium.source.column.length` and `pass:[_]pass:[_]debezium.source.column.scale` will be used to propagate the original type name and length (for variable-width types), respectively.
The schema parameters `pass:[_]pass:[_]debezium.source.column.type`, `pass:[_]pass:[_]debezium.source.column.length` and `pass:[_]pass:[_]debezium.source.column.scale` is used to propagate the original type name and length (for variable-width types), respectively.
Useful to properly size corresponding columns in sink databases.
Fully-qualified names for columns are of the form _schemaName_._tableName_._columnName_.
@ -1285,7 +1343,7 @@ The following _advanced_ configuration properties have good defaults that will w
|`snapshot.mode`
|_initial_
|A mode for taking an initial snapshot of the structure and optionally data of captured tables. Supported values are _initial_ (will take a snapshot of structure and data of captured tables; useful if topics should be populated with a complete representation of the data from the captured tables) and _schema_only_ (will take a snapshot of the structure of captured tables only; useful if only changes happening from now onwards should be propagated to topics). Once the snapshot is complete, the connector will continue reading change events from the database's redo logs.
|A mode for taking an initial snapshot of the structure and optionally data of captured tables. Supported values are _initial_ (will take a snapshot of structure and data of captured tables; useful if topics should be populated with a complete representation of the data from the captured tables) and _initial_schema_only_ (will take a snapshot of the structure of captured tables only; useful if only changes happening from now onwards should be propagated to topics). Once the snapshot is complete, the connector will continue reading change events from the database's redo logs.
|`snapshot.isolation.mode`
|_repeatable_read_
@ -1323,7 +1381,7 @@ This property contains an interval in milli-seconds that defines how frequently
This can be used to monitor whether the connector is still receiving change events from the database.
You also should leverage heartbeat messages in cases where only records in non-captured tables are changed for a longer period of time.
In such situation the connector would proceed to read the log from the database but never emit any change messages into Kafka,
which in turn means that no offset updates will be committed to Kafka.
which in turn means that no offset updates are committed to Kafka.
This may result in more change events to be re-sent after a connector restart.
Set this parameter to `0` to not send heartbeat messages at all. +
Disabled by default.
@ -1350,7 +1408,7 @@ When set to `0` the connector will fail immediately when it cannot obtain the lo
|`snapshot.select.statement.overrides`
|
|Controls which rows from tables will be included in snapshot. +
|Controls which rows from tables are included in snapshot. +
This property contains a comma-separated list of fully-qualified tables _(SCHEMA_NAME.TABLE_NAME)_. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id `snapshot.select.statement.overrides.[SCHEMA_NAME].[TABLE_NAME]`. The value of those properties is the SELECT statement to use when retrieving data from the specific table during snapshotting. _A possible use case for large append-only tables is setting a specific point where to start (resume) snapshotting, in case a previous snapshotting was interrupted._ +
*Note*: This setting has impact on snapshots only. Events captured during log reading are not affected by it.
@ -1360,12 +1418,14 @@ This property contains a comma-separated list of fully-qualified tables _(SCHEMA
changes to the structure of the `source` block in order to unify the exposed structure across
all the connectors. +
By setting this option to `v1` the structure used in earlier versions can be produced.
Note that this setting is not recommended and is planned for removal in a future Debezium version.
Note that this setting is not recommended and is planned for removal in a future {ProductName} version.
|`sanitize.field.names`
|`true` when connector configuration explicitly specifies the `key.converter` or `value.converter` parameters to use Avro, otherwise defaults to `false`.
|Whether field names will be sanitized to adhere to Avro naming requirements.
|Whether field names are sanitized to adhere to Avro naming requirements.
ifndef::cdc-product[]
See xref:configuration/avro.adoc#names[Avro naming] for more details.
endif::cdc-product[]
|`database.server.timezone`
|