DBZ-6023 Add support for providing a surrogate key when triggering incremental snapshots

For tables that have a composite primary key, the SQL queries used for incremental snapshots perform poorly. This change allows providing a surrogate key that will be used for those queries instead. The key must be unique in order for the snapshot to guarantee consistency.
This commit is contained in:
PlugaruT 2023-01-18 21:15:19 +02:00 committed by Jiri Pechanec
parent 0585c6beb8
commit 472162e46e
12 changed files with 82 additions and 29 deletions

View File

@ -338,11 +338,16 @@ private Object[] readMaximumKey() throws InterruptedException {
@Override
@SuppressWarnings("unchecked")
public void addDataCollectionNamesToSnapshot(MongoDbPartition partition, List<String> dataCollectionIds,
Optional<String> additionalCondition, OffsetContext offsetContext)
Optional<String> additionalCondition, Optional<String> surrogateKey, OffsetContext offsetContext)
throws InterruptedException {
if (additionalCondition != null && additionalCondition.isPresent()) {
throw new UnsupportedOperationException("Additional condition not supported for MongoDB");
}
if (surrogateKey != null && surrogateKey.isPresent()) {
throw new UnsupportedOperationException("Surrogate key not supported for MongoDB");
}
context = (IncrementalSnapshotContext<CollectionId>) offsetContext.getIncrementalSnapshotContext();
final boolean shouldReadChunk = !context.snapshotRunning();
final String rsName = replicaSets.all().get(0).replicaSetName();
@ -350,7 +355,7 @@ public void addDataCollectionNamesToSnapshot(MongoDbPartition partition, List<St
.stream()
.map(x -> rsName + "." + x)
.collect(Collectors.toList());
final List<DataCollection<CollectionId>> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(dataCollectionIds, null);
final List<DataCollection<CollectionId>> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(dataCollectionIds, null, null);
if (shouldReadChunk) {
progressListener.snapshotStarted(partition);
progressListener.monitoredDataCollectionsDetermined(partition, newDataCollectionIds.stream()

View File

@ -195,7 +195,7 @@ private List<DataCollection<T>> stringToDataCollections(String dataCollectionsSt
try {
List<LinkedHashMap<String, String>> dataCollections = mapper.readValue(dataCollectionsStr, mapperTypeRef);
List<DataCollection<T>> dataCollectionsList = dataCollections.stream()
.map(x -> new DataCollection<T>((T) CollectionId.parse(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ID)), null))
.map(x -> new DataCollection<T>((T) CollectionId.parse(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ID)), null, null))
.filter(x -> x.getId() != null)
.collect(Collectors.toList());
return dataCollectionsList;
@ -224,9 +224,10 @@ private void addTablesIdsToSnapshot(List<DataCollection<T>> dataCollectionIds) {
}
@SuppressWarnings("unchecked")
public List<DataCollection<T>> addDataCollectionNamesToSnapshot(List<String> dataCollectionIds, Optional<String> _additionalCondition) {
public List<DataCollection<T>> addDataCollectionNamesToSnapshot(List<String> dataCollectionIds, Optional<String> _additionalCondition,
Optional<String> surrogateKey) {
final List<DataCollection<T>> newDataCollectionIds = dataCollectionIds.stream()
.map(x -> new DataCollection<T>((T) CollectionId.parse(x), null))
.map(x -> new DataCollection<T>((T) CollectionId.parse(x), null, null))
.filter(x -> x.getId() != null) // Remove collections with incorrectly formatted name
.collect(Collectors.toList());
addTablesIdsToSnapshot(newDataCollectionIds);
@ -242,7 +243,7 @@ public void stopSnapshot() {
@SuppressWarnings("unchecked")
public boolean removeDataCollectionFromSnapshot(String dataCollectionId) {
final T collectionId = (T) CollectionId.parse(dataCollectionId);
return dataCollectionsToSnapshot.remove(new DataCollection<T>(collectionId, null));
return dataCollectionsToSnapshot.remove(new DataCollection<T>(collectionId, null, null));
}
protected static <U> IncrementalSnapshotContext<U> init(MongoDbIncrementalSnapshotContext<U> context, Map<String, ?> offsets) {

View File

@ -174,8 +174,9 @@ public void stopSnapshot(List<String> dataCollectionIds, long signalOffset) {
removeDataCollectionsFromSnapshot(signal, partition, offsetContext);
}
public void enqueueDataCollectionNamesToSnapshot(List<String> dataCollectionIds, long signalOffset, Optional<String> additionalCondition) {
getContext().enqueueKafkaSignal(new ExecuteSnapshotKafkaSignal(dataCollectionIds, signalOffset, additionalCondition));
public void enqueueDataCollectionNamesToSnapshot(List<String> dataCollectionIds, long signalOffset, Optional<String> additionalCondition,
Optional<String> surrogateKey) {
getContext().enqueueKafkaSignal(new ExecuteSnapshotKafkaSignal(dataCollectionIds, signalOffset, additionalCondition, surrogateKey));
}
public void enqueuePauseSnapshot() {
@ -285,7 +286,9 @@ else if (signal instanceof ResumeSnapshotKafkaSignal) {
private void addDataCollectionNamesToSnapshot(ExecuteSnapshotKafkaSignal executeSnapshotSignal, MySqlPartition partition, OffsetContext offsetContext)
throws InterruptedException {
super.addDataCollectionNamesToSnapshot(partition, executeSnapshotSignal.getDataCollections(), executeSnapshotSignal.getAdditionalCondition(), offsetContext);
super.addDataCollectionNamesToSnapshot(partition, executeSnapshotSignal.getDataCollections(), executeSnapshotSignal.getAdditionalCondition(),
executeSnapshotSignal.getSurrogateKey(),
offsetContext);
getContext().setSignalOffset(executeSnapshotSignal.getSignalOffset());
}

View File

@ -12,11 +12,13 @@ public class ExecuteSnapshotKafkaSignal implements KafkaSignal {
private final List<String> dataCollections;
private final long signalOffset;
private final Optional<String> additionalCondition;
private final Optional<String> surrogateKey;
public ExecuteSnapshotKafkaSignal(List<String> dataCollections, long signalOffset, Optional<String> additionalCondition) {
public ExecuteSnapshotKafkaSignal(List<String> dataCollections, long signalOffset, Optional<String> additionalCondition, Optional<String> surrogateKey) {
this.dataCollections = dataCollections;
this.signalOffset = signalOffset;
this.additionalCondition = additionalCondition;
this.surrogateKey = surrogateKey;
}
public List<String> getDataCollections() {
@ -30,4 +32,8 @@ public long getSignalOffset() {
public Optional<String> getAdditionalCondition() {
return additionalCondition;
}
/**
 * @return the surrogate key column name requested by the signal, to be used for
 *         chunk queries instead of the table's primary key; empty when none was
 *         provided
 */
public Optional<String> getSurrogateKey() {
    return surrogateKey;
}
}

View File

@ -176,10 +176,11 @@ private void executeSnapshot(Document data, long signalOffset) {
if (dataCollections != null) {
ExecuteSnapshot.SnapshotType snapshotType = ExecuteSnapshot.getSnapshotType(data);
Optional<String> additionalCondition = ExecuteSnapshot.getAdditionalCondition(data);
Optional<String> surrogateKey = ExecuteSnapshot.getSurrogateKey(data);
LOGGER.info("Requested '{}' snapshot of data collections '{}' with additional condition '{}'", snapshotType, dataCollections,
additionalCondition.orElse("No condition passed"));
if (snapshotType == ExecuteSnapshot.SnapshotType.INCREMENTAL) {
eventSource.enqueueDataCollectionNamesToSnapshot(dataCollections, signalOffset, additionalCondition);
eventSource.enqueueDataCollectionNamesToSnapshot(dataCollections, signalOffset, additionalCondition, surrogateKey);
}
}
}

View File

@ -20,6 +20,7 @@ public abstract class AbstractSnapshotSignal<P extends Partition> implements Sig
protected static final String FIELD_DATA_COLLECTIONS = "data-collections";
protected static final String FIELD_TYPE = "type";
protected static final String FIELD_ADDITIONAL_CONDITION = "additional-condition";
protected static final String FIELD_SURROGATE_KEY = "surrogate-key";
public enum SnapshotType {
INCREMENTAL

View File

@ -47,12 +47,13 @@ public boolean arrived(Payload<P> signalPayload) throws InterruptedException {
}
SnapshotType type = getSnapshotType(signalPayload.data);
Optional<String> additionalCondition = getAdditionalCondition(signalPayload.data);
LOGGER.info("Requested '{}' snapshot of data collections '{}' with additional condition '{}'", type, dataCollections,
additionalCondition.orElse("No condition passed"));
Optional<String> surrogateKey = getSurrogateKey(signalPayload.data);
LOGGER.info("Requested '{}' snapshot of data collections '{}' with additional condition '{}' and surrogate key '{}'", type, dataCollections,
additionalCondition.orElse("No condition passed"), surrogateKey.orElse("PK of table will be used"));
switch (type) {
case INCREMENTAL:
dispatcher.getIncrementalSnapshotChangeEventSource().addDataCollectionNamesToSnapshot(
signalPayload.partition, dataCollections, additionalCondition, signalPayload.offsetContext);
signalPayload.partition, dataCollections, additionalCondition, surrogateKey, signalPayload.offsetContext);
break;
}
return true;
@ -75,4 +76,9 @@ public static Optional<String> getAdditionalCondition(Document data) {
String additionalCondition = data.getString(FIELD_ADDITIONAL_CONDITION);
return (additionalCondition == null || additionalCondition.trim().isEmpty()) ? Optional.empty() : Optional.of(additionalCondition);
}
/**
 * Extracts the optional surrogate key column name from the signal payload.
 *
 * @param data the signal message payload
 * @return the surrogate key name, or {@link Optional#empty()} when the
 *         {@code surrogate-key} field is absent, {@code null} or blank
 */
public static Optional<String> getSurrogateKey(Document data) {
    final String rawKey = data.getString(FIELD_SURROGATE_KEY);
    if (rawKey != null && !rawKey.trim().isEmpty()) {
        return Optional.of(rawKey);
    }
    return Optional.empty();
}
}

View File

@ -215,7 +215,7 @@ protected String buildChunkQuery(Table table, int limit, Optional<String> additi
addLowerBound(table, sql);
condition = sql.toString();
}
final String orderBy = getKeyMapper().getKeyKolumns(table).stream()
final String orderBy = getQueryColumns(table).stream()
.map(c -> jdbcConnection.quotedColumnIdString(c.name()))
.collect(Collectors.joining(", "));
return jdbcConnection.buildSelectWithRowLimits(table.id(),
@ -248,7 +248,7 @@ private void addLowerBound(Table table, StringBuilder sql) {
// For four columns
// (k1 > ?) OR (k1 = ? AND k2 > ?) OR (k1 = ? AND k2 = ? AND k3 > ?) OR (k1 = ? AND k2 = ? AND k3 = ? AND k4 > ?)
// etc.
final List<Column> pkColumns = getKeyMapper().getKeyKolumns(table);
final List<Column> pkColumns = getQueryColumns(table);
if (pkColumns.size() > 1) {
sql.append('(');
}
@ -274,7 +274,7 @@ private void addLowerBound(Table table, StringBuilder sql) {
}
protected String buildMaxPrimaryKeyQuery(Table table, Optional<String> additionalCondition) {
final String orderBy = getKeyMapper().getKeyKolumns(table).stream()
final String orderBy = getQueryColumns(table).stream()
.map(c -> jdbcConnection.quotedColumnIdString(c.name()))
.collect(Collectors.joining(" DESC, ")) + " DESC";
return jdbcConnection.buildSelectWithRowLimits(table.id(), 1, buildProjection(table), Optional.empty(),
@ -400,7 +400,7 @@ private boolean isTableInvalid(P partition) {
nextDataCollection(partition);
return true;
}
if (getKeyMapper().getKeyKolumns(currentTable).isEmpty()) {
if (getQueryColumns(currentTable).isEmpty()) {
LOGGER.warn("Incremental snapshot for table '{}' skipped cause the table has no primary keys", currentTableId);
nextDataCollection(partition);
return true;
@ -461,7 +461,8 @@ private void nextDataCollection(P partition) {
@Override
@SuppressWarnings("unchecked")
public void addDataCollectionNamesToSnapshot(P partition, List<String> dataCollectionIds, Optional<String> additionalCondition, OffsetContext offsetContext)
public void addDataCollectionNamesToSnapshot(P partition, List<String> dataCollectionIds, Optional<String> additionalCondition, Optional<String> surrogateKey,
OffsetContext offsetContext)
throws InterruptedException {
context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
boolean shouldReadChunk = !context.snapshotRunning();
@ -471,7 +472,7 @@ public void addDataCollectionNamesToSnapshot(P partition, List<String> dataColle
LOGGER.info("Data-collections to snapshot have been expanded from {} to {}", dataCollectionIds, expandedDataCollectionIds);
}
final List<DataCollection<T>> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(expandedDataCollectionIds, additionalCondition);
final List<DataCollection<T>> newDataCollectionIds = context.addDataCollectionNamesToSnapshot(expandedDataCollectionIds, additionalCondition, surrogateKey);
if (shouldReadChunk) {
progressListener.snapshotStarted(partition);
progressListener.monitoredDataCollectionsDetermined(partition, newDataCollectionIds.stream()
@ -545,7 +546,7 @@ public void stopSnapshot(P partition, List<String> dataCollectionIds, OffsetCont
}
protected void addKeyColumnsToCondition(Table table, StringBuilder sql, String predicate) {
for (Iterator<Column> i = getKeyMapper().getKeyKolumns(table).iterator(); i.hasNext();) {
for (Iterator<Column> i = getQueryColumns(table).iterator(); i.hasNext();) {
final Column key = i.next();
sql.append(jdbcConnection.quotedColumnIdString(key.name())).append(predicate);
if (i.hasNext()) {
@ -716,7 +717,7 @@ private Object[] keyFromRow(Object[] row) {
if (row == null) {
return null;
}
final List<Column> keyColumns = getKeyMapper().getKeyKolumns(currentTable);
final List<Column> keyColumns = getQueryColumns(currentTable);
final Object[] key = new Object[keyColumns.size()];
for (int i = 0; i < keyColumns.size(); i++) {
final Object fieldValue = row[keyColumns.get(i).position() - 1];
@ -760,4 +761,14 @@ protected Table refreshTableSchema(Table table) throws SQLException {
private KeyMapper getKeyMapper() {
return connectorConfig.getKeyMapper() == null ? table -> table.primaryKeyColumns() : connectorConfig.getKeyMapper();
}
/**
 * Determines the columns used to order, bound and identify rows in incremental
 * snapshot chunk queries.
 * <p>
 * When the data collection currently being snapshotted was requested with a
 * surrogate key, only that single column is used; otherwise the configured key
 * mapper (defaulting to the table's primary key columns) decides.
 *
 * @param table the table whose query columns are requested
 * @return the columns to use in chunk queries; may be empty when the table has
 *         no usable key
 * @throws IllegalArgumentException when a surrogate key was requested but no
 *         column with that name exists in the table — failing fast here gives a
 *         clear error instead of a later NPE deep inside query building
 */
private List<Column> getQueryColumns(Table table) {
    if (context != null && context.currentDataCollectionId() != null) {
        final Optional<String> surrogateKey = context.currentDataCollectionId().getSurrogateKey();
        if (surrogateKey.isPresent()) {
            final Column column = table.columnWithName(surrogateKey.get());
            if (column == null) {
                throw new IllegalArgumentException(
                        "Surrogate key column '" + surrogateKey.get() + "' doesn't exist in table '" + table.id() + "'");
            }
            return Collections.singletonList(column);
        }
    }
    return getKeyMapper().getKeyKolumns(table);
}
}

View File

@ -54,6 +54,9 @@ public class AbstractIncrementalSnapshotContext<T> implements IncrementalSnapsho
public static final String DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION = DATA_COLLECTIONS_TO_SNAPSHOT_KEY
+ "_additional_condition";
public static final String DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY = DATA_COLLECTIONS_TO_SNAPSHOT_KEY
+ "_surrogate_key";
public static final String EVENT_PRIMARY_KEY = INCREMENTAL_SNAPSHOT_KEY + "_primary_key";
public static final String TABLE_MAXIMUM_KEY = INCREMENTAL_SNAPSHOT_KEY + "_maximum_key";
@ -183,6 +186,8 @@ private String dataCollectionsToSnapshotAsString() {
map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ID, x.getId().toString());
map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION,
x.getAdditionalCondition().orElse(null));
map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY,
x.getSurrogateKey().orElse(null));
return map;
})
.collect(Collectors.toList());
@ -198,7 +203,8 @@ private List<DataCollection<T>> stringToDataCollections(String dataCollectionsSt
List<LinkedHashMap<String, String>> dataCollections = mapper.readValue(dataCollectionsStr, mapperTypeRef);
List<DataCollection<T>> dataCollectionsList = dataCollections.stream()
.map(x -> new DataCollection<T>((T) TableId.parse(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ID), useCatalogBeforeSchema),
Optional.ofNullable(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION))))
Optional.ofNullable(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION)),
Optional.ofNullable(x.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY))))
.collect(Collectors.toList());
return dataCollectionsList;
}
@ -226,9 +232,9 @@ private void addTablesIdsToSnapshot(List<DataCollection<T>> dataCollectionIds) {
}
@SuppressWarnings("unchecked")
public List<DataCollection<T>> addDataCollectionNamesToSnapshot(List<String> dataCollectionIds, Optional<String> additionalCondition) {
public List<DataCollection<T>> addDataCollectionNamesToSnapshot(List<String> dataCollectionIds, Optional<String> additionalCondition, Optional<String> surrogateKey) {
final List<DataCollection<T>> newDataCollectionIds = dataCollectionIds.stream()
.map(x -> new DataCollection<T>((T) TableId.parse(x, useCatalogBeforeSchema), additionalCondition))
.map(x -> new DataCollection<T>((T) TableId.parse(x, useCatalogBeforeSchema), additionalCondition, surrogateKey))
.collect(Collectors.toList());
addTablesIdsToSnapshot(newDataCollectionIds);
return newDataCollectionIds;
@ -243,7 +249,7 @@ public void stopSnapshot() {
@SuppressWarnings("unchecked")
public boolean removeDataCollectionFromSnapshot(String dataCollectionId) {
final T collectionId = (T) TableId.parse(dataCollectionId, useCatalogBeforeSchema);
return dataCollectionsToSnapshot.removeAll(Arrays.asList(new DataCollection<T>(collectionId, null)));
return dataCollectionsToSnapshot.removeAll(Arrays.asList(new DataCollection<T>(collectionId, null, null)));
}
protected static <U> IncrementalSnapshotContext<U> init(AbstractIncrementalSnapshotContext<U> context, Map<String, ?> offsets) {

View File

@ -20,9 +20,12 @@ public class DataCollection<T> {
private Optional<String> additionalCondition;
public DataCollection(T id, Optional<String> additionalCondition) {
private Optional<String> surrogateKey;
public DataCollection(T id, Optional<String> additionalCondition, Optional<String> surrogateKey) {
this.id = id;
this.additionalCondition = additionalCondition == null ? Optional.empty() : additionalCondition;
this.surrogateKey = surrogateKey == null ? Optional.empty() : surrogateKey;
}
public T getId() {
@ -41,6 +44,14 @@ public void setAdditionalCondition(Optional<String> additionalCondition) {
this.additionalCondition = additionalCondition;
}
/**
 * @return the surrogate key column name to be used for this collection's chunk
 *         queries instead of its primary key; empty when none was configured
 */
public Optional<String> getSurrogateKey() {
    return surrogateKey;
}
/**
 * Sets the surrogate key column name to be used for this collection's chunk
 * queries.
 * <p>
 * A {@code null} argument is normalized to {@link Optional#empty()}, matching
 * the constructor's handling, so later calls such as
 * {@code getSurrogateKey().orElse(...)} never hit an NPE.
 *
 * @param surrogateKey the surrogate key name, possibly empty or {@code null}
 */
public void setSurrogateKey(Optional<String> surrogateKey) {
    this.surrogateKey = surrogateKey == null ? Optional.empty() : surrogateKey;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -63,6 +74,7 @@ public String toString() {
return "DataCollection{" +
"id=" + id +
", additionalCondition=" + additionalCondition +
", surrogateKey=" + surrogateKey +
'}';
}
}

View File

@ -31,7 +31,8 @@ public interface IncrementalSnapshotChangeEventSource<P extends Partition, T ext
void init(P partition, OffsetContext offsetContext);
void addDataCollectionNamesToSnapshot(P partition, List<String> dataCollectionIds, Optional<String> additionalCondition, OffsetContext offsetContext)
void addDataCollectionNamesToSnapshot(P partition, List<String> dataCollectionIds, Optional<String> additionalCondition, Optional<String> surrogateKey,
OffsetContext offsetContext)
throws InterruptedException;
void stopSnapshot(P partition, List<String> dataCollectionIds, OffsetContext offsetContext);

View File

@ -17,7 +17,7 @@ public interface IncrementalSnapshotContext<T> {
DataCollection<T> nextDataCollection();
List<DataCollection<T>> addDataCollectionNamesToSnapshot(List<String> dataCollectionIds, Optional<String> additionalCondition);
List<DataCollection<T>> addDataCollectionNamesToSnapshot(List<String> dataCollectionIds, Optional<String> additionalCondition, Optional<String> surrogateKey);
int dataCollectionsToBeSnapshottedCount();