diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java b/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java
index da91c7517..f9b7a6af0 100644
--- a/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java
+++ b/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEmbeddedEngine.java
@@ -148,7 +148,13 @@ private AsyncEmbeddedEngine(Properties config,
 
         // Create thread pools for executing tasks and record pipelines.
         taskService = Executors.newFixedThreadPool(this.config.getInteger(ConnectorConfig.TASKS_MAX_CONFIG, () -> 1));
-        recordService = Executors.newFixedThreadPool(computeRecordThreads(this.config.getString(AsyncEmbeddedEngine.RECORD_PROCESSING_THREADS)));
+        final String processingThreads = this.config.getString(AsyncEmbeddedEngine.RECORD_PROCESSING_THREADS);
+        if (processingThreads == null || processingThreads.isBlank()) {
+            recordService = Executors.newCachedThreadPool();
+        }
+        else {
+            recordService = Executors.newFixedThreadPool(computeRecordThreads(processingThreads));
+        }
 
         // Validate provided config and prepare Kafka worker config needed for Kafka stuff, like e.g. OffsetStore.
         if (!this.config.validateAndRecord(AsyncEngineConfig.CONNECTOR_FIELDS, LOGGER::error)) {
diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEngineConfig.java b/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEngineConfig.java
index 875dc13a8..24cf289a7 100644
--- a/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEngineConfig.java
+++ b/debezium-embedded/src/main/java/io/debezium/embedded/async/AsyncEngineConfig.java
@@ -24,7 +24,9 @@ public interface AsyncEngineConfig extends EmbeddedEngineConfig {
      */
     Field RECORD_PROCESSING_THREADS = Field.create("record.processing.threads")
             .withDescription("The number of threads to be used for processing CDC records. If you want to use all available threads, you can use "
-                    + "'AVAILABLE_CORES' placeholder.");
+                    + "'AVAILABLE_CORES' placeholder. If the number of threads is not specified, threads are created as needed, using the "
+                    + "Java 'Executors.newCachedThreadPool()' executor service.")
+            .withDefault(""); // We need to set some non-null value to avoid Kafka config validation failures.
 
     /**
      * An optional field that specifies maximum time in ms to wait for submitted records to finish processing when the task shut down is called.
diff --git a/documentation/modules/ROOT/pages/development/engine.adoc b/documentation/modules/ROOT/pages/development/engine.adoc
index 55bd7965d..97ff1d88d 100644
--- a/documentation/modules/ROOT/pages/development/engine.adoc
+++ b/documentation/modules/ROOT/pages/development/engine.adoc
@@ -528,9 +528,9 @@ The default is a periodic commit policy based upon time intervals.
 
 |`record.processing.threads`
 |The number of available cores.
-|The number of threads to use for processing CDC records.
-The default value is the number of available machine cores, with an upper limit of 16 threads.
-If you want to use all available threads without any limitation, use the `AVAILABLE_CORES` placeholder.
+|The number of threads to use for processing CDC records.
+If you want to use all available cores on a given machine, you can use the `AVAILABLE_CORES` placeholder.
+If the value is not specified, threads are created as needed, using the Java https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/Executors.html#newCachedThreadPool()[cached thread pool].
 
 |`record.processing.shutdown.timeout.ms`
 |1000
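
Below is a minimal, hypothetical usage sketch (not part of this change) showing how the `record.processing.threads` property could be set when building an embedded engine. The connector class, topic prefix, and offset file path are placeholders, and selecting the async engine implementation may require additional setup depending on the Debezium version; the record-processing behavior itself is the one described in the hunks above.

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

public class RecordProcessingThreadsExample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("name", "example-engine");                                                 // placeholder engine name
        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");  // placeholder connector
        props.setProperty("topic.prefix", "example");                                                // placeholder prefix
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");                       // placeholder path

        // Pin the record-processing pool to 4 threads, or use "AVAILABLE_CORES" to use all cores.
        // With this change, leaving the property unset falls back to Executors.newCachedThreadPool().
        props.setProperty("record.processing.threads", "4");

        try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> System.out.println(record))
                .build()) {

            // The engine is a Runnable; run it on its own thread so this thread stays free.
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(engine);

            Thread.sleep(10_000);   // let the engine capture changes for a while (demo only)
            executor.shutdown();    // no new tasks; the engine itself is closed when the try block exits
        }
    }
}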