From 8617f474c57fd372c6c12e8450333444ae287d38 Mon Sep 17 00:00:00 2001 From: Ryan van Huuksloot Date: Mon, 12 Aug 2024 21:31:35 -0400 Subject: [PATCH] DBZ-8082: Pass Headers to Key/Value Converters --- .../debezium/embedded/ConverterBuilder.java | 35 +++++++------ .../async/AsyncEmbeddedEngineTest.java | 45 +++++++++++++++++ .../embedded/async/FixedValueHeader.java | 49 +++++++++++++++++++ 3 files changed, 113 insertions(+), 16 deletions(-) create mode 100644 debezium-embedded/src/test/java/io/debezium/embedded/async/FixedValueHeader.java diff --git a/debezium-embedded/src/main/java/io/debezium/embedded/ConverterBuilder.java b/debezium-embedded/src/main/java/io/debezium/embedded/ConverterBuilder.java index a40478909..483902dbd 100644 --- a/debezium-embedded/src/main/java/io/debezium/embedded/ConverterBuilder.java +++ b/debezium-embedded/src/main/java/io/debezium/embedded/ConverterBuilder.java @@ -7,13 +7,13 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.ConverterConfig; @@ -100,19 +100,25 @@ public Function toFormat(HeaderConverter headerConverter) { if (topicName == null) { topicName = TOPIC_NAME; } - final byte[] key = keyConverter.fromConnectData(topicName, record.keySchema(), record.key()); - final byte[] value = valueConverter.fromConnectData(topicName, record.valueSchema(), record.value()); + org.apache.kafka.common.header.internals.RecordHeaders recordHeaders = new RecordHeaders(); - List> headers = Collections.emptyList(); if (headerConverter != null) { - List> byteArrayHeaders = convertHeaders(record, topicName, headerConverter); - headers = (List) byteArrayHeaders; - if (shouldConvertHeadersToString()) { - headers = byteArrayHeaders.stream() - .map(h -> new EmbeddedEngineHeader<>(h.getKey(), new String(h.getValue(), StandardCharsets.UTF_8))) - .collect(Collectors.toList()); + for (org.apache.kafka.connect.header.Header header : record.headers()) { + byte[] rawHeader = headerConverter.fromConnectHeader(topicName, header.key(), header.schema(), header.value()); + recordHeaders.add(header.key(), rawHeader); } } + + final byte[] key = keyConverter.fromConnectData(topicName, recordHeaders, record.keySchema(), record.key()); + final byte[] value = valueConverter.fromConnectData(topicName, recordHeaders, record.valueSchema(), record.value()); + + List> byteArrayHeaders = convertHeaders(recordHeaders); + List> headers = (List) byteArrayHeaders; + if (shouldConvertHeadersToString()) { + headers = byteArrayHeaders.stream() + .map(h -> new EmbeddedEngineHeader<>(h.getKey(), new String(h.getValue(), StandardCharsets.UTF_8))) + .collect(Collectors.toList()); + } Object convertedKey = key; Object convertedValue = value; if (key != null && shouldConvertKeyToString()) { @@ -148,14 +154,11 @@ private boolean shouldConvertHeadersToString() { return isFormat(formatHeader, Json.class); } - private List> convertHeaders( - SourceRecord record, String topicName, HeaderConverter headerConverter) { + private List> convertHeaders(org.apache.kafka.common.header.Headers recordHeaders) { List> headers = new ArrayList<>(); - for (org.apache.kafka.connect.header.Header header : record.headers()) { - String headerKey = header.key(); - byte[] rawHeader = headerConverter.fromConnectHeader(topicName, headerKey, header.schema(), header.value()); - headers.add(new EmbeddedEngineHeader<>(headerKey, rawHeader)); + for (org.apache.kafka.common.header.Header header : recordHeaders) { + headers.add(new EmbeddedEngineHeader<>(header.key(), header.value())); } return headers; diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/async/AsyncEmbeddedEngineTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/async/AsyncEmbeddedEngineTest.java index b8ff8b7cb..ef8f388cd 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/async/AsyncEmbeddedEngineTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/async/AsyncEmbeddedEngineTest.java @@ -41,8 +41,12 @@ import io.debezium.doc.FixFor; import io.debezium.embedded.AbstractConnectorTest; import io.debezium.embedded.DebeziumEngineTestUtils; +import io.debezium.embedded.EmbeddedEngineChangeEvent; import io.debezium.embedded.EmbeddedEngineConfig; +import io.debezium.embedded.EmbeddedEngineHeader; import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.format.Json; +import io.debezium.engine.format.KeyValueHeaderChangeEventFormat; import io.debezium.junit.logging.LogInterceptor; import io.debezium.util.LoggingContext; import io.debezium.util.Testing; @@ -219,6 +223,47 @@ public void testTasksAreStoppedIfSomeFailsToStart() { waitForEngineToStop(); } + @Test + public void testHeaderConverter() throws Exception { + final Properties props = new Properties(); + props.setProperty(ConnectorConfig.NAME_CONFIG, "debezium-engine"); + props.setProperty(ConnectorConfig.TASKS_MAX_CONFIG, "1"); + props.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, FileStreamSourceConnector.class.getName()); + props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString()); + props.setProperty(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "0"); + props.setProperty(FileStreamSourceConnector.FILE_CONFIG, TEST_FILE_PATH.toAbsolutePath().toString()); + props.setProperty(FileStreamSourceConnector.TOPIC_CONFIG, "testTopic"); + props.setProperty("transforms", "header"); + props.setProperty("transforms.header.type", "io.debezium.embedded.async.FixedValueHeader"); + + appendLinesToSource(1); + CountDownLatch recordsLatch = new CountDownLatch(1); // 1 count down for headers + + DebeziumEngine.Builder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder( + KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class)); + DebeziumEngine embeddedEngine = builder + .using(props) + .using(new TestEngineConnectorCallback()) + .notifying((records, committer) -> { + for (EmbeddedEngineChangeEvent r : records) { + committer.markProcessed(r); + assertThat(r.headers().size()).isEqualTo(1); + assertThat( + ((EmbeddedEngineHeader) r.headers().get(0)).getValue()).isEqualTo("{\"schema\":{\"type\":\"int32\",\"optional\":false},\"payload\":2}"); + } + committer.markBatchFinished(); + }).build(); + + engineExecSrv.submit(() -> { + LoggingContext.forConnector(getClass().getSimpleName(), "", "engine"); + embeddedEngine.run(); + }); + + recordsLatch.await(AbstractConnectorTest.waitTimeForEngine(), TimeUnit.SECONDS); + + embeddedEngine.close(); + } + @Test public void testCompletionCallbackCalledUponSuccess() throws Exception { final Properties props = new Properties(); diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/async/FixedValueHeader.java b/debezium-embedded/src/test/java/io/debezium/embedded/async/FixedValueHeader.java new file mode 100644 index 000000000..17955ea62 --- /dev/null +++ b/debezium-embedded/src/test/java/io/debezium/embedded/async/FixedValueHeader.java @@ -0,0 +1,49 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.embedded.async; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.transforms.Transformation; + +public class FixedValueHeader> implements Transformation { + + @Override + public R apply(R record) { + Headers headers = new ConnectHeaders(); + headers.add("fixed-key", 2, Schema.INT32_SCHEMA); + headers.forEach(h -> record.headers().add(h)); + + return record.newRecord( + record.topic(), + record.kafkaPartition(), + record.keySchema(), + record.key(), + record.valueSchema(), + record.value(), + record.timestamp(), + headers); + } + + @Override + public ConfigDef config() { + return null; + } + + @Override + public void configure(Map configs) { + } + + @Override + public void close() { + } + +}