DBZ-6867 streaming pipeline should match events from collection include list as well as signal collection

This commit is contained in:
Jakub Cechacek 2023-09-04 12:27:05 +02:00 committed by Jiri Pechanec
parent c86b2648a5
commit 5a20419a98
3 changed files with 29 additions and 4 deletions

View File

@ -116,10 +116,10 @@ else if (filterConfig.getCollectionExcludeList() != null) {
}
// Combined filters
return andFilters(
dbFilters,
orFilters(
includedSignalCollectionFilters,
return orFilters(
includedSignalCollectionFilters,
andFilters(
dbFilters,
collectionsFilters));
}

View File

@ -173,6 +173,14 @@ public void shouldExcludeCollectionCoveredByLiteralInBlacklist() {
assertCollectionIncluded("db2.collectionA");
}
@Test
public void shouldIncludeAllCollectionsFromDatabaseWithSignalingCollection() {
filters = build.includeDatabases("db1")
.signalingCollection("db1.singal")
.createFilters();
assertCollectionIncluded("db1.other");
}
@Test
public void shouldIncludeSignalingCollectionAndNoWhitelistAndNoBlacklist() {
filters = build.signalingCollection("db1.signal").createFilters();

View File

@ -364,6 +364,23 @@ private <K> void snapshotOnly(K initialId, Function<K, K> idGenerator) throws Ex
assertThat(dbChanges).containsAllEntriesOf(expected);
}
@Test
public void shouldStreamWithDatabaseIncludeList() throws InterruptedException {
startConnector(
config -> config
.with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST, DATABASE_NAME)
.without(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST.name()),
loggingCompletion());
assertConnectorIsRunning();
assertNoRecordsToConsume();
insertDocuments(DATABASE_NAME, COLLECTION_NAME, new Document("foo", "bar"));
SourceRecords records = consumeRecordsByTopic(1);
records.topics().forEach(System.out::println);
assertThat(records.topics()).contains(topicName());
assertThat(records.recordsForTopic(topicName())).hasSize(1);
}
@Test
public void snapshotOnlyInt32() throws Exception {
snapshotOnly(0, k -> k + 1);