DBZ-6774 MongoDB New Document State Extraction: nonexistent field for add.headers

This commit is contained in:
harveyyue 2023-08-15 11:06:06 +08:00 committed by Jiri Pechanec
parent e60f69b13d
commit 625eec5aa0
2 changed files with 31 additions and 22 deletions

View File

@ -63,8 +63,6 @@
*/ */
public class ExtractNewDocumentState<R extends ConnectRecord<R>> implements Transformation<R> { public class ExtractNewDocumentState<R extends ConnectRecord<R>> implements Transformation<R> {
private String addFieldsPrefix;
public enum ArrayEncoding implements EnumeratedValue { public enum ArrayEncoding implements EnumeratedValue {
ARRAY("array"), ARRAY("array"),
DOCUMENT("document"); DOCUMENT("document");
@ -356,8 +354,10 @@ private Headers makeHeaders(List<FieldReference> additionalHeaders, Struct origi
} }
continue; continue;
} }
headers.add(fieldReference.getNewFieldName(), fieldReference.getValue(originalRecordValue), if (fieldReference.exists(originalRecordValue.schema())) {
fieldReference.getSchema(originalRecordValue.schema())); headers.add(fieldReference.getNewFieldName(), fieldReference.getValue(originalRecordValue),
fieldReference.getSchema(originalRecordValue.schema()));
}
} }
return headers; return headers;
@ -398,9 +398,9 @@ public void configure(final Map<String, ?> map) {
converter = new MongoDataConverter( converter = new MongoDataConverter(
ArrayEncoding.parse(config.getString(ARRAY_ENCODING)), ArrayEncoding.parse(config.getString(ARRAY_ENCODING)),
FieldNameSelector.defaultNonRelationalSelector(fieldNameAdjuster), FieldNameSelector.defaultNonRelationalSelector(fieldNameAdjuster),
fieldNameAdjustmentMode != FieldNameAdjustmentMode.NONE ? true : false); fieldNameAdjustmentMode != FieldNameAdjustmentMode.NONE);
addFieldsPrefix = config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS_PREFIX); String addFieldsPrefix = config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS_PREFIX);
String addHeadersPrefix = config.getString(ExtractNewRecordStateConfigDefinition.ADD_HEADERS_PREFIX); String addHeadersPrefix = config.getString(ExtractNewRecordStateConfigDefinition.ADD_HEADERS_PREFIX);
additionalHeaders = FieldReference.fromConfiguration(addHeadersPrefix, config.getString(ExtractNewRecordStateConfigDefinition.ADD_HEADERS)); additionalHeaders = FieldReference.fromConfiguration(addHeadersPrefix, config.getString(ExtractNewRecordStateConfigDefinition.ADD_HEADERS));
additionalFields = FieldReference.fromConfiguration(addFieldsPrefix, config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS)); additionalFields = FieldReference.fromConfiguration(addFieldsPrefix, config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS));
@ -485,20 +485,18 @@ else if (parts.length == 2) {
* Determine the struct hosting the given unqualified field. * Determine the struct hosting the given unqualified field.
*/ */
private static String determineStruct(String simpleFieldName) { private static String determineStruct(String simpleFieldName) {
if (simpleFieldName.equals(Envelope.FieldName.OPERATION) || switch (simpleFieldName) {
simpleFieldName.equals(Envelope.FieldName.TIMESTAMP)) { case FieldName.OPERATION:
return null; case FieldName.TIMESTAMP:
} return null;
else if (simpleFieldName.equals(TransactionMonitor.DEBEZIUM_TRANSACTION_ID_KEY) || case TransactionMonitor.DEBEZIUM_TRANSACTION_ID_KEY:
simpleFieldName.equals(TransactionMonitor.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY) || case TransactionMonitor.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY:
simpleFieldName.equals(TransactionMonitor.DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY)) { case TransactionMonitor.DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY:
return Envelope.FieldName.TRANSACTION; return FieldName.TRANSACTION;
} case MongoDbFieldName.UPDATE_DESCRIPTION:
else if (simpleFieldName.equals(MongoDbFieldName.UPDATE_DESCRIPTION)) { return MongoDbFieldName.UPDATE_DESCRIPTION;
return MongoDbFieldName.UPDATE_DESCRIPTION; default:
} return FieldName.SOURCE;
else {
return Envelope.FieldName.SOURCE;
} }
} }
@ -536,5 +534,13 @@ Schema getSchema(Schema originalRecordSchema) {
return SchemaUtil.copySchemaBasics(schemaField.schema()).optional().build(); return SchemaUtil.copySchemaBasics(schemaField.schema()).optional().build();
} }
boolean exists(Schema originalRecordSchema) {
Schema parentSchema = struct != null ? originalRecordSchema.field(struct).schema() : originalRecordSchema;
org.apache.kafka.connect.data.Field schemaField = parentSchema.field(field);
return schemaField != null;
}
} }
} }

View File

@ -37,6 +37,7 @@
import com.mongodb.client.model.CreateCollectionOptions; import com.mongodb.client.model.CreateCollectionOptions;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.Module;
import io.debezium.connector.mongodb.MongoDbConnectorConfig; import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.data.Envelope; import io.debezium.data.Envelope;
import io.debezium.data.Envelope.Operation; import io.debezium.data.Envelope.Operation;
@ -1869,12 +1870,12 @@ public void shouldSupportNestedArrays() throws InterruptedException {
} }
@Test @Test
@FixFor({ "DBZ-5834" }) @FixFor({ "DBZ-5834", "DBZ-6774" })
public void shouldAddUpdateDescription() throws Exception { public void shouldAddUpdateDescription() throws Exception {
waitForStreamingRunning(); waitForStreamingRunning();
final Map<String, String> props = new HashMap<>(); final Map<String, String> props = new HashMap<>();
props.put(ADD_HEADERS, "updateDescription.updatedFields"); props.put(ADD_HEADERS, "updateDescription.updatedFields,nonexistentField,version");
props.put(ADD_HEADERS_PREFIX, "prefix."); props.put(ADD_HEADERS_PREFIX, "prefix.");
transformation.configure(props); transformation.configure(props);
@ -1915,6 +1916,8 @@ public void shouldAddUpdateDescription() throws Exception {
// verify headers // verify headers
final String expectedUpdateFields = "{\"name\": \"Mary\", \"zipcode\": \"11111\"}"; final String expectedUpdateFields = "{\"name\": \"Mary\", \"zipcode\": \"11111\"}";
assertThat(getSourceRecordHeaderByKey(transformed, "prefix.updateDescription_updatedFields")).isEqualTo(expectedUpdateFields); assertThat(getSourceRecordHeaderByKey(transformed, "prefix.updateDescription_updatedFields")).isEqualTo(expectedUpdateFields);
assertThat(getSourceRecordHeaderByKey(transformed, "prefix.nonexistentField")).isNull();
assertThat(getSourceRecordHeaderByKey(transformed, "prefix.version")).isEqualTo(Module.version());
} }
@Test @Test