DBZ-5367 Fixed the problem where JSON expanding failed for Outbox event payloads that contained nested arrays where first array was empty and further array contained a Struct

With proposed changes empty arrays are skipped, as it allows us to delay defining the schema for them until an array with elements is found, as it seems that there is no way to modify a schema that was already defined for a given field
This commit is contained in:
Paweł Malon 2022-07-06 17:26:12 +02:00 committed by Chris Cranford
parent a2d4ac470b
commit 707a43d3bc
2 changed files with 87 additions and 5 deletions

View File

@ -75,7 +75,8 @@ private static Schema jsonValueToSchema(JsonNode node) {
}
return Schema.OPTIONAL_FLOAT64_SCHEMA;
case ARRAY:
return SchemaBuilder.array(findArrayMemberSchema((ArrayNode) node)).optional().build();
ArrayNode arrayNode = (ArrayNode) node;
return arrayNode.isEmpty() ? null : SchemaBuilder.array(findArrayMemberSchema(arrayNode)).optional().build();
case OBJECT:
return jsonNodeToSchema(node);
default:
@ -108,10 +109,6 @@ private static JsonNode getFirstArrayElement(ArrayNode array) throws ConnectExce
}
private static Schema findArrayMemberSchema(ArrayNode array) throws ConnectException {
if (array.isEmpty()) {
return Schema.OPTIONAL_STRING_SCHEMA;
}
final JsonNode sample = getFirstArrayElement(array);
if (sample.isObject()) {
return buildDocumentUnionSchema(array);

View File

@ -987,6 +987,91 @@ public void canExpandJsonPayloadIfConfigured() {
assertThat(asStruct(petObjects.get(1)).get("type")).isEqualTo("cat");
}
@Test
public void canExpandJsonWithNestedArraysWhereFirstArrayIsEmpty() {
final EventRouter<SourceRecord> router = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(
EventRouterConfigDefinition.EXPAND_JSON_PAYLOAD.name(),
"true");
router.configure(config);
final SourceRecord eventRecord = createEventRecord(
"da8d6de6-3b77-45ff-8f44-57db55a7a06c",
"UserCreated",
"10711fa5",
"User",
"{\"fullName\": \"John Doe\", \"petObjects\": [{\"type\": \"dog\", \"colors\": []}, {\"type\": \"cat\", \"colors\": [{\"name\": \"white\"}]}]}",
new HashMap<>(),
new HashMap<>());
final SourceRecord eventRouted = router.apply(eventRecord);
assertThat(eventRouted).isNotNull();
Schema valueSchema = eventRouted.valueSchema();
assertThat(valueSchema.type()).isEqualTo(SchemaBuilder.struct().type());
assertThat(valueSchema.fields().size()).isEqualTo(2);
assertThat(valueSchema.field("fullName").schema().type().getName()).isEqualTo("string");
assertThat(valueSchema.field("petObjects").schema().type().getName()).isEqualTo("array");
assertThat(valueSchema.field("petObjects").schema().valueSchema().fields().size()).isEqualTo(2);
assertThat(valueSchema.field("petObjects").schema().valueSchema().type().getName()).isEqualTo("struct");
assertThat(valueSchema.field("petObjects").schema().valueSchema().field("type").schema().type().getName()).isEqualTo("string");
assertThat(valueSchema.field("petObjects").schema().valueSchema().field("colors").schema().type().getName()).isEqualTo("array");
assertThat(valueSchema.field("petObjects").schema().valueSchema().field("colors").schema().valueSchema().type().getName()).isEqualTo("struct");
assertThat(valueSchema.field("petObjects").schema().valueSchema().field("colors").schema().valueSchema().fields().size()).isEqualTo(1);
assertThat(valueSchema.field("petObjects").schema().valueSchema().field("colors").schema().valueSchema().field("name").schema().type().getName())
.isEqualTo("string");
Struct valueStruct = (Struct) eventRouted.value();
assertThat(valueStruct.get("fullName")).isEqualTo("John Doe");
List<Object> petObjects = valueStruct.getArray("petObjects");
assertThat(petObjects.size()).isEqualTo(2);
assertThat(asStruct(petObjects.get(0)).get("type")).isEqualTo("dog");
assertThat(asStruct(petObjects.get(0)).getArray("colors")).hasSize(0);
assertThat(asStruct(petObjects.get(1)).get("type")).isEqualTo("cat");
assertThat(asStruct(petObjects.get(1)).getArray("colors")).hasSize(1);
List<Object> colors = asStruct(petObjects.get(1)).getArray("colors");
assertThat(asStruct(colors.get(0)).get("name")).isEqualTo("white");
}
@Test
public void shouldExpandJSONPayloadWithEmptyArrayAndRemoveThatArray() {
final EventRouter<SourceRecord> router = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(
EventRouterConfigDefinition.EXPAND_JSON_PAYLOAD.name(),
"true");
router.configure(config);
final SourceRecord eventRecord = createEventRecord(
"da8d6de6-3b77-45ff-8f44-57db55a7a06c",
"UserCreated",
"10711fa5",
"User",
"{\"fullName\": \"John Doe\", \"petObjects\": [{\"type\": \"dog\", \"colors\": []}]}",
new HashMap<>(),
new HashMap<>());
final SourceRecord eventRouted = router.apply(eventRecord);
assertThat(eventRouted).isNotNull();
Schema valueSchema = eventRouted.valueSchema();
assertThat(valueSchema.type()).isEqualTo(SchemaBuilder.struct().type());
assertThat(valueSchema.fields().size()).isEqualTo(2);
assertThat(valueSchema.field("fullName").schema().type().getName()).isEqualTo("string");
assertThat(valueSchema.field("petObjects").schema().type().getName()).isEqualTo("array");
assertThat(valueSchema.field("petObjects").schema().valueSchema().fields().size()).isEqualTo(1);
assertThat(valueSchema.field("petObjects").schema().valueSchema().field("type").schema().type().getName()).isEqualTo("string");
Struct valueStruct = (Struct) eventRouted.value();
assertThat(valueStruct.get("fullName")).isEqualTo("John Doe");
List<Object> petObjects = valueStruct.getArray("petObjects");
assertThat(petObjects.size()).isEqualTo(1);
assertThat(asStruct(petObjects.get(0)).get("type")).isEqualTo("dog");
}
@Test
public void shouldNotExpandJSONPayloadIfNotConfigured() {
final EventRouter<SourceRecord> router = new EventRouter<>();