DBZ-989 Support tombstone on MongoDB unwrapper

This commit is contained in:
Renato Mefi 2018-11-18 17:36:00 +01:00 committed by Gunnar Morling
parent 488dfe4f72
commit 16ba3764e5

View File

@ -121,8 +121,36 @@ public static ArrayEncoding parse(String value, String defaultValue) {
private boolean flattenStruct;
private String delimiter;
private R getTombstoneRecord(R r) {
SchemaBuilder keySchemaBuilder = SchemaBuilder.struct();
final R key = keyExtractor.apply(r);
BsonDocument keyDocument = BsonDocument.parse("{ \"id\" : " + key.key().toString() + "}");
Set<Entry<String, BsonValue>> keyPairs = keyDocument.entrySet();
for (Entry<String, BsonValue> keyPairsForSchema : keyPairs) {
converter.addFieldSchema(keyPairsForSchema, keySchemaBuilder);
}
Schema finalKeySchema = keySchemaBuilder.build();
Struct finalKeyStruct = new Struct(finalKeySchema);
for (Entry<String, BsonValue> keyPairsForStruct : keyPairs) {
converter.convertRecord(keyPairsForStruct, finalKeySchema, finalKeyStruct);
}
return r.newRecord(r.topic(), r.kafkaPartition(), finalKeySchema, finalKeyStruct,
null, null, r.timestamp());
}
@Override
public R apply(R r) {
// Tombstone message
if (r.value() == null) {
return getTombstoneRecord(r);
}
String newValueSchemaName = r.valueSchema().name();
if (newValueSchemaName.endsWith(".Envelope")) {
newValueSchemaName = newValueSchemaName.substring(0, newValueSchemaName.length() - 9);