DBZ-6712 fix snapshot to history internal topic to respect schema.history.internal.store.only.captured.databases.ddl flag

This commit is contained in:
Matan Cohen 2023-08-07 12:16:38 +03:00 committed by Jiri Pechanec
parent b7f9f876a7
commit 40ac4662fb
5 changed files with 42 additions and 3 deletions

View File

@ -543,3 +543,4 @@ Stephen Clarkson
Gurps Bassi
Massimo Fortunat
Vincenzo Santonastaso
Matan Cohen

View File

@ -87,9 +87,15 @@ public RelationalTableFilters(Configuration config, TableFilter systemTablesFilt
? systemTablesFilter::isIncluded
: x -> true;
this.schemaSnapshotFilter = config.getBoolean(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL)
? eligibleSchemaPredicate.and(tableFilter::isIncluded)::test
: eligibleSchemaPredicate::test;
if (config.getBoolean(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL)) {
this.schemaSnapshotFilter = eligibleSchemaPredicate.and(tableFilter::isIncluded)::test;
}
else if (config.getBoolean(SchemaHistory.STORE_ONLY_CAPTURED_DATABASES_DDL)) {
this.schemaSnapshotFilter = finalEligibleTablePredicate::test;
}
else {
this.schemaSnapshotFilter = eligibleSchemaPredicate::test;
}
this.excludeColumns = config.getString(COLUMN_EXCLUDE_LIST);
}

View File

@ -43,6 +43,11 @@ public Configurator excludeCollections(String regexList) {
return with(RelationalDatabaseConnectorConfig.TABLE_EXCLUDE_LIST, regexList);
}
public Configurator storeOnlyCapturedDatabasesDdl(String flag) {
Testing.debug("Using \"" + HistorizedRelationalDatabaseConnectorConfig.STORE_ONLY_CAPTURED_DATABASES_DDL.name() + "\" config property");
return with(HistorizedRelationalDatabaseConnectorConfig.STORE_ONLY_CAPTURED_DATABASES_DDL, flag);
}
public Configurator signalingCollection(String signalingCollection) {
return with(RelationalDatabaseConnectorConfig.SIGNAL_DATA_COLLECTION, signalingCollection);
}

View File

@ -37,6 +37,20 @@ public void shouldIncludeDatabaseCoveredByMultipleLiteralsInWhitelist() {
assertThat(filters.databaseFilter().test("db2")).isTrue();
}
@Test
public void shouldIncludeAllCapturedDatabaseDdlInSchemaSnapshot() {
filters = build.includeDatabases("db1").createFilters();
assertCollectionIncludedInSchemaSnapshot("db1.collectionA");
assertCollectionIncludedInSchemaSnapshot("db2.collectionA");
}
@Test
public void shouldIncludeOnlyCapturedDatabaseDdlInSchemaSnapshot() {
filters = build.includeDatabases("db1").storeOnlyCapturedDatabasesDdl("true").createFilters();
assertCollectionIncludedInSchemaSnapshot("db1.collectionA");
assertCollectionExcludedFromSchemaSnapshot("db2.collectionA");
}
@Test
public void shouldIncludeDatabaseCoveredByWildcardInWhitelist() {
filters = build.includeDatabases("db.*").createFilters();
@ -197,4 +211,16 @@ protected void assertCollectionExcluded(String fullyQualifiedCollectionName) {
assertThat(filters.dataCollectionFilter().isIncluded(id)).isFalse();
}
protected void assertCollectionExcludedFromSchemaSnapshot(String fullyQualifiedCollectionName) {
TableId id = TableId.parse(fullyQualifiedCollectionName);
assertThat(id).isNotNull();
assertThat(filters.eligibleForSchemaDataCollectionFilter().isIncluded(id)).isFalse();
}
protected void assertCollectionIncludedInSchemaSnapshot(String fullyQualifiedCollectionName) {
TableId id = TableId.parse(fullyQualifiedCollectionName);
assertThat(id).isNotNull();
assertThat(filters.eligibleForSchemaDataCollectionFilter().isIncluded(id)).isTrue();
}
}

View File

@ -228,3 +228,4 @@ tyrantlucifer,Chao Tian
ryanvanhuuksloot,Ryan van Huuksloot
vsantona, Vincenzo Santonastaso
rolevinks, Stein Rolevink
matan-cohen,Matan Cohen