From e341910d2c17afd872acd29ae0bbaa50b7e64181 Mon Sep 17 00:00:00 2001 From: harveyyue Date: Wed, 20 Sep 2023 13:04:42 +0800 Subject: [PATCH] DBZ-6910 Values being omitted from list of JSON object --- .../transforms/outbox/JsonSchemaData.java | 43 ++++++++++++++++--- .../transforms/outbox/JsonSchemaDataTest.java | 34 +++++++++++++++ 2 files changed, 71 insertions(+), 6 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/transforms/outbox/JsonSchemaData.java b/debezium-core/src/main/java/io/debezium/transforms/outbox/JsonSchemaData.java index 9da3b4350..9e8d0748d 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/outbox/JsonSchemaData.java +++ b/debezium-core/src/main/java/io/debezium/transforms/outbox/JsonSchemaData.java @@ -6,15 +6,20 @@ package io.debezium.transforms.outbox; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.transforms.util.SchemaUtil; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; @@ -82,18 +87,18 @@ private Schema toConnectSchemaWithCycles(String key, ArrayNode array) throws Con Schema schema = null; final JsonNode sample = getFirstArrayElement(array); if (sample.isObject()) { + final Set elementSchemas = new HashSet<>(); final Iterator elements = array.elements(); while (elements.hasNext()) { final JsonNode element = elements.next(); if (!element.isObject()) { continue; } - if (schema == null) { - schema = toConnectSchema(key, element); - continue; - } - // If the first element of Arrays is empty, will add missing fields. - schema = toConnectSchema(key, element); + elementSchemas.add(toConnectSchema(key, element)); + } + for (Schema element : elementSchemas) { + // TODO: need to support nested json object + schema = mergeSchema(schema, element); } } else { @@ -106,6 +111,32 @@ private Schema toConnectSchemaWithCycles(String key, ArrayNode array) throws Con return schema; } + private Schema mergeSchema(Schema left, Schema right) { + if (left == null) { + return right; + } + + Map fields = new HashMap<>(); + left.fields().forEach(field -> fields.put(field.name(), field)); + right.fields().forEach(field -> { + Field oldField = fields.get(field.name()); + if (oldField == null) { + fields.put(field.name(), field); + } + else { + if (!Objects.equals(oldField.schema(), field.schema()) + && oldField.schema().type() == Schema.Type.BYTES + && field.schema().type() != Schema.Type.BYTES) { + fields.put(field.name(), field); + } + } + }); + + SchemaBuilder newBuilder = SchemaUtil.copySchemaBasics(left); + fields.forEach((k, v) -> newBuilder.field(k, v.schema())); + return newBuilder.build(); + } + private JsonNode getFirstArrayElement(ArrayNode array) throws ConnectException { JsonNode refNode = NullNode.getInstance(); Schema refSchema = null; diff --git a/debezium-core/src/test/java/io/debezium/transforms/outbox/JsonSchemaDataTest.java b/debezium-core/src/test/java/io/debezium/transforms/outbox/JsonSchemaDataTest.java index ba99579ed..d640b1426 100644 --- a/debezium-core/src/test/java/io/debezium/transforms/outbox/JsonSchemaDataTest.java +++ b/debezium-core/src/test/java/io/debezium/transforms/outbox/JsonSchemaDataTest.java @@ -42,6 +42,40 @@ public void setup() throws Exception { record = getFile("json/restaurants5.json"); } + @Test + @FixFor("DBZ-6910") + public void shouldCreateCorrectSchemaFromArrayJson() throws Exception { + String key = "test_arr"; + // remove description value from array json + JsonNode testNode = mapper + .readTree("[{\"code\":\"100\",\"description\":\"some description\"},{\"code\":\"200\",\"description\":\"another description\"},{\"code\":\"300\"}]"); + Schema arraySchema = jsonSchemaData.toConnectSchema(key, testNode); + + assertThat(arraySchema.valueSchema().field("code")).isNotNull(); + assertThat(arraySchema.valueSchema().field("description").schema().type()).isEqualTo(Schema.Type.STRING); + + // set null value for description + testNode = mapper.readTree("[{\"code\":\"100\",\"description\": null},{\"code\":\"200\",\"description\":\"another description\"},{\"code\":\"300\"}]"); + arraySchema = jsonSchemaData.toConnectSchema(key, testNode); + + assertThat(arraySchema.valueSchema().field("code")).isNotNull(); + assertThat(arraySchema.valueSchema().field("description").schema().type()).isEqualTo(Schema.Type.STRING); + + // treat null value as bytes type + jsonSchemaData = new JsonSchemaData(JsonPayloadNullFieldBehavior.OPTIONAL_BYTES); + testNode = mapper.readTree("[{\"code\":\"100\",\"description\": null},{\"code\":\"200\",\"description\": null},{\"code\":\"300\"}]"); + arraySchema = jsonSchemaData.toConnectSchema(key, testNode); + + assertThat(arraySchema.valueSchema().field("code")).isNotNull(); + assertThat(arraySchema.valueSchema().field("description").schema().type()).isEqualTo(Schema.Type.BYTES); + + testNode = mapper.readTree("[{\"code\":\"100\",\"description\": null},{\"code\":\"200\",\"description\": \"another description\"},{\"code\":\"300\"}]"); + arraySchema = jsonSchemaData.toConnectSchema(key, testNode); + + assertThat(arraySchema.valueSchema().field("code")).isNotNull(); + assertThat(arraySchema.valueSchema().field("description").schema().type()).isEqualTo(Schema.Type.STRING); + } + @Test @FixFor("DBZ-5475") public void failSchemaCheckForArrayWithDifferentNumberTypes() throws Exception {