DBZ-2316 Throw DataException with non-uniform array elements

While using array.encoding=array, array elements that do not have
uniform structures should throw a DataException with a meaningful
message rather than throw a NullPointerException.
This commit is contained in:
Chris Cranford 2020-09-29 10:49:17 -04:00 committed by Gunnar Morling
parent ab7db1ddca
commit 0040eca740
3 changed files with 63 additions and 1 deletions

View File

@ -12,10 +12,12 @@
import java.util.Map;
import java.util.Map.Entry;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonType;
@ -130,7 +132,11 @@ public void convertFieldValue(Entry<String, BsonValue> keyvalueforStruct, Struct
break;
case DOCUMENT:
Schema documentSchema = schema.field(keyvalueforStruct.getKey()).schema();
Field field = schema.field(keyvalueforStruct.getKey());
if (field == null) {
throw new DataException("Failed to find field '" + keyvalueforStruct.getKey() + "' in schema " + schema.name());
}
Schema documentSchema = field.schema();
Struct documentStruct = new Struct(documentSchema);
BsonDocument docs = keyvalueforStruct.getValue().asDocument();

View File

@ -6,20 +6,25 @@
package io.debezium.connector.mongodb.transforms;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.Document;
@ -36,6 +41,8 @@
import io.debezium.doc.FixFor;
import io.debezium.transforms.ExtractNewRecordStateConfigDefinition;
import io.debezium.util.Collect;
import io.debezium.util.IoUtil;
import io.debezium.util.Testing;
/**
* Integration test for {@link ExtractNewDocumentState}. It sends operations into
@ -59,6 +66,7 @@ public class ExtractNewDocumentStateTestIT extends AbstractExtractNewDocumentSta
private static final String ADD_FIELDS = "add.fields";
private static final String ADD_FIELDS_PREFIX = ADD_FIELDS + ".prefix";
private static final String ADD_HEADERS_PREFIX = ADD_HEADERS + ".prefix";
private static final String ARRAY_ENCODING = "array.encoding";
@Override
protected String getCollectionName() {
@ -1449,6 +1457,53 @@ public void testAddPatchFieldAfterUpdate() throws Exception {
assertThat(value.schema().fields()).hasSize(3);
}
@Test(expected = DataException.class)
@FixFor("DBZ-2316")
public void testShouldThrowExceptionWithElementsDifferingStructures() throws Exception {
waitForStreamingRunning();
final Map<String, String> props = new HashMap<>();
props.put(ARRAY_ENCODING, "array");
props.put(ADD_FIELDS, "op,source.ts_ms");
transformation.configure(props);
final SourceRecords records = createCreateRecordFromJson("dbz-2316.json");
for (SourceRecord record : records.allRecordsInOrder()) {
final SourceRecord transformed = transformation.apply(record);
}
}
private SourceRecords createCreateRecordFromJson(String pathOnClasspath) throws Exception {
final List<Document> documents = loadTestDocuments(pathOnClasspath);
primary().execute("Load JSON", client -> {
for (Document document : documents) {
client.getDatabase(DB_NAME).getCollection(getCollectionName()).insertOne(document);
}
});
final SourceRecords records = consumeRecordsByTopic(documents.size());
assertThat(records.recordsForTopic(topicName()).size()).isEqualTo(documents.size());
assertNoRecordsToConsume();
return records;
}
private List<Document> loadTestDocuments(String pathOnClasspath) {
final List<Document> documents = new ArrayList<>();
try (InputStream stream = Testing.Files.readResourceAsStream(pathOnClasspath)) {
assertThat(stream).isNotNull();
IoUtil.readLines(stream, line -> {
Document document = Document.parse(line);
assertThat(document.size()).isGreaterThan(0);
documents.add(document);
});
}
catch (IOException e) {
fail("Unable to find or read file '" + pathOnClasspath + "': " + e.getMessage());
}
return documents;
}
private SourceRecord createCreateRecord() throws Exception {
ObjectId objId = new ObjectId();
Document obj = new Document()

View File

@ -0,0 +1 @@
{ "_id" : ObjectId("5f05a9a979dd7300018a102b"), "pages" : [ { "ID" : 1, "Pagination" : { "Number" : 1, "Location" : 1, "Text" : "六月・绿树阴浓" }, "Property" : { "Height" : 3284, "Background" : { "ImageUrl" : "https://static.kid17.com//album/package/subject/A/A15.jpg", "Color" : "#709E06" }, "Type" : "hybrid-subject" }, "Elements" : [ { "Head" : { "Height" : 112, "Title" : { "Lines" : [ { "Text" : "六月・绿树阴浓", "XCoordinates" : [ 200, 312, 424, 536, 648, 760, 872 ], "YCoordinate" : 0 } ] } }, "Body" : { "Content" : { "Height" : 0, "Lines" : [ ] } }, "OriginID" : -1006, "Height" : 208, "Label" : { "Type" : "subject", "Content" : "", "XCoordinate" : 160, "YCoordinate" : 72, "Width" : 120, "Height" : 120 } }, { "OriginID" : 2, "Height" : 2900, "Label" : { "XCoordinate" : 240, "YCoordinate" : 456, "Width" : 200, "Height" : 176, "Type" : "feed", "Content" : "2020-06-22" }, "Head" : { "Icon" : { "Height" : 60, "CreatorType" : 4, "XCoordinate" : 260, "YCoordinate" : 116, "Width" : 60, "Text": "Hi" }, "Mark" : { "Lines" : [ { "XCoordinates" : [ 352, 400, 448 ], "YCoordinate" : 116, "Text" : "test1" } ] }, "QRcode" : null, "Tag" : null, "Height" : 156, "Title" : { "Lines" : [ { "YCoordinate" : 0, "Text" : "test2", "XCoordinates" : [ 260, 348, 436, 524 ] } ] } }, "Body" : { "Quote" : null, "Template" : null, "Video" : null, "Content" : { "Height" : 80, "Lines" : [ { "Text" : "个小故事", "XCoordinates" : [ 260, 308, 356, 404 ], "YCoordinate" : 216 } ] }, "Images" : { "Elements" : [ { "XCoordinate" : 661.37396, "YCoordinate" : 356, "Width" : 877.2521, "Height" : 1899.0883, "Rotation" : 0, "Type" : "image", "Bucket" : "tiny-main", "URL" : "https://static.bookoom.com/dev/organizations/0/groups/2/users/3/0ek0xr4k1ybw1592792275610.jpeg" }, { "Height" : 624.02466, "Rotation" : 0, "Type" : "image", "Bucket" : "tiny-main", "URL" : "https://static.bookoom.com/dev/organizations/0/groups/2/users/3/aoh1ssz9izv1592792275574.jpeg", "XCoordinate" : 661.37396, "YCoordinate" : 2295.0884, "Width" : 877.2521 } ] } } } ], "DividingLines" : [ ], "IsGraduationPage" : false }, { "Pagination" : { "Number" : 2, "Location" : 2, "Text" : "六月・绿树阴浓" }, "Property" : { "Background" : { "ImageUrl" : "", "Color" : "" }, "Type" : "unknown", "Height" : 80 }, "Elements" : [ ], "DividingLines" : [ ], "IsGraduationPage" : false, "ID" : 2 }, { "Elements" : [ { "Label" : { "Content" : "", "XCoordinate" : 160, "YCoordinate" : 468.64087, "Width" : 120, "Height" : 120, "Type" : "subject" }, "Head" : { "Height" : 112, "Title" : { "Lines" : [ { "Text" : "七月・荧光闪烁", "XCoordinates" : [ 200, 312, 424, 536, 648, 760, 872 ], "YCoordinate" : 0 } ] } }, "Body" : { "Content" : { "Height" : 0, "Lines" : [ ] } }, "OriginID" : -1007, "Height" : 208 }, { "OriginID" : 64, "Height" : 2106.7183, "Label" : { "Content" : "2020-07-07", "XCoordinate" : 240, "YCoordinate" : 852.64087, "Width" : 200, "Height" : 176, "Type" : "feed" }, "Head" : { "Height" : 156, "Title" : { "Lines" : [ { "Text" : "揭谛揭谛揭谛😊😬😬🤪", "XCoordinates" : [ 260, 348, 436, 524, 612, 700, 788, 876, 964, 1052 ], "YCoordinate" : 0 } ] }, "Icon" : { "CreatorType" : 4, "XCoordinate" : 260, "YCoordinate" : 116, "Width" : 60, "Height" : 60 }, "Mark" : { "Lines" : [ { "Text" : "家长记", "XCoordinates" : [ 352, 400, 448 ], "YCoordinate" : 116 } ] }, "QRcode" : null, "Tag" : null }, "Body" : { "Images" : { "Elements" : [ { "Height" : 1670.7183, "Rotation" : 0, "Type" : "image", "Bucket" : "tiny-main", "URL" : "https://static.bookoom.com/dev/organizations/0/groups/2/users/3/hluhuvrjxt1594101146620.jpeg", "XCoordinate" : 260, "YCoordinate" : 436, "Width" : 1680 } ] }, "Quote" : null, "Template" : null, "Video" : null, "Content" : { "Height" : 160, "Lines" : [ { "XCoordinates" : [ 260, 308, 356, 404, 452, 500, 548, 596, 644, 692, 740, 788, 836, 884, 932, 980, 1028, 1076, 1124, 1172, 1220, 1268, 1316, 1364, 1412, 1460, 1508, 1556, 1604, 1652, 1700, 1748, 1796, 1844, 1892 ], "YCoordinate" : 216, "Text" : "😊😬🙂😬😌😬😌😬😬😥👶🏻😭😒😭🧎😥😁😁😁🧐🧎😭😄😭😑😐🤮👹😑💀😐💀😐😪" }, { "Text" : "😐😬😑🤕😐🤕😑😐", "XCoordinates" : [ 260, 308, 356, 404, 452, 500, 548, 596 ], "YCoordinate" : 296 } ] } } } ], "DividingLines" : [ ], "IsGraduationPage" : false, "ID" : 3, "Pagination" : { "Number" : 3, "Location" : 1, "Text" : "七月・荧光闪烁" }, "Property" : { "Type" : "hybrid-subject", "Height" : 2490.7183, "Background" : { "ImageUrl" : "https://static.kid17.com//album/package/subject/A/A07.jpg", "Color" : "#BC9000" } } }, { "ID" : 4, "Pagination" : { "Number" : 4, "Location" : 2, "Text" : "七月・荧光闪烁" }, "Property" : { "Type" : "feed", "Height" : 2916, "Background" : { "ImageUrl" : "https://static.kid17.com//album/package/feed/A/%23BC9000_A01.jpg", "Color" : "#BC9000" } }, "Elements" : [ { "OriginID" : 22, "Height" : 2916, "Label" : { "Content" : "2020-07-03", "XCoordinate" : 240, "YCoordinate" : 296, "Width" : 200, "Height" : 176, "Type" : "feed" }, "Head" : { "Height" : 156, "Title" : { "Lines" : [ { "Text" : "自己开心看着开心开始考试", "XCoordinates" : [ 260, 348, 436, 524, 612, 700, 788, 876, 964, 1052, 1140, 1228 ], "YCoordinate" : 0 } ] }, "Icon" : { "CreatorType" : 4, "XCoordinate" : 260, "YCoordinate" : 116, "Width" : 60, "Height" : 60 }, "Mark" : { "Lines" : [ { "Text" : "家长记", "XCoordinates" : [ 352, 400, 448 ], "YCoordinate" : 116 } ] }, "QRcode" : null, "Tag" : null }, "Body" : { "Content" : { "Height" : 400, "Lines" : [ { "Text" : "苍井空曾经的开心巨蟹巨蟹巨蟹巨蟹巨蟹巨蟹的艰难的女生呢小姐姐现金自己想", "XCoordinates" : [ 260, 308, 356, 404, 452, 500, 548, 596, 644, 692, 740, 788, 836, 884, 932, 980, 1028, 1076, 1124, 1172, 1220, 1268, 1316, 1364, 1412, 1460, 1508, 1556, 1604, 1652, 1700, 1748, 1796, 1844, 1892 ], "YCoordinate" : 216 }, { "XCoordinates" : [ 260, 308, 356, 404, 452, 500, 548, 596, 644, 692, 740, 788, 836, 884, 932, 980, 1028, 1076, 1124, 1172, 1220, 1268, 1316, 1364, 1412, 1460, 1508, 1556, 1604, 1652, 1700, 1748, 1796, 1844, 1892 ], "YCoordinate" : 296, "Text" : "接电话生快生快生快消极的还不是开心年底晓不得就是你说呢细节大家张杰谢娜" }, { "YCoordinate" : 376, "Text" : "小舅子你上课这里说不定就只能就行呢欧洲史怒赞你搜搜你死准备四十八四十八", "XCoordinates" : [ 260, 308, 356, 404, 452, 500, 548, 596, 644, 692, 740, 788, 836, 884, 932, 980, 1028, 1076, 1124, 1172, 1220, 1268, 1316, 1364, 1412, 1460, 1508, 1556, 1604, 1652, 1700, 1748, 1796, 1844, 1892 ] }, { "Text" : "四总部四十八继续继续开心就行你小姐妹自己,别的绝对绝对绝对绝对能睡觉睡", "XCoordinates" : [ 260, 308, 356, 404, 452, 500, 548, 596, 644, 692, 740, 788, 836, 884, 932, 980, 1028, 1076, 1124, 1172, 1220, 1268, 1316, 1364, 1412, 1460, 1508, 1556, 1604, 1652, 1700, 1748, 1796, 1844, 1892 ], "YCoordinate" : 456 }, { "YCoordinate" : 536, "Text" : "觉", "XCoordinates" : [ 260 ] } ] }, "Images" : { "Elements" : [ { "Width" : 1680, "Height" : 2240, "Rotation" : 0, "Type" : "image", "Bucket" : "tiny-main", "URL" : "https://static.bookoom.com/dev/organizations/0/groups/2/users/3/lhm63hrkxhl1593740343286.jpeg", "XCoordinate" : 260, "YCoordinate" : 676 } ] }, "Quote" : null, "Template" : null, "Video" : null } } ], "DividingLines" : [ ], "IsGraduationPage" : false }, { "ID" : 5, "Pagination" : { "Number" : 5, "Location" : 1, "Text" : "七月・荧光闪烁" }, "Property" : { "Type" : "unknown", "Height" : 0, "Background" : { "ImageUrl" : "", "Color" : "" } }, "Elements" : [ ], "DividingLines" : [ ], "IsGraduationPage" : false }, { "Element" : null, "ID" : 6, "Pagination" : { "Number" : 0, "Location" : 0, "Text" : "后记" }, "Property" : { "Type" : "postscript", "Height" : 3508, "Background" : { "ImageUrl" : "https://static.kid17.com//album/package/others/postscript.jpg", "Color" : "" } } } ] }