DBZ-7024 Add option for creating default ChangeConsumer

This option effective allowes the user to request serial processing of
the records byt the provided Consumer.
This commit is contained in:
Vojtech Juranek 2024-02-06 15:59:11 +01:00 committed by Jiri Pechanec
parent cc5f7aedd1
commit 0f7d3100b4
2 changed files with 21 additions and 2 deletions

View File

@ -702,6 +702,9 @@ public static final class AsyncEngineBuilder implements DebeziumEngine.Builder<S
@Override @Override
public Builder<SourceRecord> notifying(final Consumer<SourceRecord> consumer) { public Builder<SourceRecord> notifying(final Consumer<SourceRecord> consumer) {
this.consumer = consumer; this.consumer = consumer;
if (config.contains(AsyncEngineConfig.RECORD_PROCESSING_WITH_SERIAL_CONSUMER.name())) {
this.handler = buildDefaultChangeConsumer(consumer);
}
return this; return this;
} }

View File

@ -45,15 +45,30 @@ public interface AsyncEngineConfig extends EmbeddedEngineConfig {
* An optional field that specifies how the records will be produced. Sequential processing (the default) means that the records will be produced in the same order * An optional field that specifies how the records will be produced. Sequential processing (the default) means that the records will be produced in the same order
* as the engine obtained them from the connector. Non-sequential processing means that the records can be produced in arbitrary order, typically once the record is * as the engine obtained them from the connector. Non-sequential processing means that the records can be produced in arbitrary order, typically once the record is
* transformed and/or serialized. * transformed and/or serialized.
* This option doesn't have any effect when {@link io.debezium.engine.DebeziumEngine.ChangeConsumer} is provided to the engine. In such case the records are always
* processed sequentially.
*/ */
Field RECORD_PROCESSING_SEQUENTIALLY = Field.create("record.processing.sequentially") Field RECORD_PROCESSING_SEQUENTIALLY = Field.create("record.processing.sequentially")
.withDescription("Determines how the records should be produced. Sequential processing means (setting to `true`, the default) that the records are " .withDescription("Determines how the records should be produced. Sequential processing means (setting to `true`, the default) that the records are "
+ "produced in the same order as they were obtained from the database. Non-sequential processing means that the records can be produced in a different " + "produced in the same order as they were obtained from the database. Non-sequential processing means that the records can be produced in a different "
+ "order than the original one. Non-sequential approach gives better throughput, as the records are produced immediately once the SMTs and serialization of " + "order than the original one. Non-sequential approach gives better throughput, as the records are produced immediately once the SMTs and serialization of "
+ "the message is done, without waiting of other records.") + "the message is done, without waiting of other records. This option doesn't have any effect when ChangeConsumer is provided to the engine.")
.withDefault(true) .withDefault(true)
.withValidation(Field::isBoolean); .withValidation(Field::isBoolean);
/**
* An optional field that specifies if default {@link io.debezium.engine.DebeziumEngine.ChangeConsumer} should be created for consuming records or not.
* The main effect of this option is that it when default {@link io.debezium.engine.DebeziumEngine.ChangeConsumer} is created, engine will select different
* {@link io.debezium.embedded.AsyncEmbeddedEngine.RecordProcessor} and provided {@link java.util.function.Consumer} will always process records serially.
* will process the records sequentially. Only SMTs will be run in parallel in this case.
* This option doesn't have any effect when {@link io.debezium.engine.DebeziumEngine.ChangeConsumer} is provided to the engine in the configuration.
*/
Field RECORD_PROCESSING_WITH_SERIAL_CONSUMER = Field.create("record.processing.with.serial.consumer")
.withDescription("Specifies whether the default ChangeConsumer should be created from provided Consumer, resulting in serial Consumer processing. "
+ "This option has no effect if the ChangeConsumer is already provided to the engine via configuration.")
.withDefault(false)
.withValidation(Field::isBoolean);
/** /**
* The array of all exposed fields. * The array of all exposed fields.
*/ */
@ -61,5 +76,6 @@ public interface AsyncEngineConfig extends EmbeddedEngineConfig {
TASK_MANAGEMENT_TIMEOUT_MS, TASK_MANAGEMENT_TIMEOUT_MS,
RECORD_PROCESSING_SHUTDOWN_TIMEOUT_MS, RECORD_PROCESSING_SHUTDOWN_TIMEOUT_MS,
RECORD_PROCESSING_THREADS, RECORD_PROCESSING_THREADS,
RECORD_PROCESSING_SEQUENTIALLY); RECORD_PROCESSING_SEQUENTIALLY,
RECORD_PROCESSING_WITH_SERIAL_CONSUMER);
} }