// Category: debezium-using
// Type: assembly
// Title: Configuring notifications to report connector status
// ModuleID: configuring-notifications-to-report-connector-status
[id="debezium-notification"]
= {prodname} notifications
ifdef::community[]
:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js

toc::[]

== Overview
endif::community[]

{prodname} notifications provide a mechanism to obtain status information about the connector.
Notifications can be sent to the following channels:

SinkNotificationChannel:: Sends notifications through the Connect API to a configured topic.
LogNotificationChannel:: Notifications are appended to the log.
JmxNotificationChannel:: Notifications are exposed as an attribute in a JMX bean.
ifdef::community[]
Custom:: Notifications are sent to a xref:debezium-notification-custom-channel[custom channel] that you implement.
endif::community[]

ifdef::product[]
For details about {prodname} notifications, see the following topics:

* xref:debezium-notifications-description-of-the-format-of-debezium-notifications[]
* xref:debezium-notifications-types-of-debezium-notifications[]
* xref:debezium-notifications-enabling-debezium-to-emit-events-to-notification-channels[]
// * xref:debezium-notification-custom-channel[]
endif::product[]

// Type: concept
// ModuleID: debezium-notifications-description-of-the-format-of-debezium-notifications
// Title: Description of the format of {prodname} notifications
[id="debezium-notification-format"]
== {prodname} notification format

Notification messages contain the following information:

|===
|Property |Description

|id
|A unique identifier that is assigned to the notification. For incremental snapshot notifications, the `id` is the same `id` that was sent with the `execute-snapshot` signal.

|aggregate_type
|The data type of the aggregate root to which a notification is related. In domain-driven design, exported events should always refer to an aggregate.

|type
|Provides status information about the event specified in the `aggregate_type` field.

|additional_data
|A Map with detailed information about the notification. For an example, see xref:debezium-notifications-about-the-progress-of-incremental-snapshots[{prodname} notifications about the progress of incremental snapshots].

|timestamp
|The time when the notification was created, expressed as a Unix epoch timestamp in milliseconds.
|===

// Type: assembly
// Title: Types of {prodname} notifications
// ModuleID: debezium-notifications-types-of-debezium-notifications
[id="debezium-available-notifications"]
== Available notifications

{prodname} notifications deliver information about the progress of xref:debezium-notifications-about-the-status-of-an-initial-snapshot[initial snapshots] or xref:debezium-notifications-about-the-progress-of-incremental-snapshots[incremental snapshots].

// Title: Example: {prodname} notification that reports on the status of an initial snapshot
[id="debezium-notifications-about-the-status-of-an-initial-snapshot"]
=== {prodname} notifications about the status of an initial snapshot

The following example shows a typical notification that provides the status of an initial snapshot:

[source, json]
----
{
    "id": "5563ae14-49f8-4579-9641-c1bbc2d76f99",
    "aggregate_type": "Initial Snapshot",
    "type": "COMPLETED", // <1>
    "additional_data" : {
        "connector_name": "myConnector"
    },
    "timestamp": "1695817046353"
}
----
[cols="1,7a",options="header",subs="+attributes"]
|===
|Item |Description

|1
|The `type` field can contain one of the following values:

* `STARTED`
* `IN_PROGRESS`
* `TABLE_SCAN_COMPLETED`
* `COMPLETED`
* `ABORTED`
* `SKIPPED`

|===

The following table shows examples of the different payloads that might be present in notifications that report the status of initial snapshots:

|===
|Status|Payload

|STARTED
a|[source, json]
----
{
    "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
    "aggregate_type":"Initial Snapshot",
    "type":"STARTED",
    "additional_data":{
        "connector_name":"my-connector"
    },
    "timestamp": "1695817046353"
}
----

|IN_PROGRESS
a|[source, json]
----
{
    "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
    "aggregate_type":"Initial Snapshot",
    "type":"IN_PROGRESS",
    "additional_data":{
        "connector_name":"my-connector",
        "data_collections":"table1, table2",
        "current_collection_in_progress":"table1"
    },
    "timestamp": "1695817046353"
}
----

The `data_collection` field is currently not supported for the MongoDB connector.

|TABLE_SCAN_COMPLETED
a|[source, json]
----
{
    "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
    "aggregate_type":"Initial Snapshot",
    "type":"TABLE_SCAN_COMPLETED",
    "additional_data":{
        "connector_name":"my-connector",
        "data_collection":"table1, table2",
        "scanned_collection":"table1",
        "total_rows_scanned":"100",
        "status":"SUCCEEDED"
    },
    "timestamp": "1695817046353"
}
----

In the preceding example, the `additional_data.status` field can contain one of the following values:

`SQL_EXCEPTION`:: A SQL exception occurred while performing the snapshot.
`SUCCEEDED`:: The snapshot completed successfully.

The `total_rows_scanned` and `data_collection` fields are currently not supported for the MongoDB connector.

|COMPLETED
a|[source, json]
----
{
    "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
    "aggregate_type":"Initial Snapshot",
    "type":"COMPLETED",
    "additional_data":{
        "connector_name":"my-connector"
    },
    "timestamp": "1695817046353"
}
----

|ABORTED
a|[source, json]
----
{
    "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
    "aggregate_type":"Initial Snapshot",
    "type":"ABORTED",
    "additional_data":{
        "connector_name":"my-connector"
    },
    "timestamp": "1695817046353"
}
----

|SKIPPED
a|[source, json]
----
{
    "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
    "aggregate_type":"Initial Snapshot",
    "type":"SKIPPED",
    "additional_data":{
        "connector_name":"my-connector"
    },
    "timestamp": "1695817046353"
}
----
|===

// Type: reference
// Title: Example: {prodname} notifications that report on the progress of incremental snapshots
[id="debezium-notifications-about-the-progress-of-incremental-snapshots"]
=== {prodname} notifications about the progress of incremental snapshots

The following table shows examples of the different payloads that might be present in notifications that report the status of incremental snapshots:

|===
|Status|Payload

|Start
a|[source, json]
----
{
    "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
    "aggregate_type":"Incremental Snapshot",
    "type":"STARTED",
    "additional_data":{
        "connector_name":"my-connector",
        "data_collections":"table1, table2"
    },
    "timestamp": "1695817046353"
}
----

|Paused
a|[source, json]
----
{
    "id":"068d07a5-d16b-4c4a-b95f-8ad061a69d51",
    "aggregate_type":"Incremental Snapshot",
    "type":"PAUSED",
    "additional_data":{
        "connector_name":"my-connector",
        "data_collections":"table1, table2"
    },
    "timestamp": "1695817046353"
}
----

|Resumed
a|[source, json]
----
{
    "id":"a9468204-769d-430f-96d2-b0933d4839f3",
    "aggregate_type":"Incremental Snapshot",
    "type":"RESUMED",
    "additional_data":{
        "connector_name":"my-connector",
        "data_collections":"table1, table2"
    },
    "timestamp": "1695817046353"
}
----

|Stopped
a|[source, json]
----
{
    "id":"83fb3d6c-190b-4e40-96eb-f8f427bf482c",
    "aggregate_type":"Incremental Snapshot",
    "type":"ABORTED",
    "additional_data":{
        "connector_name":"my-connector"
    },
    "timestamp": "1695817046353"
}
----

|Processing chunk
a|[source, json]
----
{
    "id":"d02047d6-377f-4a21-a4e9-cb6e817cf744",
    "aggregate_type":"Incremental Snapshot",
    "type":"IN_PROGRESS",
    "additional_data":{
        "connector_name":"my-connector",
        "data_collections":"table1, table2",
        "current_collection_in_progress":"table1",
        "maximum_key":"100",
        "last_processed_key":"50"
    },
    "timestamp": "1695817046353"
}
----

|Snapshot completed for a table
a|[source, json]
----
{
    "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
    "aggregate_type":"Incremental Snapshot",
    "type":"TABLE_SCAN_COMPLETED",
    "additional_data":{
        "connector_name":"my-connector",
        "data_collection":"table1, table2",
        "scanned_collection":"table1",
        "total_rows_scanned":"100",
        "status":"SUCCEEDED"
    },
    "timestamp": "1695817046353"
}
----

In the preceding
example, the `additional_data.status` field can contain one of the following values:

`EMPTY`:: The table contains no values.
`NO_PRIMARY_KEY`:: Cannot complete snapshot; table has no primary key.
`SKIPPED`:: Cannot complete a snapshot for this type of table. Refer to the logs for details.
`SQL_EXCEPTION`:: A SQL exception occurred while performing the snapshot.
`SUCCEEDED`:: The snapshot completed successfully.
`UNKNOWN_SCHEMA`:: Could not find a schema for the table. Check the logs for the list of known tables.

|Completed
a|[source, json]
----
{
    "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
    "aggregate_type":"Incremental Snapshot",
    "type":"COMPLETED",
    "additional_data":{
        "connector_name":"my-connector"
    },
    "timestamp": "1695817046353"
}
----
|===

// Type: assembly
// ModuleID: debezium-notifications-enabling-debezium-to-emit-events-to-notification-channels
// Title: Enabling {prodname} to emit events to notification channels
[id="enabling-debezium-notifications"]
== Enabling {prodname} notifications

To enable {prodname} to emit notifications, specify a list of notification channels by setting the `notification.enabled.channels` configuration property.
By default, the following notification channels are available:

* `sink`
* `log`
* `jmx`

[IMPORTANT]
====
To use the `sink` notification channel, you must also set the `notification.sink.topic.name` configuration property to the name of the topic where you want {prodname} to send notifications.
====

// Type: procedure
// ModuleID: enabling-debezium-notifications-to-report-events-exposed-through-jmx-beans
// Title: Enabling {prodname} notifications to report events exposed through JMX beans
[id="access-debezium-jmx-notifications"]
=== Access to {prodname} JMX notifications

To enable {prodname} to report events that are exposed through JMX beans, complete the following configuration steps:

1. {link-prefix}:{link-debezium-monitoring}#monitoring-debezium[Enable the JMX MBean Server] to expose the notification bean.
2. Add `jmx` to the `notification.enabled.channels` property in the connector configuration.
3. Connect your preferred JMX client to the MBean Server.

Notifications are exposed through the `Notifications` attribute of a bean with the name `debezium.____.management.notifications.____`.

The following image shows a notification that reports the start of an incremental snapshot:

image::jmx-notification-attribute.png[Fields in the JMX `Notifications` attribute]

To discard a notification, call the `reset` operation on the bean.

The notifications are also exposed as a JMX notification with type `debezium.notification`.
To enable an application to listen for the JMX notifications that an MBean emits, link:https://docs.oracle.com/javase/tutorial/jmx/notifs/index.html[subscribe the application to the notifications].

ifdef::community[]
// Type: assembly
// Title: Setting up custom channels to deliver {prodname} notifications
// ModuleID: debezium-notifications-setting-up-custom-channels-to-deliver-notifications
[id="debezium-notification-custom-channel"]
== Custom notification channels

The notification mechanism is designed to be extensible.
You can implement channels as needed to deliver notifications in a manner that works best in your environment.
Adding a notification channel involves several steps:

1. xref:debezium-configuring-custom-notification-channels[Create a Java project for the channel] to implement the channel, and xref:debezium-core-module-dependency[add `{prodname} Core` as a dependency].
2. xref:deploying-a-debezium-custom-notification-channel[Deploy the notification channel].
3. xref:configuring-connectors-to-use-a-custom-notification-channel[Enable connectors to use the custom notification channel by modifying the connector configuration].

// Type: procedure
// ModuleID: debezium-notifications-configuring-custom-notification-channels
// Title: Configuring {prodname} custom notification channels
[id="debezium-configuring-custom-notification-channels"]
=== Configuring custom notification channels

Custom notification channels are Java classes that implement the `io.debezium.pipeline.notification.channels.NotificationChannel` service provider interface (SPI).
For example:

[source,java,indent=0]
----
public interface NotificationChannel {

    String name();   // <1>

    void init(CommonConnectorConfig config);    // <2>

    void send(Notification notification);   // <3>

    void close();   // <4>
}
----
[cols="1,7a",options="header",subs="+attributes"]
|===
|Item |Description

|1
|The name of the channel.
To enable {prodname} to use the channel, specify this name in the connector's `notification.enabled.channels` property.

|2
|Initializes specific configuration, variables, or connections that the channel requires.

|3
|Sends the notification on the channel.
{prodname} calls this method to report its status.

|4
|Closes all allocated resources.
{prodname} calls this method when the connector is stopped.
|===

// Type: concept
[id="debezium-core-module-dependency"]
=== {prodname} core module dependencies

A custom notification channel Java project has compile dependencies on the {prodname} core module.
You must include these compile dependencies in your project's `pom.xml` file, as shown in the following example:

[source,xml]
----
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>${version.debezium}</version> // <1>
</dependency>
----
[cols="1,7",options="header",subs="+attributes"]
|===
|Item |Description

|1
|`${version.debezium}` represents the version of the {prodname} connector.
|===

Declare your implementation in the `META-INF/services/io.debezium.pipeline.notification.channels.NotificationChannel` file.

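
The following sketch illustrates the general shape of a channel that implements the four SPI methods (`name`, `init`, `send`, and `close`).
It is a minimal, self-contained illustration and not the {prodname} implementation: the nested `Notification` and `NotificationChannel` types are hypothetical stand-ins for the classes in {prodname} Core, `init` takes a plain `Map` instead of `CommonConnectorConfig`, and the class names and the channel name `in-memory` are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class InMemoryChannelSketch {

    // Hypothetical stand-in for the Debezium Notification class, reduced to
    // the fields listed in the notification format table.
    static final class Notification {
        final String id;
        final String aggregateType;
        final String type;
        final Map<String, String> additionalData;
        final long timestamp;

        Notification(String id, String aggregateType, String type,
                     Map<String, String> additionalData, long timestamp) {
            this.id = id;
            this.aggregateType = aggregateType;
            this.type = type;
            this.additionalData = additionalData;
            this.timestamp = timestamp;
        }
    }

    // Stand-in that mirrors the SPI methods. Assumption: the real init()
    // receives a CommonConnectorConfig; a Map keeps this sketch standalone.
    interface NotificationChannel {
        String name();
        void init(Map<String, String> config);
        void send(Notification notification);
        void close();
    }

    // Example channel that keeps every notification it receives in memory.
    static final class InMemoryNotificationChannel implements NotificationChannel {
        private final List<Notification> received = new ArrayList<>();

        @Override
        public String name() {
            // The value that a connector would list in notification.enabled.channels.
            return "in-memory";
        }

        @Override
        public void init(Map<String, String> config) {
            // Nothing to set up; a real channel might open a connection here.
        }

        @Override
        public synchronized void send(Notification notification) {
            received.add(notification);
        }

        @Override
        public synchronized void close() {
            // Release whatever the channel holds; here, the buffered notifications.
            received.clear();
        }

        // Returns a copy so that callers cannot modify the internal buffer.
        synchronized List<Notification> received() {
            return new ArrayList<>(received);
        }
    }

    public static void main(String[] args) {
        InMemoryNotificationChannel channel = new InMemoryNotificationChannel();
        channel.init(Map.of());
        channel.send(new Notification(
                "ff81ba59-15ea-42ae-b5d0-4d74f1f4038f", "Initial Snapshot", "STARTED",
                Map.of("connector_name", "my-connector"), 1695817046353L));
        for (Notification n : channel.received()) {
            System.out.println(n.aggregateType + " " + n.type);
        }
        channel.close();
    }
}
```

In a real project, the class would implement the {prodname} interface directly, and the service declaration file would contain the fully qualified name of the implementing class.
A connector would then select the channel by including the value that `name()` returns, `in-memory` in this sketch, in its `notification.enabled.channels` property.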

// Type: procedure
[id="deploying-a-debezium-custom-notification-channel"]
=== Deploying a custom notification channel

.Prerequisites
* You have a custom notification channel Java program.

.Procedure
* To use a notification channel with a {prodname} connector, export the Java project to a JAR file, and copy the file to the directory that contains the JAR file for each {prodname} connector that you want to use it with.
+
For example, in a typical deployment, the {prodname} connector files are stored in subdirectories of a Kafka Connect directory (`/kafka/connect`), with each connector JAR in its own subdirectory (`/kafka/connect/debezium-connector-db2`, `/kafka/connect/debezium-connector-mysql`, and so forth).
To use a notification channel with a connector, add the notification channel JAR file to the connector's subdirectory.

NOTE: To use a custom notification channel with multiple connectors, you must place a copy of the notification channel JAR file in each connector subdirectory.

// Type: procedure
[id="configuring-connectors-to-use-a-custom-notification-channel"]
=== Configuring connectors to use a custom notification channel

In the connector configuration, add the name of the custom notification channel to the `notification.enabled.channels` property.

endif::community[]