DBZ-5475 Ensure Outbox JSON NUMBER arrays are of the same type
This commit is contained in:
parent
7256f62f6b
commit
8e0ad245e5
@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||||
|
import com.fasterxml.jackson.databind.node.JsonNodeType;
|
||||||
import com.fasterxml.jackson.databind.node.NullNode;
|
import com.fasterxml.jackson.databind.node.NullNode;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -80,6 +81,7 @@ private static Schema jsonValueToSchema(JsonNode node) {
|
|||||||
/** */
|
/** */
|
||||||
private static JsonNode getFirstArrayElement(ArrayNode array) throws ConnectException {
|
private static JsonNode getFirstArrayElement(ArrayNode array) throws ConnectException {
|
||||||
JsonNode refNode = NullNode.getInstance();
|
JsonNode refNode = NullNode.getInstance();
|
||||||
|
Schema refSchema = null;
|
||||||
// Get first non-null element type and check other member types.
|
// Get first non-null element type and check other member types.
|
||||||
Iterator<JsonNode> elements = array.elements();
|
Iterator<JsonNode> elements = array.elements();
|
||||||
while (elements.hasNext()) {
|
while (elements.hasNext()) {
|
||||||
@ -100,6 +102,18 @@ private static JsonNode getFirstArrayElement(ArrayNode array) throws ConnectExce
|
|||||||
throw new ConnectException(String.format("Field is not a homogenous array (%s x %s).",
|
throw new ConnectException(String.format("Field is not a homogenous array (%s x %s).",
|
||||||
refNode.asText(), element.getNodeType().toString()));
|
refNode.asText(), element.getNodeType().toString()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We may return different schemas for NUMBER type, check here they are same.
|
||||||
|
if (refNode.getNodeType() == JsonNodeType.NUMBER) {
|
||||||
|
if (refSchema == null) {
|
||||||
|
refSchema = jsonValueToSchema(refNode);
|
||||||
|
}
|
||||||
|
Schema elementSchema = jsonValueToSchema(element);
|
||||||
|
if (refSchema != elementSchema) {
|
||||||
|
throw new ConnectException(String.format("Field is not a homogenous array (%s x %s), different number types (%s x %s)",
|
||||||
|
refNode.asText(), element.asText(), refSchema, elementSchema));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return refNode;
|
return refNode;
|
||||||
|
@ -0,0 +1,41 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
package io.debezium.transforms.outbox;
|
||||||
|
|
||||||
|
import static org.fest.assertions.Assertions.assertThat;
|
||||||
|
|
||||||
|
import org.apache.kafka.connect.errors.ConnectException;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import io.debezium.doc.FixFor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author vjuranek
|
||||||
|
*/
|
||||||
|
public class SchemaBuilderUtilTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@FixFor("DBZ-5475")
|
||||||
|
public void failSchemaCheckForArrayWithDifferentNumberTypes() throws Exception {
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
JsonNode testNode = mapper.readTree("{\"test\": [1, 2.0, 3.0]}");
|
||||||
|
|
||||||
|
RuntimeException expectedException = null;
|
||||||
|
try {
|
||||||
|
SchemaBuilderUtil.jsonNodeToSchema(testNode);
|
||||||
|
}
|
||||||
|
catch (ConnectException e) {
|
||||||
|
expectedException = e;
|
||||||
|
}
|
||||||
|
assertThat(expectedException).isNotNull();
|
||||||
|
assertThat(expectedException).isInstanceOf(ConnectException.class);
|
||||||
|
assertThat(expectedException).hasMessage("Field is not a homogenous array (1 x 2.0), different number types (Schema{INT32} x Schema{FLOAT64})");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user