= Custom Converters
include::../_attributes.adoc[]
:source-highlighter: highlight.js
:toc:
:toc-placement: macro
:linkattrs:
:icons: font

toc::[]

== Datatype conversion

Debezium connectors map database types to appropriate Java types and convert the incoming values to the target objects.
The mapping is documented for each connector and is chosen to cover most use cases.
An application may still require special handling of a certain type or a specific column because of downstream system requirements.
The standard conversion rules cannot cover such cases.

Debezium provides an extension point that lets users inject their own converters based on their business requirements.
The converters are written as Java classes and are enabled and configured via connector properties.

During connector startup, all configured converters are instantiated and placed in a registry.
While the internal schema representation is built, every converter is invoked for every column/field of every table/collection, and it can register itself to become responsible for the conversion of the given column/field.

Whenever Debezium processes a new change, the registered converter is invoked to perform the actual conversion for the columns/fields it registered for.

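
The three phases described above (registry population, registration during schema building, conversion per change) can be modelled with a small self-contained sketch.
The types `ConverterLifecycleSketch` and `MiniConverter` are hypothetical stand-ins invented for this illustration; they are not Debezium classes, and the "first claim wins" rule is an assumption of the sketch rather than documented Debezium behaviour.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified model for illustration only; these are NOT Debezium classes.
final class ConverterLifecycleSketch {

    /** Hypothetical, minimal counterpart of a custom converter. */
    interface MiniConverter {
        // Called once per column while the schema is built. Returns a conversion
        // function to claim the column, or null to decline it.
        Function<Object, Object> converterFor(String columnName, String typeName);
    }

    // Phase 1 state: converters placed in the registry at startup.
    private final List<MiniConverter> converters = new ArrayList<>();

    // Phase 2 state: column name -> conversion function claimed for it.
    private final Map<String, Function<Object, Object>> registered = new LinkedHashMap<>();

    // Phase 1: at startup, every configured converter is added to the registry.
    void addConverter(MiniConverter converter) {
        converters.add(converter);
    }

    // Phase 2: while the schema is built, every converter sees every column.
    void buildSchema(Map<String, String> columnTypes) {
        for (Map.Entry<String, String> column : columnTypes.entrySet()) {
            for (MiniConverter converter : converters) {
                Function<Object, Object> fn =
                        converter.converterFor(column.getKey(), column.getValue());
                if (fn != null) {
                    // Assumption of this sketch: the first converter to claim a column wins.
                    registered.putIfAbsent(column.getKey(), fn);
                }
            }
        }
    }

    // Phase 3: for each change, only registered columns go through a converter.
    Object convert(String columnName, Object value) {
        Function<Object, Object> fn = registered.get(columnName);
        return fn == null ? value : fn.apply(value);
    }

    public static void main(String[] args) {
        ConverterLifecycleSketch sketch = new ConverterLifecycleSketch();
        sketch.addConverter((name, type) -> {
            if ("isbn".equals(type)) {
                return value -> value.toString();
            }
            return null;
        });

        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("title", "varchar");
        columns.put("isbn", "isbn");
        sketch.buildSchema(columns);

        System.out.println(sketch.convert("isbn", 9780134685991L));
        System.out.println(sketch.convert("title", "Effective Java"));
    }
}
```

In this model only the `isbn` column is routed through the conversion function; values of every other column are returned unchanged.
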
== Implementing converter

The converter implementation is a Java class that implements the `CustomConverter` interface:

[source,java,indent=0]
----
public interface CustomConverter<S, F extends ConvertedField> {

    // Converts a single input value into the value placed in the change event.
    @FunctionalInterface
    interface Converter {
        Object convert(Object input);
    }

    // Callback through which a converter claims a column/field.
    public interface ConverterRegistration<S> {
        void register(S fieldSchema, Converter converter);
    }

    // Receives the converter's configuration options after instantiation.
    void configure(Properties props);

    // Invoked for each column/field; call registration.register(...) to take over its conversion.
    void converterFor(F field, ConverterRegistration<S> registration);
}
----

The `configure` method passes configuration options to the converter after it is instantiated, so each instance can adjust its runtime behaviour.
Debezium invokes the `converterFor` method, and the converter must call `registration.register` if it takes responsibility for the conversion.
The registration provides the target schema definition and the actual conversion code.
Metadata about the column/field is passed via the `field` parameter.
It contains information such as the table/collection name, the column/field name, the type name, and others.

The following example implements a simple converter that:

* accepts one parameter named `schema.name`
* registers itself for every column of type `isbn` with
** the target `STRING` schema named according to the `schema.name` parameter
** the conversion code that converts the ISBN data to `String`

[source,java,indent=0]
----
package io.debezium.test;

import java.util.Properties;

import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;

public class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private SchemaBuilder isbnSchema;

    @Override
    public void configure(Properties props) {
        // Build the target STRING schema, named by the "schema.name" option.
        isbnSchema = SchemaBuilder.string().name(props.getProperty("schema.name"));
    }

    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        // Claim only columns whose database type is "isbn".
        if ("isbn".equals(column.typeName())) {
            registration.register(isbnSchema, x -> x.toString());
        }
    }
}
----

To compile the code, declare dependencies on the `debezium-api` and `connect-api` modules, for example:

[source,xml]
----
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>${version.kafka}</version>
</dependency>
----

where `${version.debezium}` and `${version.kafka}` are either literal version strings or Maven properties that define the Debezium and Kafka Connect versions, respectively.

== Configuring and using converters

After the converter is developed, it must be deployed in a JAR file alongside the Debezium connector JARs.
To enable the converter for a given connector instance, provide connector options such as the following:

----
converters=isbn
isbn.type=io.debezium.test.IsbnConverter
isbn.schema.name=io.debezium.postgresql.type.Isbn
----

The `converters` option is mandatory and lists the comma-separated symbolic names of the converter instances to use.
Each symbolic name serves as the prefix for that converter's further configuration options.

`isbn.type` (generally `<prefix>.type`) is mandatory and is the fully qualified name of the class that implements the converter.

`isbn.schema.name` is a converter parameter that is passed to the converter's `configure` method as `schema.name`.
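
The prefix convention described above can be illustrated with a minimal sketch that splits connector options into one `Properties` object per symbolic name, with the prefix removed.
The class `ConverterConfigSketch` and its method `perConverterProperties` are hypothetical names made up for this example; the sketch does not reproduce Debezium's actual configuration code, and whether the `type` key is also handed to `configure` is left open here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Illustration only; this is NOT how Debezium is implemented internally.
final class ConverterConfigSketch {

    /** Splits connector options into one Properties object per converter name. */
    static Map<String, Properties> perConverterProperties(Properties connectorConfig) {
        Map<String, Properties> result = new LinkedHashMap<>();
        String names = connectorConfig.getProperty("converters", "");
        for (String rawName : names.split(",")) {
            String name = rawName.trim();
            if (name.isEmpty()) {
                continue; // tolerate an empty list or stray commas
            }
            String prefix = name + ".";
            Properties props = new Properties();
            for (String key : connectorConfig.stringPropertyNames()) {
                if (key.startsWith(prefix)) {
                    // "isbn.schema.name" becomes "schema.name" for the "isbn" converter.
                    props.setProperty(key.substring(prefix.length()),
                            connectorConfig.getProperty(key));
                }
            }
            result.put(name, props);
        }
        return result;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("converters", "isbn");
        config.setProperty("isbn.type", "io.debezium.test.IsbnConverter");
        config.setProperty("isbn.schema.name", "io.debezium.postgresql.type.Isbn");

        Properties isbn = perConverterProperties(config).get("isbn");
        System.out.println(isbn.getProperty("type"));
        System.out.println(isbn.getProperty("schema.name"));
    }
}
```

With the options from the example above, the `isbn` entry holds `schema.name`, which is the key that `IsbnConverter.configure` reads.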