DBZ-1297 Expand JSON escaped string as JSON

This commit is contained in:
Laurent Broudoux 2021-09-17 16:49:15 +02:00 committed by Gunnar Morling
parent 3af50c66bb
commit 7b7a1f3064
6 changed files with 399 additions and 2 deletions

View File

@ -26,6 +26,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.common.annotation.Incubating;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
@ -70,6 +73,9 @@ public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R
private boolean onlyHeadersInOutputMessage = false;
private boolean expandJSONPayload;
private ObjectMapper objectMapper;
private SmtManager<R> smtManager;
@Override
@ -127,6 +133,27 @@ public R apply(R r) {
Headers headers = r.headers();
headers.add("id", eventId, eventIdField.schema());
// Check to expand JSON string into real JSON.
if (expandJSONPayload) {
if (!(payload instanceof String)) {
LOGGER.warn("Expand JSON payload is turned on but payload is not a string in {}", r.key());
}
else {
final String payloadString = (String) payload;
try {
// Parse and get Jackson JsonNode.
final JsonNode jsonPayload = parseJSONPayload(payloadString);
// Build a new Schema and new payload Struct that replace existing ones.
payloadSchema = SchemaBuilderUtil.jsonNodeToSchema(jsonPayload);
payload = StructBuilderUtil.jsonNodeToStruct(jsonPayload, payloadSchema);
}
catch (Exception e) {
LOGGER.warn("ExpandJSONPayload: " + e.getMessage(), e);
}
}
}
final Schema structValueSchema = onlyHeadersInOutputMessage ? null
: (fieldSchemaVersion == null)
? getValueSchema(eventValueSchema, eventStruct.getString(routeByField))
@ -254,6 +281,14 @@ private void handleUnexpectedOperation(R r) {
}
}
/** */
private JsonNode parseJSONPayload(String jsonString) throws Exception {
if (jsonString.startsWith("{") || jsonString.startsWith("[")) {
return objectMapper.readTree(jsonString);
}
throw new Exception("Unable to parse payload starting with '" + jsonString.charAt(0) + "'");
}
@Override
public ConfigDef config() {
return EventRouterConfigDefinition.configDef();
@ -281,6 +316,11 @@ public void configure(Map<String, ?> configMap) {
invalidOperationBehavior = EventRouterConfigDefinition.InvalidOperationBehavior.parse(
config.getString(EventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR));
expandJSONPayload = config.getBoolean(EventRouterConfigDefinition.EXPAND_JSON_PAYLOAD);
if (expandJSONPayload) {
objectMapper = new ObjectMapper();
}
fieldEventId = config.getString(EventRouterConfigDefinition.FIELD_EVENT_ID);
fieldEventKey = config.getString(EventRouterConfigDefinition.FIELD_EVENT_KEY);
fieldEventTimestamp = config.getString(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP);

View File

@ -227,6 +227,16 @@ public String getAlias() {
" in case something else is processed this transform can log it as warning, error or stop the" +
" process");
static final Field EXPAND_JSON_PAYLOAD = Field.create("debezium.expand.json.payload")
.withDisplayName("Expand Payload escaped string as real JSON")
.withType(ConfigDef.Type.BOOLEAN)
.withDefault(false)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("Whether or not to try unescaping a JSON string and make it real JSON. It will infer schema information" +
" from payload and update the record schema accordingly. If content is not JSON, it just produces a warning" +
" and emits the record unchanged");
static final Field[] CONFIG_FIELDS = {
FIELD_EVENT_ID,
FIELD_EVENT_KEY,
@ -240,7 +250,8 @@ public String getAlias() {
ROUTE_TOPIC_REGEX,
ROUTE_TOPIC_REPLACEMENT,
ROUTE_TOMBSTONE_ON_EMPTY_PAYLOAD,
OPERATION_INVALID_BEHAVIOR
OPERATION_INVALID_BEHAVIOR,
EXPAND_JSON_PAYLOAD
};
/**
@ -265,7 +276,7 @@ public static ConfigDef configDef() {
Field.group(
config,
"Debezium",
OPERATION_INVALID_BEHAVIOR);
OPERATION_INVALID_BEHAVIOR, EXPAND_JSON_PAYLOAD);
Field.group(
config,
"Tracing",

View File

@ -0,0 +1,151 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms.outbox;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.ConnectException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.NullNode;
/**
* JSON payload SchemaBuilder util for Debezium Outbox Transform Event Router.
*
* @author Laurent Broudoux (laurent.broudoux@gmail.com)
*/
public class SchemaBuilderUtil {
/**
* Build a new connect Schema inferring structure and types from Json document.
* @param document A Jackson JsonNode to extract schema from
* @return A new Schema matching this Json node.
*/
public static Schema jsonNodeToSchema(JsonNode document) {
return jsonNodeToSchemaBuilder(document).build();
}
private static SchemaBuilder jsonNodeToSchemaBuilder(JsonNode document) {
final SchemaBuilder schemaBuilder = SchemaBuilder.struct().optional();
if (document != null) {
Iterator<Entry<String, JsonNode>> fieldsEntries = document.fields();
while (fieldsEntries.hasNext()) {
Entry<String, JsonNode> fieldEntry = fieldsEntries.next();
addFieldSchema(fieldEntry, schemaBuilder);
}
}
return schemaBuilder;
}
private static void addFieldSchema(Entry<String, JsonNode> fieldEntry, SchemaBuilder builder) {
final String fieldName = fieldEntry.getKey();
final JsonNode fieldValue = fieldEntry.getValue();
final Schema fieldSchema = jsonValueToSchema(fieldValue);
if (fieldSchema != null) {
builder.field(fieldName, fieldSchema);
}
}
private static Schema jsonValueToSchema(JsonNode node) {
switch (node.getNodeType()) {
case STRING:
return Schema.OPTIONAL_STRING_SCHEMA;
case BOOLEAN:
return Schema.OPTIONAL_BOOLEAN_SCHEMA;
case NUMBER:
if (node.isInt()) {
return Schema.OPTIONAL_INT32_SCHEMA;
}
if (node.isLong()) {
return Schema.OPTIONAL_INT64_SCHEMA;
}
if (node.isFloat()) {
return Schema.OPTIONAL_FLOAT64_SCHEMA;
}
if (node.isDouble()) {
return Schema.OPTIONAL_FLOAT64_SCHEMA;
}
return Schema.OPTIONAL_FLOAT64_SCHEMA;
case ARRAY:
return SchemaBuilder.array(findArrayMemberSchema((ArrayNode) node)).optional().build();
case OBJECT:
return jsonNodeToSchema(node);
default:
return null;
}
}
/** */
private static JsonNode getFirstArrayElement(ArrayNode array) throws ConnectException {
JsonNode refNode = NullNode.getInstance();
JsonNodeType refType = null;
// Get first non null element type and check other member types.
Iterator<JsonNode> elements = array.elements();
while (elements.hasNext()) {
JsonNode element = elements.next();
if (refType == null) {
refType = element.isNull() ? null : element.getNodeType();
refNode = element;
}
else {
if (!element.isNull() && element.getNodeType() != refType) {
throw new ConnectException(String.format("Field is not a homogenous array (%s x %s).",
refNode.asText(), element.getNodeType().toString()));
}
}
}
return refNode;
}
private static Schema findArrayMemberSchema(ArrayNode array) throws ConnectException {
if (array.isEmpty()) {
return Schema.OPTIONAL_STRING_SCHEMA;
}
final JsonNode sample = getFirstArrayElement(array);
if (sample.isObject()) {
return buildDocumentUnionSchema(array);
}
final Schema schema = jsonValueToSchema(sample);
if (schema == null) {
throw new ConnectException(String.format("Array '%s' has unrecognized member schema.",
array.asText()));
}
return schema;
}
private static Schema buildDocumentUnionSchema(ArrayNode array) {
SchemaBuilder builder = null;
Iterator<JsonNode> elements = array.elements();
while (elements.hasNext()) {
JsonNode element = elements.next();
if (!element.isObject()) {
continue;
}
if (builder == null) {
builder = jsonNodeToSchemaBuilder(element);
continue;
}
Iterator<Entry<String, JsonNode>> fieldsEntries = element.fields();
while (fieldsEntries.hasNext()) {
Entry<String, JsonNode> fieldEntry = fieldsEntries.next();
addFieldSchema(fieldEntry, builder);
}
}
return builder.build();
}
}

View File

@ -0,0 +1,87 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.transforms.outbox;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
/**
* JSON payload Struct build util util for Debezium Outbox Transform Event Router.
*
* @author Laurent Broudoux (laurent.broudoux@gmail.com)
*/
public class StructBuilderUtil {
/**
* Convert a Jackson JsonNode into a new Struct according the schema.
* @param document The JSON document to convert
* @param schema The Schema for this document
* @return A new connect Struct for the JSON node.
*/
public static Struct jsonNodeToStruct(JsonNode document, Schema schema) {
if (document == null) {
return null;
}
return jsonNodeToStructInternal(document, schema);
}
private static Struct jsonNodeToStructInternal(JsonNode document, Schema schema) {
final Struct struct = new Struct(schema);
for (Field field : schema.fields()) {
if (document.has(field.name())) {
struct.put(field.name(),
getStructFieldValue(document.path(field.name()), field.schema()));
}
}
return struct;
}
private static Object getStructFieldValue(JsonNode node, Schema schema) {
switch (node.getNodeType()) {
case STRING:
return node.asText();
case BOOLEAN:
return node.asBoolean();
case NUMBER:
if (node.isFloat()) {
return node.floatValue();
}
if (node.isDouble()) {
return node.asDouble();
}
if (node.isInt()) {
return node.asInt();
}
if (node.isLong()) {
return node.asLong();
}
return node.decimalValue();
case ARRAY:
return getArrayAsList((ArrayNode) node, schema);
case OBJECT:
return jsonNodeToStructInternal(node, schema);
default:
return null;
}
}
private static List getArrayAsList(ArrayNode array, Schema schema) {
List arrayObjects = new ArrayList(array.size());
Iterator<JsonNode> elements = array.elements();
while (elements.hasNext()) {
arrayObjects.add(getStructFieldValue(elements.next(), schema.valueSchema()));
}
return arrayObjects;
}
}

View File

@ -897,6 +897,65 @@ public void noTombstoneIfNotConfigured() {
assertThat(eventRoutedTombstone.valueSchema()).isNotNull();
}
@Test
public void canExpandJSONPayloadIfConfigured() {
final EventRouter<SourceRecord> router = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(
EventRouterConfigDefinition.EXPAND_JSON_PAYLOAD.name(),
"true");
router.configure(config);
final SourceRecord eventRecord = createEventRecord(
"da8d6de6-3b77-45ff-8f44-57db55a7a06c",
"UserCreated",
"10711fa5",
"User",
"{\"fullName\": \"John Doe\", \"rating\": 4.9, \"age\": 42, \"pets\": [\"dog\", \"cat\"]}",
new HashMap<>(),
new HashMap<>());
final SourceRecord eventRouted = router.apply(eventRecord);
assertThat(eventRouted).isNotNull();
Schema valueSchema = eventRouted.valueSchema();
assertThat(valueSchema.type()).isEqualTo(SchemaBuilder.struct().type());
assertThat(valueSchema.fields().size()).isEqualTo(4);
assertThat(valueSchema.field("fullName").schema().type().getName()).isEqualTo("string");
assertThat(valueSchema.field("rating").schema().type().getName()).isEqualTo("float64");
assertThat(valueSchema.field("age").schema().type().getName()).isEqualTo("int32");
assertThat(valueSchema.field("pets").schema().type().getName()).isEqualTo("array");
Struct valueStruct = (Struct) eventRouted.value();
assertThat(valueStruct.get("fullName")).isEqualTo("John Doe");
assertThat(valueStruct.get("rating")).isEqualTo(4.9);
assertThat(valueStruct.get("age")).isEqualTo(42);
assertThat(valueStruct.getArray("pets").size()).isEqualTo(2);
assertThat(valueStruct.getArray("pets").get(1)).isEqualTo("cat");
}
@Test
public void shouldNotExpandJSONPayloadIfNotConfigured() {
final EventRouter<SourceRecord> router = new EventRouter<>();
router.configure(new HashMap<>());
final SourceRecord eventRecord = createEventRecord(
"da8d6de6-3b77-45ff-8f44-57db55a7a06c",
"UserCreated",
"10711fa5",
"User",
"{\"fullName\": \"John Doe\", \"rating\": 4.9, \"age\": 42}",
new HashMap<>(),
new HashMap<>());
final SourceRecord eventRouted = router.apply(eventRecord);
assertThat(eventRouted).isNotNull();
assertThat(eventRouted.valueSchema().type()).isEqualTo(SchemaBuilder.string().type());
assertThat(eventRouted.value()).isEqualTo("{\"fullName\": \"John Doe\", \"rating\": 4.9, \"age\": 42}");
}
private SourceRecord createEventRecord() {
return createEventRecord(
"da8d6de6-3b77-45ff-8f44-57db55a7a06c",

View File

@ -264,6 +264,49 @@ transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=type:envelope:eventType
----
// Type: concept
// Title: Expanding escaped JSON String as JSON
// ModuleID: expanding-escaped-json-string-as-json
[[expanding-escaped-json-string-as-json]]
== Expanding escaped JSON String as JSON
You may have noticed that the Debezium outbox message contains the `payload` represented as a String.
So when this string, is actually JSON, it appears as escaped in the result Kafka message like shown below:
[source,javascript,indent=0]
----
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
"{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}
----
The outbox event router allows you to expand this message content to "real" JSON with the companion schema
being deduced from the JSON document itself. That way the result in Kafka message looks like:
[source,javascript,indent=0]
----
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
"{"id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123}"
}
----
To enable this transformation, you have to set the `debezium.expand.json.payload` to true like below:
[source]
----
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.debezium.expand.json.payload=true
----
// Type: reference
// ModuleID: options-for-configuring-outbox-event-router-transformation
// Title: Options for configuring outbox event router transformation
@ -364,6 +407,12 @@ a|Determines the behavior of the SMT when there is an `UPDATE` operation on the
All changes in an outbox table are expected to be `INSERT` operations. That is, an outbox table functions as a queue; updates to records in an outbox table are not allowed.
The SMT automatically filters out `DELETE` operations on an outbox table.
|[[outbox-event-router-property-debezium-expand-json-payload]]<<outbox-event-router-property-debezium-expand-json-payload, `debezium.expand.json.payload`>>
|`false`
|{prodname}
a| Indicates whether the JSON expansion of a String payload should be done. If no content found or in case of parsing error, the content is kept "as is".
ifdef::community[]
|[[outbox-event-router-property-tracing-span-context-field]]<<outbox-event-router-property-tracing-span-context-field, `tracing.span.context.field`>>
|`tracingspancontext`