DBZ-971 Ensure Envelope unwrap propagates headers

This commit is contained in:
Renato Mefi 2018-11-13 12:30:59 +01:00 committed by Gunnar Morling
parent de7c065b43
commit 0af8dcfde0

View File

@ -8,11 +8,13 @@
import static org.fest.assertions.Assertions.assertThat;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;
@ -218,4 +220,23 @@ public void testIgnoreUnknownRecord() {
assertThat(transform.apply(unnamedSchemaRecord)).isEqualTo(unnamedSchemaRecord);
}
}
@Test
public void testUnwrapPropagatesRecordHeaders() {
try (final UnwrapFromEnvelope<SourceRecord> transform = new UnwrapFromEnvelope<>()) {
final Map<String, String> props = new HashMap<>();
transform.configure(props);
final SourceRecord createRecord = createCreateRecord();
createRecord.headers().addString("application/debezium-test-header", "shouldPropagatePreviousRecordHeaders");
final SourceRecord unwrapped = transform.apply(createRecord);
assertThat(((Struct)unwrapped.value()).getInt8("id")).isEqualTo((byte) 1);
assertThat(unwrapped.headers()).hasSize(1);
Iterator<Header> headers = unwrapped.headers().allWithName("application/debezium-test-header");
assertThat(headers.hasNext()).isTrue();
assertThat(headers.next().value().toString()).isEqualTo("shouldPropagatePreviousRecordHeaders");
}
}
}