From 8b689d7f0376b61ee519d5e0cadb7e19a66eceee Mon Sep 17 00:00:00 2001 From: Renato Mefi Date: Mon, 18 Mar 2019 20:33:46 +0100 Subject: [PATCH] DBZ-1169 Implement RegexRouter based on config --- .../postgresql/OutboxEventRouterIT.java | 2 +- .../transforms/outbox/EventRouter.java | 21 ++++-- .../outbox/EventRouterConfigDefinition.java | 10 +-- .../transforms/outbox/EventRouterTest.java | 75 ++++++++++++++++--- 4 files changed, 83 insertions(+), 25 deletions(-) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java index 87dcbcfac..320ff38e0 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/OutboxEventRouterIT.java @@ -106,6 +106,6 @@ public void shouldConsumeRecordsFromInsert() throws Exception { SourceRecord routedEvent = outboxEventRouter.apply(newEventRecord); assertThat(routedEvent).isNotNull(); - assertThat(routedEvent.topic()).isEqualTo("user"); + assertThat(routedEvent.topic()).isEqualTo("outbox.event.user"); } } diff --git a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java index 342c39e8b..997d6e3e5 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java @@ -14,6 +14,7 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.transforms.ExtractField; +import org.apache.kafka.connect.transforms.RegexRouter; import org.apache.kafka.connect.transforms.Transformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +34,7 @@ public class EventRouter> implements Transformation afterExtractor = new ExtractField.Value<>(); + private final RegexRouter regexRouter = new RegexRouter<>(); private EventRouterConfigDefinition.InvalidOperationBehavior invalidOperationBehavior; private String fieldEventId; @@ -40,7 +42,8 @@ public class EventRouter> implements Transformation configMap) { fieldEventType = config.getString(EventRouterConfigDefinition.FIELD_EVENT_TYPE); fieldPayload = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD); fieldPayloadId = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD_ID); - fieldPayloadType = config.getString(EventRouterConfigDefinition.FIELD_PAYLOAD_TYPE); + + routeByField = config.getString(EventRouterConfigDefinition.ROUTE_BY_FIELD); + + final Map regexRouterConfig = new HashMap<>(); + regexRouterConfig.put("regex", config.getString(EventRouterConfigDefinition.ROUTE_TOPIC_REGEX)); + regexRouterConfig.put("replacement", config.getString(EventRouterConfigDefinition.ROUTE_TOPIC_REPLACEMENT)); + + regexRouter.configure(regexRouterConfig); final Map afterExtractorConfig = new HashMap<>(); afterExtractorConfig.put("field", Envelope.FieldName.AFTER); diff --git a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterConfigDefinition.java b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterConfigDefinition.java index 29879a312..5ceea20e4 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterConfigDefinition.java +++ b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterConfigDefinition.java @@ -101,14 +101,6 @@ public static InvalidOperationBehavior parse(String value, String defaultValue) .withDefault("aggregateid") .withDescription("The column which contains the Payload ID within the outbox table"); - static final Field FIELD_PAYLOAD_TYPE = Field.create("table.field.payload.type") - .withDisplayName("Event Payload Type Field") - .withType(ConfigDef.Type.STRING) - .withWidth(ConfigDef.Width.MEDIUM) - .withImportance(ConfigDef.Importance.LOW) - .withDefault("aggregatetype") - .withDescription("The column which contains the Payload Type within the outbox table"); - static final Field ROUTE_BY_FIELD = Field.create("route.by.field") .withDisplayName("Field to route events by") .withType(ConfigDef.Type.STRING) @@ -160,7 +152,7 @@ public static ConfigDef configDef() { Field.group( config, "Table", - FIELD_EVENT_ID, FIELD_EVENT_KEY, FIELD_EVENT_TYPE, FIELD_PAYLOAD, FIELD_PAYLOAD_ID, FIELD_PAYLOAD_TYPE + FIELD_EVENT_ID, FIELD_EVENT_KEY, FIELD_EVENT_TYPE, FIELD_PAYLOAD, FIELD_PAYLOAD_ID ); Field.group( config, diff --git a/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java b/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java index 46b2b3a2c..5d08c3462 100644 --- a/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java +++ b/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java @@ -119,20 +119,60 @@ public void failsOnInvalidSetMessageKey() { router.apply(eventRecord); } + @Test + public void canRouteBasedOnField() { + final EventRouter router = new EventRouter<>(); + final Map config = new HashMap<>(); + config.put( + EventRouterConfigDefinition.ROUTE_BY_FIELD.name(), + "aggregatetype" + ); + router.configure(config); + + final SourceRecord userEventRecord = createEventRecord(); + final SourceRecord userEventRouted = router.apply(userEventRecord); + + assertThat(userEventRouted).isNotNull(); + assertThat(userEventRouted.topic()).isEqualTo("outbox.event.user"); + + final SourceRecord userUpdatedEventRecord = createEventRecord( + "ab720dd3-176d-40a6-96f3-6cf961d7df6a", + "UserUpdate", + "10711fa5", + "User", + "{}" + ); + final SourceRecord userUpdatedEventRouted = router.apply(userUpdatedEventRecord); + + assertThat(userUpdatedEventRouted).isNotNull(); + assertThat(userUpdatedEventRouted.topic()).isEqualTo("outbox.event.user"); + + final SourceRecord addressCreatedEventRecord = createEventRecord( + "ab720dd3-176d-40a6-96f3-6cf961d7df6a", + "AddressCreated", + "10711fa5", + "Address", + "{}" + ); + final SourceRecord addressCreatedEventRouted = router.apply(addressCreatedEventRecord); + + assertThat(addressCreatedEventRouted).isNotNull(); + assertThat(addressCreatedEventRouted.topic()).isEqualTo("outbox.event.address"); + } + @Test public void canConfigureEveryTableField() { final EventRouter router = new EventRouter<>(); final Map config = new HashMap<>(); config.put(EventRouterConfigDefinition.FIELD_EVENT_ID.name(), "event_id"); - config.put(EventRouterConfigDefinition.FIELD_PAYLOAD_TYPE.name(), "payload_type"); config.put(EventRouterConfigDefinition.FIELD_PAYLOAD_ID.name(), "payload_id"); config.put(EventRouterConfigDefinition.FIELD_EVENT_TYPE.name(), "event_type"); config.put(EventRouterConfigDefinition.FIELD_PAYLOAD.name(), "payload_body"); + config.put(EventRouterConfigDefinition.ROUTE_BY_FIELD.name(), "payload_id"); router.configure(config); final Schema recordSchema = SchemaBuilder.struct() .field("event_id", SchemaBuilder.string()) - .field("payload_type", SchemaBuilder.string()) .field("payload_id", SchemaBuilder.string()) .field("event_type", SchemaBuilder.string()) .field("payload_body", SchemaBuilder.string()) @@ -146,7 +186,6 @@ public void canConfigureEveryTableField() { final Struct before = new Struct(recordSchema); before.put("event_id", "da8d6de6-3b77-45ff-8f44-57db55a7a06c"); - before.put("payload_type", "User"); before.put("payload_id", "10711fa5"); before.put("event_type", "UserCreated"); before.put("payload_body", "{}"); @@ -161,6 +200,22 @@ public void canConfigureEveryTableField() { } private SourceRecord createEventRecord() { + return createEventRecord( + "da8d6de6-3b77-45ff-8f44-57db55a7a06c", + "UserCreated", + "10711fa5", + "User", + "{}" + ); + } + + private SourceRecord createEventRecord( + String eventId, + String eventType, + String payloadId, + String payloadType, + String payload + ) { final Schema recordSchema = SchemaBuilder.struct() .field("id", SchemaBuilder.string()) .field("aggregatetype", SchemaBuilder.string()) @@ -176,13 +231,13 @@ private SourceRecord createEventRecord() { .build(); final Struct before = new Struct(recordSchema); - before.put("id", "da8d6de6-3b77-45ff-8f44-57db55a7a06c"); - before.put("aggregatetype", "User"); - before.put("aggregateid", "10711fa5"); - before.put("type", "UserCreated"); - before.put("payload", "{}"); + before.put("id", eventId); + before.put("aggregatetype", payloadType); + before.put("aggregateid", payloadId); + before.put("type", eventType); + before.put("payload", payload); - final Struct payload = envelope.create(before, null, System.nanoTime()); - return new SourceRecord(new HashMap<>(), new HashMap<>(), "db.outbox", envelope.schema(), payload); + final Struct body = envelope.create(before, null, System.nanoTime()); + return new SourceRecord(new HashMap<>(), new HashMap<>(), "db.outbox", envelope.schema(), body); } }