From 0af8dcfde0e6203f9a0358e0ee71f78f393182fa Mon Sep 17 00:00:00 2001 From: Renato Mefi Date: Tue, 13 Nov 2018 12:30:59 +0100 Subject: [PATCH] DBZ-971 Ensure Envelope unwrap propagates headers --- .../transforms/UnwrapFromEnvelopeTest.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/debezium-core/src/test/java/io/debezium/transforms/UnwrapFromEnvelopeTest.java b/debezium-core/src/test/java/io/debezium/transforms/UnwrapFromEnvelopeTest.java index 5e394bffc..9e05a5a26 100644 --- a/debezium-core/src/test/java/io/debezium/transforms/UnwrapFromEnvelopeTest.java +++ b/debezium-core/src/test/java/io/debezium/transforms/UnwrapFromEnvelopeTest.java @@ -8,11 +8,13 @@ import static org.fest.assertions.Assertions.assertThat; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.source.SourceRecord; import org.junit.Test; @@ -218,4 +220,23 @@ public void testIgnoreUnknownRecord() { assertThat(transform.apply(unnamedSchemaRecord)).isEqualTo(unnamedSchemaRecord); } } + + @Test + public void testUnwrapPropagatesRecordHeaders() { + try (final UnwrapFromEnvelope transform = new UnwrapFromEnvelope<>()) { + final Map props = new HashMap<>(); + transform.configure(props); + + final SourceRecord createRecord = createCreateRecord(); + createRecord.headers().addString("application/debezium-test-header", "shouldPropagatePreviousRecordHeaders"); + + final SourceRecord unwrapped = transform.apply(createRecord); + assertThat(((Struct)unwrapped.value()).getInt8("id")).isEqualTo((byte) 1); + + assertThat(unwrapped.headers()).hasSize(1); + Iterator
headers = unwrapped.headers().allWithName("application/debezium-test-header"); + assertThat(headers.hasNext()).isTrue(); + assertThat(headers.next().value().toString()).isEqualTo("shouldPropagatePreviousRecordHeaders"); + } + } }