DBZ-1169 Schema should inherit from original type

To allow more flexibility we should proxy the new Event Schema fields
based on the original table columns which Debezium is capturing.
The Schema is now built one time during the first Record in order to
detect those types which are only available within the Record.
This commit is contained in:
Renato Mefi 2019-03-24 14:08:14 +01:00 committed by Jiri Pechanec
parent c14ce54a6f
commit 4aec4d5e92
2 changed files with 94 additions and 19 deletions

View File

@ -36,6 +36,9 @@ public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R
private static final Logger LOGGER = LoggerFactory.getLogger(EventRouter.class); private static final Logger LOGGER = LoggerFactory.getLogger(EventRouter.class);
private static final String ENVELOPE_EVENT_TYPE = "eventType";
private static final String ENVELOPE_PAYLOAD = "payload";
private final ExtractField<R> afterExtractor = new ExtractField.Value<>(); private final ExtractField<R> afterExtractor = new ExtractField.Value<>();
private final RegexRouter<R> regexRouter = new RegexRouter<>(); private final RegexRouter<R> regexRouter = new RegexRouter<>();
private EventRouterConfigDefinition.InvalidOperationBehavior invalidOperationBehavior; private EventRouterConfigDefinition.InvalidOperationBehavior invalidOperationBehavior;
@ -77,30 +80,42 @@ public R apply(R r) {
final R afterRecord = afterExtractor.apply(r); final R afterRecord = afterExtractor.apply(r);
Struct eventStruct = requireStruct(afterRecord.value(), "Read Outbox Event"); Struct eventStruct = requireStruct(afterRecord.value(), "Read Outbox Event");
Schema eventValueSchema = afterRecord.valueSchema();
Long timestamp = fieldEventTimestamp == null Long timestamp = fieldEventTimestamp == null
? debeziumEventValue.getInt64("ts_ms") ? debeziumEventValue.getInt64("ts_ms")
: eventStruct.getInt64(fieldEventTimestamp); : eventStruct.getInt64(fieldEventTimestamp);
String eventId = eventStruct.getString(fieldEventId); Object eventId = eventStruct.get(fieldEventId);
String eventType = eventStruct.getString(fieldEventType); Object eventType = eventStruct.get(fieldEventType);
String payload = eventStruct.getString(fieldPayload); Object payload = eventStruct.get(fieldPayload);
String payloadId = eventStruct.getString(fieldPayloadId); Object payloadId = eventStruct.get(fieldPayloadId);
Headers headers = r.headers(); Headers headers = r.headers();
headers.addString("id", eventId); headers.add("id", eventId, eventValueSchema.field(fieldEventId).schema());
if (valueSchema == null) {
valueSchema = buildValueSchema(eventValueSchema);
}
Struct value = new Struct(valueSchema) Struct value = new Struct(valueSchema)
.put("eventType", eventType) .put(ENVELOPE_EVENT_TYPE, eventType)
.put("payload", payload); .put(ENVELOPE_PAYLOAD, payload);
additionalFields.forEach((additionalField -> { additionalFields.forEach((additionalField -> {
switch (additionalField.getPlacement()) { switch (additionalField.getPlacement()) {
case ENVELOPE: case ENVELOPE:
value.put(additionalField.getAlias(), eventStruct.getString(additionalField.getField())); value.put(
additionalField.getAlias(),
eventStruct.get(additionalField.getField())
);
break; break;
case HEADER: case HEADER:
headers.addString(additionalField.getAlias(), eventStruct.getString(additionalField.getField())); headers.add(
additionalField.getAlias(),
eventStruct.get(additionalField.getField()),
eventValueSchema.field(additionalField.getField()).schema()
);
break; break;
} }
})); }));
@ -119,10 +134,10 @@ public R apply(R r) {
return regexRouter.apply(newRecord); return regexRouter.apply(newRecord);
} }
private String defineRecordKey(Struct eventStruct, String fallbackKey) { private Object defineRecordKey(Struct eventStruct, Object fallbackKey) {
String eventKey = null; Object eventKey = null;
if (fieldEventKey != null) { if (fieldEventKey != null) {
eventKey = eventStruct.getString(fieldEventKey); eventKey = eventStruct.get(fieldEventKey);
} }
return (eventKey != null) ? eventKey : fallbackKey; return (eventKey != null) ? eventKey : fallbackKey;
@ -179,22 +194,23 @@ public void configure(Map<String, ?> configMap) {
afterExtractor.configure(afterExtractorConfig); afterExtractor.configure(afterExtractorConfig);
additionalFields = parseAdditionalFieldsConfig(config); additionalFields = parseAdditionalFieldsConfig(config);
valueSchema = buildValueSchema();
} }
private Schema buildValueSchema() { private Schema buildValueSchema(Schema debeziumEventSchema) {
SchemaBuilder schemaBuilder = SchemaBuilder.struct(); SchemaBuilder schemaBuilder = SchemaBuilder.struct();
// Add default fields // Add default fields
schemaBuilder schemaBuilder
.field("eventType", Schema.STRING_SCHEMA) .field(ENVELOPE_EVENT_TYPE, debeziumEventSchema.field(fieldEventType).schema())
.field("payload", Schema.STRING_SCHEMA); .field(ENVELOPE_PAYLOAD, debeziumEventSchema.field(fieldPayload).schema());
// Add additional fields // Add additional fields while keeping the schema inherited from Debezium based on the table column type
additionalFields.forEach((additionalField -> { additionalFields.forEach((additionalField -> {
if (additionalField.getPlacement() == EventRouterConfigDefinition.AdditionalFieldPlacement.ENVELOPE) { if (additionalField.getPlacement() == EventRouterConfigDefinition.AdditionalFieldPlacement.ENVELOPE) {
schemaBuilder.field(additionalField.getAlias(), Schema.STRING_SCHEMA); schemaBuilder.field(
additionalField.getAlias(),
debeziumEventSchema.field(additionalField.getField()).schema()
);
} }
})); }));

View File

@ -256,6 +256,65 @@ public void canConfigureEveryTableField() {
assertThat(header.value()).isEqualTo("da8d6de6-3b77-45ff-8f44-57db55a7a06c"); assertThat(header.value()).isEqualTo("da8d6de6-3b77-45ff-8f44-57db55a7a06c");
} }
@Test
public void canInfluenceTableColumnTypes() {
final EventRouter<SourceRecord> router = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.FIELD_EVENT_ID.name(), "event_id");
config.put(EventRouterConfigDefinition.FIELD_PAYLOAD_ID.name(), "payload_id");
config.put(EventRouterConfigDefinition.FIELD_EVENT_TYPE.name(), "event_type");
config.put(EventRouterConfigDefinition.FIELD_PAYLOAD.name(), "payload_body");
config.put(EventRouterConfigDefinition.ROUTE_BY_FIELD.name(), "my_route_field");
config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "some_boolean:envelope:bool");
router.configure(config);
final Schema recordSchema = SchemaBuilder.struct()
.field("event_id", SchemaBuilder.int32())
.field("payload_id", SchemaBuilder.int32())
.field("my_route_field", SchemaBuilder.string())
.field("event_type", SchemaBuilder.bytes())
.field("payload_body", SchemaBuilder.bytes())
.field("some_boolean", SchemaBuilder.bool())
.build();
Envelope envelope = Envelope.defineSchema()
.withName("event.Envelope")
.withRecord(recordSchema)
.withSource(SchemaBuilder.struct().build())
.build();
final Struct before = new Struct(recordSchema);
before.put("event_id", 2);
before.put("payload_id", 1232);
before.put("event_type", "CoolSchemaCreated".getBytes());
before.put("my_route_field", "routename");
before.put("payload_body", "{}".getBytes());
before.put("some_boolean", true);
final Struct payload = envelope.create(before, null, System.nanoTime());
final SourceRecord eventRecord = new SourceRecord(new HashMap<>(), new HashMap<>(), "db.outbox", envelope.schema(), payload);
final SourceRecord eventRouted = router.apply(eventRecord);
assertThat(eventRouted).isNotNull();
assertThat(eventRouted.topic()).isEqualTo("outbox.event.routename");
// validate the valueSchema
Schema valueSchema = eventRouted.valueSchema();
assertThat(valueSchema.field("eventType").schema().type()).isEqualTo(SchemaBuilder.bytes().type());
assertThat(valueSchema.field("payload").schema().type()).isEqualTo(SchemaBuilder.bytes().type());
assertThat(valueSchema.field("bool").schema().type()).isEqualTo(SchemaBuilder.bool().type());
assertThat(((Struct) eventRouted.value()).get("payload")).isEqualTo("{}".getBytes());
assertThat(eventRouted.key()).isEqualTo(1232);
Headers headers = eventRouted.headers();
assertThat(headers.size()).isEqualTo(1);
Header header = headers.iterator().next();
assertThat(header.key()).isEqualTo("id");
assertThat(header.value()).isEqualTo(2);
}
@Test @Test
public void canSetPayloadTypeIntoTheEnvelope() { public void canSetPayloadTypeIntoTheEnvelope() {
final EventRouter<SourceRecord> router = new EventRouter<>(); final EventRouter<SourceRecord> router = new EventRouter<>();