DBZ-6760 Outbox transformation can cause connector to crash

This commit is contained in:
harveyyue 2023-08-09 12:54:54 +08:00 committed by Jiri Pechanec
parent 7552068515
commit 063244a0ad
3 changed files with 66 additions and 28 deletions

View File

@ -12,6 +12,7 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Objects;
import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
@ -179,7 +180,7 @@ public void convertFieldValue(Entry<String, BsonValue> keyvalueforStruct, Struct
List<BsonValue> arrValues = keyvalueforStruct.getValue().asArray().getValues(); List<BsonValue> arrValues = keyvalueforStruct.getValue().asArray().getValues();
ArrayList<Object> list = new ArrayList<>(); ArrayList<Object> list = new ArrayList<>();
arrValues.stream().forEach(arrValue -> { arrValues.forEach(arrValue -> {
final Schema valueSchema; final Schema valueSchema;
if (Arrays.asList(BsonType.ARRAY, BsonType.DOCUMENT).contains(valueType)) { if (Arrays.asList(BsonType.ARRAY, BsonType.DOCUMENT).contains(valueType)) {
valueSchema = schema.field(key).schema().valueSchema(); valueSchema = schema.field(key).schema().valueSchema();
@ -453,10 +454,8 @@ private void subSchema(SchemaBuilder documentSchemaBuilder, Map<String, BsonType
if (prevType == null) { if (prevType == null) {
addFieldSchema(arrayDoc, documentSchemaBuilder); addFieldSchema(arrayDoc, documentSchemaBuilder);
} }
else if (prevType != arrayDoc.getValue().getBsonType()) { else {
throw new DebeziumException("Field " + key + " of schema " + documentSchemaBuilder.name() testArrayElementType(documentSchemaBuilder, arrayDoc, union);
+ " is not the same type for all documents in the array.\n"
+ "Check option 'struct' of parameter 'array.encoding'");
} }
} }
} }
@ -467,13 +466,7 @@ private void testType(SchemaBuilder builder, String key, BsonValue value, BsonTy
for (BsonValue element : value.asArray()) { for (BsonValue element : value.asArray()) {
final BsonDocument arrayDocs = element.asDocument(); final BsonDocument arrayDocs = element.asDocument();
for (Entry<String, BsonValue> arrayDoc : arrayDocs.entrySet()) { for (Entry<String, BsonValue> arrayDoc : arrayDocs.entrySet()) {
final String docKey = fieldNamer.fieldNameFor(arrayDoc.getKey()); testArrayElementType(builder, arrayDoc, union);
final BsonType prevType = union.putIfAbsent(docKey, arrayDoc.getValue().getBsonType());
if (prevType != null && prevType != arrayDoc.getValue().getBsonType()) {
throw new DebeziumException("Field " + docKey + " of schema " + builder.name()
+ " is not the same type for all documents in the array.\n"
+ "Check option 'struct' of parameter 'array.encoding'");
}
} }
} }
} }
@ -492,4 +485,25 @@ else if (valueType == BsonType.ARRAY) {
} }
} }
} }
/**
 * Checks one field of a document found inside an array against the BSON types
 * already recorded for that array's documents, recording the type on first sight.
 * <p>
 * A {@code BsonType.NULL} value is accepted alongside any other type: when the
 * recorded type is {@code NULL} and a non-null type is seen, the non-null type
 * replaces it in {@code union}; when a {@code NULL} is seen after a non-null type,
 * the recorded type is left untouched.
 *
 * @param builder schema builder whose name is reported in the error message
 * @param arrayDoc field entry (name and value) of the document being inspected
 * @param union types recorded so far, keyed by the name returned from {@code fieldNamer}; updated in place
 * @throws DebeziumException if the field was previously recorded with a different type and neither type is {@code NULL}
 */
private void testArrayElementType(SchemaBuilder builder, Entry<String, BsonValue> arrayDoc, Map<String, BsonType> union) {
    final String fieldName = fieldNamer.fieldNameFor(arrayDoc.getKey());
    final BsonType observedType = arrayDoc.getValue().getBsonType();
    final BsonType recordedType = union.putIfAbsent(fieldName, observedType);

    // Either the type was just recorded for the first time, or it matches what is already known
    if (recordedType == null || recordedType == observedType) {
        return;
    }

    // A NULL seen earlier carries no type information, so the concrete type takes over
    if (recordedType == BsonType.NULL) {
        union.put(fieldName, observedType);
        return;
    }

    // A NULL seen later does not contradict the concrete type already recorded
    if (observedType == BsonType.NULL) {
        return;
    }

    throw new DebeziumException("Field " + fieldName + " of schema " + builder.name()
            + " is not the same type for all documents in the array.\n"
            + "Check option 'struct' of parameter 'array.encoding'");
}
} }

View File

@ -103,7 +103,7 @@ public void configure(Map<String, ?> configMap) {
* *
* @param originalRecord an original Record from MongoDB Connector * @param originalRecord an original Record from MongoDB Connector
* @return a new Record of which <i>after</i> field is replaced with new one * @return a new Record of which <i>after</i> field is replaced with new one
* @throws Exception if <i>after</i> field of original Record is not an expected form * @throws IllegalStateException if <i>after</i> field of original Record is not an expected form
*/ */
private R expandAfterField(R originalRecord) throws IllegalStateException { private R expandAfterField(R originalRecord) throws IllegalStateException {
final R afterRecord = afterExtractor.apply(originalRecord); final R afterRecord = afterExtractor.apply(originalRecord);

View File

@ -20,6 +20,7 @@
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
import io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.ArrayEncoding; import io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.ArrayEncoding;
import io.debezium.doc.FixFor;
/** /**
* Unit test for {@code MongoDataConverter} that verifies array types. * Unit test for {@code MongoDataConverter} that verifies array types.
@ -58,20 +59,32 @@ public class MongoArrayConverterTest {
private static final String HOMOGENOUS_ARRAYS = lines( private static final String HOMOGENOUS_ARRAYS = lines(
"{", "{",
" \"_id\": 1,", " \"_id\": 1,",
" \"a1\": [", " \"a1\": [",
" {", " {",
" \"a\": 1", " \"a\": 1",
" },", " },",
" {", " {",
" \"b\": \"c\"", " \"b\": \"c\"",
" }", " }",
" ],", " ],",
" \"a2\": [", " \"a2\": [",
" \"11\",", " \"11\",",
" \"abc\"", " \"abc\"",
" ],", " ],",
" \"empty\": []", " \"empty\": [],",
" \"additionalContacts\": [",
" {",
" \"firstName\": \"John\",",
" \"lastName\": \"Doe\",",
" \"comment\": null",
" },",
" {",
" \"firstName\": \"Jane\",",
" \"lastName\": \"Doe\",",
" \"comment\": \"A comment\"",
" }",
" ]",
"}"); "}");
private SchemaBuilder builder; private SchemaBuilder builder;
@ -100,6 +113,7 @@ public void shouldDetectHeterogenousDocumentInArray() throws Exception {
} }
@Test @Test
@FixFor("DBZ-6760")
public void shouldCreateSchemaForHomogenousArray() throws Exception { public void shouldCreateSchemaForHomogenousArray() throws Exception {
final MongoDataConverter converter = new MongoDataConverter(ArrayEncoding.ARRAY); final MongoDataConverter converter = new MongoDataConverter(ArrayEncoding.ARRAY);
final BsonDocument val = BsonDocument.parse(HOMOGENOUS_ARRAYS); final BsonDocument val = BsonDocument.parse(HOMOGENOUS_ARRAYS);
@ -118,10 +132,16 @@ public void shouldCreateSchemaForHomogenousArray() throws Exception {
.build()).optional().build()) .build()).optional().build())
.field("a2", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build()) .field("a2", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build())
.field("empty", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build()) .field("empty", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build())
.field("additionalContacts", SchemaBuilder.array(SchemaBuilder.struct().name("array.additionalContacts").optional()
.field("firstName", Schema.OPTIONAL_STRING_SCHEMA)
.field("lastName", Schema.OPTIONAL_STRING_SCHEMA)
.field("comment", Schema.OPTIONAL_STRING_SCHEMA)
.build()).optional().build())
.build()); .build());
} }
@Test @Test
@FixFor("DBZ-6760")
public void shouldCreateStructForHomogenousArray() throws Exception { public void shouldCreateStructForHomogenousArray() throws Exception {
final MongoDataConverter converter = new MongoDataConverter(ArrayEncoding.ARRAY); final MongoDataConverter converter = new MongoDataConverter(ArrayEncoding.ARRAY);
final BsonDocument val = BsonDocument.parse(HOMOGENOUS_ARRAYS); final BsonDocument val = BsonDocument.parse(HOMOGENOUS_ARRAYS);
@ -147,7 +167,11 @@ public void shouldCreateStructForHomogenousArray() throws Exception {
"Struct{b=c}" + "Struct{b=c}" +
"]," + "]," +
"a2=[11, abc]," + "a2=[11, abc]," +
"empty=[]}"); "empty=[]," +
"additionalContacts=[" +
"Struct{firstName=John,lastName=Doe}, " +
"Struct{firstName=Jane,lastName=Doe,comment=A comment}" +
"]}");
// @formatter:on // @formatter:on
} }