From c3a8ba4afb176b6cf3047e33f779f9109d93099a Mon Sep 17 00:00:00 2001 From: mfvitale Date: Wed, 8 May 2024 12:43:24 +0200 Subject: [PATCH] DBZ-7828 Correct filter query in snapshot task to handle SQLServer table quote --- .../mongodb/MongoDbConnectorConfig.java | 10 +++------ .../MongoDbSnapshotChangeEventSource.java | 8 +++---- .../PostgresSnapshotChangeEventSource.java | 4 ++-- .../connector/sqlserver/SnapshotIT.java | 8 ++++--- .../pipeline/source/SnapshottingTask.java | 8 ++++--- .../source/spi/SnapshotChangeEventSource.java | 12 +--------- .../RelationalSnapshotChangeEventSource.java | 22 +++++++++++++------ 7 files changed, 35 insertions(+), 37 deletions(-) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java index e6f63cfab..adce2839a 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbConnectorConfig.java @@ -1402,22 +1402,18 @@ protected SourceInfoStructMaker getSourceInfoStruc return getSourceInfoStructMaker(SOURCE_INFO_STRUCT_MAKER, Module.name(), Module.version(), this); } - public Optional getSnapshotFilterQueryForCollection(CollectionId collectionId) { - return Optional.ofNullable(getSnapshotFilterQueryByCollection().get(collectionId.dbName() + "." + collectionId.name())); - } - - public Map getSnapshotFilterQueryByCollection() { + public Map getSnapshotFilterQueryByCollection() { String collectionList = getConfig().getString(SNAPSHOT_FILTER_QUERY_BY_COLLECTION); if (collectionList == null) { return Collections.emptyMap(); } - Map snapshotFilterQueryByCollection = new HashMap<>(); + Map snapshotFilterQueryByCollection = new HashMap<>(); for (String collection : collectionList.split(",")) { snapshotFilterQueryByCollection.put( - collection, + CollectionId.parse(collection), getConfig().getString( new StringBuilder().append(SNAPSHOT_FILTER_QUERY_BY_COLLECTION).append(".") .append(collection).toString())); diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java index bd3b778ca..b05ba25a1 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java @@ -118,8 +118,8 @@ protected SnapshotResult doExecute(ChangeEventSourceContex @Override public SnapshottingTask getBlockingSnapshottingTask(MongoDbPartition partition, MongoDbOffsetContext previousOffset, SnapshotConfiguration snapshotConfiguration) { - Map filtersByTable = snapshotConfiguration.getAdditionalConditions().stream() - .collect(Collectors.toMap(k -> k.getDataCollection().toString(), AdditionalCondition::getFilter)); + Map filtersByTable = snapshotConfiguration.getAdditionalConditions().stream() + .collect(Collectors.toMap(k -> CollectionId.parse(k.getDataCollection().toString()), AdditionalCondition::getFilter)); return new SnapshottingTask(false, true, snapshotConfiguration.getDataCollections(), filtersByTable, true); } @@ -290,7 +290,7 @@ private void createDataEventsForCollection(ChangeEventSourceContext sourceContex MongoDbSnapshotContext snapshotContext, SnapshotReceiver snapshotReceiver, CollectionId collectionId, MongoDbConnection mongo, - Map snapshotFilterQueryForCollection) + Map snapshotFilterQueryForCollection) throws InterruptedException { long exportStart = clock.currentTimeInMillis(); LOGGER.info("\t Exporting data for collection '{}'", collectionId); @@ -303,7 +303,7 @@ private void createDataEventsForCollection(ChangeEventSourceContext sourceContex final int batchSize = taskContext.getConnectorConfig().getSnapshotFetchSize(); long docs = 0; - Optional snapshotFilterForCollectionId = Optional.ofNullable(snapshotFilterQueryForCollection.get(collectionId.dbName() + "." + collectionId.name())); + Optional snapshotFilterForCollectionId = Optional.ofNullable(snapshotFilterQueryForCollection.get(collectionId)); Bson filterQuery = Document.parse(snapshotFilterForCollectionId.orElse("{}")); try (MongoCursor cursor = collection.find(filterQuery).batchSize(batchSize).iterator()) { diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java index b538dd593..65a5af63c 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java @@ -32,6 +32,7 @@ import io.debezium.relational.Tables; import io.debezium.schema.SchemaChangeEvent; import io.debezium.snapshot.SnapshotterService; +import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Clock; public class PostgresSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { @@ -63,8 +64,7 @@ public SnapshottingTask getSnapshottingTask(PostgresPartition partition, Postgre boolean snapshotSchema = true; List dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted(); - Map snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue)); + Map snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable(); boolean offsetExists = previousOffset != null; boolean snapshotInProgress = false; diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java index 801f45aa8..5e1996b0d 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/SnapshotIT.java @@ -564,17 +564,19 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception { } @Test - @FixFor("DBZ-5198") + @FixFor({ "DBZ-5198", "DBZ-7828" }) public void shouldHandleBracketsInSnapshotSelect() throws InterruptedException, SQLException { connection.execute( "CREATE TABLE [user detail] (id int PRIMARY KEY, name varchar(30))", - "INSERT INTO [user detail] VALUES(1, 'k')"); + "INSERT INTO [user detail] VALUES(1, 'k')", + "INSERT INTO [user detail] VALUES(2, 'k')"); TestHelper.enableTableCdc(connection, "user detail"); final Configuration config = TestHelper.defaultConfig() .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL) .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.user detail") .with(SqlServerConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, "[dbo].[user detail]") + .with(SqlServerConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + ".[dbo].[user detail]", "SELECT * FROM [dbo].[user detail] WHERE id = 2") .build(); start(SqlServerConnector.class, config); @@ -585,7 +587,7 @@ public void shouldHandleBracketsInSnapshotSelect() throws InterruptedException, assertThat(recordsForTopic.get(0).key()).isNotNull(); Struct value = (Struct) ((Struct) recordsForTopic.get(0).value()).get("after"); System.out.println("DATA: " + value); - assertThat(value.get("id")).isEqualTo(1); + assertThat(value.get("id")).isEqualTo(2); assertThat(value.get("name")).isEqualTo("k"); stopConnector(); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/SnapshottingTask.java b/debezium-core/src/main/java/io/debezium/pipeline/source/SnapshottingTask.java index d6ab01e15..0dd524bb0 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/SnapshottingTask.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/SnapshottingTask.java @@ -8,6 +8,8 @@ import java.util.List; import java.util.Map; +import io.debezium.spi.schema.DataCollectionId; + /** * A configuration describing the task to be performed during snapshotting. */ @@ -16,11 +18,11 @@ public class SnapshottingTask { private final boolean snapshotSchema; private final boolean snapshotData; private final List dataCollections; - private final Map filterQueries; + private final Map filterQueries; private final boolean onDemand; - public SnapshottingTask(boolean snapshotSchema, boolean snapshotData, List dataCollections, Map filterQueries, boolean onDemand) { + public SnapshottingTask(boolean snapshotSchema, boolean snapshotData, List dataCollections, Map filterQueries, boolean onDemand) { this.snapshotSchema = snapshotSchema; this.snapshotData = snapshotData; this.dataCollections = dataCollections; @@ -53,7 +55,7 @@ public List getDataCollections() { /** * Map of query statement overrides by data collection */ - public Map getFilterQueries() { + public Map getFilterQueries() { return filterQueries; } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/SnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/SnapshotChangeEventSource.java index aa7e05239..f3c0594c2 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/SnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/SnapshotChangeEventSource.java @@ -5,10 +5,6 @@ */ package io.debezium.pipeline.source.spi; -import java.util.Map; -import java.util.stream.Collectors; - -import io.debezium.pipeline.signal.actions.snapshotting.AdditionalCondition; import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration; import io.debezium.pipeline.source.SnapshottingTask; import io.debezium.pipeline.spi.OffsetContext; @@ -45,11 +41,5 @@ public interface SnapshotChangeEventSource

filtersByTable = snapshotConfiguration.getAdditionalConditions().stream() - .collect(Collectors.toMap(k -> k.getDataCollection().toString(), AdditionalCondition::getFilter)); - - return new SnapshottingTask(true, true, snapshotConfiguration.getDataCollections(), filtersByTable, true); - } + SnapshottingTask getBlockingSnapshottingTask(P partition, O previousOffset, SnapshotConfiguration snapshotConfiguration); } diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java index 9e4f760c8..bcdc445b7 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java @@ -45,6 +45,8 @@ import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.EventDispatcher.SnapshotReceiver; import io.debezium.pipeline.notification.NotificationService; +import io.debezium.pipeline.signal.actions.snapshotting.AdditionalCondition; +import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration; import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource; import io.debezium.pipeline.source.SnapshottingTask; import io.debezium.pipeline.source.spi.SnapshotChangeEventSource; @@ -118,8 +120,7 @@ public SnapshotResult doExecute(ChangeEventSourceContext context, O previousO Set dataCollectionsToBeSnapshotted = getDataCollectionPattern(snapshottingTask.getDataCollections()); - Map snapshotSelectOverridesByTable = snapshottingTask.getFilterQueries().entrySet().stream() - .collect(Collectors.toMap(e -> TableId.parse(e.getKey()), Map.Entry::getValue)); + Map snapshotSelectOverridesByTable = snapshottingTask.getFilterQueries(); preSnapshot(); @@ -167,7 +168,7 @@ public SnapshotResult doExecute(ChangeEventSourceContext context, O previousO if (snapshottingTask.snapshotData()) { LOGGER.info("Snapshot step 7 - Snapshotting data"); - createDataEvents(context, ctx, connectionPool, snapshotSelectOverridesByTable, snapshottingTask); + createDataEvents(context, ctx, connectionPool, snapshotSelectOverridesByTable); } else { LOGGER.info("Snapshot step 7 - Skipping snapshotting of data"); @@ -234,13 +235,21 @@ public Connection createSnapshotConnection() throws SQLException { return connection; } + @Override + public SnapshottingTask getBlockingSnapshottingTask(P partition, O previousOffset, SnapshotConfiguration snapshotConfiguration) { + + Map filtersByTable = snapshotConfiguration.getAdditionalConditions().stream() + .collect(Collectors.toMap(k -> TableId.parse(k.getDataCollection().toString()), AdditionalCondition::getFilter)); + + return new SnapshottingTask(true, true, snapshotConfiguration.getDataCollections(), filtersByTable, true); + } + public SnapshottingTask getSnapshottingTask(P partition, O previousOffset) { final Snapshotter snapshotter = snapshotterService.getSnapshotter(); List dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted(); - Map snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue)); + Map snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable(); boolean offsetExists = previousOffset != null; boolean snapshotInProgress = false; @@ -442,8 +451,7 @@ protected abstract SchemaChangeEvent getCreateTableEvent(RelationalSnapshotConte private void createDataEvents(ChangeEventSourceContext sourceContext, RelationalSnapshotContext snapshotContext, - Queue connectionPool, Map snapshotSelectOverridesByTable, - SnapshottingTask snapshottingTask) + Queue connectionPool, Map snapshotSelectOverridesByTable) throws Exception { tryStartingSnapshot(snapshotContext);