DBZ-5902 Moved list validation to CommonConnectorConfig
This commit is contained in:
parent
85d7866f3d
commit
80ac1b362f
@ -55,7 +55,8 @@ public abstract class CommonConnectorConfig {
|
||||
public static final String TASK_ID = "task.id";
|
||||
public static final Pattern TOPIC_NAME_PATTERN = Pattern.compile("^[a-zA-Z0-9_.\\-]+$");
|
||||
public static final String MULTI_PARTITION_MODE = "multi.partition.mode";
|
||||
|
||||
private static final String EMPTY_STRING = "";
|
||||
private static final CharSequence SPACE = " ";
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(CommonConnectorConfig.class);
|
||||
|
||||
/**
|
||||
@ -1048,4 +1049,32 @@ public static int validateTopicName(Configuration config, Field field, Validatio
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public static int notContainEmptyElements(Configuration config, Field field, Field.ValidationOutput problems) {
|
||||
|
||||
if (!config.hasKey(field)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
List<String> values = config.getList(field);
|
||||
if (values.contains(EMPTY_STRING)) {
|
||||
problems.accept(field, values, "not permitted empty string");
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public static int notContainSpaceInAnyElements(Configuration config, Field field, Field.ValidationOutput problems) {
|
||||
|
||||
if (!config.hasKey(field)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
List<String> values = config.getList(field);
|
||||
if (values.stream().anyMatch(h -> h.contains(SPACE))) {
|
||||
problems.accept(field, values, "some elements with not permitted space");
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
@ -29,6 +29,7 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import io.debezium.config.CommonConnectorConfig;
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.config.Field;
|
||||
import io.debezium.util.BoundedConcurrentHashMap;
|
||||
@ -44,8 +45,6 @@ public class HeaderToValue<R extends ConnectRecord<R>> implements Transformation
|
||||
private static final int CACHE_SIZE = 64;
|
||||
public static final String NESTING_SEPARATOR = ".";
|
||||
public static final String ROOT_FIELD_NAME = "payload";
|
||||
public static final String EMPTY_STRING = "";
|
||||
public static final String SPACE = " ";
|
||||
|
||||
enum Operation {
|
||||
MOVE(MOVE_OPERATION),
|
||||
@ -77,6 +76,9 @@ public String toString() {
|
||||
.withDisplayName("Header names list")
|
||||
.withType(ConfigDef.Type.LIST)
|
||||
.withImportance(ConfigDef.Importance.HIGH)
|
||||
.withValidation(
|
||||
CommonConnectorConfig::notContainSpaceInAnyElements,
|
||||
CommonConnectorConfig::notContainEmptyElements)
|
||||
.withDescription("Header names in the record whose values are to be copied or moved to record value.")
|
||||
.required();
|
||||
|
||||
@ -84,6 +86,9 @@ public String toString() {
|
||||
.withDisplayName("Field names list")
|
||||
.withType(ConfigDef.Type.LIST)
|
||||
.withImportance(ConfigDef.Importance.HIGH)
|
||||
.withValidation(
|
||||
CommonConnectorConfig::notContainSpaceInAnyElements,
|
||||
CommonConnectorConfig::notContainEmptyElements)
|
||||
.withDescription(
|
||||
"Field names, in the same order as the header names listed in the headers configuration property. Supports Struct nesting using dot notation.")
|
||||
.required();
|
||||
@ -135,21 +140,6 @@ private void validateConfiguration() {
|
||||
throw new ConfigException(format("'%s' config must have the same number of elements as '%s' config.",
|
||||
FIELDS_FIELD, HEADERS_FIELD));
|
||||
}
|
||||
|
||||
if (headers.contains(EMPTY_STRING) || fields.contains(EMPTY_STRING)) {
|
||||
throw new ConfigException(format("'%s' and/or '%s' config contains a not valid empty string.",
|
||||
FIELDS_FIELD, HEADERS_FIELD));
|
||||
}
|
||||
|
||||
if (headers.stream().anyMatch(h -> h.contains(SPACE))) {
|
||||
throw new ConfigException(format("'%s' config contains a field with a not valid space.",
|
||||
HEADERS_FIELD));
|
||||
}
|
||||
|
||||
if (fields.stream().anyMatch(f -> f.contains(SPACE))) {
|
||||
throw new ConfigException(format("'%s' config contains a field with a not valid space.",
|
||||
FIELDS_CONF));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -128,7 +128,7 @@ public void whenFieldsOrHeadersContainsAnEmptyValueAConfigExceptionIsThrew() {
|
||||
"operation", "copy")))
|
||||
.isInstanceOf(ConfigException.class)
|
||||
.hasMessageContaining(
|
||||
"'fields' and/or 'headers' config contains a not valid empty string.");
|
||||
"Invalid value ,f2 for configuration fields: The 'fields' value is invalid: not permitted empty string");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -140,7 +140,7 @@ public void whenFieldsOrHeadersContainsASpaceAConfigExceptionIsThrew() {
|
||||
"operation", "copy")))
|
||||
.isInstanceOf(ConfigException.class)
|
||||
.hasMessageContaining(
|
||||
"'headers' config contains a field with a not valid space.");
|
||||
"Invalid value header one for configuration headers: The 'headers' value is invalid: some elements with not permitted space");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -152,7 +152,7 @@ public void whenNestedFieldContainsASpaceInNestedAddressAConfigExceptionIsThrew(
|
||||
"operation", "copy")))
|
||||
.isInstanceOf(ConfigException.class)
|
||||
.hasMessageContaining(
|
||||
"'fields' config contains a field with a not valid space.");
|
||||
"Invalid value after.field one for configuration fields: The 'fields' value is invalid: some elements with not permitted space");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -164,7 +164,7 @@ public void whenNestedFieldIsSeparatedWithADotAndASpaceAConfigExceptionIsThrew()
|
||||
"operation", "copy")))
|
||||
.isInstanceOf(ConfigException.class)
|
||||
.hasMessageContaining(
|
||||
"'fields' config contains a field with a not valid space.");
|
||||
"Invalid value after. fieldOne for configuration fields: The 'fields' value is invalid: some elements with not permitted space");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
Reference in New Issue
Block a user