DBZ-8040: Custom Converter Support
This commit is contained in:
parent
ef7a9747db
commit
479a41af28
@ -0,0 +1,11 @@
|
|||||||
|
/*
|
||||||
|
* Copyright Debezium Authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*/
|
||||||
|
package io.debezium.embedded;
|
||||||
|
|
||||||
|
import io.debezium.engine.format.SerializationFormat;
|
||||||
|
|
||||||
|
public class ClientProvided implements SerializationFormat<Object> {
|
||||||
|
}
|
@ -149,7 +149,7 @@ private boolean shouldConvertHeadersToString() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private List<Header<byte[]>> convertHeaders(
|
private List<Header<byte[]>> convertHeaders(
|
||||||
SourceRecord record, String topicName, HeaderConverter headerConverter) {
|
SourceRecord record, String topicName, HeaderConverter headerConverter) {
|
||||||
List<Header<byte[]>> headers = new ArrayList<>();
|
List<Header<byte[]>> headers = new ArrayList<>();
|
||||||
|
|
||||||
for (org.apache.kafka.connect.header.Header header : record.headers()) {
|
for (org.apache.kafka.connect.header.Header header : record.headers()) {
|
||||||
@ -171,6 +171,12 @@ private HeaderConverter createHeaderConverter(Class<? extends SerializationForma
|
|||||||
if (isFormat(format, Json.class) || isFormat(format, JsonByteArray.class)) {
|
if (isFormat(format, Json.class) || isFormat(format, JsonByteArray.class)) {
|
||||||
converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "org.apache.kafka.connect.json.JsonConverter").build();
|
converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "org.apache.kafka.connect.json.JsonConverter").build();
|
||||||
}
|
}
|
||||||
|
else if (isFormat(format, ClientProvided.class)) {
|
||||||
|
if (converterConfig.getString(FIELD_CLASS) == null) {
|
||||||
|
throw new DebeziumException(
|
||||||
|
"`" + ClientProvided.class.getSimpleName().toLowerCase() + "`" + " header converter requires a '" + FIELD_CLASS + "' configuration");
|
||||||
|
}
|
||||||
|
}
|
||||||
else {
|
else {
|
||||||
throw new DebeziumException("Header Converter '" + format.getSimpleName() + "' is not supported");
|
throw new DebeziumException("Header Converter '" + format.getSimpleName() + "' is not supported");
|
||||||
}
|
}
|
||||||
@ -218,6 +224,13 @@ else if (isFormat(format, Binary.class)) {
|
|||||||
else if (isFormat(format, SimpleString.class)) {
|
else if (isFormat(format, SimpleString.class)) {
|
||||||
converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "org.apache.kafka.connect.storage.StringConverter").build();
|
converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "org.apache.kafka.connect.storage.StringConverter").build();
|
||||||
}
|
}
|
||||||
|
else if (isFormat(format, ClientProvided.class)) {
|
||||||
|
if (converterConfig.getString(FIELD_CLASS) == null) {
|
||||||
|
throw new DebeziumException(
|
||||||
|
"`" + ClientProvided.class.getSimpleName().toLowerCase() + "`" + (key ? " key" : " value") + " converter requires a '" + FIELD_CLASS
|
||||||
|
+ "' configuration");
|
||||||
|
}
|
||||||
|
}
|
||||||
else {
|
else {
|
||||||
throw new DebeziumException("Converter '" + format.getSimpleName() + "' is not supported");
|
throw new DebeziumException("Converter '" + format.getSimpleName() + "' is not supported");
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user