DBZ-1169 Implement RegexRouter based on config

This commit is contained in:
Renato Mefi 2019-03-18 20:33:46 +01:00 committed by Jiri Pechanec
parent f0f13ed4a4
commit 8b689d7f03
4 changed files with 83 additions and 25 deletions

View File

@ -106,6 +106,6 @@ public void shouldConsumeRecordsFromInsert() throws Exception {
SourceRecord routedEvent = outboxEventRouter.apply(newEventRecord); SourceRecord routedEvent = outboxEventRouter.apply(newEventRecord);
assertThat(routedEvent).isNotNull(); assertThat(routedEvent).isNotNull();
assertThat(routedEvent.topic()).isEqualTo("user"); assertThat(routedEvent.topic()).isEqualTo("outbox.event.user");
} }
} }

View File

@ -14,6 +14,7 @@
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.ExtractField; import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.RegexRouter;
import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -33,6 +34,7 @@ public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R
private static final Logger LOGGER = LoggerFactory.getLogger(EventRouter.class); private static final Logger LOGGER = LoggerFactory.getLogger(EventRouter.class);
private final ExtractField<R> afterExtractor = new ExtractField.Value<>(); private final ExtractField<R> afterExtractor = new ExtractField.Value<>();
private final RegexRouter<R> regexRouter = new RegexRouter<>();
private EventRouterConfigDefinition.InvalidOperationBehavior invalidOperationBehavior; private EventRouterConfigDefinition.InvalidOperationBehavior invalidOperationBehavior;
private String fieldEventId; private String fieldEventId;
@ -40,7 +42,8 @@ public class EventRouter<R extends ConnectRecord<R>> implements Transformation<R
private String fieldEventType; private String fieldEventType;
private String fieldPayload; private String fieldPayload;
private String fieldPayloadId; private String fieldPayloadId;
private String fieldPayloadType;
private String routeByField;
private Schema valueSchema; private Schema valueSchema;
@ -76,7 +79,6 @@ public R apply(R r) {
String eventType = eventStruct.getString(fieldEventType); String eventType = eventStruct.getString(fieldEventType);
String payload = eventStruct.getString(fieldPayload); String payload = eventStruct.getString(fieldPayload);
String payloadId = eventStruct.getString(fieldPayloadId); String payloadId = eventStruct.getString(fieldPayloadId);
String payloadType = eventStruct.getString(fieldPayloadType);
Headers headers = r.headers(); Headers headers = r.headers();
headers.addString("id", eventId); headers.addString("id", eventId);
@ -85,8 +87,8 @@ public R apply(R r) {
.put("eventType", eventType) .put("eventType", eventType)
.put("payload", payload); .put("payload", payload);
return r.newRecord( R newRecord = r.newRecord(
payloadType.toLowerCase(), eventStruct.getString(routeByField).toLowerCase(),
null, null,
Schema.STRING_SCHEMA, Schema.STRING_SCHEMA,
defineRecordKey(eventStruct, payloadId), defineRecordKey(eventStruct, payloadId),
@ -95,6 +97,8 @@ public R apply(R r) {
timestamp, timestamp,
headers headers
); );
return regexRouter.apply(newRecord);
} }
private String defineRecordKey(Struct eventStruct, String fallbackKey) { private String defineRecordKey(Struct eventStruct, String fallbackKey) {
@ -141,7 +145,14 @@ public void configure(Map<String, ?> configMap) {
fieldEventType = config.getString(EventRouterConfigDefinition.FIELD_EVENT_TYPE); fieldEventType = config.getString(EventRouterConfigDefinition.FIELD_EVENT_TYPE);
fieldPayload = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD); fieldPayload = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD);
fieldPayloadId = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD_ID); fieldPayloadId = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD_ID);
fieldPayloadType = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD_TYPE);
routeByField = config.getString(EventRouterConfigDefinition.ROUTE_BY_FIELD);
final Map<String, String> regexRouterConfig = new HashMap<>();
regexRouterConfig.put("regex", config.getString(EventRouterConfigDefinition.ROUTE_TOPIC_REGEX));
regexRouterConfig.put("replacement", config.getString(EventRouterConfigDefinition.ROUTE_TOPIC_REPLACEMENT));
regexRouter.configure(regexRouterConfig);
final Map<String, String> afterExtractorConfig = new HashMap<>(); final Map<String, String> afterExtractorConfig = new HashMap<>();
afterExtractorConfig.put("field", Envelope.FieldName.AFTER); afterExtractorConfig.put("field", Envelope.FieldName.AFTER);

View File

@ -101,14 +101,6 @@ public static InvalidOperationBehavior parse(String value, String defaultValue)
.withDefault("aggregateid") .withDefault("aggregateid")
.withDescription("The column which contains the Payload ID within the outbox table"); .withDescription("The column which contains the Payload ID within the outbox table");
static final Field FIELD_PAYLOAD_TYPE = Field.create("table.field.payload.type")
.withDisplayName("Event Payload Type Field")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withDefault("aggregatetype")
.withDescription("The column which contains the Payload Type within the outbox table");
static final Field ROUTE_BY_FIELD = Field.create("route.by.field") static final Field ROUTE_BY_FIELD = Field.create("route.by.field")
.withDisplayName("Field to route events by") .withDisplayName("Field to route events by")
.withType(ConfigDef.Type.STRING) .withType(ConfigDef.Type.STRING)
@ -160,7 +152,7 @@ public static ConfigDef configDef() {
Field.group( Field.group(
config, config,
"Table", "Table",
FIELD_EVENT_ID, FIELD_EVENT_KEY, FIELD_EVENT_TYPE, FIELD_PAYLOAD, FIELD_PAYLOAD_ID, FIELD_PAYLOAD_TYPE FIELD_EVENT_ID, FIELD_EVENT_KEY, FIELD_EVENT_TYPE, FIELD_PAYLOAD, FIELD_PAYLOAD_ID
); );
Field.group( Field.group(
config, config,

View File

@ -119,20 +119,60 @@ public void failsOnInvalidSetMessageKey() {
router.apply(eventRecord); router.apply(eventRecord);
} }
@Test
public void canRouteBasedOnField() {
final EventRouter<SourceRecord> router = new EventRouter<>();
final Map<String, String> config = new HashMap<>();
config.put(
EventRouterConfigDefinition.ROUTE_BY_FIELD.name(),
"aggregatetype"
);
router.configure(config);
final SourceRecord userEventRecord = createEventRecord();
final SourceRecord userEventRouted = router.apply(userEventRecord);
assertThat(userEventRouted).isNotNull();
assertThat(userEventRouted.topic()).isEqualTo("outbox.event.user");
final SourceRecord userUpdatedEventRecord = createEventRecord(
"ab720dd3-176d-40a6-96f3-6cf961d7df6a",
"UserUpdate",
"10711fa5",
"User",
"{}"
);
final SourceRecord userUpdatedEventRouted = router.apply(userUpdatedEventRecord);
assertThat(userUpdatedEventRouted).isNotNull();
assertThat(userUpdatedEventRouted.topic()).isEqualTo("outbox.event.user");
final SourceRecord addressCreatedEventRecord = createEventRecord(
"ab720dd3-176d-40a6-96f3-6cf961d7df6a",
"AddressCreated",
"10711fa5",
"Address",
"{}"
);
final SourceRecord addressCreatedEventRouted = router.apply(addressCreatedEventRecord);
assertThat(addressCreatedEventRouted).isNotNull();
assertThat(addressCreatedEventRouted.topic()).isEqualTo("outbox.event.address");
}
@Test @Test
public void canConfigureEveryTableField() { public void canConfigureEveryTableField() {
final EventRouter<SourceRecord> router = new EventRouter<>(); final EventRouter<SourceRecord> router = new EventRouter<>();
final Map<String, String> config = new HashMap<>(); final Map<String, String> config = new HashMap<>();
config.put(EventRouterConfigDefinition.FIELD_EVENT_ID.name(), "event_id"); config.put(EventRouterConfigDefinition.FIELD_EVENT_ID.name(), "event_id");
config.put(EventRouterConfigDefinition.FIELD_PAYLOAD_TYPE.name(), "payload_type");
config.put(EventRouterConfigDefinition.FIELD_PAYLOAD_ID.name(), "payload_id"); config.put(EventRouterConfigDefinition.FIELD_PAYLOAD_ID.name(), "payload_id");
config.put(EventRouterConfigDefinition.FIELD_EVENT_TYPE.name(), "event_type"); config.put(EventRouterConfigDefinition.FIELD_EVENT_TYPE.name(), "event_type");
config.put(EventRouterConfigDefinition.FIELD_PAYLOAD.name(), "payload_body"); config.put(EventRouterConfigDefinition.FIELD_PAYLOAD.name(), "payload_body");
config.put(EventRouterConfigDefinition.ROUTE_BY_FIELD.name(), "payload_id");
router.configure(config); router.configure(config);
final Schema recordSchema = SchemaBuilder.struct() final Schema recordSchema = SchemaBuilder.struct()
.field("event_id", SchemaBuilder.string()) .field("event_id", SchemaBuilder.string())
.field("payload_type", SchemaBuilder.string())
.field("payload_id", SchemaBuilder.string()) .field("payload_id", SchemaBuilder.string())
.field("event_type", SchemaBuilder.string()) .field("event_type", SchemaBuilder.string())
.field("payload_body", SchemaBuilder.string()) .field("payload_body", SchemaBuilder.string())
@ -146,7 +186,6 @@ public void canConfigureEveryTableField() {
final Struct before = new Struct(recordSchema); final Struct before = new Struct(recordSchema);
before.put("event_id", "da8d6de6-3b77-45ff-8f44-57db55a7a06c"); before.put("event_id", "da8d6de6-3b77-45ff-8f44-57db55a7a06c");
before.put("payload_type", "User");
before.put("payload_id", "10711fa5"); before.put("payload_id", "10711fa5");
before.put("event_type", "UserCreated"); before.put("event_type", "UserCreated");
before.put("payload_body", "{}"); before.put("payload_body", "{}");
@ -161,6 +200,22 @@ public void canConfigureEveryTableField() {
} }
private SourceRecord createEventRecord() { private SourceRecord createEventRecord() {
return createEventRecord(
"da8d6de6-3b77-45ff-8f44-57db55a7a06c",
"UserCreated",
"10711fa5",
"User",
"{}"
);
}
private SourceRecord createEventRecord(
String eventId,
String eventType,
String payloadId,
String payloadType,
String payload
) {
final Schema recordSchema = SchemaBuilder.struct() final Schema recordSchema = SchemaBuilder.struct()
.field("id", SchemaBuilder.string()) .field("id", SchemaBuilder.string())
.field("aggregatetype", SchemaBuilder.string()) .field("aggregatetype", SchemaBuilder.string())
@ -176,13 +231,13 @@ private SourceRecord createEventRecord() {
.build(); .build();
final Struct before = new Struct(recordSchema); final Struct before = new Struct(recordSchema);
before.put("id", "da8d6de6-3b77-45ff-8f44-57db55a7a06c"); before.put("id", eventId);
before.put("aggregatetype", "User"); before.put("aggregatetype", payloadType);
before.put("aggregateid", "10711fa5"); before.put("aggregateid", payloadId);
before.put("type", "UserCreated"); before.put("type", eventType);
before.put("payload", "{}"); before.put("payload", payload);
final Struct payload = envelope.create(before, null, System.nanoTime()); final Struct body = envelope.create(before, null, System.nanoTime());
return new SourceRecord(new HashMap<>(), new HashMap<>(), "db.outbox", envelope.schema(), payload); return new SourceRecord(new HashMap<>(), new HashMap<>(), "db.outbox", envelope.schema(), body);
} }
} }