DBZ-8082: Pass Headers to Key/Value Converters
This commit is contained in:
parent
42fbde8268
commit
8617f474c5
@ -7,13 +7,13 @@
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import org.apache.kafka.common.header.internals.RecordHeaders;
|
||||
import org.apache.kafka.connect.source.SourceRecord;
|
||||
import org.apache.kafka.connect.storage.Converter;
|
||||
import org.apache.kafka.connect.storage.ConverterConfig;
|
||||
@ -100,19 +100,25 @@ public Function<SourceRecord, R> toFormat(HeaderConverter headerConverter) {
|
||||
if (topicName == null) {
|
||||
topicName = TOPIC_NAME;
|
||||
}
|
||||
final byte[] key = keyConverter.fromConnectData(topicName, record.keySchema(), record.key());
|
||||
final byte[] value = valueConverter.fromConnectData(topicName, record.valueSchema(), record.value());
|
||||
org.apache.kafka.common.header.internals.RecordHeaders recordHeaders = new RecordHeaders();
|
||||
|
||||
List<Header<?>> headers = Collections.emptyList();
|
||||
if (headerConverter != null) {
|
||||
List<Header<byte[]>> byteArrayHeaders = convertHeaders(record, topicName, headerConverter);
|
||||
headers = (List) byteArrayHeaders;
|
||||
for (org.apache.kafka.connect.header.Header header : record.headers()) {
|
||||
byte[] rawHeader = headerConverter.fromConnectHeader(topicName, header.key(), header.schema(), header.value());
|
||||
recordHeaders.add(header.key(), rawHeader);
|
||||
}
|
||||
}
|
||||
|
||||
final byte[] key = keyConverter.fromConnectData(topicName, recordHeaders, record.keySchema(), record.key());
|
||||
final byte[] value = valueConverter.fromConnectData(topicName, recordHeaders, record.valueSchema(), record.value());
|
||||
|
||||
List<Header<byte[]>> byteArrayHeaders = convertHeaders(recordHeaders);
|
||||
List<Header<?>> headers = (List) byteArrayHeaders;
|
||||
if (shouldConvertHeadersToString()) {
|
||||
headers = byteArrayHeaders.stream()
|
||||
.map(h -> new EmbeddedEngineHeader<>(h.getKey(), new String(h.getValue(), StandardCharsets.UTF_8)))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
Object convertedKey = key;
|
||||
Object convertedValue = value;
|
||||
if (key != null && shouldConvertKeyToString()) {
|
||||
@ -148,14 +154,11 @@ private boolean shouldConvertHeadersToString() {
|
||||
return isFormat(formatHeader, Json.class);
|
||||
}
|
||||
|
||||
private List<Header<byte[]>> convertHeaders(
|
||||
SourceRecord record, String topicName, HeaderConverter headerConverter) {
|
||||
private List<Header<byte[]>> convertHeaders(org.apache.kafka.common.header.Headers recordHeaders) {
|
||||
List<Header<byte[]>> headers = new ArrayList<>();
|
||||
|
||||
for (org.apache.kafka.connect.header.Header header : record.headers()) {
|
||||
String headerKey = header.key();
|
||||
byte[] rawHeader = headerConverter.fromConnectHeader(topicName, headerKey, header.schema(), header.value());
|
||||
headers.add(new EmbeddedEngineHeader<>(headerKey, rawHeader));
|
||||
for (org.apache.kafka.common.header.Header header : recordHeaders) {
|
||||
headers.add(new EmbeddedEngineHeader<>(header.key(), header.value()));
|
||||
}
|
||||
|
||||
return headers;
|
||||
|
@ -41,8 +41,12 @@
|
||||
import io.debezium.doc.FixFor;
|
||||
import io.debezium.embedded.AbstractConnectorTest;
|
||||
import io.debezium.embedded.DebeziumEngineTestUtils;
|
||||
import io.debezium.embedded.EmbeddedEngineChangeEvent;
|
||||
import io.debezium.embedded.EmbeddedEngineConfig;
|
||||
import io.debezium.embedded.EmbeddedEngineHeader;
|
||||
import io.debezium.engine.DebeziumEngine;
|
||||
import io.debezium.engine.format.Json;
|
||||
import io.debezium.engine.format.KeyValueHeaderChangeEventFormat;
|
||||
import io.debezium.junit.logging.LogInterceptor;
|
||||
import io.debezium.util.LoggingContext;
|
||||
import io.debezium.util.Testing;
|
||||
@ -219,6 +223,47 @@ public void testTasksAreStoppedIfSomeFailsToStart() {
|
||||
waitForEngineToStop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHeaderConverter() throws Exception {
|
||||
final Properties props = new Properties();
|
||||
props.setProperty(ConnectorConfig.NAME_CONFIG, "debezium-engine");
|
||||
props.setProperty(ConnectorConfig.TASKS_MAX_CONFIG, "1");
|
||||
props.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, FileStreamSourceConnector.class.getName());
|
||||
props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
|
||||
props.setProperty(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "0");
|
||||
props.setProperty(FileStreamSourceConnector.FILE_CONFIG, TEST_FILE_PATH.toAbsolutePath().toString());
|
||||
props.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, "testTopic");
|
||||
props.setProperty("transforms", "header");
|
||||
props.setProperty("transforms.header.type", "io.debezium.embedded.async.FixedValueHeader");
|
||||
|
||||
appendLinesToSource(1);
|
||||
CountDownLatch recordsLatch = new CountDownLatch(1); // 1 count down for headers
|
||||
|
||||
DebeziumEngine.Builder<EmbeddedEngineChangeEvent> builder = new AsyncEmbeddedEngine.AsyncEngineBuilder(
|
||||
KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class));
|
||||
DebeziumEngine<EmbeddedEngineChangeEvent> embeddedEngine = builder
|
||||
.using(props)
|
||||
.using(new TestEngineConnectorCallback())
|
||||
.notifying((records, committer) -> {
|
||||
for (EmbeddedEngineChangeEvent r : records) {
|
||||
committer.markProcessed(r);
|
||||
assertThat(r.headers().size()).isEqualTo(1);
|
||||
assertThat(
|
||||
((EmbeddedEngineHeader) r.headers().get(0)).getValue()).isEqualTo("{\"schema\":{\"type\":\"int32\",\"optional\":false},\"payload\":2}");
|
||||
}
|
||||
committer.markBatchFinished();
|
||||
}).build();
|
||||
|
||||
engineExecSrv.submit(() -> {
|
||||
LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
|
||||
embeddedEngine.run();
|
||||
});
|
||||
|
||||
recordsLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS);
|
||||
|
||||
embeddedEngine.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompletionCallbackCalledUponSuccess() throws Exception {
|
||||
final Properties props = new Properties();
|
||||
|
@ -0,0 +1,49 @@
|
||||
/*
|
||||
* Copyright Debezium Authors.
|
||||
*
|
||||
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
|
||||
*/
|
||||
package io.debezium.embedded.async;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.connect.connector.ConnectRecord;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.header.ConnectHeaders;
|
||||
import org.apache.kafka.connect.header.Headers;
|
||||
import org.apache.kafka.connect.transforms.Transformation;
|
||||
|
||||
public class FixedValueHeader<R extends ConnectRecord<R>> implements Transformation<R> {
|
||||
|
||||
@Override
|
||||
public R apply(R record) {
|
||||
Headers headers = new ConnectHeaders();
|
||||
headers.add("fixed-key", 2, Schema.INT32_SCHEMA);
|
||||
headers.forEach(h -> record.headers().add(h));
|
||||
|
||||
return record.newRecord(
|
||||
record.topic(),
|
||||
record.kafkaPartition(),
|
||||
record.keySchema(),
|
||||
record.key(),
|
||||
record.valueSchema(),
|
||||
record.value(),
|
||||
record.timestamp(),
|
||||
headers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigDef config() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Map<String, ?> configs) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user