DBZ-6787 Dedupe table list while incremental snapshot
This commit is contained in:
parent
b08acc6043
commit
fcbff074f4
@ -501,7 +501,7 @@ public void addDataCollectionNamesToSnapshot(SignalPayload<P> signalPayload, Sna
|
||||
context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
|
||||
boolean shouldReadChunk = !context.snapshotRunning();
|
||||
|
||||
List<String> expandedDataCollectionIds = expandDataCollectionIds(snapshotConfiguration.getDataCollections());
|
||||
List<String> expandedDataCollectionIds = expandAndDedupeDataCollectionIds(snapshotConfiguration.getDataCollections());
|
||||
if (expandedDataCollectionIds.size() > snapshotConfiguration.getDataCollections().size()) {
|
||||
LOGGER.info("Data-collections to snapshot have been expanded from {} to {}", snapshotConfiguration.getDataCollections(), expandedDataCollectionIds);
|
||||
}
|
||||
@ -549,7 +549,7 @@ public void stopSnapshot(P partition, OffsetContext offsetContext, Map<String, O
|
||||
}
|
||||
}
|
||||
else {
|
||||
final List<String> expandedDataCollectionIds = expandDataCollectionIds(dataCollectionIds);
|
||||
final List<String> expandedDataCollectionIds = expandAndDedupeDataCollectionIds(dataCollectionIds);
|
||||
LOGGER.info("Removing '{}' collections from incremental snapshot", expandedDataCollectionIds);
|
||||
// Iterate and remove any collections that are not current.
|
||||
// If current is marked for removal, delay that until after others have been removed.
|
||||
@ -606,7 +606,7 @@ protected void addKeyColumnsToCondition(Table table, StringBuilder sql, String p
|
||||
* Expands the string-based list of data collection ids, any of which may be supplied as a regex, into a
|
||||
* list of all matching explicit data collection ids, with duplicate ids removed.
|
||||
*/
|
||||
private List<String> expandDataCollectionIds(List<String> dataCollectionIds) {
|
||||
private List<String> expandAndDedupeDataCollectionIds(List<String> dataCollectionIds) {
|
||||
|
||||
return dataCollectionIds
|
||||
.stream()
|
||||
@ -618,8 +618,7 @@ private List<String> expandDataCollectionIds(List<String> dataCollectionIds) {
|
||||
.filter(t -> Pattern.compile(x).matcher(t).matches())
|
||||
.collect(Collectors.toList());
|
||||
return ids.isEmpty() ? Stream.of(x) : ids.stream();
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
}).distinct().collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -15,6 +15,7 @@
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
@ -410,6 +411,24 @@ public void snapshotWithRegexDataCollections() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Sends an ad-hoc snapshot signal that names the same data collection twice and
 * checks the records that come back.
 * NOTE(review): presumably guards against the table being snapshotted once per
 * duplicate entry in the signal - confirm against the linked issue.
 */
@Test
|
||||
@FixFor("DBZ-6945")
|
||||
public void snapshotWithDuplicateDataCollections() throws Exception {
|
||||
populateTable();
|
||||
startConnector();
|
||||
// The same data collection id is passed twice on purpose.
sendAdHocSnapshotSignal(tableDataCollectionId(), tableDataCollectionId());
|
||||
|
||||
// Only ROW_COUNT records are expected even though the id was listed twice.
final int expectedRecordCount = ROW_COUNT;
|
||||
Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
|
||||
for (int i = 0; i < expectedRecordCount; i++) {
|
||||
assertThat(dbChanges).contains(entry(i + 1, i));
|
||||
}
|
||||
|
||||
// Consume once more and assert that nothing further was produced for the table's topic.
// NOTE(review): the meaning of the (1, 1) arguments is not visible here - confirm.
SourceRecords sourceRecords = consumeRecordsByTopic(1, 1);
|
||||
assertTrue(Objects.isNull(sourceRecords.recordsForTopic(topicName())));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@FixFor("DBZ-4271")
|
||||
public void stopCurrentIncrementalSnapshotWithoutCollectionsAndTakeNewNewIncrementalSnapshotAfterRestart() throws Exception {
|
||||
|
Loading…
Reference in New Issue
Block a user