DBZ-7421 Improve incremental snapshot performance with increasing number of collections to snapshot
This commit is contained in:
parent
0db1a3eaae
commit
81298865a5
@ -36,6 +36,7 @@
|
||||
import io.debezium.relational.Table;
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.util.HexConverter;
|
||||
import io.debezium.util.Strings;
|
||||
|
||||
/**
|
||||
* A class describing current state of incremental snapshot
|
||||
@ -102,10 +103,11 @@ public class AbstractIncrementalSnapshotContext<T> implements IncrementalSnapsho
|
||||
/**
|
||||
* Determines if the incremental snapshot was paused or not.
|
||||
*/
|
||||
private AtomicBoolean paused = new AtomicBoolean(false);
|
||||
private ObjectMapper mapper = new ObjectMapper();
|
||||
private final AtomicBoolean paused = new AtomicBoolean(false);
|
||||
private final ObjectMapper mapper = new ObjectMapper();
|
||||
private String dataCollectionsToSnapshotJson;
|
||||
|
||||
private TypeReference<List<LinkedHashMap<String, String>>> mapperTypeRef = new TypeReference<>() {
|
||||
private final TypeReference<List<LinkedHashMap<String, String>>> mapperTypeRef = new TypeReference<>() {
|
||||
};
|
||||
|
||||
public AbstractIncrementalSnapshotContext(boolean useCatalogBeforeSchema) {
|
||||
@ -185,14 +187,20 @@ private Object[] serializedStringToArray(String field, String serialized) {
|
||||
|
||||
private String dataCollectionsToSnapshotAsString() {
|
||||
// TODO Handle non-standard table ids containing dots, commas etc.
|
||||
|
||||
if (!Strings.isNullOrEmpty(dataCollectionsToSnapshotJson)) {
|
||||
// A cached value to improve performance since this method is called in the "store"
|
||||
// that is called during events processing
|
||||
return dataCollectionsToSnapshotJson;
|
||||
}
|
||||
|
||||
try {
|
||||
List<LinkedHashMap<String, String>> dataCollectionsMap = dataCollectionsToSnapshot.stream()
|
||||
.map(x -> {
|
||||
LinkedHashMap<String, String> map = new LinkedHashMap<>();
|
||||
map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ID, x.getId().toString());
|
||||
map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION,
|
||||
x.getAdditionalCondition().isEmpty() ? null : x.getAdditionalCondition().orElse(null));
|
||||
map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY, x.getSurrogateKey().isEmpty() ? null : x.getSurrogateKey().orElse(null));
|
||||
map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_ADDITIONAL_CONDITION, x.getAdditionalCondition().orElse(null));
|
||||
map.put(DATA_COLLECTIONS_TO_SNAPSHOT_KEY_SURROGATE_KEY, x.getSurrogateKey().orElse(null));
|
||||
return map;
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
@ -235,6 +243,7 @@ public Map<String, Object> store(Map<String, Object> offset) {
|
||||
|
||||
private void addTablesIdsToSnapshot(List<DataCollection<T>> dataCollectionIds) {
|
||||
dataCollectionsToSnapshot.addAll(dataCollectionIds);
|
||||
dataCollectionsToSnapshotJson = dataCollectionsToSnapshotAsString();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@ -264,6 +273,7 @@ private Function<String, DataCollection<T>> buildDataCollection(List<AdditionalC
|
||||
@Override
|
||||
public void stopSnapshot() {
|
||||
this.dataCollectionsToSnapshot.clear();
|
||||
this.dataCollectionsToSnapshotJson = null;
|
||||
this.correlationId = null;
|
||||
}
|
||||
|
||||
@ -271,7 +281,9 @@ public void stopSnapshot() {
|
||||
@SuppressWarnings("unchecked")
|
||||
public boolean removeDataCollectionFromSnapshot(String dataCollectionId) {
|
||||
final T collectionId = (T) TableId.parse(dataCollectionId, useCatalogBeforeSchema);
|
||||
return dataCollectionsToSnapshot.removeAll(Arrays.asList(new DataCollection<T>(collectionId)));
|
||||
boolean removed = dataCollectionsToSnapshot.removeAll(Arrays.asList(new DataCollection<T>(collectionId)));
|
||||
this.dataCollectionsToSnapshotJson = dataCollectionsToSnapshotAsString();
|
||||
return removed;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -300,6 +312,7 @@ protected static <U> IncrementalSnapshotContext<U> init(AbstractIncrementalSnaps
|
||||
: null;
|
||||
final String dataCollectionsStr = (String) offsets.get(DATA_COLLECTIONS_TO_SNAPSHOT_KEY);
|
||||
context.dataCollectionsToSnapshot.clear();
|
||||
context.dataCollectionsToSnapshotJson = null;
|
||||
if (dataCollectionsStr != null) {
|
||||
context.addTablesIdsToSnapshot(context.stringToDataCollections(dataCollectionsStr));
|
||||
}
|
||||
@ -346,7 +359,9 @@ public boolean isNonInitialChunk() {
|
||||
|
||||
public DataCollection<T> nextDataCollection() {
|
||||
resetChunk();
|
||||
return dataCollectionsToSnapshot.poll();
|
||||
DataCollection<T> nextDataCollection = dataCollectionsToSnapshot.poll();
|
||||
this.dataCollectionsToSnapshotJson = dataCollectionsToSnapshotAsString();
|
||||
return nextDataCollection;
|
||||
}
|
||||
|
||||
public void startNewChunk() {
|
||||
|
@ -0,0 +1,74 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.performance.core;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||
import org.openjdk.jmh.annotations.Fork;
|
||||
import org.openjdk.jmh.annotations.Level;
|
||||
import org.openjdk.jmh.annotations.Measurement;
|
||||
import org.openjdk.jmh.annotations.Mode;
|
||||
import org.openjdk.jmh.annotations.OutputTimeUnit;
|
||||
import org.openjdk.jmh.annotations.Param;
|
||||
import org.openjdk.jmh.annotations.Scope;
|
||||
import org.openjdk.jmh.annotations.Setup;
|
||||
import org.openjdk.jmh.annotations.State;
|
||||
import org.openjdk.jmh.annotations.Warmup;
|
||||
|
||||
import io.debezium.pipeline.signal.actions.snapshotting.AdditionalCondition;
|
||||
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
|
||||
import io.debezium.relational.TableId;
|
||||
|
||||
@Fork(1)
|
||||
@State(Scope.Thread)
|
||||
@Warmup(iterations = 2, time = 5)
|
||||
@Measurement(iterations = 2, time = 5)
|
||||
@OutputTimeUnit(TimeUnit.SECONDS)
|
||||
@BenchmarkMode({ Mode.Throughput })
|
||||
public class IncrementalSnapshotContextPerf {
|
||||
|
||||
private SignalBasedIncrementalSnapshotContext<TableId> snapshotContext;
|
||||
|
||||
@Param({ "1", "5", "10", "50", "100" })
|
||||
private int numberOfTableToSnapshot;
|
||||
|
||||
@Setup(Level.Trial)
|
||||
public void setUp() {
|
||||
snapshotContext = new SignalBasedIncrementalSnapshotContext<>(false);
|
||||
List<String> dataCollectionIds = generateDataCollectionIds(numberOfTableToSnapshot);
|
||||
snapshotContext.addDataCollectionNamesToSnapshot("1",
|
||||
dataCollectionIds,
|
||||
dataCollectionIds.stream().map(id -> AdditionalCondition.AdditionalConditionBuilder.builder()
|
||||
.dataCollection(Pattern.compile(id))
|
||||
.filter("color='blue'")
|
||||
.build()).collect(Collectors.toList()),
|
||||
"");
|
||||
}
|
||||
|
||||
private List<String> generateDataCollectionIds(int number) {
|
||||
|
||||
return IntStream.rangeClosed(1, number)
|
||||
.mapToObj(i -> IntStream.rangeClosed(1, number)
|
||||
.mapToObj(j -> String.format("%s.%s", "db" + i, "table" + j))
|
||||
.collect(Collectors.toList()))
|
||||
.flatMap(List::stream)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public void store() {
|
||||
Map<String, Object> offset = new HashMap<>();
|
||||
snapshotContext.store(offset);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user