DBZ-5926 Add support for Connect Headers to Debezium Server

Add headers support to ChangeEvent
Add SerializationFormat support for headers
Authored by Jeremy Ford on 2022-12-15 08:50:34 -05:00; committed by Jiri Pechanec
parent 107f09a2e7
commit d775104331
18 changed files with 544 additions and 75 deletions
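
For orientation, a minimal consumer sketch against the API introduced by this commit (illustrative only; `props` is assumed to hold a valid connector and offset configuration):

import java.util.Properties;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.Header;
import io.debezium.engine.format.Json;

public class HeaderAwareConsumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties(); // assumed: populated with connector settings

        // New three-argument overload: key, value, and header serialization formats
        try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine
                .create(Json.class, Json.class, Json.class)
                .using(props)
                .notifying(record -> {
                    // headers() defaults to an empty list unless a header format is configured
                    for (Header<String> header : record.headers()) {
                        System.out.println(header.getKey() + " = " + header.getValue());
                    }
                })
                .build()) {
            engine.run(); // blocks the calling thread; production code typically submits this to an executor
        }
    }
}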

View File

@ -5,10 +5,13 @@
*/
package io.debezium.engine;
import java.util.Collections;
import java.util.List;
import io.debezium.common.annotation.Incubating;
/**
* A data change event with key and value.
* A data change event with key, value, and headers.
*
* @param <K> the type of the key
* @param <V> the type of the value
@ -20,6 +23,10 @@ public interface ChangeEvent<K, V> {
V value();
default <H> List<Header<H>> headers() {
return Collections.emptyList();
}
/**
* @return A name of the logical destination for which the event is intended
*/

View File

@ -22,6 +22,7 @@
import io.debezium.common.annotation.Incubating;
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.format.KeyValueChangeEventFormat;
import io.debezium.engine.format.KeyValueHeaderChangeEventFormat;
import io.debezium.engine.format.SerializationFormat;
import io.debezium.engine.spi.OffsetCommitPolicy;
@ -287,16 +288,27 @@ static <K, V> Builder<ChangeEvent<K, V>> create(Class<? extends SerializationFor
return create(KeyValueChangeEventFormat.of(keyFormat, valueFormat));
}
/**
* Obtain a new {@link Builder} instance that can be used to construct runnable {@link DebeziumEngine} instances.
* Different formats are used for key, value, and headers of emitted change events.
* <p>
* Convenience method, equivalent to calling {@code create(KeyValueHeaderChangeEventFormat.of(MyKeyFormat.class, MyValueFormat.class, MyHeaderFormat.class))}.
*
* @return the new builder; never null
*/
static <K, V, H> Builder<ChangeEvent<K, V>> create(Class<? extends SerializationFormat<K>> keyFormat,
Class<? extends SerializationFormat<V>> valueFormat,
Class<? extends SerializationFormat<H>> headerFormat) {
return create(KeyValueHeaderChangeEventFormat.of(keyFormat, valueFormat, headerFormat));
}
static <S, T, K extends SerializationFormat<S>, V extends SerializationFormat<T>> Builder<ChangeEvent<S, T>> create(KeyValueChangeEventFormat<K, V> format) {
final ServiceLoader<BuilderFactory> loader = ServiceLoader.load(BuilderFactory.class);
final Iterator<BuilderFactory> iterator = loader.iterator();
if (!iterator.hasNext()) {
throw new DebeziumException("No implementation of Debezium engine builder was found");
}
final BuilderFactory builder = iterator.next();
if (iterator.hasNext()) {
LoggerFactory.getLogger(Builder.class).warn("More than one Debezium engine builder implementation was found, using {}", builder.getClass());
}
final BuilderFactory builder = determineBuilderFactory();
return builder.builder(format);
}
static <S, T, U, K extends SerializationFormat<S>, V extends SerializationFormat<T>, H extends SerializationFormat<U>> Builder<ChangeEvent<S, T>> create(KeyValueHeaderChangeEventFormat<K, V, H> format) {
final BuilderFactory builder = determineBuilderFactory();
return builder.builder(format);
}
@ -307,6 +319,11 @@ static <S, T, K extends SerializationFormat<S>, V extends SerializationFormat<T>
* @return the new builder; never null
*/
static <T, V extends SerializationFormat<T>> Builder<RecordChangeEvent<T>> create(ChangeEventFormat<V> format) {
final BuilderFactory builder = determineBuilderFactory();
return builder.builder(format);
}
private static BuilderFactory determineBuilderFactory() {
final ServiceLoader<BuilderFactory> loader = ServiceLoader.load(BuilderFactory.class);
final Iterator<BuilderFactory> iterator = loader.iterator();
if (!iterator.hasNext()) {
@ -316,7 +333,7 @@ static <T, V extends SerializationFormat<T>> Builder<RecordChangeEvent<T>> creat
if (iterator.hasNext()) {
LoggerFactory.getLogger(Builder.class).warn("More than one Debezium engine builder implementation was found, using {}", builder.getClass());
}
return builder.builder(format);
return builder;
}
/**
@ -340,5 +357,15 @@ interface BuilderFactory {
* @return this builder object so methods can be chained together; never null
*/
<S, T, K extends SerializationFormat<S>, V extends SerializationFormat<T>> Builder<ChangeEvent<S, T>> builder(KeyValueChangeEventFormat<K, V> format);
/**
* Prescribe the output and header formats to be used by the {@link DebeziumEngine}.
* Usually called by {@link DebeziumEngine#create}.
* @param format the format defining key, value, and header serialization
* @return this builder object so methods can be chained together; never null
*/
default <S, T, U, K extends SerializationFormat<S>, V extends SerializationFormat<T>, H extends SerializationFormat<U>> Builder<ChangeEvent<S, T>> builder(KeyValueHeaderChangeEventFormat<K, V, H> format) {
throw new UnsupportedOperationException("Method must be implemented in order to support headers");
}
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.engine;
/**
* Represents a header that contains a key and a value.
*/
public interface Header<T> {
/**
* Key of a header.
*/
String getKey();
/**
* Value of a header.
*/
T getValue();
}

View File

@ -0,0 +1,12 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.engine.format;
/**
* A {@link SerializationFormat} defining the JSON format serialized as byte[].
*/
public class JsonByteArray implements SerializationFormat<byte[]> {
}

View File

@ -0,0 +1,40 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.engine.format;
import io.debezium.common.annotation.Incubating;
/**
* Describes a change event output format comprising a key, a value, and a header.
*/
@Incubating
public interface KeyValueHeaderChangeEventFormat<K extends SerializationFormat<?>, V extends SerializationFormat<?>, H extends SerializationFormat<?>>
extends KeyValueChangeEventFormat<K, V> {
static <K extends SerializationFormat<?>, V extends SerializationFormat<?>, H extends SerializationFormat<?>> KeyValueHeaderChangeEventFormat<K, V, H> of(
Class<K> keyFormat,
Class<V> valueFormat,
Class<H> headerFormat) {
return new KeyValueHeaderChangeEventFormat<>() {
@Override
public Class<K> getKeyFormat() {
return keyFormat;
}
@Override
public Class<V> getValueFormat() {
return valueFormat;
}
@Override
public Class<H> getHeaderFormat() {
return headerFormat;
}
};
}
Class<H> getHeaderFormat();
}
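
For illustration, the descriptor can also be built directly and handed to the engine, which is equivalent to the three-argument `DebeziumEngine#create` overload (format classes from `io.debezium.engine.format`):

// Sketch: key as a JSON string, value and headers as JSON-encoded byte arrays
KeyValueHeaderChangeEventFormat<Json, JsonByteArray, JsonByteArray> format =
        KeyValueHeaderChangeEventFormat.of(Json.class, JsonByteArray.class, JsonByteArray.class);
DebeziumEngine.Builder<ChangeEvent<String, byte[]>> builder = DebeziumEngine.create(format);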

View File

@ -8,13 +8,19 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.HeaderConverter;
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
@ -26,11 +32,14 @@
import io.debezium.engine.DebeziumEngine.CompletionCallback;
import io.debezium.engine.DebeziumEngine.ConnectorCallback;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
import io.debezium.engine.Header;
import io.debezium.engine.format.Avro;
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.format.CloudEvents;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.JsonByteArray;
import io.debezium.engine.format.KeyValueChangeEventFormat;
import io.debezium.engine.format.KeyValueHeaderChangeEventFormat;
import io.debezium.engine.format.Protobuf;
import io.debezium.engine.format.SerializationFormat;
import io.debezium.engine.spi.OffsetCommitPolicy;
@ -44,6 +53,7 @@
public class ConvertingEngineBuilder<R> implements Builder<R> {
private static final String CONVERTER_PREFIX = "converter";
private static final String HEADER_CONVERTER_PREFIX = "header.converter";
private static final String KEY_CONVERTER_PREFIX = "key.converter";
private static final String VALUE_CONVERTER_PREFIX = "value.converter";
private static final String FIELD_CLASS = "class";
@ -51,6 +61,7 @@ public class ConvertingEngineBuilder<R> implements Builder<R> {
private static final String APICURIO_SCHEMA_REGISTRY_URL_CONFIG = "apicurio.registry.url";
private final Builder<SourceRecord> delegate;
private final Class<? extends SerializationFormat<?>> formatHeader;
private final Class<? extends SerializationFormat<?>> formatKey;
private final Class<? extends SerializationFormat<?>> formatValue;
private Configuration config;
@ -59,15 +70,19 @@ public class ConvertingEngineBuilder<R> implements Builder<R> {
private Function<R, SourceRecord> fromFormat;
ConvertingEngineBuilder(ChangeEventFormat<?> format) {
this.delegate = EmbeddedEngine.create();
this.formatKey = null;
this.formatValue = format.getValueFormat();
this(KeyValueHeaderChangeEventFormat.of(null, format.getValueFormat(), null));
}
ConvertingEngineBuilder(KeyValueChangeEventFormat<?, ?> format) {
this(format instanceof KeyValueHeaderChangeEventFormat ? (KeyValueHeaderChangeEventFormat) format
: KeyValueHeaderChangeEventFormat.of(format.getKeyFormat(), format.getValueFormat(), null));
}
ConvertingEngineBuilder(KeyValueHeaderChangeEventFormat<?, ?, ?> format) {
this.delegate = EmbeddedEngine.create();
this.formatKey = format.getKeyFormat();
this.formatValue = format.getValueFormat();
this.formatHeader = format.getHeaderFormat();
}
@Override
@ -76,8 +91,8 @@ public Builder<R> notifying(Consumer<R> consumer) {
return this;
}
private boolean isFormat(Class<? extends SerializationFormat<?>> format1, Class<? extends SerializationFormat<?>> format2) {
return format1 == (Class<?>) format2;
private static boolean isFormat(Class<? extends SerializationFormat<?>> format1, Class<? extends SerializationFormat<?>> format2) {
return format1 == format2;
}
@Override
@ -154,18 +169,27 @@ public DebeziumEngine<R> build() {
final DebeziumEngine<SourceRecord> engine = delegate.build();
Converter keyConverter;
Converter valueConverter;
HeaderConverter headerConverter;
if (formatValue == Connect.class) {
toFormat = (record) -> {
return (R) new EmbeddedEngineChangeEvent<Void, SourceRecord>(
null,
record,
record);
};
headerConverter = null;
toFormat = (record) -> (R) new EmbeddedEngineChangeEvent<Void, SourceRecord, Object>(
null,
record,
StreamSupport.stream(record.headers().spliterator(), false)
.map(EmbeddedEngineHeader::new).collect(Collectors.toList()),
record);
}
else {
keyConverter = createConverter(formatKey, true);
valueConverter = createConverter(formatValue, false);
if (formatHeader == null) {
headerConverter = null;
}
else {
headerConverter = createHeaderConverter(formatHeader);
}
toFormat = (record) -> {
String topicName = record.topic();
if (topicName == null) {
@ -173,21 +197,31 @@ public DebeziumEngine<R> build() {
}
final byte[] key = keyConverter.fromConnectData(topicName, record.keySchema(), record.key());
final byte[] value = valueConverter.fromConnectData(topicName, record.valueSchema(), record.value());
return isFormat(formatKey, Json.class) && isFormat(formatValue, Json.class)
|| isFormat(formatValue, CloudEvents.class)
? (R) new EmbeddedEngineChangeEvent<String, String>(
key != null ? new String(key, StandardCharsets.UTF_8) : null,
value != null ? new String(value, StandardCharsets.UTF_8) : null,
record)
: (R) new EmbeddedEngineChangeEvent<byte[], byte[]>(
key,
value,
record);
List<Header<?>> headers = Collections.emptyList();
if (headerConverter != null) {
List<Header<byte[]>> byteArrayHeaders = convertHeaders(record, topicName, headerConverter);
headers = (List) byteArrayHeaders;
if (shouldConvertHeadersToString()) {
headers = byteArrayHeaders.stream()
.map(h -> new EmbeddedEngineHeader<>(h.getKey(), new String(h.getValue(), StandardCharsets.UTF_8)))
.collect(Collectors.toList());
}
}
return shouldConvertKeyAndValueToString()
? (R) new EmbeddedEngineChangeEvent<>(
key != null ? new String(key, StandardCharsets.UTF_8) : null,
value != null ? new String(value, StandardCharsets.UTF_8) : null,
(List) headers,
record)
: (R) new EmbeddedEngineChangeEvent<>(key, value, (List) headers, record);
};
}
fromFormat = (record) -> ((EmbeddedEngineChangeEvent<?, ?>) record).sourceRecord();
fromFormat = (record) -> ((EmbeddedEngineChangeEvent<?, ?, ?>) record).sourceRecord();
HeaderConverter finalHeaderConverter = headerConverter;
return new DebeziumEngine<R>() {
@Override
@ -197,11 +231,54 @@ public void run() {
@Override
public void close() throws IOException {
if (finalHeaderConverter != null) {
finalHeaderConverter.close();
}
engine.close();
}
};
}
private boolean shouldConvertKeyAndValueToString() {
return isFormat(formatKey, Json.class) && isFormat(formatValue, Json.class)
|| isFormat(formatValue, CloudEvents.class);
}
private boolean shouldConvertHeadersToString() {
return isFormat(formatHeader, Json.class);
}
private List<Header<byte[]>> convertHeaders(
SourceRecord record, String topicName, HeaderConverter headerConverter) {
List<Header<byte[]>> headers = new ArrayList<>();
for (org.apache.kafka.connect.header.Header header : record.headers()) {
String headerKey = header.key();
byte[] rawHeader = headerConverter.fromConnectHeader(topicName, headerKey, header.schema(), header.value());
headers.add(new EmbeddedEngineHeader<>(headerKey, rawHeader));
}
return headers;
}
private HeaderConverter createHeaderConverter(Class<? extends SerializationFormat<?>> format) {
Configuration converterConfig = config.subset(HEADER_CONVERTER_PREFIX, true);
final Configuration commonConverterConfig = config.subset(CONVERTER_PREFIX, true);
converterConfig = commonConverterConfig.edit().with(converterConfig)
.with(ConverterConfig.TYPE_CONFIG, "header")
.build();
if (isFormat(format, Json.class) || isFormat(format, JsonByteArray.class)) {
converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "org.apache.kafka.connect.json.JsonConverter").build();
}
else {
throw new DebeziumException("Header Converter '" + format.getSimpleName() + "' is not supported");
}
final HeaderConverter converter = converterConfig.getInstance(FIELD_CLASS, HeaderConverter.class);
converter.configure(converterConfig.asMap());
return converter;
}
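
In configuration terms, the merge above means header conversion can be tuned through the shared `converter.` prefix or overridden per header via `header.converter.`; a hedged properties sketch (`schemas.enable` is a standard option of Kafka Connect's JsonConverter):

# shared defaults, applied via the "converter." prefix
converter.schemas.enable=false
# header-specific override; "class" defaults to org.apache.kafka.connect.json.JsonConverter
# when the header format is Json or JsonByteArray
header.converter.schemas.enable=false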
private Converter createConverter(Class<? extends SerializationFormat<?>> format, boolean key) {
// The converters can be configured both using converter.* prefix for cases when both converters
// are the same or using key.converter.* and value.converter.* converter when converters
@ -210,7 +287,7 @@ private Converter createConverter(Class<? extends SerializationFormat<?>> format
final Configuration commonConverterConfig = config.subset(CONVERTER_PREFIX, true);
converterConfig = commonConverterConfig.edit().with(converterConfig).build();
if (isFormat(format, Json.class)) {
if (isFormat(format, Json.class) || isFormat(format, JsonByteArray.class)) {
if (converterConfig.hasKey(APICURIO_SCHEMA_REGISTRY_URL_CONFIG)) {
converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "io.apicurio.registry.utils.converter.ExtJsonConverter").build();
}

View File

@ -11,6 +11,7 @@
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.format.KeyValueChangeEventFormat;
import io.debezium.engine.format.KeyValueHeaderChangeEventFormat;
import io.debezium.engine.format.SerializationFormat;
public class ConvertingEngineBuilderFactory implements BuilderFactory {
@ -25,4 +26,9 @@ public <S, T, K extends SerializationFormat<S>, V extends SerializationFormat<T>
KeyValueChangeEventFormat<K, V> format) {
return new ConvertingEngineBuilder<>(format);
}
public <S, T, U, K extends SerializationFormat<S>, V extends SerializationFormat<T>, H extends SerializationFormat<U>> Builder<ChangeEvent<S, T>> builder(
KeyValueHeaderChangeEventFormat<K, V, H> format) {
return new ConvertingEngineBuilder<>(format);
}
}

View File

@ -5,20 +5,25 @@
*/
package io.debezium.embedded;
import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.Header;
import io.debezium.engine.RecordChangeEvent;
class EmbeddedEngineChangeEvent<K, V> implements ChangeEvent<K, V>, RecordChangeEvent<V> {
class EmbeddedEngineChangeEvent<K, V, H> implements ChangeEvent<K, V>, RecordChangeEvent<V> {
private final K key;
private final V value;
private final List<Header<H>> headers;
private final SourceRecord sourceRecord;
EmbeddedEngineChangeEvent(K key, V value, SourceRecord sourceRecord) {
EmbeddedEngineChangeEvent(K key, V value, List<Header<H>> headers, SourceRecord sourceRecord) {
this.key = key;
this.value = value;
this.headers = headers;
this.sourceRecord = sourceRecord;
}
@ -32,6 +37,12 @@ public V value() {
return value;
}
@SuppressWarnings("unchecked")
@Override
public List<Header<H>> headers() {
return headers;
}
@Override
public V record() {
return value;

View File

@ -0,0 +1,34 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.embedded;
import io.debezium.engine.Header;
public class EmbeddedEngineHeader<T> implements Header<T> {
private final String key;
private final T value;
public EmbeddedEngineHeader(String key, T value) {
this.key = key;
this.value = value;
}
public EmbeddedEngineHeader(org.apache.kafka.connect.header.Header header) {
this.key = header.key();
this.value = (T) header.value();
}
@Override
public String getKey() {
return key;
}
@Override
public T getValue() {
return value;
}
}

View File

@ -14,6 +14,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@ -32,6 +33,8 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@ -45,6 +48,7 @@
import org.junit.Test;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Charsets;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
@ -52,9 +56,11 @@
import io.debezium.doc.FixFor;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.Header;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.JsonByteArray;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.util.Collect;
import io.debezium.util.LoggingContext;
@ -150,7 +156,7 @@ public void verifyNonAsciiContentHandledCorrectly() throws Exception {
CountDownLatch firstLatch = new CountDownLatch(1);
// create an engine with our custom class
final DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class, Json.class)
final DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class, Json.class, Json.class)
.using(props)
.notifying((records, committer) -> {
assertThat(records.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES);
@ -363,10 +369,15 @@ public void shouldRunDebeziumEngine() throws Exception {
props.setProperty("offset.flush.interval.ms", "0");
props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
props.setProperty("topic", "topicX");
props.setProperty("transforms", "header");
props.setProperty("transforms.header.type", AddHeaderTransform.class.getName());
CountDownLatch firstLatch = new CountDownLatch(1);
CountDownLatch allLatch = new CountDownLatch(6);
Headers expectedHeaders = new ConnectHeaders();
expectedHeaders.addString("headerKey", "headerValue");
// create an engine with our custom class
final DebeziumEngine<RecordChangeEvent<SourceRecord>> engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(props)
@ -375,6 +386,7 @@ public void shouldRunDebeziumEngine() throws Exception {
Integer groupCount = records.size() / NUMBER_OF_LINES;
for (RecordChangeEvent<SourceRecord> r : records) {
assertThat(r.record().headers()).isEqualTo(expectedHeaders);
committer.markProcessed(r);
}
@ -592,12 +604,16 @@ public void shouldRunDebeziumEngineWithJson() throws Exception {
props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
props.setProperty("topic", "topicX");
props.setProperty("converter.schemas.enable", "false");
props.setProperty("transforms", "header");
props.setProperty("transforms.header.type", AddHeaderTransform.class.getName());
CountDownLatch firstLatch = new CountDownLatch(1);
CountDownLatch allLatch = new CountDownLatch(6);
EmbeddedEngineHeader<String> expectedHeader = new EmbeddedEngineHeader<>("headerKey", "\"headerValue\"");
// create an engine with our custom class
final DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
final DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class, Json.class, Json.class)
.using(props)
.notifying((records, committer) -> {
assertThat(records.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES);
@ -606,6 +622,10 @@ public void shouldRunDebeziumEngineWithJson() throws Exception {
for (ChangeEvent<String, String> r : records) {
assertThat(r.key()).isNull();
assertThat(r.value()).startsWith("\"Generated line number ");
List<Header<String>> headers = r.headers();
assertThat(headers).allMatch(h -> h.getKey().equals(expectedHeader.getKey()) && h.getValue().equals(expectedHeader.getValue()));
committer.markProcessed(r);
}
@ -639,39 +659,74 @@ public void shouldRunDebeziumEngineWithJson() throws Exception {
stopConnector();
}
protected void appendLinesToSource(int numberOfLines) throws IOException {
CharSequence[] lines = new CharSequence[numberOfLines];
for (int i = 0; i != numberOfLines; ++i) {
lines[i] = generateLine(linesAdded + i + 1);
@Test
@FixFor("DBZ-5926")
public void shouldRunDebeziumEngineWithMismatchedTypes() throws Exception {
// Add initial content to the file ...
appendLinesToSource(NUMBER_OF_LINES);
final Properties props = new Properties();
props.setProperty("name", "debezium-engine");
props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
props.setProperty("offset.flush.interval.ms", "0");
props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
props.setProperty("topic", "topicX");
props.setProperty("converter.schemas.enable", "false");
props.setProperty("transforms", "header");
props.setProperty("transforms.header.type", AddHeaderTransform.class.getName());
CountDownLatch firstLatch = new CountDownLatch(1);
CountDownLatch allLatch = new CountDownLatch(6);
EmbeddedEngineHeader<byte[]> expectedHeader = new EmbeddedEngineHeader<>("headerKey", "\"headerValue\"".getBytes(StandardCharsets.UTF_8));
// create an engine with our custom class
final DebeziumEngine<ChangeEvent<String, byte[]>> engine = DebeziumEngine.create(Json.class, JsonByteArray.class, JsonByteArray.class)
.using(props)
.notifying((records, committer) -> {
assertThat(records.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES);
int groupCount = records.size() / NUMBER_OF_LINES;
for (ChangeEvent<String, byte[]> r : records) {
assertThat(r.key()).isNull();
assertThat(new String(r.value(), Charsets.UTF_8)).startsWith("\"Generated line number ");
List<Header<byte[]>> headers = r.headers();
assertThat(headers).hasSize(1);
assertThat(headers).allMatch(h -> h.getKey().equals(expectedHeader.getKey()) && Arrays.equals(h.getValue(), expectedHeader.getValue()));
committer.markProcessed(r);
}
committer.markBatchFinished();
firstLatch.countDown();
for (int i = 0; i < groupCount; i++) {
allLatch.countDown();
}
})
.using(this.getClass().getClassLoader())
.build();
ExecutorService exec = Executors.newFixedThreadPool(1);
exec.execute(() -> {
LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
engine.run();
});
firstLatch.await(5000, TimeUnit.MILLISECONDS);
assertThat(firstLatch.getCount()).isEqualTo(0);
for (int i = 0; i < 5; i++) {
// Add a few more lines, and then verify they are consumed ...
appendLinesToSource(NUMBER_OF_LINES);
Thread.sleep(10);
}
java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF8, StandardOpenOption.APPEND);
linesAdded += numberOfLines;
}
allLatch.await(5000, TimeUnit.MILLISECONDS);
assertThat(allLatch.getCount()).isEqualTo(0);
protected void appendLinesToSource(String linePrefix, int numberOfLines) throws IOException {
CharSequence[] lines = new CharSequence[numberOfLines];
for (int i = 0; i != numberOfLines; ++i) {
lines[i] = generateLine(linePrefix, linesAdded + i + 1);
}
java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF8, StandardOpenOption.APPEND);
linesAdded += numberOfLines;
}
protected String generateLine(int lineNumber) {
return generateLine("Generated line number ", lineNumber);
}
protected String generateLine(String linePrefix, int lineNumber) {
return linePrefix + lineNumber;
}
protected void consumeLines(int numberOfLines) throws InterruptedException {
consumeRecords(numberOfLines, 3, record -> {
String line = record.value().toString();
assertThat(line).isEqualTo(generateLine(nextConsumedLineNumber));
++nextConsumedLineNumber;
},
false);
// Stop the connector ...
stopConnector();
}
@Test
@ -725,6 +780,68 @@ public void validationThrowsException() throws Exception {
assertThat(errorReference.get()).contains("Connector configuration is not valid. ");
assertThat(this.engine.isRunning()).isFalse();
}
protected void appendLinesToSource(int numberOfLines) throws IOException {
CharSequence[] lines = new CharSequence[numberOfLines];
for (int i = 0; i != numberOfLines; ++i) {
lines[i] = generateLine(linesAdded + i + 1);
}
java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF8, StandardOpenOption.APPEND);
linesAdded += numberOfLines;
}
protected void appendLinesToSource(String linePrefix, int numberOfLines) throws IOException {
CharSequence[] lines = new CharSequence[numberOfLines];
for (int i = 0; i != numberOfLines; ++i) {
lines[i] = generateLine(linePrefix, linesAdded + i + 1);
}
java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF8, StandardOpenOption.APPEND);
linesAdded += numberOfLines;
}
protected String generateLine(int lineNumber) {
return generateLine("Generated line number ", lineNumber);
}
protected String generateLine(String linePrefix, int lineNumber) {
return linePrefix + lineNumber;
}
protected void consumeLines(int numberOfLines) throws InterruptedException {
consumeRecords(numberOfLines, 3, record -> {
String line = record.value().toString();
assertThat(line).isEqualTo(generateLine(nextConsumedLineNumber));
++nextConsumedLineNumber;
},
false);
}
public static class AddHeaderTransform implements Transformation<SourceRecord> {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public SourceRecord apply(SourceRecord record) {
Headers headers = new ConnectHeaders();
headers.addString("headerKey", "headerValue");
record = record.newRecord(
record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp(), headers);
return record;
}
@Override
public ConfigDef config() {
return new ConfigDef();
}
@Override
public void close() {
}
}
}
class InterruptedConnector extends SimpleSourceConnector {

View File

@ -37,6 +37,7 @@
import io.debezium.engine.format.Avro;
import io.debezium.engine.format.CloudEvents;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.JsonByteArray;
import io.debezium.engine.format.Protobuf;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.server.events.ConnectorCompletedEvent;
@ -67,6 +68,7 @@ public class DebeziumServer {
private static final String PROP_FORMAT_PREFIX = PROP_PREFIX + "format.";
private static final String PROP_PREDICATES_PREFIX = PROP_PREFIX + "predicates.";
private static final String PROP_TRANSFORMS_PREFIX = PROP_PREFIX + "transforms.";
private static final String PROP_HEADER_FORMAT_PREFIX = PROP_FORMAT_PREFIX + "header.";
private static final String PROP_KEY_FORMAT_PREFIX = PROP_FORMAT_PREFIX + "key.";
private static final String PROP_VALUE_FORMAT_PREFIX = PROP_FORMAT_PREFIX + "value.";
private static final String PROP_OFFSET_STORAGE_PREFIX = "offset.storage.";
@ -74,11 +76,14 @@ public class DebeziumServer {
private static final String PROP_PREDICATES = PROP_PREFIX + "predicates";
private static final String PROP_TRANSFORMS = PROP_PREFIX + "transforms";
private static final String PROP_SINK_TYPE = PROP_SINK_PREFIX + "type";
private static final String PROP_HEADER_FORMAT = PROP_FORMAT_PREFIX + "header";
private static final String PROP_KEY_FORMAT = PROP_FORMAT_PREFIX + "key";
private static final String PROP_VALUE_FORMAT = PROP_FORMAT_PREFIX + "value";
private static final String PROP_TERMINATION_WAIT = PROP_PREFIX + "termination.wait";
private static final String FORMAT_JSON = Json.class.getSimpleName().toLowerCase();
private static final String FORMAT_JSON_BYTE_ARRAY = JsonByteArray.class.getSimpleName().toLowerCase();
private static final String FORMAT_CLOUDEVENT = CloudEvents.class.getSimpleName().toLowerCase();
private static final String FORMAT_AVRO = Avro.class.getSimpleName().toLowerCase();
private static final String FORMAT_PROTOBUF = Protobuf.class.getSimpleName().toLowerCase();
@ -126,11 +131,15 @@ else if (beans.size() > 1) {
final Class<Any> keyFormat = (Class<Any>) getFormat(config, PROP_KEY_FORMAT);
final Class<Any> valueFormat = (Class<Any>) getFormat(config, PROP_VALUE_FORMAT);
final Class<Any> headerFormat = (Class<Any>) getHeaderFormat(config);
configToProperties(config, props, PROP_SOURCE_PREFIX, "", true);
configToProperties(config, props, PROP_FORMAT_PREFIX, "key.converter.", true);
configToProperties(config, props, PROP_FORMAT_PREFIX, "value.converter.", true);
configToProperties(config, props, PROP_FORMAT_PREFIX, "header.converter.", true);
configToProperties(config, props, PROP_KEY_FORMAT_PREFIX, "key.converter.", true);
configToProperties(config, props, PROP_VALUE_FORMAT_PREFIX, "value.converter.", true);
configToProperties(config, props, PROP_HEADER_FORMAT_PREFIX, "header.converter.", true);
configToProperties(config, props, PROP_SINK_PREFIX + name + ".", SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + name + ".", false);
configToProperties(config, props, PROP_SINK_PREFIX + name + ".", PROP_OFFSET_STORAGE_PREFIX + name + ".", false);
@ -149,7 +158,7 @@ else if (beans.size() > 1) {
props.setProperty("name", name);
LOGGER.debug("Configuration for DebeziumEngine: {}", props);
engine = DebeziumEngine.create(keyFormat, valueFormat)
engine = DebeziumEngine.create(keyFormat, valueFormat, headerFormat)
.using(props)
.using((DebeziumEngine.ConnectorCallback) health)
.using((DebeziumEngine.CompletionCallback) health)
@ -193,6 +202,9 @@ private Class<?> getFormat(Config config, String property) {
if (FORMAT_JSON.equals(formatName)) {
return Json.class;
}
if (FORMAT_JSON_BYTE_ARRAY.equals(formatName)) {
return JsonByteArray.class;
}
else if (FORMAT_CLOUDEVENT.equals(formatName)) {
return CloudEvents.class;
}
@ -205,6 +217,17 @@ else if (FORMAT_PROTOBUF.equals(formatName)) {
throw new DebeziumException("Unknown format '" + formatName + "' for option " + "'" + property + "'");
}
private Class<?> getHeaderFormat(Config config) {
final String formatName = config.getOptionalValue(PROP_HEADER_FORMAT, String.class).orElse(FORMAT_JSON);
if (FORMAT_JSON.equals(formatName)) {
return Json.class;
}
else if (FORMAT_JSON_BYTE_ARRAY.equals(formatName)) {
return JsonByteArray.class;
}
throw new DebeziumException("Unknown format '" + formatName + "' for option " + "'" + PROP_HEADER_FORMAT + "'");
}
public void stop(@Observes ShutdownEvent event) {
try {
LOGGER.info("Received request to stop the engine");

View File

@ -43,16 +43,20 @@ public TestConfigSource() {
String format = System.getProperty("test.apicurio.converter.format");
String formatKey = System.getProperty("debezium.format.key");
String formatValue = System.getProperty("debezium.format.value");
String formatHeader = System.getProperty("debezium.format.header", "json");
if (format != null && format.length() != 0) {
integrationTest.put("debezium.format.key", format);
integrationTest.put("debezium.format.value", format);
integrationTest.put("debezium.format.header", formatHeader);
}
else {
formatKey = (formatKey != null) ? formatKey : Json.class.getSimpleName().toLowerCase();
formatValue = (formatValue != null) ? formatValue : Json.class.getSimpleName().toLowerCase();
formatHeader = (formatHeader != null) ? formatHeader : Json.class.getSimpleName().toLowerCase();
integrationTest.put("debezium.format.key", formatKey);
integrationTest.put("debezium.format.value", formatValue);
integrationTest.put("debezium.format.header", formatHeader);
}
unitTest.put("debezium.sink.type", "test");
@ -61,7 +65,9 @@ public TestConfigSource() {
unitTest.put("debezium.source.offset.flush.interval.ms", "0");
unitTest.put("debezium.source.file", TEST_FILE_PATH.toAbsolutePath().toString());
unitTest.put("debezium.source.topic", "topicX");
unitTest.put("debezium.format.header", formatHeader);
unitTest.put("debezium.format.schemas.enable", "true");
unitTest.put("debezium.format.header.schemas.enable", "false");
unitTest.put("debezium.format.value.schemas.enable", "false");
unitTest.put("debezium.transforms", "hoist");
unitTest.put("debezium.transforms.hoist.type", "org.apache.kafka.connect.transforms.HoistField$Value");

View File

@ -18,6 +18,8 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
@ -27,6 +29,7 @@
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.RecordCommitter;
import io.debezium.engine.Header;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
@ -69,18 +72,23 @@ void stop() {
producer.close(Duration.ofSeconds(5));
}
catch (Throwable t) {
LOGGER.warn("Could not close producer {}", t);
LOGGER.warn("Could not close producer", t);
}
}
}
@Override
public void handleBatch(final List<ChangeEvent<Object, Object>> records, final RecordCommitter<ChangeEvent<Object, Object>> committer) throws InterruptedException {
public void handleBatch(final List<ChangeEvent<Object, Object>> records,
final RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(records.size());
for (ChangeEvent<Object, Object> record : records) {
try {
LOGGER.trace("Received event '{}'", record);
producer.send(new ProducerRecord<>(record.destination(), record.key(), record.value()), (metadata, exception) -> {
Headers headers = convertHeaders(record);
producer.send(new ProducerRecord<>(record.destination(), null, null, record.key(), record.value(), headers), (metadata, exception) -> {
if (exception != null) {
LOGGER.error("Failed to send record to {}:", record.destination(), exception);
throw new DebeziumException(exception);
@ -100,4 +108,13 @@ public void handleBatch(final List<ChangeEvent<Object, Object>> records, final R
latch.await();
committer.markBatchFinished();
}
private Headers convertHeaders(ChangeEvent<Object, Object> record) {
List<Header<Object>> headers = record.headers();
Headers kafkaHeaders = new RecordHeaders();
for (Header<Object> header : headers) {
kafkaHeaders.add(header.getKey(), getBytes(header.getValue()));
}
return kafkaHeaders;
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.server.kafka;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;
public class AddHeaderTransform implements Transformation<SourceRecord> {
@Override
public SourceRecord apply(SourceRecord record) {
Headers headers = new ConnectHeaders();
headers.addString("headerKey", "headerValue");
record = record.newRecord(
record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp(), headers);
return record;
}
@Override
public ConfigDef config() {
return new ConfigDef();
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}

View File

@ -7,6 +7,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@ -20,6 +21,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
@ -96,5 +98,10 @@ public void testKafka() {
return actual.size() >= MESSAGE_COUNT;
});
assertThat(actual.size()).isGreaterThanOrEqualTo(MESSAGE_COUNT);
Headers headers = actual.get(0).headers();
assertThat(headers.headers("headerKey")).isNotEmpty();
assertThat(headers.headers("headerKey"))
.allMatch(h -> h.key().equals("headerKey") && Arrays.equals(h.value(), "\"headerValue\"".getBytes(StandardCharsets.UTF_8)));
}
}

View File

@ -29,9 +29,13 @@ public KafkaTestConfigSource() {
kafkaConfig.put("debezium.source.topic.prefix", "testc");
kafkaConfig.put("debezium.source.schema.include.list", "inventory");
kafkaConfig.put("debezium.source.table.include.list", "inventory.customers");
kafkaConfig.put("debezium.format.header.schemas.enable", "false");
// DBZ-5105
kafkaConfig.put("debezium.sink.kafka.producer.ssl.endpoint.identification.algorithm", "");
kafkaConfig.put("debezium.transforms", "addheader");
kafkaConfig.put("debezium.transforms.addheader.type", "io.debezium.server.kafka.AddHeaderTransform");
config = kafkaConfig;
}

View File

@ -259,9 +259,16 @@ Allowed values are:
* `Connect.class` - the output value is a change event wrapping Kafka Connect's `SourceRecord`
* `Json.class` - the output value is a pair of key and value encoded as `JSON` strings
* `JsonByteArray.class` - the output value is a pair of key and value formatted as `JSON` and encoded as UTF-8 byte arrays
* `Avro.class` - the output value is a pair of key and value encoded as Avro serialized records (see xref:{link-avro-serialization}[Avro Serialization] for more details)
* `CloudEvents.class` - the output value is a pair of key and value encoded as xref:{link-cloud-events}[Cloud Events] messages
The header format can also be specified when calling `DebeziumEngine#create()`.
Allowed values are:
* `Json.class` - the header values are encoded as `JSON` strings
* `JsonByteArray.class` - the header values are formatted as `JSON` and encoded as UTF-8 byte arrays
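
For example, an engine mixing formats might be created as follows (an illustrative sketch mirroring the engine's mismatched-types test; `props` is an assumed engine configuration):

DebeziumEngine<ChangeEvent<String, byte[]>> engine =
        DebeziumEngine.create(Json.class, JsonByteArray.class, JsonByteArray.class)
                .using(props)
                .notifying(record -> record.headers()
                        .forEach(h -> System.out.println(h.getKey())))
                .build();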
Internally, the engine uses the appropriate Kafka Connect converter implementation to which the conversion is delegated.
The converter can be parametrized using engine properties to modify its behaviour.

View File

@ -328,7 +328,7 @@ By default the output is in JSON format but an arbitrary implementation of Kafka
|[[debezium-format-key]]<<debezium-format-key, `debezium.format.key`>>
|`json`
|The name of the output format for key, one of `json`/`avro`/`protobuf`.
|The name of the output format for key, one of `json`/`jsonbytearray`/`avro`/`protobuf`.
|[[debezium-format-key-props]]<<debezium-format-key-props, `debezium.format.key.*`>>
|
@ -336,12 +336,20 @@ By default the output is in JSON format but an arbitrary implementation of Kafka
|[[debezium-format-value]]<<debezium-format-value, `debezium.format.value`>>
|`json`
|The name of the output format for value, one of `json`/`avro`/`protobuf`/`cloudevents`.
|The name of the output format for value, one of `json`/`jsonbytearray`/`avro`/`protobuf`/`cloudevents`.
|[[debezium-format-value-props]]<<debezium-format-value-props, `debezium.format.value.*`>>
|
|Configuration properties passed to the value converter.
|[[debezium-format-header]]<<debezium-format-header, `debezium.format.header`>>
|`json`
|The name of the output format for header, one of `json`/`jsonbytearray`.
|[[debezium-format-header-props]]<<debezium-format-header-props, `debezium.format.header.*`>>
|
|Configuration properties passed to the header converter.
|===
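
Putting the new options together, a Debezium Server configuration enabling headers might include the following fragment (an illustrative sketch; source and sink settings elided):

debezium.format.key=json
debezium.format.value=json
debezium.format.header=jsonbytearray
debezium.format.header.schemas.enable=false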
[id="debezium-transformations-configuration-options"]