DBZ-6742 use JSON format for JMX Notitication userData

This commit is contained in:
“vsantonastaso” 2023-08-18 12:01:29 +02:00 committed by Jiri Pechanec
parent 0273667d27
commit 43adc59292
4 changed files with 28 additions and 5 deletions

View File

@ -221,10 +221,10 @@ public void emittingDebeziumNotificationWillGenerateAJmxNotification()
assertThat(jmxNotifications).hasSize(2);
assertThat(jmxNotifications.get(0)).hasFieldOrPropertyWithValue("message", "Initial Snapshot generated a notification");
assertThat(jmxNotifications.get(0).getUserData())
.isEqualTo("{aggregateType='Initial Snapshot', type='STARTED', additionalData={connector_name=mongo1}}");
.isEqualTo("{\"aggregateType\":\"Initial Snapshot\",\"type\":\"STARTED\",\"additionalData\":{\"connector_name\":\"mongo1\"}}");
assertThat(jmxNotifications.get(1)).hasFieldOrPropertyWithValue("message", "Initial Snapshot generated a notification");
assertThat(jmxNotifications.get(1).getUserData())
.isEqualTo("{aggregateType='Initial Snapshot', type='COMPLETED', additionalData={connector_name=mongo1}}");
.isEqualTo("{\"aggregateType\":\"Initial Snapshot\",\"type\":\"COMPLETED\",\"additionalData\":{\"connector_name\":\"mongo1\"}}");
}
private List<Notification> readNotificationFromJmx()

View File

@ -9,13 +9,26 @@
import java.util.Map;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.pipeline.notification.channels.jmx.JmxNotificationChannel;
public class Notification {
private static final Logger LOGGER = LoggerFactory.getLogger(JmxNotificationChannel.class);
public static final String ID_KEY = "id";
public static final String TYPE = "type";
public static final String AGGREGATE_TYPE = "aggregate_type";
public static final String ADDITIONAL_DATA = "additional_data";
public static final ObjectMapper MAPPER = new ObjectMapper();
@JsonIgnore
private final String id;
private final String aggregateType;
private final String type;
@ -73,6 +86,16 @@ public int hashCode() {
return Objects.hash(id, aggregateType, type, additionalData);
}
public String toJson() {
try {
return MAPPER.writeValueAsString(this);
}
catch (JsonProcessingException e) {
LOGGER.warn("Error converting the notification object to json. Providing default toString value...", e);
return toString();
}
}
public static final class Builder {
private String id;
private String aggregateType;

View File

@ -67,7 +67,7 @@ private javax.management.Notification buildJmxNotification(Notification notifica
System.currentTimeMillis(),
composeMessage(notification));
n.setUserData(notification.toString());
n.setUserData(notification.toJson());
return n;
}

View File

@ -189,10 +189,10 @@ public void emittingDebeziumNotificationWillGenerateAJmxNotification()
assertThat(jmxNotifications).hasSize(2);
assertThat(jmxNotifications.get(0)).hasFieldOrPropertyWithValue("message", "Initial Snapshot generated a notification");
assertThat(jmxNotifications.get(0).getUserData())
.isEqualTo("{aggregateType='Initial Snapshot', type='STARTED', additionalData={connector_name=" + server() + "}}");
.isEqualTo("{\"aggregateType\":\"Initial Snapshot\",\"type\":\"STARTED\",\"additionalData\":{\"connector_name\":\"" + server() + "\"}}");
assertThat(jmxNotifications.get(1)).hasFieldOrPropertyWithValue("message", "Initial Snapshot generated a notification");
assertThat(jmxNotifications.get(1).getUserData())
.isEqualTo("{aggregateType='Initial Snapshot', type='COMPLETED', additionalData={connector_name=" + server() + "}}");
.isEqualTo("{\"aggregateType\":\"Initial Snapshot\",\"type\":\"COMPLETED\",\"additionalData\":{\"connector_name\":\"" + server() + "\"}}");
}
private List<Notification> readNotificationFromJmx()