DBZ-4588 Custom converters downstream edits & modularization comments

// Category: debezium-using
// Type: assembly
// ModuleID: developing-debezium-custom-data-type-converters
// Title: Developing {prodname} custom data type converters
[id="custom-converters"]
= Custom Converters

ifdef::community[]
:source-highlighter: highlight.js
:toc:
:toc-placement: macro
:linkattrs:
:icons: font

toc::[]

[NOTE]
====
This feature is currently in incubating state, i.e. exact semantics, configuration options etc. may change in future revisions, based on the feedback we receive.
====

== Datatype Conversion
endif::community[]

ifdef::product[]
[IMPORTANT]
====
The use of custom-developed converters is a Technology Preview feature only.
Technology Preview features are not supported with Red Hat production service level agreements (SLAs) and might not be functionally complete.
Red Hat does not recommend using them in production.
These features provide early access to upcoming product features, enabling customers to test functionality and provide feedback during the development process.
For more information about the support scope of Red Hat Technology Preview features, see link:https://access.redhat.com/support/offerings/techpreview[https://access.redhat.com/support/offerings/techpreview].
====
endif::product[]

Each field in a {prodname} change event record represents a field or column in the source table or data collection.
The connector converts data types in the source to corresponding Kafka Connect schema types.
Column values are likewise converted to match the schema type of the destination field.
For each connector, a default mapping specifies how the connector converts each data type.
The documentation for each connector provides details about the default mappings that the connector uses to convert data types.

The default mappings are sufficient to satisfy most needs, but for some applications it might be necessary to apply an alternate mapping.
For example, the default mapping for a column might export values using the format of milliseconds since the UNIX epoch, but you have a downstream application that requires the values to be formatted strings.
To customize data type mappings, you can develop and deploy custom converters.
You can configure a custom converter to apply to all columns of a certain type, or to a specific table column only.
The converter function intercepts conversion requests for columns that match the specified criteria, and performs the specified format conversion.
The converter ignores columns that do not match the specified criteria.

Custom converters are Java classes that implement the Debezium service provider interface (SPI).
You enable and configure a custom converter by setting the `converters` property in the connector configuration.
The `converters` property defines the criteria for identifying the columns that you want the converter to process and provides other details that determine conversion behavior.
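
For example, a minimal sketch of the connector properties that enable the `isbn` converter that is used in the examples later in this document might look like the following (the configuration section later in this document describes these properties in detail):

----
converters: isbn
isbn.type: io.debezium.test.IsbnConverter
----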

After you start a connector, any converters that are enabled in the connector configuration are instantiated and added to a registry.
The registry associates each converter with the columns or fields for it to process.
Whenever {prodname} processes a new change event, it invokes the configured converter to convert the columns or fields for which it is registered.

// Type: procedure
// Title: Creating a {prodname} custom data type converter
// ModuleID: creating-a-debezium-custom-data-type-converter
[id="implementing-a-custom-converter"]
== Implementing custom converters

The following example shows a converter implementation of a Java class that implements the interface `io.debezium.spi.converter.CustomConverter`:

[source,java,indent=0]
----
public interface CustomConverter<S, F extends ConvertedField> {

    @FunctionalInterface
    interface Converter { // <1>
        Object convert(Object input);
    }

    public interface ConverterRegistration<S> { // <2>
        void register(S fieldSchema, Converter converter); // <3>
    }

    void configure(Properties props);

    void converterFor(F field, ConverterRegistration<S> registration); // <4>
}
----
<1> A function for converting data from one type to another.
<2> Callback for registering a converter.
<3> Registers the given schema and converter for the current field.
Should not be invoked more than once for the same field.
<4> Registers the customized value and schema converter for use with a specific field.

.Custom converter methods
The `configure()` and `converterFor()` methods are mandatory for each {prodname} custom converter:

`configure()`::
Passes the properties specified in the connector configuration to the converter instance.
The `configure` method runs when the connector is initialized.
You can use a converter with multiple connectors and modify its behavior based on the connector's property settings. +
The `configure` method accepts the following argument:

`props`::: Contains the properties to pass to the converter instance.
Each property specifies the format for converting the values of a particular type of column.

`converterFor()`::
Registers the converter to process specific columns or fields in the data source.
{prodname} invokes the `converterFor()` method to prompt the converter to call `registration` for the conversion.
The `converterFor` method runs once for each column. +
The method accepts the following arguments:

`field`:::
An object that passes metadata about the field or column that is processed.
The column metadata can include the name of the column or field, the name of the table or collection, the data type, size, and so forth.

`registration`:::
An object of type `io.debezium.spi.converter.CustomConverter.ConverterRegistration` that provides the target schema definition and the code for converting the column data.
When a source column matches the type that the converter should process, the converter calls the `register` method on the `registration` parameter to define the converter for that column in the schema.
Schemas are represented using the Kafka Connect link:https://kafka.apache.org/31/javadoc/org/apache/kafka/connect/data/SchemaBuilder.html[`SchemaBuilder`] API.

ifdef::community[]
In the future, an independent schema definition API will be added.
endif::community[]

The following example implements a simple converter that performs the following operations:

* Runs the `configure` method, which configures the converter based on the value of the `schema.name` property that is specified in the connector configuration.
The converter configuration is specific to each instance.
* Runs the `converterFor` method, which registers the converter to process values in source columns for which the data type is set to `isbn`.
** Identifies the target `STRING` schema based on the value that is specified for the `schema.name` property.
** Converts ISBN data in the source column to `String` values.

=== {prodname} custom converter example

[id="example-debezium-simple-custom-converter"]
.A simple custom converter
====
[source,java,indent=0]
----
public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private SchemaBuilder isbnSchema;

    @Override
    public void configure(Properties props) {
        // Build the target STRING schema, named according to the
        // schema.name property from the connector configuration.
        isbnSchema = SchemaBuilder.string().name(props.getProperty("schema.name"));
    }

    @Override
    public void converterFor(RelationalColumn column,
            ConverterRegistration<SchemaBuilder> registration) {
        // Register this converter for every column of type "isbn",
        // converting the column value to a String.
        if ("isbn".equals(column.typeName())) {
            registration.register(isbnSchema, x -> x.toString());
        }
    }
}
----
====

// Type: procedure
[id="debezium-and-kafka-connect-api-module-dependencies"]
=== {prodname} and Kafka Connect API module dependencies

The converter code depends on the {prodname} and Kafka Connect API library modules.
To enable your converter code to compile, add these dependencies to your converter Java project as shown in the following example:

[source,xml]
----
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version> // <1>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>${version.kafka}</version> // <2>
</dependency>
----
<1> `${version.debezium}` represents the version of the {prodname} connector.
<2> `${version.kafka}` represents the version of Apache Kafka in your environment.
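
To produce the JAR file that you deploy alongside the connector, build the converter project with your build tool. For example, with Maven and a standard project layout, you might run a command like the following (illustrative; your build setup might differ):

----
mvn clean package
----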

// Type: assembly
// Title: Using custom converters with {prodname} connectors
// ModuleID: deploying-and-configuring-debezium-custom-data-type-converters
[id="configuring-and-using-converters"]
== Configuring and Using Converters

To use the converter with a connector, you deploy the converter JAR file alongside the connector file, and then configure the connector to use the converter.

// Type: procedure
[id="deploying-a-debezium-custom-converter"]
=== Deploying a custom converter

.Procedure
* To use a custom converter with a {prodname} connector, export the Java project to a JAR file, and add the file to the directory that contains the JAR file for each {prodname} connector that you want to use it with. +
+
For example, in a typical deployment, you might store {prodname} connector files in subdirectories of a Kafka Connect directory, such as `/kafka/connect`,
and then store the JAR for each connector in its own subdirectory (`debezium-connector-db2`, `debezium-connector-mysql`, and so forth).
To use a converter with a connector, add the converter JAR file to the connector subdirectory, as shown in the example layout after the following note.

NOTE: To use a converter with multiple connectors, add the converter JAR file to the directory for each of the connectors.
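
For example, the resulting layout might look like the following sketch (the converter JAR name `my-custom-converter.jar` is illustrative):

----
/kafka/connect/
    debezium-connector-mysql/
        debezium-connector-mysql-<version>.jar
        my-custom-converter.jar
    debezium-connector-db2/
        debezium-connector-db2-<version>.jar
        my-custom-converter.jar
----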

// Type: procedure
[id="configuring-a-connectors-to-use-a-custom-converter"]
=== Configuring a connector to use a custom converter

Custom converters act on specific columns or column types in a source table to specify how to convert their data types.
To enable a connector to use the custom converter, you add properties to the connector configuration that specify the converter name and class.
If the converter requires further information to customize the formats of specific data types, you can also define other configuration options to provide that information.

.Prerequisites

* You have a custom converter Java program.

.Procedure

* Enable a converter for a connector instance by adding the following mandatory properties to the connector configuration:
+
[subs="+quotes"]
----
converters: _<converterSymbolicName>_ // <1>
_<converterSymbolicName>_.type: _<fullyQualifiedConverterClassName>_ // <2>
----
<1> The `converters` property is mandatory and enumerates a comma-separated list of symbolic names of the converter instances to use with the connector.
The values listed for this property serve as prefixes in the names of other properties that you specify for the converter.
<2> The `_<converterSymbolicName>_.type` property is mandatory, and specifies the name of the class that implements the converter.
For example, for the earlier xref:example-debezium-simple-custom-converter[custom converter example], you would add the following properties to the connector configuration:
+
----
converters: isbn
isbn.type: io.debezium.test.IsbnConverter
----

* To provide further configuration properties for a converter, prefix the property names with the symbolic name of the converter, followed by a dot (`.`).
The symbolic name is a label that you specify as a value for the `converters` property.
For example, to add a property for the preceding `isbn` converter to specify the `schema.name` to pass to the `configure` method in the converter code, add the following property:
+
----
isbn.schema.name: io.debezium.postgresql.type.Isbn
----