diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java index 227e8896f..0a822644a 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java @@ -146,7 +146,7 @@ public void shouldSendEventTypeAsHeader() throws Exception { "UserCreated", "User", "10711fa5", - "{}", + "{\"email\": \"gh@mefi.in\"}", "" )); @@ -166,11 +166,11 @@ public void shouldSendEventTypeAsHeader() throws Exception { assertThat(routedEvent).isNotNull(); assertThat(routedEvent.topic()).isEqualTo("outbox.event.user"); - Struct valueStruct = requireStruct(routedEvent.value(), "test payload"); + Object value = routedEvent.value(); assertThat(routedEvent.headers().lastWithName("eventType").value()).isEqualTo("UserCreated"); - assertThat(valueStruct.schema().field("eventType")).isNull(); - JsonNode payload = (new ObjectMapper()).readTree(valueStruct.getString("payload")); - assertThat(payload.get("email")).isEqualTo(null); + assertThat(value).isInstanceOf(String.class); + JsonNode payload = (new ObjectMapper()).readTree((String) value); + assertThat(payload.get("email").getTextValue()).isEqualTo("gh@mefi.in"); } diff --git a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java index 0d484e08d..2209be2ca 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java @@ -64,6 +64,8 @@ public class EventRouter> implements Transformation versionedValueSchema = new HashMap<>(); + private boolean onlyHeadersInOutputMessage = false; + @Override public R apply(R r) { // Ignoring tombstones @@ -99,6 +101,7 @@ public R apply(R r) { final R afterRecord = afterExtractor.apply(r); Struct eventStruct = requireStruct(afterRecord.value(), "Read Outbox Event"); Schema eventValueSchema = afterRecord.valueSchema(); + Schema payloadSchema = eventValueSchema.field(fieldPayload).schema(); Long timestamp = fieldEventTimestamp == null ? debeziumEventValue.getInt64("ts_ms") @@ -111,17 +114,18 @@ public R apply(R r) { Headers headers = r.headers(); headers.add("id", eventId, eventValueSchema.field(fieldEventId).schema()); - Schema valueSchema = (fieldSchemaVersion == null) + final Schema structValueSchema = onlyHeadersInOutputMessage ? null : + (fieldSchemaVersion == null) ? getValueSchema(eventValueSchema) : getValueSchema(eventValueSchema, eventStruct.getInt32(fieldSchemaVersion)); - Struct value = new Struct(valueSchema) - .put(ENVELOPE_PAYLOAD, payload); + final Struct structValue = onlyHeadersInOutputMessage ? null : + new Struct(structValueSchema).put(ENVELOPE_PAYLOAD, payload); additionalFields.forEach((additionalField -> { switch (additionalField.getPlacement()) { case ENVELOPE: - value.put( + structValue.put( additionalField.getAlias(), eventStruct.get(additionalField.getField()) ); @@ -136,17 +140,23 @@ public R apply(R r) { } })); + boolean isDeleteEvent = payload == null || payload.toString().trim().isEmpty(); - Struct updatedValue; + Object updatedValue; Schema updatedSchema; + if (isDeleteEvent && routeTombstoneOnEmptyPayload) { updatedValue = null; updatedSchema = null; } + else if (onlyHeadersInOutputMessage) { + updatedValue = payload; + updatedSchema = payloadSchema; + } else { - updatedValue = value; - updatedSchema = valueSchema; + updatedValue = structValue; + updatedSchema = structValueSchema; } R newRecord = r.newRecord( @@ -227,6 +237,7 @@ public void configure(Map configMap) { afterExtractor.configure(afterExtractorConfig); additionalFields = parseAdditionalFieldsConfig(config); + onlyHeadersInOutputMessage = !additionalFields.stream().anyMatch(field -> field.getPlacement() == EventRouterConfigDefinition.AdditionalFieldPlacement.ENVELOPE); } private Schema getValueSchema(Schema debeziumEventSchema) { @@ -251,7 +262,7 @@ private Schema getValueSchema(Schema debeziumEventSchema, Integer version) { private SchemaBuilder getSchemaBuilder(Schema debeziumEventSchema) { SchemaBuilder schemaBuilder = SchemaBuilder.struct(); - // Add default fields + // Add payload field schemaBuilder .field(ENVELOPE_PAYLOAD, debeziumEventSchema.field(fieldPayload).schema());