DBZ-1385 Elevate payload for only headers setup

This commit is contained in:
Jiri Pechanec 2019-08-09 11:48:21 +02:00 committed by Gunnar Morling
parent 0991c70ac9
commit 8adb5e48ca
2 changed files with 24 additions and 13 deletions

View File

@ -146,7 +146,7 @@ public void shouldSendEventTypeAsHeader() throws Exception {
"UserCreated",
"User",
"10711fa5",
"{}",
"{\"email\": \"gh@mefi.in\"}",
""
));
@ -166,11 +166,11 @@ public void shouldSendEventTypeAsHeader() throws Exception {
assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.user");
Struct valueStruct = requireStruct(routedEvent.value(), "test payload");
Object value = routedEvent.value();
assertThat(routedEvent.headers().lastWithName("eventType").value()).isEqualTo("UserCreated");
assertThat(valueStruct.schema().field("eventType")).isNull();
JsonNode payload = (new ObjectMapper()).readTree(valueStruct.getString("payload"));
assertThat(payload.get("email")).isEqualTo(null);
assertThat(value).isInstanceOf(String.class);
JsonNode payload = (new ObjectMapper()).readTree((String) value);
assertThat(payload.get("email").getTextValue()).isEqualTo("gh@mefi.in");
}

View File

@ -64,6 +64,8 @@ public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R
private Schema defaultValueSchema;
private final Map<Integer, Schema> versionedValueSchema = new HashMap<>();
private boolean onlyHeadersInOutputMessage = false;
@Override
public R apply(R r) {
// Ignoring tombstones
@ -99,6 +101,7 @@ public R apply(R r) {
final R afterRecord = afterExtractor.apply(r);
Struct eventStruct = requireStruct(afterRecord.value(), "Read Outbox Event");
Schema eventValueSchema = afterRecord.valueSchema();
Schema payloadSchema = eventValueSchema.field(fieldPayload).schema();
Long timestamp = fieldEventTimestamp == null
? debeziumEventValue.getInt64("ts_ms")
@ -111,17 +114,18 @@ public R apply(R r) {
Headers headers = r.headers();
headers.add("id", eventId, eventValueSchema.field(fieldEventId).schema());
Schema valueSchema = (fieldSchemaVersion == null)
final Schema structValueSchema = onlyHeadersInOutputMessage ? null :
(fieldSchemaVersion == null)
? getValueSchema(eventValueSchema)
: getValueSchema(eventValueSchema, eventStruct.getInt32(fieldSchemaVersion));
Struct value = new Struct(valueSchema)
.put(ENVELOPE_PAYLOAD, payload);
final Struct structValue = onlyHeadersInOutputMessage ? null :
new Struct(structValueSchema).put(ENVELOPE_PAYLOAD, payload);
additionalFields.forEach((additionalField -> {
switch (additionalField.getPlacement()) {
case ENVELOPE:
value.put(
structValue.put(
additionalField.getAlias(),
eventStruct.get(additionalField.getField())
);
@ -136,17 +140,23 @@ public R apply(R r) {
}
}));
boolean isDeleteEvent = payload == null || payload.toString().trim().isEmpty();
Struct updatedValue;
Object updatedValue;
Schema updatedSchema;
if (isDeleteEvent && routeTombstoneOnEmptyPayload) {
updatedValue = null;
updatedSchema = null;
}
else if (onlyHeadersInOutputMessage) {
updatedValue = payload;
updatedSchema = payloadSchema;
}
else {
updatedValue = value;
updatedSchema = valueSchema;
updatedValue = structValue;
updatedSchema = structValueSchema;
}
R newRecord = r.newRecord(
@ -227,6 +237,7 @@ public void configure(Map<String, ?> configMap) {
afterExtractor.configure(afterExtractorConfig);
additionalFields = parseAdditionalFieldsConfig(config);
onlyHeadersInOutputMessage = !additionalFields.stream().anyMatch(field -> field.getPlacement() == EventRouterConfigDefinition.AdditionalFieldPlacement.ENVELOPE);
}
private Schema getValueSchema(Schema debeziumEventSchema) {
@ -251,7 +262,7 @@ private Schema getValueSchema(Schema debeziumEventSchema, Integer version) {
private SchemaBuilder getSchemaBuilder(Schema debeziumEventSchema) {
SchemaBuilder schemaBuilder = SchemaBuilder.struct();
// Add default fields
// Add payload field
schemaBuilder
.field(ENVELOPE_PAYLOAD, debeziumEventSchema.field(fieldPayload).schema());