DBZ-1169 Ensure SMT configuration is valid
Also had to update a validator
This commit is contained in:
parent
87a3278dfe
commit
be9c70db64
@ -6,6 +6,7 @@
|
|||||||
package io.debezium.transforms.outbox;
|
package io.debezium.transforms.outbox;
|
||||||
|
|
||||||
import io.debezium.config.Configuration;
|
import io.debezium.config.Configuration;
|
||||||
|
import io.debezium.config.Field;
|
||||||
import io.debezium.data.Envelope;
|
import io.debezium.data.Envelope;
|
||||||
import io.debezium.transforms.outbox.EventRouterConfigDefinition.AdditionalField;
|
import io.debezium.transforms.outbox.EventRouterConfigDefinition.AdditionalField;
|
||||||
import org.apache.kafka.common.config.ConfigDef;
|
import org.apache.kafka.common.config.ConfigDef;
|
||||||
@ -13,6 +14,7 @@
|
|||||||
import org.apache.kafka.connect.data.Schema;
|
import org.apache.kafka.connect.data.Schema;
|
||||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||||
import org.apache.kafka.connect.data.Struct;
|
import org.apache.kafka.connect.data.Struct;
|
||||||
|
import org.apache.kafka.connect.errors.ConnectException;
|
||||||
import org.apache.kafka.connect.header.Headers;
|
import org.apache.kafka.connect.header.Headers;
|
||||||
import org.apache.kafka.connect.transforms.ExtractField;
|
import org.apache.kafka.connect.transforms.ExtractField;
|
||||||
import org.apache.kafka.connect.transforms.RegexRouter;
|
import org.apache.kafka.connect.transforms.RegexRouter;
|
||||||
@ -171,6 +173,10 @@ public void close() {
|
|||||||
@Override
|
@Override
|
||||||
public void configure(Map<String, ?> configMap) {
|
public void configure(Map<String, ?> configMap) {
|
||||||
final Configuration config = Configuration.from(configMap);
|
final Configuration config = Configuration.from(configMap);
|
||||||
|
Field.Set allFields = Field.setOf(EventRouterConfigDefinition.CONFIG_FIELDS);
|
||||||
|
if (!config.validateAndRecord(allFields, LOGGER::error)) {
|
||||||
|
throw new ConnectException("Unable to validate config.");
|
||||||
|
}
|
||||||
|
|
||||||
invalidOperationBehavior = EventRouterConfigDefinition.InvalidOperationBehavior.parse(
|
invalidOperationBehavior = EventRouterConfigDefinition.InvalidOperationBehavior.parse(
|
||||||
config.getString(EventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR)
|
config.getString(EventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR)
|
||||||
|
@ -235,6 +235,21 @@ public String getAlias() {
|
|||||||
" in case something else is processed this transform can log it as warning, error or stop the" +
|
" in case something else is processed this transform can log it as warning, error or stop the" +
|
||||||
" process");
|
" process");
|
||||||
|
|
||||||
|
static final Field[] CONFIG_FIELDS = {
|
||||||
|
FIELD_EVENT_ID,
|
||||||
|
FIELD_EVENT_KEY,
|
||||||
|
FIELD_EVENT_TYPE,
|
||||||
|
FIELD_PAYLOAD,
|
||||||
|
FIELD_PAYLOAD_ID,
|
||||||
|
FIELD_EVENT_TIMESTAMP,
|
||||||
|
FIELDS_ADDITIONAL_PLACEMENT,
|
||||||
|
FIELD_SCHEMA_VERSION,
|
||||||
|
ROUTE_BY_FIELD,
|
||||||
|
ROUTE_TOPIC_REGEX,
|
||||||
|
ROUTE_TOPIC_REPLACEMENT,
|
||||||
|
OPERATION_INVALID_BEHAVIOR
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* There are 3 configuration groups available:
|
* There are 3 configuration groups available:
|
||||||
* - Table: Allows you to customize each of The column names in the outbox table for your convenience
|
* - Table: Allows you to customize each of The column names in the outbox table for your convenience
|
||||||
@ -253,7 +268,7 @@ public static ConfigDef configDef() {
|
|||||||
Field.group(
|
Field.group(
|
||||||
config,
|
config,
|
||||||
"Router",
|
"Router",
|
||||||
ROUTE_BY_FIELD, ROUTE_TOPIC_REPLACEMENT
|
ROUTE_BY_FIELD, ROUTE_TOPIC_REGEX, ROUTE_TOPIC_REPLACEMENT
|
||||||
);
|
);
|
||||||
Field.group(
|
Field.group(
|
||||||
config,
|
config,
|
||||||
@ -263,7 +278,7 @@ public static ConfigDef configDef() {
|
|||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<AdditionalField> parseAdditionalFieldsConfig(Configuration config) {
|
static List<AdditionalField> parseAdditionalFieldsConfig(Configuration config) {
|
||||||
String extraFieldsMapping = config.getString(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT);
|
String extraFieldsMapping = config.getString(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT);
|
||||||
|
|
||||||
List<AdditionalField> additionalFields = new ArrayList<>();
|
List<AdditionalField> additionalFields = new ArrayList<>();
|
||||||
@ -272,7 +287,7 @@ public static List<AdditionalField> parseAdditionalFieldsConfig(Configuration co
|
|||||||
return additionalFields;
|
return additionalFields;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (String field: extraFieldsMapping.split(",")) {
|
for (String field : extraFieldsMapping.split(",")) {
|
||||||
final String[] parts = field.split(":");
|
final String[] parts = field.split(":");
|
||||||
AdditionalFieldPlacement placement = AdditionalFieldPlacement.parse(parts[1]);
|
AdditionalFieldPlacement placement = AdditionalFieldPlacement.parse(parts[1]);
|
||||||
additionalFields.add(
|
additionalFields.add(
|
||||||
@ -286,6 +301,11 @@ public static List<AdditionalField> parseAdditionalFieldsConfig(Configuration co
|
|||||||
private static int isListOfStringPairs(Configuration config, Field field, Field.ValidationOutput problems) {
|
private static int isListOfStringPairs(Configuration config, Field field, Field.ValidationOutput problems) {
|
||||||
List<String> value = config.getStrings(field, ",");
|
List<String> value = config.getStrings(field, ",");
|
||||||
int errors = 0;
|
int errors = 0;
|
||||||
|
|
||||||
|
if (value == null) {
|
||||||
|
return errors;
|
||||||
|
}
|
||||||
|
|
||||||
for (String mapping : value) {
|
for (String mapping : value) {
|
||||||
final String[] parts = mapping.split(":");
|
final String[] parts = mapping.split(":");
|
||||||
if (parts.length != 2 && parts.length != 3) {
|
if (parts.length != 2 && parts.length != 3) {
|
||||||
|
@ -9,6 +9,7 @@
|
|||||||
import org.apache.kafka.connect.data.Schema;
|
import org.apache.kafka.connect.data.Schema;
|
||||||
import org.apache.kafka.connect.data.SchemaBuilder;
|
import org.apache.kafka.connect.data.SchemaBuilder;
|
||||||
import org.apache.kafka.connect.data.Struct;
|
import org.apache.kafka.connect.data.Struct;
|
||||||
|
import org.apache.kafka.connect.errors.ConnectException;
|
||||||
import org.apache.kafka.connect.errors.DataException;
|
import org.apache.kafka.connect.errors.DataException;
|
||||||
import org.apache.kafka.connect.header.Header;
|
import org.apache.kafka.connect.header.Header;
|
||||||
import org.apache.kafka.connect.header.Headers;
|
import org.apache.kafka.connect.header.Headers;
|
||||||
@ -421,6 +422,38 @@ public void canSetMultipleFieldsIntoTheEnvelope() {
|
|||||||
assertThat(eventRouted.headers().lastWithName("payloadType").value()).isEqualTo("UserCreated");
|
assertThat(eventRouted.headers().lastWithName("payloadType").value()).isEqualTo("UserCreated");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(expected = ConnectException.class)
|
||||||
|
public void shouldFailOnInvalidConfigurationForTopicRegex() {
|
||||||
|
final EventRouter<SourceRecord> router = new EventRouter<>();
|
||||||
|
final Map<String, String> config = new HashMap<>();
|
||||||
|
config.put(EventRouterConfigDefinition.ROUTE_TOPIC_REGEX.name(), " [[a-z]");
|
||||||
|
router.configure(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = ConnectException.class)
|
||||||
|
public void shouldFailOnInvalidConfigurationForAdditionalFields() {
|
||||||
|
final EventRouter<SourceRecord> router = new EventRouter<>();
|
||||||
|
final Map<String, String> config = new HashMap<>();
|
||||||
|
config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "type");
|
||||||
|
router.configure(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = ConnectException.class)
|
||||||
|
public void shouldFailOnInvalidConfigurationForAdditionalFieldsEmpty() {
|
||||||
|
final EventRouter<SourceRecord> router = new EventRouter<>();
|
||||||
|
final Map<String, String> config = new HashMap<>();
|
||||||
|
config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "");
|
||||||
|
router.configure(config);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = ConnectException.class)
|
||||||
|
public void shouldFailOnInvalidConfigurationForOperationBehavior() {
|
||||||
|
final EventRouter<SourceRecord> router = new EventRouter<>();
|
||||||
|
final Map<String, String> config = new HashMap<>();
|
||||||
|
config.put(EventRouterConfigDefinition.OPERATION_INVALID_BEHAVIOR.name(), "invalidOption");
|
||||||
|
router.configure(config);
|
||||||
|
}
|
||||||
|
|
||||||
private SourceRecord createEventRecord() {
|
private SourceRecord createEventRecord() {
|
||||||
return createEventRecord(
|
return createEventRecord(
|
||||||
"da8d6de6-3b77-45ff-8f44-57db55a7a06c",
|
"da8d6de6-3b77-45ff-8f44-57db55a7a06c",
|
||||||
|
Loading…
Reference in New Issue
Block a user