From 625eec5aa0ccb77b64c78ec3be2376f69339ebf5 Mon Sep 17 00:00:00 2001 From: harveyyue Date: Tue, 15 Aug 2023 11:06:06 +0800 Subject: [PATCH] DBZ-6774 MongoDB New Document State Extraction: nonexistent field for add.headers --- .../transforms/ExtractNewDocumentState.java | 46 +++++++++++-------- .../ExtractNewDocumentStateTestIT.java | 7 ++- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/ExtractNewDocumentState.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/ExtractNewDocumentState.java index bbb522b43..643e29fc1 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/ExtractNewDocumentState.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/ExtractNewDocumentState.java @@ -63,8 +63,6 @@ */ public class ExtractNewDocumentState> implements Transformation { - private String addFieldsPrefix; - public enum ArrayEncoding implements EnumeratedValue { ARRAY("array"), DOCUMENT("document"); @@ -356,8 +354,10 @@ private Headers makeHeaders(List additionalHeaders, Struct origi } continue; } - headers.add(fieldReference.getNewFieldName(), fieldReference.getValue(originalRecordValue), - fieldReference.getSchema(originalRecordValue.schema())); + if (fieldReference.exists(originalRecordValue.schema())) { + headers.add(fieldReference.getNewFieldName(), fieldReference.getValue(originalRecordValue), + fieldReference.getSchema(originalRecordValue.schema())); + } } return headers; @@ -398,9 +398,9 @@ public void configure(final Map map) { converter = new MongoDataConverter( ArrayEncoding.parse(config.getString(ARRAY_ENCODING)), FieldNameSelector.defaultNonRelationalSelector(fieldNameAdjuster), - fieldNameAdjustmentMode != FieldNameAdjustmentMode.NONE ? true : false); + fieldNameAdjustmentMode != FieldNameAdjustmentMode.NONE); - addFieldsPrefix = config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS_PREFIX); + String addFieldsPrefix = config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS_PREFIX); String addHeadersPrefix = config.getString(ExtractNewRecordStateConfigDefinition.ADD_HEADERS_PREFIX); additionalHeaders = FieldReference.fromConfiguration(addHeadersPrefix, config.getString(ExtractNewRecordStateConfigDefinition.ADD_HEADERS)); additionalFields = FieldReference.fromConfiguration(addFieldsPrefix, config.getString(ExtractNewRecordStateConfigDefinition.ADD_FIELDS)); @@ -485,20 +485,18 @@ else if (parts.length == 2) { * Determine the struct hosting the given unqualified field. */ private static String determineStruct(String simpleFieldName) { - if (simpleFieldName.equals(Envelope.FieldName.OPERATION) || - simpleFieldName.equals(Envelope.FieldName.TIMESTAMP)) { - return null; - } - else if (simpleFieldName.equals(TransactionMonitor.DEBEZIUM_TRANSACTION_ID_KEY) || - simpleFieldName.equals(TransactionMonitor.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY) || - simpleFieldName.equals(TransactionMonitor.DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY)) { - return Envelope.FieldName.TRANSACTION; - } - else if (simpleFieldName.equals(MongoDbFieldName.UPDATE_DESCRIPTION)) { - return MongoDbFieldName.UPDATE_DESCRIPTION; - } - else { - return Envelope.FieldName.SOURCE; + switch (simpleFieldName) { + case FieldName.OPERATION: + case FieldName.TIMESTAMP: + return null; + case TransactionMonitor.DEBEZIUM_TRANSACTION_ID_KEY: + case TransactionMonitor.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY: + case TransactionMonitor.DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY: + return FieldName.TRANSACTION; + case MongoDbFieldName.UPDATE_DESCRIPTION: + return MongoDbFieldName.UPDATE_DESCRIPTION; + default: + return FieldName.SOURCE; } } @@ -536,5 +534,13 @@ Schema getSchema(Schema originalRecordSchema) { return SchemaUtil.copySchemaBasics(schemaField.schema()).optional().build(); } + + boolean exists(Schema originalRecordSchema) { + Schema parentSchema = struct != null ? originalRecordSchema.field(struct).schema() : originalRecordSchema; + + org.apache.kafka.connect.data.Field schemaField = parentSchema.field(field); + + return schemaField != null; + } } } diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/ExtractNewDocumentStateTestIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/ExtractNewDocumentStateTestIT.java index 6265d011e..c95469bf2 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/ExtractNewDocumentStateTestIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/ExtractNewDocumentStateTestIT.java @@ -37,6 +37,7 @@ import com.mongodb.client.model.CreateCollectionOptions; import io.debezium.config.Configuration; +import io.debezium.connector.mongodb.Module; import io.debezium.connector.mongodb.MongoDbConnectorConfig; import io.debezium.data.Envelope; import io.debezium.data.Envelope.Operation; @@ -1869,12 +1870,12 @@ public void shouldSupportNestedArrays() throws InterruptedException { } @Test - @FixFor({ "DBZ-5834" }) + @FixFor({ "DBZ-5834", "DBZ-6774" }) public void shouldAddUpdateDescription() throws Exception { waitForStreamingRunning(); final Map props = new HashMap<>(); - props.put(ADD_HEADERS, "updateDescription.updatedFields"); + props.put(ADD_HEADERS, "updateDescription.updatedFields,nonexistentField,version"); props.put(ADD_HEADERS_PREFIX, "prefix."); transformation.configure(props); @@ -1915,6 +1916,8 @@ public void shouldAddUpdateDescription() throws Exception { // verify headers final String expectedUpdateFields = "{\"name\": \"Mary\", \"zipcode\": \"11111\"}"; assertThat(getSourceRecordHeaderByKey(transformed, "prefix.updateDescription_updatedFields")).isEqualTo(expectedUpdateFields); + assertThat(getSourceRecordHeaderByKey(transformed, "prefix.nonexistentField")).isNull(); + assertThat(getSourceRecordHeaderByKey(transformed, "prefix.version")).isEqualTo(Module.version()); } @Test