From be9c70db64b3648d21322f9de4ac2cab09fa80db Mon Sep 17 00:00:00 2001 From: Renato Mefi Date: Sun, 24 Mar 2019 17:20:41 +0100 Subject: [PATCH] DBZ-1169 Ensure SMT configuration is valid Also had to update a validator --- .../transforms/outbox/EventRouter.java | 6 ++++ .../outbox/EventRouterConfigDefinition.java | 26 +++++++++++++-- .../transforms/outbox/EventRouterTest.java | 33 +++++++++++++++++++ 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java index 0084701c1..ae87cf1a2 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java +++ b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouter.java @@ -6,6 +6,7 @@ package io.debezium.transforms.outbox; import io.debezium.config.Configuration; +import io.debezium.config.Field; import io.debezium.data.Envelope; import io.debezium.transforms.outbox.EventRouterConfigDefinition.AdditionalField; import org.apache.kafka.common.config.ConfigDef; @@ -13,6 +14,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.transforms.ExtractField; import org.apache.kafka.connect.transforms.RegexRouter; @@ -171,6 +173,10 @@ public void close() { @Override public void configure(Map configMap) { final Configuration config = Configuration.from(configMap); + Field.Set allFields = Field.setOf(EventRouterConfigDefinition.CONFIG_FIELDS); + if (!config.validateAndRecord(allFields, LOGGER::error)) { + throw new ConnectException("Unable to validate config."); + } invalidOperationBehavior = EventRouterConfigDefinition.InvalidOperationBehavior.parse( config.getString(EventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR) diff --git a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterConfigDefinition.java b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterConfigDefinition.java index 592419250..ca81ee0c7 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterConfigDefinition.java +++ b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterConfigDefinition.java @@ -235,6 +235,21 @@ public String getAlias() { " in case something else is processed this transform can log it as warning, error or stop the" + " process"); + static final Field[] CONFIG_FIELDS = { + FIELD_EVENT_ID, + FIELD_EVENT_KEY, + FIELD_EVENT_TYPE, + FIELD_PAYLOAD, + FIELD_PAYLOAD_ID, + FIELD_EVENT_TIMESTAMP, + FIELDS_ADDITIONAL_PLACEMENT, + FIELD_SCHEMA_VERSION, + ROUTE_BY_FIELD, + ROUTE_TOPIC_REGEX, + ROUTE_TOPIC_REPLACEMENT, + OPERATION_INVALID_BEHAVIOR + }; + /** * There are 3 configuration groups available: * - Table: Allows you to customize each of The column names in the outbox table for your convenience @@ -253,7 +268,7 @@ public static ConfigDef configDef() { Field.group( config, "Router", - ROUTE_BY_FIELD, ROUTE_TOPIC_REPLACEMENT + ROUTE_BY_FIELD, ROUTE_TOPIC_REGEX, ROUTE_TOPIC_REPLACEMENT ); Field.group( config, @@ -263,7 +278,7 @@ public static ConfigDef configDef() { return config; } - public static List parseAdditionalFieldsConfig(Configuration config) { + static List parseAdditionalFieldsConfig(Configuration config) { String extraFieldsMapping = config.getString(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT); List additionalFields = new ArrayList<>(); @@ -272,7 +287,7 @@ public static List parseAdditionalFieldsConfig(Configuration co return additionalFields; } - for (String field: extraFieldsMapping.split(",")) { + for (String field : extraFieldsMapping.split(",")) { final String[] parts = field.split(":"); AdditionalFieldPlacement placement = AdditionalFieldPlacement.parse(parts[1]); additionalFields.add( @@ -286,6 +301,11 @@ public static List parseAdditionalFieldsConfig(Configuration co private static int isListOfStringPairs(Configuration config, Field field, Field.ValidationOutput problems) { List value = config.getStrings(field, ","); int errors = 0; + + if (value == null) { + return errors; + } + for (String mapping : value) { final String[] parts = mapping.split(":"); if (parts.length != 2 && parts.length != 3) { diff --git a/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java b/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java index ba8e73f6d..d03867f89 100644 --- a/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java +++ b/debezium-core/src/test/java/io/debezium/transforms/outbox/EventRouterTest.java @@ -9,6 +9,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.header.Headers; @@ -421,6 +422,38 @@ public void canSetMultipleFieldsIntoTheEnvelope() { assertThat(eventRouted.headers().lastWithName("payloadType").value()).isEqualTo("UserCreated"); } + @Test(expected = ConnectException.class) + public void shouldFailOnInvalidConfigurationForTopicRegex() { + final EventRouter router = new EventRouter<>(); + final Map config = new HashMap<>(); + config.put(EventRouterConfigDefinition.ROUTE_TOPIC_REGEX.name(), " [[a-z]"); + router.configure(config); + } + + @Test(expected = ConnectException.class) + public void shouldFailOnInvalidConfigurationForAdditionalFields() { + final EventRouter router = new EventRouter<>(); + final Map config = new HashMap<>(); + config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type"); + router.configure(config); + } + + @Test(expected = ConnectException.class) + public void shouldFailOnInvalidConfigurationForAdditionalFieldsEmpty() { + final EventRouter router = new EventRouter<>(); + final Map config = new HashMap<>(); + config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), ""); + router.configure(config); + } + + @Test(expected = ConnectException.class) + public void shouldFailOnInvalidConfigurationForOperationBehavior() { + final EventRouter router = new EventRouter<>(); + final Map config = new HashMap<>(); + config.put(EventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR.name(), "invalidOption"); + router.configure(config); + } + private SourceRecord createEventRecord() { return createEventRecord( "da8d6de6-3b77-45ff-8f44-57db55a7a06c",