[id="custom-converters"]
= Custom Converters
:source-highlighter: highlight.js
:toc:
:toc-placement: macro
:linkattrs:
:icons: font

toc::[]

[NOTE]
====
This feature is currently in an incubating state; that is, the exact semantics, configuration options, and so on may change in future revisions based on the feedback we receive. Please let us know if you encounter any problems while using this extension.
====

== Datatype Conversion

The {prodname} connectors map database column types to corresponding Kafka Connect schema types and convert the column values accordingly.
The specific column type mappings are documented for each of the connectors and represent reasonable default behavior in most situations.
Still, an application may require specific handling of a certain column type, or of a specific column, due to downstream system requirements.
For instance, you might want to export temporal column values as a formatted string instead of milliseconds since epoch.
For this purpose, {prodname} provides an extension point that allows users to inject their own converters based on their business requirements.

The converters are written in Java and are enabled and configured via connector properties.
During connector startup, all configured converters are instantiated and placed in a registry.
While the connector-internal schema representation is built, every converter is invoked for every column/field of every table/collection, and it can register itself to become responsible for the conversion of a given column or field.
Whenever a new change is processed by {prodname}, the converter is invoked to execute the actual conversion for the registered columns or fields.

== Implementing Converters

The converter implementation is a Java class that implements the interface `io.debezium.spi.converter.CustomConverter`:

[source,java,indent=0]
----
public interface CustomConverter<S, F extends ConvertedField> {

    @FunctionalInterface
    interface Converter {
        Object convert(Object input);
    }

    public interface ConverterRegistration<S> {
        void register(S fieldSchema, Converter converter);
    }

    void configure(Properties props);

    void converterFor(F field, ConverterRegistration<S> registration);
}
----

The method `configure()` is used to pass converter configuration options into the converter after its instantiation, so that it can modify its runtime behavior for each specific instance.

The method `converterFor()` is invoked by {prodname}, and the converter is required to call `register()` on the provided `registration` if it wants to take responsibility for the conversion.
The registration provides the target schema definition and the actual conversion code.
Schemas are currently represented using Kafka Connect's `SchemaBuilder` API.
In the future, an independent schema definition API will be added.

The metadata about the column or field is passed via the `field` parameter.
It contains information such as the table or collection name, the column or field name, the type name, and more.

The following example implements a simple converter that will:

* accept one parameter named `schema.name`
* register itself for every column of type `isbn` with
** the target `STRING` schema named according to the `schema.name` parameter
** the conversion code that converts the ISBN data to `String`

[source,java,indent=0]
----
public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private SchemaBuilder isbnSchema;

    @Override
    public void configure(Properties props) {
        isbnSchema = SchemaBuilder.string().name(props.getProperty("schema.name"));
    }

    @Override
    public void converterFor(RelationalColumn column,
            ConverterRegistration<SchemaBuilder> registration) {
        if ("isbn".equals(column.typeName())) {
            registration.register(isbnSchema, x -> x.toString());
        }
    }
}
----
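
A converter does not have to match on the type name; the metadata of the `field` parameter also allows matching specific columns.
The following sketch registers for a single column and exports its values as plain strings, in the spirit of the formatted-timestamp use case mentioned earlier.
The `orders` table and `order_date` column are hypothetical, and the exact string rendering depends on the value type the connector passes in:

[source,java,indent=0]
----
public static class OrderDateConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    @Override
    public void configure(Properties props) {
        // this example does not take any configuration options
    }

    @Override
    public void converterFor(RelationalColumn column,
            ConverterRegistration<SchemaBuilder> registration) {
        // match one specific column of one specific table (hypothetical names)
        if ("orders".equals(column.dataCollection()) && "order_date".equals(column.name())) {
            // use an optional schema so that null column values can be passed through
            registration.register(SchemaBuilder.string().optional(),
                    x -> x == null ? null : x.toString());
        }
    }
}
----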

To compile the code, it is necessary to declare dependencies on the `debezium-api` and `connect-api` modules, for example via Maven:

[source,xml]
----
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>${version.kafka}</version>
</dependency>
----

where `${version.debezium}` and `${version.kafka}` are the versions of {prodname} and Apache Kafka, respectively.
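
Both APIs are available on the classpath at runtime (`connect-api` is part of the Kafka Connect runtime, and the `debezium-api` classes ship with the connector), so the two dependencies can typically be declared with `provided` scope to keep them out of the converter JAR.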

== Configuring and Using Converters

After the converter is developed, it has to be deployed in a JAR file side-by-side with the {prodname} connector JARs.
To enable the converter for a given connector instance, it is necessary to provide connector options like these:

----
converters=isbn
isbn.type=io.debezium.test.IsbnConverter
isbn.schema.name=io.debezium.postgresql.type.Isbn
----

The option `converters` is mandatory and enumerates comma-separated symbolic names of the converter instances to be used.
The symbolic names are used as a prefix for further configuration options.

`isbn.type` (generally `<prefix>.type`) is mandatory and is the name of the class that implements the converter.

`isbn.schema.name` is a converter parameter that is passed to the converter's `configure()` method as `schema.name`.
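
Multiple converter instances can be enabled for a single connector by listing several symbolic names, each configured through its own prefix.
A hypothetical configuration combining the ISBN converter with the column-specific converter sketched above:

----
converters=isbn,orderdate
isbn.type=io.debezium.test.IsbnConverter
isbn.schema.name=io.debezium.postgresql.type.Isbn
orderdate.type=io.debezium.test.OrderDateConverter
----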