DBZ-409 Indentation fix
This commit is contained in:
parent
d6dbf02f4c
commit
1cfc8c3596
@ -8,23 +8,23 @@
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||
import org.apache.kafka.connect.data.Struct;
|
||||
import org.bson.BsonDocument;
|
||||
import org.bson.BsonType;
|
||||
import org.bson.BsonValue;
|
||||
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* MongoDataConverter handles translating MongoDB strings to Kafka Connect schemas and row data to Kafka
|
||||
* Connect records.
|
||||
*
|
||||
* @author Sairam Polavarapu
|
||||
*/
|
||||
public class MongoDataConverter {
|
||||
static SchemaBuilder builder = SchemaBuilder.struct();
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MongoDataConverter.class);
|
||||
|
||||
@ -189,7 +189,6 @@ public static void convertFieldValue(Entry<String, BsonValue> keyvalueforStruct,
|
||||
}
|
||||
|
||||
public static void addFieldSchema(Entry<String, BsonValue> keyValuesforSchema, SchemaBuilder builder) {
|
||||
|
||||
String key = keyValuesforSchema.getKey();
|
||||
BsonType type = keyValuesforSchema.getValue().getBsonType();
|
||||
|
||||
|
@ -5,6 +5,12 @@
|
||||
*/
|
||||
package io.debezium.connector.mongodb.transforms;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.connect.connector.ConnectRecord;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
@ -20,24 +26,19 @@
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
/**
|
||||
* Debezium Mongo Connector generates the CDC records in String format. Sink connectors usually are not able to parse
|
||||
* the string and insert the document as it is represented in the Source. so a user use this SMT to parse the String
|
||||
* and insert the MongoDB document in the JSON format..
|
||||
* the string and insert the document as it is represented in the Source. so a user use this SMT to parse the String
|
||||
* and insert the MongoDB document in the JSON format.
|
||||
*
|
||||
* @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate
|
||||
* @author Sairam Polavarapu.
|
||||
* @author Sairam Polavarapu
|
||||
*/
|
||||
public class StringToJSON<R extends ConnectRecord<R>> implements Transformation<R> {
|
||||
|
||||
final ExtractField<R> afterExtractor = new ExtractField.Value<R>();
|
||||
final ExtractField<R> patchExtractor = new ExtractField.Value<R>();
|
||||
final ExtractField<R> keyExtractor = new ExtractField.Key<R>();
|
||||
private final ExtractField<R> afterExtractor = new ExtractField.Value<R>();
|
||||
private final ExtractField<R> patchExtractor = new ExtractField.Value<R>();
|
||||
private final ExtractField<R> keyExtractor = new ExtractField.Key<R>();
|
||||
|
||||
@Override
|
||||
public R apply(R r) {
|
||||
@ -50,21 +51,22 @@ public R apply(R r) {
|
||||
final R afterRecord = afterExtractor.apply(r);
|
||||
|
||||
if (afterRecord.value() == null) {
|
||||
final R patchRecord = patchExtractor.apply(r);
|
||||
final R key = keyExtractor.apply(r);
|
||||
ObjectNode patchEventWKey = mapper.createObjectNode();
|
||||
JsonNode patchEvent = null;
|
||||
try {
|
||||
patchEvent = mapper.readTree(patchRecord.value().toString());
|
||||
} catch (JsonProcessingException e) {
|
||||
e.printStackTrace();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
final R patchRecord = patchExtractor.apply(r);
|
||||
final R key = keyExtractor.apply(r);
|
||||
ObjectNode patchEventWKey = mapper.createObjectNode();
|
||||
JsonNode patchEvent = null;
|
||||
|
||||
try {
|
||||
patchEvent = mapper.readTree(patchRecord.value().toString());
|
||||
} catch (JsonProcessingException e) {
|
||||
e.printStackTrace();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
patchEventWKey.set("$set",patchEvent.path("$set"));
|
||||
boolean hasId = patchEvent.has("_id");
|
||||
if(hasId) {
|
||||
patchEventWKey.set("_id", patchEvent.path("_id"));
|
||||
patchEventWKey.set("_id", patchEvent.path("_id"));
|
||||
} else {
|
||||
patchEventWKey.set("_id", mapper.convertValue(key.key(), JsonNode.class));
|
||||
}
|
||||
@ -72,7 +74,7 @@ public R apply(R r) {
|
||||
value = BsonDocument.parse(patchEventWKey.toString());
|
||||
BsonDocument doc = new BsonDocument().append("id", value.get("_id"));
|
||||
Key = BsonDocument.parse(doc.toString());
|
||||
|
||||
|
||||
} else {
|
||||
value = BsonDocument.parse(afterRecord.value().toString());
|
||||
BsonDocument doc = new BsonDocument().append("id", value.get("_id"));
|
||||
@ -88,10 +90,10 @@ public R apply(R r) {
|
||||
Set<Entry<String, BsonValue>> keyValuesforSetSchema = val1.entrySet();
|
||||
for (Entry<String, BsonValue> keyValuesforSetSchemaEntry : keyValuesforSetSchema) {
|
||||
MongoDataConverter.addFieldSchema(keyValuesforSetSchemaEntry, schemabuilder);
|
||||
}
|
||||
} else {
|
||||
MongoDataConverter.addFieldSchema(valuePairsforSchema, schemabuilder);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
MongoDataConverter.addFieldSchema(valuePairsforSchema, schemabuilder);
|
||||
}
|
||||
}
|
||||
|
||||
for (Entry<String, BsonValue> keyPairsforSchema : keyPairs) {
|
||||
@ -109,12 +111,12 @@ public R apply(R r) {
|
||||
Set<Entry<String, BsonValue>> keyvalueforSetStruct = val1.entrySet();
|
||||
for (Entry<String, BsonValue> keyvalueforSetStructEntry : keyvalueforSetStruct) {
|
||||
MongoDataConverter.convertRecord(keyvalueforSetStructEntry, finalValueSchema, finalValueStruct);
|
||||
}
|
||||
} else {
|
||||
MongoDataConverter.convertRecord(valuePairsforStruct, finalValueSchema, finalValueStruct);
|
||||
}
|
||||
} else {
|
||||
MongoDataConverter.convertRecord(valuePairsforStruct, finalValueSchema, finalValueStruct);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
for (Entry<String, BsonValue> keyPairsforStruct : keyPairs) {
|
||||
MongoDataConverter.convertRecord(keyPairsforStruct, finalKeySchema, finalKeyStruct);
|
||||
}
|
||||
@ -128,6 +130,7 @@ public ConfigDef config() {
|
||||
return new ConfigDef();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user