{prodname}'s MongoDB connector tracks a MongoDB replica set or a MongoDB sharded cluster for document changes in databases and collections, recording those changes as events in Kafka topics.
The connector automatically handles the addition or removal of shards in a sharded cluster, changes in membership of each replica set, and elections within each replica set, and it waits for communication problems to be resolved.
MongoDB's replication mechanism provides redundancy and high availability, and is the preferred way to run MongoDB in production.
The MongoDB connector captures the changes in a replica set or sharded cluster.
A MongoDB _replica set_ consists of a set of servers that all have copies of the same data, and replication ensures that all changes made by clients to documents on the replica set's _primary_ are correctly applied to the replica set's other servers, called _secondaries_.
MongoDB replication works by having the primary record the changes in its _oplog_ (or operation log), and then each of the secondaries reads the primary's oplog and applies in order all of the operations to their own documents.
When a new server is added to a replica set, that server first performs an https://docs.mongodb.com/manual/core/replica-set-sync/[snapshot] of all of the databases and collections on the primary, and then reads the primary's oplog to apply all changes that might have been made since it began the snapshot.
Similarly, when the connector sees a replica set for the first time, it looks at the oplog to get the last recorded transaction and then performs a snapshot of the primary's databases and collections.
When all the data is copied, the connector then starts streaming changes from the position it read earlier from the oplog. Operations in the MongoDB oplog are https://docs.mongodb.com/manual/core/replica-set-oplog/[idempotent], so no matter how many times the operations are applied, they result in the same end state.
As the MongoDB connector processes changes, it periodically records the position in the oplog where the event originated.
When the MongoDB connector stops, it records the last oplog position that it processed, so that upon restart it simply begins streaming from that position.
In other words, the connector can be stopped, upgraded or maintained, and restarted some time later, and it will pick up exactly where it left off without losing a single event.
Of course, MongoDB's oplogs are usually capped at a maximum size, which means that the connector should not be stopped for too long, or else some of the operations in the oplog might be purged before the connector has a chance to read them.
The MongoDB connector is also quite tolerant of changes in membership and leadership of the replica sets, of additions or removals of shards within a sharded cluster, and network problems that might cause communication failures.
The connector always uses the replica set's primary node to stream changes, so when the replica set undergoes an election and a different node becomes primary, the connector will immediately stop streaming changes, connect to the new primary, and start streaming changes using the new primary node.
Likewise, if the connector experiences any problems communicating with the replica set primary, it will try to reconnect (using exponential backoff so as to not overwhelm the network or replica set) and continue streaming changes from where it last left off.
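The reconnect behavior described above can be illustrated with a minimal sketch of exponential backoff. This is not the connector's actual implementation: the function names, the initial delay, the cap, and the attempt limit are all assumptions chosen for illustration.

```python
import time


def backoff_delays(initial=1.0, maximum=120.0, factor=2.0):
    """Yield an unbounded sequence of delays that grows by `factor` until capped."""
    delay = initial
    while True:
        yield min(delay, maximum)
        delay = min(delay * factor, maximum)


def connect_with_backoff(connect, max_attempts=8, sleep=time.sleep):
    """Call connect() until it succeeds, sleeping longer after each failure.

    `connect` is a hypothetical callable that raises ConnectionError
    while the primary is unreachable.
    """
    delays = backoff_delays()
    for attempt in range(1, max_attempts + 1):
        try:
            return connect()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # give up and surface the last failure
            sleep(next(delays))
```

Growing the wait between attempts keeps a struggling replica set from being flooded with connection requests while it recovers.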
The MongoDB connector uses MongoDB's oplog to capture the changes, so the connector works only with MongoDB replica sets or with sharded clusters where each shard is a separate replica set.
See the MongoDB documentation for setting up a https://docs.mongodb.com/manual/replication/[replica set] or https://docs.mongodb.com/manual/sharding/[sharded cluster].
Also, be sure to understand how to enable https://docs.mongodb.com/manual/tutorial/deploy-replica-set-with-keyfile-access-control/#deploy-repl-set-with-auth[access control and authentication] with replica sets.
You must also have a MongoDB user that has the appropriate roles to read the `admin` database where the oplog can be read. Additionally, the user must be able to read the `config` database in the configuration server of a sharded cluster and must have the `listDatabases` privilege action.
To use the MongoDB connector with a replica set, provide the addresses of one or more replica set servers as _seed addresses_ through the connector's `mongodb.hosts` property.
The connector will use these seeds to connect to the replica set, and then once connected will get from the replica set the complete set of members and which member is primary.
The connector will start a task to connect to the primary and capture the changes from the primary's oplog.
When the replica set elects a new primary, the task will automatically switch over to the new primary.
When MongoDB is fronted by a proxy (such as with Docker on OS X or Windows), then when a client connects to the replica set and discovers the members, the MongoDB client will exclude the proxy as a valid member and will attempt and fail to connect directly to the members rather than go through the proxy.
In such a case, set the connector's optional `mongodb.members.auto.discover` configuration property to `false` to instruct the connector to forgo membership discovery and instead simply use the first seed address (specified via the `mongodb.hosts` property) as the primary node.
This may work, but it can still cause issues when an election occurs.
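As a rough illustration of the two properties discussed above, the following sketch builds a connector configuration as a Python dictionary and prints it as JSON. The connector name `inventory-connector`, the helper `build_config`, and the `rs0/host:port` form of the seed address are assumptions made for this example; check the connector property reference for the exact formats your version expects.

```python
import json


def build_config(hosts, logical_name, behind_proxy=False):
    """Return a Kafka Connect style configuration for a MongoDB connector.

    `hosts` is the seed address list; its 'replicaSet/host:port' form is
    an assumption in this sketch.
    """
    config = {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "mongodb.hosts": hosts,
        "mongodb.name": logical_name,
    }
    if behind_proxy:
        # Skip membership discovery and treat the first seed as the primary.
        config["mongodb.members.auto.discover"] = "false"
    return {"name": "inventory-connector", "config": config}  # hypothetical name


connector_config = build_config(
    "rs0/192.168.99.100:27017", "fulfillment", behind_proxy=True
)
print(json.dumps(connector_config, indent=2))
```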
To use the MongoDB connector with a sharded cluster, configure the connector with the host addresses of the _configuration server_ replica set. When the connector connects to this replica set, it discovers that it is acting as the configuration server for a sharded cluster, discovers the information about each replica set used as a shard in the cluster, and will then start up a separate task to capture the changes from each replica set. If new shards are added to the cluster or existing shards removed, the connector will automatically adjust its tasks accordingly.
When a MongoDB connector is configured and deployed, it starts by connecting to the MongoDB servers at the seed addresses, and determines the details about each of the available replica sets.
Since each replica set has its own independent oplog, the connector will try to use a separate task for each replica set.
The connector can limit the maximum number of tasks it will use, and if not enough tasks are available the connector will assign multiple replica sets to each task, although the task will still use a separate thread for each replica set.
When running the connector against a sharded cluster, use a value of `tasks.max` that is greater than the number of replica sets.
This will allow the connector to create one task for each replica set, and will let Kafka Connect coordinate, distribute, and manage the tasks across all of the available worker processes.
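To make the relationship between `tasks.max` and the number of replica sets concrete, here is a small sketch that spreads replica sets over the available tasks. The round-robin strategy and the function name are assumptions; the text above does not say how the connector groups replica sets when tasks are scarce.

```python
def assign_replica_sets(replica_sets, tasks_max):
    """Spread replica sets over at most `tasks_max` tasks, round-robin.

    Round-robin is an illustrative assumption, not the connector's
    documented strategy.
    """
    if tasks_max < 1:
        raise ValueError("tasks_max must be at least 1")
    task_count = min(tasks_max, len(replica_sets))
    tasks = [[] for _ in range(task_count)]
    for index, replica_set in enumerate(replica_sets):
        tasks[index % task_count].append(replica_set)
    return tasks
```

With three shards and `tasks_max=2`, one task ends up with two replica sets; with `tasks_max=5`, each replica set gets its own task and the extra capacity is simply unused.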
The connector configuration property `mongodb.name` serves as a _logical name_ for the MongoDB replica set or sharded cluster.
The connector uses the logical name in a number of ways: as the prefix for all topic names, and as a unique identifier when recording the oplog position of each replica set.
When a task starts up using a replica set, it uses the connector's logical name and the replica set name to find an _offset_ that describes the position where the connector previously stopped reading changes.
If an offset can be found and it still exists in the oplog, then the task immediately proceeds with {link-prefix}:{link-mongodb-connector}#mongodb-streaming-changes[streaming changes], starting at the recorded offset position.
However, if no offset is found or if the oplog no longer contains that position, the task must first obtain the current state of the replica set contents by performing a _snapshot_.
This process starts by recording the current position of the oplog as the offset (along with a flag that denotes that a snapshot has been started).
The task will then proceed to copy each collection, spawning as many threads as possible (up to the value of the `initial.sync.max.threads` configuration property) to perform this work in parallel.
The connector will record a separate _read event_ for each document it sees, and that read event will contain the object's identifier, the complete state of the object, and _source_ information about the MongoDB replica set where the object was found.
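The choice between streaming and snapshotting can be sketched as a small decision function. The `Offset` class, the integer timestamps, and the rule that an unfinished snapshot is started again are assumptions for illustration only; they are not taken from the connector's code.

```python
from dataclasses import dataclass


@dataclass
class Offset:
    timestamp: int                      # oplog position, simplified to an integer
    snapshot_in_progress: bool = False  # flag recorded when a snapshot starts


def choose_start(stored, oldest_oplog_ts, current_oplog_ts):
    """Return ('stream', offset) or ('snapshot', offset) for a replica set task."""
    usable = (
        stored is not None
        and not stored.snapshot_in_progress     # assumption: redo unfinished snapshots
        and stored.timestamp >= oldest_oplog_ts  # position still present in the oplog
    )
    if usable:
        return "stream", stored
    # No usable offset: remember where the oplog is now, then copy the collections.
    return "snapshot", Offset(timestamp=current_oplog_ts, snapshot_in_progress=True)
```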
Try to avoid task reassignment and reconfiguration while the connector is performing a snapshot of any replica sets. The connector logs messages that report the progress of the snapshot. For utmost control, run a separate Kafka Connect cluster for each connector.
Once the connector task for a replica set has an offset, it uses the offset to determine the position in the oplog where it should start streaming changes.
The task will then connect to the replica set's primary node and start streaming changes from that position, processing all of the insert, update, and delete operations and converting them into {prodname} {link-prefix}:{link-mongodb-connector}#mongodb-events[change events]. Each change event includes the position in the oplog where the operation was found, and the connector periodically records this as its most recent offset. The interval at which the offset is recorded is governed by link:https://kafka.apache.org/documentation/#offset.flush.interval.ms[`offset.flush.interval.ms`], which is a Kafka Connect worker configuration property.
When the connector is stopped gracefully, the last offset processed is recorded so that, upon restart, the connector will continue exactly where it left off.
If the connector's tasks terminate unexpectedly, however, then the tasks may have processed and generated events after the offset was last recorded; upon restart, the connector begins at the last _recorded_ offset, possibly generating some of the same events that were previously generated just prior to the crash.
When everything is operating nominally, Kafka consumers will actually see every message *_exactly once_*. However, when things go wrong Kafka can only guarantee consumers will see every message *_at least once_*. Therefore, your consumers need to anticipate seeing messages more than once.
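One way a consumer might tolerate repeated messages is to remember the last oplog position it applied for each document and skip anything at or before that position. The sketch below assumes the position is available as a `(timestamp, ordinal)` tuple taken from the event's source metadata; the class and parameter names are hypothetical.

```python
class DedupingHandler:
    """Apply each document's events at most once, based on oplog position."""

    def __init__(self):
        self._last_position = {}  # document key -> (timestamp, ordinal)
        self.applied = []         # events that were actually processed

    def handle(self, key, position, value):
        """Return True if the event was applied, False if it was a repeat."""
        last = self._last_position.get(key)
        if last is not None and position <= last:
            return False  # already seen: a redelivery after a restart
        self._last_position[key] = position
        self.applied.append((key, value))
        return True
```

This sketch keeps its state in memory, so a real consumer would need to persist the positions alongside its own output to survive a restart.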
As mentioned above, the connector tasks always use the replica set's primary node to stream changes from the oplog, ensuring that the connector sees the most up-to-date operations possible and can capture the changes with lower latency than if secondaries were used instead. When the replica set elects a new primary, the connector immediately stops streaming changes, connects to the new primary, and starts streaming changes from the new primary node at the same position. Likewise, if the connector experiences any problems communicating with the replica set members, it tries to reconnect, using exponential backoff so as not to overwhelm the replica set, and once connected it continues streaming changes from where it last left off. In this way, the connector is able to dynamically adjust to changes in replica set membership and automatically handle communication failures.
To summarize, the MongoDB connector continues running in most situations. Communication problems might cause the connector to wait until the problems are resolved.
The name of the Kafka topics always takes the form _logicalName_._databaseName_._collectionName_, where _logicalName_ is the {link-prefix}:{link-mongodb-connector}#mongodb-logical-connector-name[logical name] of the connector as specified with the `mongodb.name` configuration property, _databaseName_ is the name of the database where the operation occurred, and _collectionName_ is the name of the MongoDB collection in which the affected document existed.
For example, consider a MongoDB replica set with an `inventory` database that contains four collections: `products`, `products_on_hand`, `customers`, and `orders`.
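If the connector monitoring this database were given a logical name of `fulfillment`, then the connector would produce events on these four Kafka topics:

* `fulfillment.inventory.products`
* `fulfillment.inventory.products_on_hand`
* `fulfillment.inventory.customers`
* `fulfillment.inventory.orders`
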
Kafka maintains total order only for events written to a single topic partition.
Partitioning the events by key does mean that all events with the same key always go to the same partition. This ensures that all events for a specific document are always totally ordered.
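The effect of key-based partitioning can be shown with a toy partitioner. Kafka's default partitioner hashes the serialized key with murmur2; the CRC32 hash used here is only a stand-in so that the sketch runs with the Python standard library, and the function name is hypothetical.

```python
import zlib


def partition_for(key: bytes, partition_count: int) -> int:
    """Map a serialized event key to a partition number.

    CRC32 is a stand-in hash for illustration; it is not what Kafka uses.
    """
    return zlib.crc32(key) % partition_count


# Every event for the same document key lands in the same partition,
# so Kafka preserves their relative order.
first = partition_for(b'{"id": "1004"}', 6)
second = partition_for(b'{"id": "1004"}', 6)
```

Events for different documents may land in different partitions, and no ordering is guaranteed between them.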
The {prodname} MongoDB connector generates a data change event for each document-level operation that inserts, updates, or deletes data. Each event contains a key and a value. The structure of the key and the value depends on the collection that was changed.
{prodname} and Kafka Connect are designed around _continuous streams of event messages_. However, the structure of these events may change over time, which can be difficult for consumers to handle. To address this, each event contains the schema for its content or, if you are using a schema registry, a schema ID that a consumer can use to obtain the schema from the registry. This makes each event self-contained.
The following skeleton JSON shows the basic four parts of a change event. However, how you configure the Kafka Connect converter that you choose to use in your application determines the representation of these four parts in change events. A `schema` field is in a change event only when you configure the converter to produce it. Likewise, the event key and event payload are in a change event only if you configure a converter to produce it. If you use the JSON converter and you configure it to produce all four basic change event parts, change events have this structure:
[source,json,index=0]
----
{
"schema": { //<1>
...
},
"payload": { //<2>
...
},
"schema": { //<3>
...
},
"payload": { //<4>
...
},
}
----
.Overview of change event basic content
[cols="1,2,7",options="header"]
|===
|Item |Field name |Description
|1
|`schema`
|The first `schema` field is part of the event key. It specifies a Kafka Connect schema that describes what is in the event key's `payload` portion. In other words, the first `schema` field describes the structure of the key for the document that was changed.
|2
|`payload`
|The first `payload` field is part of the event key. It has the structure described by the previous `schema` field and it contains the key for the document that was changed.
|3
|`schema`
|The second `schema` field is part of the event value. It specifies the Kafka Connect schema that describes what is in the event value's `payload` portion. In other words, the second `schema` describes the structure of the document that was changed. Typically, this schema contains nested schemas.
|4
|`payload`
|The second `payload` field is part of the event value. It has the structure described by the previous `schema` field and it contains the actual data for the document that was changed.
|===
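A consumer that reads JSON-converted messages might separate the schema from the payload along the lines of the sketch below. The function name is hypothetical, and the check for exactly the two fields `schema` and `payload` is a heuristic assumed for this example; a message produced with schemas disabled is treated as a bare payload.

```python
import json


def unwrap(serialized):
    """Return (schema, payload) for a JSON-converted message key or value.

    Returns (None, None) for a null message and (None, data) when the
    message does not carry an embedded schema.
    """
    if serialized is None:
        return None, None
    document = json.loads(serialized)
    if isinstance(document, dict) and set(document) == {"schema", "payload"}:
        return document["schema"], document["payload"]
    return None, document
```

The same helper can be applied to the message key and to the message value, because both follow the `schema` and `payload` layout shown in the skeleton.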
By default, the connector streams change event records to topics with names that are the same as the event's originating collection. See {link-prefix}:{link-mongodb-connector}#mongodb-topic-names[topic names].
[WARNING]
====
The MongoDB connector ensures that all Kafka Connect schema names adhere to the link:http://avro.apache.org/docs/current/spec.html#names[Avro schema name format]. This means that the logical server name must start with a Latin letter or an underscore, that is, a-z, A-Z, or \_. Each remaining character in the logical server name and each character in the database and collection names must be a Latin letter, a digit, or an underscore, that is, a-z, A-Z, 0-9, or \_. If there is an invalid character it is replaced with an underscore character.
This can lead to unexpected conflicts if the logical server name, a database name, or a collection name contains invalid characters, and the only characters that distinguish names from one another are invalid and thus replaced with underscores.
====
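The replacement rule and the conflicts it can cause are easy to demonstrate with a short sketch. The function names are hypothetical, and the code follows only the rule as stated above; it is not the connector's own implementation.

```python
import re


def sanitize_part(name):
    """Replace every character that is not a-z, A-Z, 0-9, or _ with an underscore."""
    return re.sub(r"[^A-Za-z0-9_]", "_", name)


def sanitize_logical_name(name):
    """Apply the same rule, and also require a letter or underscore first."""
    cleaned = sanitize_part(name)
    if cleaned and not re.match(r"[A-Za-z_]", cleaned):
        cleaned = "_" + cleaned[1:]  # an invalid first character becomes an underscore
    return cleaned


def schema_prefix(logical_name, database, collection):
    """Build the dotted prefix used in schema names from the three parts."""
    return ".".join(
        [sanitize_logical_name(logical_name), sanitize_part(database), sanitize_part(collection)]
    )
```

For example, databases named `my-db` and `my_db` both map to `my_db`, so their collections would share schema names.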
A change event's key contains the schema for the changed document's key and the changed document's actual key. For a given collection, both the schema and its corresponding payload contain a single `id` field.
The value of this field is the document's identifier represented as a string that is derived from link:https://docs.mongodb.com/manual/reference/mongodb-extended-json/[MongoDB extended JSON serialization strict mode].
Consider a connector with a logical name of `fulfillment`, a replica set containing an `inventory` database, and a `customers` collection that contains documents such as the following.
Every change event that captures a change to the `customers` collection has the same event key schema. For as long as the `customers` collection has the previous definition, every change event that captures a change to the `customers` collection has the following key structure. In JSON, it looks like this:
|The schema portion of the key specifies a Kafka Connect schema that describes what is in the key's `payload` portion.
|2
|`fulfillment.inventory.customers.Key`
a|Name of the schema that defines the structure of the key's payload. This schema describes the structure of the key for the document that was changed. Key schema names have the format _connector-name_._database-name_._collection-name_.`Key`. In this example: +
* `fulfillment` is the name of the connector that generated this event. +
* `inventory` is the database that contains the collection that was changed. +
* `customers` is the collection that contains the document that was updated.
|3
|`optional`
|Indicates whether the event key must contain a value in its `payload` field. In this example, a value in the key's payload is required. A value in the key's payload field is optional when a document does not have a key.
|Contains the key for the document for which this change event was generated. In this example, the key contains a single `id` field of type `string` whose value is `1004`.
|===
This example uses a document with an integer identifier, but any valid MongoDB document identifier works the same way, including an identifier that is itself a document. For a document identifier, an event key's `payload.id` value is a string that represents the updated document's original `_id` field as a MongoDB extended JSON serialization that uses strict mode. The following table provides examples of how different types of `_id` fields are represented.
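For plain scalar and document identifiers, the idea of storing a JSON serialization as a string can be approximated with Python's standard `json` module, as in the sketch below. This is only an approximation: BSON-specific types need MongoDB extended JSON (an `ObjectId`, for instance, is written in strict mode as a document with an `$oid` field), and the exact whitespace the connector emits is not reproduced here. The function name is hypothetical.

```python
import json


def key_payload(document_id):
    """Return an event key payload whose `id` is the JSON text of the identifier."""
    return {"id": json.dumps(document_id)}


integer_key = key_payload(1004)    # the id string holds the digits 1004
string_key = key_payload("1004")   # the id string holds 1004 wrapped in double quotes
document_key = key_payload({"hi": "kafka"})
```

Because the identifier is serialized rather than copied, the integer `1004` and the string `"1004"` yield different key payloads.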
.Examples of representing document `_id` fields in event key payloads
The value in a change event is a bit more complicated than the key. Like the key, the value has a `schema` section and a `payload` section. The `schema` section contains the schema that describes the `Envelope` structure of the `payload` section, including its nested fields. Change events for operations that create, update or delete data all have a value payload with an envelope structure.
The following example shows the value portion of a change event that the connector generates for an operation that creates data in the `customers` collection:
|The value's schema, which describes the structure of the value's payload. A change event's value schema is the same in every change event that the connector generates for a particular collection.
a|In the `schema` section, each `name` field specifies the schema for a field in the value's payload. +
+
`io.debezium.data.Json` is the schema for the payload's `after`, `patch`, and `filter` fields. This schema is specific to the `customers` collection. A _create_ event is the only kind of event that contains an `after` field. An _update_ event contains a `filter` field and a `patch` field. A _delete_ event contains a `filter` field, but neither an `after` field nor a `patch` field.
a|`io.debezium.connector.mongo.Source` is the schema for the payload's `source` field. This schema is specific to the MongoDB connector. The connector uses it for all events that it generates.
a|`dbserver1.inventory.customers.Envelope` is the schema for the overall structure of the payload, where `dbserver1` is the connector name, `inventory` is the database, and `customers` is the collection. This schema is specific to the collection.
It may appear that the JSON representations of the events are much larger than the documents they describe. This is because the JSON representation must include the schema and the payload portions of the message.
However, by using the {link-prefix}:{link-avro-serialization}[Avro converter], you can significantly decrease the size of the messages that the connector streams to Kafka topics.
|An optional field that specifies the state of the document after the event occurred. In this example, the `after` field contains the values of the new document's `\_id`, `first_name`, `last_name`, and `email` fields. The `after` value is always a string. By convention, it contains a JSON representation of the document. MongoDB's oplog entries contain the full state of a document only for _create_ events; in other words, a _create_ event is the only kind of event that contains an _after_ field.
a|Mandatory field that describes the source metadata for the event. This field contains information that you can use to compare this event with other events, with regard to the origin of the events, the order in which the events occurred, and whether events were part of the same transaction. The source metadata includes:
* Logical name of the MongoDB replica set, which forms a namespace for generated events and is used in Kafka topic names to which the connector writes.
* Names of the collection and database that contain the new document.
* If the event was part of a snapshot.
* Timestamp for when the event occurred and ordinal of the event within the timestamp.
* Unique identifier of the MongoDB operation, which depends on the version of MongoDB. It is either the `h` field in the oplog event, or a field named `stxnid`, which represents the `lsid` and `txnNumber` fields from the oplog event.
a|Mandatory string that describes the type of operation that caused the connector to generate the event. In this example, `c` indicates that the operation created a document. Valid values are:
a|Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.
The value of a change event for an update in the sample `customers` collection has the same schema as a _create_ event for that collection. Likewise, the event value's payload has the same structure. However, the event value payload contains different values in an _update_ event. An _update_ event does not have an `after` value. Instead, it has these two fields:
* `patch` is a string field that contains the JSON representation of the idempotent update operation
* `filter` is a string field that contains the JSON representation of the selection criteria for the update. The `filter` string can include multiple shard key fields for sharded collections.
Here is an example of a change event value in an event that the connector generates for an update in the `customers` collection:
a|Mandatory string that describes the type of operation that caused the connector to generate the event. In this example, `u` indicates that the operation updated a document.
a|Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.
|Contains the JSON string representation of the actual MongoDB idempotent change to the document. In this example, the update changed the `first_name` field to a new value. +
+
An _update_ event value does not contain an `after` field.
|4
|`filter`
|Contains the JSON string representation of the MongoDB selection criteria that was used to identify the document to be updated.
a|Mandatory field that describes the source metadata for the event. This field contains the same information as a _create_ event for the same collection, but the values are different since this event is from a different position in the oplog. The source metadata includes:
* Logical name of the MongoDB replica set, which forms a namespace for generated events and is used in Kafka topic names to which the connector writes.
* Names of the collection and database that contain the updated document.
* If the event was part of a snapshot.
* Timestamp for when the event occurred and ordinal of the event within the timestamp.
* Unique identifier of the MongoDB operation, which depends on the version of MongoDB. It is either the `h` field in the oplog event, or a field named `stxnid`, which represents the `lsid` and `txnNumber` fields from the oplog event.
In a {prodname} change event, MongoDB provides the content of the `patch` field. The format of this field depends on the version of the MongoDB database. Consequently, be prepared for potential changes to the format when you upgrade to a newer MongoDB database version. Examples in this document were obtained from MongoDB 3.4. In your application, event formats might be different.
In MongoDB's oplog, _update_ events do not contain the _before_ or _after_ states of the changed document. Consequently, it is not possible for a {prodname} connector to provide this information. However, a {prodname} connector provides a document's starting state in _create_ and _read_ events. Downstream consumers of the stream can reconstruct document state by keeping the latest state for each document and comparing the state in a new event with the saved state. {prodname} connectors are not able to keep this state.
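A downstream consumer that keeps the latest state for each document might look like the following sketch. It rests on several assumptions that are not guaranteed by the text above: that the operation codes are `c`, `r`, `u`, and `d`; that the `patch` string contains only top-level `$set` and `$unset` operators; and that `value` is the already-parsed payload of the event value. Dotted field paths and other update operators are not handled.

```python
import json


class DocumentStore:
    """Keep the most recent known state of each document, keyed by event key id."""

    def __init__(self):
        self.documents = {}

    def apply(self, key_id, value):
        if value is None:  # tombstone: nothing left to track
            self.documents.pop(key_id, None)
            return
        op = value["op"]
        if op in ("c", "r"):  # create or snapshot read: full state is in `after`
            self.documents[key_id] = json.loads(value["after"])
        elif op == "u":       # update: apply the patch to the saved state
            patch = json.loads(value["patch"])
            current = self.documents.setdefault(key_id, {})
            for field, new_value in patch.get("$set", {}).items():
                current[field] = new_value
            for field in patch.get("$unset", {}):
                current.pop(field, None)
        elif op == "d":       # delete: forget the document
            self.documents.pop(key_id, None)
```

Because the patch format depends on the MongoDB version, a production consumer would need to handle whatever operators its MongoDB release actually writes.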
The value in a _delete_ change event has the same `schema` portion as _create_ and _update_ events for the same collection. The `payload` portion in a _delete_ event contains values that are different from _create_ and _update_ events for the same collection. In particular, a _delete_ event contains neither an `after` value nor a `patch` value. Here is an example of a _delete_ event for a document in the `customers` collection:
a|Optional field that displays the time at which the connector processed the event. The time is based on the system clock in the JVM running the Kafka Connect task.
a|Mandatory field that describes the source metadata for the event. This field contains the same information as a _create_ or _update_ event for the same collection, but the values are different since this event is from a different position in the oplog. The source metadata includes:
* Logical name of the MongoDB replica set, which forms a namespace for generated events and is used in Kafka topic names to which the connector writes.
* Names of the collection and database that contained the deleted document.
* If the event was part of a snapshot.
* Timestamp for when the event occurred and ordinal of the event within the timestamp.
* Unique identifier of the MongoDB operation, which depends on the version of MongoDB. It is either the `h` field in the `oplog` event, or a field named `stxnid`, which represents the `lsid` and `txnNumber` fields from the `oplog` event.
|===
MongoDB connector events are designed to work with link:{link-kafka-docs}/#compaction[Kafka log compaction]. Log compaction enables removal of some older messages as long as at least the most recent message for every key is kept. This lets Kafka reclaim storage space while ensuring that the topic contains a complete data set and can be used for reloading key-based state.
[id="mongodb-tombstone-events"]
.Tombstone events
All MongoDB connector events for a uniquely identified document have exactly the same key. When a document is deleted, the _delete_ event value still works with log compaction because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that key, the message value must be `null`. To make this possible, after {prodname}’s MongoDB connector emits a _delete_ event, the connector emits a special tombstone event that has the same key but a `null` value. A tombstone event informs Kafka that all messages with that same key can be removed.
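The interaction between tombstones and log compaction can be modeled with a simplified sketch. Real Kafka compaction runs in the background and retains tombstones for a configurable period before dropping them; the function below ignores timing and removes a key as soon as its latest message has a null value. The function name and the tuple layout are assumptions.

```python
def compact(log):
    """Return the messages that survive a simplified compaction pass.

    `log` is a list of (key, value) tuples in offset order. Only the
    latest message per key is kept, and a key whose latest value is
    None (a tombstone) is removed entirely.
    """
    latest = {}
    for offset, (key, _value) in enumerate(log):
        latest[key] = offset
    return [
        (key, value)
        for offset, (key, value) in enumerate(log)
        if latest[key] == offset and value is not None
    ]
```

Without the tombstone, the _delete_ event itself would remain as the latest message for its key; with the tombstone, every message for that key can eventually be removed.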
* `id` - string representation of unique transaction identifier
* `event_count` (for `END` events) - total number of events emitted by the transaction
* `data_collections` (for `END` events) - an array of pairs of `data_collection` and `event_count` that provides number of events emitted by changes originating from given data collection
With https://zookeeper.apache.org[Zookeeper], http://kafka.apache.org/[Kafka], and {link-kafka-docs}.html#connect[Kafka Connect] installed, the remaining tasks to deploy a {prodname} MongoDB connector are to
extract the JAR files into your Kafka Connect environment, and add the directory with the JAR files to {link-kafka-docs}/#connectconfigs[Kafka Connect's `plugin.path`]. You then need to
restart your Kafka Connect process to pick up the new JAR files.
If you need immutable containers, see link:https://hub.docker.com/r/debezium/[{prodname}'s Container images] for Zookeeper, Kafka, and Kafka Connect with the MongoDB connector already installed and ready to run. You can also link:https://debezium.io/blog/2016/05/31/Debezium-on-Kubernetes/[run {prodname} on Kubernetes and OpenShift].
The {prodname} xref:tutorial.adoc[tutorial] walks you through using these images, and this is a great way to learn about {prodname}.
To deploy a {prodname} MongoDB connector, install the {prodname} MongoDB connector archive, configure the connector, and start the connector by adding its configuration to Kafka Connect.
. Use link:https://access.redhat.com/products/red-hat-amq#streams[Red Hat AMQ Streams] to set up Apache Kafka and Kafka Connect on OpenShift. AMQ Streams offers operators and images that bring Kafka to OpenShift.
When the connector starts, it will perform a snapshot of the collections in your MongoDB replica sets and start reading the replica sets' oplogs, producing events for every inserted, updated, and deleted document.
Following is an example of the configuration for a MongoDB connector that monitors a MongoDB replica set `rs0` at port 27017 on 192.168.99.100, which we logically name `fullfillment`.
Typically, you configure the {prodname} MongoDB connector in a `.json` file using the configuration properties available for the connector.
<4> The _logical name_ of the MongoDB replica set, which forms a namespace for generated events and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
<5> A list of regular expressions that match the collection namespaces (for example, <dbName>.<collectionName>) of all collections to be monitored. This is optional.
Following is an example of the configuration for a MongoDB connector that monitors a MongoDB replica set `rs0` at port 27017 on 192.168.99.100, which we logically name `fullfillment`.
Typically, you configure the {prodname} MongoDB connector in a `.yaml` file using the configuration properties available for the connector.
<4> The _logical name_ of the MongoDB replica set, which forms a namespace for generated events and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro converter is used.
<5> A list of regular expressions that match the collection namespaces (for example, <dbName>.<collectionName>) of all collections to be monitored. This is optional.
See the {link-prefix}:{link-mongodb-connector}#mongodb-connector-properties[complete list of connector properties] that can be specified in these configurations.
This configuration can be sent via POST to a running Kafka Connect service, which will then record the configuration and start up the one connector task that will connect to the MongoDB replica set or sharded cluster, assign tasks for each replica set, perform a snapshot if necessary, read the oplog, and record events to Kafka topics.
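As a sketch of that POST, the following code prepares a request for the Kafka Connect REST API's `/connectors` endpoint without sending it. The service URL `http://localhost:8083`, the connector name, the helper name, and the property values are assumptions for illustration; substitute the values for your environment.

```python
import json
import urllib.request


def build_create_request(connect_url, name, config):
    """Prepare a POST request that registers a connector with Kafka Connect."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


request = build_create_request(
    "http://localhost:8083",   # assumed Kafka Connect address
    "inventory-connector",     # hypothetical connector name
    {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "mongodb.hosts": "rs0/192.168.99.100:27017",
        "mongodb.name": "fulfillment",
    },
)
# To submit the request against a running service:
# urllib.request.urlopen(request)
```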
To run a {prodname} MongoDB connector, create a connector configuration file and add it to your Kafka Connect cluster.
.Prerequisites
* MongoDB is set up to run a {prodname} connector.
* A {prodname} MongoDB connector is installed.
.Procedure
. Create a configuration file for the MongoDB connector.
. Use the link:{link-kafka-docs}/#connect_rest[Kafka Connect REST API] to add that connector configuration to your Kafka Connect cluster.
endif::community[]
ifdef::product[]
You can use a provided {prodname} container to deploy a {prodname} MongoDB connector. In this procedure, you build a custom Kafka Connect container image for {prodname}, configure the {prodname} connector as needed, and then add your connector configuration to your Kafka Connect environment.
.Prerequisites
* You have Podman installed and sufficient rights to create and manage containers.
* You installed the {prodname} MongoDB connector archive.
.Procedure
. Extract the {prodname} MongoDB connector archive to create a directory structure for the connector plug-in, for example:
+
[subs=+macros]
----
pass:quotes[*tree ./my-plugins/*]
./my-plugins/
├── debezium-connector-mongodb
│ ├── ...
----
. Create and publish a custom image for running your {prodname} connector:
.. Create a new `Dockerfile` by using `{DockerKafkaConnect}` as the base image. In the following example, you would replace _my-plugins_ with the name of your plug-ins directory:
+
[subs=+macros]
----
FROM registry.redhat.io/amq7/amq-streams-kafka-25:1.5.0
COPY ./my-plugins/ /opt/kafka/plugins/
----
+
Before Kafka Connect starts running the connector, Kafka Connect loads any third-party plug-ins that are in the `/opt/kafka/plugins` directory.
.. Build the docker container image. For example, if you saved the docker file that you created in the previous step as `debezium-container-for-mongodb`, then you would run the following command:
.. Point to the new container image. Do one of the following:
+
* Edit the `spec.image` property of the `KafkaConnect` custom resource. If set, this property overrides the `STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE` variable in the Cluster Operator. For example:
+
[source,yaml,subs="+attributes"]
----
apiVersion: {KafkaConnectApiVersion}
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  #...
  image: debezium-container-for-mongodb
----
+
* In the `install/cluster-operator/050-Deployment-strimzi-cluster-operator.yaml` file, edit the `STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE` variable to point to the new container image and reinstall the Cluster Operator. If you edit this file you must apply it to your OpenShift cluster.
. Create a `KafkaConnector` custom resource that defines your {prodname} MongoDB connector instance. See {LinkDebeziumUserGuide}#mongodb-example-configuration[the connector configuration example].
. Apply the connector instance, for example:
+
`oc apply -f inventory-connector.yaml`
+
This registers `inventory-connector` and the connector starts to run against the `inventory` database.
. Verify that the connector was created and has started to capture changes in the specified database. You can verify the connector instance by watching the Kafka Connect log output as, for example, `inventory-connector` starts.
.. Display the Kafka Connect log output:
+
[source,shell,options="nowrap"]
----
oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
----
.. Review the log output to verify that the initial snapshot has been executed. You should see something like the following lines:
+
[source,shell,options="nowrap"]
----
... INFO Starting snapshot for ...
... INFO Snapshot is using user 'debezium' ...
----
endif::product[]
.Results
When the connector starts, it {link-prefix}:{link-mongodb-connector}#mongodb-performing-a-snapshot[performs a consistent snapshot] of the MongoDB databases that the connector is configured for. The connector then starts generating data change events for document-level operations and streaming change event records to Kafka topics.
The {prodname} MongoDB connector has two metric types in addition to the built-in support for JMX metrics that Zookeeper, Kafka, and Kafka Connect have.
Please refer to the {link-prefix}:{link-debezium-monitoring}#monitoring-debezium[monitoring documentation] for details of how to expose these metrics via JMX.
|Unique name for the connector. Attempting to register again with the same name will fail. (This property is required by all Kafka Connect connectors.)
|The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list can contain a single hostname and port pair. If `mongodb.members.auto.discover` is set to `false`, then the host and port pair should be prefixed with the replica set name (e.g., `rs0/localhost:27017`).
|A unique name that identifies the connector and/or MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one {prodname} connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster.
|Database (authentication source) containing MongoDB credentials. This is required only when MongoDB is configured to use authentication with an authentication database other than `admin`.
|When SSL is enabled this setting controls whether strict hostname checking is disabled during connection phase. If `true` the connection will not prevent man-in-the-middle attacks.
|An optional comma-separated list of regular expressions that match database names to be monitored; any database name not included in `database.include.list` is excluded from monitoring. By default all databases are monitored.
|An optional comma-separated list of regular expressions that match database names to be excluded from monitoring; any database name not included in `database.exclude.list` is monitored.
|An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be monitored; any collection not included in `collection.include.list` is excluded from monitoring. Each identifier is of the form _databaseName_._collectionName_. By default the connector will monitor all collections except those in the `local` and `admin` databases.
|An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring; any collection not included in `collection.exclude.list` is monitored. Each identifier is of the form _databaseName_._collectionName_.
|Specifies the criteria for running a snapshot upon startup of the connector. The default is *initial*, which specifies that the connector takes a snapshot when no offset is found or when the oplog no longer contains the previous offset. The *never* option specifies that the connector never takes a snapshot and instead proceeds directly to tailing the oplog.
|An optional comma-separated list of the fully-qualified names of fields that should be excluded from change event message values. Fully-qualified names for fields are of the form _databaseName_._collectionName_._fieldName_._nestedFieldName_, where _databaseName_ and _collectionName_ may contain the wildcard (*) which matches any characters.
|An optional comma-separated list of the fully-qualified replacements of fields that should be used to rename fields in change event message values. Fully-qualified replacements for fields are of the form _databaseName_._collectionName_._fieldName_._nestedFieldName_:__newNestedFieldName__, where _databaseName_ and _collectionName_ may contain the wildcard (*) which matches any characters. The colon character (:) separates the field to be renamed from its new name. Replacements are applied in list order, each one to the result of the previous replacement, so keep this in mind when renaming multiple fields that are in the same path.
|The maximum number of tasks that should be created for this connector. The MongoDB connector will attempt to use a separate task for each replica set, so the default is acceptable when using the connector with a single MongoDB replica set. When using the connector with a MongoDB sharded cluster, we recommend specifying a value that is equal to or more than the number of shards in the cluster, so that the work for each replica set can be distributed by Kafka Connect.
|Positive integer value that specifies the maximum number of threads used to perform an initial sync of the collections in a replica set. Defaults to 1.
| Controls whether a tombstone event should be generated after a delete event. +
When `true` the delete operations are represented by a delete event and a subsequent tombstone event. When `false` only a delete event is sent. +
Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record has been deleted.
The following _advanced_ configuration properties have good defaults that will work in most situations and therefore rarely need to be specified in the connector's configuration.
|Positive integer value that specifies the maximum size of the blocking queue into which change events read from the database log are placed before they are written to Kafka. This queue can provide backpressure to the oplog reader when, for example, writes to Kafka are slower or if Kafka is not available. Events that appear in the queue are not included in the offsets periodically recorded by this connector. Defaults to 8192, and should always be larger than the maximum batch size specified in the `max.batch.size` property.
|Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. Defaults to 2048.
|Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 1000 milliseconds, or 1 second.
|Positive integer value that specifies the initial delay when trying to reconnect to a primary after the first failed connection attempt or when no primary is available. Defaults to 1 second (1000 ms).
|Positive integer value that specifies the maximum delay when trying to reconnect to a primary after repeated failed connection attempts or when no primary is available. Defaults to 120 seconds (120,000 ms).
|Positive integer value that specifies the maximum number of failed connection attempts to a replica set primary before an exception occurs and the task is aborted. Defaults to 16, which with the defaults for `connect.backoff.initial.delay.ms` and `connect.backoff.max.delay.ms` results in just over 20 minutes of attempts before failing.
|Boolean value that specifies whether the addresses in `mongodb.hosts` are seeds that should be used to discover all members of the cluster or replica set (`true`), or whether the address(es) in `mongodb.hosts` should be used as is (`false`). The default is `true` and should be used in all cases except where MongoDB is {link-prefix}:{link-mongodb-connector}#mongodb-replicaset[fronted by a proxy].
This causes the oplog files to be rotated out without the connector noticing, so on restart some events are no longer available and the connector must re-execute the initial snapshot.
Set this parameter to `0` to not send heartbeat messages at all. +
|`true` when connector configuration explicitly specifies the `key.converter` or `value.converter` parameters to use Avro, otherwise defaults to `false`.
{prodname} is a distributed system that captures all changes in multiple upstream databases, and will never miss or lose an event. Of course, when the system is operating nominally or being administered carefully, then {prodname} provides _exactly once_ delivery of every change event. However, if a fault does happen then the system will still not lose any events, although while it is recovering from the fault it may repeat some change events. Thus, in these abnormal situations {prodname} (like Kafka) provides _at least once_ delivery of change events.
The connector will fail upon startup, report an error/exception in the log, and stop running when the connector's configuration is invalid, or when the connector repeatedly fails to connect to MongoDB using the specified connectivity parameters. Reconnection is done using exponential backoff, and the maximum number of attempts is configurable.
In these cases, the error will have more details about the problem and possibly a suggested work around. The connector can be restarted when the configuration has been corrected or the MongoDB problem has been addressed.
Once the connector is running, if the primary node of any of the MongoDB replica sets becomes unavailable or unreachable, the connector will repeatedly attempt to reconnect to the primary node, using exponential backoff to prevent saturating the network or servers. If the primary remains unavailable after the configurable number of connection attempts, the connector will fail.
The attempts to reconnect are controlled by three properties:
* `connect.backoff.initial.delay.ms` - The delay before attempting to reconnect for the first time, with a default of 1 second (1000 milliseconds).
* `connect.backoff.max.delay.ms` - The maximum delay before attempting to reconnect, with a default of 120 seconds (120,000 milliseconds).
* `connect.max.attempts` - The maximum number of attempts before an error is produced, with a default of 16.
Each delay is double that of the prior delay, up to the maximum delay. Given the default values, the following table shows the delay for each failed connection attempt and the total accumulated time before failure.
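The schedule can also be reproduced with a few lines of Python. This is a sketch of the doubling rule as described here, not the connector's actual implementation; the function name `backoff_schedule` is hypothetical, and its default arguments mirror the documented defaults of the three properties.

[source,python]
----
def backoff_schedule(initial_delay_ms=1000, max_delay_ms=120_000, max_attempts=16):
    """Return (attempt, delay_seconds, cumulative_seconds) for each failed attempt."""
    schedule = []
    delay_ms = initial_delay_ms
    total_ms = 0
    for attempt in range(1, max_attempts + 1):
        total_ms += delay_ms
        schedule.append((attempt, delay_ms / 1000, total_ms / 1000))
        # Double the delay for the next attempt, capped at the maximum delay.
        delay_ms = min(delay_ms * 2, max_delay_ms)
    return schedule


for attempt, delay_s, total_s in backoff_schedule():
    print(f"attempt {attempt:2d}: delay {delay_s:5.0f} s, total {total_s:6.0f} s")
----

With the defaults, the delay reaches the 120-second cap at the eighth attempt, and the sixteenth attempt brings the accumulated time to 1207 seconds, which is the "just over 20 minutes" quoted for `connect.max.attempts`.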
If Kafka Connect is being run in distributed mode, and a Kafka Connect process is stopped gracefully, then prior to shutdown of that process Kafka Connect will migrate all of the process's connector tasks to another Kafka Connect process in that group, and the new connector tasks will pick up exactly where the prior tasks left off.
There is a short delay in processing while the connector tasks are stopped gracefully and restarted on the new processes.
If the group contains only one process and that process is stopped gracefully, then Kafka Connect will stop the connector and record the last offset for each replica set. Upon restart, the replica set tasks will continue exactly where they left off.
=== Kafka Connect process crashes
If the Kafka Connect process stops unexpectedly, then any connector tasks it was running will terminate without recording their most recently processed offsets.
When Kafka Connect is being run in distributed mode, it will restart those connector tasks on other processes.
However, the MongoDB connectors will resume from the last offset _recorded_ by the earlier processes, which means that the new replacement tasks may generate some of the same change events that were processed just prior to the crash.
The number of duplicate events depends on the offset flush period and the volume of data changes just before the crash.
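The relationship between the flush period and the number of duplicates can be illustrated with a small simulation. This is a deliberately simplified model, not Kafka Connect code: offsets are assumed to be flushed after every `flush_every` events, and the function name `run_with_crash` and its parameters are hypothetical.

[source,python]
----
def run_with_crash(events, flush_every, crash_after):
    """Deliver events, crash once, then resume from the last flushed offset."""
    delivered = []
    recorded_offset = 0  # number of events covered by the last flushed offset
    for position, event in enumerate(events[:crash_after], start=1):
        delivered.append(event)
        if position % flush_every == 0:
            recorded_offset = position  # periodic offset flush
    # The crash happens here; the replacement task restarts from the recorded offset.
    delivered.extend(events[recorded_offset:])
    return delivered


events = list(range(1, 11))
delivered = run_with_crash(events, flush_every=4, crash_after=6)
print(delivered)  # [1, 2, 3, 4, 5, 6, 5, 6, 7, 8, 9, 10]
----

In this run, events 5 and 6 were processed after the last flush and before the crash, so they are delivered twice, while no event is lost. A shorter flush period or a lower change volume shrinks that window.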
Because there is a chance that some events may be duplicated during a recovery from failure, consumers should always anticipate some events may be duplicated. {prodname} changes are idempotent, so a sequence of events always results in the same state.
{prodname} also includes with each change event message the source-specific information about the origin of the event, including the MongoDB event's unique transaction identifier (`h`) and timestamp (`sec` and `ord`). Consumers can keep track of these values to know whether they have already seen a particular event.
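A consumer-side duplicate check based on these values might look like the following sketch. The event layout (a dictionary with a `source` entry that holds `sec`, `ord`, and `h`) and the sample values are assumptions for illustration, and the names `event_key` and `DuplicateFilter` are hypothetical.

[source,python]
----
def event_key(event):
    """Build a comparable identity from the source fields of a change event."""
    source = event["source"]
    return (source["sec"], source["ord"], source["h"])


class DuplicateFilter:
    """Remember the identities of events already handled and skip repeats."""

    def __init__(self):
        self._seen = set()

    def is_new(self, event):
        key = event_key(event)
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


events = [
    {"source": {"sec": 1558965515, "ord": 1, "h": 111}, "op": "c"},
    {"source": {"sec": 1558965515, "ord": 2, "h": 222}, "op": "u"},
    {"source": {"sec": 1558965515, "ord": 1, "h": 111}, "op": "c"},  # redelivered
]
dedup = DuplicateFilter()
processed = [event["op"] for event in events if dedup.is_new(event)]
print(processed)  # ['c', 'u']
----

The set in this sketch grows without bound. A real consumer would more likely persist only the highest position it has handled, or expire old entries.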
As the connector generates change events, the Kafka Connect framework records those events in Kafka using the Kafka producer API. Kafka Connect will also periodically record the latest offset that appears in those change events, at a frequency that you have specified in the Kafka Connect worker configuration. If the Kafka brokers become unavailable, the Kafka Connect worker process running the connectors will simply repeatedly attempt to reconnect to the Kafka brokers. In other words, the connector tasks will simply pause until a connection can be reestablished, at which point the connectors will resume exactly where they left off.
When the connector is restarted, it will resume streaming changes for each replica set where it last left off, recording change events for all of the changes that were made while the connector was stopped. If the connector is stopped long enough such that MongoDB purges from its oplog some operations that the connector has not read, then upon startup the connector will perform a snapshot.
Kafka Connect is written with Kafka best practices, and given enough resources will also be able to handle very large numbers of database change events. Because of this, when a connector has been restarted after a while, it is very likely to catch up with the database, though how quickly will depend upon the capabilities and performance of Kafka and the volume of changes being made to the data in MongoDB.
[NOTE]
====
If the connector remains stopped for long enough, MongoDB might purge older oplog entries and the connector's last position might be lost.
In this case, when a connector configured with the _initial_ snapshot mode (the default) is finally restarted, the MongoDB server no longer has the starting point, so the connector performs a new snapshot before it resumes streaming.
====
It is possible for MongoDB to lose commits in specific failure situations. For example, if the primary applies a change and records it in its oplog before it then crashes unexpectedly, the secondary nodes may not have had a chance to read those changes from the primary's oplog before the primary crashed. If one such secondary is then elected as primary, its oplog does not contain the last changes that the old primary had recorded.
In these cases where MongoDB loses changes recorded in a primary's oplog, the MongoDB connector might or might not capture the lost changes. At this time, there is no way to prevent this side effect of MongoDB.