diff --git a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java index 997d6e3e5..0e7bbd8f4 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java @@ -40,6 +40,7 @@ public class EventRouter> implements Transformation configMap) { fieldEventId = config.getString(EventRouterConfigDefinition.FIELD_EVENT_ID); fieldEventKey = config.getString(EventRouterConfigDefinition.FIELD_EVENT_KEY); fieldEventType = config.getString(EventRouterConfigDefinition.FIELD_EVENT_TYPE); + fieldEventTimestamp = config.getString(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP); fieldPayload = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD); fieldPayloadId = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD_ID); diff --git a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterConfigDefinition.java b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterConfigDefinition.java index 5ceea20e4..e16a5e684 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterConfigDefinition.java +++ b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterConfigDefinition.java @@ -85,6 +85,14 @@ public static InvalidOperationBehavior parse(String value, String defaultValue) .withDefault("type") .withDescription("The column which contains the Event Type within the outbox table"); + static final Field FIELD_EVENT_TIMESTAMP = Field.create("table.field.event.timestamp") + .withDisplayName("Event Timestamp Field") + .withType(ConfigDef.Type.STRING) + .withWidth(ConfigDef.Width.MEDIUM) + .withImportance(ConfigDef.Importance.MEDIUM) + .withDescription("Optionally you can override the Kafka message timestamp with a value from a chosen" + + " field, otherwise it'll be the debezium event processed timestamp."); + static final Field FIELD_PAYLOAD = Field.create("table.field.payload") .withDisplayName("Event Payload Field") .withType(ConfigDef.Type.STRING) @@ -152,7 +160,7 @@ public static ConfigDef configDef() { Field.group( config, "Table", - FIELD_EVENT_ID, FIELD_EVENT_KEY, FIELD_EVENT_TYPE, FIELD_PAYLOAD, FIELD_PAYLOAD_ID + FIELD_EVENT_ID, FIELD_EVENT_KEY, FIELD_EVENT_TYPE, FIELD_PAYLOAD, FIELD_PAYLOAD_ID, FIELD_EVENT_TIMESTAMP ); Field.group( config, diff --git a/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java b/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java index 5d08c3462..b8a78d0e6 100644 --- a/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java +++ b/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java @@ -16,6 +16,7 @@ import java.util.HashMap; import java.util.Map; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; import static org.fest.assertions.Assertions.assertThat; /** @@ -119,6 +120,54 @@ public void failsOnInvalidSetMessageKey() { router.apply(eventRecord); } + @Test + public void canSetTimestampFromDebeziumEnvelopeByDefault() { + final EventRouter router = new EventRouter<>(); + final Map config = new HashMap<>(); + router.configure(config); + + final SourceRecord userEventRecord = createEventRecord(); + final SourceRecord userEventRouted = router.apply(userEventRecord); + + Struct userEvent = requireStruct(userEventRecord.value(), "Test timestamp"); + Long expectedTimestamp = userEvent.getInt64("ts_ms"); + + assertThat(userEventRecord.timestamp()).isNull(); + assertThat(userEventRouted.timestamp()).isEqualTo(expectedTimestamp); + } + + @Test + public void canSetTimestampByUserDefinedConfiguration() { + final EventRouter router = new EventRouter<>(); + final Map config = new HashMap<>(); + config.put( + EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), "event_timestamp" + ); + router.configure(config); + + Long expectedTimestamp = 14222264625338L; + + Map extraFields = new HashMap<>(); + extraFields.put("event_timestamp", Schema.INT64_SCHEMA); + + Map extraValues = new HashMap<>(); + extraValues.put("event_timestamp", expectedTimestamp); + + final SourceRecord userEventRecord = createEventRecord( + "166080d9-3b0e-4a04-81fe-2058a7386f1f", + "UserCreated", + "420b186d", + "User", + "{}", + extraFields, + extraValues + ); + final SourceRecord userEventRouted = router.apply(userEventRecord); + + assertThat(userEventRecord.timestamp()).isNull(); + assertThat(userEventRouted.timestamp()).isEqualTo(expectedTimestamp); + } + @Test public void canRouteBasedOnField() { final EventRouter router = new EventRouter<>(); @@ -216,13 +265,36 @@ private SourceRecord createEventRecord( String payloadType, String payload ) { - final Schema recordSchema = SchemaBuilder.struct() + return createEventRecord( + eventId, + eventType, + payloadId, + payloadType, + payload, + new HashMap<>(), + new HashMap<>() + ); + } + + private SourceRecord createEventRecord( + String eventId, + String eventType, + String payloadId, + String payloadType, + String payload, + Map extraFields, + Map extraValues + ) { + SchemaBuilder schemaBuilder = SchemaBuilder.struct() .field("id", SchemaBuilder.string()) .field("aggregatetype", SchemaBuilder.string()) .field("aggregateid", SchemaBuilder.string()) .field("type", SchemaBuilder.string()) - .field("payload", SchemaBuilder.string()) - .build(); + .field("payload", SchemaBuilder.string()); + + extraFields.forEach(schemaBuilder::field); + + final Schema recordSchema = schemaBuilder.build(); Envelope envelope = Envelope.defineSchema() .withName("event.Envelope") @@ -237,6 +309,8 @@ private SourceRecord createEventRecord( before.put("type", eventType); before.put("payload", payload); + extraValues.forEach(before::put); + final Struct body = envelope.create(before, null, System.nanoTime()); return new SourceRecord(new HashMap<>(), new HashMap<>(), "db.outbox", envelope.schema(), body); }