diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/MongoDataConverter.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/MongoDataConverter.java index 10baba947..03962a4d6 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/MongoDataConverter.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/MongoDataConverter.java @@ -8,23 +8,23 @@ import java.util.ArrayList; import java.util.List; import java.util.Map.Entry; + +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.bson.BsonDocument; import org.bson.BsonType; import org.bson.BsonValue; - -import org.slf4j.LoggerFactory; import org.slf4j.Logger; -import org.apache.kafka.connect.data.Schema; +import org.slf4j.LoggerFactory; /** * MongoDataConverter handles translating MongoDB strings to Kafka Connect schemas and row data to Kafka * Connect records. + * * @author Sairam Polavarapu */ public class MongoDataConverter { - static SchemaBuilder builder = SchemaBuilder.struct(); private static final Logger LOG = LoggerFactory.getLogger(MongoDataConverter.class); @@ -189,7 +189,6 @@ public static void convertFieldValue(Entry keyvalueforStruct, } public static void addFieldSchema(Entry keyValuesforSchema, SchemaBuilder builder) { - String key = keyValuesforSchema.getKey(); BsonType type = keyValuesforSchema.getValue().getBsonType(); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/StringToJSON.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/StringToJSON.java index 4ebb97d8a..4faceab24 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/StringToJSON.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/StringToJSON.java @@ -5,6 +5,12 @@ */ package io.debezium.connector.mongodb.transforms; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Schema; @@ -20,24 +26,19 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.Map.Entry; - /** * Debezium Mongo Connector generates the CDC records in String format. Sink connectors usually are not able to parse - * the string and insert the document as it is represented in the Source. so a user use this SMT to parse the String - * and insert the MongoDB document in the JSON format.. + * the string and insert the document as it is represented in the Source. so a user use this SMT to parse the String + * and insert the MongoDB document in the JSON format. + * * @param the subtype of {@link ConnectRecord} on which this transformation will operate - * @author Sairam Polavarapu. + * @author Sairam Polavarapu */ public class StringToJSON> implements Transformation { - final ExtractField afterExtractor = new ExtractField.Value(); - final ExtractField patchExtractor = new ExtractField.Value(); - final ExtractField keyExtractor = new ExtractField.Key(); + private final ExtractField afterExtractor = new ExtractField.Value(); + private final ExtractField patchExtractor = new ExtractField.Value(); + private final ExtractField keyExtractor = new ExtractField.Key(); @Override public R apply(R r) { @@ -50,21 +51,22 @@ public R apply(R r) { final R afterRecord = afterExtractor.apply(r); if (afterRecord.value() == null) { - final R patchRecord = patchExtractor.apply(r); - final R key = keyExtractor.apply(r); - ObjectNode patchEventWKey = mapper.createObjectNode(); - JsonNode patchEvent = null; - try { - patchEvent = mapper.readTree(patchRecord.value().toString()); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } + final R patchRecord = patchExtractor.apply(r); + final R key = keyExtractor.apply(r); + ObjectNode patchEventWKey = mapper.createObjectNode(); + JsonNode patchEvent = null; + + try { + patchEvent = mapper.readTree(patchRecord.value().toString()); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } patchEventWKey.set("$set",patchEvent.path("$set")); boolean hasId = patchEvent.has("_id"); if(hasId) { - patchEventWKey.set("_id", patchEvent.path("_id")); + patchEventWKey.set("_id", patchEvent.path("_id")); } else { patchEventWKey.set("_id", mapper.convertValue(key.key(), JsonNode.class)); } @@ -72,7 +74,7 @@ public R apply(R r) { value = BsonDocument.parse(patchEventWKey.toString()); BsonDocument doc = new BsonDocument().append("id", value.get("_id")); Key = BsonDocument.parse(doc.toString()); - + } else { value = BsonDocument.parse(afterRecord.value().toString()); BsonDocument doc = new BsonDocument().append("id", value.get("_id")); @@ -88,10 +90,10 @@ public R apply(R r) { Set> keyValuesforSetSchema = val1.entrySet(); for (Entry keyValuesforSetSchemaEntry : keyValuesforSetSchema) { MongoDataConverter.addFieldSchema(keyValuesforSetSchemaEntry, schemabuilder); - } - } else { - MongoDataConverter.addFieldSchema(valuePairsforSchema, schemabuilder); - } + } + } else { + MongoDataConverter.addFieldSchema(valuePairsforSchema, schemabuilder); + } } for (Entry keyPairsforSchema : keyPairs) { @@ -109,12 +111,12 @@ public R apply(R r) { Set> keyvalueforSetStruct = val1.entrySet(); for (Entry keyvalueforSetStructEntry : keyvalueforSetStruct) { MongoDataConverter.convertRecord(keyvalueforSetStructEntry, finalValueSchema, finalValueStruct); - } - } else { - MongoDataConverter.convertRecord(valuePairsforStruct, finalValueSchema, finalValueStruct); + } + } else { + MongoDataConverter.convertRecord(valuePairsforStruct, finalValueSchema, finalValueStruct); } } - + for (Entry keyPairsforStruct : keyPairs) { MongoDataConverter.convertRecord(keyPairsforStruct, finalKeySchema, finalKeyStruct); } @@ -128,6 +130,7 @@ public ConfigDef config() { return new ConfigDef(); } + @Override public void close() { }