DBZ-8050 Skip data collection names that fails to be parsed as TableId

This commit is contained in:
mfvitale 2024-07-16 10:19:41 +02:00 committed by Chris Cranford
parent 3bd71a5e69
commit 3499f9e80d
2 changed files with 26 additions and 2 deletions

View File

@ -197,20 +197,29 @@ public List<DataCollection<T>> addDataCollectionNamesToSnapshot(String correlati
LOGGER.trace("Adding data collections names {} to snapshot", dataCollectionIds);
final List<DataCollection<T>> newDataCollectionIds = dataCollectionIds.stream()
.map(buildDataCollection(additionalCondition, surrogateKey))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
addTablesIdsToSnapshot(newDataCollectionIds);
this.correlationId = correlationId;
return newDataCollectionIds;
}
private Function<String, DataCollection<T>> buildDataCollection(List<AdditionalCondition> additionalCondition, String surrogateKey) {
private Function<String, Optional<DataCollection<T>>> buildDataCollection(List<AdditionalCondition> additionalCondition, String surrogateKey) {
return expandedCollectionName -> {
String filter = additionalCondition.stream()
.filter(condition -> condition.getDataCollection().matcher(expandedCollectionName).matches())
.map(AdditionalCondition::getFilter)
.findFirst()
.orElse("");
return new DataCollection<T>((T) TableId.parse(expandedCollectionName, useCatalogBeforeSchema), filter, surrogateKey);
try {
TableId parsedTable = TableId.parse(expandedCollectionName, useCatalogBeforeSchema);
return Optional.of(new DataCollection<T>((T) parsedTable, filter, surrogateKey));
}
catch (Exception e) {
LOGGER.warn("Unable to parse table identifier from {}. Skipping it.", expandedCollectionName);
return Optional.empty();
}
};
}

View File

@ -553,6 +553,21 @@ public void snapshotWithRegexDataCollections() throws Exception {
}
}
@Test
public void snapshotWithRegexDataCollectionsNotExist() throws Exception {
LogInterceptor interceptor = new LogInterceptor(AbstractIncrementalSnapshotChangeEventSource.class);
populateTable();
startConnector();
sendAdHocSnapshotSignal(".*notExist");
// Wait until the stop has been processed, verifying it was removed from the snapshot.
Awaitility.await().atMost(60, TimeUnit.SECONDS)
.until(() -> interceptor.containsMessage("Skipping read chunk because snapshot is not running"));
}
@Test
@FixFor("DBZ-6945")
public void snapshotWithDuplicateDataCollections() throws Exception {