From 542b3619fa55e32f03fbc148d17b9dbce59ef2e1 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Thu, 3 Aug 2023 11:44:51 +0200 Subject: [PATCH] DBZ-6731 Blocking snapshot takes configuration from signal payload --- ...bIncrementalSnapshotChangeEventSource.java | 31 ++++---- .../MongoDbIncrementalSnapshotContext.java | 7 +- .../MongoDbSnapshotChangeEventSource.java | 67 +++++++++++------ .../connector/mongodb/BlockingSnapshotIT.java | 75 +++++++++++++++---- .../connector/mongodb/NotificationsIT.java | 1 - ...yIncrementalSnapshotChangeEventSource.java | 7 +- .../mysql/MySqlSnapshotChangeEventSource.java | 19 +++-- .../connector/mysql/BlockingSnapshotIT.java | 8 +- .../resources/ddl/blocking_snapshot-test.sql | 5 ++ .../OracleSnapshotChangeEventSource.java | 18 +++-- .../connector/oracle/BlockingSnapshotIT.java | 14 +++- .../PostgresSnapshotChangeEventSource.java | 14 ++-- .../postgresql/BlockingSnapshotIT.java | 8 +- .../sqlserver/SqlServerConnectorConfig.java | 13 +++- .../SqlServerSnapshotChangeEventSource.java | 15 ++-- .../sqlserver/BlockingSnapshotIT.java | 7 +- .../config/CommonConnectorConfig.java | 7 +- .../ChangeEventSourceCoordinator.java | 21 +++++- .../actions/AbstractSnapshotSignal.java | 4 + .../snapshotting/AdditionalCondition.java | 70 +++++++++++++++++ .../actions/snapshotting/ExecuteSnapshot.java | 51 +++++++++++-- .../snapshotting/SnapshotConfiguration.java | 71 ++++++++++++++++++ .../AbstractSnapshotChangeEventSource.java | 64 ++++------------ .../pipeline/source/SnapshottingTask.java | 71 ++++++++++++++++++ ...tIncrementalSnapshotChangeEventSource.java | 17 +++-- .../AbstractIncrementalSnapshotContext.java | 29 +++++-- .../snapshot/incremental/DataCollection.java | 14 ++-- .../IncrementalSnapshotChangeEventSource.java | 5 +- .../IncrementalSnapshotContext.java | 5 +- .../source/spi/SnapshotChangeEventSource.java | 37 ++++++--- .../RelationalDatabaseConnectorConfig.java | 17 ++++- .../RelationalSnapshotChangeEventSource.java | 28 ++++--- .../main/java/io/debezium/util/Strings.java | 2 +- ...nalBasedSnapshotChangeEventSourceTest.java | 6 +- .../embedded/AbstractConnectorTest.java | 13 ++++ .../AbstractBlockingSnapshotTest.java | 47 ++++++++---- .../AbstractIncrementalSnapshotTest.java | 39 ++++++++-- .../incremental/AbstractSnapshotTest.java | 75 ++++++++++++++++--- 38 files changed, 757 insertions(+), 245 deletions(-) create mode 100644 debezium-core/src/main/java/io/debezium/pipeline/signal/actions/snapshotting/AdditionalCondition.java create mode 100644 debezium-core/src/main/java/io/debezium/pipeline/signal/actions/snapshotting/SnapshotConfiguration.java create mode 100644 debezium-core/src/main/java/io/debezium/pipeline/source/SnapshottingTask.java diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java index bcfa13713..1db16e91c 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java @@ -14,7 +14,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -37,6 +36,7 @@ import io.debezium.pipeline.signal.SignalPayload; import io.debezium.pipeline.signal.actions.snapshotting.CloseIncrementalSnapshotWindow; import io.debezium.pipeline.signal.actions.snapshotting.OpenIncrementalSnapshotWindow; +import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration; import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource; import io.debezium.pipeline.source.snapshot.incremental.DataCollection; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource; @@ -389,31 +389,31 @@ private Object[] readMaximumKey() throws InterruptedException { @Override @SuppressWarnings("unchecked") public void addDataCollectionNamesToSnapshot(SignalPayload signalPayload, - List dataCollectionIds, - Optional additionalCondition, Optional surrogateKey) + SnapshotConfiguration snapshotConfiguration) throws InterruptedException { final MongoDbPartition partition = signalPayload.partition; final OffsetContext offsetContext = signalPayload.offsetContext; final String correlationId = signalPayload.id; - if (additionalCondition != null && additionalCondition.isPresent()) { + if (!snapshotConfiguration.getAdditionalConditions().isEmpty()) { throw new UnsupportedOperationException("Additional condition not supported for MongoDB"); } - if (surrogateKey != null && surrogateKey.isPresent()) { + if (!Strings.isNullOrEmpty(snapshotConfiguration.getSurrogateKey())) { throw new UnsupportedOperationException("Surrogate key not supported for MongoDB"); } context = (IncrementalSnapshotContext) offsetContext.getIncrementalSnapshotContext(); final boolean shouldReadChunk = !context.snapshotRunning(); final String rsName = replicaSets.all().get(0).replicaSetName(); - dataCollectionIds = dataCollectionIds + List dataCollectionIds = snapshotConfiguration.getDataCollections() .stream() - .map(x -> rsName + "." + x) + .map(x -> rsName + "." + x.toString()) .collect(Collectors.toList()); - final List> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(correlationId, dataCollectionIds, Optional.empty(), - Optional.empty()); + + final List> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(correlationId, dataCollectionIds, List.of(), ""); + if (shouldReadChunk) { progressListener.snapshotStarted(partition); @@ -442,10 +442,11 @@ public void notifyReplicaSets(ReplicaSetNotifier notifier, OffsetC @Override @SuppressWarnings("unchecked") - public void stopSnapshot(MongoDbPartition partition, OffsetContext offsetContext, Map additionalData, List dataCollectionIds) { + public void stopSnapshot(MongoDbPartition partition, OffsetContext offsetContext, Map additionalData, List dataCollectionPatterns) { + context = (IncrementalSnapshotContext) offsetContext.getIncrementalSnapshotContext(); if (context.snapshotRunning()) { - if (dataCollectionIds == null || dataCollectionIds.isEmpty()) { + if (dataCollectionPatterns == null || dataCollectionPatterns.isEmpty()) { LOGGER.info("Stopping incremental snapshot."); try { // This must be called prior to closeWindow to ensure that the correct state is set @@ -467,9 +468,10 @@ public void stopSnapshot(MongoDbPartition partition, OffsetContext offsetContext } } else { - LOGGER.info("Removing '{}' collections from incremental snapshot", dataCollectionIds); + LOGGER.info("Removing '{}' collections from incremental snapshot", dataCollectionPatterns); final String rsName = replicaSets.all().get(0).replicaSetName(); - dataCollectionIds = dataCollectionIds.stream().map(x -> rsName + "." + x).collect(Collectors.toList()); + final List dataCollectionIds = dataCollectionPatterns.stream().map(x -> rsName + "." + x.toString()).collect(Collectors.toList()); + for (String dataCollectionId : dataCollectionIds) { final CollectionId collectionId = CollectionId.parse(dataCollectionId); if (currentCollection != null && currentCollection.id().equals(collectionId)) { @@ -489,11 +491,10 @@ public void stopSnapshot(MongoDbPartition partition, OffsetContext offsetContext } } - List finalDataCollectionIds = dataCollectionIds; notifyReplicaSets( (incrementalSnapshotContext, replicaSetPartition, replicaSetOffsetContext) -> notificationService.incrementalSnapshotNotificationService() .notifyAborted(incrementalSnapshotContext, replicaSetPartition, replicaSetOffsetContext, - finalDataCollectionIds), + dataCollectionIds), offsetContext); } } diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotContext.java index 0f34ecc72..eadb328e7 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotContext.java @@ -31,6 +31,7 @@ import io.debezium.DebeziumException; import io.debezium.annotation.NotThreadSafe; +import io.debezium.pipeline.signal.actions.snapshotting.AdditionalCondition; import io.debezium.pipeline.source.snapshot.incremental.DataCollection; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext; import io.debezium.relational.Table; @@ -185,7 +186,7 @@ private String dataCollectionsToSnapshotAsString() { LinkedHashMap map = new LinkedHashMap<>(); map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ID, x.getId().toString()); map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION, - x.getAdditionalCondition().orElse(null)); + x.getAdditionalCondition().isEmpty() ? null : x.getAdditionalCondition().orElse(null)); return map; }) .collect(Collectors.toList()); @@ -230,8 +231,8 @@ private void addTablesIdsToSnapshot(List> dataCollectionIds) { } @SuppressWarnings("unchecked") - public List> addDataCollectionNamesToSnapshot(String correlationId, List dataCollectionIds, Optional _additionalCondition, - Optional surrogateKey) { + public List> addDataCollectionNamesToSnapshot(String correlationId, List dataCollectionIds, List additionalCondition, + String surrogateKey) { final List> newDataCollectionIds = dataCollectionIds.stream() .map(x -> new DataCollection((T) CollectionId.parse(x))) .filter(x -> x.getId() != null) // Remove collections with incorrectly formatted name diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java index 3c8ffa01b..642c59e2f 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.java @@ -8,12 +8,16 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Queue; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; import java.util.stream.Collectors; import org.bson.BsonDocument; @@ -40,7 +44,10 @@ import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.EventDispatcher.SnapshotReceiver; import io.debezium.pipeline.notification.NotificationService; +import io.debezium.pipeline.signal.actions.snapshotting.AdditionalCondition; +import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration; import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource; +import io.debezium.pipeline.source.SnapshottingTask; import io.debezium.pipeline.source.spi.SnapshotChangeEventSource; import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.spi.ChangeRecordEmitter; @@ -145,7 +152,7 @@ protected SnapshotResult doExecute(ChangeEventSourceContex executor.submit(() -> { try { taskContext.configureLoggingContext(replicaSet.replicaSetName()); - snapshotReplicaSet(context, mongoDbSnapshotContext, replicaSet); + snapshotReplicaSet(context, mongoDbSnapshotContext, replicaSet, snapshottingTask); } catch (Throwable t) { LOGGER.error("Snapshot for replica set {} failed", replicaSet.replicaSetName(), t); @@ -177,21 +184,28 @@ protected SnapshotResult doExecute(ChangeEventSourceContex } @Override - protected SnapshottingTask getSnapshottingTask(MongoDbPartition partition, MongoDbOffsetContext offsetContext, boolean isBlockingSnapshot) { + public SnapshottingTask getBlockingSnapshottingTask(MongoDbPartition partition, MongoDbOffsetContext previousOffset, SnapshotConfiguration snapshotConfiguration) { - if (isBlockingSnapshot) { - return new MongoDbSnapshottingTask(replicaSets.all()); - } + Map filtersByTable = snapshotConfiguration.getAdditionalConditions().stream() + .collect(Collectors.toMap(k -> k.getDataCollection().toString(), AdditionalCondition::getFilter)); + + return new MongoDbSnapshottingTask(replicaSets.all(), snapshotConfiguration.getDataCollections(), filtersByTable); + } + + @Override + public SnapshottingTask getSnapshottingTask(MongoDbPartition partition, MongoDbOffsetContext offsetContext) { + + List dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted(); // If no snapshot should occur, return task with no replica sets - if (connectorConfig.getSnapshotMode().equals(MongoDbConnectorConfig.SnapshotMode.NEVER)) { + if (this.connectorConfig.getSnapshotMode().equals(MongoDbConnectorConfig.SnapshotMode.NEVER)) { LOGGER.info("According to the connector configuration, no snapshot will occur."); - return new MongoDbSnapshottingTask(Collections.emptyList()); + return new MongoDbSnapshottingTask(Collections.emptyList(), dataCollectionsToBeSnapshotted, Map.of()); } if (offsetContext == null) { LOGGER.info("No previous offset has been found"); - return new MongoDbSnapshottingTask(replicaSets.all()); + return new MongoDbSnapshottingTask(replicaSets.all(), dataCollectionsToBeSnapshotted, connectorConfig.getSnapshotFilterQueryByCollection()); } // Collect which replica-sets require being snapshotted @@ -199,7 +213,7 @@ protected SnapshottingTask getSnapshottingTask(MongoDbPartition partition, Mongo .filter(replicaSet -> isSnapshotExpected(partition, replicaSet, offsetContext)) .collect(Collectors.toList()); - return new MongoDbSnapshottingTask(replicaSetsToSnapshot); + return new MongoDbSnapshottingTask(replicaSetsToSnapshot, dataCollectionsToBeSnapshotted, connectorConfig.getSnapshotFilterQueryByCollection()); } @Override @@ -207,9 +221,11 @@ protected SnapshotContext prepare(MongoD return new MongoDbSnapshotContext(partition); } - private void snapshotReplicaSet(ChangeEventSourceContext sourceCtx, MongoDbSnapshotContext snapshotCtx, ReplicaSet replicaSet) throws InterruptedException { + private void snapshotReplicaSet(ChangeEventSourceContext sourceCtx, MongoDbSnapshotContext snapshotCtx, ReplicaSet replicaSet, + SnapshottingTask snapshottingTask) + throws InterruptedException { try (MongoDbConnection mongo = connections.get(replicaSet, snapshotCtx.partition)) { - createDataEvents(sourceCtx, snapshotCtx, replicaSet, mongo); + createDataEvents(sourceCtx, snapshotCtx, replicaSet, mongo, snapshottingTask); } } @@ -281,13 +297,13 @@ private void initReplicaSetSnapshotStartOffsets(MongoDbSnapshotContext snapshotC } private void createDataEvents(ChangeEventSourceContext sourceCtx, MongoDbSnapshotContext snapshotCtx, ReplicaSet replicaSet, - MongoDbConnection mongo) + MongoDbConnection mongo, SnapshottingTask snapshottingTask) throws InterruptedException { initReplicaSetSnapshotStartOffsets(snapshotCtx, replicaSet, mongo); SnapshotReceiver snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver(); snapshotCtx.offset.preSnapshotStart(); - createDataEventsForReplicaSet(sourceCtx, snapshotCtx, snapshotReceiver, replicaSet, mongo); + createDataEventsForReplicaSet(sourceCtx, snapshotCtx, snapshotReceiver, replicaSet, mongo, snapshottingTask); snapshotCtx.offset.preSnapshotCompletion(); snapshotReceiver.completeSnapshot(); @@ -300,12 +316,12 @@ private void createDataEvents(ChangeEventSourceContext sourceCtx, MongoDbSnapsho private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContext, MongoDbSnapshotContext snapshotContext, SnapshotReceiver snapshotReceiver, - ReplicaSet replicaSet, MongoDbConnection mongo) + ReplicaSet replicaSet, MongoDbConnection mongo, SnapshottingTask snapshottingTask) throws InterruptedException { final String rsName = replicaSet.replicaSetName(); - final MongoDbOffsetContext offsetContext = (MongoDbOffsetContext) snapshotContext.offset; + final MongoDbOffsetContext offsetContext = snapshotContext.offset; final ReplicaSetOffsetContext rsOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet); snapshotContext.lastCollection = false; @@ -313,7 +329,10 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex LOGGER.info("Beginning snapshot of '{}' at {}", rsName, rsOffsetContext.getOffset()); - final List collections = determineDataCollectionsToBeSnapshotted(mongo.collections()).collect(Collectors.toList()); + Set dataCollectionPattern = getDataCollectionPattern(snapshottingTask.getDataCollections()); + + final List collections = determineDataCollectionsToBeSnapshotted(mongo.collections(), dataCollectionPattern) + .collect(Collectors.toList()); snapshotProgressListener.monitoredDataCollectionsDetermined(snapshotContext.partition, collections); if (connectorConfig.getSnapshotMaxThreads() > 1) { // Since multiple snapshot threads are to be used, create a thread pool and initiate the snapshot. @@ -351,7 +370,7 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex snapshotReceiver, replicaSet, id, - mongo); + mongo, snapshottingTask.getFilterQueries()); } } catch (InterruptedException e) { @@ -396,7 +415,7 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex snapshotReceiver, replicaSet, collectionId, - mongo); + mongo, snapshottingTask.getFilterQueries()); } } @@ -406,7 +425,8 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex private void createDataEventsForCollection(ChangeEventSourceContext sourceContext, MongoDbSnapshotContext snapshotContext, SnapshotReceiver snapshotReceiver, - ReplicaSet replicaSet, CollectionId collectionId, MongoDbConnection mongo) + ReplicaSet replicaSet, CollectionId collectionId, MongoDbConnection mongo, + Map snapshotFilterQueryForCollection) throws InterruptedException { long exportStart = clock.currentTimeInMillis(); @@ -419,7 +439,8 @@ private void createDataEventsForCollection(ChangeEventSourceContext sourceContex final int batchSize = taskContext.getConnectorConfig().getSnapshotFetchSize(); long docs = 0; - Bson filterQuery = Document.parse(connectorConfig.getSnapshotFilterQueryForCollection(collectionId).orElseGet(() -> "{}")); + Optional snapshotFilterForCollectionId = Optional.ofNullable(snapshotFilterQueryForCollection.get(collectionId.dbName() + "." + collectionId.name())); + Bson filterQuery = Document.parse(snapshotFilterForCollectionId.orElse("{}")); try (MongoCursor cursor = collection.find(filterQuery).batchSize(batchSize).iterator()) { snapshotContext.lastRecordInCollection = false; @@ -474,14 +495,14 @@ private Clock getClock() { /** * A configuration describing the task to be performed during snapshotting. * - * @see AbstractSnapshotChangeEventSource.SnapshottingTask + * @see SnapshottingTask */ public static class MongoDbSnapshottingTask extends SnapshottingTask { private final List replicaSetsToSnapshot; - public MongoDbSnapshottingTask(List replicaSetsToSnapshot) { - super(false, !replicaSetsToSnapshot.isEmpty()); + public MongoDbSnapshottingTask(List replicaSetsToSnapshot, List dataCollections, Map filterQueries) { + super(false, !replicaSetsToSnapshot.isEmpty(), dataCollections, filterQueries); this.replicaSetsToSnapshot = replicaSetsToSnapshot; } diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/BlockingSnapshotIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/BlockingSnapshotIT.java index 00cf64d6a..b8afb6f88 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/BlockingSnapshotIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/BlockingSnapshotIT.java @@ -50,9 +50,13 @@ public class BlockingSnapshotIT extends AbstractMongoConnectorIT { private static final String DATABASE_NAME = "dbA"; private static final String COLLECTION_NAME = "c1"; + + private static final String COLLECTION2_NAME = "c2"; private static final String SIGNAL_COLLECTION_NAME = DATABASE_NAME + ".signals"; private static final String FULL_COLLECTION_NAME = DATABASE_NAME + "." + COLLECTION_NAME; + private static final String FULL_COLLECTION2_NAME = DATABASE_NAME + "." + COLLECTION2_NAME; + private static final String DOCUMENT_ID = "_id"; @Before @@ -82,7 +86,7 @@ public void executeBlockingSnapshot() throws Exception { assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2); - sendAdHocBlockingSnapshotSignal(fullDataCollectionName()); + sendAdHocBlockingSnapshotSignal("[A-z].*" + fullDataCollectionName()); waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class); @@ -90,9 +94,7 @@ public void executeBlockingSnapshot() throws Exception { insertRecords(ROW_COUNT, (ROW_COUNT * 2)); - int signalingRecords = 1; - - assertStreamingRecordsArePresent(ROW_COUNT + signalingRecords); + assertStreamingRecordsArePresent(ROW_COUNT); } @@ -110,7 +112,7 @@ public void executeBlockingSnapshotWhileStreaming() throws Exception { Thread.sleep(2000); // Let's start stream some insert - sendAdHocBlockingSnapshotSignal(fullDataCollectionName()); + sendAdHocBlockingSnapshotSignal("[A-z].*" + fullDataCollectionName()); waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class); @@ -122,11 +124,32 @@ public void executeBlockingSnapshotWhileStreaming() throws Exception { insertRecords(ROW_COUNT, (ROW_COUNT * 2)); - int signalingRecords = 1 + // from streaming - 1; // from snapshot + int signalingRecords = 1; // from streaming assertRecordsWithValuesPresent((int) ((ROW_COUNT * 3) + totalSnapshotRecords + signalingRecords), - getExpectedValues(totalSnapshotRecords)); + getExpectedValues(totalSnapshotRecords), topicName()); + } + + @Test + public void executeBlockingSnapshotWithAdditionalCondition() throws Exception { + // Testing.Print.enable(); + + populateDataCollection(dataCollectionNames().get(1).toString()); + + startConnector(Function.identity()); + + waitForStreamingRunning("mongodb", "mongo1", getStreamingNamespace(), "0"); + + sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey( + Map.of(fullDataCollectionNames().get(1), "{ aa: { $lt: 500 } }"), + "[A-z].*" + fullDataCollectionNames().get(1)); + + waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class); + + int signalingRecords = 1; // from streaming + + assertRecordsWithValuesPresent(500 + signalingRecords, IntStream.rangeClosed(0, 499).boxed().collect(Collectors.toList()), topicNames().get(1)); + } protected Class connectorClass() { @@ -141,10 +164,11 @@ protected Configuration.Builder config() { return TestHelper.getConfiguration(mongo) .edit() .with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST, DATABASE_NAME) - .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, fullDataCollectionName()) + .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, String.join(",", fullDataCollectionNames())) .with(MongoDbConnectorConfig.SIGNAL_DATA_COLLECTION, SIGNAL_COLLECTION_NAME) .with(MongoDbConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5) .with(MongoDbConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10) + .with(MongoDbConnectorConfig.SNAPSHOT_MODE_TABLES, "[A-z].*dbA.c1") .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.INITIAL); } @@ -152,14 +176,26 @@ protected String dataCollectionName() { return COLLECTION_NAME; } + protected List dataCollectionNames() { + return List.of(COLLECTION_NAME, COLLECTION2_NAME); + } + protected String fullDataCollectionName() { return FULL_COLLECTION_NAME; } + protected List fullDataCollectionNames() { + return List.of(FULL_COLLECTION_NAME, FULL_COLLECTION2_NAME); + } + protected String topicName() { return "mongo1" + "." + fullDataCollectionName(); } + protected List topicNames() { + return fullDataCollectionNames().stream().map(x -> "mongo1." + x).collect(Collectors.toList()); + } + protected void populateDataCollection(String dataCollectionName) { final Document[] documents = new Document[ROW_COUNT]; for (int i = 0; i < ROW_COUNT; i++) { @@ -230,19 +266,19 @@ private Future executeAsync(Runnable operation) { private void assertStreamingRecordsArePresent(int expectedRecords) throws InterruptedException { - assertRecordsWithValuesPresent(expectedRecords, IntStream.range(2000, 2999).boxed().collect(Collectors.toList())); + assertRecordsWithValuesPresent(expectedRecords, IntStream.range(2000, 2999).boxed().collect(Collectors.toList()), topicName()); } private void assertRecordsFromSnapshotAndStreamingArePresent(int expectedRecords) throws InterruptedException { - assertRecordsWithValuesPresent(expectedRecords, IntStream.range(0, expectedRecords - 2).boxed().collect(Collectors.toList())); + assertRecordsWithValuesPresent(expectedRecords, IntStream.range(0, expectedRecords - 2).boxed().collect(Collectors.toList()), topicName()); } - private void assertRecordsWithValuesPresent(int expectedRecords, List expectedValues) throws InterruptedException { + private void assertRecordsWithValuesPresent(int expectedRecords, List expectedValues, String topicName) throws InterruptedException { SourceRecords snapshotAndStreamingRecords = consumeRecordsByTopic(expectedRecords, 10); assertThat(snapshotAndStreamingRecords.allRecordsInOrder().size()).isEqualTo(expectedRecords); - List actual = snapshotAndStreamingRecords.recordsForTopic(topicName()).stream() + List actual = snapshotAndStreamingRecords.recordsForTopic(topicName).stream() .map(record -> extractFieldValue(record, "aa")) .collect(Collectors.toList()); assertThat(actual).containsAll(expectedValues); @@ -301,6 +337,19 @@ protected void sendAdHocBlockingSnapshotSignal(String... dataCollectionIds) { Document.parse("{\"type\": \"execute-snapshot\", \"payload\": {\"type\": \"BLOCKING\",\"data-collections\": [" + dataCollectionIdsList + "]}}")); } + protected void sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map additionalConditions, String... dataCollectionIds) { + + final String conditions = additionalConditions.entrySet().stream() + .map(e -> String.format("{\"data-collection\": \"%s\", \"filter\": \"%s\"}", e.getKey(), e.getValue())).collect( + Collectors.joining(",")); + final String dataCollectionIdsList = Arrays.stream(dataCollectionIds) + .map(x -> "\"" + x + "\"") + .collect(Collectors.joining(", ")); + insertDocuments("dbA", "signals", + Document.parse("{\"type\": \"execute-snapshot\", \"payload\": {\"type\": \"BLOCKING\",\"data-collections\": [" + dataCollectionIdsList + + "], \"additional-conditions\": [" + conditions + "]}}")); + } + protected void startConnector(Function custConfig) { startConnector(custConfig, loggingCompletion()); } diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/NotificationsIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/NotificationsIT.java index 710b64ce4..46aeed396 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/NotificationsIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/NotificationsIT.java @@ -58,7 +58,6 @@ public class NotificationsIT extends AbstractMongoConnectorIT { private static final String DATABASE_NAME = "dbA"; private static final String COLLECTION_NAME = "c1"; - private static final String SIGNAL_COLLECTION_NAME = DATABASE_NAME + ".signals"; private static final String FULL_COLLECTION_NAME = DATABASE_NAME + "." + COLLECTION_NAME; @Before diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java index 77c6cea50..30c69c50a 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java @@ -8,7 +8,6 @@ import java.sql.SQLException; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.function.Consumer; import org.slf4j.Logger; @@ -19,6 +18,7 @@ import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.notification.NotificationService; import io.debezium.pipeline.signal.SignalPayload; +import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration; import io.debezium.pipeline.signal.channels.KafkaSignalChannel; import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource; import io.debezium.pipeline.source.spi.DataChangeEventListener; @@ -221,13 +221,12 @@ protected void sendEvent(MySqlPartition partition, EventDispatcher signalPayload, - List dataCollectionIds, - Optional additionalCondition, Optional surrogateKey) + SnapshotConfiguration snapshotConfiguration) throws InterruptedException { final Map additionalData = signalPayload.additionalData; - super.addDataCollectionNamesToSnapshot(signalPayload, dataCollectionIds, additionalCondition, surrogateKey); + super.addDataCollectionNamesToSnapshot(signalPayload, snapshotConfiguration); getContext().setSignalOffset((Long) additionalData.get(KafkaSignalChannel.CHANNEL_OFFSET)); } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java index e3d7232fc..6be617b88 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java @@ -44,6 +44,7 @@ import io.debezium.jdbc.MainConnectionProvidingConnectionFactory; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.notification.NotificationService; +import io.debezium.pipeline.source.SnapshottingTask; import io.debezium.relational.RelationalSnapshotChangeEventSource; import io.debezium.relational.RelationalTableFilters; import io.debezium.relational.Table; @@ -86,28 +87,30 @@ public MySqlSnapshotChangeEventSource(MySqlConnectorConfig connectorConfig, Main } @Override - protected SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffsetContext previousOffset, boolean isBlockingSnapshot) { + public SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffsetContext previousOffset) { - if (isBlockingSnapshot) { - return new SnapshottingTask(true, true); - } + List dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted(); + Map snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue)); // found a previous offset and the earlier snapshot has completed if (previousOffset != null && !previousOffset.isSnapshotRunning()) { LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted."); - return new SnapshottingTask(databaseSchema.isStorageInitializationExecuted(), false); + return new SnapshottingTask(databaseSchema.isStorageInitializationExecuted(), false, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable); } LOGGER.info("No previous offset has been found"); - if (connectorConfig.getSnapshotMode().includeData()) { + if (this.connectorConfig.getSnapshotMode().includeData()) { LOGGER.info("According to the connector configuration both schema and data will be snapshotted"); } else { LOGGER.info("According to the connector configuration only schema will be snapshotted"); } - return new SnapshottingTask(connectorConfig.getSnapshotMode().includeSchema(), connectorConfig.getSnapshotMode().includeData()); + return new SnapshottingTask(this.connectorConfig.getSnapshotMode().includeSchema(), this.connectorConfig.getSnapshotMode().includeData(), + dataCollectionsToBeSnapshotted, + snapshotSelectOverridesByTable); } @Override @@ -560,7 +563,7 @@ private String quote(TableId id) { @Override protected OptionalLong rowCountForTable(TableId tableId) { - if (getSnapshotSelectOverridesByTable(tableId) != null) { + if (getSnapshotSelectOverridesByTable(tableId, connectorConfig.getSnapshotSelectOverridesByTable()) != null) { return super.rowCountForTable(tableId); } OptionalLong rowCount = connection.getEstimatedTableSize(tableId); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BlockingSnapshotIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BlockingSnapshotIT.java index f1d8b658d..f8d26c5c4 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BlockingSnapshotIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BlockingSnapshotIT.java @@ -71,8 +71,8 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) .with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, DATABASE.qualifiedTableName("debezium_signal")) .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5) - .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList) .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl) + .with(MySqlConnectorConfig.SNAPSHOT_MODE_TABLES, DATABASE.qualifiedTableName("a")) .with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO); } @@ -103,7 +103,7 @@ protected String topicName() { @Override protected List topicNames() { - return List.of(DATABASE.topicForTable("a"), DATABASE.topicForTable("c")); + return List.of(DATABASE.topicForTable("a"), DATABASE.topicForTable("b")); } @Override @@ -114,7 +114,7 @@ protected String tableName() { @Override protected List tableNames() { final String tableA = TableId.parse(DATABASE.qualifiedTableName("a")).toQuotedString('`'); - final String tableB = TableId.parse(DATABASE.qualifiedTableName("c")).toQuotedString('`'); + final String tableB = TableId.parse(DATABASE.qualifiedTableName("b")).toQuotedString('`'); return List.of(tableA, tableB); } @@ -135,7 +135,7 @@ protected String tableDataCollectionId() { @Override protected List tableDataCollectionIds() { - return List.of(tableNameId().toString(), tableNameId("c").toString()); + return List.of(tableNameId().toString(), tableNameId("b").toString()); } private TableId tableNameId() { diff --git a/debezium-connector-mysql/src/test/resources/ddl/blocking_snapshot-test.sql b/debezium-connector-mysql/src/test/resources/ddl/blocking_snapshot-test.sql index fcb9906ea..49306bcda 100644 --- a/debezium-connector-mysql/src/test/resources/ddl/blocking_snapshot-test.sql +++ b/debezium-connector-mysql/src/test/resources/ddl/blocking_snapshot-test.sql @@ -7,6 +7,11 @@ CREATE TABLE a ( aa INTEGER ) AUTO_INCREMENT = 1; +CREATE TABLE b ( + pk INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, + aa INTEGER +) AUTO_INCREMENT = 1; + CREATE TABLE debezium_signal ( id varchar(64), type varchar(32), diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java index 14b5a1eb2..e12cc84e9 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/OracleSnapshotChangeEventSource.java @@ -11,6 +11,7 @@ import java.time.Instant; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.regex.Pattern; @@ -25,6 +26,7 @@ import io.debezium.jdbc.MainConnectionProvidingConnectionFactory; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.notification.NotificationService; +import io.debezium.pipeline.source.SnapshottingTask; import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.relational.RelationalSnapshotChangeEventSource; @@ -59,18 +61,18 @@ public OracleSnapshotChangeEventSource(OracleConnectorConfig connectorConfig, Ma } @Override - protected SnapshottingTask getSnapshottingTask(OraclePartition partition, OracleOffsetContext previousOffset, boolean isBlockingSnapshot) { + public SnapshottingTask getSnapshottingTask(OraclePartition partition, OracleOffsetContext previousOffset) { boolean snapshotSchema = true; boolean snapshotData; - if (isBlockingSnapshot) { - return new SnapshottingTask(true, true); - } + List dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted(); + Map snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue)); // for ALWAYS snapshot mode don't use exiting offset to have up-to-date SCN - if (OracleConnectorConfig.SnapshotMode.ALWAYS == connectorConfig.getSnapshotMode()) { + if (OracleConnectorConfig.SnapshotMode.ALWAYS == this.connectorConfig.getSnapshotMode()) { LOGGER.info("Snapshot mode is set to ALWAYS, not checking exiting offset."); - snapshotData = connectorConfig.getSnapshotMode().includeData(); + snapshotData = this.connectorConfig.getSnapshotMode().includeData(); } // found a previous offset and the earlier snapshot has completed else if (previousOffset != null && !previousOffset.isSnapshotRunning()) { @@ -80,7 +82,7 @@ else if (previousOffset != null && !previousOffset.isSnapshotRunning()) { } else { LOGGER.info("No previous offset has been found."); - snapshotData = connectorConfig.getSnapshotMode().includeData(); + snapshotData = this.connectorConfig.getSnapshotMode().includeData(); } if (snapshotData && snapshotSchema) { @@ -90,7 +92,7 @@ else if (snapshotSchema) { LOGGER.info("According to the connector configuration only schema will be snapshot."); } - return new SnapshottingTask(snapshotSchema, snapshotData); + return new SnapshottingTask(snapshotSchema, snapshotData, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable); } @Override diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/BlockingSnapshotIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/BlockingSnapshotIT.java index c7eb3d878..468310e4c 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/BlockingSnapshotIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/BlockingSnapshotIT.java @@ -33,9 +33,13 @@ public void before() throws Exception { connection = TestHelper.testConnection(); TestHelper.dropTable(connection, "a"); + TestHelper.dropTable(connection, "b"); connection.execute("CREATE TABLE a (pk numeric(9,0) primary key, aa numeric(9,0))"); + connection.execute("CREATE TABLE b (pk numeric(9,0) primary key, aa numeric(9,0))"); connection.execute("GRANT INSERT on a to " + TestHelper.getConnectorUserName()); + connection.execute("GRANT INSERT on b to " + TestHelper.getConnectorUserName()); TestHelper.streamTable(connection, "a"); + TestHelper.streamTable(connection, "b"); TestHelper.dropTable(connection, "debezium_signal"); connection.execute("CREATE TABLE debezium_signal (id varchar2(64), type varchar2(32), data varchar2(2048))"); @@ -52,6 +56,7 @@ public void after() throws Exception { stopConnector(); if (connection != null) { TestHelper.dropTable(connection, "a"); + TestHelper.dropTable(connection, "b"); TestHelper.dropTable(connection, "debezium_signal"); connection.close(); } @@ -85,7 +90,7 @@ protected String topicName() { @Override protected List topicNames() { - return List.of("server1.DEBEZIUM.A"); + return List.of("server1.DEBEZIUM.A", "server1.DEBEZIUM.B"); } @Override @@ -95,7 +100,7 @@ protected String tableName() { @Override protected List tableNames() { - return List.of("DEBEZIUM.A"); + return List.of("DEBEZIUM.A", "DEBEZIUM.B"); } @Override @@ -105,7 +110,7 @@ protected String tableDataCollectionId() { @Override protected List tableDataCollectionIds() { - return List.of(TestHelper.getDatabaseName() + ".DEBEZIUM.A"); + return List.of(TestHelper.getDatabaseName() + ".DEBEZIUM.A", TestHelper.getDatabaseName() + ".DEBEZIUM.B"); } @Override @@ -118,7 +123,8 @@ protected Configuration.Builder config() { return TestHelper.defaultConfig() .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL) .with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".DEBEZIUM.DEBEZIUM_SIGNAL") - .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.A") + .with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM") + .with(OracleConnectorConfig.SNAPSHOT_MODE_TABLES, TestHelper.getDatabaseName() + ".DEBEZIUM.A") .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true); } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java index 38e34e3e3..8323bfa7a 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java @@ -8,6 +8,7 @@ import java.sql.SQLException; import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -24,6 +25,7 @@ import io.debezium.jdbc.MainConnectionProvidingConnectionFactory; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.notification.NotificationService; +import io.debezium.pipeline.source.SnapshottingTask; import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.relational.RelationalSnapshotChangeEventSource; import io.debezium.relational.Table; @@ -57,13 +59,13 @@ public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig } @Override - protected SnapshottingTask getSnapshottingTask(PostgresPartition partition, PostgresOffsetContext previousOffset, boolean isBlockingSnapshot) { + public SnapshottingTask getSnapshottingTask(PostgresPartition partition, PostgresOffsetContext previousOffset) { boolean snapshotSchema = true; boolean snapshotData = true; - if (isBlockingSnapshot) { - return new SnapshottingTask(true, true); - } + List dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted(); + Map snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue)); snapshotData = snapshotter.shouldSnapshot(); if (snapshotData) { @@ -74,7 +76,7 @@ protected SnapshottingTask getSnapshottingTask(PostgresPartition partition, Post snapshotSchema = false; } - return new SnapshottingTask(snapshotSchema, snapshotData); + return new SnapshottingTask(snapshotSchema, snapshotData, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable); } @Override @@ -107,7 +109,7 @@ protected Set getAllTableIds(RelationalSnapshotContext snapshotContext) - throws SQLException, InterruptedException { + throws SQLException { final Duration lockTimeout = connectorConfig.snapshotLockTimeout(); final Optional lockStatement = snapshotter.snapshotTableLockingStatement(lockTimeout, snapshotContext.capturedTables); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/BlockingSnapshotIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/BlockingSnapshotIT.java index b77fc9984..b1f496db3 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/BlockingSnapshotIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/BlockingSnapshotIT.java @@ -24,7 +24,9 @@ public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest { private static final String TOPIC_NAME = "test_server.s1.a"; private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;" + - "CREATE SCHEMA s1;CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" + + "CREATE SCHEMA s1;" + + "CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" + + "CREATE TABLE s1.b (pk SERIAL, aa integer, PRIMARY KEY(pk));" + "CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048))"; @Before @@ -49,7 +51,7 @@ public void after() { protected Configuration.Builder config() { return TestHelper.defaultConfig() - .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal") .with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10) @@ -69,7 +71,7 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5) .with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10) .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1") - .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a"); + .with(PostgresConnectorConfig.SNAPSHOT_MODE_TABLES, "s1.a"); } @Override diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java index 80f02e1ca..367cea2e9 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnectorConfig.java @@ -33,6 +33,7 @@ import io.debezium.relational.TableId; import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.history.HistoryRecordComparator; +import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Strings; /** @@ -488,7 +489,8 @@ public String getConnectorName() { } @Override - public Map getSnapshotSelectOverridesByTable() { + public Map getSnapshotSelectOverridesByTable() { + List tableValues = getConfig().getTrimmedStrings(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, ","); if (tableValues == null) { @@ -498,9 +500,18 @@ public Map getSnapshotSelectOverridesByTable() { Map snapshotSelectOverridesByTable = new HashMap<>(); for (String table : tableValues) { + + String statementOverride = getConfig().getString(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table); + if (statementOverride == null) { + LOGGER.warn("Detected snapshot.select.statement.overrides for {} but no statement property {} defined", + SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table, table); + continue; + } + snapshotSelectOverridesByTable.put( TableId.parse(table, new SqlServerTableIdPredicates()), getConfig().getString(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table)); + } return Collections.unmodifiableMap(snapshotSelectOverridesByTable); diff --git a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java index ef1b86370..270778abd 100644 --- a/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java +++ b/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerSnapshotChangeEventSource.java @@ -25,6 +25,7 @@ import io.debezium.jdbc.MainConnectionProvidingConnectionFactory; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.notification.NotificationService; +import io.debezium.pipeline.source.SnapshottingTask; import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.relational.Column; import io.debezium.relational.RelationalSnapshotChangeEventSource; @@ -58,13 +59,13 @@ public SqlServerSnapshotChangeEventSource(SqlServerConnectorConfig connectorConf } @Override - protected SnapshottingTask getSnapshottingTask(SqlServerPartition partition, SqlServerOffsetContext previousOffset, boolean isBlockingSnapshot) { + public SnapshottingTask getSnapshottingTask(SqlServerPartition partition, SqlServerOffsetContext previousOffset) { boolean snapshotSchema = true; boolean snapshotData = true; - if (isBlockingSnapshot) { - return new SnapshottingTask(true, true); - } + List dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted(); + Map snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue)); // found a previous offset and the earlier snapshot has completed if (previousOffset != null && !previousOffset.isSnapshotRunning()) { @@ -74,16 +75,16 @@ protected SnapshottingTask getSnapshottingTask(SqlServerPartition partition, Sql } else { LOGGER.info("No previous offset has been found"); - if (connectorConfig.getSnapshotMode().includeData()) { + if (this.connectorConfig.getSnapshotMode().includeData()) { LOGGER.info("According to the connector configuration both schema and data will be snapshotted"); } else { LOGGER.info("According to the connector configuration only schema will be snapshotted"); } - snapshotData = connectorConfig.getSnapshotMode().includeData(); + snapshotData = this.connectorConfig.getSnapshotMode().includeData(); } - return new SnapshottingTask(snapshotSchema, snapshotData); + return new SnapshottingTask(snapshotSchema, snapshotData, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable); } @Override diff --git a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/BlockingSnapshotIT.java b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/BlockingSnapshotIT.java index 2a439e557..3f92fe5ed 100644 --- a/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/BlockingSnapshotIT.java +++ b/debezium-connector-sqlserver/src/test/java/io/debezium/connector/sqlserver/BlockingSnapshotIT.java @@ -31,6 +31,7 @@ public void before() throws SQLException { connection = TestHelper.testConnection(); connection.execute( "CREATE TABLE a (pk int primary key, aa int)", + "CREATE TABLE b (pk int primary key, aa int)", "CREATE TABLE debezium_signal (id varchar(64), type varchar(32), data varchar(2048))"); TestHelper.enableTableCdc(connection, "debezium_signal"); TestHelper.adjustCdcPollingInterval(connection, POLLING_INTERVAL); @@ -69,7 +70,7 @@ protected String topicName() { @Override protected List topicNames() { - return List.of("server1.testDB1.dbo.a"); + return List.of("server1.testDB1.dbo.a", "server1.testDB1.dbo.b"); } @Override @@ -79,7 +80,7 @@ protected String tableName() { @Override protected List tableNames() { - return List.of("testDB1.dbo.a"); + return List.of("testDB1.dbo.a", "testDB1.dbo.b"); } @Override @@ -106,7 +107,7 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s return TestHelper.defaultConfig() .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL) .with(SqlServerConnectorConfig.SIGNAL_DATA_COLLECTION, "testDB1.dbo.debezium_signal") - .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList) + .with(SqlServerConnectorConfig.SNAPSHOT_MODE_TABLES, tableName()) .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl); } diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index 60253f853..82fe957fa 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -17,7 +17,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Properties; -import java.util.Set; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -998,10 +997,10 @@ public EnumSet getSkippedOperations() { return skippedOperations; } - public Set getDataCollectionsToBeSnapshotted() { + public List getDataCollectionsToBeSnapshotted() { return Optional.ofNullable(config.getString(SNAPSHOT_MODE_TABLES)) - .map(tables -> Strings.setOfRegex(tables, Pattern.CASE_INSENSITIVE)) - .orElseGet(Collections::emptySet); + .map(expr -> Arrays.asList(Strings.RegExSplitter.split(expr))) + .orElseGet(Collections::emptyList); } public Map getCustomMetricTags() { diff --git a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java index 12a801a7e..70841bdb4 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/ChangeEventSourceCoordinator.java @@ -34,6 +34,8 @@ import io.debezium.pipeline.notification.NotificationService; import io.debezium.pipeline.signal.SignalProcessor; import io.debezium.pipeline.signal.actions.SignalActionProvider; +import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration; +import io.debezium.pipeline.source.SnapshottingTask; import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource; import io.debezium.pipeline.source.spi.ChangeEventSource; import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext; @@ -196,7 +198,7 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps } } - public void doBlockingSnapshot(P partition, OffsetContext offsetContext) { + public void doBlockingSnapshot(P partition, OffsetContext offsetContext, SnapshotConfiguration snapshotConfiguration) { blockingSnapshotExecutor.submit(() -> { @@ -211,7 +213,9 @@ public void doBlockingSnapshot(P partition, OffsetContext offsetContext) { previousLogContext.set(taskContext.configureLoggingContext("snapshot")); LOGGER.info("Starting snapshot"); - SnapshotResult snapshotResult = doSnapshot(snapshotSource, context, partition, (O) offsetContext); + + SnapshottingTask snapshottingTask = snapshotSource.getBlockingSnapshottingTask(partition, (O) offsetContext, snapshotConfiguration); + SnapshotResult snapshotResult = doSnapshot(snapshotSource, context, partition, (O) offsetContext, snapshottingTask); if (running && snapshotResult.isCompletedOrSkipped()) { previousLogContext.set(taskContext.configureLoggingContext("streaming", partition)); @@ -227,6 +231,16 @@ public void doBlockingSnapshot(P partition, OffsetContext offsetContext) { protected SnapshotResult doSnapshot(SnapshotChangeEventSource snapshotSource, ChangeEventSourceContext context, P partition, O previousOffset) throws InterruptedException { + + SnapshottingTask snapshottingTask = snapshotSource.getSnapshottingTask(partition, previousOffset); + + return doSnapshot(snapshotSource, context, partition, previousOffset, snapshottingTask); + } + + protected SnapshotResult doSnapshot(SnapshotChangeEventSource snapshotSource, ChangeEventSourceContext context, P partition, O previousOffset, + SnapshottingTask snapshottingTask) + throws InterruptedException { + CatchUpStreamingResult catchUpStreamingResult = executeCatchUpStreaming(context, snapshotSource, partition, previousOffset); if (catchUpStreamingResult.performedCatchUpStreaming) { streamingConnected(false); @@ -235,7 +249,8 @@ protected SnapshotResult doSnapshot(SnapshotChangeEventSource snapshotS commitOffsetLock.unlock(); } eventDispatcher.setEventListener(snapshotMetrics); - SnapshotResult snapshotResult = snapshotSource.execute(context, partition, previousOffset); + + SnapshotResult snapshotResult = snapshotSource.execute(context, partition, previousOffset, snapshottingTask); LOGGER.info("Snapshot ended with {}", snapshotResult); if (snapshotResult.getStatus() == SnapshotResultStatus.COMPLETED || schema.tableInformationComplete()) { diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/actions/AbstractSnapshotSignal.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/actions/AbstractSnapshotSignal.java index 9138a9876..b45388931 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/signal/actions/AbstractSnapshotSignal.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/actions/AbstractSnapshotSignal.java @@ -20,9 +20,13 @@ public abstract class AbstractSnapshotSignal

implements Sig private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSnapshotSignal.class); protected static final String FIELD_DATA_COLLECTIONS = "data-collections"; + protected static final String FIELD_DATA_COLLECTION = "data-collection"; protected static final String FIELD_TYPE = "type"; + @Deprecated protected static final String FIELD_ADDITIONAL_CONDITION = "additional-condition"; + protected static final String FIELD_ADDITIONAL_CONDITIONS = "additional-conditions"; protected static final String FIELD_SURROGATE_KEY = "surrogate-key"; + protected static final String FIELD_FILTER = "filter"; public enum SnapshotType { INCREMENTAL, diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/actions/snapshotting/AdditionalCondition.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/actions/snapshotting/AdditionalCondition.java new file mode 100644 index 000000000..e99def868 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/actions/snapshotting/AdditionalCondition.java @@ -0,0 +1,70 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline.signal.actions.snapshotting; + +import java.util.regex.Pattern; + +/** + * Contains filtering information for snapshot + */ +public class AdditionalCondition { + + /** + * Tha data collection to which the filter applies + */ + private Pattern dataCollection; + + /** + * In case of an incremental snapshot specifies a condition based on the field(s) of the data collection(s). + * For the blocking snapshot specifies the query statement for the connector to run on the data collection when it takes a snapshot + */ + private String filter; + + public Pattern getDataCollection() { + return dataCollection; + } + + public String getFilter() { + return filter; + } + + @Override + public String toString() { + return "AdditionalCondition{" + + "dataCollection=" + dataCollection + + ", filter='" + filter + '\'' + + '}'; + } + + public static final class AdditionalConditionBuilder { + private Pattern dataCollection; + private String filter; + + private AdditionalConditionBuilder() { + } + + public static AdditionalConditionBuilder builder() { + return new AdditionalConditionBuilder(); + } + + public AdditionalConditionBuilder dataCollection(Pattern dataCollection) { + this.dataCollection = dataCollection; + return this; + } + + public AdditionalConditionBuilder filter(String filter) { + this.filter = filter; + return this; + } + + public AdditionalCondition build() { + AdditionalCondition additionalCondition = new AdditionalCondition(); + additionalCondition.filter = this.filter; + additionalCondition.dataCollection = this.dataCollection; + return additionalCondition; + } + } +} diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/actions/snapshotting/ExecuteSnapshot.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/actions/snapshotting/ExecuteSnapshot.java index fe8097f11..e30cfcf62 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/signal/actions/snapshotting/ExecuteSnapshot.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/actions/snapshotting/ExecuteSnapshot.java @@ -7,6 +7,7 @@ import java.util.List; import java.util.Optional; +import java.util.regex.Pattern; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -14,6 +15,7 @@ import io.debezium.document.Array; import io.debezium.document.Document; +import io.debezium.document.Value; import io.debezium.pipeline.ChangeEventSourceCoordinator; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.signal.SignalPayload; @@ -36,6 +38,7 @@ public class ExecuteSnapshot

extends AbstractSnapshotSignal private static final Logger LOGGER = LoggerFactory.getLogger(ExecuteSnapshot.class); public static final String NAME = "execute-snapshot"; + private static final String MATCH_ALL_PATTERN = ".*"; private final EventDispatcher dispatcher; private final ChangeEventSourceCoordinator changeEventSourceCoordinator; @@ -47,29 +50,62 @@ public ExecuteSnapshot(EventDispatcher dispatcher @Override public boolean arrived(SignalPayload

signalPayload) throws InterruptedException { + final List dataCollections = getDataCollections(signalPayload.data); if (dataCollections == null) { return false; } SnapshotType type = getSnapshotType(signalPayload.data); - Optional additionalCondition = getAdditionalCondition(signalPayload.data); + + List additionalConditions = getAdditionalConditions(signalPayload.data, type); Optional surrogateKey = getSurrogateKey(signalPayload.data); - LOGGER.info("Requested '{}' snapshot of data collections '{}' with additional condition '{}' and surrogate key '{}'", type, dataCollections, - additionalCondition.orElse("No condition passed"), surrogateKey.orElse("PK of table will be used")); + + LOGGER.info("Requested '{}' snapshot of data collections '{}' with additional conditions '{}' and surrogate key '{}'", type, dataCollections, + additionalConditions, surrogateKey.orElse("PK of table will be used")); + + SnapshotConfiguration.Builder snapsthoConfigurationBuilder = SnapshotConfiguration.Builder.builder(); + snapsthoConfigurationBuilder.dataCollections(dataCollections); + snapsthoConfigurationBuilder.surrogateKey(surrogateKey.orElse("")); + additionalConditions.forEach(snapsthoConfigurationBuilder::addCondition); switch (type) { case INCREMENTAL: dispatcher.getIncrementalSnapshotChangeEventSource().addDataCollectionNamesToSnapshot( - signalPayload, dataCollections, additionalCondition, surrogateKey); + signalPayload, snapsthoConfigurationBuilder.build()); break; case BLOCKING: - changeEventSourceCoordinator.doBlockingSnapshot(signalPayload.partition, signalPayload.offsetContext); + changeEventSourceCoordinator.doBlockingSnapshot(signalPayload.partition, signalPayload.offsetContext, snapsthoConfigurationBuilder.build()); break; } return true; } + private List getAdditionalConditions(Document data, SnapshotType type) { + + // TODO remove in 2.5 release + Optional oldAdditionalConditionField = getAdditionalCondition(data); + if (oldAdditionalConditionField.isPresent() && type.equals(SnapshotType.INCREMENTAL)) { + return List.of(AdditionalCondition.AdditionalConditionBuilder.builder() + .dataCollection(Pattern.compile(MATCH_ALL_PATTERN, Pattern.CASE_INSENSITIVE)) + .filter(oldAdditionalConditionField.orElse("")) + .build()); + } + + return Optional.ofNullable(data.getArray(FIELD_ADDITIONAL_CONDITIONS)).orElse(Array.create()).streamValues() + .map(this::buildAdditionalCondition) + .collect(Collectors.toList()); + } + + private AdditionalCondition buildAdditionalCondition(Value value) { + + return AdditionalCondition.AdditionalConditionBuilder.builder() + .dataCollection(Pattern.compile(value.asDocument().getString(FIELD_DATA_COLLECTION), Pattern.CASE_INSENSITIVE)) + .filter(value.asDocument().getString(FIELD_FILTER)) + .build(); + } + public static List getDataCollections(Document data) { + final Array dataCollectionsArray = data.getArray(FIELD_DATA_COLLECTIONS); if (dataCollectionsArray == null || dataCollectionsArray.isEmpty()) { LOGGER.warn( @@ -82,6 +118,11 @@ public static List getDataCollections(Document data) { .collect(Collectors.toList()); } + /** + * TODO remove in 2.5 release + * @deprecated Use {getAdditionalConditions} instead. + */ + @Deprecated public static Optional getAdditionalCondition(Document data) { String additionalCondition = data.getString(FIELD_ADDITIONAL_CONDITION); return Strings.isNullOrBlank(additionalCondition) ? Optional.empty() : Optional.of(additionalCondition); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/actions/snapshotting/SnapshotConfiguration.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/actions/snapshotting/SnapshotConfiguration.java new file mode 100644 index 000000000..feba107a0 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/actions/snapshotting/SnapshotConfiguration.java @@ -0,0 +1,71 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline.signal.actions.snapshotting; + +import java.util.ArrayList; +import java.util.List; + +/** + * Contains information required for the snapshot + */ +public class SnapshotConfiguration { + + /** + * this is a list of regular expressions + */ + private List dataCollections; + + private List additionalConditions; + private String surrogateKey; + + public List getDataCollections() { + return dataCollections; + } + + public List getAdditionalConditions() { + return additionalConditions; + } + + public String getSurrogateKey() { + return surrogateKey; + } + + public static final class Builder { + private List dataCollections; + private final List additionalConditions = new ArrayList<>(); + private String surrogateKey; + + private Builder() { + } + + public static Builder builder() { + return new Builder(); + } + + public Builder dataCollections(List dataCollections) { + this.dataCollections = dataCollections; + return this; + } + + public Builder addCondition(AdditionalCondition additionalCondition) { + this.additionalConditions.add(additionalCondition); + return this; + } + + public Builder surrogateKey(String surrogateKey) { + this.surrogateKey = surrogateKey; + return this; + } + + public SnapshotConfiguration build() { + SnapshotConfiguration snapshotConfiguration = new SnapshotConfiguration(); + snapshotConfiguration.surrogateKey = this.surrogateKey; + snapshotConfiguration.dataCollections = this.dataCollections; + snapshotConfiguration.additionalConditions = this.additionalConditions; + return snapshotConfiguration; + } + } +} diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/AbstractSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/AbstractSnapshotChangeEventSource.java index ce4a7de43..ab4b26b5d 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/AbstractSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/AbstractSnapshotChangeEventSource.java @@ -7,8 +7,10 @@ import java.time.Duration; import java.util.Collection; +import java.util.List; import java.util.Set; import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.slf4j.Logger; @@ -27,6 +29,7 @@ import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Clock; import io.debezium.util.Metronome; +import io.debezium.util.Strings; import io.debezium.util.Threads; /** @@ -61,9 +64,7 @@ protected Offsets getOffsets(SnapshotContext ctx, O prev } @Override - public SnapshotResult execute(ChangeEventSourceContext context, P partition, O previousOffset) throws InterruptedException { - - SnapshottingTask snapshottingTask = getSnapshottingTask(partition, previousOffset, context.isPaused()); + public SnapshotResult execute(ChangeEventSourceContext context, P partition, O previousOffset, SnapshottingTask snapshottingTask) throws InterruptedException { final SnapshotContext ctx; try { @@ -118,9 +119,9 @@ public SnapshotResult execute(ChangeEventSourceContext context, P partition, } } - protected Stream determineDataCollectionsToBeSnapshotted(final Collection allDataCollections) { - final Set snapshotAllowedDataCollections = connectorConfig.getDataCollectionsToBeSnapshotted(); - if (snapshotAllowedDataCollections.size() == 0) { + protected Stream determineDataCollectionsToBeSnapshotted(final Collection allDataCollections, + Set snapshotAllowedDataCollections) { + if (snapshotAllowedDataCollections.isEmpty()) { return allDataCollections.stream(); } else { @@ -172,7 +173,7 @@ protected abstract SnapshotResult doExecute(ChangeEventSourceContext context, /** * Returns the snapshotting task based on the previous offset (if available) and the connector's snapshotting mode. */ - protected abstract SnapshottingTask getSnapshottingTask(P partition, O previousOffset, boolean isBlockingSnapshot); + public abstract SnapshottingTask getSnapshottingTask(P partition, O previousOffset); /** * Prepares the taking of a snapshot and returns an initial {@link SnapshotContext}. @@ -206,6 +207,13 @@ protected void completed(SnapshotContext snapshotContext) { protected void aborted(SnapshotContext snapshotContext) { } + protected Set getDataCollectionPattern(List dataCollections) { + return dataCollections.stream() + .map(tables -> Strings.setOfRegex(tables, Pattern.CASE_INSENSITIVE)) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + } + /** * Mutable context which is populated in the course of snapshotting */ @@ -222,46 +230,4 @@ public void close() throws Exception { } } - /** - * A configuration describing the task to be performed during snapshotting. - */ - public static class SnapshottingTask { - - private final boolean snapshotSchema; - private final boolean snapshotData; - - public SnapshottingTask(boolean snapshotSchema, boolean snapshotData) { - this.snapshotSchema = snapshotSchema; - this.snapshotData = snapshotData; - } - - /** - * Whether data (rows in captured tables) should be snapshotted. - */ - public boolean snapshotData() { - return snapshotData; - } - - /** - * Whether the schema of captured tables should be snapshotted. - */ - public boolean snapshotSchema() { - return snapshotSchema; - } - - /** - * Whether to skip the snapshot phase. - * - * By default this method will skip performing a snapshot if both {@link #snapshotSchema()} and - * {@link #snapshotData()} return {@code false}. - */ - public boolean shouldSkipSnapshot() { - return !snapshotSchema() && !snapshotData(); - } - - @Override - public String toString() { - return "SnapshottingTask [snapshotSchema=" + snapshotSchema + ", snapshotData=" + snapshotData + "]"; - } - } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/SnapshottingTask.java b/debezium-core/src/main/java/io/debezium/pipeline/source/SnapshottingTask.java new file mode 100644 index 000000000..50d47dca9 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/SnapshottingTask.java @@ -0,0 +1,71 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.pipeline.source; + +import java.util.List; +import java.util.Map; + +/** + * A configuration describing the task to be performed during snapshotting. + */ +public class SnapshottingTask { + + private final boolean snapshotSchema; + private final boolean snapshotData; + private final List dataCollections; + private final Map filterQueries; + + public SnapshottingTask(boolean snapshotSchema, boolean snapshotData, List dataCollections, Map filterQueries) { + this.snapshotSchema = snapshotSchema; + this.snapshotData = snapshotData; + this.dataCollections = dataCollections; + this.filterQueries = filterQueries; + } + + /** + * Whether data (rows in captured tables) should be snapshotted. + */ + public boolean snapshotData() { + return snapshotData; + } + + /** + * Whether the schema of captured tables should be snapshotted. + */ + public boolean snapshotSchema() { + return snapshotSchema; + } + + /** + * List of regular expression defining the data collection to snapshot + * + */ + public List getDataCollections() { + return dataCollections; + } + + /** + * Map of query statement overrides by data collection + */ + public Map getFilterQueries() { + return filterQueries; + } + + /** + * Whether to skip the snapshot phase. + * + * By default, this method will skip performing a snapshot if both {@link #snapshotSchema()} and + * {@link #snapshotData()} return {@code false}. + */ + public boolean shouldSkipSnapshot() { + return !snapshotSchema() && !snapshotData(); + } + + @Override + public String toString() { + return "SnapshottingTask [snapshotSchema=" + snapshotSchema + ", snapshotData=" + snapshotData + "]"; + } +} diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java index c797d8e40..8a2d628b6 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java @@ -38,6 +38,7 @@ import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.notification.NotificationService; import io.debezium.pipeline.signal.SignalPayload; +import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration; import io.debezium.pipeline.source.spi.DataChangeEventListener; import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.spi.ChangeRecordEmitter; @@ -489,8 +490,7 @@ private void nextDataCollection(P partition, OffsetContext offsetContext) { @Override @SuppressWarnings("unchecked") - public void addDataCollectionNamesToSnapshot(SignalPayload

signalPayload, List dataCollectionIds, - Optional additionalCondition, Optional surrogateKey) + public void addDataCollectionNamesToSnapshot(SignalPayload

signalPayload, SnapshotConfiguration snapshotConfiguration) throws InterruptedException { final OffsetContext offsetContext = signalPayload.offsetContext; @@ -500,13 +500,14 @@ public void addDataCollectionNamesToSnapshot(SignalPayload

signalPayload, Lis context = (IncrementalSnapshotContext) offsetContext.getIncrementalSnapshotContext(); boolean shouldReadChunk = !context.snapshotRunning(); - List expandedDataCollectionIds = expandDataCollectionIds(dataCollectionIds); - if (expandedDataCollectionIds.size() > dataCollectionIds.size()) { - LOGGER.info("Data-collections to snapshot have been expanded from {} to {}", dataCollectionIds, expandedDataCollectionIds); + List expandedDataCollectionIds = expandDataCollectionIds(snapshotConfiguration.getDataCollections()); + if (expandedDataCollectionIds.size() > snapshotConfiguration.getDataCollections().size()) { + LOGGER.info("Data-collections to snapshot have been expanded from {} to {}", snapshotConfiguration.getDataCollections(), expandedDataCollectionIds); } - final List> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(correlationId, expandedDataCollectionIds, additionalCondition, - surrogateKey); + final List> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(correlationId, expandedDataCollectionIds, + snapshotConfiguration.getAdditionalConditions(), + snapshotConfiguration.getSurrogateKey()); if (shouldReadChunk) { List monitoredDataCollections = newDataCollectionIds.stream() @@ -523,6 +524,7 @@ public void addDataCollectionNamesToSnapshot(SignalPayload

signalPayload, Lis @Override @SuppressWarnings("unchecked") public void stopSnapshot(P partition, OffsetContext offsetContext, Map additionalData, List dataCollectionIds) { + context = (IncrementalSnapshotContext) offsetContext.getIncrementalSnapshotContext(); LOGGER.trace("Stopping incremental snapshot with context {}", context); if (context.snapshotRunning()) { @@ -604,6 +606,7 @@ protected void addKeyColumnsToCondition(Table table, StringBuilder sql, String p * all matching explicit data collection ids. */ private List expandDataCollectionIds(List dataCollectionIds) { + return dataCollectionIds .stream() .flatMap(x -> { diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotContext.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotContext.java index 441c899f9..e2fb25620 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotContext.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotContext.java @@ -20,6 +20,7 @@ import java.util.Queue; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -31,6 +32,7 @@ import io.debezium.DebeziumException; import io.debezium.annotation.NotThreadSafe; +import io.debezium.pipeline.signal.actions.snapshotting.AdditionalCondition; import io.debezium.relational.Table; import io.debezium.relational.TableId; import io.debezium.util.HexConverter; @@ -189,9 +191,8 @@ private String dataCollectionsToSnapshotAsString() { LinkedHashMap map = new LinkedHashMap<>(); map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ID, x.getId().toString()); map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION, - x.getAdditionalCondition().orElse(null)); - map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY, - x.getSurrogateKey().orElse(null)); + x.getAdditionalCondition().isEmpty() ? null : x.getAdditionalCondition().orElse(null)); + map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY, x.getSurrogateKey().isEmpty() ? null : x.getSurrogateKey().orElse(null)); return map; }) .collect(Collectors.toList()); @@ -207,8 +208,8 @@ private List> stringToDataCollections(String dataCollectionsSt List> dataCollections = mapper.readValue(dataCollectionsStr, mapperTypeRef); List> dataCollectionsList = dataCollections.stream() .map(x -> new DataCollection((T) TableId.parse(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ID), useCatalogBeforeSchema), - Optional.ofNullable(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION)), - Optional.ofNullable(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY)))) + Optional.ofNullable(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION)).orElse(""), + Optional.ofNullable(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY)).orElse(""))) .collect(Collectors.toList()); return dataCollectionsList; } @@ -237,16 +238,28 @@ private void addTablesIdsToSnapshot(List> dataCollectionIds) { } @SuppressWarnings("unchecked") - public List> addDataCollectionNamesToSnapshot(String correlationId, List dataCollectionIds, Optional additionalCondition, - Optional surrogateKey) { + public List> addDataCollectionNamesToSnapshot(String correlationId, List dataCollectionIds, List additionalCondition, + String surrogateKey) { + final List> newDataCollectionIds = dataCollectionIds.stream() - .map(x -> new DataCollection((T) TableId.parse(x, useCatalogBeforeSchema), additionalCondition, surrogateKey)) + .map(buildDataCollection(additionalCondition, surrogateKey)) .collect(Collectors.toList()); addTablesIdsToSnapshot(newDataCollectionIds); this.correlationId = correlationId; return newDataCollectionIds; } + private Function> buildDataCollection(List additionalCondition, String surrogateKey) { + return expandedCollectionName -> { + String filter = additionalCondition.stream() + .filter(condition -> condition.getDataCollection().matcher(expandedCollectionName).matches()) + .map(AdditionalCondition::getFilter) + .findFirst() + .orElse(""); + return new DataCollection((T) TableId.parse(expandedCollectionName, useCatalogBeforeSchema), filter, surrogateKey); + }; + } + @Override public void stopSnapshot() { this.dataCollectionsToSnapshot.clear(); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/DataCollection.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/DataCollection.java index e825c6ba4..6172cb34a 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/DataCollection.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/DataCollection.java @@ -8,6 +8,8 @@ import java.util.Objects; import java.util.Optional; +import io.debezium.util.Strings; + /** * A class describing DataCollection for incremental snapshot * @@ -18,15 +20,15 @@ public class DataCollection { private final T id; - private final Optional additionalCondition; + private final String additionalCondition; - private final Optional surrogateKey; + private final String surrogateKey; public DataCollection(T id) { - this(id, Optional.empty(), Optional.empty()); + this(id, "", ""); } - public DataCollection(T id, Optional additionalCondition, Optional surrogateKey) { + public DataCollection(T id, String additionalCondition, String surrogateKey) { Objects.requireNonNull(additionalCondition); Objects.requireNonNull(surrogateKey); @@ -40,11 +42,11 @@ public T getId() { } public Optional getAdditionalCondition() { - return additionalCondition; + return Strings.isNullOrEmpty(additionalCondition) ? Optional.empty() : Optional.of(additionalCondition); } public Optional getSurrogateKey() { - return surrogateKey; + return Strings.isNullOrEmpty(surrogateKey) ? Optional.empty() : Optional.of(surrogateKey); } @Override diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSource.java index 7999d1764..ebdfbeb9e 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSource.java @@ -7,9 +7,9 @@ import java.util.List; import java.util.Map; -import java.util.Optional; import io.debezium.pipeline.signal.SignalPayload; +import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.Partition; import io.debezium.spi.schema.DataCollectionId; @@ -33,8 +33,7 @@ public interface IncrementalSnapshotChangeEventSource

signalPayload, List dataCollectionIds, - Optional additionalCondition, Optional surrogateKey) + void addDataCollectionNamesToSnapshot(SignalPayload

signalPayload, SnapshotConfiguration snapshotConfiguration) throws InterruptedException; void stopSnapshot(P partition, OffsetContext offsetContext, Map additionalData, List dataCollectionIds); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotContext.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotContext.java index 653bef3f9..7d335af7f 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotContext.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotContext.java @@ -9,6 +9,7 @@ import java.util.Map; import java.util.Optional; +import io.debezium.pipeline.signal.actions.snapshotting.AdditionalCondition; import io.debezium.relational.Table; public interface IncrementalSnapshotContext { @@ -17,8 +18,8 @@ public interface IncrementalSnapshotContext { DataCollection nextDataCollection(); - List> addDataCollectionNamesToSnapshot(String correlationId, List dataCollectionIds, Optional additionalCondition, - Optional surrogateKey); + List> addDataCollectionNamesToSnapshot(String correlationId, List dataCollectionIds, List additionalCondition, + String surrogateKey); int dataCollectionsToBeSnapshottedCount(); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/SnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/SnapshotChangeEventSource.java index 75a8f0e18..641f67dc2 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/spi/SnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/spi/SnapshotChangeEventSource.java @@ -5,6 +5,12 @@ */ package io.debezium.pipeline.source.spi; +import java.util.Map; +import java.util.stream.Collectors; + +import io.debezium.pipeline.signal.actions.snapshotting.AdditionalCondition; +import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration; +import io.debezium.pipeline.source.SnapshottingTask; import io.debezium.pipeline.spi.OffsetContext; import io.debezium.pipeline.spi.Partition; import io.debezium.pipeline.spi.SnapshotResult; @@ -22,15 +28,28 @@ public interface SnapshotChangeEventSource

execute(ChangeEventSourceContext context, P partition, O previousOffset) throws InterruptedException; + SnapshotResult execute(ChangeEventSourceContext context, P partition, O previousOffset, SnapshottingTask snapshottingTask) throws InterruptedException; + + /** + * Returns the snapshotting task based on the previous offset (if available) and the connector's snapshotting mode. + */ + SnapshottingTask getSnapshottingTask(P partition, O previousOffset); + + /** + * Returns the blocking snapshotting task based on the snapshot configuration from the signal. + */ + default SnapshottingTask getBlockingSnapshottingTask(P partition, O previousOffset, SnapshotConfiguration snapshotConfiguration) { + + Map filtersByTable = snapshotConfiguration.getAdditionalConditions().stream() + .collect(Collectors.toMap(k -> k.getDataCollection().toString(), AdditionalCondition::getFilter)); + + return new SnapshottingTask(true, true, snapshotConfiguration.getDataCollections(), filtersByTable); + } } diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java b/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java index a6ab386f0..a13aed716 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalDatabaseConnectorConfig.java @@ -17,6 +17,8 @@ import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.ConfigDefinition; @@ -40,6 +42,7 @@ import io.debezium.schema.FieldNameSelector; import io.debezium.schema.FieldNameSelector.FieldNamer; import io.debezium.schema.SchemaNameAdjuster; +import io.debezium.spi.schema.DataCollectionId; import io.debezium.spi.topic.TopicNamingStrategy; import io.debezium.util.Strings; @@ -50,6 +53,8 @@ */ public abstract class RelationalDatabaseConnectorConfig extends CommonConnectorConfig { + private static final Logger LOGGER = LoggerFactory.getLogger(RelationalDatabaseConnectorConfig.class); + protected static final String SCHEMA_INCLUDE_LIST_NAME = "schema.include.list"; protected static final String SCHEMA_EXCLUDE_LIST_NAME = "schema.exclude.list"; protected static final String DATABASE_INCLUDE_LIST_NAME = "database.include.list"; @@ -734,7 +739,8 @@ private static int validateTableExcludeList(Configuration config, Field field, V /** * Returns any SELECT overrides, if present. */ - public Map getSnapshotSelectOverridesByTable() { + public Map getSnapshotSelectOverridesByTable() { + List tableValues = getConfig().getTrimmedStrings(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, ","); if (tableValues == null) { @@ -744,9 +750,18 @@ public Map getSnapshotSelectOverridesByTable() { Map snapshotSelectOverridesByTable = new HashMap<>(); for (String table : tableValues) { + + String statementOverride = getConfig().getString(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table); + if (statementOverride == null) { + LOGGER.warn("Detected snapshot.select.statement.overrides for {} but no statement property {} defined", + SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table, table); + continue; + } + snapshotSelectOverridesByTable.put( TableId.parse(table), getConfig().getString(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table)); + } return Collections.unmodifiableMap(snapshotSelectOverridesByTable); diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java index c192ba646..146ba5b08 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java @@ -46,6 +46,7 @@ import io.debezium.pipeline.EventDispatcher.SnapshotReceiver; import io.debezium.pipeline.notification.NotificationService; import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource; +import io.debezium.pipeline.source.SnapshottingTask; import io.debezium.pipeline.source.spi.SnapshotChangeEventSource; import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; @@ -55,6 +56,7 @@ import io.debezium.pipeline.spi.SnapshotResult; import io.debezium.relational.RelationalDatabaseConnectorConfig.SnapshotTablesRowCountOrder; import io.debezium.schema.SchemaChangeEvent; +import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Clock; import io.debezium.util.ColumnUtils; import io.debezium.util.Strings; @@ -109,6 +111,11 @@ public SnapshotResult doExecute(ChangeEventSourceContext context, O previousO Throwable exceptionWhileSnapshot = null; try { + Set dataCollectionsToBeSnapshotted = getDataCollectionPattern(snapshottingTask.getDataCollections()); + + Map snapshotSelectOverridesByTable = snapshottingTask.getFilterQueries().entrySet().stream() + .collect(Collectors.toMap(e -> TableId.parse(e.getKey()), Map.Entry::getValue)); + preSnapshot(); LOGGER.info("Snapshot step 1 - Preparing"); @@ -124,7 +131,7 @@ public SnapshotResult doExecute(ChangeEventSourceContext context, O previousO // Note that there's a minor race condition here: a new table matching the filters could be created between // this call and the determination of the initial snapshot position below; this seems acceptable, though - determineCapturedTables(ctx); + determineCapturedTables(ctx, dataCollectionsToBeSnapshotted); snapshotProgressListener.monitoredDataCollectionsDetermined(snapshotContext.partition, ctx.capturedTables); // Init jdbc connection pool for reading table schema and data connectionPool = createConnectionPool(ctx); @@ -155,7 +162,7 @@ public SnapshotResult doExecute(ChangeEventSourceContext context, O previousO if (snapshottingTask.snapshotData()) { LOGGER.info("Snapshot step 7 - Snapshotting data"); - createDataEvents(context, ctx, connectionPool); + createDataEvents(context, ctx, connectionPool, snapshotSelectOverridesByTable); } else { LOGGER.info("Snapshot step 7 - Skipping snapshotting of data"); @@ -268,9 +275,9 @@ private Set addSignalingCollectionAndSort(Set capturedTables) .collect(Collectors.toCollection(LinkedHashSet::new)); } - private void determineCapturedTables(RelationalSnapshotContext ctx) throws Exception { + private void determineCapturedTables(RelationalSnapshotContext ctx, Set dataCollectionsToBeSnapshotted) throws Exception { Set allTableIds = getAllTableIds(ctx); - Set snapshottedTableIds = determineDataCollectionsToBeSnapshotted(allTableIds).collect(Collectors.toSet()); + Set snapshottedTableIds = determineDataCollectionsToBeSnapshotted(allTableIds, dataCollectionsToBeSnapshotted).collect(Collectors.toSet()); Set capturedTables = new HashSet<>(); Set capturedSchemaTables = new HashSet<>(); @@ -393,7 +400,7 @@ protected abstract SchemaChangeEvent getCreateTableEvent(RelationalSnapshotConte private void createDataEvents(ChangeEventSourceContext sourceContext, RelationalSnapshotContext snapshotContext, - Queue connectionPool) + Queue connectionPool, Map snapshotSelectOverridesByTable) throws Exception { tryStartingSnapshot(snapshotContext); @@ -407,7 +414,7 @@ private void createDataEvents(ChangeEventSourceContext sourceContext, Map queryTables = new HashMap<>(); Map rowCountTables = new LinkedHashMap<>(); for (TableId tableId : snapshotContext.capturedTables) { - final Optional selectStatement = determineSnapshotSelect(snapshotContext, tableId); + final Optional selectStatement = determineSnapshotSelect(snapshotContext, tableId, snapshotSelectOverridesByTable); if (selectStatement.isPresent()) { LOGGER.info("For table '{}' using select statement: '{}'", tableId, selectStatement.get()); queryTables.put(tableId, selectStatement.get()); @@ -626,10 +633,12 @@ protected ChangeRecordEmitter

getChangeRecordEmitter(P partition, O offset, T * defaulting to a statement provided by the DB-specific change event source. * * @param tableId the table to generate a query for + * @param snapshotSelectOverridesByTable the select overrides by table * @return a valid query string or empty if table will not be snapshotted */ - private Optional determineSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId) { - String overriddenSelect = getSnapshotSelectOverridesByTable(tableId); + private Optional determineSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId, + Map snapshotSelectOverridesByTable) { + String overriddenSelect = getSnapshotSelectOverridesByTable(tableId, snapshotSelectOverridesByTable); if (overriddenSelect != null) { return Optional.of(enhanceOverriddenSelect(snapshotContext, overriddenSelect, tableId)); } @@ -639,8 +648,7 @@ private Optional determineSnapshotSelect(RelationalSnapshotContext return getSnapshotSelect(snapshotContext, tableId, columns); } - protected String getSnapshotSelectOverridesByTable(TableId tableId) { - Map snapshotSelectOverrides = connectorConfig.getSnapshotSelectOverridesByTable(); + protected String getSnapshotSelectOverridesByTable(TableId tableId, Map snapshotSelectOverrides) { String overriddenSelect = snapshotSelectOverrides.get(tableId); // try without catalog id, as this might or might not be populated based on the given connector diff --git a/debezium-core/src/main/java/io/debezium/util/Strings.java b/debezium-core/src/main/java/io/debezium/util/Strings.java index 97956bda0..e69dd0caf 100644 --- a/debezium-core/src/main/java/io/debezium/util/Strings.java +++ b/debezium-core/src/main/java/io/debezium/util/Strings.java @@ -1118,7 +1118,7 @@ private Strings() { * If a comma is part of expression then it can be prepended with '\' so * it will not act as a separator. */ - private static class RegExSplitter implements Tokenizer { + public static class RegExSplitter implements Tokenizer { public static String[] split(String identifier) { TokenStream stream = new TokenStream(identifier, new RegExSplitter(), true); diff --git a/debezium-core/src/test/java/io/debezium/pipeline/source/snapshot/incremental/SignalBasedSnapshotChangeEventSourceTest.java b/debezium-core/src/test/java/io/debezium/pipeline/source/snapshot/incremental/SignalBasedSnapshotChangeEventSourceTest.java index b32b19743..d5a1f3137 100644 --- a/debezium-core/src/test/java/io/debezium/pipeline/source/snapshot/incremental/SignalBasedSnapshotChangeEventSourceTest.java +++ b/debezium-core/src/test/java/io/debezium/pipeline/source/snapshot/incremental/SignalBasedSnapshotChangeEventSourceTest.java @@ -123,7 +123,7 @@ public void testBuildQueryTwoPkColumnsWithAdditionalConditionWithSurrogateKey() .addColumn(val1) .addColumn(val2) .setPrimaryKeyNames("pk1", "pk2").create(); - context.addDataCollectionNamesToSnapshot("12345", List.of(table.id().toString()), Optional.empty(), Optional.of("pk2")); + context.addDataCollectionNamesToSnapshot("12345", List.of(table.id().toString()), List.of(), "pk2"); assertThat(source.buildChunkQuery(table, Optional.of("\"val1\"=foo"))) .isEqualTo("SELECT * FROM \"s1\".\"table1\" WHERE \"val1\"=foo ORDER BY \"pk2\" LIMIT 1024"); context.nextChunkPosition(new Object[]{ 1, 5 }); @@ -202,7 +202,7 @@ public void testBuildQueryTwoPkColumnsWithSurrogateKey() { .addColumn(val1) .addColumn(val2) .setPrimaryKeyNames("pk1", "pk2").create(); - context.addDataCollectionNamesToSnapshot("12345", List.of(table.id().toString()), Optional.empty(), Optional.of("pk2")); + context.addDataCollectionNamesToSnapshot("12345", List.of(table.id().toString()), List.of(), "pk2"); assertThat(source.buildChunkQuery(table, Optional.empty())) .isEqualTo("SELECT * FROM \"s1\".\"table1\" ORDER BY \"pk2\" LIMIT 1024"); } @@ -250,7 +250,7 @@ public void testMaxQueryWithSurrogateKey() { final Column val2 = Column.editor().name("val2").create(); final Table table = Table.editor().tableId(new TableId(null, "s1", "table1")).addColumn(pk1).addColumn(pk2) .addColumn(val1).addColumn(val2).setPrimaryKeyNames("pk1", "pk2").create(); - context.addDataCollectionNamesToSnapshot("12345", List.of(table.id().toString()), Optional.empty(), Optional.of("pk2")); + context.addDataCollectionNamesToSnapshot("12345", List.of(table.id().toString()), List.of(), "pk2"); assertThat(source.buildMaxPrimaryKeyQuery(table, Optional.empty())) .isEqualTo("SELECT * FROM \"s1\".\"table1\" ORDER BY \"pk2\" DESC LIMIT 1"); } diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index 2e54089cc..e19af2be6 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -602,6 +602,19 @@ protected SourceRecords consumeRecordsByTopic(int numRecords, int breakAfterNull return records; } + /** + * Try to consume and capture all available records from the connector. + * + * + * @return the collector into which the records were captured; never null + * @throws InterruptedException if the thread was interrupted while waiting for a record to be returned + */ + protected SourceRecords consumeAvailableRecordsByTopic() throws InterruptedException { + SourceRecords records = new SourceRecords(); + consumeAvailableRecords(records::add); + return records; + } + /** * Try to consume and capture exactly the specified number of records from the connector. * diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java index 09792f6d3..c24f5eb2b 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java @@ -12,7 +12,6 @@ import java.sql.SQLException; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; @@ -75,16 +74,17 @@ public void executeBlockingSnapshot() throws Exception { assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2); - sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), BLOCKING, tableDataCollectionId()); + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", BLOCKING, tableDataCollectionId()); waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class); signalingRecords = 1; + assertRecordsFromSnapshotAndStreamingArePresent((ROW_COUNT * 2) + signalingRecords); - insertRecords(ROW_COUNT, (ROW_COUNT * 2)); + insertRecords(ROW_COUNT, ROW_COUNT * 2); - assertStreamingRecordsArePresent(ROW_COUNT + signalingRecords); + assertStreamingRecordsArePresent(ROW_COUNT); } @@ -102,7 +102,7 @@ public void executeBlockingSnapshotWhileStreaming() throws Exception { Thread.sleep(2000); // Let's start stream some insert - sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), BLOCKING, tableDataCollectionId()); + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", BLOCKING, tableDataCollectionId()); waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class); @@ -114,11 +114,32 @@ public void executeBlockingSnapshotWhileStreaming() throws Exception { insertRecords(ROW_COUNT, (ROW_COUNT * 2)); - signalingRecords = 1 + // from streaming - 1; // from snapshot + signalingRecords = 1; // from streaming assertRecordsWithValuesPresent((int) ((ROW_COUNT * 3) + totalSnapshotRecords + signalingRecords), - getExpectedValues(totalSnapshotRecords)); + getExpectedValues(totalSnapshotRecords), topicName()); + } + + @Test + public void executeBlockingSnapshotWithAdditionalCondition() throws Exception { + // Testing.Print.enable(); + + populateTable(tableNames().get(1).toString()); + + startConnectorWithSnapshot(x -> mutableConfig(false, false)); + + waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task()); + + sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey( + Map.of(tableDataCollectionIds().get(1), String.format("SELECT * FROM %s WHERE aa < 500", tableNames().get(1))), "", BLOCKING, + tableDataCollectionIds().get(1).toString()); + + waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class); + + signalingRecords = 1; // from streaming + + assertRecordsWithValuesPresent(500 + signalingRecords, IntStream.rangeClosed(0, 499).boxed().collect(Collectors.toList()), topicNames().get(1).toString()); + } protected int insertMaxSleep() { @@ -180,21 +201,21 @@ private Future executeAsync(Runnable operation) { private void assertStreamingRecordsArePresent(int expectedRecords) throws InterruptedException { - assertRecordsWithValuesPresent(expectedRecords, IntStream.range(2000, 2999).boxed().collect(Collectors.toList())); + assertRecordsWithValuesPresent(expectedRecords, IntStream.range(2000, 2999).boxed().collect(Collectors.toList()), topicName()); } private void assertRecordsFromSnapshotAndStreamingArePresent(int expectedRecords) throws InterruptedException { - assertRecordsWithValuesPresent(expectedRecords, IntStream.range(0, expectedRecords - 1).boxed().collect(Collectors.toList())); + assertRecordsWithValuesPresent(expectedRecords, IntStream.range(0, expectedRecords - 1).boxed().collect(Collectors.toList()), topicName()); } - private void assertRecordsWithValuesPresent(int expectedRecords, List expectedValues) throws InterruptedException { + private void assertRecordsWithValuesPresent(int expectedRecords, List expectedValues, String topicName) throws InterruptedException { SourceRecords snapshotAndStreamingRecords = consumeRecordsByTopic(expectedRecords, 10); - assertThat(snapshotAndStreamingRecords.allRecordsInOrder().size()).isEqualTo(expectedRecords); - List actual = snapshotAndStreamingRecords.recordsForTopic(topicName()).stream() + List actual = snapshotAndStreamingRecords.recordsForTopic(topicName).stream() .map(s -> ((Struct) s.value()).getStruct("after").getInt32(valueFieldName())) .collect(Collectors.toList()); + assertThat(snapshotAndStreamingRecords.allRecordsInOrder().size()).isEqualTo(expectedRecords); assertThat(actual).containsAll(expectedValues); } diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java index 59292a770..a51d74752 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotTest.java @@ -15,7 +15,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -709,7 +708,33 @@ public void snapshotWithAdditionalCondition() throws Exception { // there shouldn't be any snapshot records assertNoRecordsToConsume(); - sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.of(String.format("\"aa = %s\"", expectedValue)), Optional.empty(), + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String.format("\"aa = %s\"", expectedValue), "", + tableDataCollectionId()); + + final Map dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedCount, + x -> true, null); + assertEquals(expectedCount, dbChanges.size()); + assertTrue(dbChanges.values().stream().allMatch(v -> (((Struct) v.value()).getStruct("after") + .getInt32(valueFieldName())).equals(expectedValue))); + } + + @Test + public void snapshotWithNewAdditionalConditionsField() throws Exception { + // Testing.Print.enable(); + + int expectedCount = 10, expectedValue = 12345678; + populateTable(); + populateTableWithSpecificValue(2000, expectedCount, expectedValue); + waitForCdcTransactionPropagation(3); + final Configuration config = config().build(); + startAndConsumeTillEnd(connectorClass(), config); + waitForConnectorToStart(); + + waitForAvailableRecords(1, TimeUnit.SECONDS); + // there shouldn't be any snapshot records + assertNoRecordsToConsume(); + + sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map.of(tableDataCollectionId(), String.format("aa = %s", expectedValue)), "", tableDataCollectionId()); final Map dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedCount, @@ -728,7 +753,7 @@ public void shouldExecuteRegularSnapshotWhenAdditionalConditionEmpty() throws Ex final int recordsCount = ROW_COUNT; - sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.of("\"\""), Optional.empty(), tableDataCollectionId()); + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("\"\"", "", tableDataCollectionId()); final Map dbChanges = consumeRecordsMixedWithIncrementalSnapshot(recordsCount, x -> true, null); @@ -751,8 +776,7 @@ public void snapshotWithAdditionalConditionWithRestart() throws Exception { // there shouldn't be any snapshot records assertNoRecordsToConsume(); - sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.of(String.format("\"aa = %s\"", expectedValue)), Optional.empty(), - tableDataCollectionId()); + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String.format("\"aa = %s\"", expectedValue), "", tableDataCollectionId()); final AtomicInteger recordCounter = new AtomicInteger(); final AtomicBoolean restarted = new AtomicBoolean(); @@ -778,7 +802,7 @@ public void snapshotWithSurrogateKey() throws Exception { populateTable(); startConnector(); - sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.of("\"aa\""), tableDataCollectionId()); + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "\"aa\"", tableDataCollectionId()); final int expectedRecordCount = ROW_COUNT; final Map dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount); @@ -803,8 +827,7 @@ public void snapshotWithAdditionalConditionWithSurrogateKey() throws Exception { // there shouldn't be any snapshot records assertNoRecordsToConsume(); - sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.of(String.format("\"aa = %s\"", expectedValue)), Optional.of("\"aa\""), - tableDataCollectionId()); + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String.format("\"aa = %s\"", expectedValue), "\"aa\"", tableDataCollectionId()); final Map dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedCount, x -> true, null); diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractSnapshotTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractSnapshotTest.java index 12d75d531..159a2c0a0 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractSnapshotTest.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractSnapshotTest.java @@ -13,7 +13,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; @@ -29,6 +28,7 @@ import io.debezium.engine.DebeziumEngine; import io.debezium.jdbc.JdbcConnection; import io.debezium.pipeline.signal.actions.AbstractSnapshotSignal; +import io.debezium.util.Strings; public abstract class AbstractSnapshotTest extends AbstractConnectorTest { @@ -117,6 +117,12 @@ protected void populateTable() throws SQLException { } } + protected void populateTable(String table) throws SQLException { + try (JdbcConnection connection = databaseConnection()) { + populateTable(connection, table); + } + } + protected void populateTableWithSpecificValue(int startRow, int count, int value) throws SQLException { try (JdbcConnection connection = databaseConnection()) { populateTableWithSpecificValue(connection, tableName(), startRow, count, value); @@ -285,16 +291,22 @@ protected int getMaximumEnqueuedRecordCount() { } protected void sendAdHocSnapshotSignal(String... dataCollectionIds) throws SQLException { - sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), dataCollectionIds); + sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", dataCollectionIds); } - protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional additionalCondition, Optional surrogateKey, + protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String additionalCondition, String surrogateKey, String... dataCollectionIds) { sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(additionalCondition, surrogateKey, AbstractSnapshotSignal.SnapshotType.INCREMENTAL, dataCollectionIds); } - protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional additionalCondition, Optional surrogateKey, + protected void sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map additionalConditions, String surrogateKey, + String... dataCollectionIds) { + sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(additionalConditions, surrogateKey, AbstractSnapshotSignal.SnapshotType.INCREMENTAL, + dataCollectionIds); + } + + protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String additionalCondition, String surrogateKey, AbstractSnapshotSignal.SnapshotType snapshotType, String... dataCollectionIds) { final String dataCollectionIdsList = Arrays.stream(dataCollectionIds) @@ -302,20 +314,20 @@ protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Op .collect(Collectors.joining(", ")); try (JdbcConnection connection = databaseConnection()) { String query; - if (additionalCondition.isPresent() && surrogateKey.isPresent()) { + if (!Strings.isNullOrEmpty(additionalCondition) && !Strings.isNullOrEmpty(surrogateKey)) { query = String.format( "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-condition\": %s, \"surrogate-key\": %s}')", - signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition.get(), surrogateKey.get()); + signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition, surrogateKey); } - else if (additionalCondition.isPresent()) { + else if (!Strings.isNullOrEmpty(additionalCondition)) { query = String.format( "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-condition\": %s}')", - signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition.get()); + signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition); } - else if (surrogateKey.isPresent()) { + else if (!Strings.isNullOrEmpty(surrogateKey)) { query = String.format( "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"surrogate-key\": %s}')", - signalTableName(), snapshotType.toString(), dataCollectionIdsList, surrogateKey.get()); + signalTableName(), snapshotType.toString(), dataCollectionIdsList, surrogateKey); } else { query = String.format( @@ -329,4 +341,47 @@ else if (surrogateKey.isPresent()) { logger.warn("Failed to send signal", e); } } + + protected void sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map additionalConditions, String surrogateKey, + AbstractSnapshotSignal.SnapshotType snapshotType, + String... dataCollectionIds) { + final String dataCollectionIdsList = Arrays.stream(dataCollectionIds) + .map(x -> '"' + x + '"') + .collect(Collectors.joining(", ")); + try (JdbcConnection connection = databaseConnection()) { + String query; + if (!additionalConditions.isEmpty() && !Strings.isNullOrEmpty(surrogateKey)) { + query = String.format( + "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-conditions\": [%s], \"surrogate-key\": %s}')", + signalTableName(), snapshotType.toString(), dataCollectionIdsList, buildAdditionalConditions(additionalConditions), surrogateKey); + } + else if (!additionalConditions.isEmpty()) { + query = String.format( + "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-conditions\": [%s]}')", + signalTableName(), snapshotType.toString(), dataCollectionIdsList, buildAdditionalConditions(additionalConditions)); + } + else if (!Strings.isNullOrEmpty(surrogateKey)) { + query = String.format( + "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"surrogate-key\": %s}')", + signalTableName(), snapshotType.toString(), dataCollectionIdsList, surrogateKey); + } + else { + query = String.format( + "INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s]}')", + signalTableName(), snapshotType.toString(), dataCollectionIdsList); + } + logger.info("Sending signal with query {}", query); + connection.execute(query); + } + catch (Exception e) { + logger.warn("Failed to send signal", e); + } + } + + private static String buildAdditionalConditions(Map additionalConditions) { + + return additionalConditions.entrySet().stream() + .map(cond -> String.format("{\"data-collection\": \"%s\", \"filter\": \"%s\"}", cond.getKey(), cond.getValue())) + .collect(Collectors.joining(",")); + } }