DBZ-6669 Snapshot will get data when only signal data collection table is specified
This commit is contained in:
parent
0fbad0c60c
commit
8ea2b3c5b7
@ -1226,6 +1226,27 @@ public void shouldIncludePartitionedTableIntoSnapshot() throws Exception {
|
|||||||
.forEach(i -> VerifyRecord.isValidRead(recordsForTopicPart.remove(0), PK_FIELD, expectedPks[i]));
|
.forEach(i -> VerifyRecord.isValidRead(recordsForTopicPart.remove(0), PK_FIELD, expectedPks[i]));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@FixFor("DBZ-6669")
|
||||||
|
public void shouldGenerateSnapshotWhenSignalDataCollectionIsPresentWithoutTableIncludeList() throws Exception {
|
||||||
|
|
||||||
|
TestHelper.dropAllSchemas();
|
||||||
|
TestHelper.execute("CREATE TABLE t1 (pk SERIAL, aa integer, PRIMARY KEY(pk)); INSERT INTO t1 VALUES (default, 11)");
|
||||||
|
|
||||||
|
buildWithStreamProducer(TestHelper.defaultConfig()
|
||||||
|
.with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL)
|
||||||
|
.with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "public.debezium_signal")
|
||||||
|
.with(PostgresConnectorConfig.INCLUDE_SCHEMA_CHANGES, true));
|
||||||
|
|
||||||
|
TestConsumer consumer = testConsumer(1);
|
||||||
|
|
||||||
|
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
final SourceRecord first = consumer.remove();
|
||||||
|
VerifyRecord.isValidRead(first, PK_FIELD, 1);
|
||||||
|
assertRecordOffsetAndSnapshotSource(first, SnapshotRecord.LAST);
|
||||||
|
}
|
||||||
|
|
||||||
private void buildNoStreamProducer(Configuration.Builder config) {
|
private void buildNoStreamProducer(Configuration.Builder config) {
|
||||||
alterConfig(config);
|
alterConfig(config);
|
||||||
start(PostgresConnector.class, config
|
start(PostgresConnector.class, config
|
||||||
|
@ -74,6 +74,7 @@ public abstract class RelationalSnapshotChangeEventSource<P extends Partition, O
|
|||||||
private static final Logger LOGGER = LoggerFactory.getLogger(RelationalSnapshotChangeEventSource.class);
|
private static final Logger LOGGER = LoggerFactory.getLogger(RelationalSnapshotChangeEventSource.class);
|
||||||
|
|
||||||
public static final Pattern SELECT_ALL_PATTERN = Pattern.compile("\\*");
|
public static final Pattern SELECT_ALL_PATTERN = Pattern.compile("\\*");
|
||||||
|
public static final Pattern MATCH_ALL_PATTERN = Pattern.compile(".*");
|
||||||
|
|
||||||
private final RelationalDatabaseConnectorConfig connectorConfig;
|
private final RelationalDatabaseConnectorConfig connectorConfig;
|
||||||
private final JdbcConnection jdbcConnection;
|
private final JdbcConnection jdbcConnection;
|
||||||
@ -241,25 +242,26 @@ private Stream<TableId> toTableIds(Set<TableId> tableIds, Pattern pattern) {
|
|||||||
.sorted();
|
.sorted();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Set<TableId> addSignalingCollectionAndSort(Set<TableId> capturedTables) throws Exception {
|
private Set<TableId> addSignalingCollectionAndSort(Set<TableId> capturedTables) {
|
||||||
|
|
||||||
String tableIncludeList = connectorConfig.tableIncludeList();
|
String tableIncludeList = connectorConfig.tableIncludeList();
|
||||||
String signalingDataCollection = connectorConfig.getSignalingDataCollectionId();
|
String signalingDataCollection = connectorConfig.getSignalingDataCollectionId();
|
||||||
|
|
||||||
List<Pattern> captureTablePatterns = new ArrayList<>();
|
List<Pattern> captureTablePatterns = new ArrayList<>();
|
||||||
if (!Strings.isNullOrBlank(tableIncludeList)) {
|
if (!Strings.isNullOrBlank(tableIncludeList)) {
|
||||||
captureTablePatterns.addAll(Strings.listOfRegex(tableIncludeList, Pattern.CASE_INSENSITIVE));
|
captureTablePatterns.addAll(Strings.listOfRegex(tableIncludeList, Pattern.CASE_INSENSITIVE));
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
captureTablePatterns.add(MATCH_ALL_PATTERN);
|
||||||
|
}
|
||||||
|
|
||||||
if (!Strings.isNullOrBlank(signalingDataCollection)) {
|
if (!Strings.isNullOrBlank(signalingDataCollection)) {
|
||||||
captureTablePatterns.addAll(getSignalDataCollectionPattern(signalingDataCollection));
|
captureTablePatterns.addAll(getSignalDataCollectionPattern(signalingDataCollection));
|
||||||
}
|
}
|
||||||
if (captureTablePatterns.size() > 0) {
|
|
||||||
return captureTablePatterns
|
return captureTablePatterns
|
||||||
.stream()
|
|
||||||
.flatMap(pattern -> toTableIds(capturedTables, pattern))
|
|
||||||
.collect(Collectors.toCollection(LinkedHashSet::new));
|
|
||||||
}
|
|
||||||
return capturedTables
|
|
||||||
.stream()
|
.stream()
|
||||||
.sorted()
|
.flatMap(pattern -> toTableIds(capturedTables, pattern))
|
||||||
.collect(Collectors.toCollection(LinkedHashSet::new));
|
.collect(Collectors.toCollection(LinkedHashSet::new));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user