diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java index 5b3fc7c82..1c2d72624 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java @@ -121,8 +121,36 @@ public static ArrayEncoding parse(String value, String defaultValue) { private boolean flattenStruct; private String delimiter; + private R getTombstoneRecord(R r) { + SchemaBuilder keySchemaBuilder = SchemaBuilder.struct(); + + final R key = keyExtractor.apply(r); + BsonDocument keyDocument = BsonDocument.parse("{ \"id\" : " + key.key().toString() + "}"); + + Set> keyPairs = keyDocument.entrySet(); + + for (Entry keyPairsForSchema : keyPairs) { + converter.addFieldSchema(keyPairsForSchema, keySchemaBuilder); + } + + Schema finalKeySchema = keySchemaBuilder.build(); + Struct finalKeyStruct = new Struct(finalKeySchema); + + for (Entry keyPairsForStruct : keyPairs) { + converter.convertRecord(keyPairsForStruct, finalKeySchema, finalKeyStruct); + } + + return r.newRecord(r.topic(), r.kafkaPartition(), finalKeySchema, finalKeyStruct, + null, null, r.timestamp()); + } + @Override public R apply(R r) { + // Tombstone message + if (r.value() == null) { + return getTombstoneRecord(r); + } + String newValueSchemaName = r.valueSchema().name(); if (newValueSchemaName.endsWith(".Envelope")) { newValueSchemaName = newValueSchemaName.substring(0, newValueSchemaName.length() - 9);