DBZ-7828 Correct filter query in snapshot task to handle SQLServer table quote

This commit is contained in:
mfvitale 2024-05-08 12:43:24 +02:00 committed by Jiri Pechanec
parent 6d49102c9b
commit c3a8ba4afb
7 changed files with 35 additions and 37 deletions

View File

@ -1402,22 +1402,18 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
return getSourceInfoStructMaker(SOURCE_INFO_STRUCT_MAKER, Module.name(), Module.version(), this);
}
/**
 * Looks up the snapshot filter query registered for a single collection.
 * <p>
 * The lookup key is the collection's database name and collection name joined by a dot.
 *
 * @param collectionId the collection whose filter query is wanted
 * @return the filter query mapped to that key, or an empty {@link Optional} when the map has no value for it
 */
public Optional<String> getSnapshotFilterQueryForCollection(CollectionId collectionId) {
    final String filterQuery = getSnapshotFilterQueryByCollection()
            .get(collectionId.dbName() + "." + collectionId.name());
    return Optional.ofNullable(filterQuery);
}
public Map<String, String> getSnapshotFilterQueryByCollection() {
public Map<DataCollectionId, String> getSnapshotFilterQueryByCollection() {
String collectionList = getConfig().getString(SNAPSHOT_FILTER_QUERY_BY_COLLECTION);
if (collectionList == null) {
return Collections.emptyMap();
}
Map<String, String> snapshotFilterQueryByCollection = new HashMap<>();
Map<DataCollectionId, String> snapshotFilterQueryByCollection = new HashMap<>();
for (String collection : collectionList.split(",")) {
snapshotFilterQueryByCollection.put(
collection,
CollectionId.parse(collection),
getConfig().getString(
new StringBuilder().append(SNAPSHOT_FILTER_QUERY_BY_COLLECTION).append(".")
.append(collection).toString()));

View File

@ -118,8 +118,8 @@ protected SnapshotResult<MongoDbOffsetContext> doExecute(ChangeEventSourceContex
/**
 * Builds the snapshotting task for a blocking snapshot from the given snapshot configuration.
 * <p>
 * Every additional condition of the configuration contributes one entry to a filter map
 * (data collection to filter); the map and the configured data collections are then handed
 * to a new {@link SnapshottingTask}. The {@code partition} and {@code previousOffset}
 * arguments are not read here.
 */
@Override
public SnapshottingTask getBlockingSnapshottingTask(MongoDbPartition partition, MongoDbOffsetContext previousOffset, SnapshotConfiguration snapshotConfiguration) {
// NOTE(review): this diff view lists two declarations of filtersByTable back to back;
// presumably the String-keyed one is the removed form and the DataCollectionId-keyed one
// its replacement. Confirm against the commit.
// Form 1: key is the plain string form of the condition's data collection.
Map<String, String> filtersByTable = snapshotConfiguration.getAdditionalConditions().stream()
.collect(Collectors.toMap(k -> k.getDataCollection().toString(), AdditionalCondition::getFilter));
// Form 2: key is the CollectionId parsed from that same string form.
Map<DataCollectionId, String> filtersByTable = snapshotConfiguration.getAdditionalConditions().stream()
.collect(Collectors.toMap(k -> CollectionId.parse(k.getDataCollection().toString()), AdditionalCondition::getFilter));
// Flags, in constructor order: snapshotSchema = false, snapshotData = true, onDemand = true
// (assumes the five-argument SnapshottingTask constructor shown elsewhere in this commit).
return new SnapshottingTask(false, true, snapshotConfiguration.getDataCollections(), filtersByTable, true);
}
@ -290,7 +290,7 @@ private void createDataEventsForCollection(ChangeEventSourceContext sourceContex
MongoDbSnapshotContext snapshotContext,
SnapshotReceiver<MongoDbPartition> snapshotReceiver,
CollectionId collectionId, MongoDbConnection mongo,
Map<String, String> snapshotFilterQueryForCollection)
Map<DataCollectionId, String> snapshotFilterQueryForCollection)
throws InterruptedException {
long exportStart = clock.currentTimeInMillis();
LOGGER.info("\t Exporting data for collection '{}'", collectionId);
@ -303,7 +303,7 @@ private void createDataEventsForCollection(ChangeEventSourceContext sourceContex
final int batchSize = taskContext.getConnectorConfig().getSnapshotFetchSize();
long docs = 0;
Optional<String> snapshotFilterForCollectionId = Optional.ofNullable(snapshotFilterQueryForCollection.get(collectionId.dbName() + "." + collectionId.name()));
Optional<String> snapshotFilterForCollectionId = Optional.ofNullable(snapshotFilterQueryForCollection.get(collectionId));
Bson filterQuery = Document.parse(snapshotFilterForCollectionId.orElse("{}"));
try (MongoCursor<BsonDocument> cursor = collection.find(filterQuery).batchSize(batchSize).iterator()) {

View File

@ -32,6 +32,7 @@
import io.debezium.relational.Tables;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
public class PostgresSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> {
@ -63,8 +64,7 @@ public SnapshottingTask getSnapshottingTask(PostgresPartition partition, Postgre
boolean snapshotSchema = true;
List<String> dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted();
Map<String, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue));
Map<DataCollectionId, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable();
boolean offsetExists = previousOffset != null;
boolean snapshotInProgress = false;

View File

@ -564,17 +564,19 @@ public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
}
@Test
@FixFor("DBZ-5198")
@FixFor({ "DBZ-5198", "DBZ-7828" })
public void shouldHandleBracketsInSnapshotSelect() throws InterruptedException, SQLException {
connection.execute(
"CREATE TABLE [user detail] (id int PRIMARY KEY, name varchar(30))",
"INSERT INTO [user detail] VALUES(1, 'k')");
"INSERT INTO [user detail] VALUES(1, 'k')",
"INSERT INTO [user detail] VALUES(2, 'k')");
TestHelper.enableTableCdc(connection, "user detail");
final Configuration config = TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.user detail")
.with(SqlServerConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, "[dbo].[user detail]")
.with(SqlServerConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + ".[dbo].[user detail]", "SELECT * FROM [dbo].[user detail] WHERE id = 2")
.build();
start(SqlServerConnector.class, config);
@ -585,7 +587,7 @@ public void shouldHandleBracketsInSnapshotSelect() throws InterruptedException,
assertThat(recordsForTopic.get(0).key()).isNotNull();
Struct value = (Struct) ((Struct) recordsForTopic.get(0).value()).get("after");
System.out.println("DATA: " + value);
assertThat(value.get("id")).isEqualTo(1);
assertThat(value.get("id")).isEqualTo(2);
assertThat(value.get("name")).isEqualTo("k");
stopConnector();

View File

@ -8,6 +8,8 @@
import java.util.List;
import java.util.Map;
import io.debezium.spi.schema.DataCollectionId;
/**
* A configuration describing the task to be performed during snapshotting.
*/
@ -16,11 +18,11 @@ public class SnapshottingTask {
private final boolean snapshotSchema;
private final boolean snapshotData;
private final List<String> dataCollections;
private final Map<String, String> filterQueries;
private final Map<DataCollectionId, String> filterQueries;
private final boolean onDemand;
public SnapshottingTask(boolean snapshotSchema, boolean snapshotData, List<String> dataCollections, Map<String, String> filterQueries, boolean onDemand) {
public SnapshottingTask(boolean snapshotSchema, boolean snapshotData, List<String> dataCollections, Map<DataCollectionId, String> filterQueries, boolean onDemand) {
this.snapshotSchema = snapshotSchema;
this.snapshotData = snapshotData;
this.dataCollections = dataCollections;
@ -53,7 +55,7 @@ public List<String> getDataCollections() {
/**
 * Returns the query statement overrides of this task, keyed by data collection.
 * <p>
 * The map is the same instance held in the {@code filterQueries} field; no copy is made.
 * NOTE(review): the diff view shows two signatures for this getter, one String-keyed and
 * one DataCollectionId-keyed; presumably the second replaces the first.
 *
 * @return the map of query statement overrides by data collection
 */
public Map<String, String> getFilterQueries() {
public Map<DataCollectionId, String> getFilterQueries() {
return filterQueries;
}

View File

@ -5,10 +5,6 @@
*/
package io.debezium.pipeline.source.spi;
import java.util.Map;
import java.util.stream.Collectors;
import io.debezium.pipeline.signal.actions.snapshotting.AdditionalCondition;
import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.spi.OffsetContext;
@ -45,11 +41,5 @@ public interface SnapshotChangeEventSource<P extends Partition, O extends Offset
/**
 * Returns the blocking snapshotting task based on the snapshot configuration from the signal.
 * <p>
 * NOTE(review): this diff view shows both a default method body and a body-less declaration
 * of the same method; judging by the hunk's line counts the default body is the removed form,
 * leaving the method abstract for implementors. Confirm against the commit.
 */
default SnapshottingTask getBlockingSnapshottingTask(P partition, O previousOffset, SnapshotConfiguration snapshotConfiguration) {
// One map entry per additional condition: string form of its data collection -> its filter.
Map<String, String> filtersByTable = snapshotConfiguration.getAdditionalConditions().stream()
.collect(Collectors.toMap(k -> k.getDataCollection().toString(), AdditionalCondition::getFilter));
// partition and previousOffset are not read; the first two flags and the last one are all true.
return new SnapshottingTask(true, true, snapshotConfiguration.getDataCollections(), filtersByTable, true);
}
// Body-less declaration: each implementing class supplies its own way of building the task.
SnapshottingTask getBlockingSnapshottingTask(P partition, O previousOffset, SnapshotConfiguration snapshotConfiguration);
}

View File

@ -45,6 +45,8 @@
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.EventDispatcher.SnapshotReceiver;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.actions.snapshotting.AdditionalCondition;
import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
@ -118,8 +120,7 @@ public SnapshotResult<O> doExecute(ChangeEventSourceContext context, O previousO
Set<Pattern> dataCollectionsToBeSnapshotted = getDataCollectionPattern(snapshottingTask.getDataCollections());
Map<DataCollectionId, String> snapshotSelectOverridesByTable = snapshottingTask.getFilterQueries().entrySet().stream()
.collect(Collectors.toMap(e -> TableId.parse(e.getKey()), Map.Entry::getValue));
Map<DataCollectionId, String> snapshotSelectOverridesByTable = snapshottingTask.getFilterQueries();
preSnapshot();
@ -167,7 +168,7 @@ public SnapshotResult<O> doExecute(ChangeEventSourceContext context, O previousO
if (snapshottingTask.snapshotData()) {
LOGGER.info("Snapshot step 7 - Snapshotting data");
createDataEvents(context, ctx, connectionPool, snapshotSelectOverridesByTable, snapshottingTask);
createDataEvents(context, ctx, connectionPool, snapshotSelectOverridesByTable);
}
else {
LOGGER.info("Snapshot step 7 - Skipping snapshotting of data");
@ -234,13 +235,21 @@ public Connection createSnapshotConnection() throws SQLException {
return connection;
}
/**
 * Creates the task describing a blocking snapshot requested through a signal.
 * <p>
 * Each additional condition of the snapshot configuration becomes one entry of the filter map:
 * the key is the {@link TableId} parsed from the string form of the condition's data collection,
 * the value is the condition's filter. The task is created with schema and data snapshotting
 * enabled and is marked as on-demand.
 *
 * @param partition not read by this implementation
 * @param previousOffset not read by this implementation
 * @param snapshotConfiguration supplies the data collections and the additional conditions
 * @return the snapshotting task for the blocking snapshot
 */
@Override
public SnapshottingTask getBlockingSnapshottingTask(P partition, O previousOffset, SnapshotConfiguration snapshotConfiguration) {
    // Build the filter map first, keyed by parsed table id rather than by raw string.
    final Map<DataCollectionId, String> filterByTableId = snapshotConfiguration.getAdditionalConditions()
            .stream()
            .collect(Collectors.toMap(
                    condition -> TableId.parse(condition.getDataCollection().toString()),
                    AdditionalCondition::getFilter));

    final boolean snapshotSchema = true;
    final boolean snapshotData = true;
    final boolean onDemand = true;
    return new SnapshottingTask(snapshotSchema, snapshotData, snapshotConfiguration.getDataCollections(), filterByTableId, onDemand);
}
public SnapshottingTask getSnapshottingTask(P partition, O previousOffset) {
final Snapshotter snapshotter = snapshotterService.getSnapshotter();
List<String> dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted();
Map<String, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue));
Map<DataCollectionId, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable();
boolean offsetExists = previousOffset != null;
boolean snapshotInProgress = false;
@ -442,8 +451,7 @@ protected abstract SchemaChangeEvent getCreateTableEvent(RelationalSnapshotConte
private void createDataEvents(ChangeEventSourceContext sourceContext,
RelationalSnapshotContext<P, O> snapshotContext,
Queue<JdbcConnection> connectionPool, Map<DataCollectionId, String> snapshotSelectOverridesByTable,
SnapshottingTask snapshottingTask)
Queue<JdbcConnection> connectionPool, Map<DataCollectionId, String> snapshotSelectOverridesByTable)
throws Exception {
tryStartingSnapshot(snapshotContext);