[id="debezium-server"]
= {prodname} Server
:linkattrs:
:icons: font
:toc:
:toclevels: 3
:toc-placement: macro
toc::[]
[NOTE]
====
This feature is currently in an incubating state, i.e. exact semantics, configuration options, etc. may change in future revisions, based on the feedback we receive.
Please let us know if you encounter any problems while using this feature.
Also please reach out if you need a specific sink to be supported by {prodname} Server, or if you would be interested in contributing the required implementation.
====
{prodname} provides a ready-to-use application that streams change events from a source database to messaging infrastructure like Amazon Kinesis, Google Cloud Pub/Sub, Apache Pulsar or Redis (Stream).
For streaming change events to Apache Kafka, it is recommended to deploy the {prodname} connectors via Kafka Connect.
== Installation
To install the server, download and unpack the server distribution archive:
ifeval::['{page-version}' == 'main']
* {link-server-snapshot}[{prodname} Server distribution]
NOTE: The above link refers to the nightly snapshot build of the {prodname} main branch.
If you are looking for a non-snapshot version, please select the appropriate version of this documentation in the menu to the right.
endif::[]
ifeval::['{page-version}' != 'main']
* https://repo1.maven.org/maven2/io/debezium/debezium-server-dist/{debezium-version}/debezium-server-dist-{debezium-version}.tar.gz[{prodname} Server distribution]
endif::[]
A directory named `debezium-server` will be created with these contents:
[source,subs="verbatim,attributes"]
----
debezium-server/
|-- CHANGELOG.md
|-- conf
|-- CONTRIBUTE.md
|-- COPYRIGHT.txt
|-- debezium-server-{debezium-version}-runner.jar
|-- lib
|-- LICENSE-3rd-PARTIES.txt
|-- LICENSE.txt
|-- README.md
`-- run.sh
----
The server is started using the `run.sh` script, dependencies are stored in the `lib` directory, and the `conf` directory contains configuration files.
== Configuration
{prodname} Server uses https://github.com/eclipse/microprofile-config[MicroProfile Configuration] for configuration.
This means that the application can be configured from disparate sources like configuration files, environment variables, system properties etc.
The main configuration file is _conf/application.properties_.
The configuration is divided into multiple sections:
* `debezium.source` is for source connector configuration; each instance of {prodname} Server runs exactly one connector
* `debezium.sink` is for the sink system configuration
* `debezium.format` is for the output serialization format configuration
* `debezium.transforms` is for the configuration of message transformations
An example configuration file looks like this:
----
debezium.sink.type=kinesis
debezium.sink.kinesis.region=eu-central-1
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.schema.include.list=inventory
----
When the server is started, it generates a sequence of log messages like this:
----
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2020-05-15 11:33:12,189 INFO [io.deb.ser.kin.KinesisChangeConsumer] (main) Using 'io.debezium.server.kinesis.KinesisChangeConsumer$$Lambda$119/0x0000000840130c40@f58853c' stream name mapper
2020-05-15 11:33:12,628 INFO [io.deb.ser.kin.KinesisChangeConsumer] (main) Using default KinesisClient 'software.amazon.awssdk.services.kinesis.DefaultKinesisClient@d1f74b8'
2020-05-15 11:33:12,628 INFO [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.kinesis.KinesisChangeConsumer' instantiated
2020-05-15 11:33:12,754 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-05-15 11:33:12,757 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = false
2020-05-15 11:33:12,763 INFO [io.deb.emb.EmbeddedEngine$EmbeddedConfig] (main) EmbeddedConfig values:
access.control.allow.methods =
access.control.allow.origin =
admin.listeners = null
bootstrap.servers = [localhost:9092]
client.dns.lookup = default
config.providers = []
connector.client.config.override.policy = None
header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
listeners = null
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
offset.flush.interval.ms = 0
offset.flush.timeout.ms = 5000
offset.storage.file.filename = data/offsets.dat
offset.storage.partitions = null
offset.storage.replication.factor = null
offset.storage.topic =
plugin.path = null
rest.advertised.host.name = null
rest.advertised.listener = null
rest.advertised.port = null
rest.extension.classes = []
rest.host.name = null
rest.port = 8083
ssl.client.auth = none
task.shutdown.graceful.timeout.ms = 5000
topic.tracking.allow.reset = true
topic.tracking.enable = true
value.converter = class org.apache.kafka.connect.json.JsonConverter
2020-05-15 11:33:12,763 INFO [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.key.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,763 INFO [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.value.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,765 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-05-15 11:33:12,765 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-05-15 11:33:12,767 INFO [io.deb.ser.DebeziumServer] (main) Engine executor started
2020-05-15 11:33:12,773 INFO [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Starting FileOffsetBackingStore with file data/offsets.dat
2020-05-15 11:33:12,835 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Starting PostgresConnectorTask with configuration:
2020-05-15 11:33:12,837 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) connector.class = io.debezium.connector.postgresql.PostgresConnector
2020-05-15 11:33:12,837 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) offset.flush.interval.ms = 0
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.user = postgres
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.dbname = postgres
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) offset.storage.file.filename = data/offsets.dat
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.hostname = localhost
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.password = ********
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) name = kinesis
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.server.name = tutorial
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.port = 5432
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) schema.include.list = inventory
2020-05-15 11:33:12,908 INFO [io.quarkus] (main) debezium-server 1.2.0-SNAPSHOT (powered by Quarkus 1.4.1.Final) started in 1.198s. Listening on: http://0.0.0.0:8080
2020-05-15 11:33:12,911 INFO [io.quarkus] (main) Profile prod activated.
2020-05-15 11:33:12,911 INFO [io.quarkus] (main) Installed features: [cdi, smallrye-health]
----
[id="debezium-source-configuration-properties"]
=== Source configuration
The source configuration uses the same configuration properties that are described on the specific connector documentation pages (just with the `debezium.source` prefix), together with a few more that are specific to running outside of Kafka Connect:
[cols="35%a,10%a,55%a"]
|===
|Property
|Default
|Description
|[[debezium-source-connector-class]]<<debezium-source-connector-class, `debezium.source.connector.class`>>
|
|The name of the Java class implementing the source connector.
|[[debezium-source-offset-storage-file-filename]]<<debezium-source-offset-storage-file-filename, `debezium.source.offset.storage.file.filename`>>
|
|The file in which connector offsets are stored for non-Kafka deployments.
|[[debezium-source-offset-flush-interval-ms]]<<debezium-source-offset-flush-interval-ms, `debezium.source.offset.flush.interval.ms`>>
|
|Defines how frequently the offsets are flushed into the file.
|[[debezium-source-database-history-class]]<<debezium-source-database-history-class, `debezium.source.database.history`>>
|`io.debezium.relational.history.KafkaDatabaseHistory`
|Some of the connectors (e.g. MySQL, SQL Server, Db2, Oracle) monitor the database schema evolution over time and store the data in a database history.
By default this is based on Kafka.
There are also other options available:

* `io.debezium.relational.history.FileDatabaseHistory` for non-Kafka deployments
* `io.debezium.relational.history.MemoryDatabaseHistory`, a volatile store for test environments
|[[debezium-source-database-history-file-filename]]<<debezium-source-database-history-file-filename, `debezium.source.database.history.file.filename`>>
|
|The name and location of the file to which `FileDatabaseHistory` persists its data.
|===
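For example, one of the schema-tracking connectors can be switched to the file-based database history like this; the file path shown is only a placeholder:

----
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/dbhistory.dat
----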
[id="debezium-format-configuration-options"]
=== Format configuration
The message output format can be configured for both key and value separately.
By default the output is in JSON format but an arbitrary implementation of Kafka Connect's `Converter` can be used.
[cols="35%a,10%a,55%a"]
|===
|Property
|Default
|Description
|[[debezium-format-key]]<<debezium-format-key, `debezium.format.key`>>
|`json`
|The name of the output format for key, one of `json`/`avro`/`protobuf`.
|[[debezium-format-key-props]]<<debezium-format-key-props, `debezium.format.key.*`>>
|
|Configuration properties passed to the key converter.
|[[debezium-format-value]]<<debezium-format-value, `debezium.format.value`>>
|`json`
|The name of the output format for value, one of `json`/`avro`/`protobuf`.
|[[debezium-format-value-props]]<<debezium-format-value-props, `debezium.format.value.*`>>
|
|Configuration properties passed to the value converter.
|===
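For example, to emit schema-less JSON for both key and value, the converter's `schemas.enable` property can be passed through as described above (a minimal sketch):

----
debezium.format.key=json
debezium.format.key.schemas.enable=false
debezium.format.value=json
debezium.format.value.schemas.enable=false
----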
[id="debezium-transformations-configuration-options"]
=== Transformation configuration
Before the messages are delivered to the sink, they can run through a sequence of transformations.
The server supports https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect[single message transformations] defined by Kafka Connect.
The configuration needs to contain the list of transformations, the implementation class for each transformation, and the configuration options for each of them.
[cols="35%a,10%a,55%a"]
|===
|Property
|Default
|Description
[id="debezium-transforms"]
|`debezium.transforms`
|
|The comma-separated list of symbolic names of transformations.
[id="debezium-transforms-name-class"]
|`debezium.transforms.<name>.type`
|
|The name of the Java class implementing the transformation with name `<name>`.
[id="debezium-transforms-name"]
|`debezium.transforms.<name>.*`
|
|Configuration properties passed to the transformation with name `<name>`.
|===
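As an illustration, the following configuration registers a single transformation named `route` that uses Kafka Connect's built-in `RegexRouter` to append a suffix to every destination name (the suffix itself is arbitrary):

----
debezium.transforms=route
debezium.transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
debezium.transforms.route.regex=(.*)
debezium.transforms.route.replacement=$1_changes
----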
[id="debezium-additional-configuration-options"]
=== Additional configuration
{prodname} Server runs on top of the Quarkus framework.
All configuration options exposed by Quarkus are available in {prodname} Server too.
The most frequently used ones are:
[cols="35%a,10%a,55%a"]
|===
|Property
|Default
|Description
[id="debezium-quarkus-http-port"]
|`quarkus.http.port`
|8080
|The port on which {prodname} Server exposes the MicroProfile Health endpoint and other status information.
[id="debezium-quarkus-log-level"]
|`quarkus.log.level`
|INFO
|The default log level for every log category.
[id="debezium-quarkus-json-logging"]
|`quarkus.log.console.json`
|true
|Determines whether to enable the JSON console formatting extension, which disables "normal" console formatting.
|===
JSON logging can be disabled by setting `quarkus.log.console.json=false` in the _conf/application.properties_ file, as demonstrated in the _conf/application.properties.example_ file.
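For instance, to move the HTTP endpoint to a different port and switch back to plain text console logging, one might add (the port value is only illustrative):

----
quarkus.http.port=8079
quarkus.log.console.json=false
----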
=== Sink configuration
Sink configuration is specific to each sink type.
The sink is selected by the configuration property `debezium.sink.type`.
==== Amazon Kinesis
Amazon Kinesis is a data streaming system with support for stream sharding and other techniques for high scalability.
Kinesis exposes a set of REST APIs and provides a Java SDK (among others) that is used to implement the sink.
[cols="35%a,10%a,55%a"]
|===
|Property
|Default
|Description
|[[kinesis-type]]<<kinesis-type, `debezium.sink.type`>>
|
|Must be set to `kinesis`.
|[[kinesis-region]]<<kinesis-region, `debezium.sink.kinesis.region`>>
|
|A region name in which the Kinesis target streams are provided.
|[[kinesis-endpoint]]<<kinesis-endpoint, `debezium.sink.kinesis.endpoint`>>
|_endpoint determined by aws sdk_
|(Optional) An endpoint url at which the Kinesis target streams are provided.
|[[kinesis-credentials-profile]]<<kinesis-credentials-profile, `debezium.sink.kinesis.credentials.profile`>>
|`default`
|A credentials profile name used to communicate with Amazon API.
|[[kinesis-null-key]]<<kinesis-null-key, `debezium.sink.kinesis.null.key`>>
|`default`
|Kinesis does not support the notion of messages without a key.
So this string will be used as the message key for messages from tables without a primary key.
|===
===== Injection points
The Kinesis sink behaviour can be modified by custom logic providing alternative implementations for specific functionalities.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a"]
|===
|Interface
|CDI classifier
|Description
|[[kinesis-ext-client]]<<kinesis-ext-client, `software.amazon.awssdk.services.kinesis.KinesisClient`>>
|`@CustomConsumerBuilder`
|Custom configured instance of a `KinesisClient` used to send messages to target streams.
|[[kinesis-ext-stream-name-mapper]]<<kinesis-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|Custom implementation maps the planned destination (topic) name into a physical Kinesis stream name.
By default the same name is used.
|===
==== Google Cloud Pub/Sub
Google Cloud Pub/Sub is a messaging/eventing system designed for scalable batch and stream processing applications.
Pub/Sub exposes a set of REST APIs and provides a Java SDK (among others) that is used to implement the sink.
[cols="35%a,10%a,55%a"]
|===
|Property
|Default
|Description
|[[pubsub-type]]<<pubsub-type, `debezium.sink.type`>>
|
|Must be set to `pubsub`.
|[[pubsub-project-id]]<<pubsub-project-id, `debezium.sink.pubsub.project.id`>>
|_system-wide default project id_
|A project name in which the target topics are created.
|[[pubsub-ordering]]<<pubsub-ordering, `debezium.sink.pubsub.ordering.enabled`>>
|`true`
|Pub/Sub can optionally use a message ordering key to guarantee that messages with the same key are delivered in the https://googleapis.dev/java/google-api-grpc/latest/com/google/pubsub/v1/PubsubMessage.Builder.html#setOrderingKey-java.lang.String-[same order] as they were sent.
This feature can be disabled.
|[[pubsub-null-key]]<<pubsub-null-key, `debezium.sink.pubsub.null.key`>>
|`default`
|Tables without a primary key send messages with a `null` key.
This is not supported by Pub/Sub, so this string will be used as a surrogate key instead.
|===
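A minimal Pub/Sub sink configuration might look like this; the project id is a placeholder:

----
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=my-gcp-project
----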
===== Injection points
The Pub/Sub sink behaviour can be modified by custom logic providing alternative implementations for specific functionalities.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a"]
|===
|Interface
|CDI classifier
|Description
|[[pubsub-pub-builder]]<<pubsub-pub-builder, `io.debezium.server.pubsub.PubSubChangeConsumer.PublisherBuilder`>>
|`@CustomConsumerBuilder`
|A class that provides a custom-configured instance of a `Publisher` used to send messages to a dedicated topic.
|[[pubsub-ext-stream-name-mapper]]<<pubsub-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|Custom implementation maps the planned destination (topic) name into a physical Pub/Sub topic name. By default the same name is used.
|===
==== Apache Pulsar
https://pulsar.apache.org/[Apache Pulsar] is a high-performance, low-latency server for server-to-server messaging.
Pulsar exposes a REST API and a native endpoint, and provides a Java client (among others) that is used to implement the sink.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[pulsar-type]]<<pulsar-type, `debezium.sink.type`>>
|
|Must be set to `pulsar`.
|[[pulsar-client]]<<pulsar-client, `debezium.sink.pulsar.client.*`>>
|
|The Pulsar module supports pass-through configuration.
The client https://pulsar.apache.org/docs/en/client-libraries-java/#client-configuration[configuration properties] are passed to the client with the prefix removed.
At least `serviceUrl` must be provided.
|[[pulsar-producer]]<<pulsar-producer, `debezium.sink.pulsar.producer.*`>>
|
|The Pulsar module supports pass-through configuration.
The message producer https://pulsar.apache.org/docs/en/client-libraries-java/#client-configuration[configuration properties] are passed to the producer with the prefix removed.
The `topic` is set by Debezium.
|[[pulsar-null-key]]<<pulsar-null-key, `debezium.sink.pulsar.null.key`>>
|`default`
|Tables without a primary key send messages with a `null` key.
This is not supported by Pulsar, so this string will be used as a surrogate key instead.
|===
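A minimal Pulsar sink configuration might look like this; the service URL is a placeholder and `serviceUrl` is one of the pass-through client properties:

----
debezium.sink.type=pulsar
debezium.sink.pulsar.client.serviceUrl=pulsar://localhost:6650
----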
===== Injection points
The Pulsar sink behaviour can be modified by custom logic providing alternative implementations for specific functionalities.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a"]
|===
|Interface
|CDI classifier
|Description
|[[pulsar-ext-stream-name-mapper]]<<pulsar-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|Custom implementation maps the planned destination (topic) name into a physical Pulsar topic name. By default the same name is used.
|===
==== Azure Event Hubs
https://docs.microsoft.com/azure/event-hubs/event-hubs-about[Azure Event Hubs] is a big data streaming platform and event ingestion service that can receive and process millions of events per second. Data sent to an event hub can be transformed and stored by using any real-time analytics provider or batching/storage adapters.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[eventhubs-type]]<<eventhubs-type, `debezium.sink.type`>>
|
|Must be set to `eventhubs`.
|[[connection-string]]<<connection-string, `debezium.sink.eventhubs.connectionstring`>>
|
|https://docs.microsoft.com/azure/event-hubs/event-hubs-get-connection-string[Connection string] required to communicate with Event Hubs. The format is: `Endpoint=sb://<NAMESPACE>/;SharedAccessKeyName=<ACCESS_KEY_NAME>;SharedAccessKey=<ACCESS_KEY_VALUE>`
|[[hub-name]]<<hub-name, `debezium.sink.eventhubs.hubname`>>
|
|The name of the Event Hub.
|[[partition-id]]<<partition-id, `debezium.sink.eventhubs.partitionid`>>
|
|(Optional) The identifier of the Event Hub partition that the events will be sent to. Use this if you want all the change events received by Debezium to be sent to a specific partition in Event Hubs. Do not use if you have specified `debezium.sink.eventhubs.partitionkey`
|[[partition-key]]<<partition-key, `debezium.sink.eventhubs.partitionkey`>>
|
|(Optional) The partition key will be used to hash the events. Use this if you want all the change events received by Debezium to be sent to a specific partition in Event Hubs. Do not use if you have specified `debezium.sink.eventhubs.partitionid`
|[[max-batch-size]]<<max-batch-size, `debezium.sink.eventhubs.maxbatchsize`>>
|
|Sets the maximum size for the batch of events, in bytes.
|===
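A sketch of an Event Hubs sink configuration, with placeholder values:

----
debezium.sink.type=eventhubs
debezium.sink.eventhubs.connectionstring=Endpoint=sb://<NAMESPACE>/;SharedAccessKeyName=<ACCESS_KEY_NAME>;SharedAccessKey=<ACCESS_KEY_VALUE>
debezium.sink.eventhubs.hubname=my-event-hub
----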
===== Injection points
The Event Hubs sink behaviour can be modified by custom logic providing alternative implementations for specific functionalities.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a"]
|===
|Interface
|CDI classifier
|Description
|[[eventhubs-ext-client]]<<eventhubs-ext-client, `com.azure.messaging.eventhubs.EventHubProducerClient`>>
|`@CustomConsumerBuilder`
|A custom-configured instance of an `EventHubProducerClient` used to send messages.
|===
==== Redis (Stream)
Redis is an open source (BSD licensed) in-memory data structure store, used as a database, cache and message broker.
A Stream is a data type that models a _log data structure_ in a more abstract way.
It implements powerful operations to overcome the limitations of a log file.
[cols="35%a,10%a,55%a"]
|===
|Property
|Default
|Description
|[[redis-type]]<<redis-type, `debezium.sink.type`>>
|
|Must be set to `redis`.
|[[redis-address]]<<redis-address, `debezium.sink.redis.address`>>
|
|An address, formatted as `host:port`, at which the Redis target streams are provided.
|[[redis-user]]<<redis-user, `debezium.sink.redis.user`>>
|
|(Optional) A user name used to communicate with Redis.
|[[redis-password]]<<redis-password, `debezium.sink.redis.password`>>
|
|(Optional) A password (of respective user) used to communicate with Redis. A password must be set if a user is set.
|[[redis-null-key]]<<redis-null-key, `debezium.sink.redis.null.key`>>
|`default`
|Redis does not support the notion of data without a key.
So this string will be used as the key for records without a primary key.
|[[redis-null-value]]<<redis-null-value, `debezium.sink.redis.null.value`>>
|`default`
|Redis does not support null payloads, such as those of tombstone events.
So this string will be used as the value for records without a payload.
|===
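A minimal Redis sink configuration might look like this; the address is a placeholder (6379 is the default Redis port):

----
debezium.sink.type=redis
debezium.sink.redis.address=localhost:6379
----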
===== Injection points
The Redis sink behaviour can be modified by custom logic providing alternative implementations for specific functionalities.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a"]
|===
|Interface
|CDI classifier
|Description
|[[redis-client]]<<redis-client, `redis.clients.jedis.Jedis`>>
|`@CustomConsumerBuilder`
|Custom configured instance of a `Jedis` used to send messages to target streams.
|[[redis-ext-stream-name-mapper]]<<redis-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|Custom implementation maps the planned destination (topic) name into a physical Redis stream name.
By default the same name is used.
|===
==== NATS Streaming
https://docs.nats.io/nats-streaming-concepts/intro[NATS Streaming] is a data streaming system powered by NATS, and written in the Go programming language.
[cols="35%a,10%a,55%a"]
|===
|Property
|Default
|Description
|[[nats-streaming-type]]<<nats-streaming-type, `debezium.sink.type`>>
|
|Must be set to `nats-streaming`.
|[[nats-streaming-url]]<<nats-streaming-url, `debezium.sink.nats-streaming.url`>>
|
|The URL (or comma-separated list of URLs) of a node or nodes in the cluster, formatted as `nats://host:port`.
|[[nats-streaming-cluster-id]]<<nats-streaming-cluster-id, `debezium.sink.nats-streaming.cluster.id`>>
|
|NATS Streaming Cluster ID.
|[[nats-streaming-client-id]]<<nats-streaming-client-id, `debezium.sink.nats-streaming.client.id`>>
|
|NATS Streaming Client ID.
|===
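A minimal NATS Streaming sink configuration might look like this; all values are placeholders (4222 is the default NATS port):

----
debezium.sink.type=nats-streaming
debezium.sink.nats-streaming.url=nats://localhost:4222
debezium.sink.nats-streaming.cluster.id=test-cluster
debezium.sink.nats-streaming.client.id=debezium
----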
===== Injection points
The NATS Streaming sink behaviour can be modified by custom logic providing alternative implementations for specific functionalities.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a"]
|===
|Interface
|CDI classifier
|Description
|[[streaming_connection]]<<streaming_connection, `io.nats.streaming.StreamingConnection`>>
|`@CustomConsumerBuilder`
|Custom configured instance of a `StreamingConnection` used to publish messages to target subjects.
|[[nats-streaming-ext-stream-name-mapper]]<<nats-streaming-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|Custom implementation maps the planned destination (topic) name into a physical NATS Streaming subject name.
By default the same name is used.
|===
==== Apache Kafka
https://kafka.apache.org/[Apache Kafka] is a popular open-source platform for distributed event streaming. {prodname} Server supports publishing captured change events to a configured Kafka message broker.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[kafka-type]]<<kafka-type, `debezium.sink.type`>>
|
|Must be set to `kafka`.
|[[kafka-producer]]<<kafka-producer, `debezium.sink.kafka.producer.*`>>
|
|The Kafka sink adapter supports pass-through configuration.
This means that all Kafka producer https://kafka.apache.org/documentation/#producerconfigs[configuration properties] are passed to the producer with the prefix removed.
At least `bootstrap.servers`, `key.serializer` and `value.serializer` properties must be provided. The `topic` is set by Debezium.
|===
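A minimal Kafka sink configuration might look like this; the broker address is a placeholder, and `StringSerializer` is just one possible choice of serializer:

----
debezium.sink.type=kafka
debezium.sink.kafka.producer.bootstrap.servers=localhost:9092
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
----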
==== Pravega
https://pravega.io/[Pravega] is a cloud-native storage system for event streams and data streams. This sink offers two modes: non-transactional and transactional. The non-transactional mode individually writes each event in a Debezium batch to Pravega. The transactional mode writes the Debezium batch to a Pravega transaction that commits when the batch is completed.
The Pravega sink expects destination scope and streams to already be created.
[cols="35%a,10%a,55%a",options="header"]
|===
|Property
|Default
|Description
|[[pravega-type]]<<pravega-type, `debezium.sink.type`>>
|
|Must be set to `pravega`.
|[[pravega-url]]<<pravega-url, `debezium.sink.pravega.controller.uri`>>
|`tcp://localhost:9090`
|The connection string to a Controller in the Pravega cluster.
|[[pravega-scope]]<<pravega-scope, `debezium.sink.pravega.scope`>>
|
|The name of the scope in which to find the destination streams.
|[[pravega-transaction]]<<pravega-transaction, `debezium.sink.pravega.transaction`>>
|`false`
|Set to `true` to have the sink use Pravega transactions for each Debezium batch.
|===
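A transactional Pravega sink configuration might look like this; the scope name is a placeholder:

----
debezium.sink.type=pravega
debezium.sink.pravega.controller.uri=tcp://localhost:9090
debezium.sink.pravega.scope=inventory
debezium.sink.pravega.transaction=true
----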
===== Injection points
The Pravega sink behaviour can be modified by custom logic providing alternative implementations for specific functionalities.
When an alternative implementation is not available, the default one is used.
[cols="35%a,10%a,55%a"]
|===
|Interface
|CDI classifier
|Description
|[[pravega-ext-stream-name-mapper]]<<pravega-ext-stream-name-mapper, `io.debezium.server.StreamNameMapper`>>
|
|Custom implementation maps the planned destination (stream) name into a physical Pravega stream name.
By default the same name is used.
|===
== Extensions
{prodname} Server uses the https://quarkus.io/[Quarkus framework] and relies on dependency injection to enable developers to extend its behaviour.
Note that only the JVM mode of Quarkus is supported, not native execution via GraalVM.
The server can be extended with custom logic in two ways:
* implementation of a new sink
* customization of an existing sink - i.e. non-standard configuration
=== Implementation of a new sink
A new sink can be implemented as a CDI bean that implements the `DebeziumEngine.ChangeConsumer` interface, is annotated with `@Named` (with a unique name) and has the `@Dependent` scope.
The name of the bean is used as the `debezium.sink.type` option.
The sink needs to read its configuration using the MicroProfile Config API.
The execution path must pass the messages into the target system and regularly commit the passed/processed messages.
See the https://github.com/debezium/debezium/blob/main/debezium-server/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java[Kinesis sink] implementation for further details.
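The following is a minimal, hypothetical sketch of such a sink: a consumer named `logger` that merely prints each event. The class name and the logging logic are made up for illustration, while the batch handling and commit calls follow the `DebeziumEngine.ChangeConsumer` contract:

[source,java]
----
import java.util.List;

import javax.enterprise.context.Dependent;
import javax.inject.Named;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;

// Hypothetical sink named "logger"; it would be selected with debezium.sink.type=logger.
@Named("logger")
@Dependent
public class LoggerChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {

    @Override
    public void handleBatch(List<ChangeEvent<Object, Object>> records,
                            RecordCommitter<ChangeEvent<Object, Object>> committer)
            throws InterruptedException {
        for (ChangeEvent<Object, Object> record : records) {
            // Deliver the record to the target system; this sketch only prints it.
            System.out.println(record.destination() + ": " + record.value());
            // Mark each record as processed so its offset can be flushed.
            committer.markProcessed(record);
        }
        // Signal that the whole batch has been handled.
        committer.markBatchFinished();
    }
}
----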
=== Customization of an existing sink
Some of the sinks expose dependency injection points that enable users to provide their own beans to modify the behaviour of the sink.
Typical examples are fine-tuning of the target client setup, the destination naming, etc.
See an example of a custom https://github.com/debezium/debezium-examples/tree/main/debezium-server-name-mapper[topic naming policy] implementation for further details.
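As a rough sketch, assuming the `StreamNameMapper` contract of mapping one planned destination name to another (as used by the built-in sinks), a hypothetical custom mapper could be provided as a CDI bean like this:

[source,java]
----
import javax.enterprise.context.Dependent;

import io.debezium.server.StreamNameMapper;

// Hypothetical mapper that prefixes every destination name with "cdc.".
@Dependent
public class PrefixingStreamNameMapper implements StreamNameMapper {

    @Override
    public String map(String topic) {
        return "cdc." + topic;
    }
}
----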