diff --git a/debezium-api/src/main/java/io/debezium/engine/ChangeEvent.java b/debezium-api/src/main/java/io/debezium/engine/ChangeEvent.java index 58e111371..2da5a500a 100644 --- a/debezium-api/src/main/java/io/debezium/engine/ChangeEvent.java +++ b/debezium-api/src/main/java/io/debezium/engine/ChangeEvent.java @@ -5,10 +5,13 @@ */ package io.debezium.engine; +import java.util.Collections; +import java.util.List; + import io.debezium.common.annotation.Incubating; /** - * A data change event with key and value. + * A data change event with key, value, and headers. * * @param * @param @@ -20,6 +23,10 @@ public interface ChangeEvent { V value(); + default List> headers() { + return Collections.emptyList(); + } + /** * @return A name of the logical destination for which the event is intended */ diff --git a/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java b/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java index 52fe704fa..ae520bdd9 100644 --- a/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java +++ b/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java @@ -22,6 +22,7 @@ import io.debezium.common.annotation.Incubating; import io.debezium.engine.format.ChangeEventFormat; import io.debezium.engine.format.KeyValueChangeEventFormat; +import io.debezium.engine.format.KeyValueHeaderChangeEventFormat; import io.debezium.engine.format.SerializationFormat; import io.debezium.engine.spi.OffsetCommitPolicy; @@ -287,16 +288,27 @@ static Builder> create(Class + * Convenience method, equivalent to calling {@code create(KeyValueHeaderChangeEventFormat.of(MyKeyFormat.class, MyValueFormat.class, MyHeaderFormat.class))}. 
+ * + * @return the new builder; never null + */ + static Builder> create(Class> keyFormat, + Class> valueFormat, + Class> headerFormat) { + return create(KeyValueHeaderChangeEventFormat.of(keyFormat, valueFormat, headerFormat)); + } + static , V extends SerializationFormat> Builder> create(KeyValueChangeEventFormat format) { - final ServiceLoader loader = ServiceLoader.load(BuilderFactory.class); - final Iterator iterator = loader.iterator(); - if (!iterator.hasNext()) { - throw new DebeziumException("No implementation of Debezium engine builder was found"); - } - final BuilderFactory builder = iterator.next(); - if (iterator.hasNext()) { - LoggerFactory.getLogger(Builder.class).warn("More than one Debezium engine builder implementation was found, using {}", builder.getClass()); - } + final BuilderFactory builder = determineBuilderFactory(); + return builder.builder(format); + } + + static , V extends SerializationFormat, H extends SerializationFormat> Builder> create(KeyValueHeaderChangeEventFormat format) { + final BuilderFactory builder = determineBuilderFactory(); return builder.builder(format); } @@ -307,6 +319,11 @@ static , V extends SerializationFormat * @return the new builder; never null */ static > Builder> create(ChangeEventFormat format) { + final BuilderFactory builder = determineBuilderFactory(); + return builder.builder(format); + } + + private static BuilderFactory determineBuilderFactory() { final ServiceLoader loader = ServiceLoader.load(BuilderFactory.class); final Iterator iterator = loader.iterator(); if (!iterator.hasNext()) { @@ -316,7 +333,7 @@ static > Builder> creat if (iterator.hasNext()) { LoggerFactory.getLogger(Builder.class).warn("More than one Debezium engine builder implementation was found, using {}", builder.getClass()); } - return builder.builder(format); + return builder; } /** @@ -340,5 +357,15 @@ interface BuilderFactory { * @return this builder object so methods can be chained together; never null */ , V extends 
SerializationFormat> Builder> builder(KeyValueChangeEventFormat format); + + /** + * Prescribe the output and header formats to be used by the {@link DebeziumEngine}. + * Usually called by {@link DebeziumEngine#create}. + * @param format + * @return this builder object so methods can be chained together; never null + */ + default , V extends SerializationFormat, H extends SerializationFormat> Builder> builder(KeyValueHeaderChangeEventFormat format) { + throw new UnsupportedOperationException("Method must be implemented in order to support headers"); + } } } diff --git a/debezium-api/src/main/java/io/debezium/engine/Header.java b/debezium-api/src/main/java/io/debezium/engine/Header.java new file mode 100644 index 000000000..7887bbd1b --- /dev/null +++ b/debezium-api/src/main/java/io/debezium/engine/Header.java @@ -0,0 +1,23 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.engine; + +/** + * Represents a header that contains a key and a value. + */ +public interface Header { + + /** + * Key of a header. + */ + String getKey(); + + /** + * Value of a header. + */ + T getValue(); + +} diff --git a/debezium-api/src/main/java/io/debezium/engine/format/JsonByteArray.java b/debezium-api/src/main/java/io/debezium/engine/format/JsonByteArray.java new file mode 100644 index 000000000..ee9a4d14e --- /dev/null +++ b/debezium-api/src/main/java/io/debezium/engine/format/JsonByteArray.java @@ -0,0 +1,12 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.engine.format; + +/** + * A {@link SerializationFormat} defining the JSON format serialized as byte[]. 
+ */ +public class JsonByteArray implements SerializationFormat { +} diff --git a/debezium-api/src/main/java/io/debezium/engine/format/KeyValueHeaderChangeEventFormat.java b/debezium-api/src/main/java/io/debezium/engine/format/KeyValueHeaderChangeEventFormat.java new file mode 100644 index 000000000..c1d000455 --- /dev/null +++ b/debezium-api/src/main/java/io/debezium/engine/format/KeyValueHeaderChangeEventFormat.java @@ -0,0 +1,40 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.engine.format; + +import io.debezium.common.annotation.Incubating; + +/** + * Describes a change event output format comprising a key, value, and a header. + */ +@Incubating +public interface KeyValueHeaderChangeEventFormat, V extends SerializationFormat, H extends SerializationFormat> + extends KeyValueChangeEventFormat { + static , V extends SerializationFormat, H extends SerializationFormat> KeyValueHeaderChangeEventFormat of( + Class keyFormat, + Class valueFormat, + Class headerFormat) { + return new KeyValueHeaderChangeEventFormat<>() { + + @Override + public Class getKeyFormat() { + return keyFormat; + } + + @Override + public Class getValueFormat() { + return valueFormat; + } + + @Override + public Class getHeaderFormat() { + return headerFormat; + } + }; + } + + Class getHeaderFormat(); +} diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilder.java b/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilder.java index fb1d6cd01..c30bc3595 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilder.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilder.java @@ -8,13 +8,19 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.time.Clock; +import java.util.ArrayList; +import java.util.Collections; +import 
java.util.List; import java.util.Properties; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.ConverterConfig; +import org.apache.kafka.connect.storage.HeaderConverter; import io.debezium.DebeziumException; import io.debezium.config.CommonConnectorConfig; @@ -26,11 +32,14 @@ import io.debezium.engine.DebeziumEngine.CompletionCallback; import io.debezium.engine.DebeziumEngine.ConnectorCallback; import io.debezium.engine.DebeziumEngine.RecordCommitter; +import io.debezium.engine.Header; import io.debezium.engine.format.Avro; import io.debezium.engine.format.ChangeEventFormat; import io.debezium.engine.format.CloudEvents; import io.debezium.engine.format.Json; +import io.debezium.engine.format.JsonByteArray; import io.debezium.engine.format.KeyValueChangeEventFormat; +import io.debezium.engine.format.KeyValueHeaderChangeEventFormat; import io.debezium.engine.format.Protobuf; import io.debezium.engine.format.SerializationFormat; import io.debezium.engine.spi.OffsetCommitPolicy; @@ -44,6 +53,7 @@ public class ConvertingEngineBuilder implements Builder { private static final String CONVERTER_PREFIX = "converter"; + private static final String HEADER_CONVERTER_PREFIX = "header.converter"; private static final String KEY_CONVERTER_PREFIX = "key.converter"; private static final String VALUE_CONVERTER_PREFIX = "value.converter"; private static final String FIELD_CLASS = "class"; @@ -51,6 +61,7 @@ public class ConvertingEngineBuilder implements Builder { private static final String APICURIO_SCHEMA_REGISTRY_URL_CONFIG = "apicurio.registry.url"; private final Builder delegate; + private final Class> formatHeader; private final Class> formatKey; private final Class> formatValue; private Configuration config; @@ -59,15 +70,19 @@ public 
class ConvertingEngineBuilder implements Builder { private Function fromFormat; ConvertingEngineBuilder(ChangeEventFormat format) { - this.delegate = EmbeddedEngine.create(); - this.formatKey = null; - this.formatValue = format.getValueFormat(); + this(KeyValueHeaderChangeEventFormat.of(null, format.getValueFormat(), null)); } ConvertingEngineBuilder(KeyValueChangeEventFormat format) { + this(format instanceof KeyValueHeaderChangeEventFormat ? (KeyValueHeaderChangeEventFormat) format + : KeyValueHeaderChangeEventFormat.of(format.getKeyFormat(), format.getValueFormat(), null)); + } + + ConvertingEngineBuilder(KeyValueHeaderChangeEventFormat format) { this.delegate = EmbeddedEngine.create(); this.formatKey = format.getKeyFormat(); this.formatValue = format.getValueFormat(); + this.formatHeader = format.getHeaderFormat(); } @Override @@ -76,8 +91,8 @@ public Builder notifying(Consumer consumer) { return this; } - private boolean isFormat(Class> format1, Class> format2) { - return format1 == (Class) format2; + private static boolean isFormat(Class> format1, Class> format2) { + return format1 == format2; } @Override @@ -154,18 +169,27 @@ public DebeziumEngine build() { final DebeziumEngine engine = delegate.build(); Converter keyConverter; Converter valueConverter; + HeaderConverter headerConverter; if (formatValue == Connect.class) { - toFormat = (record) -> { - return (R) new EmbeddedEngineChangeEvent( - null, - record, - record); - }; + headerConverter = null; + toFormat = (record) -> (R) new EmbeddedEngineChangeEvent( + null, + record, + StreamSupport.stream(record.headers().spliterator(), false) + .map(EmbeddedEngineHeader::new).collect(Collectors.toList()), + record); } else { keyConverter = createConverter(formatKey, true); valueConverter = createConverter(formatValue, false); + if (formatHeader == null) { + headerConverter = null; + } + else { + headerConverter = createHeaderConverter(formatHeader); + } + toFormat = (record) -> { String topicName = 
record.topic(); if (topicName == null) { @@ -173,21 +197,31 @@ public DebeziumEngine build() { } final byte[] key = keyConverter.fromConnectData(topicName, record.keySchema(), record.key()); final byte[] value = valueConverter.fromConnectData(topicName, record.valueSchema(), record.value()); - return isFormat(formatKey, Json.class) && isFormat(formatValue, Json.class) - || isFormat(formatValue, CloudEvents.class) - ? (R) new EmbeddedEngineChangeEvent( - key != null ? new String(key, StandardCharsets.UTF_8) : null, - value != null ? new String(value, StandardCharsets.UTF_8) : null, - record) - : (R) new EmbeddedEngineChangeEvent( - key, - value, - record); + + List> headers = Collections.emptyList(); + if (headerConverter != null) { + List> byteArrayHeaders = convertHeaders(record, topicName, headerConverter); + headers = (List) byteArrayHeaders; + if (shouldConvertHeadersToString()) { + headers = byteArrayHeaders.stream() + .map(h -> new EmbeddedEngineHeader<>(h.getKey(), new String(h.getValue(), StandardCharsets.UTF_8))) + .collect(Collectors.toList()); + } + } + + return shouldConvertKeyAndValueToString() + ? (R) new EmbeddedEngineChangeEvent<>( + key != null ? new String(key, StandardCharsets.UTF_8) : null, + value != null ? 
new String(value, StandardCharsets.UTF_8) : null, + (List) headers, + record) + : (R) new EmbeddedEngineChangeEvent<>(key, value, (List) headers, record); }; } - fromFormat = (record) -> ((EmbeddedEngineChangeEvent) record).sourceRecord(); + fromFormat = (record) -> ((EmbeddedEngineChangeEvent) record).sourceRecord(); + HeaderConverter finalHeaderConverter = headerConverter; return new DebeziumEngine() { @Override @@ -197,11 +231,54 @@ public void run() { @Override public void close() throws IOException { + if (finalHeaderConverter != null) { + finalHeaderConverter.close(); + } engine.close(); } }; } + private boolean shouldConvertKeyAndValueToString() { + return isFormat(formatKey, Json.class) && isFormat(formatValue, Json.class) + || isFormat(formatValue, CloudEvents.class); + } + + private boolean shouldConvertHeadersToString() { + return isFormat(formatHeader, Json.class); + } + + private List> convertHeaders( + SourceRecord record, String topicName, HeaderConverter headerConverter) { + List> headers = new ArrayList<>(); + + for (org.apache.kafka.connect.header.Header header : record.headers()) { + String headerKey = header.key(); + byte[] rawHeader = headerConverter.fromConnectHeader(topicName, headerKey, header.schema(), header.value()); + headers.add(new EmbeddedEngineHeader<>(headerKey, rawHeader)); + } + + return headers; + } + + private HeaderConverter createHeaderConverter(Class> format) { + Configuration converterConfig = config.subset(HEADER_CONVERTER_PREFIX, true); + final Configuration commonConverterConfig = config.subset(CONVERTER_PREFIX, true); + converterConfig = commonConverterConfig.edit().with(converterConfig) + .with(ConverterConfig.TYPE_CONFIG, "header") + .build(); + + if (isFormat(format, Json.class) || isFormat(format, JsonByteArray.class)) { + converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "org.apache.kafka.connect.json.JsonConverter").build(); + } + else { + throw new DebeziumException("Header Converter '" + 
format.getSimpleName() + "' is not supported"); + } + final HeaderConverter converter = converterConfig.getInstance(FIELD_CLASS, HeaderConverter.class); + converter.configure(converterConfig.asMap()); + return converter; + } + private Converter createConverter(Class> format, boolean key) { // The converters can be configured both using converter.* prefix for cases when both converters // are the same or using key.converter.* and value.converter.* converter when converters @@ -210,7 +287,7 @@ private Converter createConverter(Class> format final Configuration commonConverterConfig = config.subset(CONVERTER_PREFIX, true); converterConfig = commonConverterConfig.edit().with(converterConfig).build(); - if (isFormat(format, Json.class)) { + if (isFormat(format, Json.class) || isFormat(format, JsonByteArray.class)) { if (converterConfig.hasKey(APICURIO_SCHEMA_REGISTRY_URL_CONFIG)) { converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "io.apicurio.registry.utils.converter.ExtJsonConverter").build(); } diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilderFactory.java b/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilderFactory.java index 65f8dd252..16dd7e173 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilderFactory.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/ConvertingEngineBuilderFactory.java @@ -11,6 +11,7 @@ import io.debezium.engine.RecordChangeEvent; import io.debezium.engine.format.ChangeEventFormat; import io.debezium.engine.format.KeyValueChangeEventFormat; +import io.debezium.engine.format.KeyValueHeaderChangeEventFormat; import io.debezium.engine.format.SerializationFormat; public class ConvertingEngineBuilderFactory implements BuilderFactory { @@ -25,4 +26,9 @@ public , V extends SerializationFormat KeyValueChangeEventFormat format) { return new ConvertingEngineBuilder<>(format); } + + public , V extends SerializationFormat, H 
extends SerializationFormat> Builder> builder( + KeyValueHeaderChangeEventFormat format) { + return new ConvertingEngineBuilder<>(format); + } } diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java index 92bb5be94..d8434d7ad 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java @@ -5,20 +5,25 @@ */ package io.debezium.embedded; +import java.util.List; + import org.apache.kafka.connect.source.SourceRecord; import io.debezium.engine.ChangeEvent; +import io.debezium.engine.Header; import io.debezium.engine.RecordChangeEvent; -class EmbeddedEngineChangeEvent implements ChangeEvent, RecordChangeEvent { +class EmbeddedEngineChangeEvent implements ChangeEvent, RecordChangeEvent { private final K key; private final V value; + private final List> headers; private final SourceRecord sourceRecord; - EmbeddedEngineChangeEvent(K key, V value, SourceRecord sourceRecord) { + EmbeddedEngineChangeEvent(K key, V value, List> headers, SourceRecord sourceRecord) { this.key = key; this.value = value; + this.headers = headers; this.sourceRecord = sourceRecord; } @@ -32,6 +37,12 @@ public V value() { return value; } + @SuppressWarnings("unchecked") + @Override + public List> headers() { + return headers; + } + @Override public V record() { return value; diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngineHeader.java b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngineHeader.java new file mode 100644 index 000000000..25dd797e7 --- /dev/null +++ b/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngineHeader.java @@ -0,0 +1,34 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.embedded; + +import io.debezium.engine.Header; + +public class EmbeddedEngineHeader implements Header { + + private final String key; + private final T value; + + public EmbeddedEngineHeader(String key, T value) { + this.key = key; + this.value = value; + } + + public EmbeddedEngineHeader(org.apache.kafka.connect.header.Header header) { + this.key = header.key(); + this.value = (T) header.value(); + } + + @Override + public String getKey() { + return key; + } + + @Override + public T getValue() { + return value; + } +} diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java index 66ad684e0..2b38c3439 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java @@ -14,6 +14,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -32,6 +33,8 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.file.FileStreamSourceConnector; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.json.JsonDeserializer; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; @@ -45,6 +48,7 @@ import org.junit.Test; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Charsets; import io.debezium.DebeziumException; import io.debezium.config.Configuration; @@ -52,9 +56,11 @@ import io.debezium.doc.FixFor; import 
io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.Header; import io.debezium.engine.RecordChangeEvent; import io.debezium.engine.format.ChangeEventFormat; import io.debezium.engine.format.Json; +import io.debezium.engine.format.JsonByteArray; import io.debezium.engine.spi.OffsetCommitPolicy; import io.debezium.util.Collect; import io.debezium.util.LoggingContext; @@ -150,7 +156,7 @@ public void verifyNonAsciiContentHandledCorrectly() throws Exception { CountDownLatch firstLatch = new CountDownLatch(1); // create an engine with our custom class - final DebeziumEngine> engine = DebeziumEngine.create(Json.class, Json.class) + final DebeziumEngine> engine = DebeziumEngine.create(Json.class, Json.class, Json.class) .using(props) .notifying((records, committer) -> { assertThat(records.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES); @@ -363,10 +369,15 @@ public void shouldRunDebeziumEngine() throws Exception { props.setProperty("offset.flush.interval.ms", "0"); props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString()); props.setProperty("topic", "topicX"); + props.setProperty("transforms", "header"); + props.setProperty("transforms.header.type", AddHeaderTransform.class.getName()); CountDownLatch firstLatch = new CountDownLatch(1); CountDownLatch allLatch = new CountDownLatch(6); + Headers expectedHeaders = new ConnectHeaders(); + expectedHeaders.addString("headerKey", "headerValue"); + // create an engine with our custom class final DebeziumEngine> engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)) .using(props) @@ -375,6 +386,7 @@ public void shouldRunDebeziumEngine() throws Exception { Integer groupCount = records.size() / NUMBER_OF_LINES; for (RecordChangeEvent r : records) { + assertThat(r.record().headers()).isEqualTo(expectedHeaders); committer.markProcessed(r); } @@ -592,12 +604,16 @@ public void shouldRunDebeziumEngineWithJson() throws Exception { props.setProperty("file", 
TEST_FILE_PATH.toAbsolutePath().toString()); props.setProperty("topic", "topicX"); props.setProperty("converter.schemas.enable", "false"); + props.setProperty("transforms", "header"); + props.setProperty("transforms.header.type", AddHeaderTransform.class.getName()); CountDownLatch firstLatch = new CountDownLatch(1); CountDownLatch allLatch = new CountDownLatch(6); + EmbeddedEngineHeader expectedHeader = new EmbeddedEngineHeader<>("headerKey", "\"headerValue\""); + // create an engine with our custom class - final DebeziumEngine> engine = DebeziumEngine.create(Json.class) + final DebeziumEngine> engine = DebeziumEngine.create(Json.class, Json.class, Json.class) .using(props) .notifying((records, committer) -> { assertThat(records.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES); @@ -606,6 +622,10 @@ public void shouldRunDebeziumEngineWithJson() throws Exception { for (ChangeEvent r : records) { assertThat(r.key()).isNull(); assertThat(r.value()).startsWith("\"Generated line number "); + + List> headers = r.headers(); + assertThat(headers).allMatch(h -> h.getKey().equals(expectedHeader.getKey()) && h.getValue().equals(expectedHeader.getValue())); + committer.markProcessed(r); } @@ -639,39 +659,74 @@ public void shouldRunDebeziumEngineWithJson() throws Exception { stopConnector(); } - protected void appendLinesToSource(int numberOfLines) throws IOException { - CharSequence[] lines = new CharSequence[numberOfLines]; - for (int i = 0; i != numberOfLines; ++i) { - lines[i] = generateLine(linesAdded + i + 1); + @Test + @FixFor("DBZ-5926") + public void shouldRunDebeziumEngineWithMismatchedTypes() throws Exception { + // Add initial content to the file ... 
+ appendLinesToSource(NUMBER_OF_LINES); + + final Properties props = new Properties(); + props.setProperty("name", "debezium-engine"); + props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector"); + props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); + props.setProperty("offset.flush.interval.ms", "0"); + props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString()); + props.setProperty("topic", "topicX"); + props.setProperty("converter.schemas.enable", "false"); + props.setProperty("transforms", "header"); + props.setProperty("transforms.header.type", AddHeaderTransform.class.getName()); + + CountDownLatch firstLatch = new CountDownLatch(1); + CountDownLatch allLatch = new CountDownLatch(6); + + EmbeddedEngineHeader expectedHeader = new EmbeddedEngineHeader<>("headerKey", "\"headerValue\"".getBytes(StandardCharsets.UTF_8)); + + // create an engine with our custom class + final DebeziumEngine> engine = DebeziumEngine.create(Json.class, JsonByteArray.class, JsonByteArray.class) + .using(props) + .notifying((records, committer) -> { + assertThat(records.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES); + int groupCount = records.size() / NUMBER_OF_LINES; + + for (ChangeEvent r : records) { + assertThat(r.key()).isNull(); + assertThat(new String(r.value(), Charsets.UTF_8)).startsWith("\"Generated line number "); + + List> headers = r.headers(); + assertThat(headers).hasSize(1); + assertThat(headers).allMatch(h -> h.getKey().equals(expectedHeader.getKey()) && Arrays.equals(h.getValue(), expectedHeader.getValue())); + + committer.markProcessed(r); + } + + committer.markBatchFinished(); + firstLatch.countDown(); + for (int i = 0; i < groupCount; i++) { + allLatch.countDown(); + } + }) + .using(this.getClass().getClassLoader()) + .build(); + + ExecutorService exec = Executors.newFixedThreadPool(1); + exec.execute(() -> { + 
LoggingContext.forConnector(getClass().getSimpleName(), "", "engine"); + engine.run(); + }); + + firstLatch.await(5000, TimeUnit.MILLISECONDS); + assertThat(firstLatch.getCount()).isEqualTo(0); + + for (int i = 0; i < 5; i++) { + // Add a few more lines, and then verify they are consumed ... + appendLinesToSource(NUMBER_OF_LINES); + Thread.sleep(10); } - java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF8, StandardOpenOption.APPEND); - linesAdded += numberOfLines; - } + allLatch.await(5000, TimeUnit.MILLISECONDS); + assertThat(allLatch.getCount()).isEqualTo(0); - protected void appendLinesToSource(String linePrefix, int numberOfLines) throws IOException { - CharSequence[] lines = new CharSequence[numberOfLines]; - for (int i = 0; i != numberOfLines; ++i) { - lines[i] = generateLine(linePrefix, linesAdded + i + 1); - } - java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF8, StandardOpenOption.APPEND); - linesAdded += numberOfLines; - } - - protected String generateLine(int lineNumber) { - return generateLine("Generated line number ", lineNumber); - } - - protected String generateLine(String linePrefix, int lineNumber) { - return linePrefix + lineNumber; - } - - protected void consumeLines(int numberOfLines) throws InterruptedException { - consumeRecords(numberOfLines, 3, record -> { - String line = record.value().toString(); - assertThat(line).isEqualTo(generateLine(nextConsumedLineNumber)); - ++nextConsumedLineNumber; - }, - false); + // Stop the connector ... + stopConnector(); } @Test @@ -725,6 +780,68 @@ public void validationThrowsException() throws Exception { assertThat(errorReference.get()).contains("Connector configuration is not valid. 
"); assertThat(this.engine.isRunning()).isFalse(); } + + protected void appendLinesToSource(int numberOfLines) throws IOException { + CharSequence[] lines = new CharSequence[numberOfLines]; + for (int i = 0; i != numberOfLines; ++i) { + lines[i] = generateLine(linesAdded + i + 1); + } + java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF8, StandardOpenOption.APPEND); + linesAdded += numberOfLines; + } + + protected void appendLinesToSource(String linePrefix, int numberOfLines) throws IOException { + CharSequence[] lines = new CharSequence[numberOfLines]; + for (int i = 0; i != numberOfLines; ++i) { + lines[i] = generateLine(linePrefix, linesAdded + i + 1); + } + java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF8, StandardOpenOption.APPEND); + linesAdded += numberOfLines; + } + + protected String generateLine(int lineNumber) { + return generateLine("Generated line number ", lineNumber); + } + + protected String generateLine(String linePrefix, int lineNumber) { + return linePrefix + lineNumber; + } + + protected void consumeLines(int numberOfLines) throws InterruptedException { + consumeRecords(numberOfLines, 3, record -> { + String line = record.value().toString(); + assertThat(line).isEqualTo(generateLine(nextConsumedLineNumber)); + ++nextConsumedLineNumber; + }, + false); + } + + public static class AddHeaderTransform implements Transformation { + + @Override + public void configure(Map configs) { + } + + @Override + public SourceRecord apply(SourceRecord record) { + Headers headers = new ConnectHeaders(); + headers.addString("headerKey", "headerValue"); + + record = record.newRecord( + record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp(), headers); + + return record; + } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } + + @Override + public void close() { + } + } } class InterruptedConnector extends 
SimpleSourceConnector { diff --git a/debezium-server/debezium-server-core/src/main/java/io/debezium/server/DebeziumServer.java b/debezium-server/debezium-server-core/src/main/java/io/debezium/server/DebeziumServer.java index d1a677a28..873347fde 100644 --- a/debezium-server/debezium-server-core/src/main/java/io/debezium/server/DebeziumServer.java +++ b/debezium-server/debezium-server-core/src/main/java/io/debezium/server/DebeziumServer.java @@ -37,6 +37,7 @@ import io.debezium.engine.format.Avro; import io.debezium.engine.format.CloudEvents; import io.debezium.engine.format.Json; +import io.debezium.engine.format.JsonByteArray; import io.debezium.engine.format.Protobuf; import io.debezium.relational.history.SchemaHistory; import io.debezium.server.events.ConnectorCompletedEvent; @@ -67,6 +68,7 @@ public class DebeziumServer { private static final String PROP_FORMAT_PREFIX = PROP_PREFIX + "format."; private static final String PROP_PREDICATES_PREFIX = PROP_PREFIX + "predicates."; private static final String PROP_TRANSFORMS_PREFIX = PROP_PREFIX + "transforms."; + private static final String PROP_HEADER_FORMAT_PREFIX = PROP_FORMAT_PREFIX + "header."; private static final String PROP_KEY_FORMAT_PREFIX = PROP_FORMAT_PREFIX + "key."; private static final String PROP_VALUE_FORMAT_PREFIX = PROP_FORMAT_PREFIX + "value."; private static final String PROP_OFFSET_STORAGE_PREFIX = "offset.storage."; @@ -74,11 +76,14 @@ public class DebeziumServer { private static final String PROP_PREDICATES = PROP_PREFIX + "predicates"; private static final String PROP_TRANSFORMS = PROP_PREFIX + "transforms"; private static final String PROP_SINK_TYPE = PROP_SINK_PREFIX + "type"; + + private static final String PROP_HEADER_FORMAT = PROP_FORMAT_PREFIX + "header"; private static final String PROP_KEY_FORMAT = PROP_FORMAT_PREFIX + "key"; private static final String PROP_VALUE_FORMAT = PROP_FORMAT_PREFIX + "value"; private static final String PROP_TERMINATION_WAIT = PROP_PREFIX + 
"termination.wait"; private static final String FORMAT_JSON = Json.class.getSimpleName().toLowerCase(); + private static final String FORMAT_JSON_BYTE_ARRAY = JsonByteArray.class.getSimpleName().toLowerCase(); private static final String FORMAT_CLOUDEVENT = CloudEvents.class.getSimpleName().toLowerCase(); private static final String FORMAT_AVRO = Avro.class.getSimpleName().toLowerCase(); private static final String FORMAT_PROTOBUF = Protobuf.class.getSimpleName().toLowerCase(); @@ -126,11 +131,15 @@ else if (beans.size() > 1) { final Class keyFormat = (Class) getFormat(config, PROP_KEY_FORMAT); final Class valueFormat = (Class) getFormat(config, PROP_VALUE_FORMAT); + final Class headerFormat = (Class) getHeaderFormat(config); + configToProperties(config, props, PROP_SOURCE_PREFIX, "", true); configToProperties(config, props, PROP_FORMAT_PREFIX, "key.converter.", true); configToProperties(config, props, PROP_FORMAT_PREFIX, "value.converter.", true); + configToProperties(config, props, PROP_FORMAT_PREFIX, "header.converter.", true); configToProperties(config, props, PROP_KEY_FORMAT_PREFIX, "key.converter.", true); configToProperties(config, props, PROP_VALUE_FORMAT_PREFIX, "value.converter.", true); + configToProperties(config, props, PROP_HEADER_FORMAT_PREFIX, "header.converter.", true); configToProperties(config, props, PROP_SINK_PREFIX + name + ".", SchemaHistory.CONFIGURATION_FIELD_PREFIX_STRING + name + ".", false); configToProperties(config, props, PROP_SINK_PREFIX + name + ".", PROP_OFFSET_STORAGE_PREFIX + name + ".", false); @@ -149,7 +158,7 @@ else if (beans.size() > 1) { props.setProperty("name", name); LOGGER.debug("Configuration for DebeziumEngine: {}", props); - engine = DebeziumEngine.create(keyFormat, valueFormat) + engine = DebeziumEngine.create(keyFormat, valueFormat, headerFormat) .using(props) .using((DebeziumEngine.ConnectorCallback) health) .using((DebeziumEngine.CompletionCallback) health) @@ -193,6 +202,9 @@ private Class getFormat(Config 
config, String property) { if (FORMAT_JSON.equals(formatName)) { return Json.class; } + if (FORMAT_JSON_BYTE_ARRAY.equals(formatName)) { + return JsonByteArray.class; + } else if (FORMAT_CLOUDEVENT.equals(formatName)) { return CloudEvents.class; } @@ -205,6 +217,17 @@ else if (FORMAT_PROTOBUF.equals(formatName)) { throw new DebeziumException("Unknown format '" + formatName + "' for option " + "'" + property + "'"); } + private Class getHeaderFormat(Config config) { + final String formatName = config.getOptionalValue(PROP_HEADER_FORMAT, String.class).orElse(FORMAT_JSON); + if (FORMAT_JSON.equals(formatName)) { + return Json.class; + } + else if (FORMAT_JSON_BYTE_ARRAY.equals(formatName)) { + return JsonByteArray.class; + } + throw new DebeziumException("Unknown format '" + formatName + "' for option " + "'" + PROP_HEADER_FORMAT + "'"); + } + public void stop(@Observes ShutdownEvent event) { try { LOGGER.info("Received request to stop the engine"); diff --git a/debezium-server/debezium-server-core/src/test/java/io/debezium/server/TestConfigSource.java b/debezium-server/debezium-server-core/src/test/java/io/debezium/server/TestConfigSource.java index ac93a3bbd..10fdc7bdb 100644 --- a/debezium-server/debezium-server-core/src/test/java/io/debezium/server/TestConfigSource.java +++ b/debezium-server/debezium-server-core/src/test/java/io/debezium/server/TestConfigSource.java @@ -43,16 +43,20 @@ public TestConfigSource() { String format = System.getProperty("test.apicurio.converter.format"); String formatKey = System.getProperty("debezium.format.key"); String formatValue = System.getProperty("debezium.format.value"); + String formatHeader = System.getProperty("debezium.format.header", "json"); if (format != null && format.length() != 0) { integrationTest.put("debezium.format.key", format); integrationTest.put("debezium.format.value", format); + integrationTest.put("debezium.format.header", formatHeader); } else { formatKey = (formatKey != null) ? 
formatKey : Json.class.getSimpleName().toLowerCase(); formatValue = (formatValue != null) ? formatValue : Json.class.getSimpleName().toLowerCase(); + formatHeader = (formatHeader != null) ? formatHeader : Json.class.getSimpleName().toLowerCase(); integrationTest.put("debezium.format.key", formatKey); integrationTest.put("debezium.format.value", formatValue); + integrationTest.put("debezium.format.header", formatHeader); } unitTest.put("debezium.sink.type", "test"); @@ -61,7 +65,9 @@ public TestConfigSource() { unitTest.put("debezium.source.offset.flush.interval.ms", "0"); unitTest.put("debezium.source.file", TEST_FILE_PATH.toAbsolutePath().toString()); unitTest.put("debezium.source.topic", "topicX"); + unitTest.put("debezium.format.header", formatHeader); unitTest.put("debezium.format.schemas.enable", "true"); + unitTest.put("debezium.format.header.schemas.enable", "false"); unitTest.put("debezium.format.value.schemas.enable", "false"); unitTest.put("debezium.transforms", "hoist"); unitTest.put("debezium.transforms.hoist.type", "org.apache.kafka.connect.transforms.HoistField$Value"); diff --git a/debezium-server/debezium-server-kafka/src/main/java/io/debezium/server/kafka/KafkaChangeConsumer.java b/debezium-server/debezium-server-kafka/src/main/java/io/debezium/server/kafka/KafkaChangeConsumer.java index aac2e2d33..afa42e92d 100644 --- a/debezium-server/debezium-server-kafka/src/main/java/io/debezium/server/kafka/KafkaChangeConsumer.java +++ b/debezium-server/debezium-server-kafka/src/main/java/io/debezium/server/kafka/KafkaChangeConsumer.java @@ -18,6 +18,8 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.config.ConfigProvider; import org.slf4j.Logger; @@ -27,6 +29,7 @@ import io.debezium.engine.ChangeEvent; 
import io.debezium.engine.DebeziumEngine; import io.debezium.engine.DebeziumEngine.RecordCommitter; +import io.debezium.engine.Header; import io.debezium.server.BaseChangeConsumer; import io.debezium.server.CustomConsumerBuilder; @@ -69,18 +72,23 @@ void stop() { producer.close(Duration.ofSeconds(5)); } catch (Throwable t) { - LOGGER.warn("Could not close producer {}", t); + LOGGER.warn("Could not close producer", t); } } } @Override - public void handleBatch(final List> records, final RecordCommitter> committer) throws InterruptedException { + public void handleBatch(final List> records, + final RecordCommitter> committer) + throws InterruptedException { final CountDownLatch latch = new CountDownLatch(records.size()); for (ChangeEvent record : records) { try { LOGGER.trace("Received event '{}'", record); - producer.send(new ProducerRecord<>(record.destination(), record.key(), record.value()), (metadata, exception) -> { + + Headers headers = convertHeaders(record); + + producer.send(new ProducerRecord<>(record.destination(), null, null, record.key(), record.value(), headers), (metadata, exception) -> { if (exception != null) { LOGGER.error("Failed to send record to {}:", record.destination(), exception); throw new DebeziumException(exception); @@ -100,4 +108,13 @@ public void handleBatch(final List> records, final R latch.await(); committer.markBatchFinished(); } + + private Headers convertHeaders(ChangeEvent record) { + List> headers = record.headers(); + Headers kafkaHeaders = new RecordHeaders(); + for (Header header : headers) { + kafkaHeaders.add(header.getKey(), getBytes(header.getValue())); + } + return kafkaHeaders; + } } diff --git a/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/AddHeaderTransform.java b/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/AddHeaderTransform.java new file mode 100644 index 000000000..595de2e32 --- /dev/null +++ 
b/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/AddHeaderTransform.java @@ -0,0 +1,43 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.server.kafka; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.transforms.Transformation; + +public class AddHeaderTransform implements Transformation { + + @Override + public SourceRecord apply(SourceRecord record) { + Headers headers = new ConnectHeaders(); + headers.addString("headerKey", "headerValue"); + + record = record.newRecord( + record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp(), headers); + + return record; + } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } + + @Override + public void close() { + + } + + @Override + public void configure(Map map) { + + } +} diff --git a/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaIT.java b/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaIT.java index c706967f9..0db2e16ae 100644 --- a/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaIT.java +++ b/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaIT.java @@ -7,6 +7,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -20,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import 
org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.StringDeserializer; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; @@ -96,5 +98,10 @@ public void testKafka() { return actual.size() >= MESSAGE_COUNT; }); assertThat(actual.size()).isGreaterThanOrEqualTo(MESSAGE_COUNT); + Headers headers = actual.get(0).headers(); + assertThat(headers.headers("headerKey")).isNotEmpty(); + assertThat(headers.headers("headerKey")) + .allMatch(h -> h.key().equals("headerKey") && Arrays.equals(h.value(), "\"headerValue\"".getBytes(StandardCharsets.UTF_8))); + } } diff --git a/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaTestConfigSource.java b/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaTestConfigSource.java index 167012a35..05dca5da6 100644 --- a/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaTestConfigSource.java +++ b/debezium-server/debezium-server-kafka/src/test/java/io/debezium/server/kafka/KafkaTestConfigSource.java @@ -29,9 +29,13 @@ public KafkaTestConfigSource() { kafkaConfig.put("debezium.source.topic.prefix", "testc"); kafkaConfig.put("debezium.source.schema.include.list", "inventory"); kafkaConfig.put("debezium.source.table.include.list", "inventory.customers"); + kafkaConfig.put("debezium.format.header.schemas.enable", "false"); // DBZ-5105 kafkaConfig.put("debezium.sink.kafka.producer.ssl.endpoint.identification.algorithm", ""); + kafkaConfig.put("debezium.transforms", "addheader"); + kafkaConfig.put("debezium.transforms.addheader.type", "io.debezium.server.kafka.AddHeaderTransform"); + config = kafkaConfig; } diff --git a/documentation/modules/ROOT/pages/development/engine.adoc b/documentation/modules/ROOT/pages/development/engine.adoc index 031a116c9..4ca04e1cc 100644 --- a/documentation/modules/ROOT/pages/development/engine.adoc +++ 
b/documentation/modules/ROOT/pages/development/engine.adoc @@ -259,9 +259,16 @@ Allowed values are: * `Connect.class` - the output value is change event wrapping Kafka Connect's `SourceRecord` * `Json.class` - the output value is a pair of key and value encoded as `JSON` strings +* `JsonByteArray.class` - the output value is a pair of key and value formatted as `JSON` and encoded as UTF-8 byte arrays * `Avro.class` - the output value is a pair of key and value encoded as Avro serialized records (see xref:{link-avro-serialization}[Avro Serialization] for more details) * `CloudEvents.class` - the output value is a pair of key and value encoded as xref:{link-cloud-events}[Cloud Events] messages +The header format can also be specified when calling `DebeziumEngine#create()`. +Allowed values are: + +* `Json.class` - the header values are encoded as `JSON` strings +* `JsonByteArray.class` - the header values are formatted as `JSON` and encoded as UTF-8 byte arrays + Internally, the engine uses the appropriate Kafka Connect converter implementation to which the conversion is delegated. The converter can be parametrized using engine properties to modify its behaviour. diff --git a/documentation/modules/ROOT/pages/operations/debezium-server.adoc b/documentation/modules/ROOT/pages/operations/debezium-server.adoc index a223dc311..7c1cf3ffa 100644 --- a/documentation/modules/ROOT/pages/operations/debezium-server.adoc +++ b/documentation/modules/ROOT/pages/operations/debezium-server.adoc @@ -328,7 +328,7 @@ By default the output is in JSON format but an arbitrary implementation of Kafka |[[debezium-format-key]]<> |`json` -|The name of the output format for key, one of `json`/`avro`/`protobuf`. +|The name of the output format for key, one of `json`/`jsonbytearray`/`avro`/`protobuf`. 
|[[debezium-format-key-props]]<> | @@ -336,12 +336,20 @@ By default the output is in JSON format but an arbitrary implementation of Kafka |[[debezium-format-value]]<> |`json` -|The name of the output format for value, one of `json`/`avro`/`protobuf`/`cloudevents`. +|The name of the output format for value, one of `json`/`jsonbytearray`/`avro`/`protobuf`/`cloudevents`. |[[debezium-format-value-props]]<> | |Configuration properties passed to the value converter. +|[[debezium-format-header]]<> +|`json` +|The name of the output format for header, one of `json`/`jsonbytearray`. + +|[[debezium-format-header-props]]<> +| +|Configuration properties passed to the header converter. + |=== [id="debezium-transformations-configuration-options"]