DBZ-7632 Apply suggestions from code review

Co-authored-by: roldanbob <broldan@redhat.com>
Vojtěch Juránek 2024-03-28 22:29:49 +01:00
parent 44fb5d2788
commit 8a5dd40e9c


@@ -20,16 +20,16 @@ They still want the same data change events, but prefer to have the connectors s
This `debezium-api` module defines a small API that allows an application to easily configure and run {prodname} connectors using {prodname} Engine.
Beginning with the 2.6.0 release, {prodname} provides two implementations of the `DebeziumEngine` interface.
The older one, `EmbeddedEngine`, processes all records sequentially and runs only a single task for a given connector.
This is the default implementation.
The 2.6.0 release also introduces a new implementation, `AsyncEmbeddedEngine`.
This implementation processes records in multiple threads, and can run multiple tasks if the connector supports it (currently only the connectors for SQL Server and MongoDB support running multiple tasks within a single connector).
Because both of these engines implement the same interface and share the same API, the code examples that follow are valid for either engine.
Both implementations also support the same configuration options.
However, the new `AsyncEmbeddedEngine` provides a few additional configuration options for setting up and fine-tuning parallel processing.
For information about these new configuration options, see xref:{link-engine}#async-engine-properties[Asynchronous Engine Properties].
To learn more about the motivation behind development of the `AsyncEmbeddedEngine` and about its implementation details, see the https://github.com/debezium/debezium-design-documents/blob/main/DDD-7.md[Asynchronous Embedded Engine design document].
== Dependencies
@@ -253,19 +253,20 @@ If your application needs to wait for the engine to completely stop before exiti
Alternatively, you can register a `CompletionCallback` when you create the `DebeziumEngine`, to be notified when the engine terminates.
Recall that when the JVM shuts down, it only waits for non-daemon threads.
Therefore, if you run the engine on a daemon thread, be sure that your application waits for the engine to complete before it exits.
Your application should always properly stop the engine to ensure graceful and complete shutdown and that each source record is sent to the application exactly one time.
For example, do not rely upon shutting down the `ExecutorService`, since that interrupts the running threads.
Although the `DebeziumEngine` will indeed terminate when its thread is interrupted, the engine may not terminate cleanly, and when your application is restarted it may see some of the same source records that it had processed just prior to the shutdown.
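As an illustration of these recommendations, the following sketch registers a `CompletionCallback` and blocks on a `java.util.concurrent.CountDownLatch` until the engine reports that it has fully terminated. The connector configuration in `props` and the event handler are assumed to be set up as in the earlier examples; exception handling is omitted for brevity.

[source,java,indent=0]
----
// Minimal sketch: wait for the engine to terminate before the application exits.
// Assumes `props` contains a valid connector configuration, as in the earlier examples.
final CountDownLatch engineStopped = new CountDownLatch(1);

DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(record -> System.out.println(record))
        .using(new DebeziumEngine.CompletionCallback() {
            @Override
            public void handle(boolean success, String message, Throwable error) {
                // Invoked once the engine has completely stopped, successfully or not.
                if (!success) {
                    System.err.println("Engine stopped with an error: " + message);
                }
                engineStopped.countDown();
            }
        })
        .build();

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);

// ... later, when the application shuts down ...
engine.close();          // request a graceful stop
engineStopped.await();   // wait until the CompletionCallback has been invoked
executor.shutdown();
----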
As mentioned earlier, there are two implementations of the `DebeziumEngine` interface.
The two implementations use the same API, and the preceding code samples are valid for both of them.
The only exception is the creation of the `DebeziumEngine` instance.
As noted in the introduction, the `EmbeddedEngine` implementation is used by default.
Therefore, calling `DebeziumEngine.create(Json.class)` results internally in the use of an `EmbeddedEngine` instance.
If you want to use the new `AsyncEmbeddedEngine` instead, use the following method:
`DebeziumEngine#create(KeyValueHeaderChangeEventFormat<K, V, H> format, String builderFactory)`
For example, to create an embedded engine that uses the `AsyncEmbeddedEngine` under the hood and uses JSON as its key, value, and header format, you would use the following code:
[source,java,indent=0]
----
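// NOTE: a minimal sketch of the example body. The builder-factory class name below is an
// assumption based on the Debezium 2.6 asynchronous engine; verify it against the javadoc
// of the Debezium version that you use.
final Properties props = new Properties();
// ... set connector and engine configuration properties ...

try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(
        KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class),
        "io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory")   // assumed class name
        .using(props)
        .notifying(record -> {
            // Keys, values, and headers are serialized as JSON.
            System.out.println(record);
        })
        .build()) {

    // Run the engine asynchronously on a separate thread.
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(engine);

    // Do something else, or wait for a signal or an event, while the engine processes records.
}
----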
@@ -523,32 +524,33 @@ The default is a periodic commit policy based upon time intervals.
|Description
|`record.processing.threads`
|The number of available cores.
|The number of threads to use for processing CDC records.
The default is the number of available machine cores, with an upper limit of 16 threads.
If you want to use all available threads without any limitation, use the `AVAILABLE_CORES` placeholder.
|`record.processing.shutdown.timeout.ms`
|1000
|Maximum time, in milliseconds, to wait for processing of submitted records after a task shutdown is called.
|`record.processing.order`
|`ORDERED`
|Determines how the records should be produced.
`ORDERED`:: Records are processed sequentially; that is, they are produced in the order in which they were obtained from the database.
`UNORDERED`:: Records are processed non-sequentially; that is, they can be produced in a different order than in the source database.
The non-sequential processing of the `UNORDERED` option results in better throughput, because records are produced immediately after any SMT processing and message serialization is complete, without waiting for other records.
This option has no effect when a `ChangeConsumer` is provided to the engine.
|`record.processing.with.serial.consumer`
|`false`
|Specifies whether the default `ChangeConsumer` should be created from the provided `Consumer`, resulting in serial `Consumer` processing.
This option has no effect if a `ChangeConsumer` is already provided to the engine via configuration.
|`task.management.timeout.ms`
|180,000 (3 min)
|Time, in milliseconds, that the engine waits for a task's lifecycle management operations (starting and stopping) to complete.
|===
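To illustrate how these properties are applied, the following sketch adds several of them to the `java.util.Properties` object that is passed to the engine builder through `using(props)`. The property names come from the table above; the values are arbitrary examples, not recommendations.

[source,java,indent=0]
----
// Minimal sketch: property names are from the preceding table, the values are examples only.
final Properties props = new Properties();
// ... connector configuration ...
props.setProperty("record.processing.threads", "4");                // or "AVAILABLE_CORES" for no upper limit
props.setProperty("record.processing.shutdown.timeout.ms", "2000");
props.setProperty("record.processing.order", "UNORDERED");          // favor throughput over ordering
props.setProperty("record.processing.with.serial.consumer", "false");
props.setProperty("task.management.timeout.ms", "180000");
----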
[[database-history-properties]]