DBZ-7764 Use cache thread pool as the default for async. engine

This commit is contained in:
Vojtech Juranek 2024-04-10 14:34:52 +02:00 committed by Jiri Pechanec
parent 230076205f
commit a2eaddffcf
3 changed files with 13 additions and 5 deletions

View File

@ -148,7 +148,13 @@ private AsyncEmbeddedEngine(Properties config,
// Create thread pools for executing tasks and record pipelines.
taskService = Executors.newFixedThreadPool(this.config.getInteger(ConnectorConfig.TASKS_MAX_CONFIG, () -> 1));
recordService = Executors.newFixedThreadPool(computeRecordThreads(this.config.getString(AsyncEmbeddedEngine.RECORD_PROCESSING_THREADS)));
final String processingThreads = this.config.getString(AsyncEmbeddedEngine.RECORD_PROCESSING_THREADS);
if (processingThreads == null || processingThreads.isBlank()) {
recordService = Executors.newCachedThreadPool();
}
else {
recordService = Executors.newFixedThreadPool(computeRecordThreads(processingThreads));
}
// Validate provided config and prepare Kafka worker config needed for Kafka stuff, like e.g. OffsetStore.
if (!this.config.validateAndRecord(AsyncEngineConfig.CONNECTOR_FIELDS, LOGGER::error)) {

View File

@ -24,7 +24,9 @@ public interface AsyncEngineConfig extends EmbeddedEngineConfig {
*/
Field RECORD_PROCESSING_THREADS = Field.create("record.processing.threads")
.withDescription("The number of threads to be used for processing CDC records. If you want to use all available threads, you can use "
+ "'AVAILABLE_CORES' placeholder.");
+ "'AVAILABLE_CORES' placeholder. If the number of threads is not specified, the threads will be created as needed, using "
+ "Java 'Executors.newCachedThreadPool()' executor service.")
.withDefault(""); // We need to set some non-null value to avoid Kafka config validation failures.
/**
* An optional field that specifies maximum time in ms to wait for submitted records to finish processing when the task shut down is called.

View File

@ -528,9 +528,9 @@ The default is a periodic commity policy based upon time intervals.
|`record.processing.threads`
|The number of available cores.
|The number of threads to use for processing CDC records.
The default value is the number of available machine cores, with an upper limit of 16 threads.
If you want to use all available threads without any limitation, use the `AVAILABLE_CORES` placeholder.
|The number of threads to use for processing CDC records.
If you want to use all available cores on given machine, you can use the `AVAILABLE_CORES` placeholder.
If the value is not specified, threads are created as needed, using Java https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/Executors.html#newCachedThreadPool()[cached thread pool].
|`record.processing.shutdown.timeout.ms`
|1000