DBZ-6731 Blocking snapshot takes configuration from signal payload

mfvitale 2023-08-03 11:44:51 +02:00 committed by Jiri Pechanec
parent b0be5b4808
commit 542b3619fa
38 changed files with 757 additions and 245 deletions
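As an illustrative sketch of what this change enables (collection names and filter borrowed from the MongoDB integration test further down), a BLOCKING execute-snapshot signal can now carry the data collections and per-collection filters itself, and they flow from the signal payload into the SnapshottingTask instead of being read from the connector configuration:

{
  "type": "execute-snapshot",
  "payload": {
    "type": "BLOCKING",
    "data-collections": ["[A-z].*dbA.c2"],
    "additional-conditions": [
      { "data-collection": "dbA.c2", "filter": "{ aa: { $lt: 500 } }" }
    ]
  }
}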

View File

@ -14,7 +14,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -37,6 +36,7 @@
import io.debezium.pipeline.signal.SignalPayload;
import io.debezium.pipeline.signal.actions.snapshotting.CloseIncrementalSnapshotWindow;
import io.debezium.pipeline.signal.actions.snapshotting.OpenIncrementalSnapshotWindow;
import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.DataCollection;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
@ -389,31 +389,31 @@ private Object[] readMaximumKey() throws InterruptedException {
@Override
@SuppressWarnings("unchecked")
public void addDataCollectionNamesToSnapshot(SignalPayload<MongoDbPartition> signalPayload,
List<String> dataCollectionIds,
Optional<String> additionalCondition, Optional<String> surrogateKey)
SnapshotConfiguration snapshotConfiguration)
throws InterruptedException {
final MongoDbPartition partition = signalPayload.partition;
final OffsetContext offsetContext = signalPayload.offsetContext;
final String correlationId = signalPayload.id;
if (additionalCondition != null && additionalCondition.isPresent()) {
if (!snapshotConfiguration.getAdditionalConditions().isEmpty()) {
throw new UnsupportedOperationException("Additional condition not supported for MongoDB");
}
if (surrogateKey != null && surrogateKey.isPresent()) {
if (!Strings.isNullOrEmpty(snapshotConfiguration.getSurrogateKey())) {
throw new UnsupportedOperationException("Surrogate key not supported for MongoDB");
}
context = (IncrementalSnapshotContext<CollectionId>) offsetContext.getIncrementalSnapshotContext();
final boolean shouldReadChunk = !context.snapshotRunning();
final String rsName = replicaSets.all().get(0).replicaSetName();
dataCollectionIds = dataCollectionIds
List<String> dataCollectionIds = snapshotConfiguration.getDataCollections()
.stream()
.map(x -> rsName + "." + x)
.map(x -> rsName + "." + x.toString())
.collect(Collectors.toList());
final List<DataCollection<CollectionId>> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(correlationId, dataCollectionIds, Optional.empty(),
Optional.empty());
final List<DataCollection<CollectionId>> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(correlationId, dataCollectionIds, List.of(), "");
if (shouldReadChunk) {
progressListener.snapshotStarted(partition);
@ -442,10 +442,11 @@ public void notifyReplicaSets(ReplicaSetNotifier<CollectionId> notifier, OffsetC
@Override
@SuppressWarnings("unchecked")
public void stopSnapshot(MongoDbPartition partition, OffsetContext offsetContext, Map<String, Object> additionalData, List<String> dataCollectionIds) {
public void stopSnapshot(MongoDbPartition partition, OffsetContext offsetContext, Map<String, Object> additionalData, List<String> dataCollectionPatterns) {
context = (IncrementalSnapshotContext<CollectionId>) offsetContext.getIncrementalSnapshotContext();
if (context.snapshotRunning()) {
if (dataCollectionIds == null || dataCollectionIds.isEmpty()) {
if (dataCollectionPatterns == null || dataCollectionPatterns.isEmpty()) {
LOGGER.info("Stopping incremental snapshot.");
try {
// This must be called prior to closeWindow to ensure that the correct state is set
@ -467,9 +468,10 @@ public void stopSnapshot(MongoDbPartition partition, OffsetContext offsetContext
}
}
else {
LOGGER.info("Removing '{}' collections from incremental snapshot", dataCollectionIds);
LOGGER.info("Removing '{}' collections from incremental snapshot", dataCollectionPatterns);
final String rsName = replicaSets.all().get(0).replicaSetName();
dataCollectionIds = dataCollectionIds.stream().map(x -> rsName + "." + x).collect(Collectors.toList());
final List<String> dataCollectionIds = dataCollectionPatterns.stream().map(x -> rsName + "." + x.toString()).collect(Collectors.toList());
for (String dataCollectionId : dataCollectionIds) {
final CollectionId collectionId = CollectionId.parse(dataCollectionId);
if (currentCollection != null && currentCollection.id().equals(collectionId)) {
@ -489,11 +491,10 @@ public void stopSnapshot(MongoDbPartition partition, OffsetContext offsetContext
}
}
List<String> finalDataCollectionIds = dataCollectionIds;
notifyReplicaSets(
(incrementalSnapshotContext, replicaSetPartition, replicaSetOffsetContext) -> notificationService.incrementalSnapshotNotificationService()
.notifyAborted(incrementalSnapshotContext, replicaSetPartition, replicaSetOffsetContext,
finalDataCollectionIds),
dataCollectionIds),
offsetContext);
}
}

View File

@ -31,6 +31,7 @@
import io.debezium.DebeziumException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.pipeline.signal.actions.snapshotting.AdditionalCondition;
import io.debezium.pipeline.source.snapshot.incremental.DataCollection;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.relational.Table;
@ -185,7 +186,7 @@ private String dataCollectionsToSnapshotAsString() {
LinkedHashMap<String, String> map = new LinkedHashMap<>();
map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ID, x.getId().toString());
map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION,
x.getAdditionalCondition().orElse(null));
x.getAdditionalCondition().isEmpty() ? null : x.getAdditionalCondition().orElse(null));
return map;
})
.collect(Collectors.toList());
@ -230,8 +231,8 @@ private void addTablesIdsToSnapshot(List<DataCollection<T>> dataCollectionIds) {
}
@SuppressWarnings("unchecked")
public List<DataCollection<T>> addDataCollectionNamesToSnapshot(String correlationId, List<String> dataCollectionIds, Optional<String> _additionalCondition,
Optional<String> surrogateKey) {
public List<DataCollection<T>> addDataCollectionNamesToSnapshot(String correlationId, List<String> dataCollectionIds, List<AdditionalCondition> additionalCondition,
String surrogateKey) {
final List<DataCollection<T>> newDataCollectionIds = dataCollectionIds.stream()
.map(x -> new DataCollection<T>((T) CollectionId.parse(x)))
.filter(x -> x.getId() != null) // Remove collections with incorrectly formatted name

View File

@ -8,12 +8,16 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.bson.BsonDocument;
@ -40,7 +44,10 @@
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.EventDispatcher.SnapshotReceiver;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.actions.snapshotting.AdditionalCondition;
import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
@ -145,7 +152,7 @@ protected SnapshotResult<MongoDbOffsetContext> doExecute(ChangeEventSourceContex
executor.submit(() -> {
try {
taskContext.configureLoggingContext(replicaSet.replicaSetName());
snapshotReplicaSet(context, mongoDbSnapshotContext, replicaSet);
snapshotReplicaSet(context, mongoDbSnapshotContext, replicaSet, snapshottingTask);
}
catch (Throwable t) {
LOGGER.error("Snapshot for replica set {} failed", replicaSet.replicaSetName(), t);
@ -177,21 +184,28 @@ protected SnapshotResult<MongoDbOffsetContext> doExecute(ChangeEventSourceContex
}
@Override
protected SnapshottingTask getSnapshottingTask(MongoDbPartition partition, MongoDbOffsetContext offsetContext, boolean isBlockingSnapshot) {
public SnapshottingTask getBlockingSnapshottingTask(MongoDbPartition partition, MongoDbOffsetContext previousOffset, SnapshotConfiguration snapshotConfiguration) {
if (isBlockingSnapshot) {
return new MongoDbSnapshottingTask(replicaSets.all());
}
Map<String, String> filtersByTable = snapshotConfiguration.getAdditionalConditions().stream()
.collect(Collectors.toMap(k -> k.getDataCollection().toString(), AdditionalCondition::getFilter));
return new MongoDbSnapshottingTask(replicaSets.all(), snapshotConfiguration.getDataCollections(), filtersByTable);
}
@Override
public SnapshottingTask getSnapshottingTask(MongoDbPartition partition, MongoDbOffsetContext offsetContext) {
List<String> dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted();
// If no snapshot should occur, return task with no replica sets
if (connectorConfig.getSnapshotMode().equals(MongoDbConnectorConfig.SnapshotMode.NEVER)) {
if (this.connectorConfig.getSnapshotMode().equals(MongoDbConnectorConfig.SnapshotMode.NEVER)) {
LOGGER.info("According to the connector configuration, no snapshot will occur.");
return new MongoDbSnapshottingTask(Collections.emptyList());
return new MongoDbSnapshottingTask(Collections.emptyList(), dataCollectionsToBeSnapshotted, Map.of());
}
if (offsetContext == null) {
LOGGER.info("No previous offset has been found");
return new MongoDbSnapshottingTask(replicaSets.all());
return new MongoDbSnapshottingTask(replicaSets.all(), dataCollectionsToBeSnapshotted, connectorConfig.getSnapshotFilterQueryByCollection());
}
// Collect which replica-sets require being snapshotted
@ -199,7 +213,7 @@ protected SnapshottingTask getSnapshottingTask(MongoDbPartition partition, Mongo
.filter(replicaSet -> isSnapshotExpected(partition, replicaSet, offsetContext))
.collect(Collectors.toList());
return new MongoDbSnapshottingTask(replicaSetsToSnapshot);
return new MongoDbSnapshottingTask(replicaSetsToSnapshot, dataCollectionsToBeSnapshotted, connectorConfig.getSnapshotFilterQueryByCollection());
}
@Override
@ -207,9 +221,11 @@ protected SnapshotContext<MongoDbPartition, MongoDbOffsetContext> prepare(MongoD
return new MongoDbSnapshotContext(partition);
}
private void snapshotReplicaSet(ChangeEventSourceContext sourceCtx, MongoDbSnapshotContext snapshotCtx, ReplicaSet replicaSet) throws InterruptedException {
private void snapshotReplicaSet(ChangeEventSourceContext sourceCtx, MongoDbSnapshotContext snapshotCtx, ReplicaSet replicaSet,
SnapshottingTask snapshottingTask)
throws InterruptedException {
try (MongoDbConnection mongo = connections.get(replicaSet, snapshotCtx.partition)) {
createDataEvents(sourceCtx, snapshotCtx, replicaSet, mongo);
createDataEvents(sourceCtx, snapshotCtx, replicaSet, mongo, snapshottingTask);
}
}
@ -281,13 +297,13 @@ private void initReplicaSetSnapshotStartOffsets(MongoDbSnapshotContext snapshotC
}
private void createDataEvents(ChangeEventSourceContext sourceCtx, MongoDbSnapshotContext snapshotCtx, ReplicaSet replicaSet,
MongoDbConnection mongo)
MongoDbConnection mongo, SnapshottingTask snapshottingTask)
throws InterruptedException {
initReplicaSetSnapshotStartOffsets(snapshotCtx, replicaSet, mongo);
SnapshotReceiver<MongoDbPartition> snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver();
snapshotCtx.offset.preSnapshotStart();
createDataEventsForReplicaSet(sourceCtx, snapshotCtx, snapshotReceiver, replicaSet, mongo);
createDataEventsForReplicaSet(sourceCtx, snapshotCtx, snapshotReceiver, replicaSet, mongo, snapshottingTask);
snapshotCtx.offset.preSnapshotCompletion();
snapshotReceiver.completeSnapshot();
@ -300,12 +316,12 @@ private void createDataEvents(ChangeEventSourceContext sourceCtx, MongoDbSnapsho
private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContext,
MongoDbSnapshotContext snapshotContext,
SnapshotReceiver<MongoDbPartition> snapshotReceiver,
ReplicaSet replicaSet, MongoDbConnection mongo)
ReplicaSet replicaSet, MongoDbConnection mongo, SnapshottingTask snapshottingTask)
throws InterruptedException {
final String rsName = replicaSet.replicaSetName();
final MongoDbOffsetContext offsetContext = (MongoDbOffsetContext) snapshotContext.offset;
final MongoDbOffsetContext offsetContext = snapshotContext.offset;
final ReplicaSetOffsetContext rsOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet);
snapshotContext.lastCollection = false;
@ -313,7 +329,10 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex
LOGGER.info("Beginning snapshot of '{}' at {}", rsName, rsOffsetContext.getOffset());
final List<CollectionId> collections = determineDataCollectionsToBeSnapshotted(mongo.collections()).collect(Collectors.toList());
Set<Pattern> dataCollectionPattern = getDataCollectionPattern(snapshottingTask.getDataCollections());
final List<CollectionId> collections = determineDataCollectionsToBeSnapshotted(mongo.collections(), dataCollectionPattern)
.collect(Collectors.toList());
snapshotProgressListener.monitoredDataCollectionsDetermined(snapshotContext.partition, collections);
if (connectorConfig.getSnapshotMaxThreads() > 1) {
// Since multiple snapshot threads are to be used, create a thread pool and initiate the snapshot.
@ -351,7 +370,7 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex
snapshotReceiver,
replicaSet,
id,
mongo);
mongo, snapshottingTask.getFilterQueries());
}
}
catch (InterruptedException e) {
@ -396,7 +415,7 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex
snapshotReceiver,
replicaSet,
collectionId,
mongo);
mongo, snapshottingTask.getFilterQueries());
}
}
@ -406,7 +425,8 @@ private void createDataEventsForReplicaSet(ChangeEventSourceContext sourceContex
private void createDataEventsForCollection(ChangeEventSourceContext sourceContext,
MongoDbSnapshotContext snapshotContext,
SnapshotReceiver<MongoDbPartition> snapshotReceiver,
ReplicaSet replicaSet, CollectionId collectionId, MongoDbConnection mongo)
ReplicaSet replicaSet, CollectionId collectionId, MongoDbConnection mongo,
Map<String, String> snapshotFilterQueryForCollection)
throws InterruptedException {
long exportStart = clock.currentTimeInMillis();
@ -419,7 +439,8 @@ private void createDataEventsForCollection(ChangeEventSourceContext sourceContex
final int batchSize = taskContext.getConnectorConfig().getSnapshotFetchSize();
long docs = 0;
Bson filterQuery = Document.parse(connectorConfig.getSnapshotFilterQueryForCollection(collectionId).orElseGet(() -> "{}"));
Optional<String> snapshotFilterForCollectionId = Optional.ofNullable(snapshotFilterQueryForCollection.get(collectionId.dbName() + "." + collectionId.name()));
Bson filterQuery = Document.parse(snapshotFilterForCollectionId.orElse("{}"));
try (MongoCursor<BsonDocument> cursor = collection.find(filterQuery).batchSize(batchSize).iterator()) {
snapshotContext.lastRecordInCollection = false;
@ -474,14 +495,14 @@ private Clock getClock() {
/**
* A configuration describing the task to be performed during snapshotting.
*
* @see AbstractSnapshotChangeEventSource.SnapshottingTask
* @see SnapshottingTask
*/
public static class MongoDbSnapshottingTask extends SnapshottingTask {
private final List<ReplicaSet> replicaSetsToSnapshot;
public MongoDbSnapshottingTask(List<ReplicaSet> replicaSetsToSnapshot) {
super(false, !replicaSetsToSnapshot.isEmpty());
public MongoDbSnapshottingTask(List<ReplicaSet> replicaSetsToSnapshot, List<String> dataCollections, Map<String, String> filterQueries) {
super(false, !replicaSetsToSnapshot.isEmpty(), dataCollections, filterQueries);
this.replicaSetsToSnapshot = replicaSetsToSnapshot;
}
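As an illustrative sketch (values borrowed from the blocking-snapshot test below), the filterQueries map handed to this constructor is keyed by "dbName.collectionName", which is the key createDataEventsForCollection uses to look up the per-collection filter:

Map<String, String> filterQueries = Map.of("dbA.c2", "{ aa: { $lt: 500 } }");
MongoDbSnapshottingTask task = new MongoDbSnapshottingTask(
        replicaSets.all(), List.of("[A-z].*dbA.c2"), filterQueries);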

View File

@ -50,9 +50,13 @@ public class BlockingSnapshotIT extends AbstractMongoConnectorIT {
private static final String DATABASE_NAME = "dbA";
private static final String COLLECTION_NAME = "c1";
private static final String COLLECTION2_NAME = "c2";
private static final String SIGNAL_COLLECTION_NAME = DATABASE_NAME + ".signals";
private static final String FULL_COLLECTION_NAME = DATABASE_NAME + "." + COLLECTION_NAME;
private static final String FULL_COLLECTION2_NAME = DATABASE_NAME + "." + COLLECTION2_NAME;
private static final String DOCUMENT_ID = "_id";
@Before
@ -82,7 +86,7 @@ public void executeBlockingSnapshot() throws Exception {
assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2);
sendAdHocBlockingSnapshotSignal(fullDataCollectionName());
sendAdHocBlockingSnapshotSignal("[A-z].*" + fullDataCollectionName());
waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
@ -90,9 +94,7 @@ public void executeBlockingSnapshot() throws Exception {
insertRecords(ROW_COUNT, (ROW_COUNT * 2));
int signalingRecords = 1;
assertStreamingRecordsArePresent(ROW_COUNT + signalingRecords);
assertStreamingRecordsArePresent(ROW_COUNT);
}
@ -110,7 +112,7 @@ public void executeBlockingSnapshotWhileStreaming() throws Exception {
Thread.sleep(2000); // Let's start stream some insert
sendAdHocBlockingSnapshotSignal(fullDataCollectionName());
sendAdHocBlockingSnapshotSignal("[A-z].*" + fullDataCollectionName());
waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
@ -122,11 +124,32 @@ public void executeBlockingSnapshotWhileStreaming() throws Exception {
insertRecords(ROW_COUNT, (ROW_COUNT * 2));
int signalingRecords = 1 + // from streaming
1; // from snapshot
int signalingRecords = 1; // from streaming
assertRecordsWithValuesPresent((int) ((ROW_COUNT * 3) + totalSnapshotRecords + signalingRecords),
getExpectedValues(totalSnapshotRecords));
getExpectedValues(totalSnapshotRecords), topicName());
}
@Test
public void executeBlockingSnapshotWithAdditionalCondition() throws Exception {
// Testing.Print.enable();
populateDataCollection(dataCollectionNames().get(1).toString());
startConnector(Function.identity());
waitForStreamingRunning("mongodb", "mongo1", getStreamingNamespace(), "0");
sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(
Map.of(fullDataCollectionNames().get(1), "{ aa: { $lt: 500 } }"),
"[A-z].*" + fullDataCollectionNames().get(1));
waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
int signalingRecords = 1; // from streaming
assertRecordsWithValuesPresent(500 + signalingRecords, IntStream.rangeClosed(0, 499).boxed().collect(Collectors.toList()), topicNames().get(1));
}
protected Class<MongoDbConnector> connectorClass() {
@ -141,10 +164,11 @@ protected Configuration.Builder config() {
return TestHelper.getConfiguration(mongo)
.edit()
.with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST, DATABASE_NAME)
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, fullDataCollectionName())
.with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, String.join(",", fullDataCollectionNames()))
.with(MongoDbConnectorConfig.SIGNAL_DATA_COLLECTION, SIGNAL_COLLECTION_NAME)
.with(MongoDbConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5)
.with(MongoDbConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
.with(MongoDbConnectorConfig.SNAPSHOT_MODE_TABLES, "[A-z].*dbA.c1")
.with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.INITIAL);
}
@ -152,14 +176,26 @@ protected String dataCollectionName() {
return COLLECTION_NAME;
}
protected List<String> dataCollectionNames() {
return List.of(COLLECTION_NAME, COLLECTION2_NAME);
}
protected String fullDataCollectionName() {
return FULL_COLLECTION_NAME;
}
protected List<String> fullDataCollectionNames() {
return List.of(FULL_COLLECTION_NAME, FULL_COLLECTION2_NAME);
}
protected String topicName() {
return "mongo1" + "." + fullDataCollectionName();
}
protected List<String> topicNames() {
return fullDataCollectionNames().stream().map(x -> "mongo1." + x).collect(Collectors.toList());
}
protected void populateDataCollection(String dataCollectionName) {
final Document[] documents = new Document[ROW_COUNT];
for (int i = 0; i < ROW_COUNT; i++) {
@ -230,19 +266,19 @@ private Future<?> executeAsync(Runnable operation) {
private void assertStreamingRecordsArePresent(int expectedRecords) throws InterruptedException {
assertRecordsWithValuesPresent(expectedRecords, IntStream.range(2000, 2999).boxed().collect(Collectors.toList()));
assertRecordsWithValuesPresent(expectedRecords, IntStream.range(2000, 2999).boxed().collect(Collectors.toList()), topicName());
}
private void assertRecordsFromSnapshotAndStreamingArePresent(int expectedRecords) throws InterruptedException {
assertRecordsWithValuesPresent(expectedRecords, IntStream.range(0, expectedRecords - 2).boxed().collect(Collectors.toList()));
assertRecordsWithValuesPresent(expectedRecords, IntStream.range(0, expectedRecords - 2).boxed().collect(Collectors.toList()), topicName());
}
private void assertRecordsWithValuesPresent(int expectedRecords, List<Integer> expectedValues) throws InterruptedException {
private void assertRecordsWithValuesPresent(int expectedRecords, List<Integer> expectedValues, String topicName) throws InterruptedException {
SourceRecords snapshotAndStreamingRecords = consumeRecordsByTopic(expectedRecords, 10);
assertThat(snapshotAndStreamingRecords.allRecordsInOrder().size()).isEqualTo(expectedRecords);
List<Integer> actual = snapshotAndStreamingRecords.recordsForTopic(topicName()).stream()
List<Integer> actual = snapshotAndStreamingRecords.recordsForTopic(topicName).stream()
.map(record -> extractFieldValue(record, "aa"))
.collect(Collectors.toList());
assertThat(actual).containsAll(expectedValues);
@ -301,6 +337,19 @@ protected void sendAdHocBlockingSnapshotSignal(String... dataCollectionIds) {
Document.parse("{\"type\": \"execute-snapshot\", \"payload\": {\"type\": \"BLOCKING\",\"data-collections\": [" + dataCollectionIdsList + "]}}"));
}
protected void sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map<String, String> additionalConditions, String... dataCollectionIds) {
final String conditions = additionalConditions.entrySet().stream()
.map(e -> String.format("{\"data-collection\": \"%s\", \"filter\": \"%s\"}", e.getKey(), e.getValue())).collect(
Collectors.joining(","));
final String dataCollectionIdsList = Arrays.stream(dataCollectionIds)
.map(x -> "\"" + x + "\"")
.collect(Collectors.joining(", "));
insertDocuments("dbA", "signals",
Document.parse("{\"type\": \"execute-snapshot\", \"payload\": {\"type\": \"BLOCKING\",\"data-collections\": [" + dataCollectionIdsList
+ "], \"additional-conditions\": [" + conditions + "]}}"));
}
protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig) {
startConnector(custConfig, loggingCompletion());
}

View File

@ -58,7 +58,6 @@ public class NotificationsIT extends AbstractMongoConnectorIT {
private static final String DATABASE_NAME = "dbA";
private static final String COLLECTION_NAME = "c1";
private static final String SIGNAL_COLLECTION_NAME = DATABASE_NAME + ".signals";
private static final String FULL_COLLECTION_NAME = DATABASE_NAME + "." + COLLECTION_NAME;
@Before

View File

@ -8,7 +8,6 @@
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.slf4j.Logger;
@ -19,6 +18,7 @@
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalPayload;
import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration;
import io.debezium.pipeline.signal.channels.KafkaSignalChannel;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
@ -221,13 +221,12 @@ protected void sendEvent(MySqlPartition partition, EventDispatcher<MySqlPartitio
@Override
public void addDataCollectionNamesToSnapshot(SignalPayload<MySqlPartition> signalPayload,
List<String> dataCollectionIds,
Optional<String> additionalCondition, Optional<String> surrogateKey)
SnapshotConfiguration snapshotConfiguration)
throws InterruptedException {
final Map<String, Object> additionalData = signalPayload.additionalData;
super.addDataCollectionNamesToSnapshot(signalPayload, dataCollectionIds, additionalCondition, surrogateKey);
super.addDataCollectionNamesToSnapshot(signalPayload, snapshotConfiguration);
getContext().setSignalOffset((Long) additionalData.get(KafkaSignalChannel.CHANNEL_OFFSET));
}

View File

@ -44,6 +44,7 @@
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.Table;
@ -86,28 +87,30 @@ public MySqlSnapshotChangeEventSource(MySqlConnectorConfig connectorConfig, Main
}
@Override
protected SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffsetContext previousOffset, boolean isBlockingSnapshot) {
public SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffsetContext previousOffset) {
if (isBlockingSnapshot) {
return new SnapshottingTask(true, true);
}
List<String> dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted();
Map<String, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue));
// found a previous offset and the earlier snapshot has completed
if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
return new SnapshottingTask(databaseSchema.isStorageInitializationExecuted(), false);
return new SnapshottingTask(databaseSchema.isStorageInitializationExecuted(), false, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable);
}
LOGGER.info("No previous offset has been found");
if (connectorConfig.getSnapshotMode().includeData()) {
if (this.connectorConfig.getSnapshotMode().includeData()) {
LOGGER.info("According to the connector configuration both schema and data will be snapshotted");
}
else {
LOGGER.info("According to the connector configuration only schema will be snapshotted");
}
return new SnapshottingTask(connectorConfig.getSnapshotMode().includeSchema(), connectorConfig.getSnapshotMode().includeData());
return new SnapshottingTask(this.connectorConfig.getSnapshotMode().includeSchema(), this.connectorConfig.getSnapshotMode().includeData(),
dataCollectionsToBeSnapshotted,
snapshotSelectOverridesByTable);
}
@Override
@ -560,7 +563,7 @@ private String quote(TableId id) {
@Override
protected OptionalLong rowCountForTable(TableId tableId) {
if (getSnapshotSelectOverridesByTable(tableId) != null) {
if (getSnapshotSelectOverridesByTable(tableId, connectorConfig.getSnapshotSelectOverridesByTable()) != null) {
return super.rowCountForTable(tableId);
}
OptionalLong rowCount = connection.getEstimatedTableSize(tableId);

View File

@ -71,8 +71,8 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, DATABASE.qualifiedTableName("debezium_signal"))
.with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl)
.with(MySqlConnectorConfig.SNAPSHOT_MODE_TABLES, DATABASE.qualifiedTableName("a"))
.with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO);
}
@ -103,7 +103,7 @@ protected String topicName() {
@Override
protected List<String> topicNames() {
return List.of(DATABASE.topicForTable("a"), DATABASE.topicForTable("c"));
return List.of(DATABASE.topicForTable("a"), DATABASE.topicForTable("b"));
}
@Override
@ -114,7 +114,7 @@ protected String tableName() {
@Override
protected List<String> tableNames() {
final String tableA = TableId.parse(DATABASE.qualifiedTableName("a")).toQuotedString('`');
final String tableB = TableId.parse(DATABASE.qualifiedTableName("c")).toQuotedString('`');
final String tableB = TableId.parse(DATABASE.qualifiedTableName("b")).toQuotedString('`');
return List.of(tableA, tableB);
}
@ -135,7 +135,7 @@ protected String tableDataCollectionId() {
@Override
protected List<String> tableDataCollectionIds() {
return List.of(tableNameId().toString(), tableNameId("c").toString());
return List.of(tableNameId().toString(), tableNameId("b").toString());
}
private TableId tableNameId() {

View File

@ -7,6 +7,11 @@ CREATE TABLE a (
aa INTEGER
) AUTO_INCREMENT = 1;
CREATE TABLE b (
pk INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
aa INTEGER
) AUTO_INCREMENT = 1;
CREATE TABLE debezium_signal (
id varchar(64),
type varchar(32),

View File

@ -11,6 +11,7 @@
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
@ -25,6 +26,7 @@
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
@ -59,18 +61,18 @@ public OracleSnapshotChangeEventSource(OracleConnectorConfig connectorConfig, Ma
}
@Override
protected SnapshottingTask getSnapshottingTask(OraclePartition partition, OracleOffsetContext previousOffset, boolean isBlockingSnapshot) {
public SnapshottingTask getSnapshottingTask(OraclePartition partition, OracleOffsetContext previousOffset) {
boolean snapshotSchema = true;
boolean snapshotData;
if (isBlockingSnapshot) {
return new SnapshottingTask(true, true);
}
List<String> dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted();
Map<String, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue));
// for ALWAYS snapshot mode don't use existing offset to have up-to-date SCN
if (OracleConnectorConfig.SnapshotMode.ALWAYS == connectorConfig.getSnapshotMode()) {
if (OracleConnectorConfig.SnapshotMode.ALWAYS == this.connectorConfig.getSnapshotMode()) {
LOGGER.info("Snapshot mode is set to ALWAYS, not checking exiting offset.");
snapshotData = connectorConfig.getSnapshotMode().includeData();
snapshotData = this.connectorConfig.getSnapshotMode().includeData();
}
// found a previous offset and the earlier snapshot has completed
else if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
@ -80,7 +82,7 @@ else if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
}
else {
LOGGER.info("No previous offset has been found.");
snapshotData = connectorConfig.getSnapshotMode().includeData();
snapshotData = this.connectorConfig.getSnapshotMode().includeData();
}
if (snapshotData && snapshotSchema) {
@ -90,7 +92,7 @@ else if (snapshotSchema) {
LOGGER.info("According to the connector configuration only schema will be snapshot.");
}
return new SnapshottingTask(snapshotSchema, snapshotData);
return new SnapshottingTask(snapshotSchema, snapshotData, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable);
}
@Override

View File

@ -33,9 +33,13 @@ public void before() throws Exception {
connection = TestHelper.testConnection();
TestHelper.dropTable(connection, "a");
TestHelper.dropTable(connection, "b");
connection.execute("CREATE TABLE a (pk numeric(9,0) primary key, aa numeric(9,0))");
connection.execute("CREATE TABLE b (pk numeric(9,0) primary key, aa numeric(9,0))");
connection.execute("GRANT INSERT on a to " + TestHelper.getConnectorUserName());
connection.execute("GRANT INSERT on b to " + TestHelper.getConnectorUserName());
TestHelper.streamTable(connection, "a");
TestHelper.streamTable(connection, "b");
TestHelper.dropTable(connection, "debezium_signal");
connection.execute("CREATE TABLE debezium_signal (id varchar2(64), type varchar2(32), data varchar2(2048))");
@ -52,6 +56,7 @@ public void after() throws Exception {
stopConnector();
if (connection != null) {
TestHelper.dropTable(connection, "a");
TestHelper.dropTable(connection, "b");
TestHelper.dropTable(connection, "debezium_signal");
connection.close();
}
@ -85,7 +90,7 @@ protected String topicName() {
@Override
protected List<String> topicNames() {
return List.of("server1.DEBEZIUM.A");
return List.of("server1.DEBEZIUM.A", "server1.DEBEZIUM.B");
}
@Override
@ -95,7 +100,7 @@ protected String tableName() {
@Override
protected List<String> tableNames() {
return List.of("DEBEZIUM.A");
return List.of("DEBEZIUM.A", "DEBEZIUM.B");
}
@Override
@ -105,7 +110,7 @@ protected String tableDataCollectionId() {
@Override
protected List<String> tableDataCollectionIds() {
return List.of(TestHelper.getDatabaseName() + ".DEBEZIUM.A");
return List.of(TestHelper.getDatabaseName() + ".DEBEZIUM.A", TestHelper.getDatabaseName() + ".DEBEZIUM.B");
}
@Override
@ -118,7 +123,8 @@ protected Configuration.Builder config() {
return TestHelper.defaultConfig()
.with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
.with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".DEBEZIUM.DEBEZIUM_SIGNAL")
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.A")
.with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM")
.with(OracleConnectorConfig.SNAPSHOT_MODE_TABLES, TestHelper.getDatabaseName() + ".DEBEZIUM.A")
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true);
}

View File

@ -8,6 +8,7 @@
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -24,6 +25,7 @@
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.Table;
@ -57,13 +59,13 @@ public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig
}
@Override
protected SnapshottingTask getSnapshottingTask(PostgresPartition partition, PostgresOffsetContext previousOffset, boolean isBlockingSnapshot) {
public SnapshottingTask getSnapshottingTask(PostgresPartition partition, PostgresOffsetContext previousOffset) {
boolean snapshotSchema = true;
boolean snapshotData = true;
if (isBlockingSnapshot) {
return new SnapshottingTask(true, true);
}
List<String> dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted();
Map<String, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue));
snapshotData = snapshotter.shouldSnapshot();
if (snapshotData) {
@ -74,7 +76,7 @@ protected SnapshottingTask getSnapshottingTask(PostgresPartition partition, Post
snapshotSchema = false;
}
return new SnapshottingTask(snapshotSchema, snapshotData);
return new SnapshottingTask(snapshotSchema, snapshotData, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable);
}
@Override
@ -107,7 +109,7 @@ protected Set<TableId> getAllTableIds(RelationalSnapshotContext<PostgresPartitio
@Override
protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext,
RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext)
throws SQLException, InterruptedException {
throws SQLException {
final Duration lockTimeout = connectorConfig.snapshotLockTimeout();
final Optional<String> lockStatement = snapshotter.snapshotTableLockingStatement(lockTimeout, snapshotContext.capturedTables);

View File

@ -24,7 +24,9 @@ public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest {
private static final String TOPIC_NAME = "test_server.s1.a";
private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;" +
"CREATE SCHEMA s1;CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
"CREATE SCHEMA s1;" +
"CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
"CREATE TABLE s1.b (pk SERIAL, aa integer, PRIMARY KEY(pk));" +
"CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048))";
@Before
@ -49,7 +51,7 @@ public void after() {
protected Configuration.Builder config() {
return TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
.with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal")
.with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
@ -69,7 +71,7 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s
.with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5)
.with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
.with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1")
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a");
.with(PostgresConnectorConfig.SNAPSHOT_MODE_TABLES, "s1.a");
}
@Override

View File

@ -33,6 +33,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Strings;
/**
@ -488,7 +489,8 @@ public String getConnectorName() {
}
@Override
public Map<TableId, String> getSnapshotSelectOverridesByTable() {
public Map<DataCollectionId, String> getSnapshotSelectOverridesByTable() {
List<String> tableValues = getConfig().getTrimmedStrings(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, ",");
if (tableValues == null) {
@ -498,9 +500,18 @@ public Map<TableId, String> getSnapshotSelectOverridesByTable() {
Map<TableId, String> snapshotSelectOverridesByTable = new HashMap<>();
for (String table : tableValues) {
String statementOverride = getConfig().getString(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table);
if (statementOverride == null) {
LOGGER.warn("Detected snapshot.select.statement.overrides for {} but no statement property {} defined",
SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table, table);
continue;
}
snapshotSelectOverridesByTable.put(
TableId.parse(table, new SqlServerTableIdPredicates()),
getConfig().getString(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table));
}
return Collections.unmodifiableMap(snapshotSelectOverridesByTable);

View File

@ -25,6 +25,7 @@
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.relational.Column;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
@ -58,13 +59,13 @@ public SqlServerSnapshotChangeEventSource(SqlServerConnectorConfig connectorConf
}
@Override
protected SnapshottingTask getSnapshottingTask(SqlServerPartition partition, SqlServerOffsetContext previousOffset, boolean isBlockingSnapshot) {
public SnapshottingTask getSnapshottingTask(SqlServerPartition partition, SqlServerOffsetContext previousOffset) {
boolean snapshotSchema = true;
boolean snapshotData = true;
if (isBlockingSnapshot) {
return new SnapshottingTask(true, true);
}
List<String> dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted();
Map<String, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue));
// found a previous offset and the earlier snapshot has completed
if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
@ -74,16 +75,16 @@ protected SnapshottingTask getSnapshottingTask(SqlServerPartition partition, Sql
}
else {
LOGGER.info("No previous offset has been found");
if (connectorConfig.getSnapshotMode().includeData()) {
if (this.connectorConfig.getSnapshotMode().includeData()) {
LOGGER.info("According to the connector configuration both schema and data will be snapshotted");
}
else {
LOGGER.info("According to the connector configuration only schema will be snapshotted");
}
snapshotData = connectorConfig.getSnapshotMode().includeData();
snapshotData = this.connectorConfig.getSnapshotMode().includeData();
}
return new SnapshottingTask(snapshotSchema, snapshotData);
return new SnapshottingTask(snapshotSchema, snapshotData, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable);
}
@Override

View File

@ -31,6 +31,7 @@ public void before() throws SQLException {
connection = TestHelper.testConnection();
connection.execute(
"CREATE TABLE a (pk int primary key, aa int)",
"CREATE TABLE b (pk int primary key, aa int)",
"CREATE TABLE debezium_signal (id varchar(64), type varchar(32), data varchar(2048))");
TestHelper.enableTableCdc(connection, "debezium_signal");
TestHelper.adjustCdcPollingInterval(connection, POLLING_INTERVAL);
@ -69,7 +70,7 @@ protected String topicName() {
@Override
protected List<String> topicNames() {
return List.of("server1.testDB1.dbo.a");
return List.of("server1.testDB1.dbo.a", "server1.testDB1.dbo.b");
}
@Override
@ -79,7 +80,7 @@ protected String tableName() {
@Override
protected List<String> tableNames() {
return List.of("testDB1.dbo.a");
return List.of("testDB1.dbo.a", "testDB1.dbo.b");
}
@Override
@ -106,7 +107,7 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s
return TestHelper.defaultConfig()
.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
.with(SqlServerConnectorConfig.SIGNAL_DATA_COLLECTION, "testDB1.dbo.debezium_signal")
.with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList)
.with(SqlServerConnectorConfig.SNAPSHOT_MODE_TABLES, tableName())
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl);
}

View File

@ -17,7 +17,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
@ -998,10 +997,10 @@ public EnumSet<Envelope.Operation> getSkippedOperations() {
return skippedOperations;
}
public Set<Pattern> getDataCollectionsToBeSnapshotted() {
public List<String> getDataCollectionsToBeSnapshotted() {
return Optional.ofNullable(config.getString(SNAPSHOT_MODE_TABLES))
.map(tables -> Strings.setOfRegex(tables, Pattern.CASE_INSENSITIVE))
.orElseGet(Collections::emptySet);
.map(expr -> Arrays.asList(Strings.RegExSplitter.split(expr)))
.orElseGet(Collections::emptyList);
}
public Map<String, String> getCustomMetricTags() {

View File

@ -34,6 +34,8 @@
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalProcessor;
import io.debezium.pipeline.signal.actions.SignalActionProvider;
import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
@ -196,7 +198,7 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
}
}
public void doBlockingSnapshot(P partition, OffsetContext offsetContext) {
public void doBlockingSnapshot(P partition, OffsetContext offsetContext, SnapshotConfiguration snapshotConfiguration) {
blockingSnapshotExecutor.submit(() -> {
@ -211,7 +213,9 @@ public void doBlockingSnapshot(P partition, OffsetContext offsetContext) {
previousLogContext.set(taskContext.configureLoggingContext("snapshot"));
LOGGER.info("Starting snapshot");
SnapshotResult<O> snapshotResult = doSnapshot(snapshotSource, context, partition, (O) offsetContext);
SnapshottingTask snapshottingTask = snapshotSource.getBlockingSnapshottingTask(partition, (O) offsetContext, snapshotConfiguration);
SnapshotResult<O> snapshotResult = doSnapshot(snapshotSource, context, partition, (O) offsetContext, snapshottingTask);
if (running && snapshotResult.isCompletedOrSkipped()) {
previousLogContext.set(taskContext.configureLoggingContext("streaming", partition));
@ -227,6 +231,16 @@ public void doBlockingSnapshot(P partition, OffsetContext offsetContext) {
protected SnapshotResult<O> doSnapshot(SnapshotChangeEventSource<P, O> snapshotSource, ChangeEventSourceContext context, P partition, O previousOffset)
throws InterruptedException {
SnapshottingTask snapshottingTask = snapshotSource.getSnapshottingTask(partition, previousOffset);
return doSnapshot(snapshotSource, context, partition, previousOffset, snapshottingTask);
}
protected SnapshotResult<O> doSnapshot(SnapshotChangeEventSource<P, O> snapshotSource, ChangeEventSourceContext context, P partition, O previousOffset,
SnapshottingTask snapshottingTask)
throws InterruptedException {
CatchUpStreamingResult catchUpStreamingResult = executeCatchUpStreaming(context, snapshotSource, partition, previousOffset);
if (catchUpStreamingResult.performedCatchUpStreaming) {
streamingConnected(false);
@ -235,7 +249,8 @@ protected SnapshotResult<O> doSnapshot(SnapshotChangeEventSource<P, O> snapshotS
commitOffsetLock.unlock();
}
eventDispatcher.setEventListener(snapshotMetrics);
SnapshotResult<O> snapshotResult = snapshotSource.execute(context, partition, previousOffset);
SnapshotResult<O> snapshotResult = snapshotSource.execute(context, partition, previousOffset, snapshottingTask);
LOGGER.info("Snapshot ended with {}", snapshotResult);
if (snapshotResult.getStatus() == SnapshotResultStatus.COMPLETED || schema.tableInformationComplete()) {

View File

@ -20,9 +20,13 @@ public abstract class AbstractSnapshotSignal<P extends Partition> implements Sig
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSnapshotSignal.class);
protected static final String FIELD_DATA_COLLECTIONS = "data-collections";
protected static final String FIELD_DATA_COLLECTION = "data-collection";
protected static final String FIELD_TYPE = "type";
@Deprecated
protected static final String FIELD_ADDITIONAL_CONDITION = "additional-condition";
protected static final String FIELD_ADDITIONAL_CONDITIONS = "additional-conditions";
protected static final String FIELD_SURROGATE_KEY = "surrogate-key";
protected static final String FIELD_FILTER = "filter";
public enum SnapshotType {
INCREMENTAL,

View File

@ -0,0 +1,70 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.signal.actions.snapshotting;
import java.util.regex.Pattern;
/**
* Contains filtering information for snapshot
*/
public class AdditionalCondition {
/**
* The data collection to which the filter applies
*/
private Pattern dataCollection;
/**
* For an incremental snapshot, specifies a condition based on the field(s) of the data collection(s).
* For a blocking snapshot, specifies the query statement that the connector runs on the data collection when it takes a snapshot.
*/
private String filter;
public Pattern getDataCollection() {
return dataCollection;
}
public String getFilter() {
return filter;
}
@Override
public String toString() {
return "AdditionalCondition{" +
"dataCollection=" + dataCollection +
", filter='" + filter + '\'' +
'}';
}
public static final class AdditionalConditionBuilder {
private Pattern dataCollection;
private String filter;
private AdditionalConditionBuilder() {
}
public static AdditionalConditionBuilder builder() {
return new AdditionalConditionBuilder();
}
public AdditionalConditionBuilder dataCollection(Pattern dataCollection) {
this.dataCollection = dataCollection;
return this;
}
public AdditionalConditionBuilder filter(String filter) {
this.filter = filter;
return this;
}
public AdditionalCondition build() {
AdditionalCondition additionalCondition = new AdditionalCondition();
additionalCondition.filter = this.filter;
additionalCondition.dataCollection = this.dataCollection;
return additionalCondition;
}
}
}
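For illustration only (assuming the io.debezium.pipeline.signal.actions.snapshotting classes added in this commit), a caller would build one AdditionalCondition per additional-conditions entry in the signal; the case-insensitive pattern mirrors what ExecuteSnapshot compiles from the data-collection field:

AdditionalCondition condition = AdditionalCondition.AdditionalConditionBuilder.builder()
        .dataCollection(Pattern.compile("dbA\\.c2", Pattern.CASE_INSENSITIVE))
        .filter("{ aa: { $lt: 500 } }")
        .build();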

View File

@ -7,6 +7,7 @@
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@ -14,6 +15,7 @@
import io.debezium.document.Array;
import io.debezium.document.Document;
import io.debezium.document.Value;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.signal.SignalPayload;
@ -36,6 +38,7 @@ public class ExecuteSnapshot<P extends Partition> extends AbstractSnapshotSignal
private static final Logger LOGGER = LoggerFactory.getLogger(ExecuteSnapshot.class);
public static final String NAME = "execute-snapshot";
private static final String MATCH_ALL_PATTERN = ".*";
private final EventDispatcher<P, ? extends DataCollectionId> dispatcher;
private final ChangeEventSourceCoordinator<P, ? extends OffsetContext> changeEventSourceCoordinator;
@ -47,29 +50,62 @@ public ExecuteSnapshot(EventDispatcher<P, ? extends DataCollectionId> dispatcher
@Override
public boolean arrived(SignalPayload<P> signalPayload) throws InterruptedException {
final List<String> dataCollections = getDataCollections(signalPayload.data);
if (dataCollections == null) {
return false;
}
SnapshotType type = getSnapshotType(signalPayload.data);
Optional<String> additionalCondition = getAdditionalCondition(signalPayload.data);
List<AdditionalCondition> additionalConditions = getAdditionalConditions(signalPayload.data, type);
Optional<String> surrogateKey = getSurrogateKey(signalPayload.data);
LOGGER.info("Requested '{}' snapshot of data collections '{}' with additional condition '{}' and surrogate key '{}'", type, dataCollections,
additionalCondition.orElse("No condition passed"), surrogateKey.orElse("PK of table will be used"));
LOGGER.info("Requested '{}' snapshot of data collections '{}' with additional conditions '{}' and surrogate key '{}'", type, dataCollections,
additionalConditions, surrogateKey.orElse("PK of table will be used"));
SnapshotConfiguration.Builder snapshotConfigurationBuilder = SnapshotConfiguration.Builder.builder();
snapshotConfigurationBuilder.dataCollections(dataCollections);
snapshotConfigurationBuilder.surrogateKey(surrogateKey.orElse(""));
additionalConditions.forEach(snapshotConfigurationBuilder::addCondition);
switch (type) {
case INCREMENTAL:
dispatcher.getIncrementalSnapshotChangeEventSource().addDataCollectionNamesToSnapshot(
signalPayload, dataCollections, additionalCondition, surrogateKey);
signalPayload, snapshotConfigurationBuilder.build());
break;
case BLOCKING:
changeEventSourceCoordinator.doBlockingSnapshot(signalPayload.partition, signalPayload.offsetContext);
changeEventSourceCoordinator.doBlockingSnapshot(signalPayload.partition, signalPayload.offsetContext, snapshotConfigurationBuilder.build());
break;
}
return true;
}
private List<AdditionalCondition> getAdditionalConditions(Document data, SnapshotType type) {
// TODO remove in 2.5 release
Optional<String> oldAdditionalConditionField = getAdditionalCondition(data);
if (oldAdditionalConditionField.isPresent() && type.equals(SnapshotType.INCREMENTAL)) {
return List.of(AdditionalCondition.AdditionalConditionBuilder.builder()
.dataCollection(Pattern.compile(MATCH_ALL_PATTERN, Pattern.CASE_INSENSITIVE))
.filter(oldAdditionalConditionField.orElse(""))
.build());
}
return Optional.ofNullable(data.getArray(FIELD_ADDITIONAL_CONDITIONS)).orElse(Array.create()).streamValues()
.map(this::buildAdditionalCondition)
.collect(Collectors.toList());
}
private AdditionalCondition buildAdditionalCondition(Value value) {
return AdditionalCondition.AdditionalConditionBuilder.builder()
.dataCollection(Pattern.compile(value.asDocument().getString(FIELD_DATA_COLLECTION), Pattern.CASE_INSENSITIVE))
.filter(value.asDocument().getString(FIELD_FILTER))
.build();
}
public static List<String> getDataCollections(Document data) {
final Array dataCollectionsArray = data.getArray(FIELD_DATA_COLLECTIONS);
if (dataCollectionsArray == null || dataCollectionsArray.isEmpty()) {
LOGGER.warn(
@ -82,6 +118,11 @@ public static List<String> getDataCollections(Document data) {
.collect(Collectors.toList());
}
/**
* TODO remove in 2.5 release
* @deprecated Use {getAdditionalConditions} instead.
*/
@Deprecated
public static Optional<String> getAdditionalCondition(Document data) {
String additionalCondition = data.getString(FIELD_ADDITIONAL_CONDITION);
return Strings.isNullOrBlank(additionalCondition) ? Optional.empty() : Optional.of(additionalCondition);

View File

@ -0,0 +1,71 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.signal.actions.snapshotting;
import java.util.ArrayList;
import java.util.List;
/**
* Contains information required for the snapshot
*/
public class SnapshotConfiguration {
/**
* This is a list of regular expressions.
*/
private List<String> dataCollections;
private List<AdditionalCondition> additionalConditions;
private String surrogateKey;
public List<String> getDataCollections() {
return dataCollections;
}
public List<AdditionalCondition> getAdditionalConditions() {
return additionalConditions;
}
public String getSurrogateKey() {
return surrogateKey;
}
public static final class Builder {
private List<String> dataCollections;
private final List<AdditionalCondition> additionalConditions = new ArrayList<>();
private String surrogateKey;
private Builder() {
}
public static Builder builder() {
return new Builder();
}
public Builder dataCollections(List<String> dataCollections) {
this.dataCollections = dataCollections;
return this;
}
public Builder addCondition(AdditionalCondition additionalCondition) {
this.additionalConditions.add(additionalCondition);
return this;
}
public Builder surrogateKey(String surrogateKey) {
this.surrogateKey = surrogateKey;
return this;
}
public SnapshotConfiguration build() {
SnapshotConfiguration snapshotConfiguration = new SnapshotConfiguration();
snapshotConfiguration.surrogateKey = this.surrogateKey;
snapshotConfiguration.dataCollections = this.dataCollections;
snapshotConfiguration.additionalConditions = this.additionalConditions;
return snapshotConfiguration;
}
}
}
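A minimal sketch of how ExecuteSnapshot (shown above) assembles this object from the signal before handing it to the incremental snapshot source or the change event source coordinator; the values are illustrative:

SnapshotConfiguration.Builder builder = SnapshotConfiguration.Builder.builder();
builder.dataCollections(List.of("[A-z].*dbA.c2"));
builder.surrogateKey("");
builder.addCondition(condition); // an AdditionalCondition built as sketched earlier
SnapshotConfiguration snapshotConfiguration = builder.build();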

View File

@ -7,8 +7,10 @@
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
@ -27,6 +29,7 @@
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
/**
@ -61,9 +64,7 @@ protected Offsets<P, OffsetContext> getOffsets(SnapshotContext<P, O> ctx, O prev
}
@Override
public SnapshotResult<O> execute(ChangeEventSourceContext context, P partition, O previousOffset) throws InterruptedException {
SnapshottingTask snapshottingTask = getSnapshottingTask(partition, previousOffset, context.isPaused());
public SnapshotResult<O> execute(ChangeEventSourceContext context, P partition, O previousOffset, SnapshottingTask snapshottingTask) throws InterruptedException {
final SnapshotContext<P, O> ctx;
try {
@ -118,9 +119,9 @@ public SnapshotResult<O> execute(ChangeEventSourceContext context, P partition,
}
}
protected <T extends DataCollectionId> Stream<T> determineDataCollectionsToBeSnapshotted(final Collection<T> allDataCollections) {
final Set<Pattern> snapshotAllowedDataCollections = connectorConfig.getDataCollectionsToBeSnapshotted();
if (snapshotAllowedDataCollections.size() == 0) {
protected <T extends DataCollectionId> Stream<T> determineDataCollectionsToBeSnapshotted(final Collection<T> allDataCollections,
Set<Pattern> snapshotAllowedDataCollections) {
if (snapshotAllowedDataCollections.isEmpty()) {
return allDataCollections.stream();
}
else {
@ -172,7 +173,7 @@ protected abstract SnapshotResult<O> doExecute(ChangeEventSourceContext context,
/**
* Returns the snapshotting task based on the previous offset (if available) and the connector's snapshotting mode.
*/
protected abstract SnapshottingTask getSnapshottingTask(P partition, O previousOffset, boolean isBlockingSnapshot);
public abstract SnapshottingTask getSnapshottingTask(P partition, O previousOffset);
/**
* Prepares the taking of a snapshot and returns an initial {@link SnapshotContext}.
@ -206,6 +207,13 @@ protected void completed(SnapshotContext<P, O> snapshotContext) {
protected void aborted(SnapshotContext<P, O> snapshotContext) {
}
protected Set<Pattern> getDataCollectionPattern(List<String> dataCollections) {
return dataCollections.stream()
.map(tables -> Strings.setOfRegex(tables, Pattern.CASE_INSENSITIVE))
.flatMap(Collection::stream)
.collect(Collectors.toSet());
}
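
For reference, a minimal sketch of what this helper yields when called from within a connector's snapshot source, assuming Strings.setOfRegex splits each comma-separated expression and compiles it with the CASE_INSENSITIVE flag (values are illustrative):

    Set<Pattern> patterns = getDataCollectionPattern(List.of("inventory\\.orders", "inventory\\..*"));
    boolean matched = patterns.stream().anyMatch(p -> p.matcher("inventory.ORDERS").matches()); // true, case-insensitive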
/**
* Mutable context which is populated in the course of snapshotting
*/
@ -222,46 +230,4 @@ public void close() throws Exception {
}
}
/**
* A configuration describing the task to be performed during snapshotting.
*/
public static class SnapshottingTask {
private final boolean snapshotSchema;
private final boolean snapshotData;
public SnapshottingTask(boolean snapshotSchema, boolean snapshotData) {
this.snapshotSchema = snapshotSchema;
this.snapshotData = snapshotData;
}
/**
* Whether data (rows in captured tables) should be snapshotted.
*/
public boolean snapshotData() {
return snapshotData;
}
/**
* Whether the schema of captured tables should be snapshotted.
*/
public boolean snapshotSchema() {
return snapshotSchema;
}
/**
* Whether to skip the snapshot phase.
*
* By default this method will skip performing a snapshot if both {@link #snapshotSchema()} and
* {@link #snapshotData()} return {@code false}.
*/
public boolean shouldSkipSnapshot() {
return !snapshotSchema() && !snapshotData();
}
@Override
public String toString() {
return "SnapshottingTask [snapshotSchema=" + snapshotSchema + ", snapshotData=" + snapshotData + "]";
}
}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.pipeline.source;
import java.util.List;
import java.util.Map;
/**
* A configuration describing the task to be performed during snapshotting.
*/
public class SnapshottingTask {
private final boolean snapshotSchema;
private final boolean snapshotData;
private final List<String> dataCollections;
private final Map<String, String> filterQueries;
public SnapshottingTask(boolean snapshotSchema, boolean snapshotData, List<String> dataCollections, Map<String, String> filterQueries) {
this.snapshotSchema = snapshotSchema;
this.snapshotData = snapshotData;
this.dataCollections = dataCollections;
this.filterQueries = filterQueries;
}
/**
* Whether data (rows in captured tables) should be snapshotted.
*/
public boolean snapshotData() {
return snapshotData;
}
/**
* Whether the schema of captured tables should be snapshotted.
*/
public boolean snapshotSchema() {
return snapshotSchema;
}
/**
 * List of regular expressions defining the data collections to snapshot
*/
public List<String> getDataCollections() {
return dataCollections;
}
/**
* Map of query statement overrides by data collection
*/
public Map<String, String> getFilterQueries() {
return filterQueries;
}
/**
* Whether to skip the snapshot phase.
*
 * By default, a snapshot is skipped when both {@link #snapshotSchema()} and
 * {@link #snapshotData()} return {@code false}.
*/
public boolean shouldSkipSnapshot() {
return !snapshotSchema() && !snapshotData();
}
@Override
public String toString() {
return "SnapshottingTask [snapshotSchema=" + snapshotSchema + ", snapshotData=" + snapshotData + "]";
}
}
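
For reference, a minimal sketch of constructing the new task for a blocking snapshot (table name and filter query are illustrative):

    SnapshottingTask task = new SnapshottingTask(
            true, // snapshot schema
            true, // snapshot data
            List.of("inventory\\.orders"),
            Map.of("inventory.orders", "SELECT * FROM inventory.orders WHERE aa < 500"));
    assert !task.shouldSkipSnapshot();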

View File

@ -38,6 +38,7 @@
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalPayload;
import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
@ -489,8 +490,7 @@ private void nextDataCollection(P partition, OffsetContext offsetContext) {
@Override
@SuppressWarnings("unchecked")
public void addDataCollectionNamesToSnapshot(SignalPayload<P> signalPayload, List<String> dataCollectionIds,
Optional<String> additionalCondition, Optional<String> surrogateKey)
public void addDataCollectionNamesToSnapshot(SignalPayload<P> signalPayload, SnapshotConfiguration snapshotConfiguration)
throws InterruptedException {
final OffsetContext offsetContext = signalPayload.offsetContext;
@ -500,13 +500,14 @@ public void addDataCollectionNamesToSnapshot(SignalPayload<P> signalPayload, Lis
context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
boolean shouldReadChunk = !context.snapshotRunning();
List<String> expandedDataCollectionIds = expandDataCollectionIds(dataCollectionIds);
if (expandedDataCollectionIds.size() > dataCollectionIds.size()) {
LOGGER.info("Data-collections to snapshot have been expanded from {} to {}", dataCollectionIds, expandedDataCollectionIds);
List<String> expandedDataCollectionIds = expandDataCollectionIds(snapshotConfiguration.getDataCollections());
if (expandedDataCollectionIds.size() > snapshotConfiguration.getDataCollections().size()) {
LOGGER.info("Data-collections to snapshot have been expanded from {} to {}", snapshotConfiguration.getDataCollections(), expandedDataCollectionIds);
}
final List<DataCollection<T>> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(correlationId, expandedDataCollectionIds, additionalCondition,
surrogateKey);
final List<DataCollection<T>> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(correlationId, expandedDataCollectionIds,
snapshotConfiguration.getAdditionalConditions(),
snapshotConfiguration.getSurrogateKey());
if (shouldReadChunk) {
List<T> monitoredDataCollections = newDataCollectionIds.stream()
@ -523,6 +524,7 @@ public void addDataCollectionNamesToSnapshot(SignalPayload<P> signalPayload, Lis
@Override
@SuppressWarnings("unchecked")
public void stopSnapshot(P partition, OffsetContext offsetContext, Map<String, Object> additionalData, List<String> dataCollectionIds) {
context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
LOGGER.trace("Stopping incremental snapshot with context {}", context);
if (context.snapshotRunning()) {
@ -604,6 +606,7 @@ protected void addKeyColumnsToCondition(Table table, StringBuilder sql, String p
* all matching explicit data collection ids.
*/
private List<String> expandDataCollectionIds(List<String> dataCollectionIds) {
return dataCollectionIds
.stream()
.flatMap(x -> {

View File

@ -20,6 +20,7 @@
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@ -31,6 +32,7 @@
import io.debezium.DebeziumException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.pipeline.signal.actions.snapshotting.AdditionalCondition;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.HexConverter;
@ -189,9 +191,8 @@ private String dataCollectionsToSnapshotAsString() {
LinkedHashMap<String, String> map = new LinkedHashMap<>();
map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ID, x.getId().toString());
map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION,
x.getAdditionalCondition().orElse(null));
map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY,
x.getSurrogateKey().orElse(null));
x.getAdditionalCondition().isEmpty() ? null : x.getAdditionalCondition().orElse(null));
map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY, x.getSurrogateKey().isEmpty() ? null : x.getSurrogateKey().orElse(null));
return map;
})
.collect(Collectors.toList());
@ -207,8 +208,8 @@ private List<DataCollection<T>> stringToDataCollections(String dataCollectionsSt
List<LinkedHashMap<String, String>> dataCollections = mapper.readValue(dataCollectionsStr, mapperTypeRef);
List<DataCollection<T>> dataCollectionsList = dataCollections.stream()
.map(x -> new DataCollection<T>((T) TableId.parse(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ID), useCatalogBeforeSchema),
Optional.ofNullable(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION)),
Optional.ofNullable(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY))))
Optional.ofNullable(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION)).orElse(""),
Optional.ofNullable(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY)).orElse("")))
.collect(Collectors.toList());
return dataCollectionsList;
}
@ -237,16 +238,28 @@ private void addTablesIdsToSnapshot(List<DataCollection<T>> dataCollectionIds) {
}
@SuppressWarnings("unchecked")
public List<DataCollection<T>> addDataCollectionNamesToSnapshot(String correlationId, List<String> dataCollectionIds, Optional<String> additionalCondition,
Optional<String> surrogateKey) {
public List<DataCollection<T>> addDataCollectionNamesToSnapshot(String correlationId, List<String> dataCollectionIds, List<AdditionalCondition> additionalCondition,
String surrogateKey) {
final List<DataCollection<T>> newDataCollectionIds = dataCollectionIds.stream()
.map(x -> new DataCollection<T>((T) TableId.parse(x, useCatalogBeforeSchema), additionalCondition, surrogateKey))
.map(buildDataCollection(additionalCondition, surrogateKey))
.collect(Collectors.toList());
addTablesIdsToSnapshot(newDataCollectionIds);
this.correlationId = correlationId;
return newDataCollectionIds;
}
private Function<String, DataCollection<T>> buildDataCollection(List<AdditionalCondition> additionalCondition, String surrogateKey) {
return expandedCollectionName -> {
String filter = additionalCondition.stream()
.filter(condition -> condition.getDataCollection().matcher(expandedCollectionName).matches())
.map(AdditionalCondition::getFilter)
.findFirst()
.orElse("");
return new DataCollection<T>((T) TableId.parse(expandedCollectionName, useCatalogBeforeSchema), filter, surrogateKey);
};
}
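
Conceptually, the matching above works like the following sketch, with a plain Pattern/filter pair standing in for AdditionalCondition (values are illustrative):

    Pattern conditionPattern = Pattern.compile("inventory\\.orders");
    String filter = conditionPattern.matcher("inventory.orders").matches()
            ? "aa < 500" // the first matching condition's filter is attached
            : "";        // collections matching no condition keep an empty filter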
@Override
public void stopSnapshot() {
this.dataCollectionsToSnapshot.clear();

View File

@ -8,6 +8,8 @@
import java.util.Objects;
import java.util.Optional;
import io.debezium.util.Strings;
/**
* A class describing DataCollection for incremental snapshot
*
@ -18,15 +20,15 @@ public class DataCollection<T> {
private final T id;
private final Optional<String> additionalCondition;
private final String additionalCondition;
private final Optional<String> surrogateKey;
private final String surrogateKey;
public DataCollection(T id) {
this(id, Optional.empty(), Optional.empty());
this(id, "", "");
}
public DataCollection(T id, Optional<String> additionalCondition, Optional<String> surrogateKey) {
public DataCollection(T id, String additionalCondition, String surrogateKey) {
Objects.requireNonNull(additionalCondition);
Objects.requireNonNull(surrogateKey);
@ -40,11 +42,11 @@ public T getId() {
}
public Optional<String> getAdditionalCondition() {
return additionalCondition;
return Strings.isNullOrEmpty(additionalCondition) ? Optional.empty() : Optional.of(additionalCondition);
}
public Optional<String> getSurrogateKey() {
return surrogateKey;
return Strings.isNullOrEmpty(surrogateKey) ? Optional.empty() : Optional.of(surrogateKey);
}
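
A brief sketch of the new empty-string convention behind the getters above (values are illustrative):

    DataCollection<TableId> dc = new DataCollection<>(TableId.parse("s1.table1"), "", "pk2");
    // dc.getAdditionalCondition() -> Optional.empty()
    // dc.getSurrogateKey()        -> Optional.of("pk2")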
@Override

View File

@ -7,9 +7,9 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import io.debezium.pipeline.signal.SignalPayload;
import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.spi.schema.DataCollectionId;
@ -33,8 +33,7 @@ public interface IncrementalSnapshotChangeEventSource<P extends Partition, T ext
void init(P partition, OffsetContext offsetContext);
void addDataCollectionNamesToSnapshot(SignalPayload<P> signalPayload, List<String> dataCollectionIds,
Optional<String> additionalCondition, Optional<String> surrogateKey)
void addDataCollectionNamesToSnapshot(SignalPayload<P> signalPayload, SnapshotConfiguration snapshotConfiguration)
throws InterruptedException;
void stopSnapshot(P partition, OffsetContext offsetContext, Map<String, Object> additionalData, List<String> dataCollectionIds);

View File

@ -9,6 +9,7 @@
import java.util.Map;
import java.util.Optional;
import io.debezium.pipeline.signal.actions.snapshotting.AdditionalCondition;
import io.debezium.relational.Table;
public interface IncrementalSnapshotContext<T> {
@ -17,8 +18,8 @@ public interface IncrementalSnapshotContext<T> {
DataCollection<T> nextDataCollection();
List<DataCollection<T>> addDataCollectionNamesToSnapshot(String correlationId, List<String> dataCollectionIds, Optional<String> additionalCondition,
Optional<String> surrogateKey);
List<DataCollection<T>> addDataCollectionNamesToSnapshot(String correlationId, List<String> dataCollectionIds, List<AdditionalCondition> additionalCondition,
String surrogateKey);
int dataCollectionsToBeSnapshottedCount();

View File

@ -5,6 +5,12 @@
*/
package io.debezium.pipeline.source.spi;
import java.util.Map;
import java.util.stream.Collectors;
import io.debezium.pipeline.signal.actions.snapshotting.AdditionalCondition;
import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SnapshotResult;
@ -22,15 +28,28 @@ public interface SnapshotChangeEventSource<P extends Partition, O extends Offset
* the case, they should abort their processing and perform any clean-up needed, such as rolling back pending
* transactions, releasing locks etc.
*
* @param context
* contextual information for this source's execution
* @param partition
* the source partition from which the snapshot should be taken
* @param previousOffset
* previous offset restored from Kafka
* @param context contextual information for this source's execution
* @param partition the source partition from which the snapshot should be taken
* @param previousOffset previous offset restored from Kafka
 * @param snapshottingTask the snapshotting task determining whether schema and/or data should be snapshotted, and for which data collections
* @return an indicator to the position at which the snapshot was taken
* @throws InterruptedException
* in case the snapshot was aborted before completion
* @throws InterruptedException in case the snapshot was aborted before completion
*/
SnapshotResult<O> execute(ChangeEventSourceContext context, P partition, O previousOffset) throws InterruptedException;
SnapshotResult<O> execute(ChangeEventSourceContext context, P partition, O previousOffset, SnapshottingTask snapshottingTask) throws InterruptedException;
/**
* Returns the snapshotting task based on the previous offset (if available) and the connector's snapshotting mode.
*/
SnapshottingTask getSnapshottingTask(P partition, O previousOffset);
/**
* Returns the blocking snapshotting task based on the snapshot configuration from the signal.
*/
default SnapshottingTask getBlockingSnapshottingTask(P partition, O previousOffset, SnapshotConfiguration snapshotConfiguration) {
Map<String, String> filtersByTable = snapshotConfiguration.getAdditionalConditions().stream()
.collect(Collectors.toMap(k -> k.getDataCollection().toString(), AdditionalCondition::getFilter));
return new SnapshottingTask(true, true, snapshotConfiguration.getDataCollections(), filtersByTable);
}
}
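
As a hedged illustration of the default mapping above (collection name and filter are made up):

    // Given a SnapshotConfiguration with data collection "inventory.orders" and an additional
    // condition whose filter is "SELECT * FROM inventory.orders WHERE aa < 500",
    // getBlockingSnapshottingTask returns a task with snapshotSchema=true, snapshotData=true and
    // getFilterQueries() == {"inventory.orders" -> "SELECT * FROM inventory.orders WHERE aa < 500"}.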

View File

@ -17,6 +17,8 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition;
@ -40,6 +42,7 @@
import io.debezium.schema.FieldNameSelector;
import io.debezium.schema.FieldNameSelector.FieldNamer;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Strings;
@ -50,6 +53,8 @@
*/
public abstract class RelationalDatabaseConnectorConfig extends CommonConnectorConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(RelationalDatabaseConnectorConfig.class);
protected static final String SCHEMA_INCLUDE_LIST_NAME = "schema.include.list";
protected static final String SCHEMA_EXCLUDE_LIST_NAME = "schema.exclude.list";
protected static final String DATABASE_INCLUDE_LIST_NAME = "database.include.list";
@ -734,7 +739,8 @@ private static int validateTableExcludeList(Configuration config, Field field, V
/**
* Returns any SELECT overrides, if present.
*/
public Map<TableId, String> getSnapshotSelectOverridesByTable() {
public Map<DataCollectionId, String> getSnapshotSelectOverridesByTable() {
List<String> tableValues = getConfig().getTrimmedStrings(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, ",");
if (tableValues == null) {
@ -744,9 +750,18 @@ public Map<TableId, String> getSnapshotSelectOverridesByTable() {
Map<TableId, String> snapshotSelectOverridesByTable = new HashMap<>();
for (String table : tableValues) {
String statementOverride = getConfig().getString(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table);
if (statementOverride == null) {
LOGGER.warn("Detected snapshot.select.statement.overrides for {} but no statement property {} defined",
SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table, table);
continue;
}
snapshotSelectOverridesByTable.put(
TableId.parse(table),
getConfig().getString(SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table));
}
return Collections.unmodifiableMap(snapshotSelectOverridesByTable);
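
For context, a hedged example of the configuration this method consumes (table and statement are illustrative); a table listed without its per-table statement property is now logged with a warning and skipped rather than mapped to null:

    // snapshot.select.statement.overrides=inventory.orders
    // snapshot.select.statement.overrides.inventory.orders=SELECT * FROM inventory.orders WHERE aa < 500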

View File

@ -46,6 +46,7 @@
import io.debezium.pipeline.EventDispatcher.SnapshotReceiver;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
@ -55,6 +56,7 @@
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.RelationalDatabaseConnectorConfig.SnapshotTablesRowCountOrder;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;
@ -109,6 +111,11 @@ public SnapshotResult<O> doExecute(ChangeEventSourceContext context, O previousO
Throwable exceptionWhileSnapshot = null;
try {
Set<Pattern> dataCollectionsToBeSnapshotted = getDataCollectionPattern(snapshottingTask.getDataCollections());
Map<DataCollectionId, String> snapshotSelectOverridesByTable = snapshottingTask.getFilterQueries().entrySet().stream()
.collect(Collectors.toMap(e -> TableId.parse(e.getKey()), Map.Entry::getValue));
preSnapshot();
LOGGER.info("Snapshot step 1 - Preparing");
@ -124,7 +131,7 @@ public SnapshotResult<O> doExecute(ChangeEventSourceContext context, O previousO
// Note that there's a minor race condition here: a new table matching the filters could be created between
// this call and the determination of the initial snapshot position below; this seems acceptable, though
determineCapturedTables(ctx);
determineCapturedTables(ctx, dataCollectionsToBeSnapshotted);
snapshotProgressListener.monitoredDataCollectionsDetermined(snapshotContext.partition, ctx.capturedTables);
// Init jdbc connection pool for reading table schema and data
connectionPool = createConnectionPool(ctx);
@ -155,7 +162,7 @@ public SnapshotResult<O> doExecute(ChangeEventSourceContext context, O previousO
if (snapshottingTask.snapshotData()) {
LOGGER.info("Snapshot step 7 - Snapshotting data");
createDataEvents(context, ctx, connectionPool);
createDataEvents(context, ctx, connectionPool, snapshotSelectOverridesByTable);
}
else {
LOGGER.info("Snapshot step 7 - Skipping snapshotting of data");
@ -268,9 +275,9 @@ private Set<TableId> addSignalingCollectionAndSort(Set<TableId> capturedTables)
.collect(Collectors.toCollection(LinkedHashSet::new));
}
private void determineCapturedTables(RelationalSnapshotContext<P, O> ctx) throws Exception {
private void determineCapturedTables(RelationalSnapshotContext<P, O> ctx, Set<Pattern> dataCollectionsToBeSnapshotted) throws Exception {
Set<TableId> allTableIds = getAllTableIds(ctx);
Set<TableId> snapshottedTableIds = determineDataCollectionsToBeSnapshotted(allTableIds).collect(Collectors.toSet());
Set<TableId> snapshottedTableIds = determineDataCollectionsToBeSnapshotted(allTableIds, dataCollectionsToBeSnapshotted).collect(Collectors.toSet());
Set<TableId> capturedTables = new HashSet<>();
Set<TableId> capturedSchemaTables = new HashSet<>();
@ -393,7 +400,7 @@ protected abstract SchemaChangeEvent getCreateTableEvent(RelationalSnapshotConte
private void createDataEvents(ChangeEventSourceContext sourceContext,
RelationalSnapshotContext<P, O> snapshotContext,
Queue<JdbcConnection> connectionPool)
Queue<JdbcConnection> connectionPool, Map<DataCollectionId, String> snapshotSelectOverridesByTable)
throws Exception {
tryStartingSnapshot(snapshotContext);
@ -407,7 +414,7 @@ private void createDataEvents(ChangeEventSourceContext sourceContext,
Map<TableId, String> queryTables = new HashMap<>();
Map<TableId, OptionalLong> rowCountTables = new LinkedHashMap<>();
for (TableId tableId : snapshotContext.capturedTables) {
final Optional<String> selectStatement = determineSnapshotSelect(snapshotContext, tableId);
final Optional<String> selectStatement = determineSnapshotSelect(snapshotContext, tableId, snapshotSelectOverridesByTable);
if (selectStatement.isPresent()) {
LOGGER.info("For table '{}' using select statement: '{}'", tableId, selectStatement.get());
queryTables.put(tableId, selectStatement.get());
@ -626,10 +633,12 @@ protected ChangeRecordEmitter<P> getChangeRecordEmitter(P partition, O offset, T
* defaulting to a statement provided by the DB-specific change event source.
*
* @param tableId the table to generate a query for
* @param snapshotSelectOverridesByTable the select overrides by table
* @return a valid query string or empty if table will not be snapshotted
*/
private Optional<String> determineSnapshotSelect(RelationalSnapshotContext<P, O> snapshotContext, TableId tableId) {
String overriddenSelect = getSnapshotSelectOverridesByTable(tableId);
private Optional<String> determineSnapshotSelect(RelationalSnapshotContext<P, O> snapshotContext, TableId tableId,
Map<DataCollectionId, String> snapshotSelectOverridesByTable) {
String overriddenSelect = getSnapshotSelectOverridesByTable(tableId, snapshotSelectOverridesByTable);
if (overriddenSelect != null) {
return Optional.of(enhanceOverriddenSelect(snapshotContext, overriddenSelect, tableId));
}
@ -639,8 +648,7 @@ private Optional<String> determineSnapshotSelect(RelationalSnapshotContext<P, O>
return getSnapshotSelect(snapshotContext, tableId, columns);
}
protected String getSnapshotSelectOverridesByTable(TableId tableId) {
Map<TableId, String> snapshotSelectOverrides = connectorConfig.getSnapshotSelectOverridesByTable();
protected String getSnapshotSelectOverridesByTable(TableId tableId, Map<DataCollectionId, String> snapshotSelectOverrides) {
String overriddenSelect = snapshotSelectOverrides.get(tableId);
// try without catalog id, as this might or might not be populated based on the given connector

View File

@ -1118,7 +1118,7 @@ private Strings() {
* If a comma is part of expression then it can be prepended with <code>'\'</code> so
* it will not act as a separator.
*/
private static class RegExSplitter implements Tokenizer {
public static class RegExSplitter implements Tokenizer {
public static String[] split(String identifier) {
TokenStream stream = new TokenStream(identifier, new RegExSplitter(), true);

View File

@ -123,7 +123,7 @@ public void testBuildQueryTwoPkColumnsWithAdditionalConditionWithSurrogateKey()
.addColumn(val1)
.addColumn(val2)
.setPrimaryKeyNames("pk1", "pk2").create();
context.addDataCollectionNamesToSnapshot("12345", List.of(table.id().toString()), Optional.empty(), Optional.of("pk2"));
context.addDataCollectionNamesToSnapshot("12345", List.of(table.id().toString()), List.of(), "pk2");
assertThat(source.buildChunkQuery(table, Optional.of("\"val1\"=foo")))
.isEqualTo("SELECT * FROM \"s1\".\"table1\" WHERE \"val1\"=foo ORDER BY \"pk2\" LIMIT 1024");
context.nextChunkPosition(new Object[]{ 1, 5 });
@ -202,7 +202,7 @@ public void testBuildQueryTwoPkColumnsWithSurrogateKey() {
.addColumn(val1)
.addColumn(val2)
.setPrimaryKeyNames("pk1", "pk2").create();
context.addDataCollectionNamesToSnapshot("12345", List.of(table.id().toString()), Optional.empty(), Optional.of("pk2"));
context.addDataCollectionNamesToSnapshot("12345", List.of(table.id().toString()), List.of(), "pk2");
assertThat(source.buildChunkQuery(table, Optional.empty()))
.isEqualTo("SELECT * FROM \"s1\".\"table1\" ORDER BY \"pk2\" LIMIT 1024");
}
@ -250,7 +250,7 @@ public void testMaxQueryWithSurrogateKey() {
final Column val2 = Column.editor().name("val2").create();
final Table table = Table.editor().tableId(new TableId(null, "s1", "table1")).addColumn(pk1).addColumn(pk2)
.addColumn(val1).addColumn(val2).setPrimaryKeyNames("pk1", "pk2").create();
context.addDataCollectionNamesToSnapshot("12345", List.of(table.id().toString()), Optional.empty(), Optional.of("pk2"));
context.addDataCollectionNamesToSnapshot("12345", List.of(table.id().toString()), List.of(), "pk2");
assertThat(source.buildMaxPrimaryKeyQuery(table, Optional.empty()))
.isEqualTo("SELECT * FROM \"s1\".\"table1\" ORDER BY \"pk2\" DESC LIMIT 1");
}

View File

@ -602,6 +602,19 @@ protected SourceRecords consumeRecordsByTopic(int numRecords, int breakAfterNull
return records;
}
/**
* Try to consume and capture all available records from the connector.
 *
* @return the collector into which the records were captured; never null
* @throws InterruptedException if the thread was interrupted while waiting for a record to be returned
*/
protected SourceRecords consumeAvailableRecordsByTopic() throws InterruptedException {
SourceRecords records = new SourceRecords();
consumeAvailableRecords(records::add);
return records;
}
/**
* Try to consume and capture exactly the specified number of records from the connector.
*

View File

@ -12,7 +12,6 @@
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
@ -75,16 +74,17 @@ public void executeBlockingSnapshot() throws Exception {
assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2);
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), BLOCKING, tableDataCollectionId());
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", BLOCKING, tableDataCollectionId());
waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
signalingRecords = 1;
assertRecordsFromSnapshotAndStreamingArePresent((ROW_COUNT * 2) + signalingRecords);
insertRecords(ROW_COUNT, (ROW_COUNT * 2));
insertRecords(ROW_COUNT, ROW_COUNT * 2);
assertStreamingRecordsArePresent(ROW_COUNT + signalingRecords);
assertStreamingRecordsArePresent(ROW_COUNT);
}
@ -102,7 +102,7 @@ public void executeBlockingSnapshotWhileStreaming() throws Exception {
Thread.sleep(2000); // Let's start stream some insert
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), BLOCKING, tableDataCollectionId());
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", BLOCKING, tableDataCollectionId());
waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
@ -114,11 +114,32 @@ public void executeBlockingSnapshotWhileStreaming() throws Exception {
insertRecords(ROW_COUNT, (ROW_COUNT * 2));
signalingRecords = 1 + // from streaming
1; // from snapshot
signalingRecords = 1; // from streaming
assertRecordsWithValuesPresent((int) ((ROW_COUNT * 3) + totalSnapshotRecords + signalingRecords),
getExpectedValues(totalSnapshotRecords));
getExpectedValues(totalSnapshotRecords), topicName());
}
@Test
public void executeBlockingSnapshotWithAdditionalCondition() throws Exception {
// Testing.Print.enable();
populateTable(tableNames().get(1).toString());
startConnectorWithSnapshot(x -> mutableConfig(false, false));
waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(
Map.of(tableDataCollectionIds().get(1), String.format("SELECT * FROM %s WHERE aa < 500", tableNames().get(1))), "", BLOCKING,
tableDataCollectionIds().get(1).toString());
waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
signalingRecords = 1; // from streaming
assertRecordsWithValuesPresent(500 + signalingRecords, IntStream.rangeClosed(0, 499).boxed().collect(Collectors.toList()), topicNames().get(1).toString());
}
protected int insertMaxSleep() {
@ -180,21 +201,21 @@ private Future<?> executeAsync(Runnable operation) {
private void assertStreamingRecordsArePresent(int expectedRecords) throws InterruptedException {
assertRecordsWithValuesPresent(expectedRecords, IntStream.range(2000, 2999).boxed().collect(Collectors.toList()));
assertRecordsWithValuesPresent(expectedRecords, IntStream.range(2000, 2999).boxed().collect(Collectors.toList()), topicName());
}
private void assertRecordsFromSnapshotAndStreamingArePresent(int expectedRecords) throws InterruptedException {
assertRecordsWithValuesPresent(expectedRecords, IntStream.range(0, expectedRecords - 1).boxed().collect(Collectors.toList()));
assertRecordsWithValuesPresent(expectedRecords, IntStream.range(0, expectedRecords - 1).boxed().collect(Collectors.toList()), topicName());
}
private void assertRecordsWithValuesPresent(int expectedRecords, List<Integer> expectedValues) throws InterruptedException {
private void assertRecordsWithValuesPresent(int expectedRecords, List<Integer> expectedValues, String topicName) throws InterruptedException {
SourceRecords snapshotAndStreamingRecords = consumeRecordsByTopic(expectedRecords, 10);
assertThat(snapshotAndStreamingRecords.allRecordsInOrder().size()).isEqualTo(expectedRecords);
List<Integer> actual = snapshotAndStreamingRecords.recordsForTopic(topicName()).stream()
List<Integer> actual = snapshotAndStreamingRecords.recordsForTopic(topicName).stream()
.map(s -> ((Struct) s.value()).getStruct("after").getInt32(valueFieldName()))
.collect(Collectors.toList());
assertThat(snapshotAndStreamingRecords.allRecordsInOrder().size()).isEqualTo(expectedRecords);
assertThat(actual).containsAll(expectedValues);
}

View File

@ -15,7 +15,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -709,7 +708,33 @@ public void snapshotWithAdditionalCondition() throws Exception {
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.of(String.format("\"aa = %s\"", expectedValue)), Optional.empty(),
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String.format("\"aa = %s\"", expectedValue), "",
tableDataCollectionId());
final Map<Integer, SourceRecord> dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedCount,
x -> true, null);
assertEquals(expectedCount, dbChanges.size());
assertTrue(dbChanges.values().stream().allMatch(v -> (((Struct) v.value()).getStruct("after")
.getInt32(valueFieldName())).equals(expectedValue)));
}
@Test
public void snapshotWithNewAdditionalConditionsField() throws Exception {
// Testing.Print.enable();
int expectedCount = 10, expectedValue = 12345678;
populateTable();
populateTableWithSpecificValue(2000, expectedCount, expectedValue);
waitForCdcTransactionPropagation(3);
final Configuration config = config().build();
startAndConsumeTillEnd(connectorClass(), config);
waitForConnectorToStart();
waitForAvailableRecords(1, TimeUnit.SECONDS);
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map.of(tableDataCollectionId(), String.format("aa = %s", expectedValue)), "",
tableDataCollectionId());
final Map<Integer, SourceRecord> dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedCount,
@ -728,7 +753,7 @@ public void shouldExecuteRegularSnapshotWhenAdditionalConditionEmpty() throws Ex
final int recordsCount = ROW_COUNT;
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.of("\"\""), Optional.empty(), tableDataCollectionId());
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("\"\"", "", tableDataCollectionId());
final Map<Integer, SourceRecord> dbChanges = consumeRecordsMixedWithIncrementalSnapshot(recordsCount,
x -> true, null);
@ -751,8 +776,7 @@ public void snapshotWithAdditionalConditionWithRestart() throws Exception {
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.of(String.format("\"aa = %s\"", expectedValue)), Optional.empty(),
tableDataCollectionId());
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String.format("\"aa = %s\"", expectedValue), "", tableDataCollectionId());
final AtomicInteger recordCounter = new AtomicInteger();
final AtomicBoolean restarted = new AtomicBoolean();
@ -778,7 +802,7 @@ public void snapshotWithSurrogateKey() throws Exception {
populateTable();
startConnector();
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.of("\"aa\""), tableDataCollectionId());
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "\"aa\"", tableDataCollectionId());
final int expectedRecordCount = ROW_COUNT;
final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(expectedRecordCount);
@ -803,8 +827,7 @@ public void snapshotWithAdditionalConditionWithSurrogateKey() throws Exception {
// there shouldn't be any snapshot records
assertNoRecordsToConsume();
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.of(String.format("\"aa = %s\"", expectedValue)), Optional.of("\"aa\""),
tableDataCollectionId());
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String.format("\"aa = %s\"", expectedValue), "\"aa\"", tableDataCollectionId());
final Map<Integer, SourceRecord> dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedCount,
x -> true, null);

View File

@ -13,7 +13,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
@ -29,6 +28,7 @@
import io.debezium.engine.DebeziumEngine;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.signal.actions.AbstractSnapshotSignal;
import io.debezium.util.Strings;
public abstract class AbstractSnapshotTest<T extends SourceConnector> extends AbstractConnectorTest {
@ -117,6 +117,12 @@ protected void populateTable() throws SQLException {
}
}
protected void populateTable(String table) throws SQLException {
try (JdbcConnection connection = databaseConnection()) {
populateTable(connection, table);
}
}
protected void populateTableWithSpecificValue(int startRow, int count, int value) throws SQLException {
try (JdbcConnection connection = databaseConnection()) {
populateTableWithSpecificValue(connection, tableName(), startRow, count, value);
@ -285,16 +291,22 @@ protected int getMaximumEnqueuedRecordCount() {
}
protected void sendAdHocSnapshotSignal(String... dataCollectionIds) throws SQLException {
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional.empty(), Optional.empty(), dataCollectionIds);
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", dataCollectionIds);
}
protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional<String> additionalCondition, Optional<String> surrogateKey,
protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String additionalCondition, String surrogateKey,
String... dataCollectionIds) {
sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(additionalCondition, surrogateKey, AbstractSnapshotSignal.SnapshotType.INCREMENTAL,
dataCollectionIds);
}
protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Optional<String> additionalCondition, Optional<String> surrogateKey,
protected void sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map<String, String> additionalConditions, String surrogateKey,
String... dataCollectionIds) {
sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(additionalConditions, surrogateKey, AbstractSnapshotSignal.SnapshotType.INCREMENTAL,
dataCollectionIds);
}
protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(String additionalCondition, String surrogateKey,
AbstractSnapshotSignal.SnapshotType snapshotType,
String... dataCollectionIds) {
final String dataCollectionIdsList = Arrays.stream(dataCollectionIds)
@ -302,20 +314,20 @@ protected void sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey(Op
.collect(Collectors.joining(", "));
try (JdbcConnection connection = databaseConnection()) {
String query;
if (additionalCondition.isPresent() && surrogateKey.isPresent()) {
if (!Strings.isNullOrEmpty(additionalCondition) && !Strings.isNullOrEmpty(surrogateKey)) {
query = String.format(
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-condition\": %s, \"surrogate-key\": %s}')",
signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition.get(), surrogateKey.get());
signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition, surrogateKey);
}
else if (additionalCondition.isPresent()) {
else if (!Strings.isNullOrEmpty(additionalCondition)) {
query = String.format(
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-condition\": %s}')",
signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition.get());
signalTableName(), snapshotType.toString(), dataCollectionIdsList, additionalCondition);
}
else if (surrogateKey.isPresent()) {
else if (!Strings.isNullOrEmpty(surrogateKey)) {
query = String.format(
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"surrogate-key\": %s}')",
signalTableName(), snapshotType.toString(), dataCollectionIdsList, surrogateKey.get());
signalTableName(), snapshotType.toString(), dataCollectionIdsList, surrogateKey);
}
else {
query = String.format(
@ -329,4 +341,47 @@ else if (surrogateKey.isPresent()) {
logger.warn("Failed to send signal", e);
}
}
protected void sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map<String, String> additionalConditions, String surrogateKey,
AbstractSnapshotSignal.SnapshotType snapshotType,
String... dataCollectionIds) {
final String dataCollectionIdsList = Arrays.stream(dataCollectionIds)
.map(x -> '"' + x + '"')
.collect(Collectors.joining(", "));
try (JdbcConnection connection = databaseConnection()) {
String query;
if (!additionalConditions.isEmpty() && !Strings.isNullOrEmpty(surrogateKey)) {
query = String.format(
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-conditions\": [%s], \"surrogate-key\": %s}')",
signalTableName(), snapshotType.toString(), dataCollectionIdsList, buildAdditionalConditions(additionalConditions), surrogateKey);
}
else if (!additionalConditions.isEmpty()) {
query = String.format(
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"additional-conditions\": [%s]}')",
signalTableName(), snapshotType.toString(), dataCollectionIdsList, buildAdditionalConditions(additionalConditions));
}
else if (!Strings.isNullOrEmpty(surrogateKey)) {
query = String.format(
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s], \"surrogate-key\": %s}')",
signalTableName(), snapshotType.toString(), dataCollectionIdsList, surrogateKey);
}
else {
query = String.format(
"INSERT INTO %s VALUES('ad-hoc', 'execute-snapshot', '{\"type\": \"%s\",\"data-collections\": [%s]}')",
signalTableName(), snapshotType.toString(), dataCollectionIdsList);
}
logger.info("Sending signal with query {}", query);
connection.execute(query);
}
catch (Exception e) {
logger.warn("Failed to send signal", e);
}
}
private static String buildAdditionalConditions(Map<String, String> additionalConditions) {
return additionalConditions.entrySet().stream()
.map(cond -> String.format("{\"data-collection\": \"%s\", \"filter\": \"%s\"}", cond.getKey(), cond.getValue()))
.collect(Collectors.joining(","));
}
}
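
For reference, a hedged example of the JSON payload the helper above assembles (table and filter are illustrative):

    // Given additionalConditions = Map.of("s1.table1", "SELECT * FROM s1.table1 WHERE aa < 500")
    // and snapshotType BLOCKING, the emitted signal row carries a payload like:
    // {"type": "BLOCKING","data-collections": ["s1.table1"],
    //  "additional-conditions": [{"data-collection": "s1.table1", "filter": "SELECT * FROM s1.table1 WHERE aa < 500"}]}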