From bb9bef915b1c487ac16eaa0951c4f2ae921d5b66 Mon Sep 17 00:00:00 2001 From: Vojtech Juranek Date: Thu, 4 Apr 2024 14:24:00 +0200 Subject: [PATCH] DBZ-7633 Add Postgres end-to-end JMH test for Debezium engine Example output on my localhost: Benchmark (eventCount) Mode Cnt Score Error Units PostgresEndToEndPerf.processRecordsAsyncEngine 1000 ss 0.750 s/op PostgresEndToEndPerf.processRecordsAsyncEngine 10000 ss 1.000 s/op PostgresEndToEndPerf.processRecordsAsyncEngine 100000 ss 2.248 s/op PostgresEndToEndPerf.processRecordsAsyncEngine 1000000 ss 14.941 s/op PostgresEndToEndPerf.processRecordsEmbeddedEngine 1000 ss 0.799 s/op PostgresEndToEndPerf.processRecordsEmbeddedEngine 10000 ss 0.981 s/op PostgresEndToEndPerf.processRecordsEmbeddedEngine 100000 ss 2.636 s/op PostgresEndToEndPerf.processRecordsEmbeddedEngine 1000000 ss 19.919 s/op --- debezium-microbenchmark-embedded/pom.xml | 37 +- .../embedded/PostgresEndToEndPerf.java | 367 ++++++++++++++++++ 2 files changed, 393 insertions(+), 11 deletions(-) create mode 100644 debezium-microbenchmark-embedded/src/main/java/io/debezium/performance/embedded/PostgresEndToEndPerf.java diff --git a/debezium-microbenchmark-embedded/pom.xml b/debezium-microbenchmark-embedded/pom.xml index e59b21967..77060a4d8 100644 --- a/debezium-microbenchmark-embedded/pom.xml +++ b/debezium-microbenchmark-embedded/pom.xml @@ -3,7 +3,7 @@ io.debezium debezium-parent - 2.6.0-SNAPSHOT + 2.7.0-SNAPSHOT ../debezium-parent/pom.xml 4.0.0 @@ -14,14 +14,9 @@ - io.debezium - debezium-microbenchmark - ${project.version} - - - io.debezium - debezium-embedded - ${project.version} + org.slf4j + slf4j-simple + ${version.org.slf4j} @@ -29,11 +24,31 @@ io.debezium - debezium-microbenchmark + debezium-embedded io.debezium - debezium-embedded + debezium-connector-postgres + + + org.openjdk.jmh + jmh-core + + + org.openjdk.jmh + jmh-generator-annprocess + + + org.awaitility + awaitility + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-simple diff --git 
a/debezium-microbenchmark-embedded/src/main/java/io/debezium/performance/embedded/PostgresEndToEndPerf.java b/debezium-microbenchmark-embedded/src/main/java/io/debezium/performance/embedded/PostgresEndToEndPerf.java new file mode 100644 index 000000000..db52eb644 --- /dev/null +++ b/debezium-microbenchmark-embedded/src/main/java/io/debezium/performance/embedded/PostgresEndToEndPerf.java @@ -0,0 +1,367 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.performance.embedded; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import javax.management.InstanceNotFoundException; +import javax.management.JMException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.awaitility.Awaitility; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import 
org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.Configuration; +import io.debezium.connector.postgresql.PostgresConnector; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.embedded.ConvertingEngineBuilderFactory; +import io.debezium.embedded.EmbeddedEngineChangeEvent; +import io.debezium.embedded.EmbeddedEngineConfig; +import io.debezium.embedded.async.AsyncEngineConfig; +import io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.format.Json; +import io.debezium.engine.format.KeyValueChangeEventFormat; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.util.IoUtil; + +/** + * Basic end-to-end comparison between {@link io.debezium.embedded.EmbeddedEngine} and {@link io.debezium.embedded.async.AsyncEmbeddedEngine}. + * Heavily inspired by the JMH benchmark {@code EndToEndPerf} for the Oracle connector and reusing parts of its code, + * this test pre-creates the specified number of records in the Postgres database and measures how long it takes + * the engine to process them and store them in an in-memory queue.
+ */ +public class PostgresEndToEndPerf { + + private static final String HOST = "localhost"; + private static final int PORT = 5432; + private static final String USER = "postgres"; + private static final String PASSWORD = "postgres"; + private static final String DATABASE = "postgres"; + private static final String SERVER_NAME = "server1"; + private static final String BASE_TABLE_NAME = "inventory.test"; + private static final KeyValueChangeEventFormat KV_EVENT_FORMAT = KeyValueChangeEventFormat.of(Json.class, Json.class); + + @State(Scope.Thread) + public abstract static class DebeziumEndToEndPerfTest { + + private DebeziumEngine engine; + private ExecutorService executors; + protected BlockingQueue consumedLines; + protected AtomicInteger count = new AtomicInteger(0); + + @Param({ "1000", "10000", "100000", "1000000" }) + public int eventCount; + + public abstract String getBaseTableName(); + + public abstract DebeziumEngine createEngine(); + + @Setup(Level.Iteration) + public void doSetup() { + final String tableName = getBaseTableName() + "_" + eventCount; + + // delete offset and re-create table if it already exists + delete("offsets.txt"); + recreateTable(tableName); + consumedLines = new ArrayBlockingQueue<>(eventCount); + + // create engine and start it + this.engine = createEngine(); + executors = Executors.newFixedThreadPool(1); + executors.execute(engine); + + // wait for the connector to transition to streaming + waitForStreamingToStart(); + + // insert records & commit as one transaction + createDmlEvents(tableName, eventCount); + } + + @TearDown(Level.Iteration) + public void doCleanup() throws Exception { + try { + if (engine != null) { + engine.close(); + } + if (executors != null) { + executors.shutdown(); + try { + executors.awaitTermination(CommonConnectorConfig.EXECUTOR_SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + finally { + executors.shutdownNow(); + 
engine = null; + executors = null; + } + } + } + + private static JdbcConfiguration defaultJdbcConfig() { + return JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) + .withDefault(JdbcConfiguration.HOSTNAME, HOST) + .withDefault(JdbcConfiguration.PORT, PORT) + .withDefault(JdbcConfiguration.USER, USER) + .withDefault(JdbcConfiguration.PASSWORD, PASSWORD) + .withDefault(JdbcConfiguration.DATABASE, DATABASE) + .build(); + } + + private static Configuration.Builder defaultConnectorConfig() { + JdbcConfiguration jdbcConfiguration = defaultJdbcConfig(); + Configuration.Builder builder = Configuration.create(); + jdbcConfiguration.forEach((f, v) -> builder.with(CommonConnectorConfig.DATABASE_CONFIG_PREFIX + f, v)); + + return builder.with(CommonConnectorConfig.TOPIC_PREFIX, SERVER_NAME) + .with(PostgresConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) + .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA) + .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, true) + .with(EmbeddedEngineConfig.ENGINE_NAME, "benchmark") + .with(EmbeddedEngineConfig.CONNECTOR_CLASS, PostgresConnector.class) + .with(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, getPath("offsets.txt").toAbsolutePath()) + .with(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS, 0); + } + + private static Properties addSmtConfig(Configuration config) { + final Properties configProps = config.asProperties(); + configProps.setProperty("transforms", "replace"); + configProps.setProperty("transforms.replace.type", "org.apache.kafka.connect.transforms.ReplaceField$Value"); + configProps.setProperty("transforms.replace.renames", "name:transformed_name"); + configProps.setProperty("transforms.replace.exclude", "id"); + return configProps; + } + + private static Consumer> getRecordConsumer(BlockingQueue consumedLines) { + return record -> { + if (Thread.currentThread().isInterrupted()) { + return; + } + while (!consumedLines.offer((EmbeddedEngineChangeEvent) 
record)) { + if (Thread.currentThread().isInterrupted()) { + return; + } + } + }; + } + + private static void recreateTable(String tableName) { + PostgresConnection connection = getTestConnection(); + try { + connection.execute("DROP TABLE IF EXISTS " + tableName); + } + catch (SQLException e) { + e.printStackTrace(); + } + try { + connection.execute("CREATE TABLE " + tableName + " (id numeric(9,0) primary key, name varchar(50))"); + } + catch (SQLException e) { + throw new RuntimeException("Failed to create table", e); + } + } + + private static void createDmlEvents(String tableName, int eventCount) { + PostgresConnection connection = getTestConnection(); + try { + for (int i = 0; i < eventCount; i++) { + StringBuilder dml = new StringBuilder("INSERT INTO " + tableName + " (id, name) values ("); + dml.append(i).append(",").append("'Test").append(i).append("')"); + connection.executeWithoutCommitting(dml.toString()); + } + connection.commit(); + } + catch (SQLException e) { + throw new RuntimeException("Failed to insert data set", e); + } + } + + private static PostgresConnection getTestConnection() { + PostgresConnection connection = new PostgresConnection(defaultJdbcConfig(), "test_connection"); + try { + connection.setAutoCommit(false); + } + catch (SQLException e) { + throw new RuntimeException(e); + } + return connection; + } + + private static void waitForStreamingToStart() { + Awaitility.await() + .alias("Streaming was not started on time") + .pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(30, TimeUnit.SECONDS) + .ignoreException(InstanceNotFoundException.class) + .until(() -> { + final MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + try { + return (boolean) server.getAttribute(getMbeanName(), "Connected"); + } + catch (JMException ignored) { + } + return false; + }); + } + + private static ObjectName getMbeanName() throws MalformedObjectNameException { + return new 
ObjectName("debezium.postgres:type=connector-metrics,context=streaming,server=" + SERVER_NAME); + } + + private static Path getPath(String relativePath) { + return Paths.get(resolveDataDir(), relativePath).toAbsolutePath(); + } + + private static void delete(String relativePath) { + Path history = getPath(relativePath).toAbsolutePath(); + if (history != null) { + history = history.toAbsolutePath(); + if (inTestDataDir(history)) { + try { + IoUtil.delete(history); + } + catch (IOException e) { + // ignored + } + } + } + } + + private static boolean inTestDataDir(Path path) { + Path target = FileSystems.getDefault().getPath(resolveDataDir()).toAbsolutePath(); + return path.toAbsolutePath().startsWith(target); + } + + private static String resolveDataDir() { + String value = System.getProperty("dbz.test.data.dir"); + if (value != null && (value = value.trim()).length() > 0) { + return value; + } + + value = System.getenv("DBZ_TEST_DATA_DIR"); + if (value != null && (value = value.trim()).length() > 0) { + return value; + } + + return "/tmp"; + } + + @State(Scope.Thread) + public static class EmbeddedEngineEndToEndPerfTest extends DebeziumEndToEndPerfTest { + public String getBaseTableName() { + return BASE_TABLE_NAME + "_embedded"; + } + + public DebeziumEngine createEngine() { + Configuration config = defaultConnectorConfig() + .with(PostgresConnectorConfig.SLOT_NAME, "embedded_" + eventCount) + // .with(EmbeddedEngineConfig.WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS, CommonConnectorConfig.EXECUTOR_SHUTDOWN_TIMEOUT_SEC) + .build(); + Properties configProps = addSmtConfig(config); + + return new ConvertingEngineBuilderFactory() + .builder(KV_EVENT_FORMAT) + .using(configProps) + .notifying(getRecordConsumer(consumedLines)) + .using(this.getClass().getClassLoader()) + .build(); + } + } + + @State(Scope.Thread) + public static class AsyncEngineEndToEndPerfTest extends DebeziumEndToEndPerfTest { + public String getBaseTableName() { + return BASE_TABLE_NAME + "_async"; + } + 
+ public DebeziumEngine createEngine() { + Configuration config = defaultConnectorConfig() + .with(PostgresConnectorConfig.SLOT_NAME, "async_" + eventCount) + // .with(EmbeddedEngineConfig.WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS, CommonConnectorConfig.EXECUTOR_SHUTDOWN_TIMEOUT_SEC) + .with(AsyncEngineConfig.RECORD_PROCESSING_SHUTDOWN_TIMEOUT_MS, 100) + .with(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, 5000) + // .with(AsyncEngineConfig.RECORD_PROCESSING_THREADS, 1) + // .with(AsyncEngineConfig.RECORD_PROCESSING_ORDER, "UNORDERED") + .build(); + Properties configProps = addSmtConfig(config); + + return new ConvertingAsyncEngineBuilderFactory() + // new ConvertingEngineBuilderFactory() + .builder(KV_EVENT_FORMAT) + .using(configProps) + .notifying(getRecordConsumer(consumedLines)) + .using(this.getClass().getClassLoader()) + .build(); + } + } + + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + @OutputTimeUnit(TimeUnit.SECONDS) + @Fork(value = 1) + @Warmup(iterations = 1) + @Measurement(iterations = 1, time = 1) + public void processRecordsEmbeddedEngine(EmbeddedEngineEndToEndPerfTest state) { + List records = new ArrayList<>(); + while (records.size() < state.eventCount) { + List temp = new ArrayList<>(); + state.consumedLines.drainTo(temp); + records.addAll(temp); + } + } + + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + @OutputTimeUnit(TimeUnit.SECONDS) + @Fork(value = 1) + @Warmup(iterations = 1) + @Measurement(iterations = 1, time = 1) + public void processRecordsAsyncEngine(AsyncEngineEndToEndPerfTest state) { + List records = new ArrayList<>(); + while (records.size() < state.eventCount) { + List temp = new ArrayList<>(); + state.consumedLines.drainTo(temp); + records.addAll(temp); + } + } +}