DBZ-1169 Support setting Record timestamp

It's a good practice to depend on the Kafka metadata instead of custom
dates in the payload, this way for instance when using KStreams with
Tumbling Window the dates are correctly matched.
This commit is contained in:
Renato Mefi 2019-03-22 13:50:53 +01:00 committed by Jiri Pechanec
parent 8b689d7f03
commit 2a8806e15c
3 changed files with 91 additions and 5 deletions

View File

@ -40,6 +40,7 @@ public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R
private String fieldEventId;
private String fieldEventKey;
private String fieldEventType;
private String fieldEventTimestamp;
private String fieldPayload;
private String fieldPayloadId;
@ -73,7 +74,9 @@ public R apply(R r) {
final R afterRecord = afterExtractor.apply(r);
Struct eventStruct = requireStruct(afterRecord.value(), "Read Outbox Event");
Long timestamp = debeziumEventValue.getInt64("ts_ms");
Long timestamp = fieldEventTimestamp == null
? debeziumEventValue.getInt64("ts_ms")
: eventStruct.getInt64(fieldEventTimestamp);
String eventId = eventStruct.getString(fieldEventId);
String eventType = eventStruct.getString(fieldEventType);
@ -143,6 +146,7 @@ public void configure(Map<String, ?> configMap) {
fieldEventId = config.getString(EventRouterConfigDefinition.FIELD_EVENT_ID);
fieldEventKey = config.getString(EventRouterConfigDefinition.FIELD_EVENT_KEY);
fieldEventType = config.getString(EventRouterConfigDefinition.FIELD_EVENT_TYPE);
fieldEventTimestamp = config.getString(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP);
fieldPayload = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD);
fieldPayloadId = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD_ID);

View File

@ -85,6 +85,14 @@ public static InvalidOperationBehavior parse(String value, String defaultValue)
.withDefault("type")
.withDescription("The column which contains the Event Type within the outbox table");
static final Field FIELD_EVENT_TIMESTAMP = Field.create("table.field.event.timestamp")
.withDisplayName("Event Timestamp Field")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("Optionally you can override the Kafka message timestamp with a value from a chosen" +
" field, otherwise it'll be the debezium event processed timestamp.");
static final Field FIELD_PAYLOAD = Field.create("table.field.payload")
.withDisplayName("Event Payload Field")
.withType(ConfigDef.Type.STRING)
@ -152,7 +160,7 @@ public static ConfigDef configDef() {
Field.group(
config,
"Table",
FIELD_EVENT_ID, FIELD_EVENT_KEY, FIELD_EVENT_TYPE, FIELD_PAYLOAD, FIELD_PAYLOAD_ID
FIELD_EVENT_ID, FIELD_EVENT_KEY, FIELD_EVENT_TYPE, FIELD_PAYLOAD, FIELD_PAYLOAD_ID, FIELD_EVENT_TIMESTAMP
);
Field.group(
config,

View File

@ -16,6 +16,7 @@
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import static org.fest.assertions.Assertions.assertThat;
/**
@ -119,6 +120,54 @@ public void failsOnInvalidSetMessageKey() {
router.apply(eventRecord);
}
@Test
public void canSetTimestampFromDebeziumEnvelopeByDefault() {
final EventRouter<SourceRecord> router = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
router.configure(config);
final SourceRecord userEventRecord = createEventRecord();
final SourceRecord userEventRouted = router.apply(userEventRecord);
Struct userEvent = requireStruct(userEventRecord.value(), "Test timestamp");
Long expectedTimestamp = userEvent.getInt64("ts_ms");
assertThat(userEventRecord.timestamp()).isNull();
assertThat(userEventRouted.timestamp()).isEqualTo(expectedTimestamp);
}
@Test
public void canSetTimestampByUserDefinedConfiguration() {
final EventRouter<SourceRecord> router = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(
EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP.name(), "event_timestamp"
);
router.configure(config);
Long expectedTimestamp = 14222264625338L;
Map<String, Schema> extraFields = new HashMap<>();
extraFields.put("event_timestamp", Schema.INT64_SCHEMA);
Map<String, Object> extraValues = new HashMap<>();
extraValues.put("event_timestamp", expectedTimestamp);
final SourceRecord userEventRecord = createEventRecord(
"166080d9-3b0e-4a04-81fe-2058a7386f1f",
"UserCreated",
"420b186d",
"User",
"{}",
extraFields,
extraValues
);
final SourceRecord userEventRouted = router.apply(userEventRecord);
assertThat(userEventRecord.timestamp()).isNull();
assertThat(userEventRouted.timestamp()).isEqualTo(expectedTimestamp);
}
@Test
public void canRouteBasedOnField() {
final EventRouter<SourceRecord> router = new EventRouter<>();
@ -216,13 +265,36 @@ private SourceRecord createEventRecord(
String payloadType,
String payload
) {
final Schema recordSchema = SchemaBuilder.struct()
return createEventRecord(
eventId,
eventType,
payloadId,
payloadType,
payload,
new HashMap<>(),
new HashMap<>()
);
}
private SourceRecord createEventRecord(
String eventId,
String eventType,
String payloadId,
String payloadType,
String payload,
Map<String, Schema> extraFields,
Map<String, Object> extraValues
) {
SchemaBuilder schemaBuilder = SchemaBuilder.struct()
.field("id", SchemaBuilder.string())
.field("aggregatetype", SchemaBuilder.string())
.field("aggregateid", SchemaBuilder.string())
.field("type", SchemaBuilder.string())
.field("payload", SchemaBuilder.string())
.build();
.field("payload", SchemaBuilder.string());
extraFields.forEach(schemaBuilder::field);
final Schema recordSchema = schemaBuilder.build();
Envelope envelope = Envelope.defineSchema()
.withName("event.Envelope")
@ -237,6 +309,8 @@ private SourceRecord createEventRecord(
before.put("type", eventType);
before.put("payload", payload);
extraValues.forEach(before::put);
final Struct body = envelope.create(before, null, System.nanoTime());
return new SourceRecord(new HashMap<>(), new HashMap<>(), "db.outbox", envelope.schema(), body);
}