DBZ-6910 Values being omitted from list of JSON object

This commit is contained in:
harveyyue 2023-09-20 13:04:42 +08:00 committed by Jiri Pechanec
parent 68eb4b2df2
commit e341910d2c
2 changed files with 71 additions and 6 deletions

View File

@ -6,15 +6,20 @@
package io.debezium.transforms.outbox;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
@ -82,18 +87,18 @@ private Schema toConnectSchemaWithCycles(String key, ArrayNode array) throws Con
Schema schema = null;
final JsonNode sample = getFirstArrayElement(array);
if (sample.isObject()) {
final Set<Schema> elementSchemas = new HashSet<>();
final Iterator<JsonNode> elements = array.elements();
while (elements.hasNext()) {
final JsonNode element = elements.next();
if (!element.isObject()) {
continue;
}
if (schema == null) {
schema = toConnectSchema(key, element);
continue;
}
// If the first element of Arrays is empty, will add missing fields.
schema = toConnectSchema(key, element);
elementSchemas.add(toConnectSchema(key, element));
}
for (Schema element : elementSchemas) {
// TODO: need to support nested json object
schema = mergeSchema(schema, element);
}
}
else {
@ -106,6 +111,32 @@ private Schema toConnectSchemaWithCycles(String key, ArrayNode array) throws Con
return schema;
}
private Schema mergeSchema(Schema left, Schema right) {
if (left == null) {
return right;
}
Map<String, Field> fields = new HashMap<>();
left.fields().forEach(field -> fields.put(field.name(), field));
right.fields().forEach(field -> {
Field oldField = fields.get(field.name());
if (oldField == null) {
fields.put(field.name(), field);
}
else {
if (!Objects.equals(oldField.schema(), field.schema())
&& oldField.schema().type() == Schema.Type.BYTES
&& field.schema().type() != Schema.Type.BYTES) {
fields.put(field.name(), field);
}
}
});
SchemaBuilder newBuilder = SchemaUtil.copySchemaBasics(left);
fields.forEach((k, v) -> newBuilder.field(k, v.schema()));
return newBuilder.build();
}
private JsonNode getFirstArrayElement(ArrayNode array) throws ConnectException {
JsonNode refNode = NullNode.getInstance();
Schema refSchema = null;

View File

@ -42,6 +42,40 @@ public void setup() throws Exception {
record = getFile("json/restaurants5.json");
}
@Test
@FixFor("DBZ-6910")
public void shouldCreateCorrectSchemaFromArrayJson() throws Exception {
String key = "test_arr";
// remove description value from array json
JsonNode testNode = mapper
.readTree("[{\"code\":\"100\",\"description\":\"some description\"},{\"code\":\"200\",\"description\":\"another description\"},{\"code\":\"300\"}]");
Schema arraySchema = jsonSchemaData.toConnectSchema(key, testNode);
assertThat(arraySchema.valueSchema().field("code")).isNotNull();
assertThat(arraySchema.valueSchema().field("description").schema().type()).isEqualTo(Schema.Type.STRING);
// set null value for description
testNode = mapper.readTree("[{\"code\":\"100\",\"description\": null},{\"code\":\"200\",\"description\":\"another description\"},{\"code\":\"300\"}]");
arraySchema = jsonSchemaData.toConnectSchema(key, testNode);
assertThat(arraySchema.valueSchema().field("code")).isNotNull();
assertThat(arraySchema.valueSchema().field("description").schema().type()).isEqualTo(Schema.Type.STRING);
// treat null value as bytes type
jsonSchemaData = new JsonSchemaData(JsonPayloadNullFieldBehavior.OPTIONAL_BYTES);
testNode = mapper.readTree("[{\"code\":\"100\",\"description\": null},{\"code\":\"200\",\"description\": null},{\"code\":\"300\"}]");
arraySchema = jsonSchemaData.toConnectSchema(key, testNode);
assertThat(arraySchema.valueSchema().field("code")).isNotNull();
assertThat(arraySchema.valueSchema().field("description").schema().type()).isEqualTo(Schema.Type.BYTES);
testNode = mapper.readTree("[{\"code\":\"100\",\"description\": null},{\"code\":\"200\",\"description\": \"another description\"},{\"code\":\"300\"}]");
arraySchema = jsonSchemaData.toConnectSchema(key, testNode);
assertThat(arraySchema.valueSchema().field("code")).isNotNull();
assertThat(arraySchema.valueSchema().field("description").schema().type()).isEqualTo(Schema.Type.STRING);
}
@Test
@FixFor("DBZ-5475")
public void failSchemaCheckForArrayWithDifferentNumberTypes() throws Exception {