DBZ-6072 NPE when setting schema.history.internal.store.only.captured.tables.ddl=true

This commit is contained in:
harveyyue 2023-02-08 23:11:16 +08:00 committed by Jiri Pechanec
parent 2f46c83b2a
commit 5dd4b53ec3

View File

@ -13,6 +13,7 @@
import java.time.Instant; import java.time.Instant;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
@ -228,14 +229,22 @@ protected void connectionCreated(RelationalSnapshotContext<P, O> snapshotContext
private Stream<TableId> toTableIds(Set<TableId> tableIds, Pattern pattern) { private Stream<TableId> toTableIds(Set<TableId> tableIds, Pattern pattern) {
return tableIds return tableIds
.stream() .stream()
.filter(tid -> pattern.asPredicate().test(connectorConfig.getTableIdMapper().toString(tid)) || connectorConfig.isSignalDataCollection(tid)) .filter(tid -> pattern.asPredicate().test(connectorConfig.getTableIdMapper().toString(tid)))
.sorted(); .sorted();
} }
private Set<TableId> sort(Set<TableId> capturedTables) throws Exception { private Set<TableId> sort(Set<TableId> capturedTables) throws Exception {
String tableIncludeList = connectorConfig.tableIncludeList(); String tableIncludeList = connectorConfig.tableIncludeList();
if (tableIncludeList != null) { String signalingDataCollection = connectorConfig.getSignalingDataCollectionId();
return Strings.listOfRegex(tableIncludeList, Pattern.CASE_INSENSITIVE) List<Pattern> captureTablePatterns = new ArrayList<>();
if (!Strings.isNullOrBlank(tableIncludeList)) {
captureTablePatterns.addAll(Strings.listOfRegex(tableIncludeList, Pattern.CASE_INSENSITIVE));
}
if (!Strings.isNullOrBlank(signalingDataCollection)) {
captureTablePatterns.addAll(Strings.listOfRegex(signalingDataCollection, Pattern.CASE_INSENSITIVE));
}
if (captureTablePatterns.size() > 0) {
return captureTablePatterns
.stream() .stream()
.flatMap(pattern -> toTableIds(capturedTables, pattern)) .flatMap(pattern -> toTableIds(capturedTables, pattern))
.collect(Collectors.toCollection(LinkedHashSet::new)); .collect(Collectors.toCollection(LinkedHashSet::new));