= Debezium Server
include::../_attributes.adoc[]
:linkattrs:
:icons: font
:toc:
:toc-placement: macro

toc::[]

[NOTE]
====
This feature is currently in an incubating state, i.e. exact semantics, configuration options etc. may change in future revisions, based on the feedback we receive. Please let us know if you encounter any problems while using this feature. Also please reach out if you have requirements for specific sinks to be supported by Debezium Server (Apache Pulsar, NATS, etc.) or would even be interested in contributing the required implementation.
====

Debezium provides a ready-to-use application that streams change events from a source database to messaging infrastructure like Amazon Kinesis or Google Pub/Sub. For streaming change events to Apache Kafka, it is recommended to deploy the Debezium connectors via Kafka Connect.

== Installation

To install the server, download and unpack the server distribution archive:

ifeval::['{page-version}' == 'master']
* {link-server-snapshot}[Debezium Server distribution]

NOTE: The above link refers to the nightly snapshot build of the Debezium master branch. If you are looking for a non-snapshot version, please select the appropriate version of this documentation in the menu to the right.
endif::[]
ifeval::['{page-version}' != 'master']
* https://repo1.maven.org/maven2/io/debezium/debezium-server/{debezium-version}/debezium-server-{debezium-version}-distribution.tar.gz[Debezium Server distribution]
endif::[]

A directory named `debezium-server` will be created with these contents:

----
debezium-server/
|-- CHANGELOG.md
|-- conf
|-- CONTRIBUTE.md
|-- COPYRIGHT.txt
|-- debezium-server-{debezium-version}-runner.jar
|-- lib
|-- LICENSE-3rd-PARTIES.txt
|-- LICENSE.txt
|-- README.md
`-- run.sh
----

The server is started using the `run.sh` script, dependencies are stored in the `lib` directory, and the `conf` directory contains configuration files.

== Configuration

Debezium Server uses https://github.com/eclipse/microprofile-config[MicroProfile Configuration] for configuration. This means that the application can be configured from disparate sources like configuration files, environment variables, system properties etc.

The main configuration file is _conf/application.properties_.
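Because configuration is resolved through MicroProfile Config, any property from _application.properties_ can also be supplied from another source, for instance as an environment variable. As a sketch of the MicroProfile Config naming convention for environment variables (non-alphanumeric characters are replaced with underscores and the name is upper-cased), the sink type could be set like so:

----
export DEBEZIUM_SINK_TYPE=kinesis
----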
There are multiple sections configured:

* `debezium.source` is for source connector configuration; each instance of Debezium Server runs exactly one connector
* `debezium.sink` is for the sink system configuration
* `debezium.format` is for the output serialization format configuration
* `debezium.transforms` is for the configuration of message transformations

An example configuration file can look like so:

----
debezium.sink.type=kinesis
debezium.sink.kinesis.region=eu-central-1
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.schema.whitelist=inventory
----
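With _conf/application.properties_ prepared, the server is launched from the unpacked distribution directory using the `run.sh` script mentioned above:

----
$ cd debezium-server
$ ./run.sh
----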
When the server is started, it generates a sequence of log messages like this:

----
__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,<  / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
2020-05-15 11:33:12,189 INFO [io.deb.ser.kin.KinesisChangeConsumer] (main) Using 'io.debezium.server.kinesis.KinesisChangeConsumer$$Lambda$119/0x0000000840130c40@f58853c' stream name mapper
2020-05-15 11:33:12,628 INFO [io.deb.ser.kin.KinesisChangeConsumer] (main) Using default KinesisClient 'software.amazon.awssdk.services.kinesis.DefaultKinesisClient@d1f74b8'
2020-05-15 11:33:12,628 INFO [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.kinesis.KinesisChangeConsumer' instantiated
2020-05-15 11:33:12,754 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = key
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true
2020-05-15 11:33:12,757 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false
2020-05-15 11:33:12,763 INFO [io.deb.emb.EmbeddedEngine$EmbeddedConfig] (main) EmbeddedConfig values:
    access.control.allow.methods =
    access.control.allow.origin =
    admin.listeners = null
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = default
    config.providers = []
    connector.client.config.override.policy = None
    header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
    internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
    internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    listeners = null
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    offset.flush.interval.ms = 0
    offset.flush.timeout.ms = 5000
    offset.storage.file.filename = data/offsets.dat
    offset.storage.partitions = null
    offset.storage.replication.factor = null
    offset.storage.topic =
    plugin.path = null
    rest.advertised.host.name = null
    rest.advertised.listener = null
    rest.advertised.port = null
    rest.extension.classes = []
    rest.host.name = null
    rest.port = 8083
    ssl.client.auth = none
    task.shutdown.graceful.timeout.ms = 5000
    topic.tracking.allow.reset = true
    topic.tracking.enable = true
    value.converter = class org.apache.kafka.connect.json.JsonConverter
2020-05-15 11:33:12,763 INFO [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.key.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,763 INFO [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.value.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,765 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = key
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true
2020-05-15 11:33:12,765 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true
2020-05-15 11:33:12,767 INFO [io.deb.ser.DebeziumServer] (main) Engine executor started
2020-05-15 11:33:12,773 INFO [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Starting FileOffsetBackingStore with file data/offsets.dat
2020-05-15 11:33:12,835 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Starting PostgresConnectorTask with configuration:
2020-05-15 11:33:12,837 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    connector.class = io.debezium.connector.postgresql.PostgresConnector
2020-05-15 11:33:12,837 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    offset.flush.interval.ms = 0
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.user = postgres
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.dbname = postgres
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    offset.storage.file.filename = data/offsets.dat
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.hostname = localhost
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.password = ********
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    name = kinesis
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.server.name = tutorial
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    database.port = 5432
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1)    schema.whitelist = inventory
2020-05-15 11:33:12,908 INFO [io.quarkus] (main) debezium-server 1.2.0-SNAPSHOT (powered by Quarkus 1.4.1.Final) started in 1.198s. Listening on: http://0.0.0.0:8080
2020-05-15 11:33:12,911 INFO [io.quarkus] (main) Profile prod activated.
2020-05-15 11:33:12,911 INFO [io.quarkus] (main) Installed features: [cdi, smallrye-health]
----

=== Source configuration

The source configuration uses the same configuration properties that are described on the specific connector documentation pages (just with the `debezium.source` prefix), together with a few more specific ones that are necessary for running outside of Kafka Connect:

[cols="35%a,10%a,55%a",options="header"]
|=======================
|Property
|Default
|Description

|[[source-connector-class]]<<source-connector-class,`debezium.source.connector.class`>>
|
|The name of the Java class implementing the source connector.

|[[source-offset-storage-file-filename]]<<source-offset-storage-file-filename,`debezium.source.offset.storage.file.filename`>>
|
|The file in which connector offsets are stored for non-Kafka deployments.

|[[source-offset-flush-interval-ms]]<<source-offset-flush-interval-ms,`debezium.source.offset.flush.interval.ms`>>
|
|Defines how frequently the offsets are flushed into the file.
|=======================

=== Format configuration

The message output format can be configured for both key and value separately. By default the output is in JSON format, but an arbitrary implementation of Kafka Connect's `Converter` can be used.

[cols="35%a,10%a,55%a",options="header"]
|=======================
|Property
|Default
|Description

|[[debezium-format-key]]<<debezium-format-key,`debezium.format.key`>>
|`json`
|The name of the output format for the key, one of `json`/`avro`.

|[[debezium-format-key-props]]<<debezium-format-key-props,`debezium.format.key.*`>>
|
|Configuration properties passed to the key converter.

|[[debezium-format-value]]<<debezium-format-value,`debezium.format.value`>>
|`json`
|The name of the output format for the value, one of `json`/`avro`.

|[[debezium-format-value-props]]<<debezium-format-value-props,`debezium.format.value.*`>>
|
|Configuration properties passed to the value converter.
|=======================
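For example, to keep the default JSON format for values but suppress embedding the schema in each message, the Kafka Connect `JsonConverter` property `schemas.enable` (visible in the startup log above) can be passed through the value converter configuration:

----
debezium.format.value=json
debezium.format.value.schemas.enable=false
----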
=== Transformation configuration

Before the messages are delivered to the sink, they can run through a sequence of transformations. The server supports https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect[single message transformations] defined by Kafka Connect. The configuration needs to contain the list of transformations, the implementation class for each transformation, and the configuration options for each of the transformations.

[cols="35%a,10%a,55%a",options="header"]
|=======================
|Property
|Default
|Description

|`debezium.transforms`
|
|The comma-separated list of symbolic names of transformations.

|`debezium.transforms.<name>.type`
|
|The name of the Java class implementing the transformation with name `<name>`.

|`debezium.transforms.<name>.*`
|
|Configuration properties passed to the transformation with name `<name>`.
|=======================

=== Sink configuration

Sink configuration is specific to each sink type. Currently the only supported sink is https://aws.amazon.com/kinesis/[Amazon Kinesis]. The sink is selected by the configuration property `debezium.sink.type`.

==== Amazon Kinesis

Amazon Kinesis is an implementation of a data streaming system with support for stream sharding and other techniques for high scalability. Kinesis exposes a set of REST APIs and provides a (not-only) Java SDK that is used to implement the sink.

[cols="35%a,10%a,55%a",options="header"]
|=======================
|Property
|Default
|Description

|[[kinesis-type]]<<kinesis-type,`debezium.sink.type`>>
|
|Must be set to `kinesis`.

|[[kinesis-region]]<<kinesis-region,`debezium.sink.kinesis.region`>>
|
|A region name in which the Kinesis target streams are provided.

|[[kinesis-credentials-profile]]<<kinesis-credentials-profile,`debezium.sink.kinesis.credentials.profile`>>
|`default`
|A credentials profile name used to communicate with the Amazon API.

|[[kinesis-null-key]]<<kinesis-null-key,`debezium.sink.kinesis.null.key`>>
|`default`
|Kinesis does not support the notion of messages without a key, so this string is used as the message key for messages from tables without a primary key.
|=======================

==== Injection points

The Kinesis sink behaviour can be modified by custom logic providing alternative implementations for specific functionalities. When alternative implementations are not available, the default ones are used.

[cols="35%a,10%a,55%a",options="header"]
|=======================
|Interface
|CDI classifier
|Description

|[[kinesis-ext-client]]<<kinesis-ext-client,`software.amazon.awssdk.services.kinesis.KinesisClient`>>
|`@CustomConsumerBuilder`
|A custom-configured instance of a `KinesisClient` used to send messages to the target streams.

|[[kinesis-ext-stream-name-mapper]]<<kinesis-ext-stream-name-mapper,`io.debezium.server.StreamNameMapper`>>
|
|A custom implementation that maps the planned destination (topic) name into a physical Kinesis stream name. By default the same name is used.
|=======================
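As an illustration of using an injection point, the following sketch provides a custom-configured `KinesisClient` to the sink via the `@CustomConsumerBuilder` classifier. The producer class name, region, and credentials profile are invented for this example, and the `io.debezium.server` package of the classifier annotation is an assumption; the AWS SDK calls are standard SDK v2 API:

----
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;

import io.debezium.server.CustomConsumerBuilder;

import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;

@ApplicationScoped
public class KinesisClientProducer {

    // Replaces the default KinesisClient that the Kinesis sink would
    // otherwise create from debezium.sink.kinesis.* properties
    @Produces
    @CustomConsumerBuilder
    public KinesisClient customKinesisClient() {
        return KinesisClient.builder()
                .region(Region.EU_CENTRAL_1)
                .credentialsProvider(ProfileCredentialsProvider.create("my-profile"))
                .build();
    }
}
----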
== Extensions

Debezium Server uses the https://quarkus.io/[Quarkus framework] and relies on dependency injection to enable developers to extend its behaviour. Note that only the JVM mode of Quarkus is supported; native execution via GraalVM is not.

The server can be extended with custom logic in two ways:

* implementation of a new sink
* customization of an existing sink - i.e. non-standard configuration

=== Implementation of a new sink

A new sink can be implemented as a CDI bean that implements the `DebeziumEngine.ChangeConsumer` interface, is annotated with `@Named` carrying a unique name, and has the `@Dependent` scope. The name of the bean is used as the `debezium.sink.type` option. The sink needs to read its configuration using the MicroProfile Config API. The execution path must pass the messages into the target system and regularly commit the passed/processed messages. See the https://github.com/debezium/debezium/blob/master/debezium-server/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java[Kinesis sink] implementation for further details, and the sketch at the end of this chapter for a minimal example.

=== Customization of an existing sink

Some sinks expose dependency injection points that enable users to provide their own beans modifying the behaviour of the sink. Typical examples are fine-tuning of the target client setup, destination naming, etc., as shown for the Kinesis injection points above.
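To make the first extension point concrete, the following is a minimal sketch of a hypothetical `stdout` sink that just prints every change event. The class and the `stdout` sink name are invented for this example; `DebeziumEngine.ChangeConsumer`, `ChangeEvent`, and `RecordCommitter` are the actual types from Debezium's engine API:

----
import java.util.List;

import javax.enterprise.context.Dependent;
import javax.inject.Named;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;

@Named("stdout") // selected via debezium.sink.type=stdout
@Dependent
public class StdoutChangeConsumer
        implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {

    @Override
    public void handleBatch(List<ChangeEvent<Object, Object>> records,
            DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
            throws InterruptedException {
        for (ChangeEvent<Object, Object> record : records) {
            // Deliver the message to the target system...
            System.out.println(record.destination() + ": " + record.value());
            // ...and mark it as processed so its offset can be committed
            committer.markProcessed(record);
        }
        // Signal that the whole batch has been handled
        committer.markBatchFinished();
    }
}
----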