776851157f
Use the official name to refer to the MongoDB product.
476 lines
15 KiB
Plaintext
476 lines
15 KiB
Plaintext
// Category: debezium-using
|
|
// Type: assembly
|
|
// Title: Configuring notifications to report connector status
|
|
// ModuleID: configuring-notifications-to-report-connector-status
|
|
[id="debezium-notification"]
|
|
= {prodname} notifications
|
|
ifdef::community[]
|
|
:toc:
|
|
:toc-placement: macro
|
|
:linkattrs:
|
|
:icons: font
|
|
:source-highlighter: highlight.js
|
|
|
|
toc::[]
|
|
|
|
== Overview
|
|
endif::community[]
|
|
|
|
{prodname} notifications provide a mechanism to obtain status information about the connector.
|
|
Notifications can be sent to the following channels:
|
|
|
|
SinkNotificationChannel:: Sends notifications through the Connect API to a configured topic.
|
|
LogNotificationChannel:: Notifications are appended to the log.
|
|
JmxNotificationChannel:: Notifications are exposed as an attribute in a JMX bean.
|
|
ifdef::community[]
|
|
Custom:: Notifications are sent to a xref:debezium-notification-custom-channel[custom channel] that you implement.
|
|
endif::community[]
|
|
ifdef::product[]
|
|
For details about {prodname} notifications, see the following topics::
|
|
|
|
* xref:debezium-notifications-description-of-the-format-of-debezium-notifications[]
|
|
* xref:debezium-notifications-types-of-debezium-notifications[]
|
|
* xref:debezium-notifications-enabling-debezium-to-emit-events-to-notification-channels[]
|
|
// * xref:debezium-notification-custom-channel[]
|
|
endif::product[]
|
|
|
|
// Type: concept
|
|
// ModuleID: debezium-notifications-description-of-the-format-of-debezium-notifications
|
|
// Title: Description of the format of {prodname} notifications
|
|
[id="debezium-notification-format"]
|
|
== {prodname} notification format
|
|
|
|
Notification messages contain the following information:
|
|
|
|
|===
|
|
|Property |Description
|
|
|
|
|id
|
|
|A unique identifier that is assigned to the notification. For incremental snapshot notifications, the `id` is the same sent with the `execute-snapshot` signal.
|
|
|
|
|aggregate_type
|
|
|The data type of the aggregate root to which a notification is related.
|
|
In domain-driven design, exported events should always refer to an aggregate.
|
|
|
|
|type
|
|
|Provides status information about the event specified in the `aggregate_type` field.
|
|
|
|
|additional_data
|
|
|A Map<String,String> with detailed information about the notification.
|
|
For an example, see xref:debezium-notifications-about-the-progress-of-incremental-snapshots[{prodname} notifications about the progress of incremental snapshots].
|
|
|
|
|timestamp
|
|
|The time when the notification was created.
|
|
The value represents the number of milliseconds since the UNIX epoch.
|
|
|===
|
|
|
|
// Type: assembly
|
|
// Title: Types of {prodname} notifications
|
|
// ModuleID: debezium-notifications-types-of-debezium-notifications
|
|
[id="debezium-available-notifications"]
|
|
== Available notifications
|
|
|
|
{prodname} notifications deliver information about the progress of xref:debezium-notifications-about-the-status-of-an-initial-snapshot[initial snapshots] or xref:debezium-notifications-about-the-progress-of-incremental-snapshots[incremental snapshots].
|
|
|
|
// Title: Example: {prodname} notification that reports on the status of an initial snapshot
|
|
[id="debezium-notifications-about-the-status-of-an-initial-snapshot"]
|
|
=== {prodname} notifications about the status of an initial snapshot
|
|
|
|
The following example shows a typical notification that provides the status of an initial snapshot:
|
|
|
|
[source, json]
|
|
----
|
|
{
|
|
"id": "5563ae14-49f8-4579-9641-c1bbc2d76f99",
|
|
"aggregate_type": "Initial Snapshot",
|
|
"type": "COMPLETED", // <1>
|
|
"additional_data" : {
|
|
"connector_name": "myConnector"
|
|
},
|
|
"timestamp": "1695817046353"
|
|
}
|
|
----
|
|
[cols="1,7a",options="header",subs="+attributes"]
|
|
|===
|
|
|Item |Description
|
|
|
|
|1
|
|
|The `type` field can contain one of the following values:
|
|
|
|
* `COMPLETED`
|
|
* `ABORTED`
|
|
* `SKIPPED`
|
|
|
|
|===
|
|
|
|
|
|
The following table shows examples of the different payloads that might be present in notifications that report the status of initial snapshots:
|
|
|
|
|===
|
|
|Status|Payload
|
|
|
|
|STARTED
|
|
a|[source, json]
|
|
----
|
|
{
|
|
"id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
|
|
"aggregate_type":"Initial Snapshot",
|
|
"type":"STARTED",
|
|
"additional_data":{
|
|
"connector_name":"my-connector"
|
|
},
|
|
"timestamp": "1695817046353"
|
|
}
|
|
----
|
|
|IN_PROGRESS
|
|
a|[source, json]
|
|
----
|
|
{
|
|
"id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
|
|
"aggregate_type":"Initial Snapshot",
|
|
"type":"IN_PROGRESS",
|
|
"additional_data":{
|
|
"connector_name":"my-connector",
|
|
"data_collections":"table1, table2",
|
|
"current_collection_in_progress":"table1"
|
|
},
|
|
"timestamp": "1695817046353"
|
|
}
|
|
----
|
|
Field `data_collection` are currently not supported for MongoDB connector
|
|
|
|
|TABLE_SCAN_COMPLETED
|
|
a|[source, json]
|
|
----
|
|
{
|
|
"id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
|
|
"aggregate_type":"Initial Snapshot",
|
|
"type":"TABLE_SCAN_COMPLETED",
|
|
"additional_data":{
|
|
"connector_name":"my-connector",
|
|
"data_collection":"table1, table2",
|
|
"scanned_collection":"table1",
|
|
"total_rows_scanned":"100",
|
|
"status":"SUCCEEDED"
|
|
},
|
|
"timestamp": "1695817046353"
|
|
}
|
|
----
|
|
In the preceding example, the `additional_data.status` field can contain one of the following values:
|
|
|
|
`SQL_EXCEPTION`:: A SQL exception occurred while performing the snapshot.
|
|
`SUCCEEDED`:: The snapshot completed successfully.
|
|
|
|
Fields `total_rows_scanned` and `data_collection` are currently not supported for MongoDB connector
|
|
|COMPLETED
|
|
a|[source, json]
|
|
----
|
|
{
|
|
"id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
|
|
"aggregate_type":"Initial Snapshot",
|
|
"type":"COMPLETED",
|
|
"additional_data":{
|
|
"connector_name":"my-connector"
|
|
},
|
|
"timestamp": "1695817046353"
|
|
}
|
|
----
|
|
|ABORTED
|
|
a|[source, json]
|
|
----
|
|
{
|
|
"id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
|
|
"aggregate_type":"Initial Snapshot",
|
|
"type":"ABORTED",
|
|
"additional_data":{
|
|
"connector_name":"my-connector"
|
|
},
|
|
"timestamp": "1695817046353"
|
|
}
|
|
----
|
|
|SKIPPED
|
|
a|[source, json]
|
|
----
|
|
{
|
|
"id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
|
|
"aggregate_type":"Initial Snapshot",
|
|
"type":"SKIPPED",
|
|
"additional_data":{
|
|
"connector_name":"my-connector"
|
|
},
|
|
"timestamp": "1695817046353"
|
|
}
|
|
----
|
|
|===
|
|
|
|
// Type: reference
|
|
// Title: Example: {prodname} notifications that report on the progress of incremental snapshots
|
|
[id="debezium-notifications-about-the-progress-of-incremental-snapshots"]
|
|
=== {prodname} notifications about the progress of incremental snapshots
|
|
|
|
The following table shows examples of the different payloads that might be present in notifications that report the status of incremental snapshots:
|
|
|
|
|===
|
|
|Status|Payload
|
|
|
|
|Start
|
|
a|[source, json]
|
|
----
|
|
{
|
|
"id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
|
|
"aggregate_type":"Incremental Snapshot",
|
|
"type":"STARTED",
|
|
"additional_data":{
|
|
"connector_name":"my-connector",
|
|
"data_collections":"table1, table2"
|
|
},
|
|
"timestamp": "1695817046353"
|
|
}
|
|
----
|
|
|Paused
|
|
a|[source, json]
|
|
----
|
|
{
|
|
"id":"068d07a5-d16b-4c4a-b95f-8ad061a69d51",
|
|
"aggregate_type":"Incremental Snapshot",
|
|
"type":"PAUSED",
|
|
"additional_data":{
|
|
"connector_name":"my-connector",
|
|
"data_collections":"table1, table2"
|
|
},
|
|
"timestamp": "1695817046353"
|
|
}
|
|
----
|
|
|Resumed
|
|
a|[source, json]
|
|
----
|
|
{
|
|
"id":"a9468204-769d-430f-96d2-b0933d4839f3",
|
|
"aggregate_type":"Incremental Snapshot",
|
|
"type":"RESUMED",
|
|
"additional_data":{
|
|
"connector_name":"my-connector",
|
|
"data_collections":"table1, table2"
|
|
},
|
|
"timestamp": "1695817046353"
|
|
}
|
|
----
|
|
|Stopped
|
|
a|[source, json]
|
|
----
|
|
{
|
|
"id":"83fb3d6c-190b-4e40-96eb-f8f427bf482c",
|
|
"aggregate_type":"Incremental Snapshot",
|
|
"type":"ABORTED",
|
|
"additional_data":{
|
|
"connector_name":"my-connector"
|
|
},
|
|
"timestamp": "1695817046353"
|
|
}
|
|
----
|
|
|Processing chunk
|
|
a|[source, json]
|
|
----
|
|
{
|
|
"id":"d02047d6-377f-4a21-a4e9-cb6e817cf744",
|
|
"aggregate_type":"Incremental Snapshot",
|
|
"type":"IN_PROGRESS",
|
|
"additional_data":{
|
|
"connector_name":"my-connector",
|
|
"data_collections":"table1, table2",
|
|
"current_collection_in_progress":"table1",
|
|
"maximum_key":"100",
|
|
"last_processed_key":"50"
|
|
},
|
|
"timestamp": "1695817046353"
|
|
}
|
|
----
|
|
|Snapshot completed for a table
|
|
a|[source, json]
|
|
----
|
|
{
|
|
"id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
|
|
"aggregate_type":"Incremental Snapshot",
|
|
"type":"TABLE_SCAN_COMPLETED",
|
|
"additional_data":{
|
|
"connector_name":"my-connector",
|
|
"data_collection":"table1, table2",
|
|
"scanned_collection":"table1",
|
|
"total_rows_scanned":"100",
|
|
"status":"SUCCEEDED"
|
|
},
|
|
"timestamp": "1695817046353"
|
|
}
|
|
----
|
|
In the preceding example, the `additional_data.status` field can contain one of the following values:
|
|
|
|
`EMPTY`:: The table contains no values.
|
|
`NO_PRIMARY_KEY`:: Cannot complete snapshot; table has no primary key.
|
|
`SKIPPED`:: Cannot complete a snapshots for this type of table.
|
|
Refer to the logs for details.
|
|
`SQL_EXCEPTION`:: A SQL exception occurred while performing the snapshot.
|
|
`SUCCEEDED`:: The snapshot completed successfully.
|
|
`UNKNOWN_SCHEMA`:: Could not find a schema for the table.
|
|
Check the logs for the list of known tables.
|
|
|
|
|Completed
|
|
a|[source, json]
|
|
----
|
|
{
|
|
"id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
|
|
"aggregate_type":"Incremental Snapshot",
|
|
"type":"COMPLETED",
|
|
"additional_data":{
|
|
"connector_name":"my-connector"
|
|
},
|
|
"timestamp": "1695817046353"
|
|
}
|
|
----
|
|
|===
|
|
|
|
// Type: assembly
|
|
// ModuleID: debezium-notifications-enabling-debezium-to-emit-events-to-notification-channels
|
|
// Title: Enabling {prodname} to emit events to notification channels
|
|
[id="enabling-debezium-notifications"]
|
|
== Enabling {prodname} notifications
|
|
|
|
To enable {prodname} to emit notifications, specify a list of notification channels by setting the `notification.enabled.channels` configuration property.
|
|
By default, the following notification channels are available:
|
|
|
|
* `sink`
|
|
* `log`
|
|
* `jmx`
|
|
|
|
[IMPORTANT]
|
|
====
|
|
To use the `sink` notification channel, you must also set the `notification.sink.topic.name` configuration property to the name of the topic where you want {prodname} to send notifications.
|
|
====
|
|
|
|
// Type: procedure
|
|
// ModuleID: enabling-debezium-notifications-to-report-events-exposed-through-jmx-beans
|
|
// Title: Enabling {prodname} notifications to report events exposed through JMX beans
|
|
[id="access-debezium-jmx-notifications"]
|
|
=== Access to {prodname} JMX notifications
|
|
|
|
To enable {prodname} to report events that are exposed through JMX beans, complete the following configuration steps:
|
|
|
|
1. {link-prefix}:{link-debezium-monitoring}#monitoring-debezium[Enable the JMX MBean Server] to expose the notification bean.
|
|
2. Add `jmx` to the `notification.enabled.channels` property in the connector configuration.
|
|
3. Connect your preferred JMX client to the MBean Server.
|
|
|
|
Notifications are exposed through the `Notifications` attribute of a bean with the name `debezium.__<connector-type>__.management.notifications.__<server>__`.
|
|
|
|
The following image shows a notification that reports the start of an incremental snapshot:
|
|
|
|
image::jmx-notification-attribute.png[Fields in the JMX `Notifications` attribute]
|
|
|
|
To discard a notification, call the `reset` operation on the bean.
|
|
|
|
The notifications are also exposed as a JMX notification with type `debezium.notification`.
|
|
To enable an application to listen for the JMX notifications that an MBean emits, link:https://docs.oracle.com/javase/tutorial/jmx/notifs/index.html[subscribe the application to the notifications].
|
|
|
|
ifdef::community[]
|
|
// Type: assembly
|
|
// Title: Setting up custom channels to deliver {prodname} notifications
|
|
// ModuleID: debezium-notifications-setting-up-custom-channels-to-deliver-notifications
|
|
[id="debezium-notification-custom-channel"]
|
|
== Custom notification channels
|
|
|
|
The notification mechanism is designed to be extensible.
|
|
You can implement channels as needed to deliver notifications in a manner that works best in your environment.
|
|
Adding a notification channel involves several steps:
|
|
|
|
1. xref:debezium-configuring-custom-notification-channels[Create a Java project for the channel] to implement the channel, and xref:debezium-core-module-dependency[add `{prodname} Core` as a dependency].
|
|
2. xref:deploying-a-debezium-custom-notification-channel[Deploy the notification channel].
|
|
3. xref:configuring-connectors-to-use-a-custom-notification-channel[Enable connectors to use the custom notification channel by modifying the connector configuration].
|
|
|
|
// Type: procedure
|
|
// ModuleID: debezium-notifications-configuring-custom-notification-channels
|
|
// Title: Configuring {prodname} custom notification channels
|
|
[id="debezium-configuring-custom-notification-channels"]
|
|
=== Configuring custom notification channels
|
|
|
|
Custom notification channels are Java classes that implement the `io.debezium.pipeline.notification.channels.NotificationChannel` service provider interface (SPI).
|
|
For example:
|
|
[source,java,indent=0]
|
|
----
|
|
public interface NotificationChannel {
|
|
|
|
String name(); // <1>
|
|
|
|
void init(CommonConnectorConfig config); // <2>
|
|
|
|
void send(Notification notification); // <3>
|
|
|
|
void close(); // <4>
|
|
}
|
|
----
|
|
[cols="1,7a",options="header",subs="+attributes"]
|
|
|===
|
|
|Item |Description
|
|
|
|
|1
|
|
|The name of the channel.
|
|
To enable {prodname} to use the channel, specify this name in the connector's `notification.enabled.channels` property.
|
|
|
|
|2
|
|
|Initializes specific configuration, variables, or connections that the channel requires.
|
|
|
|
|3
|
|
|ends the notification on the channel.
|
|
{prodname} calls this method to report its status.
|
|
|
|
|4
|
|
|Closes all allocated resources.
|
|
{prodname} calls this method when the connector is stopped.
|
|
|
|
|===
|
|
|
|
// Type: concept
|
|
[id="debezium-core-module-dependency"]
|
|
=== {prodname} core module dependencies
|
|
|
|
A custom notification channel Java project has compile dependencies on the {prodname} core module.
|
|
You must include these compile dependencies in your project's `pom.xml` file, as shown in the following example:
|
|
|
|
[source,xml]
|
|
----
|
|
<dependency>
|
|
<groupId>io.debezium</groupId>
|
|
<artifactId>debezium-core</artifactId>
|
|
<version>${version.debezium}</version> // <1>
|
|
</dependency>
|
|
----
|
|
[cols="1,7",options="header",subs="+attributes"]
|
|
|===
|
|
|Item |Description
|
|
|
|
|1
|
|
|`${version.debezium}` represents the version of the {prodname} connector.
|
|
|
|
|===
|
|
|
|
Declare your implementation in the `META-INF/services/io.debezium.pipeline.notification.channels.NotificationChannel` file.
|
|
|
|
// Type: procedure
|
|
[id="deploying-a-debezium-custom-notification-channel"]
|
|
=== Deploying a custom notification channel
|
|
|
|
.Prerequisites
|
|
* You have a custom notification channel Java program.
|
|
|
|
.Procedure
|
|
* To use a notification channel with a {prodname} connector, export the Java project to a JAR file, and copy the file to the directory that contains the JAR file for each {prodname} connector that you want to use it with. +
|
|
+
|
|
For example, in a typical deployment, the {prodname} connector files are stored in subdirectories of a Kafka Connect directory (`/kafka/connect`), with each connector JAR in its own subdirectory (`/kafka/connect/debezium-connector-db2`, `/kafka/connect/debezium-connector-mysql`, and so forth).
|
|
To use a signaling channel with a connector, add the converter JAR file to the connector's subdirectory.
|
|
|
|
NOTE: To use a custom notification channel with multiple connectors, you must place a copy of the notification channel JAR file in each connector subdirectory.
|
|
|
|
// Type: procedure
|
|
[id="configuring-connectors-to-use-a-custom-notification-channel"]
|
|
=== Configuring connectors to use a custom notification channel
|
|
|
|
In the connector configuration, add the name of the custom notification channel to the `notification.enabled.channels` property.
|
|
endif::community[]
|