From 43adc59292841a0ec129fea1bf68b8b9bb41186d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Cvsantonastaso=E2=80=9D?= <“vincenzo.santonastaso”@gmail.com> Date: Fri, 18 Aug 2023 12:01:29 +0200 Subject: [PATCH] DBZ-6742 use JSON format for JMX Notitication userData --- .../connector/mongodb/NotificationsIT.java | 4 ++-- .../pipeline/notification/Notification.java | 23 +++++++++++++++++++ .../channels/jmx/JmxNotificationChannel.java | 2 +- .../notification/AbstractNotificationsIT.java | 4 ++-- 4 files changed, 28 insertions(+), 5 deletions(-) diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/NotificationsIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/NotificationsIT.java index 46aeed396..0c5cf9045 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/NotificationsIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/NotificationsIT.java @@ -221,10 +221,10 @@ public void emittingDebeziumNotificationWillGenerateAJmxNotification() assertThat(jmxNotifications).hasSize(2); assertThat(jmxNotifications.get(0)).hasFieldOrPropertyWithValue("message", "Initial Snapshot generated a notification"); assertThat(jmxNotifications.get(0).getUserData()) - .isEqualTo("{aggregateType='Initial Snapshot', type='STARTED', additionalData={connector_name=mongo1}}"); + .isEqualTo("{\"aggregateType\":\"Initial Snapshot\",\"type\":\"STARTED\",\"additionalData\":{\"connector_name\":\"mongo1\"}}"); assertThat(jmxNotifications.get(1)).hasFieldOrPropertyWithValue("message", "Initial Snapshot generated a notification"); assertThat(jmxNotifications.get(1).getUserData()) - .isEqualTo("{aggregateType='Initial Snapshot', type='COMPLETED', additionalData={connector_name=mongo1}}"); + .isEqualTo("{\"aggregateType\":\"Initial Snapshot\",\"type\":\"COMPLETED\",\"additionalData\":{\"connector_name\":\"mongo1\"}}"); } private List readNotificationFromJmx() diff --git a/debezium-core/src/main/java/io/debezium/pipeline/notification/Notification.java b/debezium-core/src/main/java/io/debezium/pipeline/notification/Notification.java index 5b0215416..11352fea9 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/notification/Notification.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/notification/Notification.java @@ -9,13 +9,26 @@ import java.util.Map; import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.debezium.pipeline.notification.channels.jmx.JmxNotificationChannel; + public class Notification { + private static final Logger LOGGER = LoggerFactory.getLogger(JmxNotificationChannel.class); + public static final String ID_KEY = "id"; public static final String TYPE = "type"; public static final String AGGREGATE_TYPE = "aggregate_type"; public static final String ADDITIONAL_DATA = "additional_data"; + public static final ObjectMapper MAPPER = new ObjectMapper(); + @JsonIgnore private final String id; private final String aggregateType; private final String type; @@ -73,6 +86,16 @@ public int hashCode() { return Objects.hash(id, aggregateType, type, additionalData); } + public String toJson() { + try { + return MAPPER.writeValueAsString(this); + } + catch (JsonProcessingException e) { + LOGGER.warn("Error converting the notification object to json. Providing default toString value...", e); + return toString(); + } + } + public static final class Builder { private String id; private String aggregateType; diff --git a/debezium-core/src/main/java/io/debezium/pipeline/notification/channels/jmx/JmxNotificationChannel.java b/debezium-core/src/main/java/io/debezium/pipeline/notification/channels/jmx/JmxNotificationChannel.java index 9f20350d9..99efc6e0f 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/notification/channels/jmx/JmxNotificationChannel.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/notification/channels/jmx/JmxNotificationChannel.java @@ -67,7 +67,7 @@ private javax.management.Notification buildJmxNotification(Notification notifica System.currentTimeMillis(), composeMessage(notification)); - n.setUserData(notification.toString()); + n.setUserData(notification.toJson()); return n; } diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/notification/AbstractNotificationsIT.java b/debezium-embedded/src/test/java/io/debezium/pipeline/notification/AbstractNotificationsIT.java index 36e361eee..a29c35ee1 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/notification/AbstractNotificationsIT.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/notification/AbstractNotificationsIT.java @@ -189,10 +189,10 @@ public void emittingDebeziumNotificationWillGenerateAJmxNotification() assertThat(jmxNotifications).hasSize(2); assertThat(jmxNotifications.get(0)).hasFieldOrPropertyWithValue("message", "Initial Snapshot generated a notification"); assertThat(jmxNotifications.get(0).getUserData()) - .isEqualTo("{aggregateType='Initial Snapshot', type='STARTED', additionalData={connector_name=" + server() + "}}"); + .isEqualTo("{\"aggregateType\":\"Initial Snapshot\",\"type\":\"STARTED\",\"additionalData\":{\"connector_name\":\"" + server() + "\"}}"); assertThat(jmxNotifications.get(1)).hasFieldOrPropertyWithValue("message", "Initial Snapshot generated a notification"); assertThat(jmxNotifications.get(1).getUserData()) - .isEqualTo("{aggregateType='Initial Snapshot', type='COMPLETED', additionalData={connector_name=" + server() + "}}"); + .isEqualTo("{\"aggregateType\":\"Initial Snapshot\",\"type\":\"COMPLETED\",\"additionalData\":{\"connector_name\":\"" + server() + "\"}}"); } private List readNotificationFromJmx()