[id="debezium-server"]
= {prodname} Server
:linkattrs:
:icons: font
:toc:
:toclevels: 3
:toc-placement: macro
toc::[]
[NOTE]
====
Please let us know if you encounter any problems while using this feature.
Also please reach out if you have requirements for specific sinks to be supported by {prodname} Server, or if you would be interested in contributing the required implementation.
====
{prodname} provides a ready-to-use application that streams change events from a source database to messaging infrastructure like Amazon Kinesis, Google Cloud Pub/Sub, Apache Pulsar, Redis (Stream), or NATS JetStream.
For streaming change events to Apache Kafka, it is recommended to deploy the {prodname} connectors via Kafka Connect.
== Installation
To install the server, download and unpack the server distribution archive:
ifeval::['{page-version}' == 'master']
* {link-server-snapshot}[{prodname} Server distribution]
NOTE: The above link refers to the nightly snapshot build of the {prodname} main branch.
If you are looking for a non-snapshot version, please select the appropriate version of this documentation in the menu to the right.
endif::[]
ifeval::['{page-version}' != 'master']
* https://repo1.maven.org/maven2/io/debezium/debezium-server-dist/{debezium-version}/debezium-server-dist-{debezium-version}.tar.gz[{prodname} Server distribution]
endif::[]
A directory named `debezium-server` will be created with these contents:
[source,subs="verbatim,attributes"]
----
debezium-server/
|-- CHANGELOG.md
|-- conf
|-- CONTRIBUTE.md
|-- COPYRIGHT.txt
|-- debezium-server-{debezium-version}-runner.jar
|-- lib
|-- LICENSE-3rd-PARTIES.txt
|-- LICENSE.txt
|-- README.md
`-- run.sh
----
The server is started using the `run.sh` script, dependencies are stored in the `lib` directory, and the `conf` directory contains configuration files.
[NOTE]
====
If you use the Oracle connector, you must add the Oracle JDBC driver (and, if using XStream, also the XStream API files) to the `lib` directory,
as explained in xref:{link-oracle-connector}#obtaining-oracle-jdbc-driver-and-xstreams-api-files[Obtaining the Oracle JDBC driver and XStream API files]
====
== Configuration
{prodname} Server uses https://github.com/eclipse/microprofile-config[MicroProfile Configuration] for configuration.
This means that the application can be configured from disparate sources like configuration files, environment variables, system properties etc.
The main configuration file is _conf/application.properties_.
There are multiple configuration sections:
* `debezium.source` is for source connector configuration; each instance of {prodname} Server runs exactly one connector
* `debezium.sink` is for the sink system configuration
* `debezium.format` is for the output serialization format configuration
* `debezium.transforms` is for the configuration of message transformations
* `debezium.predicates` is for the configuration of message transformation predicates
An example configuration file looks like this:
----
debezium.sink.type=kinesis
debezium.sink.kinesis.region=eu-central-1
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=tutorial
debezium.source.schema.include.list=inventory
----
In this configuration file example:
* The sink is set up for AWS Kinesis in region `eu-central-1`
* The source connector is set up for PostgreSQL using the default {prodname} https://github.com/debezium/postgres-decoderbufs[decoderbufs] plugin.
If using PostgreSQL's built-in `pgoutput` plugin, set `debezium.source.plugin.name=pgoutput`
* The source connector is set to capture events from a schema named `inventory`.
If you want to capture all changes in the database, remove this line.
Otherwise, update this line to correspond to your preferred schema or tables.
* The source offset will be stored in a file named `offsets.dat` in the `data` directory.
Note that you might need to create this directory to prevent an error on startup.
When the server is started, it generates a sequence of log messages like this:
----
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2020-05-15 11:33:12,189 INFO [io.deb.ser.kin.KinesisChangeConsumer] (main) Using 'io.debezium.server.kinesis.KinesisChangeConsumer$$Lambda$119/0x0000000840130c40@f58853c' stream name mapper
2020-05-15 11:33:12,628 INFO [io.deb.ser.kin.KinesisChangeConsumer] (main) Using default KinesisClient 'software.amazon.awssdk.services.kinesis.DefaultKinesisClient@d1f74b8'
2020-05-15 11:33:12,628 INFO [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.kinesis.KinesisChangeConsumer' instantiated
2020-05-15 11:33:12,754 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-05-15 11:33:12,757 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = false
2020-05-15 11:33:12,763 INFO [io.deb.emb.EmbeddedEngine$EmbeddedConfig] (main) EmbeddedConfig values:
access.control.allow.methods =
access.control.allow.origin =
admin.listeners = null
bootstrap.servers = [localhost:9092]
client.dns.lookup = default
config.providers = []
connector.client.config.override.policy = None
header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
listeners = null
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
offset.flush.interval.ms = 0
offset.flush.timeout.ms = 5000
offset.storage.file.filename = data/offsets.dat
offset.storage.partitions = null
offset.storage.replication.factor = null
offset.storage.topic =
plugin.path = null
rest.advertised.host.name = null
rest.advertised.listener = null
rest.advertised.port = null
rest.extension.classes = []
rest.host.name = null
rest.port = 8083
ssl.client.auth = none
task.shutdown.graceful.timeout.ms = 5000
topic.tracking.allow.reset = true
topic.tracking.enable = true
value.converter = class org.apache.kafka.connect.json.JsonConverter
2020-05-15 11:33:12,763 INFO [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.key.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,763 INFO [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.value.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,765 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-05-15 11:33:12,765 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-05-15 11:33:12,767 INFO [io.deb.ser.DebeziumServer] (main) Engine executor started
2020-05-15 11:33:12,773 INFO [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Starting FileOffsetBackingStore with file data/offsets.dat
2020-05-15 11:33:12,835 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Starting PostgresConnectorTask with configuration:
2020-05-15 11:33:12,837 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) connector.class = io.debezium.connector.postgresql.PostgresConnector
2020-05-15 11:33:12,837 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) offset.flush.interval.ms = 0
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.user = postgres
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.dbname = postgres
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) offset.storage.file.filename = data/offsets.dat
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.hostname = localhost
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.password = ********
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) name = kinesis
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) topic.prefix = tutorial
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.port = 5432
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) schema.include.list = inventory
2020-05-15 11:33:12,908 INFO [io.quarkus] (main) debezium-server 1.2.0-SNAPSHOT (powered by Quarkus 1.4.1.Final) started in 1.198s. Listening on: http://0.0.0.0:8080
2020-05-15 11:33:12,911 INFO [io.quarkus] (main) Profile prod activated.
2020-05-15 11:33:12,911 INFO [io.quarkus] (main) Installed features: [cdi, smallrye-health]
----
[id="debezium-source-configuration-properties"]
=== Source configuration
The source configuration uses the same configuration properties that are described on the specific connector documentation pages (just with the `debezium.source` prefix), together with a few more specific ones that are necessary for running outside of Kafka Connect (see the example following the table):
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[debezium-source-connector-class]]<<debezium-source-connector-class, `debezium.source.connector.class`>>
|
|The name of the Java class implementing the source connector.
|[[debezium-source-offset-storage]]<<debezium-source-offset-storage, `debezium.source.offset.storage`>>
|`org.apache.kafka.connect.storage.FileOffsetBackingStore`
|Class to use for storing and retrieving offsets for non-Kafka deployments.
To use Redis to store offsets, use `io.debezium.storage.redis.offset.RedisOffsetBackingStore`
|[[debezium-source-offset-storage-file-filename]]<<debezium-source-offset-storage-file-filename, `debezium.source.offset.storage.file.filename`>>
|
|If using a file offset store (default), the file in which connector offsets are stored for non-Kafka deployments.
|[[debezium-source-offset-flush-interval-ms]]<<debezium-source-offset-flush-interval-ms, `debezium.source.offset.flush.interval.ms`>>
|
|Defines how frequently the offsets are flushed into the file.
|[[debezium-source-offset-redis-address]]<<debezium-source-offset-redis-address, `debezium.source.offset.storage.redis.address`>>
|
|(Optional) If using Redis to store offsets, an address, formatted as `host:port`, at which Redis is reachable. If not supplied, the value of `debezium.sink.redis.address` is used.
|[[debezium-source-offset-redis-user]]<<debezium-source-offset-redis-user, `debezium.source.offset.storage.redis.user`>>
|
|(Optional) If using Redis to store offsets, a user name used to communicate with Redis. If the `redis.address` configuration is not supplied and the address is taken from the Redis sink, the value of `debezium.sink.redis.user` is used as well.
|[[debezium-source-offset-redis-password]]<<debezium-source-offset-redis-password, `debezium.source.offset.storage.redis.password`>>
|
|(Optional) If using Redis to store offsets, a password (of the respective user) used to communicate with Redis. A password must be set if a user is set. If the `redis.address` configuration is not supplied and the address is taken from the Redis sink, the value of `debezium.sink.redis.password` is used as well.
|[[debezium-source-offset-redis-ssl-enabled]]<<debezium-source-offset-redis-ssl-enabled, `debezium.source.offset.storage.redis.ssl.enabled`>>
|
|(Optional) If using Redis to store offsets, whether or not to use SSL to communicate with Redis. If the `redis.address` configuration is not supplied and the address is taken from the Redis sink, the value of `debezium.sink.redis.ssl.enabled` is used as well. Default is `false`.
|[[debezium-source-offset-redis-key]]<<debezium-source-offset-redis-key, `debezium.source.offset.storage.redis.key`>>
|
|(Optional) If using Redis to store offsets, defines the hash key in Redis. If not supplied, the default value `metadata:debezium:offsets` is used.
|[[debezium-source-offset-redis-wait-enabled]]<<debezium-source-offset-redis-wait-enabled, `debezium.source.offset.storage.redis.wait.enabled`>>
|`false`
|If using Redis to store offsets, enables wait for replica. In case Redis is configured with a replica shard, this makes it possible to verify that the data has been written to the replica.
For more information see Redis https://redis.io/commands/wait/[WAIT] command.
|[[debezium-source-offset-redis-wait-timeout-ms]]<<debezium-source-offset-redis-wait-timeout-ms, `debezium.source.offset.storage.redis.wait.timeout.ms`>>
|`1000`
|If using Redis to store offsets, defines the timeout in milliseconds when waiting for replica. Must have a positive value.
|[[debezium-source-offset-redis-wait-retry-enabled]]<<debezium-source-offset-redis-wait-retry-enabled, `debezium.source.offset.storage.redis.wait.retry.enabled`>>
|`false`
|If using Redis to store offsets, enables retry on wait for replica failure.
|[[debezium-source-offset-redis-wait-retry-delay]]<<debezium-source-offset-redis-wait-retry-delay, `debezium.source.offset.storage.redis.wait.retry.delay.ms`>>
|`1000`
|If using Redis to store offsets, defines the delay of retry on wait for replica failure.
|[[debezium-source-database-history-class]]<<debezium-source-database-history-class, `debezium.source.schema.history.internal`>>
|`io.debezium.storage.kafka.history.KafkaSchemaHistory`
|Some of the connectors (e.g. MySQL, SQL Server, Db2, Oracle) track the database schema evolution over time and store this data in a database schema history.
This is by default based on Kafka.
There are also other options available:

* `io.debezium.storage.file.history.FileSchemaHistory` for non-Kafka deployments
* `io.debezium.relational.history.MemorySchemaHistory`, a volatile store for test environments
* `io.debezium.storage.redis.history.RedisSchemaHistory` for Redis deployments
* `io.debezium.storage.rocketmq.history.RocketMqSchemaHistory` for RocketMQ deployments
|[[debezium-source-database-history-file-filename]]<<debezium-source-database-history-file-filename, `debezium.source.schema.history.internal.file.filename`>>
|
|The name and location of the file to which `FileSchemaHistory` persists its data.
|[[debezium-source-database-history-redis-address]]<<debezium-source-database-history-redis-address, `debezium.source.schema.history.internal.redis.address`>>
|
|The Redis host:port to connect to if using `RedisSchemaHistory`.
|[[debezium-source-database-history-redis-user]]<<debezium-source-database-history-redis-user, `debezium.source.schema.history.internal.redis.user`>>
|
|The Redis user to use if using `RedisSchemaHistory`.
|[[debezium-source-database-history-redis-password]]<<debezium-source-database-history-redis-password, `debezium.source.schema.history.internal.redis.password`>>
|
|The Redis password to use if using `RedisSchemaHistory`.
|[[debezium-source-database-history-redis-ssl-enabled]]<<debezium-source-database-history-redis-ssl-enabled, `debezium.source.schema.history.internal.redis.ssl.enabled`>>
|
|Use SSL connection if using `RedisSchemaHistory`.
|[[debezium-source-database-history-redis-key]]<<debezium-source-database-history-redis-key, `debezium.source.schema.history.internal.redis.key`>>
|
|The Redis key to use for storage if using `RedisSchemaHistory`. Default: `metadata:debezium:schema_history`
|[[debezium-source-database-history-redis-retry-initial-delay-ms]]<<debezium-source-database-history-redis-retry-initial-delay-ms, `debezium.source.schema.history.internal.redis.retry.initial.delay.ms`>>
|
|The initial delay in case of a connection retry to Redis if using `RedisSchemaHistory`. Default: 300 (ms)
|[[debezium-source-database-history-redis-retry-max-delay-ms]]<<debezium-source-database-history-redis-retry-max-delay-ms, `debezium.source.schema.history.internal.redis.retry.max.delay.ms`>>
|
|The maximum delay in case of a connection retry to Redis if using `RedisSchemaHistory`. Default: 10000 (ms)
|[[debezium-source-database-history-redis-retry-max-attempts]]<<debezium-source-database-history-redis-retry-max-attempts, `debezium.source.schema.history.internal.redis.retry.max.attempts`>>
|
|The maximum number of attempts to connect to Redis. Default: 10
|[[debezium-source-database-history-redis-connection-timeout-ms]]<<debezium-source-database-history-redis-connection-timeout-ms, `debezium.source.schema.history.internal.redis.connection.timeout.ms`>>
|
|Connection timeout of Redis client if using `RedisSchemaHistory`. Default: 2000 (ms)
|[[debezium-source-database-history-redis-socket-timeout-ms]]<<debezium-source-database-history-redis-socket-timeout-ms, `debezium.source.schema.history.internal.redis.socket.timeout.ms`>>
|
|Socket timeout of Redis client if using `RedisSchemaHistory`. Default: 2000 (ms)
|[[debezium-source-database-history-redis-wait-enabled]]<<debezium-source-database-history-redis-wait-enabled, `debezium.source.schema.history.internal.redis.wait.enabled`>>
|`false`
|If using Redis to store schema history, enables wait for replica. In case Redis is configured with a replica shard, this makes it possible to verify that the data has been written to the replica.
For more information see Redis https://redis.io/commands/wait/[WAIT] command.
|[[debezium-source-database-history-redis-wait-timeout-ms]]<<debezium-source-database-history-redis-wait-timeout-ms, `debezium.source.schema.history.internal.redis.wait.timeout.ms`>>
|`1000`
|If using Redis to store schema history, defines the timeout in milliseconds when waiting for replica. Must have a positive value.
|[[debezium-source-database-history-redis-wait-retry-enabled]]<<debezium-source-database-history-redis-wait-retry-enabled, `debezium.source.schema.history.internal.redis.wait.retry.enabled`>>
|`false`
|If using Redis to store schema history, enables retry on wait for replica failure.
|[[debezium-source-database-history-redis-wait-retry-delay]]<<debezium-source-database-history-redis-wait-retry-delay, `debezium.source.schema.history.internal.redis.wait.retry.delay.ms`>>
|`1000`
|If using Redis to store schema history, defines the delay of retry on wait for replica failure.
|[[schema-history-internal-rocketmq-topic]]<<schema-history-internal-rocketmq-topic, `debezium.source.schema.history.internal.rocketmq.topic`>>
|
|The name of the RocketMQ topic for the database schema history.
|[[schema-history-internal-rocketmq-namesrvAddr]]<<schema-history-internal-rocketmq-namesrvAddr, `debezium.source.schema.history.internal.rocketmq.name.srv.addr`>>
|`localhost:9876`
|RocketMQ service discovery NameServer address configuration.
|[[schema-history-internal-rocketmq-acl-enabled]]<<schema-history-internal-rocketmq-acl-enabled, `debezium.source.schema.history.internal.rocketmq.acl.enabled`>>
|`false`
|Whether RocketMQ access control (ACL) is enabled.
|[[schema-history-internal-rocketmq-access-key]]<<schema-history-internal-rocketmq-access-key, `debezium.source.schema.history.internal.rocketmq.access.key`>>
|
|RocketMQ access key. If `debezium.source.schema.history.internal.rocketmq.acl.enabled` is true, the value cannot be
empty.
|[[schema-history-internal-rocketmq-secret-key]]<<schema-history-internal-rocketmq-secret-key, `debezium.source.schema.history.internal.rocketmq.secret.key`>>
|
|RocketMQ secret key. If `debezium.source.schema.history.internal.rocketmq.acl.enabled` is true, the value cannot be
empty.
|[[schema-history-internal-rocketmq-recovery-attempts]]<<schema-history-internal-rocketmq-recovery-attempts,`debezium.source.schema.history.internal.rocketmq.recovery.attempts`>>
| `60`
|The maximum number of attempts to recover database schema history.
|[[schema-history-internal-rocketmq-recovery-poll-interval-ms]]<<schema-history-internal-rocketmq-recovery-poll-interval-ms,`debezium.source.schema.history.internal.rocketmq.recovery.poll.interval.ms`>>
| `1000`
|The number of milliseconds to wait while polling for persisted
data during recovery.
|[[schema-history-internal-rocketmq-store-record-timeout-ms]]<<schema-history-internal-rocketmq-store-record-timeout-ms,`debezium.source.schema.history.internal.rocketmq.store.record.timeout.ms`>>
| `60000`
|Timeout for sending messages to RocketMQ.
|===
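For example, a minimal sketch of a non-Kafka deployment that stores offsets in Redis and keeps the schema history in a local file could look like this (the address and file name are placeholders):
[source]
----
# Store connector offsets in Redis instead of the default file-based store
debezium.source.offset.storage=io.debezium.storage.redis.offset.RedisOffsetBackingStore
debezium.source.offset.storage.redis.address=localhost:6379
# Persist the database schema history to a local file
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=data/schema_history.dat
----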
[id="debezium-format-configuration-options"]
=== Format configuration
The message output format can be configured separately for the key and the value.
By default the output is in JSON format, but an arbitrary implementation of Kafka Connect's `Converter` can be used.
An example follows the table.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[debezium-format-key]]<<debezium-format-key, `debezium.format.key`>>
|`json`
|The name of the output format for the key, one of `json`/`jsonbytearray`/`avro`/`protobuf`/`simplestring`/`binary`.
|[[debezium-format-key-props]]<<debezium-format-key-props, `debezium.format.key.*`>>
|
|Configuration properties passed to the key converter.
|[[debezium-format-value]]<<debezium-format-value, `debezium.format.value`>>
|`json`
|The name of the output format for the value, one of `json`/`jsonbytearray`/`avro`/`protobuf`/`cloudevents`/`simplestring`/`binary`.
|[[debezium-format-value-props]]<<debezium-format-value-props, `debezium.format.value.*`>>
|
|Configuration properties passed to the value converter.
|[[debezium-format-header]]<<debezium-format-header, `debezium.format.header`>>
|`json`
|The name of the output format for the header, one of `json`/`jsonbytearray`.
|[[debezium-format-header-props]]<<debezium-format-header-props, `debezium.format.header.*`>>
|
|Configuration properties passed to the header converter.
|===
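For example, to emit both keys and values as schema-less JSON, the pass-through properties can configure the underlying converters (a minimal sketch; `schemas.enable` is a standard option of the Kafka Connect JSON converter):
[source]
----
debezium.format.key=json
debezium.format.value=json
# Passed through to the JSON converters; drops the schema portion of the envelope
debezium.format.key.schemas.enable=false
debezium.format.value.schemas.enable=false
----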
[id="debezium-transformations-configuration-options"]
=== Transformation configuration
Before the messages are delivered to the sink, they can run through a sequence of transformations.
The server supports https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect[single message transformations] defined by Kafka Connect.
The configuration will need to contain the list of transformations, the implementation class for each transformation, and configuration options for each of the transformations. See the example following the table.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
[id="debezium-transforms"]
|`debezium.transforms`
|
|The comma-separated list of symbolic names of transformations.
[id="debezium-transforms-name-type"]
|`debezium.transforms.<name>.type`
|
|The name of the Java class implementing the transformation with name `<name>`.
[id="debezium-transforms-name"]
|`debezium.transforms.<name>.*`
|
|Configuration properties passed to the transformation with name `<name>`.
[id="debezium-transforms-name-predicate"]
|`debezium.transforms.<name>.predicate`
|
|The name of the predicate to be applied to the transformation with name `<name>`.
[id="debezium-transforms-name-negate"]
|`debezium.transforms.<name>.negate`
| false
|Determines whether the result of the predicate applied to the transformation with name `<name>` will be negated.
|===
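As an illustration, the following sketch applies Kafka Connect's built-in `RegexRouter` transformation to rewrite destination names (the symbolic name, regex, and replacement are placeholders):
[source]
----
debezium.transforms=router
debezium.transforms.router.type=org.apache.kafka.connect.transforms.RegexRouter
# Route all tutorial.inventory.* destinations to a single stream
debezium.transforms.router.regex=tutorial.inventory.(.*)
debezium.transforms.router.replacement=inventory-events
----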
[id="debezium-predicates-configuration-options"]
=== Predicates configuration
A predicate can be associated with a transformation in order to make the transformation optional.
The server supports https://cwiki.apache.org/confluence/display/KAFKA/KIP-585%3A+Filter+and+Conditional+SMTs[Filter and Conditional SMTs] defined by Kafka Connect.
The configuration will need to contain the list of predicates, the implementation class for each predicate, and configuration options for each of the predicates. See the example following the table.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
[id="debezium-predicates"]
|`debezium.predicates`
|
|The comma-separated list of symbolic names of predicates.
[id="debezium-predicates-name-type"]
|`debezium.predicates.<name>.type`
|
|The name of the Java class implementing the predicate with name `<name>`.
[id="debezium-predicates-name"]
|`debezium.predicates.<name>.*`
|
|Configuration properties passed to the predicate with name `<name>`.
|===
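For example, the hypothetical `router` transformation from the previous section can be made conditional with Kafka Connect's built-in `TopicNameMatches` predicate (the names and pattern are placeholders):
[source]
----
debezium.predicates=onlyInventory
debezium.predicates.onlyInventory.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
debezium.predicates.onlyInventory.pattern=tutorial.inventory.*
# Apply the router transformation only to records whose destination matches the pattern
debezium.transforms.router.predicate=onlyInventory
----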
[id="debezium-additional-configuration-options"]
=== Additional configuration
{prodname} Server runs on top of the Quarkus framework.
All configuration options exposed by Quarkus are available in {prodname} Server too.
The most frequently used are:
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
[id="debezium-quarkus-http-port"]
|`quarkus.http.port`
|8080
|The port on which {prodname} exposes the MicroProfile Health endpoint and other status information. Health can be accessed on `\http://host:8080/q/health`.
[id="debezium-quarkus-log-level"]
|`quarkus.log.level`
|INFO
|The default log level for every log category.
[id="debezium-quarkus-json-logging"]
|`quarkus.log.console.json`
|true
|Determines whether to enable the JSON console formatting extension, which disables "normal" console formatting.
|===
JSON logging can be disabled by setting `quarkus.log.console.json=false` in the _conf/application.properties_ file, as demonstrated in the _conf/application.properties.example_ file.
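For instance, to run on a different HTTP port with plain-text logging, the following could be added to _conf/application.properties_ (the port value is a placeholder):
[source]
----
quarkus.http.port=8081
quarkus.log.level=INFO
quarkus.log.console.json=false
----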
==== Enabling message filtering
{prodname} Server provides filter SMT capability; see xref:transformations/filtering.adoc[Message Filtering] for more details.
However, for security reasons it is not enabled by default and has to be explicitly enabled when {prodname} Server is started.
To enable it, set the environment variable `ENABLE_DEBEZIUM_SCRIPTING` to `true`.
This will add the `debezium-scripting` JAR file and the https://jcp.org/en/jsr/detail?id=223[JSR 223] implementation JAR files (currently Groovy and graalvm.js) to the server class path.
These JAR files are contained in the `opt_lib` directory of the {prodname} Server distribution.
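With scripting enabled, a filter transformation can be configured like any other SMT; a minimal sketch (the Groovy condition is a placeholder) could look like this:
[source]
----
debezium.transforms=filter
debezium.transforms.filter.type=io.debezium.transforms.Filter
debezium.transforms.filter.language=jsr223.groovy
# Keep only update events; all other records are dropped
debezium.transforms.filter.condition=value.op == 'u'
----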
=== Sink configuration
Sink configuration is specific to each sink type.
The sink is selected by the configuration property `debezium.sink.type`.
==== Amazon Kinesis
Amazon Kinesis is a data streaming system with support for stream sharding and other techniques for high scalability.
Kinesis exposes a set of REST APIs and provides SDKs for multiple languages, including the Java SDK that is used to implement the sink.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[kinesis-type]]<<kinesis-type, `debezium.sink.type`>>
|
|Must be set to `kinesis`.
|[[kinesis-region]]<<kinesis-region, `debezium.sink.kinesis.region`>>
|
|A region name in which the Kinesis target streams are provided.
|[[kinesis-endpoint]]<<kinesis-endpoint, `debezium.sink.kinesis.endpoint`>>
|_endpoint determined by aws sdk_
|(Optional) An endpoint url at which the Kinesis target streams are provided.
|[[kinesis-credentials-profile]]<<kinesis-credentials-profile, `debezium.sink.kinesis.credentials.profile`>>
|
|(Optional) A credentials profile name used to communicate with Amazon API through the default credential profiles file.
If not present, the default credentials provider chain is used. It looks for credentials in the following order: environment variables, Java system properties, web identity token credentials, the default credential profiles file, Amazon ECS container credentials, and instance profile credentials.
|[[kinesis-null-key]]<<kinesis-null-key, `debezium.sink.kinesis.null.key`>>
|`default`
|Kinesis does not support the notion of messages without a key,
so this string will be used as the message key for messages from tables without a primary key.
|===
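Putting it together, a minimal Kinesis sink configuration could look like this (the region and profile name are placeholders):
[source]
----
debezium.sink.type=kinesis
debezium.sink.kinesis.region=eu-central-1
# Optional: use a named profile from the default credential profiles file
debezium.sink.kinesis.credentials.profile=my-profile
----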
===== Injection points
The Kinesis sink behaviour can be modified by custom logic providing alternative implementations of specific functionality.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a",options="header"]
|===
|Interface
|CDI classifier
|Description
|[[kinesis-ext-client]]<<kinesis-ext-client, `software.amazon.awssdk.services.kinesis.KinesisClient`>>
|`@CustomConsumerBuilder`
|Custom configured instance of a `KinesisClient` used to send messages to target streams.
|[[kinesis-ext-stream-name-mapper]]<<kinesis-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|A custom implementation that maps the planned destination (topic) name to a physical Kinesis stream name.
By default the same name is used.
|===
==== Google Cloud Pub/Sub
Google Cloud Pub/Sub is a messaging/eventing system designed for scalable batch and stream processing applications.
Pub/Sub exposes a set of REST APIs and provides SDKs for multiple languages, including the Java SDK that is used to implement the sink.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[pubsub-type]]<<pubsub-type, `debezium.sink.type`>>
|
|Must be set to `pubsub`.
|[[pubsub-project-id]]<<pubsub-project-id, `debezium.sink.pubsub.project.id`>>
|_system-wide default project id_
|A project name in which the target topics are created.
|[[pubsub-ordering]]<<pubsub-ordering, `debezium.sink.pubsub.ordering.enabled`>>
|`true`
|Pub/Sub can optionally use a message key to guarantee that messages with the same ordering key are delivered in the https://googleapis.dev/java/google-api-grpc/latest/com/google/pubsub/v1/PubsubMessage.Builder.html#setOrderingKey-java.lang.String-[same order] as they were sent.
This feature can be disabled.
|[[pubsub-null-key]]<<pubsub-null-key, `debezium.sink.pubsub.null.key`>>
|`default`
|Tables without a primary key send messages with a `null` key.
This is not supported by Pub/Sub, so a surrogate key must be used.
|[[batch-delay-threshold-ms]]<<batch-delay-threshold-ms, `debezium.sink.pubsub.batch.delay.threshold.ms`>>
|`100`
|The maximum amount of time to wait to reach element count or request bytes threshold before publishing outstanding messages to Pub/Sub.
|[[batch-element-count-threshold]]<<batch-element-count-threshold, `debezium.sink.pubsub.batch.element.count.threshold`>>
|`100L`
|Once this many messages are queued, send all of the messages in a single call, even if the delay threshold hasn't elapsed yet.
|[[batch-request-byte-threshold]]<<batch-request-byte-threshold, `debezium.sink.pubsub.batch.request.byte.threshold`>>
|`10000000L`
|Once the number of bytes in the batched request reaches this threshold, send all of the messages in a single call, even if neither the delay nor the message count threshold has been exceeded yet.
|[[flowcontrol-enabled]]<<flowcontrol-enabled, `debezium.sink.pubsub.flowcontrol.enabled`>>
|`false`
|When enabled, configures your publisher client with flow control to limit the rate of publish requests.
|[[flowcontrol-max-outstanding-messages]]<<flowcontrol-max-outstanding-messages, `debezium.sink.pubsub.flowcontrol.max.outstanding.messages`>>
|`Long.MAX_VALUE`
|(Optional) If flow control is enabled, the maximum number of outstanding messages before further messages are blocked from being published.
|[[flowcontrol-max-outstanding-bytes]]<<flowcontrol-max-outstanding-bytes, `debezium.sink.pubsub.flowcontrol.max.outstanding.bytes`>>
|`Long.MAX_VALUE`
|(Optional) If flow control is enabled, the maximum number of outstanding bytes before further messages are blocked from being published.
|[[retry-total-timeout-ms]]<<retry-total-timeout-ms, `debezium.sink.pubsub.retry.total.timeout.ms`>>
|`60000`
|The total timeout for a call to publish (including retries) to Pub/Sub.
|[[retry-initial-delay-ms]]<<retry-initial-delay-ms, `debezium.sink.pubsub.retry.initial.delay.ms`>>
|`5`
|The initial amount of time to wait before retrying the request.
|[[retry-delay-multiplier]]<<retry-delay-multiplier, `debezium.sink.pubsub.retry.delay.multiplier`>>
|`2.0`
|The previous wait time is multiplied by this multiplier to come up with the next wait time, until the max is reached.
|[[retry-max-delay-ms]]<<retry-max-delay-ms, `debezium.sink.pubsub.retry.max.delay.ms`>>
|`Long.MAX_VALUE`
|The maximum amount of time to wait before retrying; after this value is reached, the wait time will not increase further by the multiplier.
|[[retry-initial-rpc-timeout-ms]]<<retry-initial-rpc-timeout-ms, `debezium.sink.pubsub.retry.initial.rpc.timeout.ms`>>
|`10000`
|Controls the timeout for the initial Remote Procedure Call.
|[[retry-rpc-timeout-multiplier]]<<retry-rpc-timeout-multiplier, `debezium.sink.pubsub.retry.rpc.timeout.multiplier`>>
|`2.0`
|The previous RPC timeout is multiplied by this multiplier to come up with the next RPC timeout value, until the max is reached.
|[[retry-max-rpc-timeout-ms]]<<retry-max-rpc-timeout-ms, `debezium.sink.pubsub.retry.max.rpc.timeout.ms`>>
|`10000`
|The max timeout for individual publish requests to Cloud Pub/Sub.
|[[wait-message-delivery-timeout-ms]]<<wait-message-delivery-timeout-ms, `debezium.sink.pubsub.wait.message.delivery.timeout.ms`>>
|`30000`
|The maximum time to wait for the results of publish requests to Cloud Pub/Sub.
|[[address]]<<address, `debezium.sink.pubsub.address`>>
|
|The address of the Pub/Sub emulator.
Only to be used in a dev or test environment with the https://cloud.google.com/pubsub/docs/emulator[Pub/Sub emulator].
Unless this value is set, debezium-server will connect to a Cloud Pub/Sub instance running in a GCP project, which is the desired behavior in a production environment.
|===
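A minimal Pub/Sub sink configuration could look like this (the project id is a placeholder):
[source]
----
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=my-gcp-project
# Optional: disable ordered delivery by message key
debezium.sink.pubsub.ordering.enabled=false
----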
===== Injection points
The Pub/Sub sink behaviour can be modified by custom logic providing alternative implementations of specific functionality.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a",options="header"]
|===
|Interface
|CDI classifier
|Description
|[[pubsub-pub-builder]]<<pubsub-pub-builder, `io.debezium.server.pubsub.PubSubChangeConsumer.PublisherBuilder`>>
|`@CustomConsumerBuilder`
|A class that provides a custom configured instance of a `Publisher` used to send messages to a dedicated topic.
|[[pubsub-ext-stream-name-mapper]]<<pubsub-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|A custom implementation that maps the planned destination (topic) name to a physical Pub/Sub topic name. By default the same name is used.
|===
==== Pub/Sub Lite
Google Cloud Pub/Sub Lite is a cost-effective alternative to Google Cloud Pub/Sub.
Pub/Sub Lite exposes a set of REST APIs and provides SDKs for multiple languages, including the Java SDK that is used to implement the sink.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[pubsublite-type]]<<pubsublite-type, `debezium.sink.type` >>
|
|Must be set to `pubsublite`
|[[pubsublite-project-id]]<<pubsublite-project-id, `debezium.sink.pubsublite.project.id` >>
|system-wide default project id
|A project name or project id in which the target topics are created.
|[[pubsublite-region]]<<pubsublite-region, `debezium.sink.pubsublite.region` >>
|
|The region where the topics are created, for example `us-east1-b`.
|[[pubsublite-ordering]]<<pubsublite-ordering, `debezium.sink.pubsublite.ordering.enabled`>>
|`true`
|Pub/Sub Lite can optionally use a message key to guarantee the delivery of messages with the same key to the https://cloud.google.com/pubsub/lite/docs/publishing#using_ordering_keys-[same partition].
This feature can be disabled.
|[[pubsublite-null-key]]<<pubsublite-null-key, `debezium.sink.pubsublite.null.key`>>
|`default`
|Tables without a primary key send messages with a `null` key.
This is not supported by Pub/Sub Lite, so a surrogate key must be used.
|[[pubsublite-wait-message-delivery-timeout-ms]]<<pubsublite-wait-message-delivery-timeout-ms, `debezium.sink.pubsublite.wait.message.delivery.timeout.ms`>>
|`30000`
|The maximum time to wait for the results of publish requests to Pub/Sub Lite.
|===
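A minimal Pub/Sub Lite sink configuration could look like this (the project id and region are placeholders):
[source]
----
debezium.sink.type=pubsublite
debezium.sink.pubsublite.project.id=my-gcp-project
debezium.sink.pubsublite.region=us-east1-b
----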
===== Injection points
The Pub/Sub Lite sink behaviour can be modified by custom logic providing alternative implementations of specific functionality.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a",options="header"]
|===
|Interface
|CDI classifier
|Description
|[[pubsublite-pub-builder]]<<pubsublite-pub-builder, `io.debezium.server.pubsub.PubSubLiteChangeConsumer.PublisherBuilder`>>
|`@CustomConsumerBuilder`
|A class that provides a custom configured instance of a `Publisher` used to send messages to a dedicated topic.
|[[pubsublite-ext-stream-name-mapper]]<<pubsublite-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|A custom implementation that maps the planned destination (topic) name to a physical Pub/Sub Lite topic name. By default the same name is used.
|===
==== HTTP Client
The HTTP Client sink streams changes to any HTTP server for additional processing; the original design goal was to have
{prodname} act as a https://knative.dev/docs/eventing/sources/[Knative Event Source]. The HTTP Client sink supports
optional https://en.wikipedia.org/wiki/JSON_Web_Token[JSON Web Token (JWT) authentication].
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[httpclient-type]]<<httpclient-type, `debezium.sink.type` >>
|
|Must be set to `http`
|[[httpclient-url]]<<httpclient-url, `debezium.sink.http.url` >>
|
|The HTTP Server URL to stream events to. This can also be set by defining the `K_SINK` environment variable, which is used by the Knative source framework.
|[[httpclient-timeout]]<<httpclient-timeout, `debezium.sink.http.timeout.ms` >>
|60000
|The number of milliseconds to wait for a response from the server before timing out (defaults to 60 seconds).
|[[httpclient-retries]]<<httpclient-retries, `debezium.sink.http.retries` >>
|5
|The number of retries before an exception is thrown (defaults to 5).
|[[httpclient-retry-interval]]<<httpclient-retry-interval, `debezium.sink.http.retry.interval.ms` >>
|1000
|The number of milliseconds to wait before another attempt to send a record is made after a failure (defaults to 1 second).
|[[httpclient-headers-prefix]]<<httpclient-headers-prefix, `debezium.sink.http.headers.prefix` >>
|X-DEBEZIUM-
|Headers will be prefixed with this value (defaults to X-DEBEZIUM-).
|[[httpclient-headers-encode-base64]]<<httpclient-headers-encode-base64, `debezium.sink.http.headers.encode.base64` >>
|true
|Header values will be base64 encoded (defaults to true).
|[[httpclient-authentication-type]]<<httpclient-authentication-type, `debezium.sink.http.authentication.type` >>
|
|Specifies the authentication type to use. If missing, no authentication is used. Currently, only JSON Web Token (JWT) authentication (indicated by the value `jwt`) is supported.
|[[httpclient-authentication-jwt-username]]<<httpclient-authentication-jwt-username, `debezium.sink.http.authentication.jwt.username` >>
|
|Specifies the username for JWT authentication.
|[[httpclient-authentication-jwt-password]]<<httpclient-authentication-jwt-password, `debezium.sink.http.authentication.jwt.password` >>
|
|Specifies the password for JWT authentication.
|[[httpclient-authentication-jwt-url]]<<httpclient-authentication-jwt-url, `debezium.sink.http.authentication.jwt.url` >>
|
|Specifies the base URL (e.g., `http://myserver:8000/`) for JWT authentication. The paths `auth/authenticate` and `auth/refreshToken` are appended for the JWT initial and authentication REST requests.
|[[httpclient-authentication-jwt-token-expiration]]<<httpclient-authentication-jwt-token-expiration, `debezium.sink.http.authentication.jwt.token_expiration` >>
|
|Requested duration (in minutes) before the authentication token expires.
|[[httpclient-authentication-jwt-refresh-token-expiration]]<<httpclient-authentication-jwt-refresh-token-expiration, `debezium.sink.http.authentication.jwt.refresh_token_expiration` >>
|
|Requested duration (in minutes) before the refresh token expires.
|===
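A minimal HTTP sink configuration with JWT authentication could look like this (the URLs and credentials are placeholders):
[source]
----
debezium.sink.type=http
debezium.sink.http.url=http://myserver:8080/events
debezium.sink.http.timeout.ms=30000
# Optional JWT authentication
debezium.sink.http.authentication.type=jwt
debezium.sink.http.authentication.jwt.username=debezium
debezium.sink.http.authentication.jwt.password=secret
debezium.sink.http.authentication.jwt.url=http://myserver:8000/
----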
==== Apache Pulsar
https://pulsar.apache.org/[Apache Pulsar] is a high-performance, low-latency platform for server-to-server messaging.
Pulsar exposes a set of REST APIs and a native endpoint, and provides clients for multiple languages, including the Java client that is used to implement the sink.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[pulsar-type]]<<pulsar-type, `debezium.sink.type`>>
|
|Must be set to `pulsar`.
|[[pulsar-timeout]]<<pulsar-timeout, `debezium.sink.pulsar.timeout`>>
|`0`
|Configures the timeout in milliseconds for sending a batch of messages to Pulsar and waiting for the producer to flush and persist all of them.
By default it is set to `0`, which means no timeout.
Make sure that https://pulsar.apache.org/api/client/2.10.1/org/apache/pulsar/client/api/ProducerBuilder.html#maxPendingMessages(int)[`maxPendingMessages`] and https://pulsar.apache.org/api/client/2.10.1/org/apache/pulsar/client/api/ProducerBuilder.html#blockIfQueueFull(boolean)[`blockIfQueueFull`] are configured properly on the producer.
|[[pulsar-client]]<<pulsar-client, `debezium.sink.pulsar.client.*`>>
|
|The Pulsar module supports pass-through configuration.
The client https://pulsar.apache.org/docs/en/client-libraries-java/#client-configuration[configuration properties] are passed to the client with the prefix removed.
At least `serviceUrl` must be provided.
|[[pulsar-producer]]<<pulsar-producer, `debezium.sink.pulsar.producer.*`>>
|
|The Pulsar module supports pass-through configuration.
The message producer https://pulsar.apache.org/docs/en/client-libraries-java/#client-configuration[configuration properties] are passed to the producer with the prefix removed.
The `topic` is set by {prodname}.
|[[pulsar-null-key]]<<pulsar-null-key, `debezium.sink.pulsar.null.key`>>
|`default`
|Tables without a primary key send messages with a `null` key.
This is not supported by Pulsar, so a surrogate key must be used.
|[[pulsar-tenant]]<<pulsar-tenant, `debezium.sink.pulsar.tenant`>>
|`public`
|The target tenant used to deliver the message.
|[[pulsar-namespace]]<<pulsar-namespace, `debezium.sink.pulsar.namespace`>>
|`default`
|The target namespace used to deliver the message.
|===
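A minimal Pulsar sink configuration could look like this (the service URL is a placeholder):
[source]
----
debezium.sink.type=pulsar
# Pass-through client configuration; at least serviceUrl must be provided
debezium.sink.pulsar.client.serviceUrl=pulsar://localhost:6650
debezium.sink.pulsar.tenant=public
debezium.sink.pulsar.namespace=default
----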
===== Injection points
The Pulsar sink behaviour can be modified by custom logic providing alternative implementations of specific functionality.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a",options="header"]
|===
|Interface
|CDI classifier
|Description
|[[pulsar-ext-stream-name-mapper]]<<pulsar-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|A custom implementation that maps the planned destination (topic) name to a physical Pulsar topic name. By default the same name is used.
|===
==== Azure Event Hubs
https://docs.microsoft.com/azure/event-hubs/event-hubs-about[Azure Event Hubs] is a big data streaming platform and event ingestion service that can receive and process millions of events per second. Data sent to an event hub can be transformed and stored by using any real-time analytics provider or batching/storage adapters.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[eventhubs-type]]<<eventhubs-type, `debezium.sink.type`>>
|
|Must be set to `eventhubs`.
|[[connection-string]]<<connection-string, `debezium.sink.eventhubs.connectionstring`>>
|
|https://docs.microsoft.com/azure/event-hubs/event-hubs-get-connection-string[Connection string] required to communicate with Event Hubs. The format is: `Endpoint=sb://<NAMESPACE>/;SharedAccessKeyName=<ACCESS_KEY_NAME>;SharedAccessKey=<ACCESS_KEY_VALUE>`
|[[hub-name]]<<hub-name, `debezium.sink.eventhubs.hubname`>>
|
|Name of the Event Hub
|[[partition-id]]<<partition-id, `debezium.sink.eventhubs.partitionid`>>
|
|(Optional) The identifier of the Event Hub partition that the events will be sent to. Use this if you want all the change events received by {prodname} to be sent to a specific partition in Event Hubs. Do not use if you have specified `debezium.sink.eventhubs.partitionkey`
|[[partition-key]]<<partition-key, `debezium.sink.eventhubs.partitionkey`>>
|
|(Optional) The partition key will be used to hash the events. Use this if you want all the change events received by {prodname} to be sent to a specific partition in Event Hubs. Do not use if you have specified `debezium.sink.eventhubs.partitionid`
|[[max-batch-size]]<<max-batch-size, `debezium.sink.eventhubs.maxbatchsize`>>
|
|Sets the maximum size for the batch of events, in bytes.
|===
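A minimal Event Hubs sink configuration could look like this (the namespace, keys, and hub name are placeholders):
[source]
----
debezium.sink.type=eventhubs
debezium.sink.eventhubs.connectionstring=Endpoint=sb://<NAMESPACE>/;SharedAccessKeyName=<ACCESS_KEY_NAME>;SharedAccessKey=<ACCESS_KEY_VALUE>
debezium.sink.eventhubs.hubname=my-event-hub
----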
===== Using partitions in EventHubs
By default, when neither of the optional <<partition-id, `debezium.sink.eventhubs.partitionid`>> or <<partition-key, `debezium.sink.eventhubs.partitionkey`>> properties is defined, the EventHubs sink sends events round-robin to all available partitions.
You can force all messages to be sent to a single, fixed partition by setting the <<partition-id, `debezium.sink.eventhubs.partitionid`>> property. Alternatively, you can use the <<partition-key, `debezium.sink.eventhubs.partitionkey`>> property to specify a fixed partition key that EventHubs will use to route all events to a specific partition.
If you have more specific routing requirements you can use the xref:transformations/partition-routing.adoc[Partition Routing] transformer. Ensure that the number of partitions specified in the transformer's `partition.topic.num` setting is less than or equal to the number of partitions available in your EventHubs namespace, so that events cannot be routed to non-existent partition IDs. As an example, to route all events to 5 partitions based on their source schema name, you can set the following in your application.properties:
[source]
----
# Uses a hash of `source.db` to calculate which partition to send the event to. Ensures all events from the same source schema are sent to the same partition.
debezium.transforms=PartitionRouter
debezium.transforms.PartitionRouter.type=io.debezium.transforms.partitions.PartitionRouting
debezium.transforms.PartitionRouter.partition.payload.fields=source.db
debezium.transforms.PartitionRouter.partition.topic.num=5
----
===== Injection points
The default sink behaviour can be modified by custom logic providing alternative implementations of specific functionality.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a",options="header"]
|===
|Interface
|CDI classifier
|Description
|[[eventhubs-ext-client]]<<eventhubs-ext-client, `com.azure.messaging.eventhubs.EventHubProducerClient`>>
|`@CustomConsumerBuilder`
|A custom configured instance of an `EventHubProducerClient` used to send messages.
|===
==== Redis (Stream)
Redis is an open source (BSD licensed) in-memory data structure store, used as a database, cache, and message broker.
A Stream is a data type that models a _log data structure_ in a more abstract way; it implements powerful operations to overcome the limitations of a log file.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[redis-type]]<<redis-type, `debezium.sink.type`>>
|
|Must be set to `redis`.
|[[redis-address]]<<redis-address, `debezium.sink.redis.address`>>
|
|An address, formatted as `host:port`, at which the Redis target streams are provided.
|[[redis-db-index]]<<redis-db-index, `debezium.sink.redis.db.index`>>
|`0`
|A number in the range 0..15 used for selecting the database to work with. Default is database 0. This feature is only available for standalone Redis connections; Redis clusters use only database 0.
|[[redis-user]]<<redis-user, `debezium.sink.redis.user`>>
|
|(Optional) A user name used to communicate with Redis.
|[[redis-password]]<<redis-password, `debezium.sink.redis.password`>>
|
|(Optional) A password (of the respective user) used to communicate with Redis. A password must be set if a user is set.
|[[redis-ssl-enabled]]<<redis-ssl-enabled, `debezium.sink.redis.ssl.enabled`>>
|`false`
|(Optional) A Boolean value that specifies whether connections to Redis require SSL.
|[[redis-null-key]]<<redis-null-key, `debezium.sink.redis.null.key`>>
|`default`
|Redis does not support the notion of data without a key,
so this string will be used as the key for records without a primary key.
|[[redis-null-value]]<<redis-null-value, `debezium.sink.redis.null.value`>>
|`default`
|Redis does not support the notion of null payloads, as is the case with tombstone events,
so this string will be used as the value for records without a payload.
|[[redis-batch-size]]<<redis-batch-size, `debezium.sink.redis.batch.size`>>
|`500`
|Number of change records to insert in a single batch write (Pipelined transaction).
|[[redis-retry-initial-delay-ms]]<<redis-retry-initial-delay-ms, `debezium.sink.redis.retry.initial.delay.ms`>>
|`300`
|Initial retry delay when encountering Redis connection or OOM issues.
This value will be doubled upon every retry but won't exceed `debezium.sink.redis.retry.max.delay.ms`.
|[[redis-retry-max-delay-ms]]<<redis-retry-max-delay-ms, `debezium.sink.redis.retry.max.delay.ms`>>
|`10000`
|Max delay when encountering Redis connection or OOM issues.
|[[redis-connection-timeout-ms]]<<redis-connection-timeout-ms, `debezium.sink.redis.connection.timeout.ms`>>
|`2000`
|Connection timeout for Redis client.
|[[redis-socket-timeout-ms]]<<redis-socket-timeout-ms, `debezium.sink.redis.socket.timeout.ms`>>
|`2000`
|Socket timeout for Redis client.
|[[redis-wait-enabled]]<<redis-wait-enabled, `debezium.sink.redis.wait.enabled`>>
|`false`
|Enables wait for replica. In case Redis is configured with a replica shard, this makes it possible to verify that the data has been written to the replica.
For more information see Redis https://redis.io/commands/wait/[WAIT] command.
|[[redis-wait-timeout-ms]]<<redis-wait-timeout-ms, `debezium.sink.redis.wait.timeout.ms`>>
|`1000`
|Timeout in milliseconds when waiting for replica. Must have a positive value.
|[[redis-wait-retry-enabled]]<<redis-wait-retry-enabled, `debezium.sink.redis.wait.retry.enabled`>>
|`false`
|Enables retry on wait for replica failure.
|[[redis-wait-retry-delay]]<<redis-wait-retry-delay, `debezium.sink.redis.wait.retry.delay.ms`>>
|`1000`
|Delay of retry on wait for replica failure.
|[[redis-message-format]]<<redis-message-format, `debezium.sink.redis.message.format`>>
|`compact`
|The format of the message sent to the Redis stream. Possible values are `extended` (the newer format) and `compact` (the old format, which was the default until now).
Read more about the message format xref:#p-redis-message-format[below].
|[[redis-memory-threshold-percentage]]<<redis-memory-threshold-percentage, `debezium.sink.redis.memory.threshold.percentage`>>
|`85`
|The sink will stop consuming records if the `used_memory` percentage (out of the Redis configured `maxmemory`) is greater than or equal to this threshold.
If the configured value is `0` then this threshold is disabled.
|[[redis-memory-limit-mb]]<<redis-memory-limit-mb, `debezium.sink.redis.memory.limit.mb`>>
|`0`
|If Redis `maxmemory` is not available or is `0`, the xref:#redis-memory-threshold-percentage[`debezium.sink.redis.memory.threshold.percentage`]
will apply to this value (if this value is positive).
By default it is `0` (disabled).
|===
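A minimal Redis sink configuration could look like this (the address is a placeholder):
[source]
----
debezium.sink.type=redis
debezium.sink.redis.address=localhost:6379
# Optional: use the newer stream message format
debezium.sink.redis.message.format=extended
----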
[id="p-redis-message-format"]
===== Message Format
As seen xref:#redis-message-format[above], the `debezium.sink.redis.message.format` property configures the message format in one of two ways, which look like this in Redis:
- the `extended` format, using two pairs {1), 2)}={"key", "message key"} and {3), 4)}={"value", "message value"}:
[source]
----
1) 1) "1639304527499-0"
2) 1) "key"
2) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Key\"}, \"payload\": {\"empno\": 11}}"
3) "value"
4) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"before\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"after\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"version\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"connector\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"name\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"ts_ms\"}, {\"type\": \"string\", \"optional\": true, \"name\": \"io.debezium.data.Enum\", \"version\": 1, \"parameters\": {\"allowed\": \"true,last,false\"}, \"default\": \"false\", \"field\": \"snapshot\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"db\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"sequence\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"schema\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"table\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"change_lsn\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"commit_lsn\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"event_serial_no\"}], \"optional\": false, \"name\": \"io.debezium.connector.sqlserver.Source\", \"field\": \"source\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"op\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"ts_ms\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"id\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"total_order\"}, {\"type\": 
\"int64\", \"optional\": false, \"field\": \"data_collection_order\"}], \"optional\": true, \"field\": \"transaction\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Envelope\"}, \"payload\": {\"before\": {\"empno\": 11, \"fname\": \"Yossi\", \"lname\": \"Mague\", \"job\": \"PFE\", \"mgr\": 1, \"hiredate\": 1562630400000, \"sal\": \"dzWUAA==\", \"comm\": \"AYag\", \"dept\": 3}, \"after\": null, \"source\": {\"version\": \"1.6.0.Final\", \"connector\": \"sqlserver\", \"name\": \"redislabs\", \"ts_ms\": 1637859764960, \"snapshot\": \"false\", \"db\": \"RedisConnect\", \"sequence\": null, \"schema\": \"dbo\", \"table\": \"emp\", \"change_lsn\": \"0000003a:00002f50:0002\", \"commit_lsn\": \"0000003a:00002f50:0005\", \"event_serial_no\": 1}, \"op\": \"d\", \"ts_ms\": 1637859769370, \"transaction\": null}}"
----
- and the `compact` format, which uses a single field-value pair per entry, where the field (`1)`) holds the message key and the value (`2)`) holds the message value:
[source]
----
1) 1) "1639304527499-0"
2) 1) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Key\"}, \"payload\": {\"empno\": 11}}"
2) "{\"schema\": {\"type\": \"struct\", \"fields\": [{\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"before\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"int32\", \"optional\": false, \"field\": \"empno\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"fname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"lname\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"job\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"mgr\"}, {\"type\": \"int64\", \"optional\": true, \"name\": \"io.debezium.time.Timestamp\", \"version\": 1, \"field\": \"hiredate\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"sal\"}, {\"type\": \"bytes\", \"optional\": true, \"name\": \"org.apache.kafka.connect.data.Decimal\", \"version\": 1, \"parameters\": {\"scale\": \"4\", \"connect.decimal.precision\": \"19\"}, \"field\": \"comm\"}, {\"type\": \"int32\", \"optional\": true, \"field\": \"dept\"}], \"optional\": true, \"name\": \"redislabs.dbo.emp.Value\", \"field\": \"after\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"version\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"connector\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"name\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"ts_ms\"}, {\"type\": \"string\", \"optional\": true, \"name\": \"io.debezium.data.Enum\", \"version\": 1, \"parameters\": {\"allowed\": \"true,last,false\"}, \"default\": \"false\", \"field\": \"snapshot\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"db\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"sequence\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"schema\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"table\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"change_lsn\"}, {\"type\": \"string\", \"optional\": true, \"field\": \"commit_lsn\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"event_serial_no\"}], \"optional\": false, \"name\": \"io.debezium.connector.sqlserver.Source\", \"field\": \"source\"}, {\"type\": \"string\", \"optional\": false, \"field\": \"op\"}, {\"type\": \"int64\", \"optional\": true, \"field\": \"ts_ms\"}, {\"type\": \"struct\", \"fields\": [{\"type\": \"string\", \"optional\": false, \"field\": \"id\"}, {\"type\": \"int64\", \"optional\": false, \"field\": \"total_order\"}, {\"type\": 
\"int64\", \"optional\": false, \"field\": \"data_collection_order\"}], \"optional\": true, \"field\": \"transaction\"}], \"optional\": false, \"name\": \"redislabs.dbo.emp.Envelope\"}, \"payload\": {\"before\": {\"empno\": 11, \"fname\": \"Yossi\", \"lname\": \"Mague\", \"job\": \"PFE\", \"mgr\": 1, \"hiredate\": 1562630400000, \"sal\": \"dzWUAA==\", \"comm\": \"AYag\", \"dept\": 3}, \"after\": null, \"source\": {\"version\": \"1.6.0.Final\", \"connector\": \"sqlserver\", \"name\": \"redislabs\", \"ts_ms\": 1637859764960, \"snapshot\": \"false\", \"db\": \"RedisConnect\", \"sequence\": null, \"schema\": \"dbo\", \"table\": \"emp\", \"change_lsn\": \"0000003a:00002f50:0002\", \"commit_lsn\": \"0000003a:00002f50:0005\", \"event_serial_no\": 1}, \"op\": \"d\", \"ts_ms\": 1637859769370, \"transaction\": null}}"
----
For more information, see the https://redis.io/docs/data-types/streams/[Redis Streams documentation].
===== Injection points
The behavior of the Redis sink can be modified by custom logic that provides alternative implementations of specific functionality.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a",options="header"]
|===
|Interface
|CDI classifier
|Description
|[[redis-ext-stream-name-mapper]]<<redis-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|A custom implementation maps the planned destination (topic) name to a physical Redis stream name.
By default, the same name is used.
|===
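For example, the following is a minimal sketch of a custom `StreamNameMapper` bean that prefixes every stream name. It assumes the `jakarta` CDI annotations used by recent Quarkus versions and that `StreamNameMapper` exposes a single `map(String topic)` method; the prefix is purely illustrative.

[source,java]
----
import jakarta.enterprise.context.Dependent;

import io.debezium.server.StreamNameMapper;

// Hypothetical mapper: writes all change events into Redis streams prefixed with "cdc.".
@Dependent
public class PrefixingStreamNameMapper implements StreamNameMapper {

    @Override
    public String map(String topic) {
        // For example, "inventory.customers" becomes "cdc.inventory.customers".
        return "cdc." + topic;
    }
}
----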
==== NATS Streaming
https://docs.nats.io/nats-streaming-concepts/intro[NATS Streaming] is a data streaming system powered by NATS and written in the Go programming language.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[nats-streaming-type]]<<nats-streaming-type, `debezium.sink.type`>>
|
|Must be set to `nats-streaming`.
|[[nats-streaming-url]]<<nats-streaming-url, `debezium.sink.nats-streaming.url`>>
|
|URL (or comma-separated list of URLs) of a node or nodes in the cluster, formatted as `nats://host:port`.
|[[nats-streaming-cluster-id]]<<nats-streaming-cluster-id, `debezium.sink.nats-streaming.cluster.id`>>
|
|NATS Streaming Cluster ID.
|[[nats-streaming-client-id]]<<nats-streaming-client-id, `debezium.sink.nats-streaming.client.id`>>
|
|NATS Streaming Client ID.
|===
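For reference, a minimal configuration sketch follows; the URL, cluster ID, and client ID are placeholder values:

[source]
----
debezium.sink.type=nats-streaming
debezium.sink.nats-streaming.url=nats://localhost:4222
debezium.sink.nats-streaming.cluster.id=debezium-cluster
debezium.sink.nats-streaming.client.id=debezium-server
----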
===== Injection points
The behavior of the NATS Streaming sink can be modified by custom logic that provides alternative implementations of specific functionality.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a",options="header"]
|===
|Interface
|CDI classifier
|Description
|[[streaming_connection]]<<streaming_connection, `io.nats.streaming.StreamingConnection`>>
|`@CustomConsumerBuilder`
|A custom-configured instance of a `StreamingConnection` that is used to publish messages to the target subjects.
|[[nats-streaming-ext-stream-name-mapper]]<<nats-streaming-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|A custom implementation maps the planned destination (topic) name to a physical NATS Streaming subject name.
By default, the same name is used.
|===
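For example, a custom `StreamingConnection` can be supplied through a CDI producer method. The following sketch assumes the `io.nats.streaming` Java client and the `jakarta` CDI annotations used by recent Quarkus versions; the URL, cluster ID, and client ID are placeholder values.

[source,java]
----
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Produces;

import io.debezium.server.CustomConsumerBuilder;
import io.nats.streaming.NatsStreaming;
import io.nats.streaming.Options;
import io.nats.streaming.StreamingConnection;

@Dependent
public class CustomStreamingConnectionProducer {

    @Produces
    @CustomConsumerBuilder
    public StreamingConnection customConnection() throws Exception {
        // Build a connection with non-default options, e.g. a custom NATS URL.
        Options options = new Options.Builder()
                .natsUrl("nats://localhost:4222")
                .build();
        return NatsStreaming.connect("debezium-cluster", "debezium-server", options);
    }
}
----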
==== NATS JetStream
NATS has a built-in distributed persistence system called https://docs.nats.io/nats-concepts/jetstream[JetStream], which enables new functionality and higher qualities of service on top of the base 'Core NATS' functionality.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[nats-jetstream-type]]<<nats-jetstream-type, `debezium.sink.type`>>
|
|Must be set to `nats-jetstream`.
|[[nats-jetstream-url]]<<nats-jetstream-url, `debezium.sink.nats-jetstream.url`>>
|
|URL (or comma-separated list of URLs) of a node or nodes in the cluster, formatted as `nats://host:port`.
|[[nats-jetstream-create-stream]]<<nats-jetstream-create-stream, `debezium.sink.nats-jetstream.create-stream`>>
|
|If set to `true`, a basic stream is created.
|[[nats-jetstream-subjects]]<<nats-jetstream-subjects, `debezium.sink.nats-jetstream.subjects`>>
| \*.*.*
|A comma-separated list of subjects (messaging channel names). Can contain wildcards, for example `test.inventory.*`.
|[[nats-jetstream-storage]]<<nats-jetstream-storage, `debezium.sink.nats-jetstream.storage`>>
| memory
|Controls how messages are stored in the stream. Can be `memory` or `file`.
|===
If you need a more configurable stream, you can create it with the NATS CLI. For more information about streams, see https://docs.nats.io/nats-concepts/jetstream/streams.
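A minimal configuration sketch might look as follows; the URL and the subject list are placeholder values:

[source]
----
debezium.sink.type=nats-jetstream
debezium.sink.nats-jetstream.url=nats://localhost:4222
debezium.sink.nats-jetstream.create-stream=true
debezium.sink.nats-jetstream.subjects=inventory.*
debezium.sink.nats-jetstream.storage=file
----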
===== Injection points
The behavior of the NATS JetStream sink can be modified by custom logic that provides alternative implementations of specific functionality.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a",options="header"]
|===
|Interface
|CDI classifier
|Description
|[[jetstream_connection]]<<jetstream_connection, `io.nats.client.JetStream`>>
|`@CustomConsumerBuilder`
|A custom-configured instance of a `JetStream` context that is used to publish messages to the target subjects.
|[[jetstream-ext-stream-name-mapper]]<<jetstream-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|A custom implementation maps the planned destination (topic) name to a physical NATS JetStream subject name.
By default, the same name is used.
|===
==== Apache Kafka
https://kafka.apache.org/[Apache Kafka] is a popular open-source platform for distributed event streaming. {prodname} Server supports publishing captured change events to a configured Kafka message broker.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[kafka-type]]<<kafka-type, `debezium.sink.type`>>
|
|Must be set to `kafka`.
|[[kafka-producer]]<<kafka-producer, `debezium.sink.kafka.producer.*`>>
|
|The Kafka sink adapter supports pass-through configuration.
This means that all Kafka producer https://kafka.apache.org/documentation/#producerconfigs[configuration properties] are passed to the producer with the prefix removed.
At least `bootstrap.servers`, `key.serializer` and `value.serializer` properties must be provided. The `topic` is set by {prodname}.
|===
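For example, a minimal pass-through configuration might look as follows; the broker address is a placeholder, and the string serializers assume the default `json` format, which produces string keys and values:

[source]
----
debezium.sink.type=kafka
debezium.sink.kafka.producer.bootstrap.servers=localhost:9092
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
----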
===== Injection points
The behavior of the Kafka sink can be modified by custom logic that provides alternative implementations of specific functionality.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a",options="header"]
|===
|Interface
|CDI classifier
|Description
|[[kafka-producer-ext-stream-name-mapper]]<<kafka-producer-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|A custom implementation maps the original destination (topic) name to another Kafka topic name. By default, the same name is used.
|===
==== Pravega
https://pravega.io/[Pravega] is a cloud-native storage system for event streams and data streams. This sink offers two modes: non-transactional and transactional. The non-transactional mode individually writes each event in a {prodname} batch to Pravega. The transactional mode writes the {prodname} batch to a Pravega transaction that commits when the batch is completed.
The Pravega sink expects destination scope and streams to already be created.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[pravega-type]]<<pravega-type, `debezium.sink.type`>>
|
|Must be set to `pravega`.
|[[pravega-url]]<<pravega-url, `debezium.sink.pravega.controller.uri`>>
|`tcp://localhost:9090`
|The connection string to a Controller in the Pravega cluster.
|[[pravega-scope]]<<pravega-scope, `debezium.sink.pravega.scope`>>
|
|The name of the scope in which to find the destination streams.
|[[pravega-transaction]]<<pravega-transaction, `debezium.sink.pravega.transaction`>>
|`false`
|Set to `true` to have the sink use Pravega transactions for each {prodname} batch.
|===
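A minimal configuration sketch, assuming a locally running Pravega controller and a pre-created scope named `inventory`:

[source]
----
debezium.sink.type=pravega
debezium.sink.pravega.controller.uri=tcp://localhost:9090
debezium.sink.pravega.scope=inventory
debezium.sink.pravega.transaction=true
----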
===== Injection points
The behavior of the Pravega sink can be modified by custom logic that provides alternative implementations of specific functionality.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a",options="header"]
|===
|Interface
|CDI classifier
|Description
|[[pravega-ext-stream-name-mapper]]<<pravega-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|A custom implementation maps the planned destination (stream) name to a physical Pravega stream name.
By default, the same name is used.
|===
==== Infinispan
https://infinispan.org/[Infinispan] is an open-source in-memory data grid that offers a rich set of cache types as well as cache stores.
Because it provides very fast data access, Infinispan can be used, among other things, as a data source for various data processing and analytical tools.
The Infinispan sink expects that the destination cache is already defined and created within the Infinispan cluster.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[infinispan-type]]<<infinispan-type, `debezium.sink.type`>>
|
|Must be set to `infinispan`.
|[[infinispan-server-host]]<<infinispan-server-host, `debezium.sink.infinispan.server.host`>>
|
|The host name of one of the servers of the Infinispan cluster (can also be a comma-separated list of servers).
|[[infinispan-server-port]]<<infinispan-server-port, `debezium.sink.infinispan.server.port`>>
| 11222
|The port of the Infinispan server.
|[[infinispan-cache]]<<infinispan-cache, `debezium.sink.infinispan.cache`>>
|
|The name of the (existing) cache where the records will be stored.
|[[infinispan-user]]<<infinispan-user, `debezium.sink.infinispan.user`>>
|
|(Optional) The user name used for connecting to the Infinispan cluster.
|[[infinispan-password]]<<infinispan-password, `debezium.sink.infinispan.password`>>
|
|(Optional) The password used for connecting to the Infinispan cluster.
|===
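A minimal configuration sketch; the host list, cache name, and credentials are placeholder values:

[source]
----
debezium.sink.type=infinispan
debezium.sink.infinispan.server.host=infinispan1.example.com,infinispan2.example.com
debezium.sink.infinispan.server.port=11222
debezium.sink.infinispan.cache=debezium-events
debezium.sink.infinispan.user=admin
debezium.sink.infinispan.password=secret
----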
===== Injection points
The behavior of the Infinispan sink can be modified by custom logic that provides alternative implementations of specific functionality.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a",options="header"]
|===
|Interface
|CDI classifier
|Description
|[[infinispan-ext-hotrod-cache]]<<infinispan-ext-hotrod-cache, `org.infinispan.client.hotrod.RemoteCache`>>
|`@CustomConsumerBuilder`
|A custom instance of a https://docs.jboss.org/infinispan/14.0/apidocs/org/infinispan/client/hotrod/RemoteCache.html[Hot Rod cache] that is used for connecting and sending events to the Infinispan cluster.
|===
==== Apache RocketMQ
https://rocketmq.apache.org/[Apache RocketMQ] is a distributed messaging and streaming platform with low latency, high
performance and reliability, trillion-level capacity, and flexible scalability. {prodname} Server supports publishing
captured change events to a configured RocketMQ broker.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[rocketmq-type]]<<rocketmq-type, `debezium.sink.type`>>
|
|Must be set to `rocketmq`.
|[[rocketmq-namesrv-addr]]<<rocketmq-namesrv-addr, `debezium.sink.rocketmq.producer.name.srv.addr`>>
|
|The name server address of the Apache RocketMQ cluster.
|[[rocketmq-producer-group]]<<rocketmq-producer-group, `debezium.sink.rocketmq.producer.group`>>
|
|The producer group of Apache RocketMQ.
|[[rocketmq-max-message-size]]<<rocketmq-max-message-size, `debezium.sink.rocketmq.producer.max.message.size`>>
| 4M
|(Optional) The maximum size of a message body, in bytes. Values smaller than 4 MB are recommended.
|[[rocketmq-send-timeout]]<<rocketmq-send-timeout, `debezium.sink.rocketmq.producer.send.msg.timeout`>>
| 3000ms
|(Optional) The timeout for sending a message, that is, how long the client waits for a synchronous send to complete.
Set a value appropriate for your application to avoid blocking threads for a long time.
|[[rocketmq-acl-enabled]]<<rocketmq-acl-enabled, `debezium.sink.rocketmq.producer.acl.enabled`>>
| false
|(Optional) Enables access-control (ACL) authorization.
|[[rocketmq-access-key]]<<rocketmq-access-key, `debezium.sink.rocketmq.producer.access.key`>>
|
|(Optional) The access key used for connecting to the Apache RocketMQ cluster.
|[[rocketmq-secret-key]]<<rocketmq-secret-key, `debezium.sink.rocketmq.producer.secret.key`>>
|
|(Optional) The secret key used for connecting to the Apache RocketMQ cluster.
|===
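A minimal configuration sketch with ACL authorization enabled; the name server address, producer group, and credentials are placeholder values:

[source]
----
debezium.sink.type=rocketmq
debezium.sink.rocketmq.producer.name.srv.addr=localhost:9876
debezium.sink.rocketmq.producer.group=debezium-producer-group
debezium.sink.rocketmq.producer.acl.enabled=true
debezium.sink.rocketmq.producer.access.key=myAccessKey
debezium.sink.rocketmq.producer.secret.key=mySecretKey
----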
===== Injection points
The behavior of the RocketMQ sink can be modified by custom logic that provides alternative implementations of specific functionality.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a",options="header"]
|===
|Interface
|CDI classifier
|Description
|[[rocketmq-producer]]<<rocketmq-producer, `org.apache.rocketmq.client.producer.DefaultMQProducer`>>
|`@CustomConsumerBuilder`
|A custom-configured instance of a `DefaultMQProducer` that is used to publish messages to the target topic.
|[[rocketmq-ext-stream-name-mapper]]<<rocketmq-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|A custom implementation maps the planned destination (stream) name to a RocketMQ topic name.
By default, the same name is used.
|===
==== RabbitMQ Stream
https://www.rabbitmq.com/[RabbitMQ] is an open-source message broker that supports multiple messaging protocols and can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.
RabbitMQ supports message queues and streams.
{prodname} Server supports publishing captured change events to a configured RabbitMQ stream.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[rabbitmq-type]]<<rabbitmq-type, `debezium.sink.type`>>
|
|Must be set to `rabbitmq`.
|[[rabbitmq-host]]<<rabbitmq-host, `debezium.sink.rabbitmq.connection.host`>>
| localhost
|Host of RabbitMQ server.
|[[rabbitmq-port]]<<rabbitmq-port, `debezium.sink.rabbitmq.connection.port`>>
| 5672
|Port of RabbitMQ server.
|[[rabbitmq-connection]]<<rabbitmq-connection, `debezium.sink.rabbitmq.connection.*`>>
|
|The RabbitMQ module supports pass-through configuration.
The connection https://github.com/rabbitmq/rabbitmq-java-client/blob/main/src/main/java/com/rabbitmq/client/ConnectionFactoryConfigurator.java[configuration properties] are passed to the RabbitMQ client with the prefix removed.
|[[rabbitmq-ack-timeout]]<<rabbitmq-ack-timeout, `debezium.sink.rabbitmq.ackTimeout`>>
| 30000
|Defines the maximum time, in milliseconds, to wait for a confirmation from the broker after publishing a message.
|[[rabbitmq-exchange]]<<rabbitmq-exchange, `debezium.sink.rabbitmq.exchange`>>
| _topic name_
|(Optional) Exchange name to use when publishing messages.
|[[rabbitmq-routing-key]]<<rabbitmq-routing-key, `debezium.sink.rabbitmq.routingKey`>>
| _empty string_
|(Optional) Static routing key to use when publishing messages.
|[[rabbitmq-auto-create-routing-key]]<<rabbitmq-auto-create-routing-key, `debezium.sink.rabbitmq.autoCreateRoutingKey`>>
| false
|(Optional) If `true` the non-existing routing key is automatically created.
|[[rabbitmq-routing-key-durable]]<<rabbitmq-routing-key-durable, `debezium.sink.rabbitmq.routingKeyDurable`>>
| true
|(Optional) If `true` the target queue content will survive a RabbitMQ server restart.
|[[rabbitmq-routing-key-from-topic-name]]<<rabbitmq-routing-key-from-topic-name, `debezium.sink.rabbitmq.routingKeyFromTopicName`>>
| false
|(Optional) If `true` the routing key is used from topic name instead of a static value.
|[[rabbitmq-delivery-mode]]<<rabbitmq-delivery-mode, `debezium.sink.rabbitmq.deliveryMode`>>
| 2
|(Optional) The way the message is delivered to and stored on the RabbitMQ server:

* `1` - non-persistent
* `2` - persistent
|[[rabbitmq-null-value]]<<rabbitmq-null-value, `debezium.sink.rabbitmq.null.value`>>
|`default`
RabbitMQ does not support null payloads, which {prodname} produces, for example, for tombstone events.
This string is therefore used as the value for records without a payload.
|===
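A minimal configuration sketch that routes messages by topic name; the connection values are placeholders passed through to the RabbitMQ client:

[source]
----
debezium.sink.type=rabbitmq
debezium.sink.rabbitmq.connection.host=localhost
debezium.sink.rabbitmq.connection.port=5672
debezium.sink.rabbitmq.connection.username=guest
debezium.sink.rabbitmq.connection.password=guest
debezium.sink.rabbitmq.routingKeyFromTopicName=true
----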
===== Injection points
The behavior of the RabbitMQ sink can be modified by custom logic that provides alternative implementations of specific functionality.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a",options="header"]
|===
|Interface
|CDI classifier
|Description
|[[rabbitmq-ext-stream-name-mapper]]<<rabbitmq-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|A custom implementation maps the planned destination (stream) name to a RabbitMQ exchange name and (if enabled) to the routing key name.
By default, the same name is used.
|===
==== RabbitMQ Native Stream
https://www.rabbitmq.com/[RabbitMQ] 3.9 introduced https://www.rabbitmq.com/streams.html[Streams], which use a new, very fast protocol that can be used alongside AMQP 0.9.1.
Streams are well suited for large fan-outs, replay and time travel, and large logs, all with very high throughput (millions of messages per second).
{prodname} Server supports publishing captured change events to native RabbitMQ Streams by leveraging the https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/[RabbitMQ Stream Java Client].
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[rabbitmqstream-type]]<<rabbitmqstream-type, `debezium.sink.type`>>
|
|Must be set to `rabbitmqstream`.
|[[rabbitmqstream-host]]<<rabbitmqstream-host, `debezium.sink.rabbitmqstream.connection.host`>>
| localhost
|Host of RabbitMQ server.
|[[rabbitmqstream-port]]<<rabbitmqstream-port, `debezium.sink.rabbitmqstream.connection.port`>>
| 5552
|Port of RabbitMQ Stream Protocol.
|[[rabbitmqstream-connection]]<<rabbitmqstream-connection, `debezium.sink.rabbitmqstream.connection.*`>>
|
|The RabbitMQ module supports pass-through configuration.
The connection https://github.com/rabbitmq/rabbitmq-java-client/blob/main/src/main/java/com/rabbitmq/client/ConnectionFactoryConfigurator.java[configuration properties] are passed to the RabbitMQ client with the prefix removed.
|[[rabbitmqstream-ack-timeout]]<<rabbitmqstream-ack-timeout, `debezium.sink.rabbitmqstream.ackTimeout`>>
| 30000
|Defines the maximum time, in milliseconds, to wait for a confirmation from the broker after publishing a message.
|[[rabbitmqstream-null-value]]<<rabbitmqstream-null-value, `debezium.sink.rabbitmqstream.null.value`>>
|`default`
RabbitMQ does not support null payloads, which {prodname} produces, for example, for tombstone events.
This string is therefore used as the value for records without a payload.
|===
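A minimal configuration sketch, assuming a locally running RabbitMQ node with the Stream plugin enabled on the default port:

[source]
----
debezium.sink.type=rabbitmqstream
debezium.sink.rabbitmqstream.connection.host=localhost
debezium.sink.rabbitmqstream.connection.port=5552
----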
== Extensions
{prodname} Server uses the https://quarkus.io/[Quarkus framework] and relies on dependency injection to enable developers to extend its behavior.
Note that only the JVM mode of Quarkus is supported; native execution via GraalVM is not.
The server can be extended in two ways by providing custom logic:

* implementation of a new sink
* customization of an existing sink, for example a non-standard configuration
=== Implementation of a new sink
A new sink can be implemented as a CDI bean that implements the `DebeziumEngine.ChangeConsumer` interface, is annotated with `@Named` and a unique name, and has the `@Dependent` scope.
The name of the bean is used as the value of the `debezium.sink.type` option.
The sink needs to read its configuration using the MicroProfile Config API.
The execution path must pass the messages into the target system and regularly commit the passed/processed messages.
See the https://github.com/debezium/debezium-server/blob/main/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java[Kinesis sink] implementation for further details.
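The following is a minimal sketch of such a sink bean, assuming the `jakarta` CDI annotations used by recent Quarkus versions; the sink name `mysink` and the delivery logic are purely illustrative.

[source,java]
----
import java.util.List;

import jakarta.enterprise.context.Dependent;
import jakarta.inject.Named;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;

@Named("mysink") // selected by setting debezium.sink.type=mysink
@Dependent
public class MySinkChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {

    @Override
    public void handleBatch(List<ChangeEvent<Object, Object>> records,
                            RecordCommitter<ChangeEvent<Object, Object>> committer)
            throws InterruptedException {
        for (ChangeEvent<Object, Object> record : records) {
            // Deliver record.value() to the target system, keyed by record.destination().
            committer.markProcessed(record); // mark each record as processed
        }
        committer.markBatchFinished(); // commit the offsets for the whole batch
    }
}
----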
=== Customization of an existing sink
Some of the sinks expose dependency injection points that enable users to provide their own beans to modify the behavior of the sink.
Typical examples are fine-tuning the setup of the target client, customizing the destination naming, and so on.
See an example of a custom https://github.com/debezium/debezium-examples/tree/main/debezium-server-name-mapper[topic naming policy] implementation for further details.
== Cassandra connector
=== Running {prodname} Server with Cassandra connector
Running with Java 11+ requires setting the following Java options at startup, through the `JDK_JAVA_OPTIONS` environment variable or an equivalent mechanism:
[source]
----
JDK_JAVA_OPTIONS="--add-exports java.base/jdk.internal.misc=ALL-UNNAMED --add-exports java.base/jdk.internal.ref=ALL-UNNAMED --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-exports java.management.rmi/com.sun.jmx.remote.internal.rmi=ALL-UNNAMED --add-exports java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports java.rmi/sun.rmi.server=ALL-UNNAMED --add-exports java.sql/java.sql=ALL-UNNAMED --add-opens java.base/java.lang.module=ALL-UNNAMED --add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens java.base/jdk.internal.ref=ALL-UNNAMED --add-opens java.base/jdk.internal.reflect=ALL-UNNAMED --add-opens java.base/jdk.internal.math=ALL-UNNAMED --add-opens java.base/jdk.internal.module=ALL-UNNAMED --add-opens java.base/jdk.internal.util.jar=ALL-UNNAMED --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens java.base/sun.nio.ch=ALL-UNNAMED"
----
=== Sample of basic `application.properties` for running Cassandra connector with Redis sink
[source]
----
# Sink
debezium.sink.type=redis
debezium.sink.redis.address=localhost:6379
# Connector
debezium.source.connector.class=io.debezium.connector.cassandra.Cassandra4Connector
## node.id must be unique for each connector running on each Cassandra node
debezium.source.cassandra.node.id=sample_node_01
debezium.source.cassandra.hosts=127.0.0.1
debezium.source.cassandra.port=9042
debezium.source.cassandra.config=/opt/cassandra/conf/cassandra.yaml
debezium.source.commit.log.relocation.dir=cassandra/relocdir
debezium.source.offset.storage=io.debezium.server.redis.RedisOffsetBackingStore
debezium.source.topic.prefix=sample_prefix
## internal Cassandra http port
debezium.source.http.port=8040
----
=== Transformation for Operation Code
By default, the Cassandra connector has its own operation codes, which are not entirely compatible with the {prodname} operation codes.
If needed, a specific transform can be defined in {prodname} Server's `application.properties` to enable the conversion:
[source]
----
debezium.transforms=EnvelopeTransformation
debezium.transforms.EnvelopeTransformation.type=io.debezium.connector.cassandra.transforms.EnvelopeTransformation
----
This transform converts the operation codes as follows:
[source]
----
INSERT "i" -> CREATE "c"
UPDATE "u" -> UPDATE "u"
DELETE "d" -> DELETE "d"
RANGE_TOMBSTONE "r" -> TRUNCATE "t"
----