DBZ-1169 Support additional user fields

This commit is contained in:
Renato Mefi 2019-03-22 16:40:24 +01:00 committed by Jiri Pechanec
parent 2a8806e15c
commit c14ce54a6f
3 changed files with 205 additions and 5 deletions

View File

@ -7,6 +7,7 @@
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.data.Envelope; import io.debezium.data.Envelope;
import io.debezium.transforms.outbox.EventRouterConfigDefinition.AdditionalField;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
@ -20,8 +21,10 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import static io.debezium.transforms.outbox.EventRouterConfigDefinition.parseAdditionalFieldsConfig;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
/** /**
@ -47,6 +50,7 @@ public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R
private String routeByField; private String routeByField;
private Schema valueSchema; private Schema valueSchema;
private List<AdditionalField> additionalFields;
@Override @Override
public R apply(R r) { public R apply(R r) {
@ -90,6 +94,17 @@ public R apply(R r) {
.put("eventType", eventType) .put("eventType", eventType)
.put("payload", payload); .put("payload", payload);
additionalFields.forEach((additionalField -> {
switch (additionalField.getPlacement()) {
case ENVELOPE:
value.put(additionalField.getAlias(), eventStruct.getString(additionalField.getField()));
break;
case HEADER:
headers.addString(additionalField.getAlias(), eventStruct.getString(additionalField.getField()));
break;
}
}));
R newRecord = r.newRecord( R newRecord = r.newRecord(
eventStruct.getString(routeByField).toLowerCase(), eventStruct.getString(routeByField).toLowerCase(),
null, null,
@ -163,9 +178,26 @@ public void configure(Map<String, ?> configMap) {
afterExtractor.configure(afterExtractorConfig); afterExtractor.configure(afterExtractorConfig);
valueSchema = SchemaBuilder.struct() additionalFields = parseAdditionalFieldsConfig(config);
valueSchema = buildValueSchema();
}
private Schema buildValueSchema() {
SchemaBuilder schemaBuilder = SchemaBuilder.struct();
// Add default fields
schemaBuilder
.field("eventType", Schema.STRING_SCHEMA) .field("eventType", Schema.STRING_SCHEMA)
.field("payload", Schema.STRING_SCHEMA) .field("payload", Schema.STRING_SCHEMA);
.build();
// Add additional fields
additionalFields.forEach((additionalField -> {
if (additionalField.getPlacement() == EventRouterConfigDefinition.AdditionalFieldPlacement.ENVELOPE) {
schemaBuilder.field(additionalField.getAlias(), Schema.STRING_SCHEMA);
}
}));
return schemaBuilder.build();
} }
} }

View File

@ -5,10 +5,13 @@
*/ */
package io.debezium.transforms.outbox; package io.debezium.transforms.outbox;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue; import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field; import io.debezium.config.Field;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.transforms.util.RegexValidator;
import java.util.ArrayList;
import java.util.List;
/** /**
* Debezium Outbox Transform configuration definition * Debezium Outbox Transform configuration definition
@ -62,6 +65,74 @@ public static InvalidOperationBehavior parse(String value, String defaultValue)
} }
} }
public enum AdditionalFieldPlacement implements EnumeratedValue {
HEADER("header"),
ENVELOPE("envelope");
private final String value;
AdditionalFieldPlacement(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static AdditionalFieldPlacement parse(String value) {
if (value == null) return null;
value = value.trim();
for (AdditionalFieldPlacement option : AdditionalFieldPlacement.values()) {
if (option.getValue().equalsIgnoreCase(value)) return option;
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static AdditionalFieldPlacement parse(String value, String defaultValue) {
AdditionalFieldPlacement mode = parse(value);
if (mode == null && defaultValue != null) mode = parse(defaultValue);
return mode;
}
}
public static class AdditionalField {
private final AdditionalFieldPlacement placement;
private final String field;
private final String alias;
AdditionalField(AdditionalFieldPlacement placement, String field, String alias) {
this.placement = placement;
this.field = field;
this.alias = alias;
}
public AdditionalFieldPlacement getPlacement() {
return placement;
}
public String getField() {
return field;
}
public String getAlias() {
return alias;
}
}
static final Field FIELD_EVENT_ID = Field.create("table.field.event.id") static final Field FIELD_EVENT_ID = Field.create("table.field.event.id")
.withDisplayName("Event ID Field") .withDisplayName("Event ID Field")
.withType(ConfigDef.Type.STRING) .withType(ConfigDef.Type.STRING)
@ -109,6 +180,16 @@ public static InvalidOperationBehavior parse(String value, String defaultValue)
.withDefault("aggregateid") .withDefault("aggregateid")
.withDescription("The column which contains the Payload ID within the outbox table"); .withDescription("The column which contains the Payload ID within the outbox table");
static final Field FIELDS_ADDITIONAL_PLACEMENT = Field.create("table.fields.additional.placement")
.withDisplayName("Settings for each additional column in the outbox table")
.withType(ConfigDef.Type.LIST)
.withValidation(EventRouterConfigDefinition::isListOfStringPairs)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.HIGH)
.withDescription("Extra fields can be added as part of the event envelope or a message header, format" +
" is a list of colon-delimited pairs or trios when you desire to have aliases," +
" e.g. <code>id:header,field_name:envelope:alias</code> ");
static final Field ROUTE_BY_FIELD = Field.create("route.by.field") static final Field ROUTE_BY_FIELD = Field.create("route.by.field")
.withDisplayName("Field to route events by") .withDisplayName("Field to route events by")
.withType(ConfigDef.Type.STRING) .withType(ConfigDef.Type.STRING)
@ -160,7 +241,7 @@ public static ConfigDef configDef() {
Field.group( Field.group(
config, config,
"Table", "Table",
FIELD_EVENT_ID, FIELD_EVENT_KEY, FIELD_EVENT_TYPE, FIELD_PAYLOAD, FIELD_PAYLOAD_ID, FIELD_EVENT_TIMESTAMP FIELD_EVENT_ID, FIELD_EVENT_KEY, FIELD_EVENT_TYPE, FIELD_PAYLOAD, FIELD_PAYLOAD_ID, FIELD_EVENT_TIMESTAMP, FIELDS_ADDITIONAL_PLACEMENT
); );
Field.group( Field.group(
config, config,
@ -174,4 +255,38 @@ public static ConfigDef configDef() {
); );
return config; return config;
} }
public static List<AdditionalField> parseAdditionalFieldsConfig(Configuration config) {
String extraFieldsMapping = config.getString(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT);
List<AdditionalField> additionalFields = new ArrayList<>();
if (extraFieldsMapping == null) {
return additionalFields;
}
for (String field: extraFieldsMapping.split(",")) {
final String[] parts = field.split(":");
AdditionalFieldPlacement placement = AdditionalFieldPlacement.parse(parts[1]);
additionalFields.add(
new AdditionalField(placement, parts[0], parts.length == 3 ? parts[2] : parts[0])
);
}
return additionalFields;
}
private static int isListOfStringPairs(Configuration config, Field field, Field.ValidationOutput problems) {
List<String> value = config.getStrings(field, ",");
int errors = 0;
for (String mapping : value) {
final String[] parts = mapping.split(":");
if (parts.length != 2 && parts.length != 3) {
problems.accept(field, value, "A comma-separated list of valid String pairs or trios " +
"is expected but got: " + value);
++errors;
}
}
return errors;
}
} }

View File

@ -10,6 +10,8 @@
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test; import org.junit.Test;
@ -246,6 +248,57 @@ public void canConfigureEveryTableField() {
assertThat(eventRouted).isNotNull(); assertThat(eventRouted).isNotNull();
assertThat(((Struct) eventRouted.value()).getString("payload")).isEqualTo("{}"); assertThat(((Struct) eventRouted.value()).getString("payload")).isEqualTo("{}");
Headers headers = eventRouted.headers();
assertThat(headers.size()).isEqualTo(1);
Header header = headers.iterator().next();
assertThat(header.key()).isEqualTo("id");
assertThat(header.value()).isEqualTo("da8d6de6-3b77-45ff-8f44-57db55a7a06c");
}
@Test
public void canSetPayloadTypeIntoTheEnvelope() {
final EventRouter<SourceRecord> router = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:envelope");
router.configure(config);
final SourceRecord eventRecord = createEventRecord();
final SourceRecord eventRouted = router.apply(eventRecord);
assertThat(((Struct) eventRouted.value()).get("type")).isEqualTo("UserCreated");
}
@Test
public void canSetPayloadTypeIntoTheEnvelopeWithAlias() {
final EventRouter<SourceRecord> router = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type:envelope:aggregateType");
router.configure(config);
final SourceRecord eventRecord = createEventRecord();
final SourceRecord eventRouted = router.apply(eventRecord);
assertThat(((Struct) eventRouted.value()).get("aggregateType")).isEqualTo("UserCreated");
}
@Test
public void canSetMultipleFieldsIntoTheEnvelope() {
final EventRouter<SourceRecord> router = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(
EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(),
"type:envelope:payloadType,aggregateid:envelope:payloadId,type:header:payloadType"
);
router.configure(config);
final SourceRecord eventRecord = createEventRecord();
final SourceRecord eventRouted = router.apply(eventRecord);
Struct value = (Struct) eventRouted.value();
assertThat(value.get("payloadType")).isEqualTo("UserCreated");
assertThat(value.get("payloadId")).isEqualTo("10711fa5");
assertThat(eventRouted.headers().lastWithName("payloadType").value()).isEqualTo("UserCreated");
} }
private SourceRecord createEventRecord() { private SourceRecord createEventRecord() {