217 lines
8.0 KiB
Plaintext
217 lines
8.0 KiB
Plaintext
// Category: debezium-using
|
|
// Type: assembly
|
|
// ModuleID: configuring-debezium-to-auto-create-change-data-capture-topics
|
|
// Title: Configuring {prodname} to use automatically create topics
|
|
[id="cdc-topic-auto-create-config"]
|
|
= Custom Topic Auto-Creation
|
|
|
|
:toc:
|
|
:toc-placement: macro
|
|
:linkattrs:
|
|
:icons: font
|
|
:source-highlighter: highlight.js
|
|
|
|
toc::[]
|
|
|
|
{prodname} automatically creates *internal* topics for offsets, connector status, config
|
|
storage and history topics. The destination topics for the captured tables will be
|
|
automatically created with a default config by the Kafka brokers when
|
|
`auto.create.topics.enable` is enabled.{empty} +
|
|
When topic creation is disabled on the brokers, for example in production environments,
|
|
or when the topics need a different configuration then these topics have to be created
|
|
upfront either automated in a custom deployment process or manually until Kafka Connect 2.6.
|
|
|
|
Since Kafka 2.6.0 Kafka Connect supports customizable topic auto-creation.
|
|
|
|
== Kafka Connect
|
|
|
|
Kafka Connect since Kafka 2.6.0 comes with topic creation enabled:
|
|
|
|
[source,options="nowrap",shell]
|
|
----
|
|
topic.creation.enable = true
|
|
----
|
|
|
|
// TODO: how to express this for downstream?
|
|
[NOTE]
|
|
====
|
|
If you don't want to allow automatic topic creation by connectors you can set this value to `false`
|
|
in the Kafka Connect config (_connect-distributed.properties_ file or via environment variable
|
|
_CONNECT_TOPIC_CREATION_ENABLE_ when using https://hub.docker.com/r/debezium/connect[Debezium's container image for Kafka Connect]).
|
|
====
|
|
|
|
== Configuration
|
|
|
|
Topic auto-creation is based on groups. Every custom group has an `include` and an
|
|
`exclude` property which are comma-separated lists of regular expressions matching
|
|
topic names that should be included or excluded.
|
|
|
|
[NOTE]
|
|
====
|
|
You can specify both, `include` and `exclude` parameters, but note that exclusion rules
|
|
have precedent and override any inclusion rules for topics.
|
|
====
|
|
|
|
You don't have to specify any custom group. When there's no custom group registered or
|
|
the registered group's `include` patterns don't match the topic which is to be created
|
|
then the default config will be used.
|
|
|
|
You can specify all https://kafka.apache.org/documentation/#topicconfigs[*topic level configuration parameters*]
|
|
to customize how topics will be created.
|
|
|
|
=== Default Group Config
|
|
|
|
The default config can be passed in the connector config JSON like:
|
|
|
|
[source,options="nowrap",json]
|
|
----
|
|
{
|
|
...
|
|
|
|
"topic.creation.default.replication.factor": 3, //<1>
|
|
"topic.creation.default.partitions": 10, //<2>
|
|
"topic.creation.default.cleanup.policy": "compact", //<3>
|
|
"topic.creation.default.compression.type": "lz4" //<4>
|
|
|
|
...
|
|
}
|
|
----
|
|
|
|
.Connector configuration for the `default` topic creation group
|
|
[cols="1,9",options="header"]
|
|
|===
|
|
|Item |Description
|
|
|
|
|1
|
|
|`topic.creation.default.replication.factor` defines the replication factor for topics created by
|
|
the default group.
|
|
|
|
|2
|
|
|`topic.creation.default.partitions` defines the number of partitions for topics created by
|
|
the default group.
|
|
|
|
|3
|
|
|`topic.creation.default.cleanup.policy` is mapped to the https://kafka.apache.org/documentation/#cleanup.policy[`cleanup.policy`]
|
|
property of the https://kafka.apache.org/documentation/#topicconfigs[topic level configuration parameters] and
|
|
defines the log retention policy.
|
|
|
|
|4
|
|
|`topic.creation.default.compression.type` is mapped to the https://kafka.apache.org/documentation/#compression.type[`compression.type`]
|
|
property of the https://kafka.apache.org/documentation/#topicconfigs[topic level configuration parameters] and
|
|
defines how messages are compressed on harddisk.
|
|
|===
|
|
|
|
As you can see, you can put every https://kafka.apache.org/documentation/#topicconfigs[topic level configuration parameter]
|
|
as property.
|
|
|
|
=== Custom Group Config
|
|
|
|
You can specify multiple groups. Similar to the `default` group you group properties together by
|
|
the group name. This will look like that in your connector JSON:
|
|
|
|
[source,options="nowrap",json]
|
|
----
|
|
{
|
|
...
|
|
|
|
//<1>
|
|
"topic.creation.inventory.include": "dbserver1\\.inventory\\.*", //<2>
|
|
"topic.creation.inventory.replication.factor": 3,
|
|
"topic.creation.inventory.partitions": 20,
|
|
"topic.creation.inventory.cleanup.policy": "compact",
|
|
"topic.creation.inventory.delete.retention.ms": 7776000000,
|
|
|
|
//<3>
|
|
"topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*", //<4>
|
|
"topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*", //<5>
|
|
"topic.creation.applicationlogs.replication.factor": 1,
|
|
"topic.creation.applicationlogs.partitions": 20,
|
|
"topic.creation.applicationlogs.cleanup.policy": "delete",
|
|
"topic.creation.applicationlogs.retention.ms": 7776000000,
|
|
"topic.creation.applicationlogs.compression.type": "lz4",
|
|
|
|
...
|
|
}
|
|
----
|
|
|
|
.Connector configuration for custom `inventory` and `applicationlogs` topic creation groups
|
|
[cols="1,9",options="header"]
|
|
|===
|
|
|Item |Description
|
|
|
|
|1
|
|
|First we define the configuration for the `inventory` group.
|
|
|
|
|2
|
|
|`topic.creation.inventory.include` defines a regular expression to match all topics that start with
|
|
`dbserver1.inventory.`. The config defined for the `inventory` group will only be applied when the
|
|
topic name matches the given regular expression.
|
|
|
|
|3
|
|
|Then we define the configuration for the `applicationlogs` group.
|
|
|
|
|4
|
|
|`topic.creation.applicationlogs.include` defines a regular expression to match all topics that start
|
|
with `dbserver1.logs.applog-`. The config defined for the `applicationlogs` group will only be
|
|
applied when the topic name matches the given regular expression. As there's also the `exclude`
|
|
property defined at position *<5>* all topics matching this `include` regular expression might be
|
|
further restricted by the that `exlude` property.
|
|
|
|
|5
|
|
|`topic.creation.applicationlogs.exclude` defines a regular expression to match all topics that start
|
|
with `dbserver1.logs.applog-old-`. The config defined for the `applicationlogs` group will only be
|
|
applied when the topic name does *not* match the given regular expression. As there's also the
|
|
`include` property set for this group the `applicationlogs` group will only be applied to topics
|
|
which name matches the `include` regular expression/s *and* _not_ match the `exclude` regular
|
|
expression/s.
|
|
|===
|
|
|
|
=== Registering Custom Groups
|
|
|
|
Finally, we need to register the two defined custom groups `inventory` and `applicationlogs` with
|
|
the `topic.creation.groups` property:
|
|
|
|
[source,options="nowrap",json]
|
|
----
|
|
{
|
|
...
|
|
|
|
"topic.creation.groups": "inventory,applicationlogs",
|
|
|
|
...
|
|
}
|
|
----
|
|
|
|
A complete connector JSON config will look like that:
|
|
|
|
[source,options="nowrap",json]
|
|
----
|
|
{
|
|
...
|
|
|
|
"topic.creation.default.replication.factor": 3,
|
|
"topic.creation.default.partitions": 10,
|
|
"topic.creation.default.cleanup.policy": "compact",
|
|
"topic.creation.default.compression.type": "lz4"
|
|
"topic.creation.groups": "inventory,applicationlogs",
|
|
"topic.creation.inventory.include": "dbserver1\\.inventory\\.*",
|
|
"topic.creation.inventory.replication.factor": 3,
|
|
"topic.creation.inventory.partitions": 20,
|
|
"topic.creation.inventory.cleanup.policy": "compact",
|
|
"topic.creation.inventory.delete.retention.ms": 7776000000,
|
|
"topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*",
|
|
"topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*",
|
|
"topic.creation.applicationlogs.replication.factor": 1,
|
|
"topic.creation.applicationlogs.partitions": 20,
|
|
"topic.creation.applicationlogs.cleanup.policy": "delete",
|
|
"topic.creation.applicationlogs.retention.ms": 7776000000,
|
|
"topic.creation.applicationlogs.compression.type": "lz4"
|
|
}
|
|
----
|
|
|
|
ifdef::community[]
|
|
== Additional resources
|
|
- Debezium Blog: https://debezium.io/blog/2020/09/15/debezium-auto-create-topics/[Auto-creating Debezium Change Data Topics]
|
|
- Kafka Improvement Proposal about adding topic auto-creation to Kafka Connect: https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics[KIP-158]
|
|
endif::community[]
|