diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java index e2f75a069..2858e3034 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java @@ -282,17 +282,17 @@ public R apply(R r) { if (flattenStruct) { final R flattenRecord = recordFlattener.apply(r.newRecord(r.topic(), r.kafkaPartition(), finalKeySchema, - finalKeyStruct, finalValueSchema, finalValueStruct,r.timestamp())); + finalKeyStruct, finalValueSchema, finalValueStruct, r.timestamp(), r.headers())); return flattenRecord; } else { if (finalValueSchema.fields().isEmpty()) { - return r.newRecord(r.topic(), r.kafkaPartition(), finalKeySchema, finalKeyStruct, null, null, - r.timestamp()); + return r.newRecord(r.topic(), r.kafkaPartition(), finalKeySchema, + finalKeyStruct, null, null, r.timestamp(), r.headers()); } else { - return r.newRecord(r.topic(), r.kafkaPartition(), finalKeySchema, finalKeyStruct, finalValueSchema, finalValueStruct, - r.timestamp()); + return r.newRecord(r.topic(), r.kafkaPartition(), finalKeySchema, + finalKeyStruct, finalValueSchema, finalValueStruct, r.timestamp(), r.headers()); } } } diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java index 8d537a0a2..61a5e0268 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java @@ -7,16 +7,12 @@ import 
static org.fest.assertions.Assertions.assertThat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.source.SourceRecord; import org.bson.BsonTimestamp; import org.bson.Document; @@ -346,6 +342,35 @@ public void shouldGenerateRecordForDeleteEvent() throws InterruptedException { assertThat(value).isNull(); } + @Test + public void shouldPropagatePreviousRecordHeaders() throws InterruptedException { + BsonTimestamp ts = new BsonTimestamp(1000, 1); + CollectionId collectionId = new CollectionId("rs0", "dbA", "c1"); + ObjectId objId = new ObjectId(); + Document obj = new Document().append("$set", new Document("name", "Sally")); + + // given + Document event = new Document().append("o", obj) + .append("o2", objId) + .append("ns", "dbA.c1") + .append("ts", ts) + .append("h", Long.valueOf(12345678)) + .append("op", "u"); + RecordsForCollection records = recordMakers.forCollection(collectionId); + records.recordEvent(event, 1002); + assertThat(produced.size()).isEqualTo(1); + SourceRecord record = produced.get(0); + record.headers().addString("application/debezium-test-header", "shouldPropagatePreviousRecordHeaders"); + + // when + SourceRecord transformedRecord = transformation.apply(record); + + assertThat(transformedRecord.headers()).hasSize(1); + Iterator
<Header> headers = transformedRecord.headers().allWithName("application/debezium-test-header"); + assertThat(headers.hasNext()).isTrue(); + assertThat(headers.next().value().toString()).isEqualTo("shouldPropagatePreviousRecordHeaders"); + } + @Test public void shouldNotFlattenTransformRecordForInsertEvent() throws InterruptedException { CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");