From 8ea2b3c5b79fe344a61bdf70ae117fe5768e48cf Mon Sep 17 00:00:00 2001 From: mfvitale Date: Fri, 21 Jul 2023 15:55:44 +0200 Subject: [PATCH] DBZ-6669 Snapshot will get data when only signal data collection table is specified --- .../postgresql/RecordsSnapshotProducerIT.java | 21 +++++++++++++++++++ .../RelationalSnapshotChangeEventSource.java | 20 ++++++++++-------- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java index 2815ef52c..0b1f2c41a 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java @@ -1226,6 +1226,27 @@ public void shouldIncludePartitionedTableIntoSnapshot() throws Exception { .forEach(i -> VerifyRecord.isValidRead(recordsForTopicPart.remove(0), PK_FIELD, expectedPks[i])); } + @Test + @FixFor("DBZ-6669") + public void shouldGenerateSnapshotWhenSignalDataCollectionIsPresentWithoutTableIncludeList() throws Exception { + + TestHelper.dropAllSchemas(); + TestHelper.execute("CREATE TABLE t1 (pk SERIAL, aa integer, PRIMARY KEY(pk)); INSERT INTO t1 VALUES (default, 11)"); + + buildWithStreamProducer(TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL) + .with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "public.debezium_signal") + .with(PostgresConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)); + + TestConsumer consumer = testConsumer(1); + + consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); + + final SourceRecord first = consumer.remove(); + VerifyRecord.isValidRead(first, PK_FIELD, 1); + assertRecordOffsetAndSnapshotSource(first, SnapshotRecord.LAST); + } + private void buildNoStreamProducer(Configuration.Builder config) { alterConfig(config); start(PostgresConnector.class, config diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java index 1d8cc9ea6..30e720a13 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java @@ -74,6 +74,7 @@ public abstract class RelationalSnapshotChangeEventSource

toTableIds(Set tableIds, Pattern pattern) { .sorted(); } - private Set addSignalingCollectionAndSort(Set capturedTables) throws Exception { + private Set addSignalingCollectionAndSort(Set capturedTables) { + String tableIncludeList = connectorConfig.tableIncludeList(); String signalingDataCollection = connectorConfig.getSignalingDataCollectionId(); + List captureTablePatterns = new ArrayList<>(); if (!Strings.isNullOrBlank(tableIncludeList)) { captureTablePatterns.addAll(Strings.listOfRegex(tableIncludeList, Pattern.CASE_INSENSITIVE)); } + else { + captureTablePatterns.add(MATCH_ALL_PATTERN); + } + if (!Strings.isNullOrBlank(signalingDataCollection)) { captureTablePatterns.addAll(getSignalDataCollectionPattern(signalingDataCollection)); } - if (captureTablePatterns.size() > 0) { - return captureTablePatterns - .stream() - .flatMap(pattern -> toTableIds(capturedTables, pattern)) - .collect(Collectors.toCollection(LinkedHashSet::new)); - } - return capturedTables + + return captureTablePatterns .stream() - .sorted() + .flatMap(pattern -> toTableIds(capturedTables, pattern)) .collect(Collectors.toCollection(LinkedHashSet::new)); }