From 472162e46e2a0c1412c4e514caa548bae0f9c62e Mon Sep 17 00:00:00 2001 From: PlugaruT Date: Wed, 18 Jan 2023 21:15:19 +0200 Subject: [PATCH] DBZ-6023 Add support for providing a surrogate key when triggering incremental snapshots For tables that have a composite Primary Key, SQL queries for incremental snapshot perform bad. This change allows to provide a surrogate key that will be used for the queries. The key should be unique in order for the snapshot to guarantee consistency. --- ...bIncrementalSnapshotChangeEventSource.java | 9 +++++-- .../MongoDbIncrementalSnapshotContext.java | 9 ++++--- ...yIncrementalSnapshotChangeEventSource.java | 9 ++++--- .../signal/ExecuteSnapshotKafkaSignal.java | 8 +++++- .../mysql/signal/KafkaSignalThread.java | 3 ++- .../signal/AbstractSnapshotSignal.java | 1 + .../pipeline/signal/ExecuteSnapshot.java | 12 ++++++--- ...tIncrementalSnapshotChangeEventSource.java | 27 +++++++++++++------ .../AbstractIncrementalSnapshotContext.java | 14 +++++++--- .../snapshot/incremental/DataCollection.java | 14 +++++++++- .../IncrementalSnapshotChangeEventSource.java | 3 ++- .../IncrementalSnapshotContext.java | 2 +- 12 files changed, 82 insertions(+), 29 deletions(-) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java index 66786bb05..b3c7e4aee 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotChangeEventSource.java @@ -338,11 +338,16 @@ private Object[] readMaximumKey() throws InterruptedException { @Override @SuppressWarnings("unchecked") public void addDataCollectionNamesToSnapshot(MongoDbPartition partition, List dataCollectionIds, - Optional additionalCondition, OffsetContext offsetContext) + Optional additionalCondition, Optional surrogateKey, OffsetContext offsetContext) throws InterruptedException { if (additionalCondition != null && additionalCondition.isPresent()) { throw new UnsupportedOperationException("Additional condition not supported for MongoDB"); } + + if (surrogateKey != null && surrogateKey.isPresent()) { + throw new UnsupportedOperationException("Surrogate key not supported for MongoDB"); + } + context = (IncrementalSnapshotContext) offsetContext.getIncrementalSnapshotContext(); final boolean shouldReadChunk = !context.snapshotRunning(); final String rsName = replicaSets.all().get(0).replicaSetName(); @@ -350,7 +355,7 @@ public void addDataCollectionNamesToSnapshot(MongoDbPartition partition, List rsName + "." + x) .collect(Collectors.toList()); - final List> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(dataCollectionIds, null); + final List> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(dataCollectionIds, null, null); if (shouldReadChunk) { progressListener.snapshotStarted(partition); progressListener.monitoredDataCollectionsDetermined(partition, newDataCollectionIds.stream() diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotContext.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotContext.java index 9333c2d9b..16ee0f4d0 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotContext.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbIncrementalSnapshotContext.java @@ -195,7 +195,7 @@ private List> stringToDataCollections(String dataCollectionsSt try { List> dataCollections = mapper.readValue(dataCollectionsStr, mapperTypeRef); List> dataCollectionsList = dataCollections.stream() - .map(x -> new DataCollection((T) CollectionId.parse(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ID)), null)) + .map(x -> new DataCollection((T) CollectionId.parse(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ID)), null, null)) .filter(x -> x.getId() != null) .collect(Collectors.toList()); return dataCollectionsList; @@ -224,9 +224,10 @@ private void addTablesIdsToSnapshot(List> dataCollectionIds) { } @SuppressWarnings("unchecked") - public List> addDataCollectionNamesToSnapshot(List dataCollectionIds, Optional _additionalCondition) { + public List> addDataCollectionNamesToSnapshot(List dataCollectionIds, Optional _additionalCondition, + Optional surrogateKey) { final List> newDataCollectionIds = dataCollectionIds.stream() - .map(x -> new DataCollection((T) CollectionId.parse(x), null)) + .map(x -> new DataCollection((T) CollectionId.parse(x), null, null)) .filter(x -> x.getId() != null) // Remove collections with incorrectly formatted name .collect(Collectors.toList()); addTablesIdsToSnapshot(newDataCollectionIds); @@ -242,7 +243,7 @@ public void stopSnapshot() { @SuppressWarnings("unchecked") public boolean removeDataCollectionFromSnapshot(String dataCollectionId) { final T collectionId = (T) CollectionId.parse(dataCollectionId); - return dataCollectionsToSnapshot.remove(new DataCollection(collectionId, null)); + return dataCollectionsToSnapshot.remove(new DataCollection(collectionId, null, null)); } protected static IncrementalSnapshotContext init(MongoDbIncrementalSnapshotContext context, Map offsets) { diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java index 4ec4b1687..2938a376a 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java @@ -174,8 +174,9 @@ public void stopSnapshot(List dataCollectionIds, long signalOffset) { removeDataCollectionsFromSnapshot(signal, partition, offsetContext); } - public void enqueueDataCollectionNamesToSnapshot(List dataCollectionIds, long signalOffset, Optional additionalCondition) { - getContext().enqueueKafkaSignal(new ExecuteSnapshotKafkaSignal(dataCollectionIds, signalOffset, additionalCondition)); + public void enqueueDataCollectionNamesToSnapshot(List dataCollectionIds, long signalOffset, Optional additionalCondition, + Optional surrogateKey) { + getContext().enqueueKafkaSignal(new ExecuteSnapshotKafkaSignal(dataCollectionIds, signalOffset, additionalCondition, surrogateKey)); } public void enqueuePauseSnapshot() { @@ -285,7 +286,9 @@ else if (signal instanceof ResumeSnapshotKafkaSignal) { private void addDataCollectionNamesToSnapshot(ExecuteSnapshotKafkaSignal executeSnapshotSignal, MySqlPartition partition, OffsetContext offsetContext) throws InterruptedException { - super.addDataCollectionNamesToSnapshot(partition, executeSnapshotSignal.getDataCollections(), executeSnapshotSignal.getAdditionalCondition(), offsetContext); + super.addDataCollectionNamesToSnapshot(partition, executeSnapshotSignal.getDataCollections(), executeSnapshotSignal.getAdditionalCondition(), + executeSnapshotSignal.getSurrogateKey(), + offsetContext); getContext().setSignalOffset(executeSnapshotSignal.getSignalOffset()); } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/ExecuteSnapshotKafkaSignal.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/ExecuteSnapshotKafkaSignal.java index 414cb2e6c..3c8f2d6f2 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/ExecuteSnapshotKafkaSignal.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/ExecuteSnapshotKafkaSignal.java @@ -12,11 +12,13 @@ public class ExecuteSnapshotKafkaSignal implements KafkaSignal { private final List dataCollections; private final long signalOffset; private final Optional additionalCondition; + private final Optional surrogateKey; - public ExecuteSnapshotKafkaSignal(List dataCollections, long signalOffset, Optional additionalCondition) { + public ExecuteSnapshotKafkaSignal(List dataCollections, long signalOffset, Optional additionalCondition, Optional surrogateKey) { this.dataCollections = dataCollections; this.signalOffset = signalOffset; this.additionalCondition = additionalCondition; + this.surrogateKey = surrogateKey; } public List getDataCollections() { @@ -30,4 +32,8 @@ public long getSignalOffset() { public Optional getAdditionalCondition() { return additionalCondition; } + + public Optional getSurrogateKey() { + return surrogateKey; + } } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/KafkaSignalThread.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/KafkaSignalThread.java index 0f63e1649..11181fe9a 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/KafkaSignalThread.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/signal/KafkaSignalThread.java @@ -176,10 +176,11 @@ private void executeSnapshot(Document data, long signalOffset) { if (dataCollections != null) { ExecuteSnapshot.SnapshotType snapshotType = ExecuteSnapshot.getSnapshotType(data); Optional additionalCondition = ExecuteSnapshot.getAdditionalCondition(data); + Optional surrogateKey = ExecuteSnapshot.getSurrogateKey(data); LOGGER.info("Requested '{}' snapshot of data collections '{}' with additional condition '{}'", snapshotType, dataCollections, additionalCondition.orElse("No condition passed")); if (snapshotType == ExecuteSnapshot.SnapshotType.INCREMENTAL) { - eventSource.enqueueDataCollectionNamesToSnapshot(dataCollections, signalOffset, additionalCondition); + eventSource.enqueueDataCollectionNamesToSnapshot(dataCollections, signalOffset, additionalCondition, surrogateKey); } } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/AbstractSnapshotSignal.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/AbstractSnapshotSignal.java index b12d0b5c8..5476d7c0a 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/signal/AbstractSnapshotSignal.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/AbstractSnapshotSignal.java @@ -20,6 +20,7 @@ public abstract class AbstractSnapshotSignal

implements Sig protected static final String FIELD_DATA_COLLECTIONS = "data-collections"; protected static final String FIELD_TYPE = "type"; protected static final String FIELD_ADDITIONAL_CONDITION = "additional-condition"; + protected static final String FIELD_SURROGATE_KEY = "surrogate-key"; public enum SnapshotType { INCREMENTAL diff --git a/debezium-core/src/main/java/io/debezium/pipeline/signal/ExecuteSnapshot.java b/debezium-core/src/main/java/io/debezium/pipeline/signal/ExecuteSnapshot.java index c6d000952..9d5d2a740 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/signal/ExecuteSnapshot.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/signal/ExecuteSnapshot.java @@ -47,12 +47,13 @@ public boolean arrived(Payload

signalPayload) throws InterruptedException { } SnapshotType type = getSnapshotType(signalPayload.data); Optional additionalCondition = getAdditionalCondition(signalPayload.data); - LOGGER.info("Requested '{}' snapshot of data collections '{}' with additional condition '{}'", type, dataCollections, - additionalCondition.orElse("No condition passed")); + Optional surrogateKey = getSurrogateKey(signalPayload.data); + LOGGER.info("Requested '{}' snapshot of data collections '{}' with additional condition '{}' and surrogate key '{}'", type, dataCollections, + additionalCondition.orElse("No condition passed"), surrogateKey.orElse("PK of table will be used")); switch (type) { case INCREMENTAL: dispatcher.getIncrementalSnapshotChangeEventSource().addDataCollectionNamesToSnapshot( - signalPayload.partition, dataCollections, additionalCondition, signalPayload.offsetContext); + signalPayload.partition, dataCollections, additionalCondition, surrogateKey, signalPayload.offsetContext); break; } return true; @@ -75,4 +76,9 @@ public static Optional getAdditionalCondition(Document data) { String additionalCondition = data.getString(FIELD_ADDITIONAL_CONDITION); return (additionalCondition == null || additionalCondition.trim().isEmpty()) ? Optional.empty() : Optional.of(additionalCondition); } + + public static Optional getSurrogateKey(Document data) { + String surrogateKey = data.getString(FIELD_SURROGATE_KEY); + return (surrogateKey == null || surrogateKey.trim().isEmpty()) ? Optional.empty() : Optional.of(surrogateKey); + } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java index 60a55b193..fe5bb92e6 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotChangeEventSource.java @@ -215,7 +215,7 @@ protected String buildChunkQuery(Table table, int limit, Optional additi addLowerBound(table, sql); condition = sql.toString(); } - final String orderBy = getKeyMapper().getKeyKolumns(table).stream() + final String orderBy = getQueryColumns(table).stream() .map(c -> jdbcConnection.quotedColumnIdString(c.name())) .collect(Collectors.joining(", ")); return jdbcConnection.buildSelectWithRowLimits(table.id(), @@ -248,7 +248,7 @@ private void addLowerBound(Table table, StringBuilder sql) { // For four columns // (k1 > ?) OR (k1 = ? AND k2 > ?) OR (k1 = ? AND k2 = ? AND k3 > ?) OR (k1 = ? AND k2 = ? AND k3 = ? AND k4 > ?) // etc. - final List pkColumns = getKeyMapper().getKeyKolumns(table); + final List pkColumns = getQueryColumns(table); if (pkColumns.size() > 1) { sql.append('('); } @@ -274,7 +274,7 @@ private void addLowerBound(Table table, StringBuilder sql) { } protected String buildMaxPrimaryKeyQuery(Table table, Optional additionalCondition) { - final String orderBy = getKeyMapper().getKeyKolumns(table).stream() + final String orderBy = getQueryColumns(table).stream() .map(c -> jdbcConnection.quotedColumnIdString(c.name())) .collect(Collectors.joining(" DESC, ")) + " DESC"; return jdbcConnection.buildSelectWithRowLimits(table.id(), 1, buildProjection(table), Optional.empty(), @@ -400,7 +400,7 @@ private boolean isTableInvalid(P partition) { nextDataCollection(partition); return true; } - if (getKeyMapper().getKeyKolumns(currentTable).isEmpty()) { + if (getQueryColumns(currentTable).isEmpty()) { LOGGER.warn("Incremental snapshot for table '{}' skipped cause the table has no primary keys", currentTableId); nextDataCollection(partition); return true; @@ -461,7 +461,8 @@ private void nextDataCollection(P partition) { @Override @SuppressWarnings("unchecked") - public void addDataCollectionNamesToSnapshot(P partition, List dataCollectionIds, Optional additionalCondition, OffsetContext offsetContext) + public void addDataCollectionNamesToSnapshot(P partition, List dataCollectionIds, Optional additionalCondition, Optional surrogateKey, + OffsetContext offsetContext) throws InterruptedException { context = (IncrementalSnapshotContext) offsetContext.getIncrementalSnapshotContext(); boolean shouldReadChunk = !context.snapshotRunning(); @@ -471,7 +472,7 @@ public void addDataCollectionNamesToSnapshot(P partition, List dataColle LOGGER.info("Data-collections to snapshot have been expanded from {} to {}", dataCollectionIds, expandedDataCollectionIds); } - final List> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(expandedDataCollectionIds, additionalCondition); + final List> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(expandedDataCollectionIds, additionalCondition, surrogateKey); if (shouldReadChunk) { progressListener.snapshotStarted(partition); progressListener.monitoredDataCollectionsDetermined(partition, newDataCollectionIds.stream() @@ -545,7 +546,7 @@ public void stopSnapshot(P partition, List dataCollectionIds, OffsetCont } protected void addKeyColumnsToCondition(Table table, StringBuilder sql, String predicate) { - for (Iterator i = getKeyMapper().getKeyKolumns(table).iterator(); i.hasNext();) { + for (Iterator i = getQueryColumns(table).iterator(); i.hasNext();) { final Column key = i.next(); sql.append(jdbcConnection.quotedColumnIdString(key.name())).append(predicate); if (i.hasNext()) { @@ -716,7 +717,7 @@ private Object[] keyFromRow(Object[] row) { if (row == null) { return null; } - final List keyColumns = getKeyMapper().getKeyKolumns(currentTable); + final List keyColumns = getQueryColumns(currentTable); final Object[] key = new Object[keyColumns.size()]; for (int i = 0; i < keyColumns.size(); i++) { final Object fieldValue = row[keyColumns.get(i).position() - 1]; @@ -760,4 +761,14 @@ protected Table refreshTableSchema(Table table) throws SQLException { private KeyMapper getKeyMapper() { return connectorConfig.getKeyMapper() == null ? table -> table.primaryKeyColumns() : connectorConfig.getKeyMapper(); } + + private List getQueryColumns(Table table) { + if (context != null && context.currentDataCollectionId() != null) { + Optional surrogateKey = context.currentDataCollectionId().getSurrogateKey(); + if (surrogateKey.isPresent()) { + return Collections.singletonList(table.columnWithName(surrogateKey.get())); + } + } + return getKeyMapper().getKeyKolumns(table); + } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotContext.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotContext.java index 018b89b16..19ead1ad5 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotContext.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotContext.java @@ -54,6 +54,9 @@ public class AbstractIncrementalSnapshotContext implements IncrementalSnapsho public static final String DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION = DATA_COLLECTIONS_TO_SNAPSHOT_KEY + "_additional_condition"; + public static final String DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY = DATA_COLLECTIONS_TO_SNAPSHOT_KEY + + "_surrogate_key"; + public static final String EVENT_PRIMARY_KEY = INCREMENTAL_SNAPSHOT_KEY + "_primary_key"; public static final String TABLE_MAXIMUM_KEY = INCREMENTAL_SNAPSHOT_KEY + "_maximum_key"; @@ -183,6 +186,8 @@ private String dataCollectionsToSnapshotAsString() { map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ID, x.getId().toString()); map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION, x.getAdditionalCondition().orElse(null)); + map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY, + x.getSurrogateKey().orElse(null)); return map; }) .collect(Collectors.toList()); @@ -198,7 +203,8 @@ private List> stringToDataCollections(String dataCollectionsSt List> dataCollections = mapper.readValue(dataCollectionsStr, mapperTypeRef); List> dataCollectionsList = dataCollections.stream() .map(x -> new DataCollection((T) TableId.parse(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ID), useCatalogBeforeSchema), - Optional.ofNullable(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION)))) + Optional.ofNullable(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION)), + Optional.ofNullable(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY)))) .collect(Collectors.toList()); return dataCollectionsList; } @@ -226,9 +232,9 @@ private void addTablesIdsToSnapshot(List> dataCollectionIds) { } @SuppressWarnings("unchecked") - public List> addDataCollectionNamesToSnapshot(List dataCollectionIds, Optional additionalCondition) { + public List> addDataCollectionNamesToSnapshot(List dataCollectionIds, Optional additionalCondition, Optional surrogateKey) { final List> newDataCollectionIds = dataCollectionIds.stream() - .map(x -> new DataCollection((T) TableId.parse(x, useCatalogBeforeSchema), additionalCondition)) + .map(x -> new DataCollection((T) TableId.parse(x, useCatalogBeforeSchema), additionalCondition, surrogateKey)) .collect(Collectors.toList()); addTablesIdsToSnapshot(newDataCollectionIds); return newDataCollectionIds; @@ -243,7 +249,7 @@ public void stopSnapshot() { @SuppressWarnings("unchecked") public boolean removeDataCollectionFromSnapshot(String dataCollectionId) { final T collectionId = (T) TableId.parse(dataCollectionId, useCatalogBeforeSchema); - return dataCollectionsToSnapshot.removeAll(Arrays.asList(new DataCollection(collectionId, null))); + return dataCollectionsToSnapshot.removeAll(Arrays.asList(new DataCollection(collectionId, null, null))); } protected static IncrementalSnapshotContext init(AbstractIncrementalSnapshotContext context, Map offsets) { diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/DataCollection.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/DataCollection.java index 6095bdcf2..8ed74bcd9 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/DataCollection.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/DataCollection.java @@ -20,9 +20,12 @@ public class DataCollection { private Optional additionalCondition; - public DataCollection(T id, Optional additionalCondition) { + private Optional surrogateKey; + + public DataCollection(T id, Optional additionalCondition, Optional surrogateKey) { this.id = id; this.additionalCondition = additionalCondition == null ? Optional.empty() : additionalCondition; + this.surrogateKey = surrogateKey == null ? Optional.empty() : surrogateKey; } public T getId() { @@ -41,6 +44,14 @@ public void setAdditionalCondition(Optional additionalCondition) { this.additionalCondition = additionalCondition; } + public Optional getSurrogateKey() { + return surrogateKey; + } + + public void setSurrogateKey(Optional surrogateKey) { + this.surrogateKey = surrogateKey; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -63,6 +74,7 @@ public String toString() { return "DataCollection{" + "id=" + id + ", additionalCondition=" + additionalCondition + + ", surrogateKey=" + surrogateKey + '}'; } } diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSource.java index 4fe428e34..d6319db23 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotChangeEventSource.java @@ -31,7 +31,8 @@ public interface IncrementalSnapshotChangeEventSource

dataCollectionIds, Optional additionalCondition, OffsetContext offsetContext) + void addDataCollectionNamesToSnapshot(P partition, List dataCollectionIds, Optional additionalCondition, Optional surrogateKey, + OffsetContext offsetContext) throws InterruptedException; void stopSnapshot(P partition, List dataCollectionIds, OffsetContext offsetContext); diff --git a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotContext.java b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotContext.java index e877c6875..0408c6216 100644 --- a/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotContext.java +++ b/debezium-core/src/main/java/io/debezium/pipeline/source/snapshot/incremental/IncrementalSnapshotContext.java @@ -17,7 +17,7 @@ public interface IncrementalSnapshotContext { DataCollection nextDataCollection(); - List> addDataCollectionNamesToSnapshot(List dataCollectionIds, Optional additionalCondition); + List> addDataCollectionNamesToSnapshot(List dataCollectionIds, Optional additionalCondition, Optional surrogateKey); int dataCollectionsToBeSnapshottedCount();