DBZ-7776 JMH benchmarks for engine processing only

Example output (and yes, very poor async-engine results are not a mistake,
see DBZ-7777):

    Benchmark                                        (processingOrder)  (recordCount)  (threadCount)  Mode  Cnt    Score   Error  Units
    DebeziumEnginePerf.processRecordsAsyncEngine               ORDERED         100000              1    ss       295.353           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine               ORDERED         100000              2    ss       310.652           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine               ORDERED         100000              4    ss       305.956           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine               ORDERED         100000              8    ss       334.755           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine               ORDERED         100000             16    ss       306.477           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine             UNORDERED         100000              1    ss       257.661           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine             UNORDERED         100000              2    ss        78.385           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine             UNORDERED         100000              4    ss        75.899           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine             UNORDERED         100000              8    ss        81.068           s/op
    DebeziumEnginePerf.processRecordsAsyncEngine             UNORDERED         100000             16    ss        94.506           s/op
    DebeziumEnginePerf.processRecordsEmbeddedEngine                N/A         100000            N/A    ss         0.857           s/op
This commit is contained in:
Vojtech Juranek 2024-04-15 23:57:00 +02:00 committed by Jiri Pechanec
parent a948675c4d
commit a6a2801b01
2 changed files with 388 additions and 0 deletions

View File

@ -0,0 +1,212 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.performance.engine;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import io.debezium.config.Configuration;
import io.debezium.embedded.ConvertingEngineBuilderFactory;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.async.AsyncEngineConfig;
import io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.KeyValueChangeEventFormat;
import io.debezium.performance.engine.connector.PreComputedRecordsSourceConnector;
import io.debezium.util.IoUtil;
/**
* JMH benchmark focused on speed of record processing of given {@link DebeziumEngine} implementation.
*/
public class DebeziumEnginePerf {
@State(Scope.Thread)
public static abstract class DebeziumEnginePerfTest {
protected static final KeyValueChangeEventFormat KV_EVENT_FORMAT = KeyValueChangeEventFormat.of(Json.class, Json.class);
protected static final String OFFSET_FILE_NAME = "offsets.txt";
private DebeziumEngine<ChangeEvent<String, String>> engine;
private ExecutorService executors;
protected CountDownLatch finishLatch;
@Param({ "100000", "1000000" })
public int recordCount;
public abstract DebeziumEngine createEngine();
@Setup(Level.Iteration)
public void doSetup() throws InterruptedException {
delete(OFFSET_FILE_NAME);
finishLatch = new CountDownLatch(recordCount);
engine = createEngine();
executors = Executors.newFixedThreadPool(1);
executors.execute(engine);
}
@TearDown(Level.Iteration)
public void doCleanup() throws IOException {
try {
if (engine != null) {
engine.close();
}
if (executors != null) {
executors.shutdown();
try {
executors.awaitTermination(60, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
executors.shutdownNow();
}
}
}
finally {
engine = null;
executors = null;
}
}
protected Consumer<ChangeEvent<String, String>> getRecordConsumer() {
return record -> {
if (record != null) {
finishLatch.countDown();
}
};
}
protected Path getPath(String relativePath) {
return Paths.get(resolveDataDir(), relativePath).toAbsolutePath();
}
private void delete(String relativePath) {
Path history = getPath(relativePath).toAbsolutePath();
if (history != null) {
history = history.toAbsolutePath();
if (inTestDataDir(history)) {
try {
IoUtil.delete(history);
}
catch (IOException e) {
// ignored
}
}
}
}
private boolean inTestDataDir(Path path) {
Path target = FileSystems.getDefault().getPath(resolveDataDir()).toAbsolutePath();
return path.toAbsolutePath().startsWith(target);
}
private String resolveDataDir() {
String value = System.getProperty("dbz.test.data.dir");
if (value != null && (value = value.trim()).length() > 0) {
return value;
}
value = System.getenv("DBZ_TEST_DATA_DIR");
if (value != null && (value = value.trim()).length() > 0) {
return value;
}
return "/tmp";
}
}
@State(Scope.Thread)
public static class AsyncEnginePerfTest extends DebeziumEnginePerfTest {
@Param({ "1", "2", "4", "8", "16" })
public int threadCount;
@Param({ "ORDERED", "UNORDERED" })
public String processingOrder;
public DebeziumEngine createEngine() {
Configuration config = Configuration.create()
.with(EmbeddedEngine.ENGINE_NAME, "async-engine")
.with(EmbeddedEngine.CONNECTOR_CLASS, PreComputedRecordsSourceConnector.class)
.with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, getPath(OFFSET_FILE_NAME).toAbsolutePath())
.with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 3_600_000)
.with(AsyncEngineConfig.RECORD_PROCESSING_SHUTDOWN_TIMEOUT_MS, 100)
.with(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, 100)
.with(AsyncEngineConfig.RECORD_PROCESSING_THREADS, threadCount)
.with(AsyncEngineConfig.RECORD_PROCESSING_ORDER, processingOrder)
.build();
return new ConvertingAsyncEngineBuilderFactory()
.builder(KV_EVENT_FORMAT)
.using(config.asProperties())
.notifying(getRecordConsumer())
.using(this.getClass().getClassLoader())
.build();
}
}
@State(Scope.Thread)
public static class EmbeddedEnginePerfTest extends DebeziumEnginePerfTest {
public DebeziumEngine createEngine() {
Configuration config = Configuration.create()
.with(EmbeddedEngine.ENGINE_NAME, "embedded-engine")
.with(EmbeddedEngine.CONNECTOR_CLASS, PreComputedRecordsSourceConnector.class)
.with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, getPath(OFFSET_FILE_NAME).toAbsolutePath())
.with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 3_600_000)
.build();
return new ConvertingEngineBuilderFactory()
.builder(KV_EVENT_FORMAT)
.using(config.asProperties())
.notifying(getRecordConsumer())
.using(this.getClass().getClassLoader())
.build();
}
}
@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(value = 1)
@Warmup(iterations = 1)
@Measurement(iterations = 1, time = 1)
public void processRecordsAsyncEngine(AsyncEnginePerfTest test) throws InterruptedException {
test.finishLatch.await();
}
@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(value = 1)
@Warmup(iterations = 1)
@Measurement(iterations = 1, time = 1)
public void processRecordsEmbeddedEngine(EmbeddedEnginePerfTest test) throws InterruptedException {
test.finishLatch.await();
}
}

View File

@ -0,0 +1,176 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.performance.engine.connector;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import io.debezium.config.Configuration;
import io.debezium.connector.simple.SimpleSourceConnector;
import io.debezium.util.Collect;
/**
* Simple source connector which produces limited number of pre-computed records.
* It's similar to {@link SimpleSourceConnector}, but adjusted for benchmarks so fixed amount of records is generated beforehand and only these pre-generated records
* are sent to the consumer.
*
* @author vjuranek
*/
public class PreComputedRecordsSourceConnector extends SourceConnector {
protected static final String VERSION = "1.0";
public static final String TOPIC_NAME = "simple.topic";
public static final String RECORD_COUNT_PER_BATCH = "record.count.per.batch";
public static final String BATCH_COUNT = "batch.count";
public static final int DEFAULT_RECORD_COUNT_PER_BATCH = 2048;
public static final int DEFAULT_BATCH_COUNT = 100;
private Map<String, String> config;
@Override
public String version() {
return VERSION;
}
@Override
public void start(Map<String, String> props) {
config = props;
}
@Override
public Class<? extends Task> taskClass() {
return PreComputedRecordsTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> configs = new ArrayList<>();
configs.add(config);
return configs;
}
@Override
public void stop() {
// do nothing
}
@Override
public ConfigDef config() {
return new ConfigDef();
}
public static class PreComputedRecordsTask extends SourceTask {
// records has to be static variable, otherwise JMH count also the time for generating the records
private static List<SourceRecord> records = precomputeRecords(DEFAULT_BATCH_COUNT * DEFAULT_RECORD_COUNT_PER_BATCH);
private final AtomicBoolean running = new AtomicBoolean();
private int batchCount;
private int recordsPerBatch;
private int recordsFrom;
private int currentBatch;
private int recordsSent;
@Override
public String version() {
return VERSION;
}
@Override
public void start(Map<String, String> props) {
if (running.compareAndSet(false, true)) {
Configuration config = Configuration.from(props);
recordsPerBatch = config.getInteger(RECORD_COUNT_PER_BATCH, DEFAULT_RECORD_COUNT_PER_BATCH);
batchCount = config.getInteger(BATCH_COUNT, DEFAULT_BATCH_COUNT);
recordsFrom = 0;
currentBatch = 0;
recordsSent = 0;
}
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
if (running.get()) {
if ((recordsFrom + (currentBatch + 1) * recordsPerBatch) >= DEFAULT_BATCH_COUNT * DEFAULT_RECORD_COUNT_PER_BATCH) {
recordsFrom = 0;
currentBatch = 0;
}
int recordsTo = Math.min((currentBatch + 1) * recordsPerBatch, records.size() - recordsFrom) - 1;
if (recordsFrom > recordsTo) {
return null;
}
List<SourceRecord> batch = records.subList(recordsFrom, recordsTo);
currentBatch++;
recordsSent = recordsSent + batch.size();
recordsFrom = recordsTo;
return batch;
}
return null;
}
@Override
public void stop() {
running.set(false);
}
}
private static List<SourceRecord> precomputeRecords(int numberOfRecords) {
Schema keySchema = SchemaBuilder.struct()
.name("simple.key")
.field("id", Schema.INT32_SCHEMA)
.build();
Schema valueSchema = SchemaBuilder.struct()
.name("simple.value")
.field("name", Schema.STRING_SCHEMA)
.field("surname", Schema.STRING_SCHEMA)
.field("address", Schema.STRING_SCHEMA)
.field("batch", Schema.INT32_SCHEMA)
.field("record", Schema.INT32_SCHEMA)
.field("timestamp", Schema.OPTIONAL_INT64_SCHEMA)
.build();
List<SourceRecord> records = new LinkedList<>();
Random random = new Random();
long initialTimestamp = System.currentTimeMillis();
for (int recordNum = 0; recordNum != numberOfRecords; recordNum++) {
Struct key = new Struct(keySchema);
key.put("id", recordNum);
Struct value = new Struct(valueSchema);
value.put("name", randomString(random, 10));
value.put("surname", randomString(random, 20));
value.put("address", randomString(random, 30));
value.put("batch", 1);
value.put("record", recordNum + 1);
value.put("timestamp", initialTimestamp + recordNum);
SourceRecord record = new SourceRecord(Collect.hashMapOf("source", "simple"), Collect.hashMapOf("id", recordNum), TOPIC_NAME, 1, keySchema, key, valueSchema,
value);
records.add(record);
}
return records;
}
private static String randomString(Random random, int length) {
final int ASCII_CHAR_START = 97;
final int ASCII_CHAR_END = 123;
return random.ints(ASCII_CHAR_START, ASCII_CHAR_END).limit(length)
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}
}