DBZ-6486 Check schema fields
This commit is contained in:
parent
08122371eb
commit
b72f084eac
@ -517,12 +517,10 @@ Object getValue(Struct originalRecordValue) {
|
||||
}
|
||||
|
||||
private Object getWithoutDefault(Struct parentStruct, Struct originalRecordValue) {
|
||||
try {
|
||||
return parentStruct.getWithoutDefault(field);
|
||||
}
|
||||
catch (DataException e) { // In case field was added by other SMT and is in the main payload object.
|
||||
return originalRecordValue.getWithoutDefault(field);
|
||||
}
|
||||
// In case field was added by other SMT and is in the main payload
|
||||
// object.
|
||||
return isInSchema(parentStruct.schema()) ? parentStruct.getWithoutDefault(field)
|
||||
: originalRecordValue.getWithoutDefault(field);
|
||||
}
|
||||
|
||||
Schema getSchema(Schema originalRecordSchema) {
|
||||
@ -532,7 +530,7 @@ Schema getSchema(Schema originalRecordSchema) {
|
||||
|
||||
if (schemaField == null) {
|
||||
LOGGER.debug("Field {} not found in {}. Trying in main payload", field, struct);
|
||||
if (!isInPayload(originalRecordSchema)) {
|
||||
if (!isInSchema(originalRecordSchema)) {
|
||||
throw new IllegalArgumentException("Unexpected field name: " + field);
|
||||
}
|
||||
schemaField = originalRecordSchema.field(field);
|
||||
@ -541,11 +539,8 @@ Schema getSchema(Schema originalRecordSchema) {
|
||||
return SchemaUtil.copySchemaBasics(schemaField.schema()).optional().build();
|
||||
}
|
||||
|
||||
private boolean isInPayload(Schema originalRecordSchema) {
|
||||
return originalRecordSchema.fields().stream()
|
||||
.map(org.apache.kafka.connect.data.Field::name)
|
||||
.collect(Collectors.toList())
|
||||
.contains(field);
|
||||
private boolean isInSchema(Schema originalRecordSchema) {
|
||||
return originalRecordSchema.field(field) != null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user