diff --git a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java index 19eeb1c9d..4c24215f0 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java @@ -36,6 +36,9 @@ public class EventRouter> implements Transformation afterExtractor = new ExtractField.Value<>(); private final RegexRouter regexRouter = new RegexRouter<>(); private EventRouterConfigDefinition.InvalidOperationBehavior invalidOperationBehavior; @@ -77,30 +80,42 @@ public R apply(R r) { final R afterRecord = afterExtractor.apply(r); Struct eventStruct = requireStruct(afterRecord.value(), "Read Outbox Event"); + Schema eventValueSchema = afterRecord.valueSchema(); Long timestamp = fieldEventTimestamp == null ? debeziumEventValue.getInt64("ts_ms") : eventStruct.getInt64(fieldEventTimestamp); - String eventId = eventStruct.getString(fieldEventId); - String eventType = eventStruct.getString(fieldEventType); - String payload = eventStruct.getString(fieldPayload); - String payloadId = eventStruct.getString(fieldPayloadId); + Object eventId = eventStruct.get(fieldEventId); + Object eventType = eventStruct.get(fieldEventType); + Object payload = eventStruct.get(fieldPayload); + Object payloadId = eventStruct.get(fieldPayloadId); Headers headers = r.headers(); - headers.addString("id", eventId); + headers.add("id", eventId, eventValueSchema.field(fieldEventId).schema()); + + if (valueSchema == null) { + valueSchema = buildValueSchema(eventValueSchema); + } Struct value = new Struct(valueSchema) - .put("eventType", eventType) - .put("payload", payload); + .put(ENVELOPE_EVENT_TYPE, eventType) + .put(ENVELOPE_PAYLOAD, payload); additionalFields.forEach((additionalField -> { switch (additionalField.getPlacement()) { case ENVELOPE: - value.put(additionalField.getAlias(), eventStruct.getString(additionalField.getField())); + value.put( + additionalField.getAlias(), + eventStruct.get(additionalField.getField()) + ); break; case HEADER: - headers.addString(additionalField.getAlias(), eventStruct.getString(additionalField.getField())); + headers.add( + additionalField.getAlias(), + eventStruct.get(additionalField.getField()), + eventValueSchema.field(additionalField.getField()).schema() + ); break; } })); @@ -119,10 +134,10 @@ public R apply(R r) { return regexRouter.apply(newRecord); } - private String defineRecordKey(Struct eventStruct, String fallbackKey) { - String eventKey = null; + private Object defineRecordKey(Struct eventStruct, Object fallbackKey) { + Object eventKey = null; if (fieldEventKey != null) { - eventKey = eventStruct.getString(fieldEventKey); + eventKey = eventStruct.get(fieldEventKey); } return (eventKey != null) ? eventKey : fallbackKey; @@ -179,22 +194,23 @@ public void configure(Map configMap) { afterExtractor.configure(afterExtractorConfig); additionalFields = parseAdditionalFieldsConfig(config); - - valueSchema = buildValueSchema(); } - private Schema buildValueSchema() { + private Schema buildValueSchema(Schema debeziumEventSchema) { SchemaBuilder schemaBuilder = SchemaBuilder.struct(); // Add default fields schemaBuilder - .field("eventType", Schema.STRING_SCHEMA) - .field("payload", Schema.STRING_SCHEMA); + .field(ENVELOPE_EVENT_TYPE, debeziumEventSchema.field(fieldEventType).schema()) + .field(ENVELOPE_PAYLOAD, debeziumEventSchema.field(fieldPayload).schema()); - // Add additional fields + // Add additional fields while keeping the schema inherited from Debezium based on the table column type additionalFields.forEach((additionalField -> { if (additionalField.getPlacement() == EventRouterConfigDefinition.AdditionalFieldPlacement.ENVELOPE) { - schemaBuilder.field(additionalField.getAlias(), Schema.STRING_SCHEMA); + schemaBuilder.field( + additionalField.getAlias(), + debeziumEventSchema.field(additionalField.getField()).schema() + ); } })); diff --git a/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java b/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java index 60975f6a6..87e19563b 100644 --- a/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java +++ b/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java @@ -256,6 +256,65 @@ public void canConfigureEveryTableField() { assertThat(header.value()).isEqualTo("da8d6de6-3b77-45ff-8f44-57db55a7a06c"); } + @Test + public void canInfluenceTableColumnTypes() { + final EventRouter router = new EventRouter<>(); + final Map config = new HashMap<>(); + config.put(EventRouterConfigDefinition.FIELD_EVENT_ID.name(), "event_id"); + config.put(EventRouterConfigDefinition.FIELD_PAYLOAD_ID.name(), "payload_id"); + config.put(EventRouterConfigDefinition.FIELD_EVENT_TYPE.name(), "event_type"); + config.put(EventRouterConfigDefinition.FIELD_PAYLOAD.name(), "payload_body"); + config.put(EventRouterConfigDefinition.ROUTE_BY_FIELD.name(), "my_route_field"); + config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "some_boolean:envelope:bool"); + router.configure(config); + + final Schema recordSchema = SchemaBuilder.struct() + .field("event_id", SchemaBuilder.int32()) + .field("payload_id", SchemaBuilder.int32()) + .field("my_route_field", SchemaBuilder.string()) + .field("event_type", SchemaBuilder.bytes()) + .field("payload_body", SchemaBuilder.bytes()) + .field("some_boolean", SchemaBuilder.bool()) + .build(); + + Envelope envelope = Envelope.defineSchema() + .withName("event.Envelope") + .withRecord(recordSchema) + .withSource(SchemaBuilder.struct().build()) + .build(); + + final Struct before = new Struct(recordSchema); + before.put("event_id", 2); + before.put("payload_id", 1232); + before.put("event_type", "CoolSchemaCreated".getBytes()); + before.put("my_route_field", "routename"); + before.put("payload_body", "{}".getBytes()); + before.put("some_boolean", true); + + final Struct payload = envelope.create(before, null, System.nanoTime()); + final SourceRecord eventRecord = new SourceRecord(new HashMap<>(), new HashMap<>(), "db.outbox", envelope.schema(), payload); + + final SourceRecord eventRouted = router.apply(eventRecord); + + assertThat(eventRouted).isNotNull(); + assertThat(eventRouted.topic()).isEqualTo("outbox.event.routename"); + + // validate the valueSchema + Schema valueSchema = eventRouted.valueSchema(); + assertThat(valueSchema.field("eventType").schema().type()).isEqualTo(SchemaBuilder.bytes().type()); + assertThat(valueSchema.field("payload").schema().type()).isEqualTo(SchemaBuilder.bytes().type()); + assertThat(valueSchema.field("bool").schema().type()).isEqualTo(SchemaBuilder.bool().type()); + + assertThat(((Struct) eventRouted.value()).get("payload")).isEqualTo("{}".getBytes()); + assertThat(eventRouted.key()).isEqualTo(1232); + + Headers headers = eventRouted.headers(); + assertThat(headers.size()).isEqualTo(1); + Header header = headers.iterator().next(); + assertThat(header.key()).isEqualTo("id"); + assertThat(header.value()).isEqualTo(2); + } + @Test public void canSetPayloadTypeIntoTheEnvelope() { final EventRouter router = new EventRouter<>();