DBZ-971 Ensure Mongo unwrap propagates headers

This commit is contained in:
Renato Mefi 2018-11-13 11:43:31 +01:00 committed by Gunnar Morling
parent 312b976c0d
commit 4b0a84f587
2 changed files with 36 additions and 11 deletions

View File

@@ -282,17 +282,17 @@ public R apply(R r) {
         if (flattenStruct) {
-            final R flattenRecord = recordFlattener.apply(r.newRecord(r.topic(), r.kafkaPartition(), finalKeySchema,
-                    finalKeyStruct, finalValueSchema, finalValueStruct,r.timestamp()));
+            final R flattenRecord = recordFlattener.apply(r.newRecord(r.topic(), r.kafkaPartition(), finalKeySchema,
+                    finalKeyStruct, finalValueSchema, finalValueStruct, r.timestamp(), r.headers()));
             return flattenRecord;
         }
         else {
             if (finalValueSchema.fields().isEmpty()) {
-                return r.newRecord(r.topic(), r.kafkaPartition(), finalKeySchema, finalKeyStruct, null, null,
-                        r.timestamp());
+                return r.newRecord(r.topic(), r.kafkaPartition(), finalKeySchema,
+                        finalKeyStruct, null, null, r.timestamp(), r.headers());
             }
             else {
-                return r.newRecord(r.topic(), r.kafkaPartition(), finalKeySchema, finalKeyStruct, finalValueSchema, finalValueStruct,
-                        r.timestamp());
+                return r.newRecord(r.topic(), r.kafkaPartition(), finalKeySchema,
+                        finalKeyStruct, finalValueSchema, finalValueStruct, r.timestamp());
             }
         }
     }

View File

@@ -7,16 +7,12 @@
 import static org.fest.assertions.Assertions.assertThat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.bson.BsonTimestamp;
 import org.bson.Document;
@@ -346,6 +342,35 @@ public void shouldGenerateRecordForDeleteEvent() throws InterruptedException {
assertThat(value).isNull();
}
@Test
public void shouldPropagatePreviousRecordHeaders() throws InterruptedException {
    // given: a MongoDB oplog "update" event for collection dbA.c1
    BsonTimestamp ts = new BsonTimestamp(1000, 1);
    CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");
    ObjectId objId = new ObjectId();
    Document obj = new Document().append("$set", new Document("name", "Sally"));
    Document event = new Document().append("o", obj)
                                   .append("o2", objId)
                                   .append("ns", "dbA.c1")
                                   .append("ts", ts)
                                   .append("h", 12345678L) // long literal instead of Long.valueOf(...)
                                   .append("op", "u");
    RecordsForCollection records = recordMakers.forCollection(collectionId);
    records.recordEvent(event, 1002);
    assertThat(produced.size()).isEqualTo(1);

    // attach a header to the raw record before it goes through the transformation
    SourceRecord record = produced.get(0);
    record.headers().addString("application/debezium-test-header", "shouldPropagatePreviousRecordHeaders");

    // when: the unwrap transformation is applied
    SourceRecord transformedRecord = transformation.apply(record);

    // then: the pre-existing header is still present, with its original value
    assertThat(transformedRecord.headers()).hasSize(1);
    Iterator<Header> headers = transformedRecord.headers().allWithName("application/debezium-test-header");
    assertThat(headers.hasNext()).isTrue();
    assertThat(headers.next().value().toString()).isEqualTo("shouldPropagatePreviousRecordHeaders");
}
@Test
public void shouldNotFlattenTransformRecordForInsertEvent() throws InterruptedException {
CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");