diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/FieldSelector.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/FieldSelector.java index 3a06b1125..904c78522 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/FieldSelector.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/FieldSelector.java @@ -93,7 +93,7 @@ public FieldSelectorBuilder renameFields(String fullyQualifiedFieldReplacements) /** * Builds the filter selector that returns the field filter for a given collection identifier, using the comma-separated - * list of fully-qualified field names (for details, see {@link MongoDbConnectorConfig#FIELD_BLACKLIST}) defining + * list of fully-qualified field names (for details, see {@link MongoDbConnectorConfig#FIELD_EXCLUDE_LIST}) defining * which fields (if any) should be excluded, and using the comma-separated list of fully-qualified field replacements * (for details, see {@link MongoDbConnectorConfig#FIELD_RENAMES}) defining which fields (if any) should be * renamed. diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Filters.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Filters.java index 500b3ddf4..94a2530d6 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Filters.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/Filters.java @@ -32,26 +32,26 @@ public final class Filters { * @param config the configuration; may not be null */ public Filters(Configuration config) { - String dbWhitelist = config.getString(MongoDbConnectorConfig.DATABASE_WHITELIST); - String dbBlacklist = config.getString(MongoDbConnectorConfig.DATABASE_BLACKLIST); - if (dbWhitelist != null && !dbWhitelist.trim().isEmpty()) { - databaseFilter = Predicates.includes(dbWhitelist); + String dbIncludeList = config.getFallbackStringProperty(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST, MongoDbConnectorConfig.DATABASE_WHITELIST); + String dbExcludeList = config.getFallbackStringProperty(MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST, MongoDbConnectorConfig.DATABASE_BLACKLIST); + if (dbIncludeList != null && !dbIncludeList.trim().isEmpty()) { + databaseFilter = Predicates.includes(dbIncludeList); } - else if (dbBlacklist != null && !dbBlacklist.trim().isEmpty()) { - databaseFilter = Predicates.excludes(dbBlacklist); + else if (dbExcludeList != null && !dbExcludeList.trim().isEmpty()) { + databaseFilter = Predicates.excludes(dbExcludeList); } else { databaseFilter = (db) -> true; } - String collectionWhitelist = config.getString(MongoDbConnectorConfig.COLLECTION_WHITELIST); - String collectionBlacklist = config.getString(MongoDbConnectorConfig.COLLECTION_BLACKLIST); + String collectionIncludeList = config.getFallbackStringProperty(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, MongoDbConnectorConfig.COLLECTION_WHITELIST); + String collectionExcludeList = config.getFallbackStringProperty(MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST, MongoDbConnectorConfig.COLLECTION_BLACKLIST); final Predicate collectionFilter; - if (collectionWhitelist != null && !collectionWhitelist.trim().isEmpty()) { - collectionFilter = Predicates.includes(collectionWhitelist, CollectionId::namespace); + if (collectionIncludeList != null && !collectionIncludeList.trim().isEmpty()) { + collectionFilter = Predicates.includes(collectionIncludeList, CollectionId::namespace); } - else if (collectionBlacklist != null && !collectionBlacklist.trim().isEmpty()) { - collectionFilter = Predicates.excludes(collectionBlacklist, CollectionId::namespace); + else if (collectionExcludeList != null && !collectionExcludeList.trim().isEmpty()) { + collectionFilter = Predicates.excludes(collectionExcludeList, CollectionId::namespace); } else { collectionFilter = (id) -> true; @@ -61,7 +61,7 @@ else if (collectionBlacklist != null && !collectionBlacklist.trim().isEmpty()) { // Define the field selector that provides the field filter to exclude or rename fields in a document ... fieldSelector = FieldSelector.builder() - .excludeFields(config.getString(MongoDbConnectorConfig.FIELD_BLACKLIST)) + .excludeFields(config.getFallbackStringProperty(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, MongoDbConnectorConfig.FIELD_BLACKLIST)) .renameFields(config.getString(MongoDbConnectorConfig.FIELD_RENAMES)) .build(); } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java index 9c5ac52dd..c34e9ad38 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java @@ -26,6 +26,9 @@ */ public class MongoDbConnectorConfig extends CommonConnectorConfig { + protected static final String COLLECTION_INCLUDE_LIST_ALREADY_SPECIFIED_ERROR_MSG = "\"collection.include.list\" or \"collection.whitelist\" is already specified"; + protected static final String DATABASE_INCLUDE_LIST_ALREADY_SPECIFIED_ERROR_MSG = "\"database.include.list\" or \"database.whitelist\" is already specified"; + /** * The set of predefined SnapshotMode options or aliases. */ @@ -236,48 +239,94 @@ public static SnapshotMode parse(String value, String defaultValue) { /** * A comma-separated list of regular expressions that match the databases to be monitored. - * May not be used with {@link #DATABASE_BLACKLIST}. + * Must not be used with {@link #DATABASE_EXCLUDE_LIST}. */ - public static final Field DATABASE_WHITELIST = Field.create("database.whitelist") - .withDisplayName("DB Whitelist") + public static final Field DATABASE_INCLUDE_LIST = Field.create("database.include.list") + .withDisplayName("Include Databases") .withType(Type.LIST) .withWidth(Width.LONG) .withImportance(Importance.HIGH) - .withValidation(Field::isListOfRegex, - MongoDbConnectorConfig::validateDatabaseBlacklist) + .withValidation(Field::isListOfRegex, MongoDbConnectorConfig::validateDatabaseExcludeList) .withDescription("The databases for which changes are to be captured"); /** - * A comma-separated list of regular expressions that match the databases to be excluded. - * May not be used with {@link #DATABASE_WHITELIST}. + * Old, backwards-compatible "whitelist" property. */ - public static final Field DATABASE_BLACKLIST = Field.create("database.blacklist") - .withDisplayName("DB Blacklist") + @Deprecated + public static final Field DATABASE_WHITELIST = Field.create("database.whitelist") + .withDisplayName("Deprecated: Include Databases") + .withType(Type.LIST) + .withWidth(Width.LONG) + .withImportance(Importance.LOW) + .withValidation(Field::isListOfRegex, MongoDbConnectorConfig::validateDatabaseExcludeList) + .withInvisibleRecommender() + .withDescription("The databases for which changes are to be captured (deprecated, use \"" + DATABASE_INCLUDE_LIST.name() + "\" instead)"); + + /** + * A comma-separated list of regular expressions that match the databases to be excluded. + * Must not be used with {@link #DATABASE_INCLUDE_LIST}. + */ + public static final Field DATABASE_EXCLUDE_LIST = Field.create("database.exclude.list") + .withDisplayName("Exclude Databases") .withType(Type.LIST) .withWidth(Width.LONG) .withImportance(Importance.HIGH) .withValidation(Field::isListOfRegex) .withDescription("The databases for which changes are to be excluded"); + /** + * Old, backwards-compatible "blacklist" property. + */ + @Deprecated + public static final Field DATABASE_BLACKLIST = Field.create("database.blacklist") + .withDisplayName("Deprecated: Exclude Databases") + .withType(Type.LIST) + .withWidth(Width.LONG) + .withImportance(Importance.LOW) + .withValidation(Field::isListOfRegex) + .withInvisibleRecommender() + .withDescription("The databases for which changes are to be excluded (deprecated, use \"" + DATABASE_EXCLUDE_LIST.name() + "\" instead)"); + /** * A comma-separated list of regular expressions that match the fully-qualified namespaces of collections to be monitored. * Fully-qualified namespaces for collections are of the form {@code .}. - * May not be used with {@link #COLLECTION_BLACKLIST}. + * Must not be used with {@link #COLLECTION_EXCLUDE_LIST}. */ - public static final Field COLLECTION_WHITELIST = Field.create("collection.whitelist") - .withDisplayName("Collections") + public static final Field COLLECTION_INCLUDE_LIST = Field.create("collection.include.list") + .withDisplayName("Include Collections") .withType(Type.LIST) .withWidth(Width.LONG) .withImportance(Importance.HIGH) .withValidation(Field::isListOfRegex, - MongoDbConnectorConfig::validateCollectionBlacklist) + MongoDbConnectorConfig::validateCollectionExcludeList) .withDescription("The collections for which changes are to be captured"); + /** + * Old, backwards-compatible "whitelist" property. + */ + @Deprecated + public static final Field COLLECTION_WHITELIST = Field.create("collection.whitelist") + .withDisplayName("Deprecated: Include Collections") + .withType(Type.LIST) + .withWidth(Width.LONG) + .withImportance(Importance.LOW) + .withValidation(Field::isListOfRegex, MongoDbConnectorConfig::validateCollectionExcludeList) + .withInvisibleRecommender() + .withDescription("The collections for which changes are to be captured (deprecated, use \"" + COLLECTION_INCLUDE_LIST.name() + "\" instead)"); + /** * A comma-separated list of regular expressions that match the fully-qualified namespaces of collections to be excluded from * monitoring. Fully-qualified namespaces for collections are of the form {@code .}. - * May not be used with {@link #COLLECTION_WHITELIST}. + * Must not be used with {@link #COLLECTION_INCLUDE_LIST}. */ + public static final Field COLLECTION_EXCLUDE_LIST = Field.create("collection.exclude.list") + .withValidation(Field::isListOfRegex) + .withInvisibleRecommender(); + + /** + * Old, backwards-compatible "blacklist" property. + */ + @Deprecated public static final Field COLLECTION_BLACKLIST = Field.create("collection.blacklist") .withValidation(Field::isListOfRegex) .withInvisibleRecommender(); @@ -288,12 +337,25 @@ public static SnapshotMode parse(String value, String defaultValue) { * ...}, where {@code } and * {@code } may contain the wildcard ({@code *}) which matches any characters. */ - public static final Field FIELD_BLACKLIST = Field.create("field.blacklist") + public static final Field FIELD_EXCLUDE_LIST = Field.create("field.exclude.list") .withDisplayName("Exclude Fields") .withType(Type.STRING) .withWidth(Width.LONG) .withImportance(Importance.MEDIUM) - .withDescription(""); + .withDescription("A comma-separated list of the fully-qualified names of fields that should be excluded from change event message values"); + + /** + * Old, backwards-compatible "blacklist" property. + */ + @Deprecated + public static final Field FIELD_BLACKLIST = Field.create("field.blacklist") + .withDisplayName("Deprecated: Exclude Fields") + .withType(Type.STRING) + .withWidth(Width.LONG) + .withImportance(Importance.LOW) + .withInvisibleRecommender() + .withDescription("A comma-separated list of the fully-qualified names of fields that should be excluded from change event message values (deprecated, use \"" + + FIELD_EXCLUDE_LIST.name() + "\" instead)"); /** * A comma-separated list of the fully-qualified replacements of fields that should be used to rename fields in change @@ -369,10 +431,15 @@ public static SnapshotMode parse(String value, String defaultValue) { SSL_ALLOW_INVALID_HOSTNAMES) .events( DATABASE_WHITELIST, + DATABASE_INCLUDE_LIST, DATABASE_BLACKLIST, + DATABASE_EXCLUDE_LIST, COLLECTION_WHITELIST, + COLLECTION_INCLUDE_LIST, COLLECTION_BLACKLIST, + COLLECTION_EXCLUDE_LIST, FIELD_BLACKLIST, + FIELD_EXCLUDE_LIST, FIELD_RENAMES) .connector( MAX_COPY_THREADS, @@ -413,19 +480,21 @@ private static int validateHosts(Configuration config, Field field, ValidationOu return count; } - private static int validateCollectionBlacklist(Configuration config, Field field, ValidationOutput problems) { - return validateBlacklistField(config, problems, COLLECTION_WHITELIST, COLLECTION_BLACKLIST); + private static int validateCollectionExcludeList(Configuration config, Field field, ValidationOutput problems) { + String includeList = config.getFallbackStringProperty(COLLECTION_INCLUDE_LIST, COLLECTION_WHITELIST); + String excludeList = config.getFallbackStringProperty(COLLECTION_EXCLUDE_LIST, COLLECTION_BLACKLIST); + if (includeList != null && excludeList != null) { + problems.accept(COLLECTION_EXCLUDE_LIST, excludeList, COLLECTION_INCLUDE_LIST_ALREADY_SPECIFIED_ERROR_MSG); + return 1; + } + return 0; } - private static int validateDatabaseBlacklist(Configuration config, Field field, ValidationOutput problems) { - return validateBlacklistField(config, problems, DATABASE_WHITELIST, DATABASE_BLACKLIST); - } - - private static int validateBlacklistField(Configuration config, ValidationOutput problems, Field fieldWhitelist, Field fieldBlacklist) { - String whitelist = config.getString(fieldWhitelist); - String blacklist = config.getString(fieldBlacklist); - if (whitelist != null && blacklist != null) { - problems.accept(fieldBlacklist, blacklist, "Whitelist is already specified"); + private static int validateDatabaseExcludeList(Configuration config, Field field, ValidationOutput problems) { + String includeList = config.getFallbackStringProperty(DATABASE_INCLUDE_LIST, DATABASE_WHITELIST); + String excludeList = config.getFallbackStringProperty(DATABASE_EXCLUDE_LIST, DATABASE_BLACKLIST); + if (includeList != null && excludeList != null) { + problems.accept(DATABASE_EXCLUDE_LIST, excludeList, DATABASE_INCLUDE_LIST_ALREADY_SPECIFIED_ERROR_MSG); return 1; } return 0; diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSchema.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSchema.java index df4273c16..ae3718085 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSchema.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSchema.java @@ -113,7 +113,7 @@ public boolean tableInformationComplete() { @Override public void assureNonEmptySchema() { if (collections.isEmpty()) { - LOGGER.warn("After applying blacklist/whitelist filters there are no tables to monitor, please check your configuration"); + LOGGER.warn(DatabaseSchema.NO_CAPTURED_DATA_COLLECTIONS_WARNING); } } diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/Configurator.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/Configurator.java index 51a485934..c494a6996 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/Configurator.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/Configurator.java @@ -7,6 +7,7 @@ import io.debezium.config.Configuration; import io.debezium.config.Field; +import io.debezium.util.Testing; /** * A helper for easily building connector configurations for testing. @@ -45,23 +46,48 @@ public Configurator maxBatchSize(int maxBatchSize) { } public Configurator includeDatabases(String regexList) { - return with(MongoDbConnectorConfig.DATABASE_WHITELIST, regexList); + if (Math.random() >= 0.5) { + Testing.debug("Using \"" + MongoDbConnectorConfig.DATABASE_WHITELIST.name() + "\" config property"); + return with(MongoDbConnectorConfig.DATABASE_WHITELIST, regexList); + } + Testing.debug("Using \"" + MongoDbConnectorConfig.DATABASE_INCLUDE_LIST.name() + "\" config property"); + return with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST, regexList); } public Configurator excludeDatabases(String regexList) { - return with(MongoDbConnectorConfig.DATABASE_BLACKLIST, regexList); + if (Math.random() >= 0.5) { + Testing.debug("Using \"" + MongoDbConnectorConfig.DATABASE_BLACKLIST.name() + "\" config property"); + return with(MongoDbConnectorConfig.DATABASE_BLACKLIST, regexList); + } + Testing.debug("Using \"" + MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST.name() + "\" config property"); + return with(MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST, regexList); } public Configurator includeCollections(String regexList) { - return with(MongoDbConnectorConfig.COLLECTION_WHITELIST, regexList); + if (Math.random() >= 0.5) { + Testing.debug("Using \"" + MongoDbConnectorConfig.COLLECTION_WHITELIST.name() + "\" config property"); + return with(MongoDbConnectorConfig.COLLECTION_WHITELIST, regexList); + } + Testing.debug("Using \"" + MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST.name() + "\" config property"); + return with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, regexList); } public Configurator excludeCollections(String regexList) { - return with(MongoDbConnectorConfig.COLLECTION_BLACKLIST, regexList); + if (Math.random() >= 0.5) { + Testing.debug("Using \"" + MongoDbConnectorConfig.COLLECTION_BLACKLIST.name() + "\" config property"); + return with(MongoDbConnectorConfig.COLLECTION_BLACKLIST, regexList); + } + Testing.debug("Using \"" + MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST.name() + "\" config property"); + return with(MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST, regexList); } - public Configurator excludeFields(String blacklist) { - return with(MongoDbConnectorConfig.FIELD_BLACKLIST, blacklist); + public Configurator excludeFields(String excludeList) { + if (Math.random() >= 0.5) { + Testing.debug("Using \"" + MongoDbConnectorConfig.FIELD_BLACKLIST.name() + "\" config property"); + return with(MongoDbConnectorConfig.FIELD_BLACKLIST, excludeList); + } + Testing.debug("Using \"" + MongoDbConnectorConfig.FIELD_EXCLUDE_LIST.name() + "\" config property"); + return with(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, excludeList); } public Configurator renameFields(String renames) { diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ConnectionIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ConnectionIT.java index 41d27e1d5..2777f3948 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ConnectionIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/ConnectionIT.java @@ -40,8 +40,8 @@ public void setUp() { @Test public void shouldCreateMovieDatabase() { useConfiguration(config.edit() - .with(MongoDbConnectorConfig.DATABASE_WHITELIST, "dbA,dbB") - .with(MongoDbConnectorConfig.COLLECTION_BLACKLIST, "dbB.moviesB") + .with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST, "dbA,dbB") + .with(MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST, "dbB.moviesB") .build()); Testing.print("Configuration: " + config); diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldExcludeListIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldExcludeListIT.java new file mode 100644 index 000000000..4a0d0b5f0 --- /dev/null +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldExcludeListIT.java @@ -0,0 +1,1274 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mongodb; + +import static io.debezium.connector.mongodb.MongoDbSchema.COMPACT_JSON_SETTINGS; +import static io.debezium.data.Envelope.FieldName.AFTER; +import static org.fest.assertions.Assertions.assertThat; +import static org.fest.assertions.Fail.fail; + +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; + +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.bson.Document; +import org.bson.types.ObjectId; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.InsertOneOptions; + +import io.debezium.config.Configuration; +import io.debezium.connector.mongodb.ConnectionContext.MongoPrimary; +import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.util.Testing; + +// todo: extend AbstractMongoConnectorIT? +public class FieldExcludeListIT extends AbstractConnectorTest { + + private static final String SERVER_NAME = "serverX"; + private static final String PATCH = "patch"; + + private Configuration config; + private MongoDbTaskContext context; + + @Before + public void beforeEach() { + Debug.disable(); + Print.disable(); + stopConnector(); + initializeConnectorTestFramework(); + } + + @After + public void afterEach() { + try { + stopConnector(); + } + finally { + if (context != null) { + context.getConnectionContext().shutdown(); + } + } + } + + @Test + public void shouldNotExcludeFieldsForEventOfOtherCollection() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 123L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + assertReadRecord("*.c2.name,*.c2.active", obj, AFTER, obj.toJson(COMPACT_JSON_SETTINGS)); + } + + @Test + public void shouldExcludeFieldsForReadEvent() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 123L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + // @formatter:off + String expected = "{" + + "\"_id\": {\"$oid\": \"" + objId + "\"}," + + "\"phone\": {\"$numberLong\": \"123\"}," + + "\"scores\": [1.2,3.4,5.6]" + + "}"; + // @formatter:on + + assertReadRecord("*.c1.name,*.c1.active", obj, AFTER, expected); + } + + @Test + public void shouldNotExcludeMissingFieldsForReadEvent() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 123L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + assertReadRecord("*.c1.missing", obj, AFTER, obj.toJson(COMPACT_JSON_SETTINGS)); + } + + @Test + public void shouldExcludeNestedFieldsForReadEvent() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 123L) + .append("address", new Document() + .append("number", 34L) + .append("street", "Claude Debussylaan") + .append("city", "Amsterdam")) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + // @formatter:off + String expected = "{" + + "\"_id\": {\"$oid\": \"" + objId + "\"}," + + "\"phone\": {\"$numberLong\": \"123\"}," + + "\"address\": {" + + "\"street\": \"Claude Debussylaan\"," + + "\"city\": \"Amsterdam\"" + + "}," + + "\"scores\": [1.2,3.4,5.6]" + + "}"; + // @formatter:on + + assertReadRecord("*.c1.name,*.c1.active,*.c1.address.number", obj, AFTER, expected); + } + + @Test + public void shouldNotExcludeNestedMissingFieldsForReadEvent() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 123L) + .append("address", new Document() + .append("number", 34L) + .append("street", "Claude Debussylaan") + .append("city", "Amsterdam")) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + assertReadRecord("*.c1.address.missing", obj, AFTER, obj.toJson(COMPACT_JSON_SETTINGS)); + } + + @Test + public void shouldExcludeFieldsForInsertEvent() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 123L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + // @formatter:off + String expected = "{" + + "\"_id\": {\"$oid\": \"" + objId + "\"}," + + "\"phone\": {\"$numberLong\": \"123\"}," + + "\"scores\": [1.2,3.4,5.6]" + + "}"; + // @formatter:on + + assertInsertRecord("*.c1.name,*.c1.active", obj, AFTER, expected); + } + + @Test + public void shouldNotExcludeMissingFieldsForInsertEvent() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 123L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + assertInsertRecord("*.c1.missing", obj, AFTER, obj.toJson(COMPACT_JSON_SETTINGS)); + } + + @Test + public void shouldExcludeNestedFieldsForInsertEvent() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 123L) + .append("address", new Document() + .append("number", 34L) + .append("street", "Claude Debussylaan") + .append("city", "Amsterdam")) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + // @formatter:off + String expected = "{" + + "\"_id\": {\"$oid\": \"" + objId + "\"}," + + "\"phone\": {\"$numberLong\": \"123\"}," + + "\"address\": {" + + "\"street\": \"Claude Debussylaan\"," + + "\"city\": \"Amsterdam\"" + + "}," + + "\"scores\": [1.2,3.4,5.6]" + + "}"; + // @formatter:on + + assertInsertRecord("*.c1.name,*.c1.active,*.c1.address.number", obj, AFTER, expected); + } + + @Test + public void shouldNotExcludeNestedMissingFieldsForInsertEvent() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 123L) + .append("address", new Document() + .append("number", 34L) + .append("street", "Claude Debussylaan") + .append("city", "Amsterdam")) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + assertInsertRecord("*.c1.address.missing", obj, AFTER, obj.toJson(COMPACT_JSON_SETTINGS)); + } + + @Test + public void shouldExcludeFieldsForUpdateEvent() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 456L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6, 7.8)); + + Document updateObj = new Document() + .append("phone", 123L) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$set\": {" + + "\"phone\": {\"$numberLong\": \"123\"}," + + "\"scores\": [1.2,3.4,5.6]" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.name,*.c1.active", objId, obj, updateObj, PATCH, expected); + } + + @Test + public void shouldNotExcludeMissingFieldsForUpdateEvent() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 456L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6, 7.8)); + + Document updateObj = new Document() + .append("phone", 123L) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$set\": {" + + "\"phone\": {\"$numberLong\": \"123\"}," + + "\"scores\": [1.2,3.4,5.6]" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.missing", objId, obj, updateObj, PATCH, expected); + } + + @Test + public void shouldExcludeNestedFieldsForUpdateEventWithEmbeddedDocument() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 456L) + .append("address", new Document() + .append("number", 35L) + .append("street", "Claude Debussylaane") + .append("city", "Amsterdame")) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6, 7.8)); + + Document updateObj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 123L) + .append("address", new Document() + .append("number", 34L) + .append("street", "Claude Debussylaan") + .append("city", "Amsterdam")) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$set\": {" + + "\"address\": {" + + "\"street\": \"Claude Debussylaan\"," + + "\"city\": \"Amsterdam\"" + + "}," + + "\"phone\": {\"$numberLong\": \"123\"}," + + "\"scores\": [1.2,3.4,5.6]" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.name,*.c1.active,*.c1.address.number", objId, obj, updateObj, PATCH, expected); + } + + @Test + public void shouldNotExcludeNestedMissingFieldsForUpdateEventWithEmbeddedDocument() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 456L) + .append("address", new Document() + .append("number", 45L) + .append("street", "Claude Debussylaann") + .append("city", "Amsterdame")) + .append("active", false) + .append("scores", Arrays.asList(1.2, 3.4, 5.6, 7.8)); + + Document updateObj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 123L) + .append("address", new Document() + .append("number", 34L) + .append("street", "Claude Debussylaan") + .append("city", "Amsterdam")) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$set\": {" + + "\"active\": true," + + "\"address\": {" + + "\"number\": {\"$numberLong\": \"34\"}," + + "\"street\": \"Claude Debussylaan\"," + + "\"city\": \"Amsterdam\"" + + "}," + + "\"phone\": {\"$numberLong\": \"123\"}," + + "\"scores\": [1.2,3.4,5.6]" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.address.missing", objId, obj, updateObj, PATCH, expected); + } + + @Test + public void shouldExcludeNestedFieldsForUpdateEventWithArrayOfEmbeddedDocuments() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 456L) + .append("addresses", Arrays.asList( + new Document() + .append("number", 45L) + .append("street", "Claude Debussylaann") + .append("city", "Amsterdame"), + new Document() + .append("number", 8L) + .append("street", "Fragkokklisiass") + .append("city", "Athense"))) + .append("active", false) + .append("scores", Arrays.asList(1.2, 3.4, 5.6, 7.8)); + + Document updateObj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 123L) + .append("addresses", Arrays.asList( + new Document() + .append("number", 34L) + .append("street", "Claude Debussylaan") + .append("city", "Amsterdam"), + new Document() + .append("number", 7L) + .append("street", "Fragkokklisias") + .append("city", "Athens"))) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$set\": {" + + "\"active\": true," + + "\"addresses\": [" + + "{" + + "\"street\": \"Claude Debussylaan\"," + + "\"city\": \"Amsterdam\"" + + "}," + + "{" + + "\"street\": \"Fragkokklisias\"," + + "\"city\": \"Athens\"" + + "}" + + "]," + + "\"phone\": {\"$numberLong\": \"123\"}," + + "\"scores\": [1.2,3.4,5.6]" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.name,*.c1.addresses.number", objId, obj, updateObj, PATCH, expected); + } + + @Test + public void shouldNotExcludeNestedFieldsForUpdateEventWithArrayOfArrays() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally Mae") + .append("phone", 456L) + .append("addresses", Arrays.asList( + Collections.singletonList(new Document() + .append("number", 45L) + .append("street", "Claude Debussylaann") + .append("city", "Amsterdame")), + Collections.singletonList(new Document() + .append("number", 8L) + .append("street", "Fragkokklisiass") + .append("city", "Athenss")))) + .append("active", false) + .append("scores", Arrays.asList(1.2, 3.4, 5.6, 7.8)); + + Document updateObj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 123L) + .append("addresses", Arrays.asList( + Collections.singletonList(new Document() + .append("number", 34L) + .append("street", "Claude Debussylaan") + .append("city", "Amsterdam")), + Collections.singletonList(new Document() + .append("number", 7L) + .append("street", "Fragkokklisias") + .append("city", "Athens")))) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$set\": {" + + "\"active\": true," + + "\"addresses\": [" + + "[" + + "{" + + "\"number\": {\"$numberLong\": \"34\"}," + + "\"street\": \"Claude Debussylaan\"," + + "\"city\": \"Amsterdam\"" + + "}" + + "]," + + "[" + + "{" + + "\"number\": {\"$numberLong\": \"7\"}," + + "\"street\": \"Fragkokklisias\"," + + "\"city\": \"Athens\"" + + "}" + + "]" + + "]," + + "\"phone\": {\"$numberLong\": \"123\"}," + + "\"scores\": [1.2,3.4,5.6]" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.name,*.c1.addresses.number", objId, obj, updateObj, PATCH, expected); + } + + @Test + public void shouldExcludeFieldsForSetTopLevelFieldUpdateEvent() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally May") + .append("phone", 456L); + + Document updateObj = new Document() + .append("name", "Sally") + .append("phone", 123L); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$set\": {" + + "\"phone\": {\"$numberLong\": \"123\"}" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.name", objId, obj, updateObj, PATCH, expected); + } + + @Test + public void shouldExcludeFieldsForUnsetTopLevelFieldUpdateEvent() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 456L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + Document updateObj = new Document() + .append("name", "") + .append("phone", ""); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$unset\": {" + + "\"phone\": true" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.name", objId, obj, updateObj, false, PATCH, expected); + } + + @Test + public void shouldExcludeNestedFieldsForSetTopLevelFieldUpdateEventWithEmbeddedDocument() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally May") + .append("phone", 456L) + .append("address", new Document() + .append("number", 45L) + .append("street", "Claude Debussylaann") + .append("city", "Amsterdame")); + + Document updateObj = new Document() + .append("name", "Sally") + .append("phone", 123L) + .append("address", new Document() + .append("number", 34L) + .append("street", "Claude Debussylaan") + .append("city", "Amsterdam")); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$set\": {" + + "\"address\": {" + + "\"street\": \"Claude Debussylaan\"," + + "\"city\": \"Amsterdam\"" + + "}," + + "\"phone\": {\"$numberLong\": \"123\"}" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.name,*.c1.address.number", objId, obj, updateObj, PATCH, expected); + } + + @Test + public void shouldExcludeNestedFieldsForSetTopLevelFieldUpdateEventWithArrayOfEmbeddedDocuments() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally May") + .append("phone", 456L) + .append("addresses", Arrays.asList( + new Document() + .append("number", 45L) + .append("street", "Claude Debussylaann") + .append("city", "Amsterdame"), + new Document() + .append("number", 8L) + .append("street", "Fragkokklisiass") + .append("city", "Athense"))); + + Document updateObj = new Document() + .append("name", "Sally") + .append("phone", 123L) + .append("addresses", Arrays.asList( + new Document() + .append("number", 34L) + .append("street", "Claude Debussylaan") + .append("city", "Amsterdam"), + new Document() + .append("number", 7L) + .append("street", "Fragkokklisias") + .append("city", "Athens"))); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$set\": {" + + "\"addresses\": [" + + "{" + + "\"street\": \"Claude Debussylaan\"," + + "\"city\": \"Amsterdam\"" + + "}," + + "{" + + "\"street\": \"Fragkokklisias\"," + + "\"city\": \"Athens\"" + + "}" + + "]," + + "\"phone\": {\"$numberLong\": \"123\"}" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.name,*.c1.addresses.number", objId, obj, updateObj, PATCH, expected); + } + + @Test + public void shouldNotExcludeNestedFieldsForSetTopLevelFieldUpdateEventWithArrayOfArrays() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally May") + .append("phone", 456L) + .append("addresses", Arrays.asList( + Collections.singletonList(new Document() + .append("number", 45L) + .append("street", "Claude Debussylaann") + .append("city", "Amsterdame")), + Collections.singletonList(new Document() + .append("number", 8L) + .append("street", "Fragkokklisiass") + .append("city", "Athense")))); + + Document updateObj = new Document() + .append("name", "Sally") + .append("phone", 123L) + .append("addresses", Arrays.asList( + Collections.singletonList(new Document() + .append("number", 34L) + .append("street", "Claude Debussylaan") + .append("city", "Amsterdam")), + Collections.singletonList(new Document() + .append("number", 7L) + .append("street", "Fragkokklisias") + .append("city", "Athens")))); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$set\": {" + + "\"addresses\": [" + + "[" + + "{" + + "\"number\": {\"$numberLong\": \"34\"}," + + "\"street\": \"Claude Debussylaan\"," + + "\"city\": \"Amsterdam\"" + + "}" + + "]," + + "[" + + "{" + + "\"number\": {\"$numberLong\": \"7\"}," + + "\"street\": \"Fragkokklisias\"," + + "\"city\": \"Athens\"" + + "}" + + "]" + + "]," + + "\"phone\": {\"$numberLong\": \"123\"}" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.name,*.c1.addresses.number", objId, obj, updateObj, PATCH, expected); + } + + @Test + public void shouldExcludeNestedFieldsForSetNestedFieldUpdateEventWithEmbeddedDocument() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally May") + .append("phone", 456L) + .append("address", new Document() + .append("number", 45L) + .append("street", "Claude Debussylaann") + .append("city", "Amsterdame")); + + Document updateObj = new Document() + .append("name", "Sally") + .append("address.number", 34L) + .append("address.street", "Claude Debussylaan") + .append("address.city", "Amsterdam"); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$set\": {" + + "\"address.city\": \"Amsterdam\"," + + "\"address.street\": \"Claude Debussylaan\"" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.name,*.c1.address.number", objId, obj, updateObj, PATCH, expected); + } + + @Test + public void shouldExcludeNestedFieldsForSetNestedFieldUpdateEventWithArrayOfEmbeddedDocuments() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally May") + .append("addresses", Arrays.asList( + new Document() + .append("number", 45L) + .append("street", "Claude Debussylaann") + .append("city", "Amsterdame"))); + + Document updateObj = new Document() + .append("name", "Sally") + .append("addresses.0.number", 34L) + .append("addresses.0.street", "Claude Debussylaan") + .append("addresses.0.city", "Amsterdam"); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$set\": {" + + "\"addresses.0.city\": \"Amsterdam\"," + + "\"addresses.0.street\": \"Claude Debussylaan\"," + + "\"name\": \"Sally\"" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.addresses.number", objId, obj, updateObj, PATCH, expected); + } + + @Test + public void shouldNotExcludeNestedFieldsForSetNestedFieldUpdateEventWithArrayOfArrays() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally May") + .append("addresses", Arrays.asList( + Collections.singletonList(new Document() + .append("number", 45L) + .append("street", "Claude Debussylaann") + .append("city", "Amsterdame")), + Collections.singletonList(new Document() + .append("number", 8L) + .append("street", "Fragkokklisiass") + .append("city", "Athense")))); + + Document updateObj = new Document() + .append("name", "Sally") + .append("addresses.0.0.number", 34L) + .append("addresses.0.0.street", "Claude Debussylaan") + .append("addresses.0.0.city", "Amsterdam"); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$set\": {" + + "\"addresses.0.0.city\": \"Amsterdam\"," + + "\"addresses.0.0.number\": {\"$numberLong\": \"34\"}," + + "\"addresses.0.0.street\": \"Claude Debussylaan\"," + + "\"name\": \"Sally\"" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.addresses.number", objId, obj, updateObj, PATCH, expected); + } + + @Test + public void shouldExcludeNestedFieldsForSetNestedFieldUpdateEventWithSeveralArrays() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally May") + .append("addresses", Arrays.asList(Collections.singletonMap("second", + Arrays.asList( + new Document() + .append("number", 45L) + .append("street", "Claude Debussylaann") + .append("city", "Amsterdame"))))); + Document updateObj = new Document() + .append("name", "Sally") + .append("addresses.0.second.0.number", 34L) + .append("addresses.0.second.0.street", "Claude Debussylaan") + .append("addresses.0.second.0.city", "Amsterdam"); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$set\": {" + + "\"addresses.0.second.0.city\": \"Amsterdam\"," + + "\"addresses.0.second.0.street\": \"Claude Debussylaan\"," + + "\"name\": \"Sally\"" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.addresses.second.number", objId, obj, updateObj, PATCH, expected); + } + + @Test + public void shouldExcludeFieldsForSetNestedFieldUpdateEventWithArrayOfEmbeddedDocuments() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally May") + .append("addresses", Arrays.asList( + new Document() + .append("number", 45L) + .append("street", "Claude Debussylaann") + .append("city", "Amsterdame"))); + + Document updateObj = new Document() + .append("name", "Sally") + .append("addresses.0.0.number", 34L) + .append("addresses.0.0.street", "Claude Debussylaan") + .append("addresses.0.0.city", "Amsterdam"); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$set\": {" + + "\"name\": \"Sally\"" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.addresses", objId, obj, updateObj, PATCH, expected); + } + + @Test + public void shouldExcludeFieldsForSetToArrayFieldUpdateEventWithArrayOfEmbeddedDocuments() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally May") + .append("addresses", Arrays.asList( + new Document() + .append("number", 45L) + .append("street", "Claude Debussylaann") + .append("city", "Amsterdame"))); + + Document updateObj = new Document() + .append("name", "Sally") + .append("addresses.0", new Document() + .append("number", 34L) + .append("street", "Claude Debussylaan") + .append("city", "Amsterdam")); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$set\": {" + + "\"name\": \"Sally\"" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.addresses", objId, obj, updateObj, PATCH, expected); + } + + @Test + public void shouldExcludeNestedFieldsForUnsetNestedFieldUpdateEventWithEmbeddedDocument() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 456L) + .append("address", new Document() + .append("number", 45L) + .append("street", "Claude Debussylaann") + .append("city", "Amsterdame")) + .append("active", false) + .append("scores", Arrays.asList(1.2, 3.4, 5.6, 7.8)); + + Document updateObj = new Document() + .append("name", "") + .append("address.number", "") + .append("address.street", "") + .append("address.city", ""); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$unset\": {" + + "\"address.city\": true," + + "\"address.street\": true" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.name,*.c1.address.number", objId, obj, updateObj, false, PATCH, expected); + } + + @Test + public void shouldExcludeNestedFieldsForUnsetNestedFieldUpdateEventWithArrayOfEmbeddedDocuments() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 123L) + .append("addresses", Arrays.asList( + new Document() + .append("number", 34L) + .append("street", "Claude Debussylaan") + .append("city", "Amsterdam"), + new Document() + .append("number", 7L) + .append("street", "Fragkokklisias") + .append("city", "Athens"))) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + Document updateObj = new Document() + .append("name", "") + .append("addresses.0.number", "") + .append("addresses.0.street", "") + .append("addresses.0.city", ""); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$unset\": {" + + "\"addresses.0.city\": true," + + "\"addresses.0.street\": true," + + "\"name\": true" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.addresses.number", objId, obj, updateObj, false, PATCH, expected); + } + + @Test + public void shouldNotExcludeNestedFieldsForUnsetNestedFieldUpdateEventWithArrayOfArrays() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("phone", 123L) + .append("addresses", Arrays.asList( + Arrays.asList( + new Document() + .append("number", 34L) + .append("street", "Claude Debussylaan") + .append("city", "Amsterdam")))) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + Document updateObj = new Document() + .append("name", "") + .append("addresses.0.0.number", "") + .append("addresses.0.0.street", "") + .append("addresses.0.0.city", ""); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$unset\": {" + + "\"addresses.0.0.city\": true," + + "\"addresses.0.0.number\": true," + + "\"addresses.0.0.street\": true," + + "\"name\": true" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.addresses.number", objId, obj, updateObj, false, PATCH, expected); + } + + @Test + public void shouldExcludeNestedFieldsForUnsetNestedFieldUpdateEventWithSeveralArrays() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("addresses", Arrays.asList(Collections.singletonMap("second", + Arrays.asList( + new Document() + .append("number", 34L) + .append("street", "Claude Debussylaan") + .append("city", "Amsterdam"))))); + + Document updateObj = new Document() + .append("name", "") + .append("addresses.0.second.0.number", "") + .append("addresses.0.second.0.street", "") + .append("addresses.0.second.0.city", ""); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$unset\": {" + + "\"addresses.0.second.0.city\": true," + + "\"addresses.0.second.0.street\": true," + + "\"name\": true" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.addresses.second.number", objId, obj, updateObj, false, PATCH, expected); + } + + @Test + public void shouldExcludeFieldsForUnsetNestedFieldUpdateEventWithArrayOfEmbeddedDocuments() throws InterruptedException { + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("_id", objId) + .append("name", "Sally") + .append("addresses", Arrays.asList( + new Document() + .append("number", 34L) + .append("street", "Claude Debussylaan") + .append("city", "Amsterdam"))); + + Document updateObj = new Document() + .append("name", "") + .append("addresses.0.number", "") + .append("addresses.0.street", "") + .append("addresses.0.city", ""); + + // @formatter:off + String expected = "{" + + "\"$v\": 1," + + "\"$unset\": {" + + "\"name\": true" + + "}" + + "}"; + // @formatter:on + + assertUpdateRecord("*.c1.addresses", objId, obj, updateObj, false, PATCH, expected); + } + + @Test + public void shouldExcludeFieldsForDeleteEvent() throws InterruptedException { + config = getConfiguration("*.c1.name,*.c1.active"); + context = new MongoDbTaskContext(config); + + TestHelper.cleanDatabase(primary(), "dbA"); + + ObjectId objId = new ObjectId(); + Document obj = new Document("_id", objId); + storeDocuments("dbA", "c1", obj); + + start(MongoDbConnector.class, config); + + SourceRecords snapshotRecords = consumeRecordsByTopic(1); + assertThat(snapshotRecords.topics().size()).isEqualTo(1); + assertThat(snapshotRecords.allRecordsInOrder().size()).isEqualTo(1); + + // Wait for streaming to start and perform an update + waitForStreamingRunning("mongodb", SERVER_NAME); + deleteDocuments("dbA", "c1", objId); + + // Get the delete records (1 delete and 1 tombstone) + SourceRecords deleteRecords = consumeRecordsByTopic(2); + assertThat(deleteRecords.topics().size()).isEqualTo(1); + assertThat(deleteRecords.allRecordsInOrder().size()).isEqualTo(2); + + // Only validating delete record, non-tombstone + SourceRecord record = deleteRecords.allRecordsInOrder().get(0); + Struct value = getValue(record); + + String json = value.getString(AFTER); + if (json == null) { + json = value.getString(PATCH); + } + + assertThat(json).isNull(); + } + + @Test + public void shouldExcludeFieldsForDeleteTombstoneEvent() throws InterruptedException { + config = getConfiguration("*.c1.name,*.c1.active"); + context = new MongoDbTaskContext(config); + + TestHelper.cleanDatabase(primary(), "dbA"); + + ObjectId objId = new ObjectId(); + Document obj = new Document("_id", objId); + storeDocuments("dbA", "c1", obj); + + start(MongoDbConnector.class, config); + + SourceRecords snapshotRecords = consumeRecordsByTopic(1); + assertThat(snapshotRecords.topics().size()).isEqualTo(1); + assertThat(snapshotRecords.allRecordsInOrder().size()).isEqualTo(1); + + // Wait for streaming to start and perform an update + waitForStreamingRunning("mongodb", SERVER_NAME); + deleteDocuments("dbA", "c1", objId); + + // Get the delete records (1 delete and 1 tombstone) + SourceRecords deleteRecords = consumeRecordsByTopic(2); + assertThat(deleteRecords.topics().size()).isEqualTo(1); + assertThat(deleteRecords.allRecordsInOrder().size()).isEqualTo(2); + + // Only validating tombstone record, non-delete + SourceRecord record = deleteRecords.allRecordsInOrder().get(1); + Struct value = getValue(record); + + assertThat(value).isNull(); + } + + private Configuration getConfiguration(String blackList) { + return TestHelper.getConfiguration().edit() + .with(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, blackList) + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.c1") + .with(MongoDbConnectorConfig.LOGICAL_NAME, SERVER_NAME) + .build(); + } + + private Struct getValue(SourceRecord record) { + return (Struct) record.value(); + } + + private BiConsumer connectionErrorHandler(int numErrorsBeforeFailing) { + AtomicInteger attempts = new AtomicInteger(); + return (desc, error) -> { + if (attempts.incrementAndGet() > numErrorsBeforeFailing) { + fail("Unable to connect to primary after " + numErrorsBeforeFailing + " errors trying to " + desc + ": " + error); + } + logger.error("Error while attempting to {}: {}", desc, error.getMessage(), error); + }; + } + + private MongoPrimary primary() { + ReplicaSet replicaSet = ReplicaSet.parse(context.getConnectionContext().hosts()); + return context.getConnectionContext().primaryFor(replicaSet, context.filters(), connectionErrorHandler(3)); + } + + private void storeDocuments(String dbName, String collectionName, Document... documents) { + primary().execute("store documents", mongo -> { + Testing.debug("Storing in '" + dbName + "." + collectionName + "' document"); + MongoDatabase db = mongo.getDatabase(dbName); + MongoCollection coll = db.getCollection(collectionName); + coll.drop(); + + for (Document document : documents) { + InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(true); + assertThat(document).isNotNull(); + assertThat(document.size()).isGreaterThan(0); + coll.insertOne(document, insertOptions); + } + }); + } + + private void updateDocuments(String dbName, String collectionName, ObjectId objId, Document document, boolean doSet) { + primary().execute("update", mongo -> { + MongoDatabase db = mongo.getDatabase(dbName); + MongoCollection coll = db.getCollection(collectionName); + Document filter = Document.parse("{\"_id\": {\"$oid\": \"" + objId + "\"}}"); + coll.updateOne(filter, new Document().append(doSet ? "$set" : "$unset", document)); + }); + } + + private void deleteDocuments(String dbName, String collectionName, ObjectId objId) { + primary().execute("delete", mongo -> { + MongoDatabase db = mongo.getDatabase(dbName); + MongoCollection coll = db.getCollection(collectionName); + Document filter = Document.parse("{\"_id\": {\"$oid\": \"" + objId + "\"}}"); + coll.deleteOne(filter); + }); + } + + private void assertReadRecord(String blackList, Document snapshotRecord, String field, String expected) throws InterruptedException { + config = getConfiguration(blackList); + context = new MongoDbTaskContext(config); + + TestHelper.cleanDatabase(primary(), "dbA"); + storeDocuments("dbA", "c1", snapshotRecord); + + start(MongoDbConnector.class, config); + + SourceRecords snapshotRecords = consumeRecordsByTopic(1); + assertThat(snapshotRecords.topics().size()).isEqualTo(1); + assertThat(snapshotRecords.allRecordsInOrder().size()).isEqualTo(1); + + SourceRecord record = snapshotRecords.allRecordsInOrder().get(0); + Struct value = getValue(record); + + assertThat(value.get(field)).isEqualTo(expected); + } + + private void assertInsertRecord(String blackList, Document insertRecord, String field, String expected) throws InterruptedException { + config = getConfiguration(blackList); + context = new MongoDbTaskContext(config); + + TestHelper.cleanDatabase(primary(), "dbA"); + + start(MongoDbConnector.class, config); + waitForSnapshotToBeCompleted("mongodb", SERVER_NAME); + + storeDocuments("dbA", "c1", insertRecord); + + // Get the insert records + SourceRecords insertRecords = consumeRecordsByTopic(1); + assertThat(insertRecords.topics().size()).isEqualTo(1); + assertThat(insertRecords.allRecordsInOrder().size()).isEqualTo(1); + + SourceRecord record = insertRecords.allRecordsInOrder().get(0); + Struct value = getValue(record); + + assertThat(value.get(field)).isEqualTo(expected); + } + + private void assertUpdateRecord(String blackList, ObjectId objectId, Document snapshotRecord, Document updateRecord, + String field, String expected) + throws InterruptedException { + assertUpdateRecord(blackList, objectId, snapshotRecord, updateRecord, true, field, expected); + } + + private void assertUpdateRecord(String blackList, ObjectId objectId, Document snapshotRecord, Document updateRecord, + boolean doSet, String field, String expected) + throws InterruptedException { + config = getConfiguration(blackList); + context = new MongoDbTaskContext(config); + + TestHelper.cleanDatabase(primary(), "dbA"); + + storeDocuments("dbA", "c1", snapshotRecord); + + start(MongoDbConnector.class, config); + + // Get the snapshot records + SourceRecords snapshotRecords = consumeRecordsByTopic(1); + assertThat(snapshotRecords.topics().size()).isEqualTo(1); + assertThat(snapshotRecords.allRecordsInOrder().size()).isEqualTo(1); + + // Wait for streaming to start and perform an update + waitForStreamingRunning("mongodb", SERVER_NAME); + updateDocuments("dbA", "c1", objectId, updateRecord, doSet); + + // Get the update records + SourceRecords updateRecords = consumeRecordsByTopic(1); + assertThat(updateRecords.topics().size()).isEqualTo(1); + assertThat(updateRecords.allRecordsInOrder().size()).isEqualTo(1); + + SourceRecord record = updateRecords.allRecordsInOrder().get(0); + Struct value = getValue(record); + + Document expectedDoc = TestHelper.getDocumentWithoutLanguageVersion(expected); + Document actualDoc = TestHelper.getDocumentWithoutLanguageVersion(value.getString(field)); + assertThat(actualDoc).isEqualTo(expectedDoc); + } +} diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldRenamesIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldRenamesIT.java index 789f4f9ff..417baa1a7 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldRenamesIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FieldRenamesIT.java @@ -1413,7 +1413,7 @@ private static Configuration getConfiguration(String fieldRenames) { private static Configuration getConfiguration(String fieldRenames, String database, String collection) { Configuration.Builder builder = TestHelper.getConfiguration().edit() - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, database + "." + collection) + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, database + "." + collection) .with(MongoDbConnectorConfig.LOGICAL_NAME, SERVER_NAME); if (fieldRenames != null && !"".equals(fieldRenames.trim())) { diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java index ab2e69ac9..dfa026909 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoDbConnectorIT.java @@ -59,6 +59,7 @@ import io.debezium.embedded.AbstractConnectorTest; import io.debezium.heartbeat.Heartbeat; import io.debezium.junit.logging.LogInterceptor; +import io.debezium.schema.DatabaseSchema; import io.debezium.util.Collect; import io.debezium.util.IoUtil; import io.debezium.util.Testing; @@ -122,9 +123,13 @@ public void shouldFailToValidateInvalidConfiguration() { assertNoConfigurationErrors(result, MongoDbConnectorConfig.PASSWORD); assertNoConfigurationErrors(result, MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS); assertNoConfigurationErrors(result, MongoDbConnectorConfig.DATABASE_WHITELIST); + assertNoConfigurationErrors(result, MongoDbConnectorConfig.DATABASE_INCLUDE_LIST); assertNoConfigurationErrors(result, MongoDbConnectorConfig.DATABASE_BLACKLIST); + assertNoConfigurationErrors(result, MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST); assertNoConfigurationErrors(result, MongoDbConnectorConfig.COLLECTION_WHITELIST); + assertNoConfigurationErrors(result, MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST); assertNoConfigurationErrors(result, MongoDbConnectorConfig.COLLECTION_BLACKLIST); + assertNoConfigurationErrors(result, MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST); assertNoConfigurationErrors(result, MongoDbConnectorConfig.MAX_COPY_THREADS); assertNoConfigurationErrors(result, MongoDbConnectorConfig.MAX_QUEUE_SIZE); assertNoConfigurationErrors(result, MongoDbConnectorConfig.MAX_BATCH_SIZE); @@ -155,9 +160,13 @@ public void shouldValidateAcceptableConfiguration() { assertNoConfigurationErrors(result, MongoDbConnectorConfig.PASSWORD); assertNoConfigurationErrors(result, MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS); assertNoConfigurationErrors(result, MongoDbConnectorConfig.DATABASE_WHITELIST); + assertNoConfigurationErrors(result, MongoDbConnectorConfig.DATABASE_INCLUDE_LIST); assertNoConfigurationErrors(result, MongoDbConnectorConfig.DATABASE_BLACKLIST); + assertNoConfigurationErrors(result, MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST); assertNoConfigurationErrors(result, MongoDbConnectorConfig.COLLECTION_WHITELIST); + assertNoConfigurationErrors(result, MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST); assertNoConfigurationErrors(result, MongoDbConnectorConfig.COLLECTION_BLACKLIST); + assertNoConfigurationErrors(result, MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST); assertNoConfigurationErrors(result, MongoDbConnectorConfig.MAX_COPY_THREADS); assertNoConfigurationErrors(result, MongoDbConnectorConfig.MAX_QUEUE_SIZE); assertNoConfigurationErrors(result, MongoDbConnectorConfig.MAX_BATCH_SIZE); @@ -176,7 +185,7 @@ public void shouldConsumeAllEventsFromDatabase() throws InterruptedException, IO // Use the DB configuration to define the connector's configuration ... config = TestHelper.getConfiguration().edit() .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .build(); @@ -365,7 +374,7 @@ public void shouldConsumeAllEventsFromDatabaseWithSkippedOperations() throws Int // Use the DB configuration to define the connector's configuration ... config = TestHelper.getConfiguration().edit() .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .with(MongoDbConnectorConfig.SKIPPED_OPERATIONS, "u") .build(); @@ -453,7 +462,7 @@ public void shouldConsumeAllEventsFromDatabaseWithCustomAuthSource() throws Inte // Use the DB configuration to define the connector's configuration ... config = TestHelper.getConfiguration().edit() .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .build(); @@ -485,7 +494,7 @@ public void shouldConsumeAllEventsFromDatabaseWithCustomAuthSource() throws Inte .with(MongoDbConnectorConfig.PASSWORD, "pass") .with(MongoDbConnectorConfig.AUTH_SOURCE, authDbName) .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .build(); @@ -544,7 +553,7 @@ public void shouldSupportDbRef() throws InterruptedException, IOException { // Use the DB configuration to define the connector's configuration ... config = TestHelper.getConfiguration().edit() .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .build(); @@ -614,7 +623,7 @@ public void shouldConsumeEventsFromCollectionWithReplacedTopicName() throws Inte // Use the DB configuration to define the connector's configuration ... config = TestHelper.getConfiguration().edit() .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.dbz865.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.dbz865.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .build(); @@ -656,7 +665,7 @@ public void shouldConsumeEventsFromCollectionWithReplacedTopicName() throws Inte // --------------------------------------------------------------------------------------------------------------- // Stop the connector // --------------------------------------------------------------------------------------------------------------- - stopConnector(value -> assertThat(logInterceptor.containsWarnMessage(NO_MONITORED_TABLES_WARNING)).isFalse()); + stopConnector(value -> assertThat(logInterceptor.containsWarnMessage(DatabaseSchema.NO_CAPTURED_DATA_COLLECTIONS_WARNING)).isFalse()); } @Test @@ -668,7 +677,7 @@ public void testEmptySchemaWarningAfterApplyingCollectionFilters() throws Except // Use the DB configuration to define the connector's configuration... config = TestHelper.getConfiguration().edit() .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.dbz865.my_products") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.dbz865.my_products") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .build(); @@ -694,7 +703,7 @@ public void testEmptySchemaWarningAfterApplyingCollectionFilters() throws Except // Consume all records consumeRecordsByTopic(12); - stopConnector(value -> assertThat(logInterceptor.containsWarnMessage(NO_MONITORED_TABLES_WARNING)).isTrue()); + stopConnector(value -> assertThat(logInterceptor.containsWarnMessage(DatabaseSchema.NO_CAPTURED_DATA_COLLECTIONS_WARNING)).isTrue()); } protected void verifyFromInitialSync(SourceRecord record, AtomicBoolean foundLast) { @@ -718,7 +727,7 @@ public void shouldConsumeTransaction() throws InterruptedException, IOException // Use the DB configuration to define the connector's configuration ... config = TestHelper.getConfiguration().edit() .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .build(); @@ -812,7 +821,7 @@ public void shouldResumeTransactionInMiddle() throws InterruptedException, IOExc // Use the DB configuration to define the connector's configuration ... config = TestHelper.getConfiguration().edit() .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .build(); @@ -912,7 +921,7 @@ public void shouldSnapshotDocumentContainingFieldNamedOp() throws Exception { // Use the DB configuration to define the connector's configuration ... config = TestHelper.getConfiguration().edit() .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .build(); @@ -1084,7 +1093,7 @@ public void shouldUseSSL() throws InterruptedException, IOException { // Use the DB configuration to define the connector's configuration ... config = TestHelper.getConfiguration().edit() .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .with(MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS, 0) .with(MongoDbConnectorConfig.SSL_ENABLED, true) @@ -1106,7 +1115,7 @@ public void shouldEmitHeartbeatMessages() throws InterruptedException, IOExcepti // Use the DB configuration to define the connector's configuration ... config = TestHelper.getConfiguration().edit() .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.mhb") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.mhb") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .with(Heartbeat.HEARTBEAT_INTERVAL, "1") .build(); @@ -1183,7 +1192,7 @@ public void shouldEmitHeartbeatMessages() throws InterruptedException, IOExcepti public void shouldOutputRecordsInCloudEventsFormat() throws Exception { config = TestHelper.getConfiguration().edit() .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .build(); @@ -1218,7 +1227,7 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception { @Test public void shouldGenerateRecordForInsertEvent() throws Exception { config = TestHelper.getConfiguration().edit() - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .build(); @@ -1261,7 +1270,7 @@ public void shouldGenerateRecordForInsertEvent() throws Exception { @Test public void shouldGenerateRecordForUpdateEvent() throws Exception { config = TestHelper.getConfiguration().edit() - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .build(); @@ -1318,7 +1327,7 @@ public void shouldGenerateRecordForUpdateEvent() throws Exception { @Test public void shouldGeneratorRecordForDeleteEvent() throws Exception { config = TestHelper.getConfiguration().edit() - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .build(); @@ -1376,7 +1385,7 @@ public void shouldGeneratorRecordForDeleteEvent() throws Exception { @FixFor("DBZ-582") public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws Exception { config = TestHelper.getConfiguration().edit() - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .with(MongoDbConnectorConfig.TOMBSTONES_ON_DELETE, false) .build(); @@ -1427,7 +1436,7 @@ public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws Exceptio @Test public void shouldGenerateRecordsWithCorrectlySerializedId() throws Exception { config = TestHelper.getConfiguration().edit() - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .build(); @@ -1496,7 +1505,7 @@ private static void assertSourceRecordKeyFieldIsEqualTo(SourceRecord record, Str @Test public void shouldSupportDbRef2() throws Exception { config = TestHelper.getConfiguration().edit() - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .build(); @@ -1548,7 +1557,7 @@ public void shouldSupportDbRef2() throws Exception { @Test public void shouldReplicateContent() throws Exception { config = TestHelper.getConfiguration().edit() - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbA.contacts") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.contacts") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.INITIAL) .build(); @@ -1701,7 +1710,7 @@ public void shouldReplicateContent() throws Exception { public void shouldNotReplicateSnapshot() throws Exception { // todo: this configuration causes NPE at MongoDbStreamingChangeEventSource.java:143 config = TestHelper.getConfiguration().edit() - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbA.contacts") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.contacts") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.NEVER) .build(); @@ -1754,7 +1763,7 @@ public void shouldNotReplicateSnapshot() throws Exception { @FixFor("DBZ-1880") public void shouldGenerateRecordForUpdateEventUsingLegacyV1SourceInfo() throws Exception { config = TestHelper.getConfiguration().edit() - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .with(CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, "v1") .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo") .build(); diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoMetricsIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoMetricsIT.java index eeee82476..d450519c7 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoMetricsIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/MongoMetricsIT.java @@ -27,7 +27,7 @@ public void testLifecycle() throws Exception { this.config = TestHelper.getConfiguration() .edit() .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.INITIAL) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .build(); this.context = new MongoDbTaskContext(config); @@ -68,7 +68,7 @@ public void testSnapshotOnlyMetrics() throws Exception { this.config = TestHelper.getConfiguration() .edit() .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.INITIAL) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .build(); this.context = new MongoDbTaskContext(config); @@ -108,7 +108,7 @@ public void testStreamingOnlyMetrics() throws Exception { this.config = TestHelper.getConfiguration() .edit() .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.NEVER) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*") .build(); this.context = new MongoDbTaskContext(config); diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/TransactionMetadataIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/TransactionMetadataIT.java index 2e3a2b417..05426afb1 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/TransactionMetadataIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/TransactionMetadataIT.java @@ -27,7 +27,7 @@ public class TransactionMetadataIT extends AbstractMongoConnectorIT { public void transactionMetadata() throws Exception { config = TestHelper.getConfiguration() .edit() - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbA.c1") + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.c1") .with(MongoDbConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(MongoDbConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) .build(); diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/AbstractExtractNewDocumentStateTestIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/AbstractExtractNewDocumentStateTestIT.java index c64a06863..3d2b5d1e3 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/AbstractExtractNewDocumentStateTestIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/AbstractExtractNewDocumentStateTestIT.java @@ -51,7 +51,7 @@ public void beforeEach() { // Use the DB configuration to define the connector's configuration ... Configuration config = TestHelper.getConfiguration().edit() .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, DB_NAME + "." + this.getCollectionName()) + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, DB_NAME + "." + this.getCollectionName()) .with(MongoDbConnectorConfig.LOGICAL_NAME, SERVER_NAME) .build(); @@ -97,7 +97,7 @@ protected void restartConnectorWithoutEmittingTombstones() { // reconfigure and restart Configuration config = TestHelper.getConfiguration().edit() .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10) - .with(MongoDbConnectorConfig.COLLECTION_WHITELIST, DB_NAME + "." + this.getCollectionName()) + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, DB_NAME + "." + this.getCollectionName()) .with(MongoDbConnectorConfig.LOGICAL_NAME, SERVER_NAME) .with(MongoDbConnectorConfig.TOMBSTONES_ON_DELETE, false) .build(); diff --git a/debezium-connector-mongodb/src/test/resources/log4j.properties b/debezium-connector-mongodb/src/test/resources/log4j.properties index 0366d2fa5..00884bc1c 100644 --- a/debezium-connector-mongodb/src/test/resources/log4j.properties +++ b/debezium-connector-mongodb/src/test/resources/log4j.properties @@ -3,6 +3,7 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %X{dbz.connectorType}|%X{dbz.connectorName}|%X{dbz.connectorContext} %m [%c]%n +log4j.appender.stdout.threshold=WARN # Root logger option log4j.rootLogger=INFO, stdout diff --git a/debezium-connector-mysql/pom.xml b/debezium-connector-mysql/pom.xml index 96452ebf2..132ff4580 100644 --- a/debezium-connector-mysql/pom.xml +++ b/debezium-connector-mysql/pom.xml @@ -115,7 +115,7 @@ 3306 3306 3306 - 3306 + 3306 60000