DBZ-6793 Add timestamp for notifications into documentation

This commit is contained in:
mfvitale 2023-09-27 15:14:12 +02:00 committed by Chris Cranford
parent 993413955c
commit 56795732b5
3 changed files with 54 additions and 16 deletions

View File

@ -8,9 +8,11 @@
import static org.assertj.core.api.Assertions.assertThat;
import java.lang.management.ManagementFactory;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -32,11 +34,15 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.Percentage;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.MongoDbConnectorConfig.SnapshotMode;
@ -130,10 +136,12 @@ public void notificationCorrectlySentOnItsTopic() {
Assertions.assertThat(sourceRecord.topic()).isEqualTo("io.debezium.notification");
Assertions.assertThat(((Struct) sourceRecord.value()).getString("aggregate_type")).isEqualTo("Initial Snapshot");
Assertions.assertThat(((Struct) sourceRecord.value()).getString("type")).isEqualTo("STARTED");
Assertions.assertThat(((Struct) sourceRecord.value()).getInt64("timestamp")).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1));
sourceRecord = notifications.get(1);
Assertions.assertThat(sourceRecord.topic()).isEqualTo("io.debezium.notification");
Assertions.assertThat(((Struct) sourceRecord.value()).getString("aggregate_type")).isEqualTo("Initial Snapshot");
Assertions.assertThat(((Struct) sourceRecord.value()).getString("type")).isEqualTo("COMPLETED");
Assertions.assertThat(((Struct) sourceRecord.value()).getInt64("timestamp")).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1));
}
@Test
@ -186,10 +194,12 @@ public void notificationCorrectlySentOnJmx()
assertThat(notifications).hasSize(2);
assertThat(notifications.get(0))
.hasFieldOrPropertyWithValue("aggregateType", "Initial Snapshot")
.hasFieldOrPropertyWithValue("type", "STARTED");
.hasFieldOrPropertyWithValue("type", "STARTED")
.hasFieldOrProperty("timestamp");
assertThat(notifications.get(1))
.hasFieldOrPropertyWithValue("aggregateType", "Initial Snapshot")
.hasFieldOrPropertyWithValue("type", "COMPLETED");
.hasFieldOrPropertyWithValue("type", "COMPLETED")
.hasFieldOrProperty("timestamp");
resetNotifications();
@ -200,9 +210,10 @@ public void notificationCorrectlySentOnJmx()
@Test
public void emittingDebeziumNotificationWillGenerateAJmxNotification()
throws ReflectionException, MalformedObjectNameException, InstanceNotFoundException, IntrospectionException, AttributeNotFoundException,
MBeanException, InterruptedException {
MBeanException, InterruptedException, JsonProcessingException {
// Testing.Print.enable();
ObjectMapper mapper = new ObjectMapper();
startConnector(config -> config
.with(CommonConnectorConfig.SNAPSHOT_DELAY_MS, 10000)
@ -220,11 +231,21 @@ public void emittingDebeziumNotificationWillGenerateAJmxNotification()
assertThat(jmxNotifications).hasSize(2);
assertThat(jmxNotifications.get(0)).hasFieldOrPropertyWithValue("message", "Initial Snapshot generated a notification");
assertThat(jmxNotifications.get(0).getUserData())
.isEqualTo("{\"aggregateType\":\"Initial Snapshot\",\"type\":\"STARTED\",\"additionalData\":{\"connector_name\":\"mongo1\"}}");
Notification notification = mapper.readValue(jmxNotifications.get(0).getUserData().toString(), Notification.class);
assertThat(notification)
.hasFieldOrPropertyWithValue("aggregateType", "Initial Snapshot")
.hasFieldOrPropertyWithValue("type", "STARTED")
.hasFieldOrPropertyWithValue("additionalData", Map.of("connector_name", "mongo1"));
assertThat(notification.getTimestamp()).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1));
assertThat(jmxNotifications.get(1)).hasFieldOrPropertyWithValue("message", "Initial Snapshot generated a notification");
assertThat(jmxNotifications.get(1).getUserData())
.isEqualTo("{\"aggregateType\":\"Initial Snapshot\",\"type\":\"COMPLETED\",\"additionalData\":{\"connector_name\":\"mongo1\"}}");
notification = mapper.readValue(jmxNotifications.get(1).getUserData().toString(), Notification.class);
assertThat(notification)
.hasFieldOrPropertyWithValue("aggregateType", "Initial Snapshot")
.hasFieldOrPropertyWithValue("type", "COMPLETED")
.hasFieldOrPropertyWithValue("additionalData", Map.of("connector_name", "mongo1"));
assertThat(notification.getTimestamp()).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1));
}
private List<Notification> readNotificationFromJmx()

View File

@ -50,6 +50,7 @@ private SourceRecord buildRecord() {
.field("type", SchemaBuilder.STRING_SCHEMA)
.field("aggregate_type", SchemaBuilder.STRING_SCHEMA)
.field("additional_data", SchemaBuilder.map(SchemaBuilder.STRING_SCHEMA, SchemaBuilder.STRING_SCHEMA))
.field("timestamp", Schema.INT64_SCHEMA)
.build();
Struct key = new Struct(keySchema).put("id", NOTIFICATION_ID);
@ -57,7 +58,8 @@ private SourceRecord buildRecord() {
.put("id", NOTIFICATION_ID)
.put("type", "Test")
.put("aggregate_type", "Test")
.put("additional_data", Map.of("k1", "v1"));
.put("additional_data", Map.of("k1", "v1"))
.put("timestamp", 1695817046353L);
return new SourceRecord(Map.of(), Map.of(), "notificationTopic", null, keySchema, key, valueSchema, value);
}
@ -99,6 +101,7 @@ public void notificationSentOnKafkaChannelWillBeCorrectlyProcessed() {
.withType("Test")
.withAggregateType("Test")
.withAdditionalData(Map.of("k1", "v1"))
.withTimestamp(1695817046353L)
.build());
assertThat(isConsumerCalled).isTrue();

View File

@ -56,6 +56,9 @@ In domain-driven design, exported events should always refer to an aggregate.
|additional_data
|A Map<String,String> with detailed information about the notification.
For an example, see xref:debezium-notifications-about-the-progress-of-incremental-snapshots[{prodname} notifications about the progress of incremental snapshots].
|timestamp
|The time at which the notification was created, expressed as an epoch (Unix) timestamp in milliseconds.
|===
// Type: assembly
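As a rough illustration of how a consumer might read the notification fields described above, including the new `timestamp`, the sketch below uses the Kafka Connect `Struct` API in the same way as the integration test in this commit; the class and method names are illustrative, not part of Debezium.

[source, java]
----
import java.time.Instant;

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class NotificationFieldSketch {

    // Illustrative only: reads the notification fields, including the new
    // "timestamp" (epoch milliseconds), from a record on the notification topic.
    static void printNotification(SourceRecord record) {
        Struct value = (Struct) record.value();

        String aggregateType = value.getString("aggregate_type");
        String type = value.getString("type");
        // "timestamp" is an INT64 holding epoch milliseconds, as asserted in the test above.
        Instant createdAt = Instant.ofEpochMilli(value.getInt64("timestamp"));

        System.out.printf("%s/%s created at %s%n", aggregateType, type, createdAt);
    }
}
----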
@ -77,7 +80,11 @@ The following example shows a typical notification that provides the status of a
{
"id": "5563ae14-49f8-4579-9641-c1bbc2d76f99",
"aggregate_type": "Initial Snapshot",
"type": "COMPLETED" <1>
"type": "COMPLETED" <1>,
"additional_data" : {
"connector_name": "myConnector"
},
"timestamp": "1695817046353"
}
----
<1> The type field can contain one of the following values:
@ -106,7 +113,8 @@ a|[source, json]
"additional_data":{
"connector_name":"my-connector",
"data_collections":"table1, table2"
}
},
"timestamp": "1695817046353"
}
----
|Paused
@ -119,7 +127,8 @@ a|[source, json]
"additional_data":{
"connector_name":"my-connector",
"data_collections":"table1, table2"
}
},
"timestamp": "1695817046353"
}
----
|Resumed
@ -132,7 +141,8 @@ a|[source, json]
"additional_data":{
"connector_name":"my-connector",
"data_collections":"table1, table2"
}
},
"timestamp": "1695817046353"
}
----
|Stopped
@ -144,7 +154,8 @@ a|[source, json]
"type":"ABORTED",
"additional_data":{
"connector_name":"my-connector"
}
},
"timestamp": "1695817046353"
}
----
|Processing chunk
@ -160,7 +171,8 @@ a|[source, json]
"current_collection_in_progress":"table1",
"maximum_key":"100",
"last_processed_key":"50"
}
},
"timestamp": "1695817046353"
}
----
|Snapshot completed for a table
@ -176,7 +188,8 @@ a|[source, json]
"scanned_collection":"table1",
"total_rows_scanned":"100",
"status":"SUCCEEDED" // <1>
}
},
"timestamp": "1695817046353"
}
----
<1> The possible values are:
@ -195,7 +208,8 @@ a|[source, json]
"type":"COMPLETED",
"additional_data":{
"connector_name":"my-connector"
}
},
"timestamp": "1695817046353"
}
----
|===
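For completeness, here is a minimal sketch of how a JSON payload like the examples above can be turned back into a `Notification` object and its timestamp read, mirroring what the JMX test in this commit does with Jackson. The reader class name is illustrative, and the `io.debezium.pipeline.notification.Notification` import is assumed to be the class used by the test.

[source, java]
----
import java.time.Instant;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

// Assumed to be the Notification class exercised by the JMX test in this commit.
import io.debezium.pipeline.notification.Notification;

public class NotificationJsonSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Illustrative only: parse a notification payload (for example the JMX userData
    // shown in the test) and convert the new timestamp field into an Instant.
    static Instant creationTime(String json) throws JsonProcessingException {
        Notification notification = MAPPER.readValue(json, Notification.class);
        return Instant.ofEpochMilli(notification.getTimestamp());
    }
}
----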