tet123/documentation/modules/ROOT/pages/connectors/mongodb.adoc
Jakub Cechacek c03dfa8636 DBZ-7862 Applied docs suggestions from code review
Co-authored-by: roldanbob <broldan@redhat.com>
2024-05-24 20:04:22 +02:00

2432 lines
134 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Category: debezium-using
// Type: assembly
[id="debezium-connector-for-mongodb"]
= {prodname} connector for MongoDB
:context: mongodb
:data-collection: collection
:mbean-name: {context}
:connector-file: {context}
:connector-class: MongoDb
:connector-name: MongoDB
:include-list-example: public.inventory
ifdef::community[]
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js
toc::[]
endif::community[]
{prodname}'s MongoDB connector tracks a MongoDB replica set or a MongoDB sharded cluster for document changes in databases and collections, recording those changes as events in Kafka topics.
The connector automatically handles the addition or removal of shards in a sharded cluster, changes in membership of each replica set, elections within each replica set, and awaiting the resolution of communications problems.
ifdef::community[]
For information about the MongoDB versions that are compatible with this connector, see the link:https://debezium.io/releases/[{prodname} release overview].
endif::community[]
ifdef::product[]
For information about the MongoDB versions that are compatible with this connector, see the link:{LinkDebeziumSupportedConfigurations}[{NameDebeziumSupportedConfigurations}].
endif::product[]
ifdef::product[]
Information and procedures for using a {prodname} MongoDB connector is organized as follows:
* xref:overview-of-debezium-mongodb-connector[]
* xref:how-debezium-mongodb-connectors-work[]
* xref:descriptions-of-debezium-mongodb-connector-data-change-events[]
* xref:setting-up-mongodb-to-work-with-debezium[]
* xref:deployment-of-debezium-mongodb-connectors[]
* xref:monitoring-debezium-mongodb-connector-performance[]
* xref:how-debezium-mongodb-connectors-handle-faults-and-problems[]
endif::product[]
// Type: concept
// Title: Overview of {prodname} MongoDB connector
// ModuleID: overview-of-debezium-mongodb-connector
[[mongodb-overview]]
== Overview
MongoDB's replication mechanism provides redundancy and high availability, and is the preferred way to run MongoDB in production.
MongoDB connector captures the changes in a replica set or sharded cluster.
A MongoDB _replica set_ consists of a set of servers that all have copies of the same data, and replication ensures that all changes made by clients to documents on the replica set's _primary_ are correctly applied to the other replica set's servers, called _secondaries_.
MongoDB replication works by having the primary record the changes in its _oplog_ (or operation log), and then each of the secondaries reads the primary's oplog and applies in order all of the operations to their own documents.
When a new server is added to a replica set, that server first performs an https://docs.mongodb.com/manual/core/replica-set-sync/[snapshot] of all of the databases and collections on the primary, and then reads the primary's oplog to apply all changes that might have been made since it began the snapshot.
This new server becomes a secondary (and able to handle queries) when it catches up to the tail of the primary's oplog.
// Type: concept
// Title: Description of how the MongoDB connector uses change streams to capture event records
// ModuleID: description-of-how-the-mongodb-connector-uses-change-streams-to-capture-event-records
[id="change-streams"]
=== Change streams
Although the {prodname} MongoDB connector does not become part of a replica set, it uses a similar replication mechanism to obtain oplog data.
The main difference is that the connector does not read the oplog directly.
Instead, it delegates the capture and decoding of oplog data to the MongoDB https://docs.mongodb.com/manual/changeStreams/[change streams] feature.
With change streams, the MongoDB server exposes the changes that occur in a collection as an event stream.
The {prodname} connector monitors the stream and then delivers the changes downstream.
The first time that the connector detects a replica set, it examines the oplog to obtain the last recorded transaction, and then performs a snapshot of the primary's databases and collections.
After the connector finishes copying the data, it creates a change stream beginning from the oplog position that it read earlier.
As the MongoDB connector processes changes, it periodically records the position at which the event originated in the oplog stream.
When the connector stops, it records the last oplog stream position that it processed, so that after a restart it can resume streaming from that position.
In other words, the connector can be stopped, upgraded or maintained, and restarted some time later, and always pick up exactly where it left off without losing a single event.
Of course, MongoDB oplogs are usually capped at a maximum size, so if the connector is stopped for long periods, operations in the oplog might be purged before the connector has a chance to read them.
In this case, after a restart the connector detects the missing oplog operations, performs a snapshot, and then proceeds to stream changes.
The MongoDB connector is also quite tolerant of changes in membership and leadership of the replica sets, of additions or removals of shards within a sharded cluster, and network problems that might cause communication failures.
The connector always uses the replica set's primary node to stream changes, so when the replica set undergoes an election and a different node becomes primary, the connector will immediately stop streaming changes, connect to the new primary, and start streaming changes using the new primary node.
Similarly, if connector is unable to communicate with the replica set primary, it attempts to reconnect (using exponential backoff so as to not overwhelm the network or replica set).
After connection is reestablished, the connector continues to stream changes from the last event that it captured.
In this way the connector dynamically adjusts to changes in replica set membership, and automatically handles communication disruptions.
.Additional resources
* link:https://docs.mongodb.com/manual/replication/[Replication mechanism]
* link:https://docs.mongodb.com/manual/tutorial/deploy-replica-set/[Replica set]
* link:https://docs.mongodb.com/manual/core/replica-set-elections/[Replica set elections]
* link:https://docs.mongodb.com/manual/core/sharded-cluster-components/[Sharded cluster]
* link:https://docs.mongodb.com/manual/tutorial/add-shards-to-shard-cluster/[Shard addition]
* link:https://docs.mongodb.com/manual/tutorial/remove-shards-from-cluster/[Shard removal]
* link:https://docs.mongodb.com/manual/changeStreams/[Change Streams]
// Type: concept
// Title: Description of how the MongoDB connector uses the MongoDB read preference
// ModuleID: description-of-how-the-mongodb-connector-uses-the-mongodb-read-preference
[id="read-preference"]
=== Read Preference
You specify read preferences for a MongoDB connection by setting the `readPreference` parameter in the xref:mongodb-property-mongodb-connection-string[`mongodb.connection.string`].
// Type: assembly
// ModuleID: how-debezium-mongodb-connectors-work
// Title: How {prodname} MongoDB connectors work
[[how-the-mongodb-connector-works]]
== How the MongoDB connector works
An overview of the MongoDB topologies that the connector supports is useful for planning your application.
When a MongoDB connector is configured and deployed, it starts by connecting to the MongoDB servers at the seed addresses, and determines the details about each of the available replica sets.
Since each replica set has its own independent oplog, the connector will try to use a separate task for each replica set.
The connector can limit the maximum number of tasks it will use, and if not enough tasks are available the connector will assign multiple replica sets to each task, although the task will still use a separate thread for each replica set.
[NOTE]
====
When running the connector against a sharded cluster, use a value of `tasks.max` that is greater than the number of replica sets.
This will allow the connector to create one task for each replica set, and will let Kafka Connect coordinate, distribute, and manage the tasks across all of the available worker processes.
====
ifdef::product[]
The following topics provide details about how the {prodname} MongoDB connector works:
* xref:mongodb-topologies-supported-by-debezium-connectors[]
* xref:how-debezium-mongodb-connectors-use-logical-names-for-replica-sets-and-sharded-clusters[]
* xref:how-debezium-mongodb-connectors-perform-snapshots[]
* xref:debezium-mongodb-ad-hoc-snapshots[]
* xref:debezium-mongodb-incremental-snapshots[]
* xref:how-the-debezium-mongodb-connector-streams-change-event-records[]
* xref:default-names-of-kafka-topics-that-receive-debezium-mongodb-change-event-records[]
* xref:how-event-keys-control-topic-partitioning-for-the-debezium-mongodb-connector[]
* xref:debezium-mongodb-connector-generated-events-that-represent-transaction-boundaries[]
endif::product[]
// Type: concept
// ModuleID: mongodb-topologies-supported-by-debezium-connectors
// Title: MongoDB topologies supported by {prodname} connectors
[[supported-mongodb-topologies]]
=== Supported MongoDB topologies
The MongoDB connector supports the following MongoDB topologies:
[[mongodb-replicaset]]
MongoDB replica set::
The {prodname} MongoDB connector can capture changes from a single https://docs.mongodb.com/manual/replication/[MongoDB replica set].
Production replica sets require a minimum of https://docs.mongodb.com/manual/core/replica-set-architecture-three-members/[at least three members].
+
To use the MongoDB connector with a replica set, you must set the value of the `mongodb.connection.string` property in the connector configuration to the https://www.mongodb.com/docs/manual/reference/connection-string/[replica set connection string].
When the connector is ready to begin capturing changes from a MongoDB change stream, it starts a connection task.
The connection task then uses the specified connection string to establish a connection to an available replica set member.
[[mongodb-sharded-cluster]]
MongoDB sharded cluster::
A https://docs.mongodb.com/manual/sharding/[MongoDB sharded cluster] consists of:
* One or more _shards_, each deployed as a replica set;
* A separate replica set that acts as the cluster's _configuration server_
* One or more _routers_ (also called `mongos`) to which clients connect and that routes requests to the appropriate shards
+
To use the MongoDB connector with a sharded cluster, in the connector configuration, set the value of the `mongodb.connection.string` property to the https://www.mongodb.com/docs/manual/reference/connection-string/[sharded cluster connection string].
[[mongodb-standalone-server]]
MongoDB standalone server::
The MongoDB connector is not capable of monitoring the changes of a standalone MongoDB server, since standalone servers do not have an oplog.
The connector will work if the standalone server is converted to a replica set with one member.
[NOTE]
====
MongoDB does not recommend running a standalone server in production.
For more information, see the https://docs.mongodb.com/manual/core/replica-set-architectures/[MongoDB documentation].
====
// Type: concept
// ModuleID: mongodb-user-permissions-required-by-debezium-connectors
// Title: User permissions required by {prodname} connectors
[[required-user-permissions]]
=== Required user permissions
To capture data from MongoDB, {prodname} attaches to the database as a MongoDB user.
The MongoDB user account that you create for {prodname} requires specific database permissions to read from the database.
The connector user requires the following permissions:
* Read from the database.
* Run the `hello` command.
The connector user might also require the following permission:
* Read from the `config.shards` system collection.
.Database read permissions
The connector user must be able to read from all databases, or to read from a specific database, depending on the value of the connector's xref:mongodb-property-capture-scope[`capture.scope`] property.
Assign one of the following permissions to the user, depending on the `capture.scope` setting:
`capture.scope` is set to `deployment`::
Grant the user permission to read any database.
`capture.scope` is set to `database`::
Grant the user permission to read the database specified by the connector's xref:mongodb-property-capture-target[`capture.target`] property.
.Permission to use the MongoDB `hello` command
Regardless of the `capture.scope` setting, the user requires permission to run the MongoDB https://www.mongodb.com/docs/manual/reference/command/hello/[hello] command.
.Permission to read the `config.shards` collection
Depending on your {prodname} environment, to enable the connector to perform offset consolidation, you must grant the connector user explicit permission to read the `config.shards` collection.
Permission to read the `config.shards` collection is required for the following connector environments:
* Connectors upgraded from {prodname} 2.5 or earlier.
* Connectors configured to capture changes from a sharded MongoDB cluster.
// Type: concept
// Title: How {prodname} MongoDB connectors use logical names for replica sets and sharded clusters
// ModuleID: how-debezium-mongodb-connectors-use-logical-names-for-replica-sets-and-sharded-clusters
[[mongodb-logical-connector-name]]
=== Logical connector name
The connector configuration property `topic.prefix` serves as a _logical name_ for the MongoDB replica set or sharded cluster.
The connector uses the logical name in a number of ways: as the prefix for all topic names, and as a unique identifier when recording the change stream position of each replica set.
You should give each MongoDB connector a unique logical name that meaningfully describes the source MongoDB system.
We recommend logical names begin with an alphabetic or underscore character, and remaining characters that are alphanumeric or underscore.
// Type: concept
// Title: How {prodname} MongoDB connectors perform offset consolidation
// ModuleID: how-debezium-mongodb-connectors-perform-offset-consolidation
[[mongodb-offset-consolidation]]
=== Offset consolidation
The {prodname} MongoDB connector no longer supports `replica_set` connections to sharded MongoDB deployments.
As a result, the offsets recorded by connector versions that used the `replica_set` connection mode are incompatible with the current version.
To minimize the affects of the connection mode change, and to prevent the connector from running unnecessary snapshots, when the connector restarts after the upgrade, it runs a procedure to consolidate offsets.
During this offset consolidation procedure, the connector completes the following steps to reconcile offsets that were recorded by the earlier connector version:
1. Offsets that were recorded by connector versions later than 2.5 are used as-is.
2. Offsets for events that were captured in `sharded` connection mode from sharded MongoDB deployments, or from MongoDB replica set deployments, are used as-is.
3. Shard-specific offsets that were recorded by connector versions 2.5.x and earlier are used as-is, if both of the following conditions apply:
* The offsets exist for all current database shards.
* xref:mongodb-property-allow-offset-invalidation[Offset invalidation] is enabled. +
If offset invalidation is disabled, the connector fails to start.
4. After the connector processes existing offsets in the preceding steps, it resumes streaming changes, and then commits offsets for new events that it captures. +
If the offset consolidation procedure does not detect any existing offsets, the connector xref:mongodb-performing-a-snapshot[performs an initial snapshot].
// Type: concept
// Title: How {prodname} MongoDB connectors perform snapshots
// ModuleID: how-debezium-mongodb-connectors-perform-snapshots
[[mongodb-performing-a-snapshot]]
=== Performing a snapshot
When a {prodname} task starts to use a replica set, it uses the connector's logical name and the replica set name to find an _offset_ that describes the position where the connector previously stopped reading changes.
If an offset can be found and it still exists in the oplog, then the task immediately proceeds with xref:mongodb-streaming-changes[streaming changes], starting at the recorded offset position.
However, if no offset is found, or if the oplog no longer contains that position, the task must first obtain the current state of the replica set contents by performing a _snapshot_.
This process starts by recording the current position of the oplog and recording that as the offset (along with a flag that denotes a snapshot has been started).
The task then proceeds to copy each collection, spawning as many threads as possible (up to the value of the `snapshot.max.threads` configuration property) to perform this work in parallel.
The connector records a separate _read event_ for each document it sees.
Each read event contains the object's identifier, the complete state of the object, and _source_ information about the MongoDB replica set where the object was found.
The source information also includes a flag that denotes that the event was produced during a snapshot.
This snapshot will continue until it has copied all collections that match the connector's filters.
If the connector is stopped before the tasks' snapshots are completed, upon restart the connector begins the snapshot again.
[NOTE]
====
Try to avoid task reassignment and reconfiguration while the connector performs snapshots of any replica sets.
The connector generates log messages to report on the progress of the snapshot.
To provide for the greatest control, run a separate Kafka Connect cluster for each connector.
====
ifdef::product[]
You can find more information about snapshots in the following sections:
* xref:debezium-postgresql-ad-hoc-snapshots[]
* xref:debezium-postgresql-incremental-snapshots[]
endif::product[]
[id="mongodb-connector-snapshot-mode-options"]
.Settings for `snapshot.mode` connector configuration property
[cols="30%a,70%a",options="header"]
|===
|Setting |Description
|`always`
|The connector performs a snapshot every time that it starts.
After the snapshot completes, the connector begins to stream event records for subsequent database changes.
|`initial`
|After the connector starts, it performs an initial database snapshot.
//as described in the xref:default-workflow-for-performing-an-initial-snapshot[default workflow for creating an initial snapshot].
|`initial_only`
|The connector performs a database snapshot.
After the snapshot completes, the connector stops, and does not stream event records for subsequent database changes.
|`never`
|Deprecated, see `no_data`.
|`no_data`
|The connector captures the structure of all relevant tables,
//performing all the steps described in the xref:default-workflow-for-performing-an-initial-snapshot[default snapshot workflow],
but it does not create `READ` events to represent the data set at the point of the connector's start-up.
//(Step 7.b).
|`when_needed`
|After the connector starts, it performs a snapshot only if it detects one of the following circumstances:
* It cannot detect any topic offsets.
* A previously recorded offset specifies a log position that is not available on the server.
ifdef::community[]
|`configuration_based`
|Set the snapshot mode to `configuration_based` to control snapshot behavior through the set of connector properties that have the prefix 'snapshot.mode.configuration.based'.
endif::community[]
ifdef::community[]
|`custom`
|The `custom` snapshot mode lets you inject your own implementation of the `io.debezium.spi.snapshot.Snapshotter` interface.
Set the `snapshot.mode.custom.name` configuration property to the name provided by the `name()` method of your implementation.
The name is specified on the classpath of your Kafka Connect cluster.
If you use the {prodname} `EmbeddedEngine`, the name is included in the connector JAR file.
For more information, see xref:connector-custom-snapshot[custom snapshotter SPI].
endif::community[]
|===
For more information, see xref:mongodb-property-snapshot-mode[`snapshot.mode`] in the table of connector configuration properties.
// Type: concept
// ModuleID: debezium-mongodb-ad-hoc-snapshots
[id="mongodb-ad-hoc-snapshots"]
=== Ad hoc snapshots
include::{partialsdir}/modules/all-connectors/con-connector-ad-hoc-snapshots.adoc[leveloffset=+3]
// Type: assembly
// ModuleID: debezium-mongodb-incremental-snapshots
[id="debezium-mongodb-incremental-snapshots"]
=== Incremental snapshots
include::{partialsdir}/modules/all-connectors/con-connector-incremental-snapshot.adoc[leveloffset=+3]
[WARNING]
====
Incremental snapshots require that the primary key for each table is stably ordered.
Because `String` fields can include special characters, and are subject to different encodings, string-based primary keys do not lend themselves to sorting in a consistent and predictable order.
When performing incremental snapshots, it's best to set the primary key to a data type other than `String`.
For more information about BSON string types in MongoDB, see the https://www.mongodb.com/docs/manual/reference/bson-types/#string/[MongoDB documentation]).
====
.Incremental snapshots for sharded clusters
To use incremental snapshots with sharded MongoDB clusters, you must set specific values for the following properties:
* Set xref:mongodb-property-mongodb-connection-mode[`mongodb.connection.mode`] to `sharded`.
* Set xref:mongodb-property-incremental-snapshot-chunk-size[`incremental.snapshot.chunk.size`] to a value that is high enough to compensate for the link:https://www.mongodb.com/docs/manual/administration/change-streams-production-recommendations/#sharded-clusters[increased complexity] of change stream pipelines.
// Type: procedure
// ModuleID: debezium-mongodb-triggering-an-incremental-snapshot
[id="mongodb-triggering-an-incremental-snapshot"]
==== Triggering an incremental snapshot
include::{partialsdir}/modules/all-connectors/proc-triggering-an-incremental-snapshot-nosql.adoc[leveloffset=+1]
// Type: procedure
// ModuleID: debezium-mongdb-using-the-kafka-signaling-channel-to-trigger-an-incremental-snapshot
[id="mongodb-triggering-an-incremental-snapshot-kafka"]
==== Using the Kafka signaling channel to trigger an incremental snapshot
include::{partialsdir}/modules/all-connectors/proc-triggering-an-incremental-snapshot-kafka.adoc[leveloffset=+1]
// Type: procedure
// ModuleID: debezium-mongodb-stopping-an-incremental-snapshot
[id="mongodb-stopping-an-incremental-snapshot"]
==== Stopping an incremental snapshot
include::{partialsdir}/modules/all-connectors/proc-stopping-an-incremental-snapshot-nosql.adoc[leveloffset=+1]
// Type: procedure
// ModuleID: debezium-mongodb-using-the-kafka-signaling-channel-to-stop-an-incremental-snapshot
[id="mongodb-stopping-an-incremental-snapshot-kafka"]
==== Using the Kafka signaling channel to stop an incremental snapshot
include::{partialsdir}/modules/all-connectors/proc-stopping-an-incremental-snapshot-kafka.adoc[leveloffset=+1]
ifdef::community[]
[[connector-custom-snapshot]]
=== Custom snapshotter SPI
include::{partialsdir}/modules/all-connectors/custom-snapshotter-spi.adoc[leveloffset=+3]
endif::community[]
// Type: concept
[id="mongodb-blocking-snapshots"]
=== Blocking snapshots
include::{partialsdir}/modules/all-connectors/con-connector-blocking-snapshot.adoc[leveloffset=+3]
// Type: concept
// ModuleID: how-the-debezium-mongodb-connector-streams-change-event-records
// Title: How the {prodname} MongoDB connector streams change event records
[[mongodb-streaming-changes]]
=== Streaming changes
[[mongodb-tailing-the-oplog]]
After the connector task for a replica set records an offset, it uses the offset to determine the position in the oplog where it should start streaming changes.
The task then (depending on the configuration) either connects to the replica set's primary node or connects to a replica-set-wide change stream and starts streaming changes from that position.
It processes all of create, insert, and delete operations, and converts them into {prodname} xref:mongodb-events[change events].
Each change event includes the position in the oplog where the operation was found, and the connector periodically records this as its most recent offset.
The interval at which the offset is recorded is governed by link:https://kafka.apache.org/documentation/#offset.flush.interval.ms[`offset.flush.interval.ms`], which is a Kafka Connect worker configuration property.
When the connector is stopped gracefully, the last offset processed is recorded so that, upon restart, the connector will continue exactly where it left off.
If the connector's tasks terminate unexpectedly, however, then the tasks may have processed and generated events after it last records the offset but before the last offset is recorded; upon restart, the connector begins at the last _recorded_ offset, possibly generating some the same events that were previously generated just prior to the crash.
[NOTE]
====
When all components in a Kafka pipeline operate nominally, Kafka consumers receive every message *_exactly once_*.
However, when things go wrong, Kafka can only guarantee that consumers receive every message *_at least once_*.
To avoid unexpected results, consumers must be able to handle duplicate messages.
====
As mentioned earlier, the connector tasks always use the replica set's primary node to stream changes from the oplog, ensuring that the connector sees the most up-to-date operations as possible and can capture the changes with lower latency than if secondaries were to be used instead.
When the replica set elects a new primary, the connector immediately stops streaming changes, connects to the new primary, and starts streaming changes from the new primary node at the same position.
Likewise, if the connector experiences any problems communicating with the replica set members, it tries to reconnect, by using exponential backoff so as to not overwhelm the replica set, and once connected it continues streaming changes from where it last left off.
In this way, the connector is able to dynamically adjust to changes in replica set membership and automatically handle communication failures.
To summarize, the MongoDB connector continues running in most situations. Communication problems might cause the connector to wait until the problems are resolved.
// Type: concept
// ModuleID: mongodb-support-for-populating-the-before-state-in-debezium-change-events
// Title: MongoDB support for populating the `before` field in {prodname} change event
[[mongodb-pre-image-support]]
=== Pre-image support
In MongoDB 6.0 and later, you can configure change streams to emit the pre-image state of a document to populate the `before` field for MongoDB change events.
To enable the use of pre-images in MongoDB, you must set the `changeStreamPreAndPostImages` for a collection by using `db.createCollection()`, `create`, or `collMod`.
To enable the {prodname} MongoDB to include pre-images in change events, set the `capture.mode` for the connector to one of the `*_with_pre_image` options.
[NOTE]
.Size limits on MongoDB change stream events
====
The size of a MongoDB change stream event is limited to 16 megabytes.
The use of pre-images thus increases the likelihood of exceeding this threshold, which can lead to failures.
For information about how to avoid exceeding the change stream limit, see the https://www.mongodb.com/docs/manual/changeStreams/#change-streams-with-document-pre--and-post-images/[MongoDB documentation].
====
// Type: concept
// ModuleID: default-names-of-kafka-topics-that-receive-debezium-mongodb-change-event-records
// Title: Default names of Kafka topics that receive {prodname} MongoDB change event records
[[mongodb-topic-names]]
=== Topic names
The MongoDB connector writes events for all insert, update, and delete operations to documents in each collection to a single Kafka topic.
The name of the Kafka topics always takes the form _logicalName_._databaseName_._collectionName_, where _logicalName_ is the xref:mongodb-logical-connector-name[logical name] of the connector as specified with the `topic.prefix` configuration property, _databaseName_ is the name of the database where the operation occurred, and _collectionName_ is the name of the MongoDB collection in which the affected document existed.
For example, consider a MongoDB replica set with an `inventory` database that contains four collections: `products`, `products_on_hand`, `customers`, and `orders`.
If the connector monitoring this database were given a logical name of `fulfillment`, then the connector would produce events on these four Kafka topics:
* `fulfillment.inventory.products`
* `fulfillment.inventory.products_on_hand`
* `fulfillment.inventory.customers`
* `fulfillment.inventory.orders`
Notice that the topic names do not incorporate the replica set name or shard name.
As a result, all changes to a sharded collection (where each shard contains a subset of the collection's documents) all go to the same Kafka topic.
You can set up Kafka to {link-kafka-docs}.html#basic_ops_add_topic[auto-create] the topics as they are needed.
If not, then you must use Kafka administration tools to create the topics before starting the connector.
// Type: concept
// ModuleID: how-event-keys-control-topic-partitioning-for-the-debezium-mongodb-connector
// Title: How event keys control topic partitioning for the {prodname} MongoDB connector
[[mongodb-partitions]]
=== Partitions
The MongoDB connector does not make any explicit determination about how to partition topics for events.
Instead, it allows Kafka to determine how to partition topics based on event keys.
You can change Kafka's partitioning logic by defining the name of the `Partitioner` implementation in the Kafka Connect worker configuration.
Kafka maintains total order only for events written to a single topic partition.
Partitioning the events by key does mean that all events with the same key always go to the same partition.
This ensures that all events for a specific document are always totally ordered.
// Type: concept
// ModuleID: debezium-mongodb-connector-generated-events-that-represent-transaction-boundaries
// Title: {prodname} MongoDB connector-generated events that represent transaction boundaries
[[mongodb-transaction-metadata]]
=== Transaction Metadata
{prodname} can generate events that represents transaction metadata boundaries and enrich change data event messages.
[NOTE]
.Limits on when {prodname} receives transaction metadata
====
{prodname} registers and receives metadata only for transactions that occur after you deploy the connector.
Metadata for transactions that occur before you deploy the connector is not available.
====
For every transaction `BEGIN` and `END`, {prodname} generates an event that contains the following fields:
`status`:: `BEGIN` or `END`
`id`:: String representation of unique transaction identifier.
`event_count` (for `END` events):: Total number of events emitted by the transaction.
`data_collections` (for `END` events):: An array of pairs of `data_collection` and `event_count` that provides number of events emitted by changes originating from given data collection.
The following example shows a typical message:
[source,json,indent=0,subs="+attributes"]
----
{
"status": "BEGIN",
"id": "1462833718356672513",
"event_count": null,
"data_collections": null
}
{
"status": "END",
"id": "1462833718356672513",
"event_count": 2,
"data_collections": [
{
"data_collection": "rs0.testDB.collectiona",
"event_count": 1
},
{
"data_collection": "rs0.testDB.collectionb",
"event_count": 1
}
]
}
----
Unless overridden via the xref:mongodb-property-topic-transaction[`topic.transaction`] option,
transaction events are written to the topic named xref:mongodb-property-topic-prefix[`_<topic.prefix>_`]`.transaction`.
.Change data event enrichment
When transaction metadata is enabled, the data message `Envelope` is enriched with a new `transaction` field.
This field provides information about every event in the form of a composite of fields:
`id`:: String representation of unique transaction identifier.
`total_order`:: The absolute position of the event among all events generated by the transaction.
`data_collection_order`:: The per-data collection position of the event among all events that were emitted by the transaction.
Following is an example of what a message looks like:
[source,json,indent=0,subs="+attributes"]
----
{
"after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}",
"source": {
...
},
"op": "c",
"ts_ms": "1580390884335",
"ts_us": "1580390884335486",
"ts_ns": "1580390884335486281",
"transaction": {
"id": "1462833718356672513",
"total_order": "1",
"data_collection_order": "1"
}
}
----
// Type: assembly
// ModuleID: descriptions-of-debezium-mongodb-connector-data-change-events
// Title: Descriptions of {prodname} MongoDB connector data change events
[[mongodb-events]]
== Data change events
The {prodname} MongoDB connector generates a data change event for each document-level operation that inserts, updates, or deletes data. Each event contains a key and a value. The structure of the key and the value depends on the collection that was changed.
{prodname} and Kafka Connect are designed around _continuous streams of event messages_. However, the structure of these events may change over time, which can be difficult for consumers to handle. To address this, each event contains the schema for its content or, if you are using a schema registry, a schema ID that a consumer can use to obtain the schema from the registry. This makes each event self-contained.
The following skeleton JSON shows the basic four parts of a change event. However, how you configure the Kafka Connect converter that you choose to use in your application determines the representation of these four parts in change events. A `schema` field is in a change event only when you configure the converter to produce it. Likewise, the event key and event payload are in a change event only if you configure a converter to produce it. If you use the JSON converter and you configure it to produce all four basic change event parts, change events have this structure:
[source,json,index=0]
----
{
"schema": { // <1>
...
},
"payload": { // <2>
...
},
"schema": { // <3>
...
},
"payload": { // <4>
...
},
}
----
.Overview of change event basic content
[cols="1,2,7",options="header"]
|===
|Item |Field name |Description
|1
|`schema`
|The first `schema` field is part of the event key. It specifies a Kafka Connect schema that describes what is in the event key's `payload` portion. In other words, the first `schema` field describes the structure of the key for the document that was changed.
|2
|`payload`
|The first `payload` field is part of the event key. It has the structure described by the previous `schema` field and it contains the key for the document that was changed.
|3
|`schema`
|The second `schema` field is part of the event value. It specifies the Kafka Connect schema that describes what is in the event value's `payload` portion. In other words, the second `schema` describes the structure of the document that was changed. Typically, this schema contains nested schemas.
|4
|`payload`
|The second `payload` field is part of the event value. It has the structure described by the previous `schema` field and it contains the actual data for the document that was changed.
|===
By default, the connector streams change event records to topics with names that are the same as the event's originating collection. See xref:mongodb-topic-names[topic names].
[WARNING]
====
The MongoDB connector ensures that all Kafka Connect schema names adhere to the link:http://avro.apache.org/docs/current/spec.html#names[Avro schema name format]. This means that the logical server name must start with a Latin letter or an underscore, that is, a-z, A-Z, or \_. Each remaining character in the logical server name and each character in the database and collection names must be a Latin letter, a digit, or an underscore, that is, a-z, A-Z, 0-9, or \_. If there is an invalid character it is replaced with an underscore character.
This can lead to unexpected conflicts if the logical server name, a database name, or a collection name contains invalid characters, and the only characters that distinguish names from one another are invalid and thus replaced with underscores.
====
ifdef::product[]
For more information, see the following topics:
* xref:about-keys-in-debezium-mongodb-change-events[]
* xref:about-values-in-debezium-mongodb-change-events[]
endif::product[]
// Type: concept
// ModuleID: about-keys-in-debezium-mongodb-change-events
// Title: About keys in {prodname} MongoDB change events
[[mongodb-change-events-key]]
=== Change event keys
A change event's key contains the schema for the changed document's key and the changed document's actual key. For a given collection, both the schema and its corresponding payload contain a single `id` field.
The value of this field is the document's identifier represented as a string that is derived from link:https://docs.mongodb.com/manual/reference/mongodb-extended-json/[MongoDB extended JSON serialization strict mode].
Consider a connector with a logical name of `fulfillment`, a replica set containing an `inventory` database, and a `customers` collection that contains documents such as the following.
.Example document
[source,json,indent=0]
----
{
"_id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
}
----
.Example change event key
Every change event that captures a change to the `customers` collection has the same event key schema. For as long as the `customers` collection has the previous definition, every change event that captures a change to the `customers` collection has the following key structure. In JSON, it looks like this:
[source,json,indent=0]
----
{
"schema": { // <1>
"type": "struct",
"name": "fulfillment.inventory.customers.Key", // <2>
"optional": false, // <3>
"fields": [ // <4>
{
"field": "id",
"type": "string",
"optional": false
}
]
},
"payload": { // <5>
"id": "1004"
}
}
----
.Description of change event key
[cols="1,2,7",options="header"]
|===
|Item |Field name |Description
|1
|`schema`
|The schema portion of the key specifies a Kafka Connect schema that describes what is in the key's `payload` portion.
|2
|`fulfillment.inventory.customers.Key`
a|Name of the schema that defines the structure of the key's payload. This schema describes the structure of the key for the document that was changed. Key schema names have the format _connector-name_._database-name_._collection-name_.`Key`. In this example: +
* `fulfillment` is the name of the connector that generated this event. +
* `inventory` is the database that contains the collection that was changed. +
* `customers` is the collection that contains the document that was updated.
|3
|`optional`
|Indicates whether the event key must contain a value in its `payload` field. In this example, a value in the key's payload is required. A value in the key's payload field is optional when a document does not have a key.
|4
|`fields`
|Specifies each field that is expected in the `payload`, including each field's name, type, and whether it is required.
|5
|`payload`
|Contains the key for the document for which this change event was generated. In this example, the key contains a single `id` field of type `string` whose value is `1004`.
|===
This example uses a document with an integer identifier, but any valid MongoDB document identifier works the same way, including a document identifier. For a document identifier, an event key's `payload.id` value is a string that represents the updated document's original `_id` field as a MongoDB extended JSON serialization that uses strict mode. The following table provides examples of how different types of `_id` fields are represented.
.Examples of representing document `_id` fields in event key payloads
[options="header",role="code-wordbreak-col2 code-wordbreak-col3"]
|===
|Type |MongoDB `_id` Value|Key's payload
|Integer |1234|`{ "id" : "1234" }`
|Float |12.34|`{ "id" : "12.34" }`
|String |"1234"|`{ "id" : "\"1234\"" }`
|Document|`{ "hi" : "kafka", "nums" : [10.0, 100.0, 1000.0] }`|`{ "id" : "{\"hi\" : \"kafka\", \"nums\" : [10.0, 100.0, 1000.0]}" }`
|ObjectId |`ObjectId("596e275826f08b2730779e1f")`|`{ "id" : "{\"$oid\" : \"596e275826f08b2730779e1f\"}" }`
|Binary |`BinData("a2Fma2E=",0)`|`{ "id" : "{\"$binary\" : \"a2Fma2E=\", \"$type\" : \"00\"}" }`
|===
// Type: concept
// ModuleID: about-values-in-debezium-mongodb-change-events
// Title: About values in {prodname} MongoDB change events
[[mongodb-change-events-value]]
=== Change event values
The value in a change event is a bit more complicated than the key. Like the key, the value has a `schema` section and a `payload` section. The `schema` section contains the schema that describes the `Envelope` structure of the `payload` section, including its nested fields. Change events for operations that create, update or delete data all have a value payload with an envelope structure.
Consider the same sample document that was used to show an example of a change event key:
.Example document
[source,json,indent=0]
----
{
"_id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
}
----
The value portion of a change event for a change to this document is described for each event type:
* <<mongodb-create-events,_create_ events>>
* <<mongodb-update-events,_update_ events>>
* <<mongodb-delete-events,_delete_ events>>
* <<mongodb-tombstone-events,Tombstone events>>
[id="mongodb-create-events"]
=== _create_ events
The following example shows the value portion of a change event that the connector generates for an operation that creates data in the `customers` collection:
[source,json,options="nowrap",indent=0,subs="+attributes"]
----
{
"schema": { // <1>
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json", // <2>
"version": 1,
"field": "after"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "patch"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "int64",
"optional": false,
"field": "ts_us"
},
{
"type": "int64",
"optional": false,
"field": "ts_ns"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "rs"
},
{
"type": "string",
"optional": false,
"field": "collection"
},
{
"type": "int32",
"optional": false,
"field": "ord"
},
{
"type": "int64",
"optional": true,
"field": "h"
}
],
"optional": false,
"name": "io.debezium.connector.mongo.Source", // <3>
"field": "source"
},
{
"type": "string",
"optional": true,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "int64",
"optional": true,
"field": "ts_us"
},
{
"type": "int64",
"optional": true,
"field": "ts_ns"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope" // <4>
},
"payload": { // <5>
"after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}", // <6>
"source": { // <7>
"version": "{debezium-version}",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"ts_ms": 1558965508000000,
"ts_ms": 1558965508000000000,
"snapshot": false,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 31,
"h": 1546547425148721999
},
"op": "c", // <8>
"ts_ms": 1558965515240, // <9>
"ts_us": 1558965515240142, // <10>
"ts_ns": 1558965515240142879, // <11>
}
}
----
.Descriptions of _create_ event value fields
[cols="1,2,7",options="header"]
|===
|Item |Field name |Description
|1
|`schema`
|The value's schema, which describes the structure of the value's payload. A change event's value schema is the same in every change event that the connector generates for a particular collection.
|2
|`name`
a|In the `schema` section, each `name` field specifies the schema for a field in the value's payload. +
+
`io.debezium.data.Json` is the schema for the payload's `after`, `patch`, and `filter` fields. This schema is specific to the `customers` collection. A _create_ event is the only kind of event that contains an `after` field. An _update_ event contains a `filter` field and a `patch` field. A _delete_ event contains a `filter` field, but not an `after` field nor a `patch` field.
|3
|`name`
a|`io.debezium.connector.mongo.Source` is the schema for the payload's `source` field. This schema is specific to the MongoDB connector. The connector uses it for all events that it generates.
|4
|`name`
a|`dbserver1.inventory.customers.Envelope` is the schema for the overall structure of the payload, where `dbserver1` is the connector name, `inventory` is the database, and `customers` is the collection. This schema is specific to the collection.
|5
|`payload`
|The value's actual data. This is the information that the change event is providing. +
+
It may appear that the JSON representations of the events are much larger than the documents they describe. This is because the JSON representation must include the schema and the payload portions of the message.
However, by using the {link-prefix}:{link-avro-serialization}#avro-serialization[Avro converter], you can significantly decrease the size of the messages that the connector streams to Kafka topics.
|6
|`after`
|An optional field that specifies the state of the document after the event occurred.
In this example, the `after` field contains the values of the new document's `\_id`, `first_name`, `last_name`, and `email` fields.
The `after` value is always a string.
By convention, it contains a JSON representation of the document.
MongoDB oplog entries contain the full state of a document only for _create_ events and also for `update` events, when the `capture.mode` option is set to `change_streams_update_full`;
in other words, a _create_ event is the only kind of event that contains an _after_ field regardless of `capture.mode` option.
|7
|`source`
a|Mandatory field that describes the source metadata for the event. This field contains information that you can use to compare this event with other events, with regard to the origin of the events, the order in which the events occurred, and whether events were part of the same transaction. The source metadata includes:
* {prodname} version.
* Name of the connector that generated the event.
* Logical name of the MongoDB replica set, which forms a namespace for generated events and is used in Kafka topic names to which the connector writes.
* Names of the collection and database that contain the new document.
* If the event was part of a snapshot.
* Timestamp for when the change was made in the database and ordinal of the event within the timestamp.
* Unique identifier of the MongoDB operation (the `h` field in the oplog event).
* Unique identifiers of the MongoDB session `lsid` and transaction number `txnNumber` in case the change was executed inside a transaction (change streams capture mode only).
|8
|`op`
a|Mandatory string that describes the type of operation that caused the connector to generate the event. In this example, `c` indicates that the operation created a document. Valid values are:
* `c` = create
* `u` = update
* `d` = delete
* `r` = read (applies to only snapshots)
|9
|`ts_ms`
a|Optional field that displays the time at which the connector processed the event.
The time is based on the system clock in the JVM running the Kafka Connect task. +
+
In the `source` object, `ts_ms` indicates the time that the change was made in the database. By comparing the value for `payload.source.ts_ms` with the value for `payload.ts_ms`, you can determine the lag between the source database update and {prodname}.
|10
|`ts_us`
a|Optional field that displays the time at which the connector processed the event, in microseconds.
The time is based on the system clock in the JVM running the Kafka Connect task.
|9
|`ts_ns`
a|Optional field that displays the time at which the connector processed the event, in nanoseconds.
The time is based on the system clock in the JVM running the Kafka Connect task.
|===
[id="mongodb-update-events"]
=== _update_ events
==== Change streams capture mode
The value of a change event for an update in the sample `customers` collection has the same schema as a _create_ event for that collection.
Likewise, the event value's payload has the same structure.
However, the event value payload contains different values in an _update_ event.
An _update_ event includes an `after` value only if the `capture.mode` option is set to `change_streams_update_full`.
A `before` value is provided if the `capture.mode` option is set to one of the `*_with_pre_image` option.
There is a new structured field `updateDescription` with a few additional fields in this case:
* `updatedFields` is a string field that contains the JSON representation of the updated document fields with their values
* `removedFields` is a list of field names that were removed from the document
* `truncatedArrays` is a list of arrays in the document that were truncated
Here is an example of a change event value in an event that the connector generates for an update in the `customers` collection:
[source,json,indent=0,options="nowrap",subs="+attributes"]
----
{
"schema": { ... },
"payload": {
"op": "u", // <1>
"ts_ms": 1465491461815, // <2>
"ts_us": 1465491461815698, // <2>
"ts_ns": 1465491461815698142, // <2>
"before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"unknown\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", // <3>
"after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", // <4>
"updateDescription": {
"removedFields": null,
"updatedFields": "{\"first_name\": \"Anne Marie\"}", // <5>
"truncatedArrays": null
},
"source": { // <6>
"version": "{debezium-version}",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"ts_us": 1558965508000000,
"ts_ns": 1558965508000000000,
"snapshot": false,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 1,
"h": null,
"tord": null,
"stxnid": null,
"lsid":"{\"id\": {\"$binary\": \"FA7YEzXgQXSX9OxmzllH2w==\",\"$type\": \"04\"},\"uid\": {\"$binary\": \"47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=\",\"$type\": \"00\"}}",
"txnNumber":1
}
}
}
----
.Descriptions of _update_ event value fields
[cols="1,2,7",options="header"]
|===
|Item |Field name |Description
|1
|`op`
a|Mandatory string that describes the type of operation that caused the connector to generate the event. In this example, `u` indicates that the operation updated a document.
|2
|`ts_ms`, `ts_us`, `ts_ns`
a|Optional field that displays the time at which the connector processed the event.
The time is based on the system clock in the JVM running the Kafka Connect task. +
+
In the `source` object, `ts_ms` indicates the time that the change was made in the database. By comparing the value for `payload.source.ts_ms` with the value for `payload.ts_ms`, you can determine the lag between the source database update and {prodname}.
|3
|`before`
|Contains the JSON string representation of the actual MongoDB document before change.
+
An _update_ event value does not contain an `before` field if the capture mode is not set to one of the `*_with_preimage` options.
|4
|`after`
|Contains the JSON string representation of the actual MongoDB document.
+
An _update_ event value does not contain an `after` field if the capture mode is not set to `change_streams_update_full`
|5
|`updatedFields`
|Contains the JSON string representation of the updated field values of the document. In this example, the update changed the `first_name` field to a new value.
|6
|`source`
a|Mandatory field that describes the source metadata for the event. This field contains the same information as a _create_ event for the same collection, but the values are different since this event is from a different position in the oplog. The source metadata includes:
* {prodname} version.
* Name of the connector that generated the event.
* Logical name of the MongoDB replica set, which forms a namespace for generated events and is used in Kafka topic names to which the connector writes.
* Names of the collection and database that contain the updated document.
* If the event was part of a snapshot.
* Timestamp for when the change was made in the database and ordinal of the event within the timestamp.
* Unique identifiers of the MongoDB session `lsid` and transaction number `txnNumber` in case the change was executed inside a transaction.
|===
[WARNING]
====
The `after` value in the event should be handled as the at-point-of-time value of the document.
The value is not calculated dynamically but is obtained from the collection.
It is thus possible if multiple updates are closely following one after the other, that all _update_ updates events will contain the same `after` value which will be representing the last value stored in the document.
If your application depends on gradual change evolution then you should rely on `updateDescription` only.
====
[id="mongodb-delete-events"]
=== _delete_ events
The value in a _delete_ change event has the same `schema` portion as _create_ and _update_ events for the same collection. The `payload` portion in a _delete_ event contains values that are different from _create_ and _update_ events for the same collection. In particular, a _delete_ event contains neither an `after` value nor a `updateDescription` value. Here is an example of a _delete_ event for a document in the `customers` collection:
[source,json,indent=0,subs="+attributes"]
----
{
"schema": { ... },
"payload": {
"op": "d", // <1>
"ts_ms": 1465495462115, // <2>
"ts_us": 1465495462115748, // <2>
"ts_ns": 1465495462115748263, // <2>
"before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}",// <3>
"source": { // <4>
"version": "{debezium-version}",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"ts_us": 1558965508000000,
"ts_ns": 1558965508000000000,
"snapshot": true,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 6,
"h": 1546547425148721999
}
}
}
----
.Descriptions of _delete_ event value fields
[cols="1,2,7",options="header",subs="+attributes"]
|===
|Item |Field name |Description
|1
|`op`
a|Mandatory string that describes the type of operation. The `op` field value is `d`, signifying that this document was deleted.
|2
|`ts_ms`, `ts_us`. `ts_ns`
a|Optional field that displays the time at which the connector processed the event.
The time is based on the system clock in the JVM running the Kafka Connect task. +
+
In the `source` object, `ts_ms` indicates the time that the change was made in the database. By comparing the value for `payload.source.ts_ms` with the value for `payload.ts_ms`, you can determine the lag between the source database update and {prodname}.
|3
|`before`
|Contains the JSON string representation of the actual MongoDB document before change.
+
An _update_ event value does not contain an `before` field if the capture mode is not set to one of the `*_with_preimage` options.
|4
|`source`
a|Mandatory field that describes the source metadata for the event. This field contains the same information as a _create_ or _update_ event for the same collection, but the values are different since this event is from a different position in the oplog. The source metadata includes:
* {prodname} version.
* Name of the connector that generated the event.
* Logical name of the MongoDB replica set, which forms a namespace for generated events and is used in Kafka topic names to which the connector writes.
* Names of the collection and database that contained the deleted document.
* If the event was part of a snapshot.
* Timestamp for when the change was made in the database and ordinal of the event within the timestamp.
* Unique identifier of the MongoDB operation (the `h` field in the oplog event).
* Unique identifiers of the MongoDB session `lsid` and transaction number `txnNumber` in case the change was executed inside a transaction (change streams capture mode only).
|===
MongoDB connector events are designed to work with link:{link-kafka-docs}/#compaction[Kafka log compaction]. Log compaction enables removal of some older messages as long as at least the most recent message for every key is kept. This lets Kafka reclaim storage space while ensuring that the topic contains a complete data set and can be used for reloading key-based state.
[id="mongodb-tombstone-events"]
=== Tombstone events
All MongoDB connector events for a uniquely identified document have exactly the same key. When a document is deleted, the _delete_ event value still works with log compaction because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that key, the message value must be `null`. To make this possible, after {prodname}s MongoDB connector emits a _delete_ event, the connector emits a special tombstone event that has the same key but a `null` value. A tombstone event informs Kafka that all messages with that same key can be removed.
// Type: assembly
// ModuleID: setting-up-mongodb-to-work-with-debezium
// Title: Setting up MongoDB to work with a {prodname} connector
[[setting-up-mongodb]]
== Setting up MongoDB
The MongoDB connector uses MongoDB's change streams to capture the changes, so the connector works only with MongoDB replica sets or with sharded clusters where each shard is a separate replica set.
See the MongoDB documentation for setting up a https://docs.mongodb.com/manual/replication/[replica set] or https://docs.mongodb.com/manual/sharding/[sharded cluster].
Also, be sure to understand how to enable https://docs.mongodb.com/manual/tutorial/deploy-replica-set-with-keyfile-access-control/#deploy-repl-set-with-auth[access control and authentication] with replica sets.
You must also have a MongoDB user that has the appropriate roles to read the `admin` database where the oplog can be read. Additionally, the user must also be able to read the `config` database in the configuration server of a sharded cluster and must have `listDatabases` privilege action.
When change streams are used (the default) the user also must have cluster-wide privilege actions `find` and `changeStream`.
When you intend to utilize pre-image and populate the `before` field, you need to first enable `changeStreamPreAndPostImages` for a collection using `db.createCollection()`, `create`, or `collMod`.
ifdef::community[]
[[mongodb-in-the-cloud]]
=== MongoDB in the Cloud
You can use the {prodname} connector for MongoDB with https://www.mongodb.com/atlas/database[MongoDB Atlas].
Note that MongoDB Atlas only supports secure connections via SSL, i.e. the xref:mongodb-property-mongodb-ssl-enabled[`+mongodb.ssl.enabled`] connector option _must_ be set to `true`.
endif::community[]
[[mongodb-optimal-oplog-config]]
=== Optimal Oplog Config
The {prodname} MongoDB connector reads https://www.mongodb.com/docs/manual/changeStreams/[change streams] to obtain oplog data for a replica set.
Because the oplog is a fixed-sized, capped collection, if it exceeds its maximum configured size, it begins to overwrite its oldest entries.
If the connector is stopped for any reason, when it restarts, it attempts to resume streaming from the last oplog stream position.
However, if last stream position was removed from the oplog, depending on the value specified in the connector's xref:mongodb-property-snapshot-mode[`snapshot.mode`] property, the connector might fail to start, reporting an xref:connector-error-invalid-resume-token[invalid resume token error].
In the event of a failure, you must create a new connector to enable {prodname} to continue capturing records from the database.
For more information, see xref:debezium-mongodb-connector-is-stopped-for-a-long-interval[Connector fails after it is stopped for a long interval if snapshot.mode is set to initial].
To ensure that the oplog retains the offset values that {prodname} requires to resume streaming, you can use either of the following approaches:
* https://www.mongodb.com/docs/manual/core/replica-set-oplog/[Increase the size of the oplog].
Based on your typical workloads, set the oplog size to a value that is greater than the peak number of oplog entries per hour.
* https://www.mongodb.com/docs/manual/core/replica-set-oplog/#std-label-replica-set-minimum-oplog-size/[Increase the minimum number of hours that an oplog entry is retained] (MongoDB 4.4 and greater).
This setting is time-based, such that entries in the last _n_ hours are guaranteed to be available even if the oplog reaches its maximum configured size.
Although this is generally the preferred option, for clusters with high workloads that are nearing capacity, specify the maximum oplog size.
To help prevent failures that are related to missing oplog entries, it's important to track metrics that report replication behavior, and to optimize the oplog size to support {prodname}.
In particular, you should monitor the values of Oplog GB/Hour and Replication Oplog Window.
If {prodname} is offline for an interval that exceeds the value of the replication oplog window, and the primary oplog grows faster than {prodname} can consume entries, a connector failure can result.
For information about how to monitor these metrics, see https://www.mongodb.com/basics/how-to-monitor-mongodb-and-what-metrics-to-monitor#mongodb-replication-metrics[the MongoDB documentation].
It's best to set the maximum oplog size to a value that is based on the anticipated hourly growth of the oplog (https://www.mongodb.com/basics/how-to-monitor-mongodb-and-what-metrics-to-monitor#oplog-gbhour[Oplog GB/Hour]), multiplied by the time that might be required to address a {prodname} failure.
That is,
`_Oplog GB/Hour_` X `_average reaction time to {prodname} failure_`
For example, if the oplog size limit is set to 1GB, and the oplog grows by 3GB per hour, oplog entries are cleared three times per hour.
If {prodname} were to fail during this time, its last oplog position is likely to be removed.
If the oplog grows at the rate of 3GB/hour, and {prodname} is offline for two hours, you would thus set the oplog size to 3GB/hour X 2 hours, or 6GB.
// Type: assembly
// ModuleID: deployment-of-debezium-mongodb-connectors
// Title: Deployment of {prodname} MongoDB connectors
[[mongodb-deploying-a-connector]]
== Deployment
ifdef::community[]
To deploy a {prodname} MongoDB connector, you install the {prodname} MongoDB connector archive, configure the connector, and start the connector by adding its configuration to Kafka Connect.
.Prerequisites
* link:https://zookeeper.apache.org/[Apache Zookeeper], link:http://kafka.apache.org/[Apache Kafka], and link:{link-kafka-docs}.html#connect[Kafka Connect] are installed.
* MongoDB is installed and is xref:setting-up-mongodb[set up to work with the {prodname} connector].
.Procedure
. Download the
ifeval::['{page-version}' == 'master']
{link-mongodb-plugin-snapshot}[connector's plug-in archive],
endif::[]
ifeval::['{page-version}' != 'master']
https://repo1.maven.org/maven2/io/debezium/debezium-connector-mongodb/{debezium-version}/debezium-connector-mongodb-{debezium-version}-plugin.tar.gz[connector's plug-in archive],
endif::[]
. Extract the JAR files into your Kafka Connect environment.
. Add the directory with the JAR files to {link-kafka-docs}/#connectconfigs[Kafka Connect's `plugin.path`].
. Restart your Kafka Connect process to pick up the new JAR files.
If you are working with immutable containers, see link:https://quay.io/organization/debezium[{prodname}'s Container images] for Apache Zookeeper, Apache Kafka, and Kafka Connect with the MongoDB connector already installed and ready to run.
You can also xref:operations/openshift.adoc[run {prodname} on Kubernetes and OpenShift].
The {prodname} xref:tutorial.adoc[tutorial] walks you through using these images, and this is a great way to learn about {prodname}.
endif::community[]
ifdef::product[]
You can use either of the following methods to deploy a {prodname} MongoDB connector:
* xref:openshift-streams-mongodb-connector-deployment[Use {StreamsName} to automatically create an image that includes the connector plug-in].
+
This is the preferred method.
* xref:deploying-debezium-mongodb-connectors[Build a custom Kafka Connect container image from a Dockerfile].
.Additional resources
* xref:mongodb-connector-properties[]
// Type: concept
[id="openshift-streams-mongodb-connector-deployment"]
=== MongoDB connector deployment using {StreamsName}
include::{partialsdir}/modules/all-connectors/con-connector-streams-deployment.adoc[leveloffset=+1]
// Type: procedure
[id="using-streams-to-deploy-debezium-mongodb-connectors"]
=== Using {StreamsName} to deploy a {prodname} MongoDB connector
include::{partialsdir}/modules/all-connectors/proc-using-streams-to-deploy-a-debezium-mongodb-connector.adoc[leveloffset=+1]
// Type: procedure
[id="deploying-debezium-mongodb-connectors"]
=== Deploying a {prodname} MongoDB connector by building a custom Kafka Connect container image from a Dockerfile
To deploy a {prodname} MongoDB connector, you must build a custom Kafka Connect container image that contains the {prodname} connector archive and then push this container image to a container registry.
You then create two custom resources (CRs):
* A `KafkaConnect` CR that defines your Kafka Connect instance.
The `image` property in the CR specifies the name of the container image that you create to run your {prodname} connector.
You apply this CR to the OpenShift instance where link:https://access.redhat.com/products/red-hat-amq#streams[Red Hat {StreamsName}] is deployed.
{StreamsName} offers operators and images that bring Apache Kafka to OpenShift.
* A `KafkaConnector` CR that defines your {prodname} MongoDB connector.
Apply this CR to the same OpenShift instance where you apply the `KafkaConnect` CR.
.Prerequisites
* MongoDB is running and you completed the steps to xref:setting-up-mongodb-to-work-with-debezium[set up MongoDB to work with a {prodname} connector].
* {StreamsName} is deployed on OpenShift and is running Apache Kafka and Kafka Connect.
For more information, see link:{LinkDeployManageStreamsOpenShift}[{NameDeployManageStreamsOpenShift}].
* Podman or Docker is installed.
* You have an account and permissions to create and manage containers in the container registry (such as `quay.io` or `docker.io`) to which you plan to add the container that will run your Debezium connector.
.Procedure
. Create the {prodname} MongoDB container for Kafka Connect:
.. Create a Dockerfile that uses `{DockerKafkaConnect}` as the base image.
For example, from a terminal window, enter the following command:
+
=====================================================================
[source,shell,subs="+attributes,+quotes"]
----
cat <<EOF >debezium-container-for-{context}.yaml // <1>
FROM {DockerKafkaConnect}
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium // <2>
RUN cd /opt/kafka/plugins/debezium/ \
&& curl -O {red-hat-maven-repository}debezium/debezium-connector-{connector-file}/{debezium-version}-redhat-{debezium-build-number}/debezium-connector-{connector-file}-{debezium-version}-redhat-{debezium-build-number}-plugin.zip \
&& unzip debezium-connector-{connector-file}-{debezium-version}-redhat-{debezium-build-number}-plugin.zip \
&& rm debezium-connector-{connector-file}-{debezium-version}-redhat-{debezium-build-number}-plugin.zip
RUN cd /opt/kafka/plugins/debezium/
USER 1001
EOF
----
=====================================================================
+
[cols="1,7",options="header"]
|===
|Item |Description
|1
|You can specify any file name that you want.
|2
|Specifies the path to your Kafka Connect plug-ins directory.
If your Kafka Connect plug-ins directory is in a different location, replace this path with the actual path of your directory.
|===
+
The command creates a Dockerfile with the name `debezium-container-for-mongodb.yaml` in the current directory.
.. Build the container image from the `debezium-container-for-mongodb.yaml` Docker file that you created in the previous step.
From the directory that contains the file, open a terminal window and enter one of the following commands:
+
[source,shell,options="nowrap"]
----
podman build -t debezium-container-for-mongodb:latest .
----
+
[source,shell,options="nowrap"]
----
docker build -t debezium-container-for-mongodb:latest .
----
The preceding commands build a container image with the name `debezium-container-for-mongodb`.
.. Push your custom image to a container registry, such as `quay.io` or an internal container registry.
The container registry must be available to the OpenShift instance where you want to deploy the image.
Enter one of the following commands:
+
[source,shell,subs="+quotes"]
----
podman push _<myregistry.io>_/debezium-container-for-mongodb:latest
----
+
[source,shell,subs="+quotes"]
----
docker push _<myregistry.io>_/debezium-container-for-mongodb:latest
----
.. Create a new {prodname} MongoDB `KafkaConnect` custom resource (CR).
For example, create a `KafkaConnect` CR with the name `dbz-connect.yaml` that specifies `annotations` and `image` properties.
The following example shows an excerpt from a `dbz-connect.yaml` file that describes a `KafkaConnect` custom resource. +
+
=====================================================================
[source,yaml,subs="+attributes"]
----
apiVersion: {KafkaConnectApiVersion}
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true" // <1>
spec:
#...
image: debezium-container-for-mongodb // <2>
...
----
=====================================================================
+
[cols="1,7",options="header"]
|===
|Item |Description
|1
|`metadata.annotations` indicates to the Cluster Operator that `KafkaConnector` resources are used to configure connectors in this Kafka Connect cluster.
|2
|`spec.image` specifies the name of the image that you created to run your Debezium connector.
This property overrides the `STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE` variable in the Cluster Operator.
|===
.. Apply the `KafkaConnect` CR to the OpenShift Kafka Connect environment by entering the following command:
+
[source,shell,options="nowrap"]
----
oc create -f dbz-connect.yaml
----
+
The command adds a Kafka Connect instance that specifies the name of the image that you created to run your {prodname} connector.
. Create a `KafkaConnector` custom resource that configures your {prodname} MongoDB connector instance.
+
You configure a {prodname} MongoDB connector in a `.yaml` file that specifies the configuration properties for the connector.
The connector configuration might instruct {prodname} to produce change events for a subset of MongoDB replica sets or sharded clusters.
Optionally, you can set properties that filter out collections that are not needed.
+
The following example configures a {prodname} connector that connects to a MongoDB replica set `rs0` at port `27017` on `192.168.99.100`,
and captures changes that occur in the `inventory` collection.
`inventory-connector-{context}` is the logical name of the replica set. +
+
=====================================================================
.MongoDB `inventory-connector.yaml`
[source,yaml,options="nowrap",subs="+attributes"]
----
apiVersion: {KafkaConnectApiVersion}
kind: KafkaConnector
metadata:
name: inventory-connector-{context} // <1>
labels: strimzi.io/cluster: my-connect-cluster
spec:
class: io.debezium.connector.mongodb.MongoDbConnector // <2>
config:
mongodb.connection.string: mongodb://192.168.99.100:27017/?replicaSet=rs0 // <3>
topic.prefix: inventory-connector-{context} // <4>
collection.include.list: inventory[.]* // <5>
----
=====================================================================
+
.Descriptions of settings in the MongoDB `inventory-connector.yaml` example
[cols="1,7",options="header",subs="+attributes"]
|===
|Item |Description
|1
|The name that is used to register the connector with Kafka Connect.
|2
|The name of the MongoDB connector class.
|3
|The host addresses to use to connect to the MongoDB replica set.
|4
|The _logical name_ of the MongoDB replica set.
The logical name forms a namespace for generated events, and is used in the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
|5
|An optional list of regular expressions that match the collection namespaces (for example, __<dbName>.<collectionName>__) of all collections to be monitored.
|===
. Create your connector instance with Kafka Connect.
For example, if you saved your `KafkaConnector` resource in the `inventory-connector.yaml` file, you would run the following command:
+
[source,shell,options="nowrap"]
----
oc apply -f inventory-connector.yaml
----
+
The preceding command registers `inventory-connector` and the connector starts to run against the `inventory` collection as defined in the `KafkaConnector` CR.
endif::product[]
ifdef::community[]
[[mongodb-example-configuration]]
=== MongoDB connector configuration example
Following is an example of the configuration for a connector instance that captures data from a MongoDB replica set `rs0` at port 27017 on 192.168.99.100, which we logically name `fullfillment`.
Typically, you configure the {prodname} MongoDB connector in a JSON file by setting the configuration properties that are available for the connector.
You can choose to produce events for a particular MongoDB replica set or sharded cluster.
Optionally, you can filter out collections that are not needed.
[source,json]
----
{
"name": "inventory-connector", // <1>
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector", // <2>
"mongodb.connection.string": "mongodb://192.168.99.100:27017/?replicaSet=rs0", // <3>
"topic.prefix": "fullfillment", // <4>
"collection.include.list": "inventory[.]*" // <5>
}
}
----
<1> The name of our connector when we register it with a Kafka Connect service.
<2> The name of the MongoDB connector class.
<3> The connection string to use to connect to the MongoDB replica set.
<4> The _logical name_ of the MongoDB replica set, which forms a namespace for generated events and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
<5> A list of regular expressions that match the collection namespaces (for example, <dbName>.<collectionName>) of all collections to be monitored. This is optional.
endif::community[]
For the complete list of the configuration properties that you can set for the {prodname} MongoDB connector,
see xref:mongodb-connector-properties[MongoDB connector configuration properties].
ifdef::community[]
You can send this configuration with a `POST` command to a running Kafka Connect service.
The service records the configuration and starts one connector task that performs the following actions:
* Connects to the MongoDB replica set or sharded cluster.
* Assigns tasks for each replica set.
* Performs a snapshot, if necessary.
* Reads the change stream.
* Streams change event records to Kafka topics.
[[mongodb-adding-connector-configuration]]
=== Adding connector configuration
To start running a {prodname} MongoDB connector, create a connector configuration, and add the configuration to your Kafka Connect cluster.
.Prerequisites
* xref:setting-up-mongodb[MongoDB is set up to work with a {prodname} connector].
* The {prodname} MongoDB connector is installed.
.Procedure
. Create a configuration for the MongoDB connector.
. Use the link:{link-kafka-docs}/#connect_rest[Kafka Connect REST API] to add that connector configuration to your Kafka Connect cluster.
endif::community[]
.Results
After the connector starts, it completes the following actions:
* xref:mongodb-performing-a-snapshot[Performs a consistent snapshot] of the collections in your MongoDB replica sets.
* Reads the change streams for the replica sets.
* Produces change events for every inserted, updated, and deleted document.
* Streams change event records to Kafka topics.
ifdef::product[]
// Type: procedure
[id="verifying-that-the-debezium-mongodb-connector-is-running"]
=== Verifying that the {prodname} MongoDB connector is running
include::{partialsdir}/modules/all-connectors/proc-verifying-the-connector-deployment.adoc[leveloffset=+1]
endif::product[]
// Type: reference
// Title: Descriptions of {prodname} MongoDB connector configuration properties
[[mongodb-connector-properties]]
=== Connector properties
The {prodname} MongoDB connector has numerous configuration properties that you can use to achieve the right connector behavior for your application.
Many properties have default values. Information about the properties is organized as follows:
* xref:debezium-mongodb-connector-required-configuration-properties[Required {prodname} MongoDB connector configuration properties]
* xref:debezium-mongodb-connector-advanced-configuration-properties[Advanced {prodname} MongoDB connector configuration properties]
The following configuration properties are _required_ unless a default value is available.
[id="debezium-mongodb-connector-required-configuration-properties"]
.Required {prodname} MongoDB connector configuration properties
[cols="30%a,25%a,45%a",options="header"]
|===
|Property |Default |Description
|[[mongodb-property-allow-offset-invalidation]]<<mongodb-property-allow-offset-invalidation, `+internal.mongodb.allow.offset.invalidation+`>>
|false
|Set this property to `true` to enable the connector to invalidate and xref:mongodb-offset-consolidation[consolidate] shard-specific offsets that were recorded by earlier connector versions.
[WARNING]
====
This property permits you to modify the current default behavior.
The property is subject to removal in a future release if the default behavior changes to permit the connector to automatically invalidate and consolidate offsets that are recorded by earlier connector versions.
====
|[[mongodb-property-name]]<<mongodb-property-name, `+name+`>>
|No default
|Unique name for the connector. Attempting to register again with the same name will fail. (This property is required by all Kafka Connect connectors.)
|[[mongodb-property-connector-class]]<<mongodb-property-connector-class, `+connector.class+`>>
|No default
|The name of the Java class for the connector. Always use a value of `io.debezium.connector.mongodb.MongoDbConnector` for the MongoDB connector.
|[[mongodb-property-mongodb-connection-string]]<<mongodb-property-mongodb-connection-string, `+mongodb.connection.string+`>>
|No default
|Specifies a https://www.mongodb.com/docs/manual/reference/connection-string/[connection string] that the connector uses to connect to a MongoDB replica set.
This property replaces the `mongodb.hosts` property that was available in previous versions of the MongoDB connector. +
|[[mongodb-property-topic-prefix]]<<mongodb-property-topic-prefix, `+topic.prefix+`>>
|No default
|A unique name that identifies the connector and/or MongoDB replica set or sharded cluster that this connector monitors.
Each server should be monitored by at most one {prodname} connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster.
Use only alphanumeric characters, hyphens, dots and underscores to form the name.
The logical name should be unique across all other connectors, because the name is used as the prefix in naming the Kafka topics that receive records from this connector. +
+
[WARNING]
====
Do not change the value of this property.
If you change the name value, after a restart, instead of continuing to emit events to the original topics, the connector emits subsequent events to topics whose names are based on the new value.
====
|[[mongodb-property-mongodb-authentication-class]]<<mongodb-property-mongodb-authentication-class, `+mongodb.authentication.class+`>>
|_DefaultMongoDbAuthProvider_
|A full Java class name that is an implementation of the io.debezium.connector.mongodb.connection.MongoDbAuthProvider interface.
This class handles setting the credentials on the MongoDB connection (called on each app boot).
Default behavior uses the xref:mongodb-property-mongodb-user[`mongodb.user`], xref:mongodb-property-mongodb-password[`mongodb.password`], and xref:mongodb-property-mongodb-authsource[`mongodb.authsource`] properties according to each of their documentation,
but other implementations may use them differently or ignore them altogether.
Note that any setting in xref:mongodb-property-mongodb-connection-string[`mongodb.connection.string`] will override settings set by this class
|[[mongodb-property-mongodb-user]]<<mongodb-property-mongodb-user, `+mongodb.user+`>>
|No default
|When using default xref:mongodb-property-mongodb-authentication-class[`mongodb.authentication.class`]:
Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.
|[[mongodb-property-mongodb-password]]<<mongodb-property-mongodb-password, `+mongodb.password+`>>
|No default
|When using default xref:mongodb-property-mongodb-authentication-class[`mongodb.authentication.class`]:
Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.
|[[mongodb-property-mongodb-authsource]]<<mongodb-property-mongodb-authsource, `+mongodb.authsource+`>>
|`admin`
|When using default xref:mongodb-property-mongodb-authentication-class[`mongodb.authentication.class`]:
Database (authentication source) containing MongoDB credentials. This is required only when MongoDB is configured to use authentication with another authentication database than `admin`.
|[[mongodb-property-mongodb-ssl-enabled]]<<mongodb-property-mongodb-ssl-enabled, `+mongodb.ssl.enabled+`>>
|`false`
|Connector will use SSL to connect to MongoDB instances.
|[[mongodb-property-mongodb-ssl-invalid-hostname-allowed]]<<mongodb-property-mongodb-ssl-invalid-hostname-allowed, `+mongodb.ssl.invalid.hostname.allowed+`>>
|`false`
|When SSL is enabled this setting controls whether strict hostname checking is disabled during connection phase. If `true` the connection will not prevent man-in-the-middle attacks.
|[[mongodb-property-filters-match-mode]]<<mongodb-property-filters-match-mode, `+filters.match.mode+`>>
|regex
|The mode used to match events based on included/excluded database and collection names.
Set the property to one of the following values:
`regex`::
Database and collection includes/excludes are evaluated as comma-separated list of regular expressions.
`literal`::
Database and collection includes/excludes are evaluated as comma-separated list of string literals. Whitespace characters surrounding these literals are stripped.
|[[mongodb-property-database-include-list]]<<mongodb-property-database-include-list, `+database.include.list+`>>
|_empty string_
|An optional comma-separated list of regular expressions or literals that match database names to be monitored.
By default, all databases are monitored. +
When `database.include.list` is set, the connector monitors only the databases that the property specifies.
Other databases are excluded from monitoring.
To match the name of a database, {prodname} performs one of the following actions based on the value of xref:mongodb-property-filters-match-mode[`filters.match.mode`] property
- applies the regular expression that you specify as an _anchored_ regular expression.
That is, the specified expression is matched against the entire name string of the database; it does not match substrings that might be present in a database name.
- compares the literals that you specify with the entire name string of the database +
If you include this property in the configuration, do not also set the `database.exclude.list` property.
|[[mongodb-property-database-exclude-list]]<<mongodb-property-database-exclude-list, `+database.exclude.list+`>>
|_empty string_
|An optional comma-separated list of regular expressions or literals that match database names to be excluded from monitoring.
When `database.exclude.list` is set, the connector monitors every database except the ones that the property specifies.
To match the name of a database, {prodname} performs one of the following actions based on the value of xref:mongodb-property-filters-match-mode[`filters.match.mode`] property
- applies the regular expression that you specify as an _anchored_ regular expression.
That is, the specified expression is matched against the entire name string of the database; it does not match substrings that might be present in a database name.
- compares the literals that you specify with the entire name string of the database +
If you include this property in the configuration, do not set the `database.include.list` property.
|[[mongodb-property-collection-include-list]]<<mongodb-property-collection-include-list, `+collection.include.list+`>>
|_empty string_
|An optional comma-separated list of regular expressions or literals that match fully-qualified namespaces for MongoDB collections to be monitored.
By default, the connector monitors all collections except those in the `local` and `admin` databases.
When `collection.include.list` is set, the connector monitors only the collections that the property specifies.
Other collections are excluded from monitoring.
Collection identifiers are of the form _databaseName_._collectionName_.
To match the name of a namespace, {prodname} performs one of the following actions based on the value of xref:mongodb-property-filters-match-mode[`filters.match.mode`] property
- applies the regular expression that you specify as an _anchored_ regular expression.
That is, the specified expression is matched against the entire name string of the namespace; it does not match substrings in the name.
- compares the literals that you specify with the entire name string of the namespace +
If you include this property in the configuration, do not also set the `collection.exclude.list` property.
|[[mongodb-property-collection-exclude-list]]<<mongodb-property-collection-exclude-list, `+collection.exclude.list+`>>
|_empty string_
|An optional comma-separated list of regular expressions or literals that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring.
When `collection.exclude.list` is set, the connector monitors every collection except the ones that the property specifies.
Collection identifiers are of the form _databaseName_._collectionName_. +
To match the name of a namespace, {prodname} performs one of the following actions based on the value of xref:mongodb-property-filters-match-mode[`filters.match.mode`] property
- applies the regular expression that you specify as an _anchored_ regular expression.
That is, the specified expression is matched against the entire name string of the namespace; it does not match substrings that might be present in a database name.
- compares the literals that you specify with the entire name string of the namespace +
If you include this property in the configuration, do not set the `collection.include.list` property.
|[[mongodb-property-capture-mode]]<<mongodb-property-capture-mode, `+capture.mode+`>>
|`change_streams_update_full`
|Specifies the method that the connector uses to capture `update` event changes from a MongoDB server.
Set this property to one of the following values:
`change_streams`:: `update` event messages do not include the full document.
Messages do not include a field that represents the state of the document `before` the change.
`change_streams_update_full`:: `update` event messages include the full document.
Messages do not include a `before` field that represents the state of the document before the update.
The event message returns the full state of the document in the `after` field.
Set xref:mongodb-property-capture-mode-full-update-type[capture.mode.full.update.type] to specify how the connector fetches full documents from the database.
+
[NOTE]
====
In some situations, when `capture.mode` is configured to return full documents, the `updateDescription` and `after` fields of the update event message might report inconsistent values.
Such discrepancies can result after multiple updates are applied to a document in rapid succession.
The connector requests the full document from the MongoDB database only after it receives the update described in the event's `updateDescription` field.
If a later update modifies the source document before the connector can retrieve it from the database, the connector receives the document that is modified by this later update.
====
`change_streams_update_full_with_pre_image`::
`update` event event messages include the full document, and include a field that represents the state of the document `before` the change.
Set xref:mongodb-property-capture-mode-full-update-type[capture.mode.full.update.type] to specify how the connector fetches full documents from the database.
`change_streams_with_pre_image`::
`update` events do not include the full document, but include a field that represents the state of the document `before` the change.
|[[mongodb-property-capture-scope]]<<mongodb-property-capture-scope, `+capture.scope+`>>
|`deployment`
|Specifies the https://www.mongodb.com/docs/manual/changeStreams/#watch-a-collection--database--or-deployment[scope of the change streams] that the connector opens.
Set this property to one of the following values:
`deployment`:: Opens a change stream cursor for a deployment (either a replica set or a sharded cluster) to watch for changes to all non-system collections across all databases, except for `admin`, `local`, and `config`.
`database`:: Opens a change stream cursor for a single database to watch for changes to all of its non-system collections.
+
[WARNING]
====
To support {link-prefix}:{link-signalling}#[{prodname} signaling], if you set `capture.scope` to `database`, the xref:mongodb-property-signal-data-collection[signaling data collection] must reside in a database that is specified by the xref:mongodb-property-capture-target[`capture.target`] property.
====
|[[mongodb-property-capture-target]]<<mongodb-property-capture-target, `+capture.target+`>>
|
| Specifies the database that the connector monitors for changes.
This property applies only if the xref:mongodb-property-capture-scope[`capture.scope`] is set to `database`.
|[[mongodb-property-field-exclude-list]]<<mongodb-property-field-exclude-list, `+field.exclude.list+`>>
|_empty string_
|An optional comma-separated list of the fully-qualified names of fields that should be excluded from change event message values.
Fully-qualified names for fields are of the form _databaseName_._collectionName_._fieldName_._nestedFieldName_, where _databaseName_ and _collectionName_ may contain the wildcard (*) which matches any characters.
|[[mongodb-property-field-renames]]<<mongodb-property-field-renames, `+field.renames+`>>
|_empty string_
|An optional comma-separated list of the fully-qualified replacements of fields that should be used to rename fields in change event message values. Fully-qualified replacements for fields are of the form _databaseName_._collectionName_._fieldName_._nestedFieldName_:__newNestedFieldName__, where _databaseName_ and _collectionName_ may contain the wildcard (*) which matches any characters, the colon character (:) is used to determine rename mapping of field. The next field replacement is applied to the result of the previous field replacement in the list, so keep this in mind when renaming multiple fields that are in the same path.
|[[mongodb-property-tombstones-on-delete]]<<mongodb-property-tombstones-on-delete, `+tombstones.on.delete+`>>
|`true`
|Controls whether a _delete_ event is followed by a tombstone event. +
+
`true` - a delete operation is represented by a _delete_ event and a subsequent tombstone event. +
+
`false` - only a _delete_ event is emitted. +
+
After a source record is deleted, emitting a tombstone event (the default behavior) allows Kafka to completely delete all events that pertain to the key of the deleted row in case {link-kafka-docs}/#compaction[log compaction] is enabled for the topic.
|[[mongodb-property-schema-name-adjustment-mode]]<<mongodb-property-schema-name-adjustment-mode,`+schema.name.adjustment.mode+`>>
|none
|Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. Possible settings: +
* `none` does not apply any adjustment. +
* `avro` replaces the characters that cannot be used in the Avro type name with underscore. +
* `avro_unicode` replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java +
|[[mongodb-property-field-name-adjustment-mode]]<<mongodb-property-field-name-adjustment-mode,`+field.name.adjustment.mode+`>>
|none
|Specifies how field names should be adjusted for compatibility with the message converter used by the connector. Possible settings: +
* `none` does not apply any adjustment. +
* `avro` replaces the characters that cannot be used in the Avro type name with underscore. +
* `avro_unicode` replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java +
See {link-prefix}:{link-avro-serialization}#avro-naming[Avro naming] for more details.
|===
The following _advanced_ configuration properties have good defaults that will work in most situations and therefore rarely need to be specified in the connector's configuration.
// Title: Advanced {prodname} MongoDB connector configuration properties
[id="debezium-mongodb-connector-advanced-configuration-properties"]
.{prodname} MongoDB connector advanced configuration properties
[cols="30%a,25%a,45%a",options="header"]
|===
|Property
|Default
|Description
|[[mongodb-property-capture-mode-full-update-type]]<<mongodb-property-capture-mode-full-update-type, `+capture.mode.full.update.type+`>>
|`lookup`
|Specifies how the connector looks up the full value of an updated document when the xref:mongodb-property-capture-mode[`capture.mode`] is set retrieve full documents.
The connector retrieves full documents when its `capture.mode` is set to one of the following options:
* `change_streams_update_full`
* `change_streams_update_full_with_pre-image`
To use this option with a MongoDB change streams collection, you must configure the collection to https://www.mongodb.com/docs/manual/changeStreams/#change-streams-with-document-pre--and-post-images[return document pre- and post-images].
Pre- and post-images for an operation are available only if the required configuration is in place before the operation occurs.
Set this property to one of the following values:
`lookup`:: The connector uses a separate lookup to fetch the updated full MongoDB document.
[WARNING]
====
If the lookup process fails to retrieve a document, it cannot populate the full document to the `after` state in the event payload.
In such a situation, the connector emits an event message that contains a `null` value in the `after` field.
Failed lookups can occur because a delete operation removed the document immediately after it was created, or because a change to the sharding key results in the document being moved to a different location.
Sharding key changes can result when you modify any of the properties that make up the key.
====
`post_image`:: The connector uses MongoDB post images to populate events with the full MongoDB document.
The database must be running MongoDB 6.0 or later to use this option.
|[[mongodb-property-max-batch-size]]<<mongodb-property-max-batch-size, `+max.batch.size+`>>
|`2048`
|Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. Defaults to 2048.
|[[mongodb-property-max-queue-size]]<<mongodb-property-max-queue-size, `+max.queue.size+`>>
|`8192`
|Positive integer value that specifies the maximum number of records that the blocking queue can hold.
When {prodname} reads events streamed from the database, it places the events in the blocking queue before it writes them to Kafka.
The blocking queue can provide backpressure for reading change events from the database
in cases where the connector ingests messages faster than it can write them to Kafka, or when Kafka becomes unavailable.
Events that are held in the queue are disregarded when the connector periodically records offsets.
Always set the value of `max.queue.size` to be larger than the value of xref:{context}-property-max-batch-size[`max.batch.size`].
|[[mongodb-property-max-queue-size-in-bytes]]<<mongodb-property-max-queue-size-in-bytes, `+max.queue.size.in.bytes+`>>
|`0`
|A long integer value that specifies the maximum volume of the blocking queue in bytes.
By default, volume limits are not specified for the blocking queue.
To specify the number of bytes that the queue can consume, set this property to a positive long value. +
If xref:mongodb-property-max-queue-size[`max.queue.size`] is also set, writing to the queue is blocked when the size of the queue reaches the limit specified by either property.
For example, if you set `max.queue.size=1000`, and `max.queue.size.in.bytes=5000`, writing to the queue is blocked after the queue contains 1000 records, or after the volume of the records in the queue reaches 5000 bytes.
|[[mongodb-property-poll-interval-ms]]<<mongodb-property-poll-interval-ms, `+poll.interval.ms+`>>
|`1000`
|Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 500 milliseconds, or 0.5 second.
|[[mongodb-property-connect-backoff-initial-delay-ms]]<<mongodb-property-connect-backoff-initial-delay-ms, `+connect.backoff.initial.delay.ms+`>>
|`1000`
|Positive integer value that specifies the initial delay when trying to reconnect to a primary after the first failed connection attempt or when no primary is available. Defaults to 1 second (1000 ms).
|[[mongodb-property-connect-backoff-max-delay-ms]]<<mongodb-property-connect-backoff-max-delay-ms, `+connect.backoff.max.delay.ms+`>>
|`1000`
|Positive integer value that specifies the maximum delay when trying to reconnect to a primary after repeated failed connection attempts or when no primary is available. Defaults to 120 seconds (120,000 ms).
|[[mongodb-property-connect-max-attempts]]<<mongodb-property-connect-max-attempts, `+connect.max.attempts+`>>
|`16`
|Positive integer value that specifies the maximum number of failed connection attempts to a replica set primary before an exception occurs and task is aborted. Defaults to 16, which with the defaults for `connect.backoff.initial.delay.ms` and `connect.backoff.max.delay.ms` results in just over 20 minutes of attempts before failing.
ifdef::community[]
|[[mongodb-property-source-struct-version]]<<mongodb-property-source-struct-version, `+source.struct.version+`>>
|v2
|Schema version for the `source` block in CDC events. {prodname} 0.10 introduced a few breaking +
changes to the structure of the `source` block in order to unify the exposed structure across
all the connectors. +
By setting this option to `v1` the structure used in earlier versions can be produced.
Note that this setting is not recommended and is planned for removal in a future {prodname} version.
endif::community[]
|[[mongodb-property-heartbeat-interval-ms]]<<mongodb-property-heartbeat-interval-ms, `+heartbeat.interval.ms+`>>
|`0`
|Controls how frequently heartbeat messages are sent. +
This property contains an interval in milliseconds that defines how frequently the connector sends messages into a heartbeat topic.
This can be used to monitor whether the connector is still receiving change events from the database.
You also should leverage heartbeat messages in cases where only records in non-captured collections are changed for a longer period of time.
In such situation the connector would proceed to read the oplog/change stream from the database but never emit any change messages into Kafka,
which in turn means that no offset updates are committed to Kafka.
This will cause the oplog files to be rotated out but connector will not notice it so on restart some events are no longer available which leads to the need of re-execution of the initial snapshot.
Set this parameter to `0` to not send heartbeat messages at all. +
Disabled by default.
|[[mongodb-property-skipped-operations]]<<mongodb-property-skipped-operations, `+skipped.operations+`>>
|`t`
|A comma-separated list of operation types that will be skipped during streaming.
The operations include: `c` for inserts/create, `u` for updates/replace, `d` for deletes, `t` for truncates, and `none` to not skip any aforementioned operations.
By default, for consistency with other Debezium connectors, truncate operations are skipped (not emitted by this connector). However, since MongoDB https://www.mongodb.com/docs/manual/reference/change-events/#operation-types[does not support] truncate change events, this is effectively the same as specifying `none`.
|[[mongodb-property-snapshot-collection-filter-overrides]]<<mongodb-property-snapshot-collection-filter-overrides, `+snapshot.collection.filter.overrides+`>>
|No default
| Controls which collection items are included in snapshot. This property affects snapshots only. Specify a comma-separated list of collection names in the form _databaseName.collectionName_.
For each collection that you specify, also specify another configuration property: `snapshot.collection.filter.overrides._databaseName_._collectionName_`. For example, the name of the other configuration property might be: `snapshot.collection.filter.overrides.customers.orders`. Set this property to a valid filter expression that retrieves only the items that you want in the snapshot. When the connector performs a snapshot, it retrieves only the items that matches the filter expression.
|[[mongodb-property-snapshot-delay-ms]]<<mongodb-property-snapshot-delay-ms, `+snapshot.delay.ms+`>>
|No default
|An interval in milliseconds that the connector should wait before taking a snapshot after starting up; +
Can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which may cause re-balancing of connectors.
|[[mongodb-property-snapshot-fetch-size]]<<mongodb-property-snapshot-fetch-size, `+snapshot.fetch.size+`>>
|`0`
|Specifies the maximum number of documents that should be read in one go from each collection while taking a snapshot.
The connector will read the collection contents in multiple batches of this size. +
Defaults to 0, which indicates that the server chooses an appropriate fetch size.
|[[mongodb-property-snapshot-include-collection-list]]<<mongodb-property-snapshot-include-collection-list, `+snapshot.include.collection.list+`>>
| All collections specified in `collection.include.list`
|An optional, comma-separated list of regular expressions that match the fully-qualified names (`_<databaseName>_._<collectionName>_`) of the schemas that you want to include in a snapshot.
The specified items must be named in the connectors's xref:mongodb-property-collection-include-list[`collection.include.list`] property.
This property takes effect only if the connector's xref:mongodb-property-snapshot-mode[`snapshot.mode`] property is set to a value other than `never`. +
This property does not affect the behavior of incremental snapshots. +
To match the name of a schema, {prodname} applies the regular expression that you specify as an _anchored_ regular expression.
That is, the specified expression is matched against the entire name string of the schema; it does not match substrings that might be present in a schema name.
|[[mongodb-property-snapshot-max-threads]]<<mongodb-property-snapshot-max-threads, `+snapshot.max.threads+`>>
|`1`
|Positive integer value that specifies the maximum number of threads used to perform an intial sync of the collections in a replica set. Defaults to 1.
|[[mongodb-property-snapshot-mode]]<<mongodb-property-snapshot-mode, `+snapshot.mode+`>>
|_initial_
|Specifies the criteria for performing a snapshot when the connector starts.
Set the property to one of the following values:
`always`:: The connector performs a snapshot every time that it starts.
The snapshot includes the structure and data of the captured tables.
Specify this value to populate topics with a complete representation of the data from the captured tables every time that the connector starts.
After the snapshot completes, the connector begins to stream event records for subsequent database changes.
`initial`:: When the connector starts, it performs an initial database snapshot.
//as described in the xref:default-workflow-for-performing-an-initial-snapshot[default workflow for creating an initial snapshot].
After the snapshot completes, the connector begins to stream event records for subsequent database changes.
`initial_only`:: The connector performs a database a snapshot only when no offsets have been recorded for the logical server name.
After the snapshot completes, the connector stops.
It does not transition to streaming event records for subsequent database changes.
`never`:: Deprecated, see `no_data`.
`no_data`:: The connector runs a snapshot that captures the structure of all relevant tables,
//, performing all the steps described in the xref:default-workflow-for-performing-an-initial-snapshot[default snapshot workflow], except that
but it does not create `READ` events to represent the data set at the point of the connector's start-up.
// (Step 7.b).
`when_needed`:: After the connector starts, it performs a snapshot only if it detects one of the following circumstances:
* It cannot detect any topic offsets.
* A previously recorded offset specifies a log position that is not available on the server.
ifdef::community[]
`configuration_based`:: With this option, you control snapshot behavior through a set of connector properties that have the prefix 'snapshot.mode.configuration.based'.
endif::community[]
ifdef::community[]
`custom`:: The `custom` snapshot mode lets you inject your own implementation of the `io.debezium.spi.snapshot.Snapshotter` interface.
Set the `snapshot.mode.custom.name` configuration property to the name provided by the `name()` method of your implementation.
For more information, see xref:connector-custom-snapshot[custom snapshotter SPI].
endif::community[]
ifdef::community[]
|[[mongodb-property-snapshot-mode-configuration-based-snapshot-data]]<<mongodb-property-configuration-based-snapshot-data, `+snapshot.mode.configuration.based.snapshot.data+`>>
|false
|If the `snapshot.mode` is set to `configuration_based`, set this property to specify whether the connector includes table data when it performs a snapshot.
endif::community[]
ifdef::community[]
|[[mongodb-property-snapshot-mode-configuration-based-snapshot-schema]]<<mongodb-property-configuration-based-snapshot-schema, `+snapshot.mode.configuration.based.snapshot.schema+`>>
|false
|If the `snapshot.mode` is set to `configuration_based`, set this property to specify whether the connector includes the table schema when it performs a snapshot.
endif::community[]
ifdef::community[]
|[[mongodb-property-snapshot-mode-configuration-based-start-stream]]<<mongodb-property-configuration-based-start-stream, `+snapshot.mode.configuration.based.start.stream+`>>
|false
|If the `snapshot.mode` is set to `configuration_based`, set this property to specify whether the connector begins to stream change events after a snapshot completes.
endif::community[]
ifdef::community[]
|[[mongodb-property-snapshot-mode-configuration-based-snapshot-on-schema-error]]<<mongodb-property-configuration-based-snapshot-on-schema-error, `+snapshot.mode.configuration.based.snapshot.on.schema.error+`>>
|false
|If the `snapshot.mode` is set to `configuration_based`, set this property to specify whether the connector includes table schema in a snapshot if the schema history topic is not available.
endif::community[]
ifdef::community[]
|[[mongodb-property-snapshot-mode-configuration-based-snapshot-on-data-error]]<<mongodb-property-configuration-based-snapshot-on-data-error, `+snapshot.mode.configuration.based.snapshot.on.data.error+`>>
|false
|If the `snapshot.mode` is set to `configuration_based`, this property specifies whether the connector attempts to snapshot table data if it does not find the last committed offset in the transaction log. +
Set the value to `true` to instruct the connector to perform a new snapshot.
endif::community[]
ifdef::community[]
|[[mongodb-property-snapshot-mode-custom-name]]<<mongodb-property-snapshot-mode-custom-name, `+snapshot.mode.custom.name+`>>
|No default
|If `snapshot.mode` is set to `custom`, use this setting to specify the name of the custom implementation that is provided in the `name()` method that is defined in the 'io.debezium.spi.snapshot.Snapshotter' interface.
After a connector restart, {prodname} calls the specified custom implementation to determine whether to perform a snapshot.
For more information, see xref:connector-custom-snapshot[custom snapshotter SPI].
endif::community[]
|[[mongodb-property-provide-transaction-metadata]]<<mongodb-property-provide-transaction-metadata, `+provide.transaction.metadata+`>>
|`false`
|When set to `true` {prodname} generates events with transaction boundaries and enriches data events envelope with transaction metadata.
See xref:mongodb-transaction-metadata[Transaction Metadata] for additional details.
|[[mongodb-property-retriable-restart-connector-wait-ms]]<<mongodb-property-retriable-restart-connector-wait-ms, `+retriable.restart.connector.wait.ms+`>>
|10000 (10 seconds)
|The number of milliseconds to wait before restarting a connector after a retriable error occurs.
|[[mongodb-property-mongodb-poll-interval-ms]]<<mongodb-property-mongodb-poll-interval-ms, `+mongodb.poll.interval.ms+`>>
|`30000`
|The interval in which the connector polls for new, removed, or changed replica sets.
|[[mongodb-property-mongodb-connect-timeout-ms]]<<mongodb-property-mongodb-connect-timeout-ms, `+mongodb.connect.timeout.ms+`>>
|10000 (10 seconds)
|The number of milliseconds the driver will wait before a new connection attempt is aborted.
|[[mongodb-property-mongodb-heartbeat-frequency-ms]]<<mongodb-property-mongodb-heartbeat-frequency-ms, `+mongodb.heartbeat.frequency.ms+`>>
|10000 (10 seconds)
|The frequency that the cluster monitor attempts to reach each server.
|[[mongodb-property-mongodb-socket-timeout-ms]]<<mongodb-property-mongodb-socket-timeout-ms, `+mongodb.socket.timeout.ms+`>>
|0
|The number of milliseconds before a send/receive on the socket can take before a timeout occurs.
A value of `0` disables this behavior.
|[[mongodb-property-mongodb-server-selection-timeout-ms]]<<mongodb-property-mongodb-server-selection-timeout-ms, `+mongodb.server.selection.timeout.ms+`>>
|30000 (30 seconds)
|The number of milliseconds the driver will wait to select a server before it times out and throws an error.
|[[mongodb-property-cursor-pipeline]]<<mongodb-property-cursor-pipeline, `+cursor.pipeline+`>>
|No default
|When streaming changes, this setting applies processing to change stream events as part of the standard MongoDB aggregation stream pipeline. A pipeline is a MongoDB aggregation pipeline composed of instructions to the database to filter or transform data. This can be used customize the data that the connector consumes.
The value of this property must be an array of permitted https://www.mongodb.com/docs/manual/changeStreams/#modify-change-stream-output[aggregation pipeline stages] in JSON format.
Note that this is appended after the internal pipeline used to support the connector (e.g. filtering operation types, database names, collection names, etc.).
|[[mongodb-property-cursor-pipeline-order]]<<mongodb-property-cursor-pipeline-order, `+cursor.pipeline.order+`>>
|internal_first
|The order used to construct the effective MongoDB aggregation stream pipeline.
Set the property to one of the following values:
`internal_first`::
Internal stages defined by the connector are applied first. This means that only the events which ought to be captured by the connector are fed to the user defined stages (configured by setting `cursor.pipeline`).
`user_first`::
Stages defined by the 'cursor.pipeline' property are applied first. In this mode all events, included those not captured by the connector, are fed to user defined pipeline stages. This mode can have negative performance impact if the value of `cursor.pipeline` contains complex operations.
`user_only`::
Stages defined by the 'cursor.pipeline' property will replace internal stages defined by the connector. This mode is **intended only for expert users** since all events are processed only by user defined pipeline stages. **This mode can have negative impact on performance and overall functionality of the connector!**
|[[mongodb-property-cursor-oversize-handling-mode]]<<mongodb-property-cursor-oversize-handling-mode, `+cursor.oversize.handling.mode+`>>
|fail
|The strategy used to handle change events for documents exceeding specified BSON size.
Set the property to one of the following values:
`fail`::
The connector fails if the total size of change event exceed the maximum BSON size.
`skip`::
Any change events for documents exceeding the maximum (specified by the `cursor.oversize.skip.threshold` property) size will be ignored
`split`::
Change events exceeding the maximum BSON size will be split using the https://www.mongodb.com/docs/v6.0/reference/operator/aggregation/changeStreamSplitLargeEvent/[$changeStreamSplitLargeEvent] aggregation. This option requires **MongoDB 6.0.9 or newer**.
|[[mongodb-property-cursor-oversize-skip-threshold]]<<mongodb-property-cursor-oversize-skip-threshold, `+cursor.oversize.skip.threshold+`>>
|0
|The maximum allowed size **in bytes** of the stored document for which change events are processed. This includes both, the size before and after database operation, more specifically this limits the size of fullDocument and fullDocumentBeforeChange filed of MongoDB change events.
|[[mongodb-property-cursor-max-await-time-ms]]<<mongodb-property-cursor-max-await-time-ms, `+cursor.max.await.time.ms+`>>
|`0`
|Specifies the maximum number of milliseconds the oplog/change stream cursor will wait for the server to produce a result before causing an execution timeout exception.
A value of `0` indicates using the server/driver default wait timeout.
|[[mongodb-property-signal-data-collection]]<<mongodb-property-signal-data-collection, `+signal.data.collection+`>>
|No default
| Fully-qualified name of the data collection that is used to send {link-prefix}:{link-signalling}#debezium-signaling-enabling-source-signaling-channel[signals] to the connector.
Use the following format to specify the collection name: +
`_<databaseName>_._<collectionName>_` +
|[[mongodb-property-signal-enabled-channels]]<<mongodb-property-signal-enabled-channels, `+signal.enabled.channels+`>>
|source
| List of the signaling channel names that are enabled for the connector.
By default, the following channels are available:
* `source`
* `kafka`
* `file`
* `jmx`
ifdef::community[]
Optionally, you can also implement a {link-prefix}:{link-signalling}#debezium-signaling-enabling-custom-signaling-channel[custom signaling channel].
endif::community[]
|[[mongodb-property-notification-enabled-channels]]<<mongodb-property-notification-enabled-channels, `+notification.enabled.channels+`>>
|No default
| List of notification channel names that are enabled for the connector.
By default, the following channels are available:
* `sink`
* `log`
* `jmx`
ifdef::community[]
Optionally, you can also implement a {link-prefix}:{link-notification}#debezium-notification-custom-channel[custom notification channel].
endif::community[]
|[[mongodb-property-incremental-snapshot-chunk-size]]<<mongodb-property-incremental-snapshot-chunk-size, `+incremental.snapshot.chunk.size+`>>
|`1024`
|The maximum number of documents that the connector fetches and reads into memory during an incremental snapshot chunk.
Increasing the chunk size provides greater efficiency, because the snapshot runs fewer snapshot queries of a greater size.
However, larger chunk sizes also require more memory to buffer the snapshot data.
Adjust the chunk size to a value that provides the best performance in your environment. +
|[[mongodb-property-incremental-snapshot-watermarking-strategy]]<<mongodb-property-incremental-snapshot-watermarking-strategy, `+incremental.snapshot.watermarking.strategy+`>>
|`insert_insert`
|Specifies the watermarking mechanism that the connector uses during an incremental snapshot to deduplicate events that might be captured by an incremental snapshot and then recaptured after streaming resumes. +
You can specify one of the following options:
`insert_insert`:: When you send a signal to initiate an incremental snapshot, for every chunk that {prodname} reads during the snapshot, it writes an entry to the signaling data collection to record the signal to open the snapshot window.
After the snapshot completes, {prodname} inserts a second entry that records the signal to close the window.
`insert_delete`:: When you send a signal to initiate an incremental snapshot, for every chunk that {prodname} reads, it writes a single entry to the signaling data collection to record the signal to open the snapshot window.
After the snapshot completes, this entry is removed.
No entry is created for the signal to close the snapshot window.
Set this option to prevent rapid growth of the signaling data collection.
|[[mongodb-property-topic-naming-strategy]]<<mongodb-property-topic-naming-strategy, `topic.naming.strategy`>>
|`io.debezium.schema.DefaultTopicNamingStrategy`
|The name of the TopicNamingStrategy class that should be used to determine the topic name for data change, schema change, transaction, heartbeat event etc., defaults to `DefaultTopicNamingStrategy`.
|[[mongodb-property-topic-delimiter]]<<mongodb-property-topic-delimiter, `topic.delimiter`>>
|`.`
|Specify the delimiter for topic name, defaults to `.`.
|[[mongodb-property-topic-cache-size]]<<mongodb-property-topic-cache-size, `topic.cache.size`>>
|`10000`
|The size used for holding the topic names in bounded concurrent hash map. This cache will help to determine the topic name corresponding to a given data collection.
|[[mongodb-property-topic-heartbeat-prefix]]<<mongodb-property-topic-heartbeat-prefix, `+topic.heartbeat.prefix+`>>
|`__debezium-heartbeat`
|Controls the name of the topic to which the connector sends heartbeat messages. The topic name has this pattern: +
+
_topic.heartbeat.prefix_._topic.prefix_ +
+
For example, if the topic prefix is `fulfillment`, the default topic name is `__debezium-heartbeat.fulfillment`.
|[[mongodb-property-topic-transaction]]<<mongodb-property-topic-transaction, `topic.transaction`>>
|`transaction`
|Controls the name of the topic to which the connector sends transaction metadata messages. The topic name has this pattern: +
+
_topic.prefix_._topic.transaction_ +
+
For example, if the topic prefix is `fulfillment`, the default topic name is `fulfillment.transaction`.
|[[mongodb-property-custom-metric-tags]]<<mongodb-property-custom-metric-tags, `custom.metric.tags`>>
|`No default`
|Defines tags that customize MBean object names by adding metadata that provides contextual information.
Specify a comma-separated list of key-value pairs.
Each key represents a tag for the MBean object name, and the corresponding value represents a value for the key, for example, +
`k1=v1,k2=v2`
The connector appends the specified tags to the base MBean object name.
Tags can help you to organize and categorize metrics data.
You can define tags to identify particular application instances, environments, regions, versions, and so forth.
For more information, see xref:customized-mbean-names[Customized MBean names].
|[[mongodb-property-errors-max-retires]]<<mongodb-property-errors-max-retires, `errors.max.retries`>>
|`-1`
|Specifies how the connector responds after an operation that results in a retriable error, such as a connection error. +
Set one of the following options:
`-1`:: No limit. The connector always restarts automatically, and retries the operation, regardless of the number of previous failures.
`0`:: Disabled. The connector fails immediately, and never retries the operation.
User intervention is required to restart the connector.
`> 0`:: The connector restarts automatically until it reaches the specified maximum number of retries.
After the next failure, the connector stops, and user intervention is required to restart it.
|===
[id="debezium-mongodb-connector-kafka-signals-configuration-properties"]
==== {prodname} connector Kafka signals configuration properties
include::{partialsdir}/modules/all-connectors/ref-connector-pass-through-kafka-signals-configuration-properties.adoc[leveloffset=+1]
[id="debezium-mongodb-connector-kafka-notifications-configuration-properties"]
==== {prodname} connector sink notifications configuration properties
include::{partialsdir}/modules/all-connectors/ref-connector-pass-through-kafka-notification-configuration-properties.adoc[leveloffset=+1]
// Type: assembly
// ModuleID: monitoring-debezium-mongodb-connector-performance
// Title: Monitoring {prodname} MongoDB connector performance
[[mongodb-monitoring]]
== Monitoring
The {prodname} MongoDB connector has two metric types in addition to the built-in support for JMX metrics that Zookeeper, Kafka, and Kafka Connect have.
* xref:mongodb-snapshot-metrics[Snapshot metrics] provide information about connector operation while performing a snapshot.
* xref:mongodb-streaming-metrics[Streaming metrics] provide information about connector operation when the connector is capturing changes and streaming change event records.
The {link-prefix}:{link-debezium-monitoring}#monitoring-debezium[{prodname} monitoring documentation] provides details about how to expose these metrics by using JMX.
// Type: concept
// ModuleID: monitoring-debezium-mongodb-connectors-customized-mbean-names
// Title: Customized names for Db2 connector snapshot and streaming MBean objects
=== Customized MBean names
include::{partialsdir}/modules/all-connectors/frag-common-mbean-name.adoc[leveloffset=+1,tags=mbeans-shared]
// Type: reference
// ModuleID: monitoring-debezium-during-mongodb-snapshots
// Title: Monitoring {prodname} during MongoDB snapshots
[[mongodb-snapshot-metrics]]
=== Snapshot Metrics
include::{partialsdir}/modules/all-connectors/frag-common-mbean-name.adoc[leveloffset=+1,tags=mongodb-snapshot]
include::{partialsdir}/modules/all-connectors/ref-connector-monitoring-snapshot-metrics.adoc[leveloffset=+1]
The {prodname} MongoDB connector also provides the following custom snapshot metrics:
[cols="3,2,5",options="header"]
|===
|Attribute |Type |Description
|`NumberOfDisconnects`
|`long`
|Number of database disconnects.
|===
// Type: reference
// ModuleID: monitoring-debezium-mongodb-connector-record-streaming
// Title: Monitoring {prodname} MongoDB connector record streaming
[[mongodb-streaming-metrics]]
=== Streaming Metrics
include::{partialsdir}/modules/all-connectors/frag-common-mbean-name.adoc[leveloffset=+1,tags=mongodb-streaming]
include::{partialsdir}/modules/all-connectors/ref-connector-monitoring-streaming-metrics.adoc[leveloffset=+1]
The {prodname} MongoDB connector also provides the following custom streaming metrics:
[cols="3,2,5",options="header"]
|===
|Attribute |Type |Description
|`NumberOfDisconnects`
|`long`
|Number of database disconnects.
|`NumberOfPrimaryElections`
|`long`
|Number of primary node elections.
|===
// Type: concept
// ModuleID: how-debezium-mongodb-connectors-handle-faults-and-problems
// Title: How {prodname} MongoDB connectors handle faults and problems
[[mongodb-when-things-go-wrong]]
== MongoDB connector common issues
{prodname} is a distributed system that captures all changes in multiple upstream databases, and will never miss or lose an event.
When the system is operating normally and is managed carefully, then {prodname} provides _exactly once_ delivery of every change event.
If a fault occurs, the system does not lose any events.
However, while it is recovering from the fault, it might repeat some change events.
In such situations, {prodname}, like Kafka, provides _at least once_ delivery of change events.
ifdef::community[]
The rest of this section describes how {prodname} handles various kinds of faults and problems.
endif::community[]
ifdef::product[]
The following topics provide details about how the {prodname} MongoDB connector handles various kinds of faults and problems.
* xref:debezium-mongodb-connector-configuration-and-startup-errors[]
* xref:mongodb-becomes-unavailable-while-debezium-is-running[]
* xref:debezium-mongodb-kafka-connect-process-stops-gracefully[]
* xref:debezium-mongodb-kafka-connect-process-crashes[]
* xref:debezium-mongodb-kafka-process-becomes-unavailable[]
* xref:debezium-mongodb-connector-is-stopped-for-a-long-interval[]
* xref:mongodb-crash-results-in-lost-commits[]
endif::product[]
[id="debezium-mongodb-connector-configuration-and-startup-errors"]
=== Configuration and startup errors
In the following situations, the connector fails when trying to start, reports an error or exception in the log, and stops running:
* The connector's configuration is invalid.
* The connector cannot successfully connect to MongoDB by using the specified connection parameters.
After a failure, the connector attempts to reconnect by using exponential backoff.
You can configure the maximum number of reconnection attempts.
In these cases, the error will have more details about the problem and possibly a suggested work around. The connector can be restarted when the configuration has been corrected or the MongoDB problem has been addressed.
[id="mongodb-becomes-unavailable-while-debezium-is-running"]
=== MongoDB becomes unavailable
Once the connector is running, if the primary node of any of the MongoDB replica sets become unavailable or unreachable, the connector will repeatedly attempt to reconnect to the primary node, using exponential backoff to prevent saturating the network or servers. If the primary remains unavailable after the configurable number of connection attempts, the connector will fail.
The attempts to reconnect are controlled by three properties:
* `connect.backoff.initial.delay.ms` - The delay before attempting to reconnect for the first time, with a default of 1 second (1000 milliseconds).
* `connect.backoff.max.delay.ms` - The maximum delay before attempting to reconnect, with a default of 120 seconds (120,000 milliseconds).
* `connect.max.attempts` - The maximum number of attempts before an error is produced, with a default of 16.
Each delay is double that of the prior delay, up to the maximum delay. Given the default values, the following table shows the delay for each failed connection attempt and the total accumulated time before failure.
[cols="30%a,30%a,40%a",options="header"]
|===
|Reconnection attempt number
|Delay before attempt, in seconds
|Total delay before attempt, in minutes and seconds
|1 |1 |00:01
|2 |2 |00:03
|3 |4 |00:07
|4 |8 |00:15
|5 |16 |00:31
|6 |32 |01:03
|7 |64 |02:07
|8 |120|04:07
|9 |120|06:07
|10 |120|08:07
|11 |120|10:07
|12 |120|12:07
|13 |120|14:07
|14 |120|16:07
|15 |120|18:07
|16 |120|20:07
|===
[id="connector-error-invalid-resume-token"]
=== Connector Unable to Start - InvalidResumeToken or ChangeStreamHistoryLost
A connector that is stopped for a long period fails to start, and reports the following exception:
----
Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog
----
The preceding exception indicates that the entry that corresponds to the connector's resume token is no longer present in the oplog.
Because the oplog no longer contains the last offset that the connector processed, the connector cannot resume streaming.
You can use either of the following options to recover from the failure:
* Delete the failed connector, and create a new connector with the same configuration but with a different connector name.
* Pause the connector and then remove offsets, or change the offset topic.
To help prevent failures related to missing resume tokens, xref:mongodb-optimal-oplog-config[optimize configuration of the oplog].
[id="debezium-mongodb-kafka-connect-process-stops-gracefully"]
=== Kafka Connect process stops gracefully
If Kafka Connect is being run in distributed mode, and a Kafka Connect process is stopped gracefully, then prior to shutdown of that processes Kafka Connect will migrate all of the process' connector tasks to another Kafka Connect process in that group, and the new connector tasks will pick up exactly where the prior tasks left off.
There is a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.
If the group contains only one process and that process is stopped gracefully, then Kafka Connect will stop the connector and record the last offset for each replica set. Upon restart, the replica set tasks will continue exactly where they left off.
[id="debezium-mongodb-kafka-connect-process-crashes"]
=== Kafka Connect process crashes
If the Kafka Connector process stops unexpectedly, then any connector tasks it was running will terminate without recording their most recently-processed offsets.
When Kafka Connect is being run in distributed mode, it will restart those connector tasks on other processes.
However, the MongoDB connectors will resume from the last offset _recorded_ by the earlier processes, which means that the new replacement tasks may generate some of the same change events that were processed just prior to the crash.
The number of duplicate events depends on the offset flush period and the volume of data changes just before the crash.
[NOTE]
====
Because there is a chance that some events may be duplicated during a recovery from failure, consumers should always anticipate some events may be duplicated. {prodname} changes are idempotent, so a sequence of events always results in the same state.
{prodname} also includes with each change event message the source-specific information about the origin of the event, including the MongoDB event's unique transaction identifier (`h`) and timestamp (`sec` and `ord`). Consumers can keep track of other of these values to know whether it has already seen a particular event.
====
[id="debezium-mongodb-kafka-process-becomes-unavailable"]
=== Kafka becomes unavailable
As the connector generates change events, the Kafka Connect framework records those events in Kafka using the Kafka producer API. Kafka Connect will also periodically record the latest offset that appears in those change events, at a frequency that you have specified in the Kafka Connect worker configuration. If the Kafka brokers become unavailable, the Kafka Connect worker process running the connectors will simply repeatedly attempt to reconnect to the Kafka brokers. In other words, the connector tasks will simply pause until a connection can be reestablished, at which point the connectors will resume exactly where they left off.
[id="debezium-mongodb-connector-is-stopped-for-a-long-interval"]
=== Connector fails after it is stopped for a long interval if `snapshot.mode` is set to `initial`
If the connector is gracefully stopped, users might continue to perform operations on replica set members.
Changes that occur while the connector is offline continue to be recorded in MongoDB's oplog.
In most cases, after the connector is restarted, it reads the offset value in the oplog to determine the last operation that it streamed for each replica set, and then resumes streaming changes from that point.
After the restart, database operations that occurred while the connector was stopped are emitted to Kafka as usual, and after some time, the connector catches up with the database.
The amount of time required for the connector to catch up depends on the capabilities and performance of Kafka and the volume of changes that occurred in the database.
However, if the connector remains stopped for a long enough interval, it can occur that MongoDB purges the oplog during the time that the connector is inactive, resulting in the loss of information about the connector's last position.
After the connector restarts, it cannot resume streaming, because the oplog no longer contains the previous offset value that marks the last operation that the connector processed.
The connector also cannot perform a snapshot, as it typically would when the `snapshot.mode` property is set to `initial`, and no offset value is present.
In this case, a mismatch exists, because the oplog does not contain the value of the previous offset, but the offset value is present in the connector's internal Kafka offsets topic.
An error results and the connector fails.
To recover from the failure, delete the failed connector, and create a new connector with the same configuration but with a different connector name.
When you start the new connector, it performs a snapshot to ingest the state of database, and then resumes streaming.
[id="mongodb-crash-results-in-lost-commits"]
=== MongoDB loses writes
In certain failure situations, MongoDB can lose commits, which results in the MongoDB connector being unable to capture the lost changes.
For example, if the primary crashes suddenly after it applies a change and records the change to its oplog, the oplog might become unavailable before secondary nodes can read its contents.
As a result, the secondary node that is elected as the new primary node might be missing the most recent changes from its oplog.
At this time, there is no way to prevent this side effect in MongoDB.