DBZ-1385 eventType can be configured as add field

This commit is contained in:
Jiri Pechanec 2019-08-07 12:14:01 +02:00 committed by Gunnar Morling
parent c07c994434
commit 0991c70ac9
3 changed files with 54 additions and 16 deletions

View File

@ -137,6 +137,43 @@ public void shouldConsumeRecordsFromInsert() throws Exception {
}
@Test
public void shouldSendEventTypeAsHeader() throws Exception {
startConnectorWithInitialSnapshotRecord();
TestHelper.execute(createEventInsert(
UUID.fromString("59a42efd-b015-44a9-9dde-cb36d9002425"),
"UserCreated",
"User",
"10711fa5",
"{}",
""
));
final Map<String, String> config = new HashMap<>();
config.put(
"table.fields.additional.placement",
"type:header:eventType"
);
outboxEventRouter.configure(config);
SourceRecords actualRecords = consumeRecordsByTopic(1);
assertThat(actualRecords.topics().size()).isEqualTo(1);
SourceRecord newEventRecord = actualRecords.recordsForTopic(topicName("outboxsmtit.outbox")).get(0);
SourceRecord routedEvent = outboxEventRouter.apply(newEventRecord);
assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("outbox.event.user");
Struct valueStruct = requireStruct(routedEvent.value(), "test payload");
assertThat(routedEvent.headers().lastWithName("eventType").value()).isEqualTo("UserCreated");
assertThat(valueStruct.schema().field("eventType")).isNull();
JsonNode payload = (new ObjectMapper()).readTree(valueStruct.getString("payload"));
assertThat(payload.get("email")).isEqualTo(null);
}
@Test
public void shouldRespectJsonFormatAsString() throws Exception {
startConnectorWithInitialSnapshotRecord();
@ -202,8 +239,8 @@ public void shouldSupportAllFeatures() throws Exception {
// Validate metadata
Schema expectedSchema = SchemaBuilder.struct()
.version(1)
.field("eventType", Schema.STRING_SCHEMA)
.field("payload", Json.builder().optional().build())
.field("eventType", Schema.STRING_SCHEMA)
.field("eventVersion", Schema.INT32_SCHEMA)
.field("aggregateType", Schema.STRING_SCHEMA)
.field("someBoolType", Schema.BOOLEAN_SCHEMA)

View File

@ -41,7 +41,7 @@ public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R
private static final Logger LOGGER = LoggerFactory.getLogger(EventRouter.class);
private static final String ENVELOPE_EVENT_TYPE = "eventType";
public static final String ENVELOPE_EVENT_TYPE = "eventType";
private static final String ENVELOPE_PAYLOAD = "payload";
private static final String RECORD_ENVELOPE_SCHEMA_NAME_SUFFIX = ".Envelope";
@ -51,7 +51,6 @@ public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R
private String fieldEventId;
private String fieldEventKey;
private String fieldEventType;
private String fieldEventTimestamp;
private String fieldPayload;
private String fieldPayloadId;
@ -106,7 +105,6 @@ public R apply(R r) {
: eventStruct.getInt64(fieldEventTimestamp);
Object eventId = eventStruct.get(fieldEventId);
Object eventType = eventStruct.get(fieldEventType);
Object payload = eventStruct.get(fieldPayload);
Object payloadId = eventStruct.get(fieldPayloadId);
@ -118,7 +116,6 @@ public R apply(R r) {
: getValueSchema(eventValueSchema, eventStruct.getInt32(fieldSchemaVersion));
Struct value = new Struct(valueSchema)
.put(ENVELOPE_EVENT_TYPE, eventType)
.put(ENVELOPE_PAYLOAD, payload);
additionalFields.forEach((additionalField -> {
@ -211,7 +208,6 @@ public void configure(Map<String, ?> configMap) {
fieldEventId = config.getString(EventRouterConfigDefinition.FIELD_EVENT_ID);
fieldEventKey = config.getString(EventRouterConfigDefinition.FIELD_EVENT_KEY);
fieldEventType = config.getString(EventRouterConfigDefinition.FIELD_EVENT_TYPE);
fieldEventTimestamp = config.getString(EventRouterConfigDefinition.FIELD_EVENT_TIMESTAMP);
fieldPayload = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD);
fieldPayloadId = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD_ID);
@ -257,7 +253,6 @@ private SchemaBuilder getSchemaBuilder(Schema debeziumEventSchema) {
// Add default fields
schemaBuilder
.field(ENVELOPE_EVENT_TYPE, debeziumEventSchema.field(fieldEventType).schema())
.field(ENVELOPE_PAYLOAD, debeziumEventSchema.field(fieldPayload).schema());
// Add additional fields while keeping the schema inherited from Debezium based on the table column type

View File

@ -272,19 +272,25 @@ public static ConfigDef configDef() {
static List<AdditionalField> parseAdditionalFieldsConfig(Configuration config) {
String extraFieldsMapping = config.getString(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT);
String eventTypeColumn = config.getString(FIELD_EVENT_TYPE);
List<AdditionalField> additionalFields = new ArrayList<>();
if (extraFieldsMapping == null) {
return additionalFields;
boolean eventTypeMappingProvided = false;
if (extraFieldsMapping != null) {
for (String field : extraFieldsMapping.split(",")) {
final String[] parts = field.split(":");
final String fieldName = parts[0];
AdditionalFieldPlacement placement = AdditionalFieldPlacement.parse(parts[1]);
final AdditionalField addField = new AdditionalField(placement, fieldName, parts.length == 3 ? parts[2] : fieldName);
additionalFields.add(addField);
if (EventRouter.ENVELOPE_EVENT_TYPE.equals(addField.getAlias())) {
eventTypeMappingProvided = true;
}
}
}
for (String field : extraFieldsMapping.split(",")) {
final String[] parts = field.split(":");
AdditionalFieldPlacement placement = AdditionalFieldPlacement.parse(parts[1]);
additionalFields.add(
new AdditionalField(placement, parts[0], parts.length == 3 ? parts[2] : parts[0])
);
if (!eventTypeMappingProvided) {
additionalFields.add(0, new AdditionalField(AdditionalFieldPlacement.ENVELOPE, eventTypeColumn, EventRouter.ENVELOPE_EVENT_TYPE));
}
return additionalFields;