From d6709b56fc84910a1afb3f6e260ed5f2dfb6f989 Mon Sep 17 00:00:00 2001
From: Bob Roldan
Date: Mon, 9 May 2022 21:46:22 -0400
Subject: [PATCH] DBZ-4588 Custom converters downstream edits & modularization
 comments

---
 .../ROOT/pages/development/converters.adoc    | 197 ++++++++++++++----
 1 file changed, 153 insertions(+), 44 deletions(-)

diff --git a/documentation/modules/ROOT/pages/development/converters.adoc b/documentation/modules/ROOT/pages/development/converters.adoc
index 9c32f5066..1aa54c7a9 100644
--- a/documentation/modules/ROOT/pages/development/converters.adoc
+++ b/documentation/modules/ROOT/pages/development/converters.adoc
@@ -1,12 +1,16 @@
+// Category: debezium-using
+// Type: assembly
+// ModuleID: developing-debezium-custom-data-type-converters
+// Title: Developing {prodname} custom data type converters
 [id="custom-converters"]
 = Custom Converters
+ifdef::community[]
 :source-highlighter: highlight.js
 :toc:
 :toc-placement: macro
 :linkattrs:
 :icons: font
-
 toc::[]
 
 [NOTE]
 ====
@@ -15,58 +19,115 @@ This feature is currently in incubating state, i.e. exact semantics, configurati
 ====
 
 == Datatype Conversion
+endif::community[]
 
-The {prodname} connectors map database column types to corresponding Kafka Connect schema types and convert the column values accordingly.
-The specific column type mappings are documented for each of the connectors and represent a reasonable default behavior for most of the time.
-It is still possible that an application requires a specific handling of a certain column type or specific column due to downstream system requirements.
-For instance you might want to export temporal column values as a formatted string instead of milli-seconds since epoch.
+ifdef::product[]
+[IMPORTANT]
+====
+The use of custom-developed converters is a Technology Preview feature only.
+Technology Preview features are not supported with Red Hat production service level agreements (SLAs) and might not be functionally complete.
+Red Hat does not recommend using them in production.
+These features provide early access to upcoming product features, enabling customers to test functionality and provide feedback during the development process.
+For more information about the support scope of Red Hat Technology Preview features, see link:https://access.redhat.com/support/offerings/techpreview[https://access.redhat.com/support/offerings/techpreview].
+====
+endif::product[]
 
-For this purpose {prodname} provides an extension point that allows users to inject their own converters based on their business requirements.
-The converters are written in Java and are enabled and configured via connector properties.
+Each field in a {prodname} change event record represents a field or column in the source table or data collection.
+The connector converts each data type in the source to the corresponding Kafka Connect schema type.
+Column values are likewise converted to match the schema type of the destination field.
+For each connector, a default mapping specifies how the connector converts each data type.
+The documentation for each connector provides details about the default mappings that the connector uses to convert data types.
 
-During connector startup, all configured converters are instantiated and placed in an registry.
-While the connector-internal schema representation is built, every converter is invoked for every column/field of every table/collection and it can register itself to become responsible for the conversion of the given column or field.
+The default mappings are sufficient to satisfy most needs, but for some applications it might be necessary to apply an alternate mapping.
+For example, the default mapping for a column might export values as milliseconds since the UNIX epoch, but a downstream application requires the values to be formatted strings.
+To customize data type mappings, you develop and deploy custom converters.
+You can configure a custom converter to apply to all columns of a certain type, or to a specific table column only.
+The converter function intercepts conversion requests for columns that match specified criteria, and performs the requested format conversion.
+The converter ignores columns that do not match the specified criteria.
 
-Whenever a new change is processed by {prodname}, the converter is invoked to execute the actual conversion for the registered columns or fields.
+Custom converters are Java classes that implement the {prodname} service provider interface (SPI).
+You enable and configure a custom converter by setting the `converters` property in the connector configuration.
+The `converters` property defines the criteria for identifying the columns that you want the converter to process, and provides other details that determine conversion behavior.
 
-== Implementing Converters
+After you start a connector, any converters that are enabled in the connector configuration are instantiated and added to a registry.
+The registry associates each converter with the columns or fields for it to process.
+Whenever {prodname} processes a new change event, it invokes the configured converter to convert the columns or fields for which it is registered.
 
-The converter implementation is a Java class that implements the interface `io.debezium.spi.converter.CustomConverter`:
+// Type: procedure
+// Title: Creating a {prodname} custom data type converter
+// ModuleID: creating-a-debezium-custom-data-type-converter
+[id="implementing-a-custom-converter"]
+== Implementing custom converters
+
+To create a custom converter, you write a Java class that implements the interface `io.debezium.spi.converter.CustomConverter`, shown in the following example:
 
 [source,java,indent=0]
 ----
 public interface CustomConverter<S, F> {
 
     @FunctionalInterface
-    interface Converter {
+    interface Converter { // <1>
         Object convert(Object input);
     }
 
-    public interface ConverterRegistration<S> {
-        void register(S fieldSchema, Converter converter);
+    public interface ConverterRegistration<S> { // <2>
+        void register(S fieldSchema, Converter converter); // <3>
     }
 
     void configure(Properties props);
 
-    void converterFor(F field, ConverterRegistration<S> registration);
+    void converterFor(F field, ConverterRegistration<S> registration); // <4>
 }
 ----
+<1> A function for converting data from one type to another.
+<2> Callback for registering a converter.
+<3> Registers the given schema and converter for the current field.
+Should not be invoked more than once for the same field.
+<4> Registers the customized value and schema converter for use with a specific field.
 
-The method `configure()` is used to pass converter configuration options into the converter after its instantiation, so it can modify its runtime behaviour for each specific instance.
-The method `converterFor()` is invoked by {prodname} and the converter is required to call `registration` in case of taking responsibility for the conversion.
-The registration provides the target schema definition and the actual conversion code.
-Schemas are currently represented using Kafka Connect's `SchemaBuilder` API.
+.Custom converter methods
+The `configure()` and `converterFor()` methods are mandatory for each {prodname} custom converter:
+
+`configure()`::
+Passes the properties specified in the connector configuration to the converter instance.
+The `configure` method runs when the connector is initialized.
+You can use a converter with multiple connectors and modify its behavior based on each connector's property settings.
+
+The `configure` method accepts the following argument:
+
+`props`::: Contains the properties to pass to the converter instance.
+Each property specifies the format for converting the values of a particular type of column.
+
+`converterFor()`::
+Registers the converter to process specific columns or fields in the data source.
+{prodname} invokes the `converterFor()` method to prompt the converter to call `registration` for the conversion.
+The `converterFor` method runs once for each column.
+
+The method accepts the following arguments:
+
+`field`:::
+An object that passes metadata about the field or column that is being processed.
+The column metadata can include the name of the column or field, the name of the table or collection, the data type, the size, and so forth.
+
+`registration`:::
+An object of type `io.debezium.spi.converter.CustomConverter.ConverterRegistration` that provides the target schema definition and the code for converting the column data.
+When the source column matches the type that the converter should process, the converter calls the `register` method on the `registration` object to define the converter for that column in the schema.
+Schemas are represented using the Kafka Connect link:https://kafka.apache.org/31/javadoc/org/apache/kafka/connect/data/SchemaBuilder.html[`SchemaBuilder`] API.
+ifdef::community[]
 In the future, an independent schema definition API will be added.
-The metadata about the column or field are passed via the `field` parameter.
-They contain information like table or collection name, column or field name, type name, and others.
+endif::community[]
 
-The following example implements a simple converter that will:
+The following example implements a simple converter that performs the following operations (a variant that targets a single column appears after the list):
 
- * accept one parameter named `schema.name`
- * register itself for every column of type `isbn` with
- ** the target `STRING` schema named according to the `schema.name` parameter
- ** the conversion code that converts the ISBN data to `String`
-
+* Runs the `configure` method, which configures the converter based on the value of the `schema.name` property that is specified in the connector configuration.
+The converter configuration is specific to each instance.
+* Runs the `converterFor` method, which registers the converter to process values in source columns for which the data type is set to `isbn`.
+** Identifies the target `STRING` schema based on the value that is specified for the `schema.name` property.
+** Converts ISBN data in the source column to `String` values.
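+
+The converter described by the preceding list registers itself for every column whose data type is `isbn`.
+As noted earlier, a converter can instead limit itself to a single table column rather than to a column type.
+The following is a minimal sketch of that pattern; the `products` table and `weight` column names are purely illustrative, while the `name()` and `dataCollection()` methods belong to the `io.debezium.spi.converter.ConvertedField` SPI:
+
+[source,java,indent=0]
+----
+import java.util.Properties;
+
+import org.apache.kafka.connect.data.SchemaBuilder;
+
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+
+public class WeightColumnConverter
+        implements CustomConverter<SchemaBuilder, RelationalColumn> {
+
+    @Override
+    public void configure(Properties props) {
+        // This sketch takes no configuration options.
+    }
+
+    @Override
+    public void converterFor(RelationalColumn column,
+            ConverterRegistration<SchemaBuilder> registration) {
+        // Register only for the "weight" column of the "products" table;
+        // all other columns keep their default mapping.
+        if ("products".equals(column.dataCollection()) && "weight".equals(column.name())) {
+            registration.register(SchemaBuilder.string(),
+                    value -> value == null ? null : value.toString());
+        }
+    }
+}
+----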
+
+=== {prodname} custom converter example
+[id="example-debezium-simple-custom-converter"]
+.A simple custom converter
+====
 [source,java,indent=0]
 ----
 public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
@@ -88,38 +149,86 @@
 }
 }
 ----
+====
+
+// Type: procedure
+[id="debezium-and-kafka-connect-api-module-dependencies"]
+=== {prodname} and Kafka Connect API module dependencies
+
+The converter code depends on the {prodname} and Kafka Connect API library modules.
+To enable your converter code to compile, add these dependencies to your converter Java project as shown in the following example:
 
-To compile the code it is necessary to provide dependencies to the `debezium-api` and `connect-api` modules like:
 [source,xml]
 ----
 <dependency>
     <groupId>io.debezium</groupId>
     <artifactId>debezium-api</artifactId>
-    <version>${version.debezium}</version>
+    <version>${version.debezium}</version> <1>
 </dependency>
 <dependency>
     <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
-    <version>${version.kafka}</version>
+    <version>${version.kafka}</version> <2>
 </dependency>
 ----
+<1> `${version.debezium}` represents the version of the {prodname} connector.
+<2> `${version.kafka}` represents the version of Apache Kafka in your environment.
 
-where `${version.debezium}` and `${version.kafka}` are the versions of {prodname} and Apache Kafka, respectively.
-
+// Type: assembly
+// Title: Using custom converters with {prodname} connectors
+// ModuleID: deploying-and-configuring-debezium-custom-data-type-converters
+[id="configuring-and-using-converters"]
 == Configuring and Using Converters
 
-After the converter is developed it has to be deployed in a JAR file side-by-side with the {prodname} connector JARs.
-To enable the converter for a given connector instance it is necessary to provide the connector options like this:
+To use a custom converter with a connector, you deploy the converter JAR file alongside the connector files, and then configure the connector to use the converter.
 
+// Type: procedure
+[id="deploying-a-debezium-custom-converter"]
+=== Deploying a custom converter
+
+.Procedure
+* To use a custom converter with a {prodname} connector, export the Java project to a JAR file, and add the file to the directory that contains the JAR file for each {prodname} connector that you want to use it with.
++
+For example, in a typical deployment, you might store {prodname} connector files in subdirectories of a Kafka Connect directory, such as `/kafka/connect`,
+and then store the JAR for each connector in its own subdirectory (`debezium-connector-db2`, `debezium-connector-mysql`, and so forth).
+To use a converter with a connector, add the converter JAR file to the connector's subdirectory.
+
+NOTE: To use a converter with multiple connectors, add the converter JAR file to the directory for each of the connectors.
+
+// Type: procedure
+[id="configuring-a-connector-to-use-a-custom-converter"]
+=== Configuring a connector to use a custom converter
+
+Custom converters act on specific columns or column types in a source table to specify how to convert their data types.
+To enable a connector to use the custom converter, you add properties to the connector configuration that specify the converter name and class.
+If the converter requires further information to customize the formats of specific data types, you can also define other configuration options to provide that information.
+
+.Prerequisites
+* You have a custom converter Java program.
+
+.Procedure
+
+* Enable a converter for a connector instance by adding the following mandatory properties to the connector configuration:
++
+[subs="+quotes"]
 ----
-converters=isbn
-isbn.type=io.debezium.test.IsbnConverter
-isbn.schema.name=io.debezium.postgresql.type.Isbn
+converters: __<converterSymbolicName>__ // <1>
+__<converterSymbolicName>__.type: __<fullyQualifiedConverterClassName>__ // <2>
+----
+<1> The `converters` property is mandatory and enumerates a comma-separated list of symbolic names of the converter instances to use with the connector.
+The values listed for this property serve as prefixes in the names of other properties that you specify for the converter.
+<2> The `__<converterSymbolicName>__.type` property is mandatory, and specifies the name of the class that implements the converter.
+For example, for the earlier xref:example-debezium-simple-custom-converter[custom converter example], you would add the following properties to the connector configuration:
++
+----
+converters: isbn
+isbn.type: io.debezium.test.IsbnConverter
 ----
 
-The option `converters` is mandatory and enumerates comma-separated symbolic names of converter instances to be used.
-The symbolic names are used as a prefix for further configuration options.
-
-`isbn.type` (generally `<prefix>.type`) is mandatory and is the name of the class that implements the converter.
-
-`isbn.schema.name` is a converter parameter that is passed to the converter's `configure` method as `schema.name`.
+* To provide further configuration properties for a converter, prefix the property names with the symbolic name of the converter, followed by a dot (`.`).
+ The symbolic name is the label that you specify as a value for the `converters` property.
+ For example, to add a property for the preceding `isbn` converter that specifies the `schema.name` to pass to the `configure` method in the converter code, add the following property:
++
+----
+isbn.schema.name: io.debezium.postgresql.type.Isbn
+----
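+
+With this configuration in place, the connector strips the `isbn.` prefix before it passes the property to the converter, so the `configure` method receives the setting under the key `schema.name`.
+The following sketch shows how the `configure` method of the example converter might read the value; the `isbnSchema` field and the `schema.name` key follow the earlier example, and the null check is an illustrative addition:
+
+[source,java,indent=0]
+----
+@Override
+public void configure(Properties props) {
+    // The connector property "isbn.schema.name" arrives here as "schema.name",
+    // because the "isbn." prefix only selects the converter instance.
+    String schemaName = props.getProperty("schema.name");
+    if (schemaName == null) {
+        throw new IllegalArgumentException("The schema.name property must be set");
+    }
+    isbnSchema = SchemaBuilder.string().name(schemaName);
+}
+----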