DBZ-3528 Misc. clean-up

This commit is contained in:
Gunnar Morling 2021-11-17 15:11:53 +01:00
parent 78c92c3772
commit ca95b424fb
2 changed files with 15 additions and 18 deletions

View File

@ -74,7 +74,7 @@ public static interface RecordConverter<R> {
private boolean onlyHeadersInOutputMessage = false;
private boolean expandJSONPayload;
private boolean expandJsonPayload;
private ObjectMapper objectMapper;
private SmtManager<R> smtManager;
@ -136,7 +136,7 @@ public R apply(R r, RecordConverter<R> recordConverter) {
headers.add("id", eventId, eventIdField.schema());
// Check to expand JSON string into real JSON.
if (expandJSONPayload) {
if (expandJsonPayload) {
if (!(payload instanceof String)) {
LOGGER.warn("Expand JSON payload is turned on but payload is not a string in {}", r.key());
}
@ -145,13 +145,13 @@ public R apply(R r, RecordConverter<R> recordConverter) {
try {
// Parse and get Jackson JsonNode.
final JsonNode jsonPayload = parseJSONPayload(payloadString);
final JsonNode jsonPayload = parseJsonPayload(payloadString);
// Build a new Schema and new payload Struct that replace existing ones.
payloadSchema = SchemaBuilderUtil.jsonNodeToSchema(jsonPayload);
payload = StructBuilderUtil.jsonNodeToStruct(jsonPayload, payloadSchema);
}
catch (Exception e) {
LOGGER.warn("ExpandJSONPayload: " + e.getMessage(), e);
LOGGER.warn("JSON expansion failed", e);
}
}
}
@ -283,10 +283,7 @@ private void handleUnexpectedOperation(R r) {
}
}
/**
*
*/
private JsonNode parseJSONPayload(String jsonString) throws Exception {
private JsonNode parseJsonPayload(String jsonString) throws Exception {
if (jsonString.startsWith("{") || jsonString.startsWith("[")) {
return objectMapper.readTree(jsonString);
}
@ -317,8 +314,8 @@ public void configure(Map<String, ?> configMap) {
invalidOperationBehavior = EventRouterConfigDefinition.InvalidOperationBehavior.parse(
config.getString(EventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR));
expandJSONPayload = config.getBoolean(EventRouterConfigDefinition.EXPAND_JSON_PAYLOAD);
if (expandJSONPayload) {
expandJsonPayload = config.getBoolean(EventRouterConfigDefinition.EXPAND_JSON_PAYLOAD);
if (expandJsonPayload) {
objectMapper = new ObjectMapper();
}

View File

@ -132,25 +132,25 @@ To apply the default MongoDB outbox event router SMT configuration, your outbox
|`id`
|Contains the unique ID of the event. In an outbox message, this value is a header. You can use this ID, for example, to remove duplicate messages. +
+
+
To obtain the unique ID of the event from a different outbox collection field, set the {link-prefix}:{link-mongodb-outbox-event-router}#mongodb-outbox-event-router-property-collection-field-event-id[`collection.field.event.id` SMT option] in the connector configuration.
|[[route-by-field-example]]`aggregatetype`
|Contains a value that the SMT appends to the name of the topic to which the connector emits an outbox message. The default behavior is that this value replaces the default `pass:[${routedByValue}]` variable in the {link-prefix}:{link-mongodb-outbox-event-router}#mongodb-outbox-event-router-property-route-topic-replacement[`route.topic.replacement`] SMT option. +
+
+
For example, in a default configuration, the {link-prefix}:{link-mongodb-outbox-event-router}#mongodb-outbox-event-router-property-route-by-field[`route.by.field`] SMT option is set to `aggregatetype` and the xref:mongodb-outbox-event-router-property-route-topic-replacement[`route.topic.replacement`] SMT option is set to `outbox.event.pass:[${routedByValue}]`.
Suppose that your application adds two documents to the outbox collection. In the first document, the value in the `aggregatetype` field is `customers`.
In the second document, the value in the `aggregatetype` field is `orders`.
The connector emits the first document to the `outbox.event.customers` topic.
The connector emits the second document to the `outbox.event.orders` topic. +
+
+
To obtain this value from a different outbox collection field, set the {link-prefix}:{link-mongodb-outbox-event-router}#mongodb-outbox-event-router-property-route-by-field[`route.by.field` SMT option] in the connector configuration.
|`aggregateid`
|Contains the event key, which provides an ID for the payload.
The SMT uses this value as the key in the emitted outbox message.
This is important for maintaining correct order in Kafka partitions. +
+
+
To obtain the event key from a different outbox collection field, set the {link-prefix}:{link-mongodb-outbox-event-router}#mongodb-outbox-event-router-property-collection-field-event-key[`collection.field.event.key` SMT option] in the connector configuration.
|`payload`
@ -159,12 +159,12 @@ The default structure is JSON.
By default, the Kafka message value is solely comprised of the `payload` value.
However, if the outbox event is configured to include additional fields, the Kafka message value contains an envelope encapsulating both payload and the additional fields, and each field is represented separately.
For more information, see xref:emitting-messages-with-additional-fields[Emitting messages with additional fields]. +
+
+
To obtain the event payload from a different outbox collection field, set the {link-prefix}:{link-mongodb-outbox-event-router}#mongodb-outbox-event-router-property-collection-field-event-payload[`collection.field.event.payload` SMT option] in the connector configuration.
|Additional custom fields
|Any additional fields from the outbox collection can be {link-prefix}:{link-mongodb-outbox-event-router}#mongodb-outbox-event-router-emitting-messages-with-additional-fields[added to outbox events] either within the payload section or as a message header. +
+
+
One example could be a field `eventType` which conveys a user-defined value that helps to categorize or organize events.
|===
@ -194,7 +194,7 @@ Because the structure of these other messages differs from the structure of the
You can use one of the following methods to configure the connector to apply the SMT selectively:
* {link-prefix}:{link-smt-predicates}#applying-transformation-selectively[Configure an SMT predicate for the transformation].
* Use the xref:mongodb-outbox-event-router-property-route-topic-regex[route.topic.regex] configuration option for the SMT.
* Use the xref:mongodb-outbox-event-router-property-route-topic-regex[`route.topic.regex`] configuration option for the SMT.
ifdef::community[]
// Type: concept
@ -388,7 +388,7 @@ The default topic name is `outbox.event.` followed by the `aggregatetype` field
To change the topic name, you can: +
* Set the xref:mongodb-outbox-event-router-property-route-by-field[`route.by.field`] option to a different field.
* Set the xref:mongodb-outbox-event-router-property-route-topic-regex[route.topic.regex] option to a different regular expression.
* Set the xref:mongodb-outbox-event-router-property-route-topic-regex[`route.topic.regex`] option to a different regular expression.
|[[mongodb-outbox-event-router-property-route-tombstone-on-empty-payload]]<<mongodb-outbox-event-router-property-route-tombstone-on-empty-payload, `route.tombstone.on.empty.payload`>>
|`false`