From 8f0d5a8e661e39623f6dacd9fa04239484ad4931 Mon Sep 17 00:00:00 2001 From: Vojtech Juranek Date: Thu, 18 Apr 2024 13:42:46 +0200 Subject: [PATCH] DBZ-7777 Add JMH benchmark for engine without converter --- .../engine/DebeziumEnginePerf.java | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 debezium-microbenchmark-engine/src/main/java/io/debezium/performance/engine/DebeziumEnginePerf.java diff --git a/debezium-microbenchmark-engine/src/main/java/io/debezium/performance/engine/DebeziumEnginePerf.java b/debezium-microbenchmark-engine/src/main/java/io/debezium/performance/engine/DebeziumEnginePerf.java new file mode 100644 index 000000000..bb3233510 --- /dev/null +++ b/debezium-microbenchmark-engine/src/main/java/io/debezium/performance/engine/DebeziumEnginePerf.java @@ -0,0 +1,102 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.performance.engine; + +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import io.debezium.config.Configuration; +import io.debezium.embedded.EmbeddedEngine; +import io.debezium.embedded.async.AsyncEngineConfig; +import io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.format.KeyValueHeaderChangeEventFormat; +import io.debezium.performance.engine.connector.PreComputedRecordsSourceConnector; + +/** + * JMH benchmark focused on speed of record processing of given {@link DebeziumEngine} implementation not using any key/value converter. + */ +public class DebeziumEnginePerf { + + @State(Scope.Thread) + public static class AsyncEnginePerfTest extends AbstractDebeziumEnginePerf { + @Param({ "1", "2", "4", "8", "16" }) + public int threadCount; + + @Param({ "ORDERED", "UNORDERED" }) + public String processingOrder; + + public DebeziumEngine createEngine() { + Configuration config = Configuration.create() + .with(EmbeddedEngine.ENGINE_NAME, "async-engine") + .with(EmbeddedEngine.CONNECTOR_CLASS, PreComputedRecordsSourceConnector.class) + .with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, getPath(OFFSET_FILE_NAME).toAbsolutePath()) + .with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 3_600_000) + .with(AsyncEngineConfig.RECORD_PROCESSING_SHUTDOWN_TIMEOUT_MS, 100) + .with(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, 100) + .with(AsyncEngineConfig.RECORD_PROCESSING_THREADS, threadCount) + .with(AsyncEngineConfig.RECORD_PROCESSING_ORDER, processingOrder) + .build(); + + return new ConvertingAsyncEngineBuilderFactory() + .builder((KeyValueHeaderChangeEventFormat) null) + .using(config.asProperties()) + .notifying(getRecordConsumer()) + .using(this.getClass().getClassLoader()) + .build(); + } + } + + @State(Scope.Thread) + public static class EmbeddedEnginePerfTest extends AbstractDebeziumEnginePerf { + + public DebeziumEngine createEngine() { + Configuration config = Configuration.create() + .with(EmbeddedEngine.ENGINE_NAME, "embedded-engine") + .with(EmbeddedEngine.CONNECTOR_CLASS, PreComputedRecordsSourceConnector.class) + .with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, getPath(OFFSET_FILE_NAME).toAbsolutePath()) + .with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 3_600_000) + .build(); + + return new EmbeddedEngine.EngineBuilder() + .using(config.asProperties()) + .notifying(getRecordConsumer()) + .using(this.getClass().getClassLoader()) + .build(); + } + } + + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + @OutputTimeUnit(TimeUnit.SECONDS) + @Fork(value = 1) + @Warmup(iterations = 1) + @Measurement(iterations = 1, time = 1) + public void processRecordsAsyncEngine(AsyncEnginePerfTest test) throws InterruptedException { + test.finishLatch.await(); + } + + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + @OutputTimeUnit(TimeUnit.SECONDS) + @Fork(value = 1) + @Warmup(iterations = 1) + @Measurement(iterations = 1, time = 1) + public void processRecordsEmbeddedEngine(EmbeddedEnginePerfTest test) throws InterruptedException { + test.finishLatch.await(); + } +}