// Category: debezium-using
// Type: assembly
// ModuleID: configuring-debezium-to-auto-create-change-data-capture-topics
// Title: Configuring {prodname} to automatically create topics
[id="cdc-topic-auto-create-config"]
= Customizing Topic Auto-Creation

:toc:
:toc-placement: macro
:linkattrs:
:icons: font
:source-highlighter: highlight.js

toc::[]

{prodname} automatically creates the *internal* topics for offsets, connector status, config storage, and history.
The Kafka brokers automatically create the destination topics for the captured tables with a default configuration when `auto.create.topics.enable` is set to `true`.{empty} +
When topic creation is disabled on the brokers, for example in production environments, or when the topics need a different configuration, the topics had to be created up front before Kafka 2.6.0, either manually or by an automated step in a custom deployment process.
Since Kafka 2.6.0, Kafka Connect supports customizable topic auto-creation.

== Set up Kafka Connect

Since Kafka 2.6.0, Kafka Connect comes with topic creation enabled:

[source,options="nowrap",shell]
----
topic.creation.enable = true
----

ifdef::community[]
[NOTE]
====
If you don't want to allow automatic topic creation by connectors, you can set this value to `false` in the Kafka Connect config (_connect-distributed.properties_ file or via environment variable _CONNECT_TOPIC_CREATION_ENABLE_ when using https://hub.docker.com/r/debezium/connect[{prodname}'s container image for Kafka Connect]).
====
endif::community[]

ifdef::product[]
[NOTE]
====
If you don't want to allow automatic topic creation by connectors, you can set this value to `false` in the Kafka Connect CRD:
====

[source,yaml,options="nowrap"]
----
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster

...

spec:
  config:
    topic.creation.enable: "false"
----
endif::product[]

== Configuration

Topic auto-creation is based on groups.
Every custom group has an `include` and an `exclude` property, each a comma-separated list of regular expressions that match the names of topics to include or exclude.

[NOTE]
====
You can specify both the `include` and `exclude` properties, but exclusion rules take precedence and override any inclusion rules for topics.
====

You don't have to specify any custom group.
When no custom group is registered, or when the `include` patterns of the registered groups don't match the topic that is to be created, the default config is used.

You can specify all {link-kafka-docs}/#topicconfigs[topic level configuration parameters] to customize how topics are created.

See the {link-prefix}:{link-install-debezium}#_configuring_debezium_topics[Configuring {prodname} Topics] section in the {prodname} installation guide for generic topic configuration considerations.

=== Default group configuration

The default config can be passed in the connector config JSON like this:

[source,options="nowrap",json]
----
{
    ...

    "topic.creation.default.replication.factor": 3,  //<1>
    "topic.creation.default.partitions": 10,  //<2>
    "topic.creation.default.cleanup.policy": "compact",  //<3>
    "topic.creation.default.compression.type": "lz4"  //<4>

    ...
}
----

.Connector configuration for the `default` topic creation group
[cols="1,9",options="header"]
|===
|Item |Description

|1
|`topic.creation.default.replication.factor` defines the replication factor for topics created by the default group.{empty} +
`replication.factor` is mandatory for the `default` group but optional for custom groups.
Custom groups fall back to the `default` group's value if it is not set.
Use `-1` to use the Kafka broker's default value.

|2
|`topic.creation.default.partitions` defines the number of partitions for topics created by the default group.{empty} +
`partitions` is mandatory for the `default` group but optional for custom groups.
Custom groups fall back to the `default` group's value if it is not set.
Use `-1` to use the Kafka broker's default value.

|3
|`topic.creation.default.cleanup.policy` is mapped to the {link-kafka-docs}/#cleanup.policy[`cleanup.policy`] property of the {link-kafka-docs}/#topicconfigs[topic level configuration parameters] and defines the log retention policy.

|4
|`topic.creation.default.compression.type` is mapped to the {link-kafka-docs}/#compression.type[`compression.type`] property of the {link-kafka-docs}/#topicconfigs[topic level configuration parameters] and defines how messages are compressed on disk.

|===

As the example shows, you can use any {link-kafka-docs}/#topicconfigs[topic level configuration parameter] as a property.

[NOTE]
====
The `replication.factor` and `partitions` properties are mandatory for the `default` group and optional for custom groups.
Custom groups fall back to the `default` group's value if a property is not set.
Use `-1` to use the Kafka broker's default value.{empty} +
This fallback does not apply to other config parameters.
====

=== Custom group configuration

You can specify multiple custom groups.
As with the `default` group, properties are grouped together by the group name.
In the connector JSON, this looks like the following:

[source,options="nowrap",json]
----
{
    ...

    //<1>
    "topic.creation.inventory.include": "dbserver1\\.inventory\\..*",  //<2>
    "topic.creation.inventory.partitions": 20,
    "topic.creation.inventory.cleanup.policy": "compact",
    "topic.creation.inventory.delete.retention.ms": 7776000000,

    //<3>
    "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*",  //<4>
    "topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*",  //<5>
    "topic.creation.applicationlogs.replication.factor": 1,
    "topic.creation.applicationlogs.partitions": 20,
    "topic.creation.applicationlogs.cleanup.policy": "delete",
    "topic.creation.applicationlogs.retention.ms": 7776000000,
    "topic.creation.applicationlogs.compression.type": "lz4",

    ...
}
----

.Connector configuration for custom `inventory` and `applicationlogs` topic creation groups
[cols="1,9",options="header"]
|===
|Item |Description

|1
|First, we define the configuration for the `inventory` group.{empty} +
The `replication.factor` and `partitions` properties are optional for custom groups.
Custom groups fall back to the `default` group's value if a property is not set.
Use `-1` to use the Kafka broker's default value.

|2
|`topic.creation.inventory.include` defines a regular expression to match all topics that start with `dbserver1.inventory.`.
The config defined for the `inventory` group is only applied when the topic name matches the given regular expression.

|3
|Then, we define the configuration for the `applicationlogs` group.{empty} +
The `replication.factor` and `partitions` properties are optional for custom groups.
Custom groups fall back to the `default` group's value if a property is not set.
Use `-1` to use the Kafka broker's default value.

|4
|`topic.creation.applicationlogs.include` defines a regular expression to match all topics that start with `dbserver1.logs.applog-`.
The config defined for the `applicationlogs` group is only applied when the topic name matches the given regular expression.
Because the `exclude` property at position *<5>* is also defined, the topics that match this `include` regular expression are further restricted by that `exclude` property.

|5
|`topic.creation.applicationlogs.exclude` defines a regular expression to match all topics that start with `dbserver1.logs.applog-old-`.
The config defined for the `applicationlogs` group is only applied when the topic name does *not* match the given regular expression.
Because the `include` property is also set for this group, the `applicationlogs` group is only applied to topics whose names match the `include` regular expressions *and* do _not_ match the `exclude` regular expressions.

|===

=== Registering custom groups

Finally, we need to register the two custom groups, `inventory` and `applicationlogs`, with the `topic.creation.groups` property:

[source,options="nowrap",json]
----
{
    ...

    "topic.creation.groups": "inventory,applicationlogs",

    ...
}
----

A complete connector JSON config looks like this:

[source,options="nowrap",json]
----
{
    ...

    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 10,
    "topic.creation.default.cleanup.policy": "compact",
    "topic.creation.default.compression.type": "lz4",
    "topic.creation.groups": "inventory,applicationlogs",
    "topic.creation.inventory.include": "dbserver1\\.inventory\\..*",
    "topic.creation.inventory.replication.factor": 3,
    "topic.creation.inventory.partitions": 20,
    "topic.creation.inventory.cleanup.policy": "compact",
    "topic.creation.inventory.delete.retention.ms": 7776000000,
    "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*",
    "topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*",
    "topic.creation.applicationlogs.replication.factor": 1,
    "topic.creation.applicationlogs.partitions": 20,
    "topic.creation.applicationlogs.cleanup.policy": "delete",
    "topic.creation.applicationlogs.retention.ms": 7776000000,
    "topic.creation.applicationlogs.compression.type": "lz4"
}
----

ifdef::community[]
== Additional resources

For more information on topic auto-creation, see these resources:

* Debezium Blog: https://debezium.io/blog/2020/09/15/debezium-auto-create-topics/[Auto-creating Debezium Change Data Topics]
* Kafka Improvement Proposal about adding topic auto-creation to Kafka Connect: https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics[KIP-158 Kafka Connect should allow source connectors to set topic-specific settings for new topics]
endif::community[]
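
== Example: resolving a topic name to a group

The group rules described under Configuration (`include` and `exclude` lists, exclusion taking precedence, and the fallback of `replication.factor` and `partitions` to the `default` group) can be illustrated with a small Python sketch.
This is not Kafka Connect's implementation. The function names `resolve_group` and `effective_settings` are hypothetical, and the sketch assumes that a pattern must match the whole topic name and that groups are tried in the order listed in `topic.creation.groups`.
Python's `re` module also stands in for the Java regular expressions that Kafka Connect uses.

```python
import re

# Hypothetical, trimmed-down connector config modelled on the examples in this document.
CONFIG = {
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 10,
    "topic.creation.default.cleanup.policy": "compact",
    "topic.creation.groups": "inventory,applicationlogs",
    "topic.creation.inventory.include": r"dbserver1\.inventory\..*",
    "topic.creation.inventory.partitions": 20,
    "topic.creation.applicationlogs.include": r"dbserver1\.logs\.applog-.*",
    "topic.creation.applicationlogs.exclude": r"dbserver1\.logs\.applog-old-.*",
    "topic.creation.applicationlogs.replication.factor": 1,
}


def _matches_any(patterns, topic):
    # Simplification: split the list on commas, so patterns that contain
    # a comma themselves (for example "a{1,2}") are not supported here.
    # Assumption: a pattern has to match the whole topic name.
    return any(
        re.fullmatch(p.strip(), topic)
        for p in patterns.split(",")
        if p.strip()
    )


def resolve_group(topic, config):
    """Return the first registered group that accepts the topic, else 'default'."""
    registered = config.get("topic.creation.groups", "")
    for name in (g.strip() for g in registered.split(",") if g.strip()):
        include = config.get(f"topic.creation.{name}.include", "")
        exclude = config.get(f"topic.creation.{name}.exclude", "")
        # Exclusion takes precedence over inclusion.
        if _matches_any(include, topic) and not _matches_any(exclude, topic):
            return name
    return "default"


def effective_settings(topic, config):
    """Return (group, settings) that would apply to a newly created topic."""
    group = resolve_group(topic, config)
    prefix = f"topic.creation.{group}."
    settings = {
        key[len(prefix):]: value
        for key, value in config.items()
        if key.startswith(prefix)
        and key[len(prefix):] not in ("include", "exclude")
    }
    # Only these two properties fall back to the default group's value.
    for key in ("replication.factor", "partitions"):
        settings.setdefault(key, config[f"topic.creation.default.{key}"])
    return group, settings
```

With this config, `effective_settings("dbserver1.inventory.customers", CONFIG)` selects the `inventory` group with 20 partitions and the default replication factor of 3, while `dbserver1.logs.applog-old-2019` is rejected by the `exclude` pattern of `applicationlogs` and ends up in the `default` group.
Note that `cleanup.policy` from the `default` group is not inherited by the custom groups in this sketch, in line with the fallback rule above.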