DBZ-7632 Add documentation for AsyncEmbeddedEngine

This commit is contained in:
Vojtech Juranek 2024-03-28 13:59:11 +01:00 committed by Vojtěch Juránek
parent 55617368b2
commit 9a630b3611

View File

@ -20,6 +20,17 @@ They still want the same data change events, but prefer to have the connectors s
This `debezium-api` module defines a small API that allows an application to easily configure and run {prodname} connectors using {prodname} Engine.
Implementation-wise, starting 2.6.0 release, {prodname} provides two implementations of `DebeziumEngine` interface.
The older one, `EmbeddedEngine`, processes all records sequentially and allows to run on one task for given connector.
This is implementation is used by default.
Since 2.6.0 release, new implementation, `AsyncEmbeddedEngine`, is available.
This implementation allows to process records in multiple threads and also allows to run multiple tasks if the connector supports it (currently only connectors for SQL server and Mongo allows multiple tasks to be run withing single connector).
As both engines implements the same interface, both have the same API and code examples provided bellow are valid for both of them.
Both implementations also supports same configuration options.
However, new `AsyncEmbeddedEngine` added couple of additional configuration options for setting up and fine-tuning parallel processing.
These options are describe in xref:{link-engine}#async-engine-properties[the dedicated subsection].
To learn more about motivation for `AsyncEmbeddedEngine` and its implementation details, you can check https://github.com/debezium/debezium-design-documents/blob/main/DDD-7.md[design document for Asynchronous Embedded Engine].
== Dependencies
To use {prodname} Engine module, add the `debezium-api` module to your application's dependencies.
@ -248,6 +259,32 @@ Your application should always properly stop the engine to ensure graceful and c
For example, do not rely upon shutting down the `ExecutorService`, since that interrupts the running threads.
Although the `DebeziumEngine` will indeed terminate when its thread is interrupted, the engine may not terminate cleanly, and when your application is restarted it may see some of the same source records that it had processed just prior to the shutdown.
As mention above, there are two implementations of `DebeziumEngine` interface.
Both use the same API and the code sample above are valid for both of them.
The only exception is creation of the `DebeziumEngine` instance.
As also mentioned in the intorduction, by default `EmbeddedEgine` is used.
Therefore `DebeziumEngine.create(Json.class)` would result in using `EmbeddedEngine` instance under the hood.
If you want to use new `AsyncEmbeddedEngine` instance instead, you have to use `DebeziumEngine#create(KeyValueHeaderChangeEventFormat<K, V, H> format, String builderFactory)` method.
E.g. to create embedded engine which uses `AsyncEmbeddedEngine` under the hood and uses JSON as a key, value and header format, you need to use following code:
[source,java,indent=0]
----
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine
.create(KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class),
"io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory")
.using(props)
.notifying(record -> {
System.out.println(record);
}).build()
) {
// Run also the engine istself asynchronously ...
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
// Do something else or wait for a signal or an event
}
----
[[engine-output-message-formats]]
== Output Message Formats
@ -476,6 +513,44 @@ The default is a periodic commity policy based upon time intervals.
|Max delay (in ms) between retries when encountering connection errors.
|===
[[async-engine-properties]]
=== Asynchronous Engine Properties
[cols="35%a,10%a,55%a"]
|===
|Property
|Default
|Description
|`record.processing.threads`
|`#` of availabel cores
|The number of threads to be used for processing CDC records. The default is number of available machine cores with upper limit of 16 threads.
If you want to use all available thread without any limitation, use `AVAILABLE_CORES` placeholder.
|`record.processing.shutdown.timeout.ms`
|1000
|Maximum time in milliseconds to wait for processing submitted records when task shutdown is called.
|`record.processing.order`
|`ORDERED`
|Determines how the records should be produced.
* `ORDERED` means sequential processing, i.e. that the records are produced in the same order as they were obtained from the database.
* `UNORDERED` means non-sequential processing, i.e. the records can be produced in a different order than the original one.
Non-sequential approach gives better throughput, as the records are produced immediately once the SMTs and serialization of the message is done, without waiting of other records.
This option doesn't have any effect when ChangeConsumer is provided to the engine.
|`record.processing.with.serial.consumer`
|`false`
|Specifies whether the default ChangeConsumer should be created from provided Consumer, resulting in serial Consumer processing.
This option has no effect if the ChangeConsumer is already provided to the engine via configuration.
|`task.management.timeout.ms`
|180,000 (3 min)
|Time to wait for task's lifecycle management operations (starting and stopping), given in milliseconds.
|===
[[database-history-properties]]
=== Database schema history properties