diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ChangeStreamPipelineFactory.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ChangeStreamPipelineFactory.java index 2b74772bb..ef66caacf 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ChangeStreamPipelineFactory.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/ChangeStreamPipelineFactory.java @@ -116,10 +116,10 @@ else if (filterConfig.getCollectionExcludeList() != null) { } // Combined filters - return andFilters( - dbFilters, - orFilters( - includedSignalCollectionFilters, + return orFilters( + includedSignalCollectionFilters, + andFilters( + dbFilters, collectionsFilters)); } diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FiltersTest.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FiltersTest.java index 7c0df861c..814371aee 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FiltersTest.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/FiltersTest.java @@ -173,6 +173,14 @@ public void shouldExcludeCollectionCoveredByLiteralInBlacklist() { assertCollectionIncluded("db2.collectionA"); } + @Test + public void shouldIncludeAllCollectionsFromDatabaseWithSignalingCollection() { + filters = build.includeDatabases("db1") + .signalingCollection("db1.singal") + .createFilters(); + assertCollectionIncluded("db1.other"); + } + @Test public void shouldIncludeSignalingCollectionAndNoWhitelistAndNoBlacklist() { filters = build.signalingCollection("db1.signal").createFilters(); diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/IncrementalSnapshotIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/IncrementalSnapshotIT.java index 463513cfc..5ae5cb84b 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/IncrementalSnapshotIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/IncrementalSnapshotIT.java @@ -364,6 +364,23 @@ private void snapshotOnly(K initialId, Function idGenerator) throws Ex assertThat(dbChanges).containsAllEntriesOf(expected); } + @Test + public void shouldStreamWithDatabaseIncludeList() throws InterruptedException { + startConnector( + config -> config + .with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST, DATABASE_NAME) + .without(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST.name()), + loggingCompletion()); + assertConnectorIsRunning(); + assertNoRecordsToConsume(); + insertDocuments(DATABASE_NAME, COLLECTION_NAME, new Document("foo", "bar")); + SourceRecords records = consumeRecordsByTopic(1); + records.topics().forEach(System.out::println); + + assertThat(records.topics()).contains(topicName()); + assertThat(records.recordsForTopic(topicName())).hasSize(1); + } + @Test public void snapshotOnlyInt32() throws Exception { snapshotOnly(0, k -> k + 1);