This feature is currently in an incubating state, i.e. the exact semantics, configuration options, etc. may change in future revisions, based on the feedback we receive.
Please let us know if you encounter any problems while using this feature.
Please also reach out if you require support for specific sinks in {prodname} Server, or if you would even be interested in contributing the required implementation.
{prodname} provides a ready-to-use application that streams change events from a source database to messaging infrastructure such as Amazon Kinesis, Google Cloud Pub/Sub, Apache Pulsar, or Redis (Stream).
* https://repo1.maven.org/maven2/io/debezium/debezium-server-dist/{debezium-version}/debezium-server-dist-{debezium-version}.tar.gz[{prodname} Server distribution]
2020-05-15 11:33:12,189 INFO [io.deb.ser.kin.KinesisChangeConsumer] (main) Using 'io.debezium.server.kinesis.KinesisChangeConsumer$$Lambda$119/0x0000000840130c40@f58853c' stream name mapper
2020-05-15 11:33:12,628 INFO [io.deb.ser.kin.KinesisChangeConsumer] (main) Using default KinesisClient 'software.amazon.awssdk.services.kinesis.DefaultKinesisClient@d1f74b8'
2020-05-15 11:33:12,628 INFO [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.kinesis.KinesisChangeConsumer' instantiated
2020-05-15 11:33:12,754 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-05-15 11:33:12,757 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = false
2020-05-15 11:33:12,763 INFO [io.deb.emb.EmbeddedEngine$EmbeddedConfig] (main) EmbeddedConfig values:
access.control.allow.methods =
access.control.allow.origin =
admin.listeners = null
bootstrap.servers = [localhost:9092]
client.dns.lookup = default
config.providers = []
connector.client.config.override.policy = None
header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
listeners = null
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
offset.flush.interval.ms = 0
offset.flush.timeout.ms = 5000
offset.storage.file.filename = data/offsets.dat
offset.storage.partitions = null
offset.storage.replication.factor = null
offset.storage.topic =
plugin.path = null
rest.advertised.host.name = null
rest.advertised.listener = null
rest.advertised.port = null
rest.extension.classes = []
rest.host.name = null
rest.port = 8083
ssl.client.auth = none
task.shutdown.graceful.timeout.ms = 5000
topic.tracking.allow.reset = true
topic.tracking.enable = true
value.converter = class org.apache.kafka.connect.json.JsonConverter
2020-05-15 11:33:12,763 INFO [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.key.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,763 INFO [org.apa.kaf.con.run.WorkerConfig] (main) Worker configuration property 'internal.value.converter' is deprecated and may be removed in an upcoming release. The specified value 'org.apache.kafka.connect.json.JsonConverter' matches the default, so this property can be safely removed from the worker configuration.
2020-05-15 11:33:12,765 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-05-15 11:33:12,765 INFO [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
schemas.cache.size = 1000
schemas.enable = true
2020-05-15 11:33:12,767 INFO [io.deb.ser.DebeziumServer] (main) Engine executor started
2020-05-15 11:33:12,773 INFO [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Starting FileOffsetBackingStore with file data/offsets.dat
2020-05-15 11:33:12,835 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Starting PostgresConnectorTask with configuration:
2020-05-15 11:33:12,837 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) connector.class = io.debezium.connector.postgresql.PostgresConnector
2020-05-15 11:33:12,837 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) offset.flush.interval.ms = 0
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.user = postgres
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.dbname = postgres
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) offset.storage.file.filename = data/offsets.dat
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.hostname = localhost
2020-05-15 11:33:12,838 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.password = ********
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) name = kinesis
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.server.name = tutorial
2020-05-15 11:33:12,839 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) database.port = 5432
2020-05-15 11:33:12,908 INFO [io.quarkus] (main) debezium-server 1.2.0-SNAPSHOT (powered by Quarkus 1.4.1.Final) started in 1.198s. Listening on: http://0.0.0.0:8080
2020-05-15 11:33:12,911 INFO [io.quarkus] (main) Profile prod activated.
2020-05-15 11:33:12,911 INFO [io.quarkus] (main) Installed features: [cdi, smallrye-health]
The source configuration uses the same configuration properties that are described on the specific connector documentation pages (just with the `debezium.source` prefix), together with a few more specific ones that are necessary for running outside of Kafka Connect:
|(Optional) If using Redis to store offsets, an address, formatted as `host:port`, at which the Redis target streams are provided. If not supplied, the value of `debezium.sink.redis.address` is used.
|(Optional) If using Redis to store offsets, a user name used to communicate with Redis. If the offset store address is not supplied and is instead taken from the Redis sink, the value of `debezium.sink.redis.user` is used as well.
|(Optional) If using Redis to store offsets, a password (for the respective user) used to communicate with Redis. A password must be set if a user is set. If the offset store address is not supplied and is instead taken from the Redis sink, the value of `debezium.sink.redis.password` is used as well.
|Some of the connectors (e.g. MySQL, SQL Server, Db2, Oracle) monitor the database schema evolution over time and store the data in a database history.
By default, this is based on Kafka.
There are also other options available (see the configuration sketch after this list):
* `io.debezium.relational.history.FileDatabaseHistory` for non-Kafka deployments
* `io.debezium.relational.history.MemoryDatabaseHistory`, a volatile store for test environments
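For example, a non-Kafka deployment of the MySQL connector could store the history in a file. A minimal sketch, with illustrative connection values:

[source,properties]
----
# Connector properties use the 'debezium.source' prefix
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.database.hostname=localhost
debezium.source.database.port=3306
debezium.source.database.user=debezium
debezium.source.database.password=dbz
debezium.source.database.server.name=tutorial
debezium.source.database.server.id=184054

# Store the database history in a file instead of Kafka
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.file.filename=data/history.dat
----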
The server supports https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect[single message transformations] defined by Kafka Connect.
The configuration needs to contain the list of transformations, the implementation class for each transformation, and the configuration options for each transformation.
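For example, a routing transformation based on Kafka Connect's built-in `RegexRouter` could be configured as follows (the symbolic name `route` and the regular expression are illustrative):

[source,properties]
----
# Comma-separated list of symbolic transformation names
debezium.transforms=route
# Implementation class of the 'route' transformation
debezium.transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
# Configuration options of the 'route' transformation
debezium.transforms.route.regex=(.*)
debezium.transforms.route.replacement=$1-routed
----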
JSON logging can be disabled by setting `quarkus.log.console.json=false` in the _conf/application.properties_ file, as demonstrated in the _conf/application.properties.example_ file.
|Pub/Sub can optionally use a message ordering key to guarantee that messages with the same key are delivered in the https://googleapis.dev/java/google-api-grpc/latest/com/google/pubsub/v1/PubsubMessage.Builder.html#setOrderingKey-java.lang.String-[same order] as they were sent.
|Once the number of bytes in the batched request reaches this threshold, all of the messages in the batch are sent in a single call, even if neither the delay nor the message count threshold has been exceeded yet.
The client https://pulsar.apache.org/docs/en/client-libraries-java/#client-configuration[configuration properties] are passed to the client with the prefix removed.
The message producer https://pulsar.apache.org/docs/en/client-libraries-java/#client-configuration[configuration properties] are passed to the producer with the prefix removed.
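For instance, a minimal sketch, assuming the `debezium.sink.pulsar.client.` and `debezium.sink.pulsar.producer.` property prefixes and illustrative values:

[source,properties]
----
debezium.sink.type=pulsar
# Passed to the Pulsar client as 'serviceUrl'
debezium.sink.pulsar.client.serviceUrl=pulsar://localhost:6650
# Passed to the message producer as 'sendTimeoutMs'
debezium.sink.pulsar.producer.sendTimeoutMs=10000
----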
https://docs.microsoft.com/azure/event-hubs/event-hubs-about[Azure Event Hubs] is a big data streaming platform and event ingestion service that can receive and process millions of events per second. Data sent to an event hub can be transformed and stored by using any real-time analytics provider or batching/storage adapters.
|https://docs.microsoft.com/azure/event-hubs/event-hubs-get-connection-string[Connection string] required to communicate with Event Hubs. The format is: `Endpoint=sb://<NAMESPACE>/;SharedAccessKeyName=<ACCESS_KEY_NAME>;SharedAccessKey=<ACCESS_KEY_VALUE>`
|(Optional) The identifier of the Event Hubs partition that the events will be sent to. Use this if you want all the change events received by {prodname} to be sent to a specific partition in Event Hubs. Do not use this if you have specified `debezium.sink.eventhubs.partitionkey`.
|(Optional) A partition key used to hash the events. Use this if you want all the change events received by {prodname} to be sent to a specific partition in Event Hubs. Do not use this if you have specified `debezium.sink.eventhubs.partitionid`.
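Putting these options together, a minimal Event Hubs sink configuration might look as follows (the placeholders must be replaced with your own values, and `hubname` is assumed here as the property naming the target hub):

[source,properties]
----
debezium.sink.type=eventhubs
debezium.sink.eventhubs.connectionstring=Endpoint=sb://<NAMESPACE>/;SharedAccessKeyName=<ACCESS_KEY_NAME>;SharedAccessKey=<ACCESS_KEY_VALUE>
debezium.sink.eventhubs.hubname=<EVENT_HUB_NAME>
# Optionally pin all events to one partition (mutually exclusive with partitionkey)
#debezium.sink.eventhubs.partitionid=0
----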
Redis is an open-source (BSD-licensed) in-memory data structure store, used as a database, cache, and message broker.
The Stream is a data type that models a _log data structure_ in a more abstract way, implementing powerful operations to overcome the limitations of a log file.
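A minimal sketch of a Redis sink configuration, with an illustrative address and optional credentials:

[source,properties]
----
debezium.sink.type=redis
debezium.sink.redis.address=localhost:6379
# Optional credentials; a password must be set if a user is set
#debezium.sink.redis.user=debezium
#debezium.sink.redis.password=secret
----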
https://docs.nats.io/nats-streaming-concepts/intro[NATS Streaming] is a data streaming system powered by NATS, and written in the Go programming language.
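A minimal sketch of a NATS Streaming sink configuration, assuming the `debezium.sink.nats-streaming.` property prefix and illustrative values:

[source,properties]
----
debezium.sink.type=nats-streaming
debezium.sink.nats-streaming.url=nats://localhost:4222
debezium.sink.nats-streaming.cluster.id=debezium
debezium.sink.nats-streaming.client.id=debezium-sink
----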
https://kafka.apache.org/[Apache Kafka] is a popular open-source platform for distributed event streaming. {prodname} Server supports publishing the captured change events to a configured Kafka message broker.
All Kafka producer https://kafka.apache.org/documentation/#producerconfigs[configuration properties] are passed to the producer with the prefix removed.
At least the `bootstrap.servers`, `key.serializer`, and `value.serializer` properties must be provided. The `topic` is set by {prodname}.
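For example, a minimal sketch, assuming the `debezium.sink.kafka.producer.` property prefix and an illustrative broker address:

[source,properties]
----
debezium.sink.type=kafka
# Passed to the Kafka producer with the prefix removed
debezium.sink.kafka.producer.bootstrap.servers=localhost:9092
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
----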
https://pravega.io/[Pravega] is a cloud-native storage system for event streams and data streams. This sink offers two modes: non-transactional and transactional. The non-transactional mode individually writes each event in a Debezium batch to Pravega. The transactional mode writes the Debezium batch to a Pravega transaction that commits when the batch is completed.
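A minimal sketch of a Pravega sink configuration, assuming `controller.uri`, `scope`, and `transaction` as the relevant property names (values are illustrative; the transaction flag selects between the two modes):

[source,properties]
----
debezium.sink.type=pravega
# Pravega controller endpoint and the scope holding the target streams
debezium.sink.pravega.controller.uri=tcp://localhost:9090
debezium.sink.pravega.scope=tutorial
# Set to true for the transactional mode, false for the non-transactional mode
debezium.sink.pravega.transaction=false
----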
* customization of an existing sink, i.e. non-standard configuration
=== Implementation of a new sink
A new sink can be implemented as a CDI bean that implements the `DebeziumEngine.ChangeConsumer` interface, is annotated with `@Named` (with a unique name), and has the `@Dependent` scope.
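A minimal sketch of such a consumer follows; the sink name `mysink` and the delivery logic are hypothetical, but the `handleBatch` contract (mark each processed record, then mark the batch as finished) is the one defined by `DebeziumEngine.ChangeConsumer`:

[source,java]
----
import java.util.List;

import javax.enterprise.context.Dependent;
import javax.inject.Named;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;

@Named("mysink") // unique name, selected via debezium.sink.type=mysink
@Dependent
public class MySinkChangeConsumer
        implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {

    @Override
    public void handleBatch(List<ChangeEvent<Object, Object>> records,
                            RecordCommitter<ChangeEvent<Object, Object>> committer)
            throws InterruptedException {
        for (ChangeEvent<Object, Object> record : records) {
            // Deliver record.key()/record.value() to the target system here (hypothetical)
            committer.markProcessed(record);
        }
        committer.markBatchFinished();
    }
}
----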
See the https://github.com/debezium/debezium/blob/main/debezium-server/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java[Kinesis sink] implementation for further details.
See an example of a custom https://github.com/debezium/debezium-examples/tree/main/debezium-server-name-mapper[topic naming policy] implementation for further details.
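As a sketch, assuming the `io.debezium.server.StreamNameMapper` contract with a single `map(String)` method, a custom policy could look like this (the replacement rule is illustrative):

[source,java]
----
import javax.enterprise.context.Dependent;
import javax.inject.Named;

import io.debezium.server.StreamNameMapper;

@Named("custom")
@Dependent
public class CustomStreamNameMapper implements StreamNameMapper {

    @Override
    public String map(String topic) {
        // Map the logical topic name to the destination stream name,
        // e.g. replace dots that some sinks do not allow (illustrative)
        return topic.replace('.', '_');
    }
}
----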