DBZ-3028 Validate field.exclude.list & field.renames for MongoDB connector
This commit is contained in:
parent
b855cbb529
commit
389a8a0a83
@ -10,6 +10,7 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import org.apache.kafka.common.config.ConfigDef;
|
import org.apache.kafka.common.config.ConfigDef;
|
||||||
import org.apache.kafka.common.config.ConfigDef.Importance;
|
import org.apache.kafka.common.config.ConfigDef.Importance;
|
||||||
@ -39,6 +40,13 @@ public class MongoDbConnectorConfig extends CommonConnectorConfig {
|
|||||||
protected static final String DATABASE_INCLUDE_LIST_ALREADY_SPECIFIED_ERROR_MSG = "\"database.include.list\" is already specified";
|
protected static final String DATABASE_INCLUDE_LIST_ALREADY_SPECIFIED_ERROR_MSG = "\"database.include.list\" is already specified";
|
||||||
protected static final String DATABASE_WHITELIST_ALREADY_SPECIFIED_ERROR_MSG = "\"database.whitelist\" is already specified";
|
protected static final String DATABASE_WHITELIST_ALREADY_SPECIFIED_ERROR_MSG = "\"database.whitelist\" is already specified";
|
||||||
|
|
||||||
|
protected static final Pattern PATTERN_SPILT = Pattern.compile(",");
|
||||||
|
|
||||||
|
protected static final Pattern FIELD_EXCLUDE_LIST_PATTERN = Pattern.compile("^[*|\\w|\\s*]+(?:\\.[\\w]+\\.[\\w]+)+(\\.[\\w]+)*\\s*$");
|
||||||
|
protected static final String QUALIFIED_FIELD_EXCLUDE_LIST_PATTERN = "<databaseName>.<collectionName>.<fieldName>.<nestedFieldName>";
|
||||||
|
protected static final Pattern FIELD_RENAMES_PATTERN = Pattern.compile("^[*|\\w|\\s*]+(?:\\.[\\w]+\\.[\\w]+)+(\\.[\\w]+)*:(?:[\\w]+)+\\s*$");
|
||||||
|
protected static final String QUALIFIED_FIELD_RENAMES_PATTERN = "<databaseName>.<collectionName>.<fieldName>.<nestedFieldName>:<newNestedFieldName>";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The set of predefined SnapshotMode options or aliases.
|
* The set of predefined SnapshotMode options or aliases.
|
||||||
*/
|
*/
|
||||||
@ -358,6 +366,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
|
|||||||
.withType(Type.STRING)
|
.withType(Type.STRING)
|
||||||
.withWidth(Width.LONG)
|
.withWidth(Width.LONG)
|
||||||
.withImportance(Importance.MEDIUM)
|
.withImportance(Importance.MEDIUM)
|
||||||
|
.withValidation(MongoDbConnectorConfig::validateFieldExcludeList)
|
||||||
.withDescription("A comma-separated list of the fully-qualified names of fields that should be excluded from change event message values");
|
.withDescription("A comma-separated list of the fully-qualified names of fields that should be excluded from change event message values");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -385,6 +394,7 @@ public static SnapshotMode parse(String value, String defaultValue) {
|
|||||||
.withType(Type.STRING)
|
.withType(Type.STRING)
|
||||||
.withWidth(Width.LONG)
|
.withWidth(Width.LONG)
|
||||||
.withImportance(Importance.MEDIUM)
|
.withImportance(Importance.MEDIUM)
|
||||||
|
.withValidation(MongoDbConnectorConfig::validateFieldRenamesList)
|
||||||
.withDescription("");
|
.withDescription("");
|
||||||
|
|
||||||
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode")
|
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode")
|
||||||
@ -509,6 +519,36 @@ private static int validateHosts(Configuration config, Field field, ValidationOu
|
|||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static int validateFieldExcludeList(Configuration config, Field field, ValidationOutput problems) {
|
||||||
|
int problemCount = 0;
|
||||||
|
String fieldExcludeList = config.getString(FIELD_EXCLUDE_LIST);
|
||||||
|
|
||||||
|
if (fieldExcludeList != null) {
|
||||||
|
for (String excludeField : PATTERN_SPILT.split(fieldExcludeList)) {
|
||||||
|
if (!FIELD_EXCLUDE_LIST_PATTERN.asPredicate().test(excludeField)) {
|
||||||
|
problems.accept(FIELD_EXCLUDE_LIST, excludeField, excludeField + " has invalid format (expecting " + QUALIFIED_FIELD_EXCLUDE_LIST_PATTERN + ")");
|
||||||
|
problemCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return problemCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int validateFieldRenamesList(Configuration config, Field field, ValidationOutput problems) {
|
||||||
|
int problemCount = 0;
|
||||||
|
String fieldRenamesList = config.getString(FIELD_RENAMES);
|
||||||
|
|
||||||
|
if (fieldRenamesList != null) {
|
||||||
|
for (String renameField : PATTERN_SPILT.split(fieldRenamesList)) {
|
||||||
|
if (!FIELD_RENAMES_PATTERN.asPredicate().test(renameField)) {
|
||||||
|
problems.accept(FIELD_EXCLUDE_LIST, renameField, renameField + " has invalid format (expecting " + QUALIFIED_FIELD_RENAMES_PATTERN + ")");
|
||||||
|
problemCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return problemCount;
|
||||||
|
}
|
||||||
|
|
||||||
private static int validateCollectionExcludeList(Configuration config, Field field, ValidationOutput problems) {
|
private static int validateCollectionExcludeList(Configuration config, Field field, ValidationOutput problems) {
|
||||||
String includeList = config.getString(COLLECTION_INCLUDE_LIST);
|
String includeList = config.getString(COLLECTION_INCLUDE_LIST);
|
||||||
String excludeList = config.getString(COLLECTION_EXCLUDE_LIST);
|
String excludeList = config.getString(COLLECTION_EXCLUDE_LIST);
|
||||||
|
@ -7,7 +7,6 @@
|
|||||||
|
|
||||||
import static org.fest.assertions.Assertions.assertThat;
|
import static org.fest.assertions.Assertions.assertThat;
|
||||||
|
|
||||||
import org.apache.kafka.common.config.ConfigException;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@ -162,41 +161,6 @@ public void shouldExcludeCollectionCoveredByLiteralInBlacklist() {
|
|||||||
assertCollectionIncluded("db2.collectionA");
|
assertCollectionIncluded("db2.collectionA");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = ConfigException.class)
|
|
||||||
public void shouldThrowExceptionWhenFieldBlacklistDatabaseAndCollectionPartsAreMissing() {
|
|
||||||
build.excludeFields(".name").createFilters();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(expected = ConfigException.class)
|
|
||||||
public void shouldThrowExceptionWhenFieldBlacklistFieldPartIsMissing() {
|
|
||||||
build.excludeFields("db1.collectionA.").createFilters();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(expected = ConfigException.class)
|
|
||||||
public void shouldThrowExceptionWhenFieldRenamesDatabaseAndCollectionPartsAreMissing() {
|
|
||||||
build.renameFields(".name=new_name").createFilters();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(expected = ConfigException.class)
|
|
||||||
public void shouldThrowExceptionWhenFieldRenamesReplacementPartIsMissing() {
|
|
||||||
build.renameFields("db1.collectionA.").createFilters();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(expected = ConfigException.class)
|
|
||||||
public void shouldThrowExceptionWhenFieldRenamesReplacementPartSeparatorIsMissing() {
|
|
||||||
build.renameFields("db1.collectionA.namenew_name").createFilters();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(expected = ConfigException.class)
|
|
||||||
public void shouldThrowExceptionWhenFieldRenamesRenameMappingKeyIsMissing() {
|
|
||||||
build.renameFields("db1.collectionA.=new_name").createFilters();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(expected = ConfigException.class)
|
|
||||||
public void shouldThrowExceptionWhenFieldRenamesRenameMappingValueIsMissing() {
|
|
||||||
build.renameFields("db1.collectionA.name=").createFilters();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void assertCollectionIncluded(String fullyQualifiedCollectionName) {
|
protected void assertCollectionIncluded(String fullyQualifiedCollectionName) {
|
||||||
CollectionId id = CollectionId.parse("rs1.", fullyQualifiedCollectionName);
|
CollectionId id = CollectionId.parse("rs1.", fullyQualifiedCollectionName);
|
||||||
assertThat(id).isNotNull();
|
assertThat(id).isNotNull();
|
||||||
|
@ -51,6 +51,7 @@
|
|||||||
|
|
||||||
import io.debezium.config.CommonConnectorConfig;
|
import io.debezium.config.CommonConnectorConfig;
|
||||||
import io.debezium.config.Configuration;
|
import io.debezium.config.Configuration;
|
||||||
|
import io.debezium.config.Field;
|
||||||
import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary;
|
import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary;
|
||||||
import io.debezium.converters.CloudEventsConverterTest;
|
import io.debezium.converters.CloudEventsConverterTest;
|
||||||
import io.debezium.data.Envelope;
|
import io.debezium.data.Envelope;
|
||||||
@ -143,6 +144,96 @@ public void shouldFailToValidateInvalidConfiguration() {
|
|||||||
assertNoConfigurationErrors(result, CommonConnectorConfig.TOMBSTONES_ON_DELETE);
|
assertNoConfigurationErrors(result, CommonConnectorConfig.TOMBSTONES_ON_DELETE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldThrowExceptionWhenFieldExcludeListDatabasePartIsOnlyProvided() {
|
||||||
|
shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, "inventory", 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldThrowExceptionWhenFieldExcludeListDatabaseAndCollectionPartIsOnlyProvided() {
|
||||||
|
shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, "inventory.collectionA", 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldThrowExceptionWhenFieldExcludeListDatabaseAndCollectionPartsAreMissing() {
|
||||||
|
shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, ".name", 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldThrowExceptionWhenFieldExcludeListFieldPartIsMissing() {
|
||||||
|
shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, "db1.collectionA.", 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldNotThrowExceptionWhenFieldExcludeListHasLeadingWhiteSpaces() {
|
||||||
|
shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, " *.collectionA.name", 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldNotThrowExceptionWhenFieldExcludeListHasWhiteSpaces() {
|
||||||
|
shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, "db1.collectionA.name ,db2.collectionB.house ", 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldNotThrowExceptionWhenFieldExcludeListIsValid() {
|
||||||
|
shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, "db1.collectionA.name1", 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldThrowExceptionWhenFieldRenamesDatabaseAndCollectionPartsAreMissing() {
|
||||||
|
shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_RENAMES, ".name=new_name", 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldThrowExceptionWhenFieldRenamesReplacementPartIsMissing() {
|
||||||
|
shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_RENAMES, "db1.collectionA.", 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldThrowExceptionWhenFieldRenamesReplacementPartSeparatorIsMissing() {
|
||||||
|
shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_RENAMES, "db1.collectionA.namenew_name", 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldThrowExceptionWhenFieldRenamesRenameMappingKeyIsMissing() {
|
||||||
|
shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_RENAMES, "db1.collectionA.=new_name", 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldThrowExceptionWhenFieldRenamesRenameMappingValueIsMissing() {
|
||||||
|
shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_RENAMES, "db1.collectionA.name=", 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldNotThrowExceptionWhenFieldRenamesHasLeadingWhiteSpaces() {
|
||||||
|
shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_RENAMES, " db1.collectionA.name:newname", 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldNotThrowExceptionWhenFieldRenamesHasWhiteSpaces() {
|
||||||
|
shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_RENAMES, "*.collectionA.name:new_name, db2.collectionB.house:new_house ", 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldNotThrowExceptionWhenFieldRenamesIsValid() {
|
||||||
|
shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_RENAMES, "db1.collectionA.name1:new_name1", 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shouldValidateFilterFieldConfiguration(Field field, String value, int errorCount) {
|
||||||
|
config = TestHelper.getConfiguration().edit()
|
||||||
|
.with(field, value)
|
||||||
|
.build();
|
||||||
|
MongoDbConnector connector = new MongoDbConnector();
|
||||||
|
Config result = connector.validate(config.asMap());
|
||||||
|
|
||||||
|
if (errorCount == 0) {
|
||||||
|
assertNoConfigurationErrors(result, field);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
assertConfigurationErrors(result, field, errorCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldValidateAcceptableConfiguration() {
|
public void shouldValidateAcceptableConfiguration() {
|
||||||
config = TestHelper.getConfiguration();
|
config = TestHelper.getConfiguration();
|
||||||
|
Loading…
Reference in New Issue
Block a user