DBZ-7512 Support arbitrary payloads with outbox event router on Debezium Server

1. Support for string and binary serialization formats in the Debezium API.
2. Allow configuring separate key and value formats on the embedded engine.

This change fixes the following issue when using the outbox event router
with the embedded engine:

The outbox event router supports arbitrary payload formats when
BinaryDataConverter is used as the value.converter, which passes the
payload through transparently. However, this is currently not supported
by the embedded engine, which handles message conversion based on the
configured value.format.

In addition, when the payload is passed through transparently, it makes
sense to also pass the aggregateid, i.e. the event key, through
transparently. The default outbox table configuration declares
aggregateid as a varchar, which is likewise not supported by the
embedded engine.
akula 2024-02-15 01:30:46 +05:30 committed by Jiri Pechanec
parent 6564579423
commit cd4c6958bd
8 changed files with 158 additions and 13 deletions
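To illustrate what this change enables, here is a minimal sketch of an embedded-engine consumer that receives outbox events with the key and payload passed through untouched. It mirrors the integration test added below; the connector class, table name, and example class name are illustrative, and a real application still needs the usual database connection and offset-storage configuration.

import java.util.Properties;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Binary;
import io.debezium.engine.format.SimpleString;

public class OutboxPassthroughExample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Illustrative connector settings; database connection and offset storage
        // properties are omitted here and must be supplied in a real setup.
        props.setProperty("name", "outbox-engine");
        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        props.setProperty("table.include.list", "engine.outbox");
        props.setProperty("transforms", "outbox");
        props.setProperty("transforms.outbox.type", "io.debezium.transforms.outbox.EventRouter");

        // SimpleString maps to StringConverter for the key and Binary maps to
        // BinaryDataConverter for the value, so neither side is re-serialized as JSON.
        try (DebeziumEngine<ChangeEvent<String, Object>> engine = DebeziumEngine.create(SimpleString.class, Binary.class)
                .using(props)
                .notifying((records, committer) -> {
                    for (ChangeEvent<String, Object> event : records) {
                        String aggregateId = event.key();        // aggregateid column, passed through as a plain String
                        byte[] payload = (byte[]) event.value(); // payload column, passed through as raw bytes
                        System.out.println(aggregateId + " -> " + payload.length + " bytes");
                        committer.markProcessed(event);
                    }
                    committer.markBatchFinished();
                })
                .build()) {
            engine.run(); // blocks until the engine is stopped
        }
    }
}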

@@ -587,3 +587,4 @@ leoloel
Clifford Cheefoon
Fr0z3Nn
Xianming Zhou
Akula

@@ -0,0 +1,12 @@
/*
 * Copyright Debezium Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.debezium.engine.format;

/**
 * A {@link SerializationFormat} defining the binary format serialized as byte[].
 */
public class Binary implements SerializationFormat<Object> {
}

@@ -0,0 +1,12 @@
/*
 * Copyright Debezium Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.debezium.engine.format;

/**
 * A {@link SerializationFormat} defining the string format serialized as String
 */
public class SimpleString implements SerializationFormat<String> {
}

@@ -41,8 +41,10 @@
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.DebeziumEngine.CompletionCallback;
import io.debezium.engine.format.Avro;
import io.debezium.engine.format.Binary;
import io.debezium.engine.format.CloudEvents;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.SimpleString;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenKafkaVersion;
@@ -210,6 +212,48 @@ public void shouldSerializeToCloudEvents() throws Exception {
        }
    }

    @Test
    public void shouldSerializeArbitraryPayloadFromOutbox() throws Exception {
        TestHelper.execute(
                "CREATE TABLE engine.outbox (id INT PRIMARY KEY, aggregateid TEXT, aggregatetype TEXT, payload BYTEA);",
                "INSERT INTO engine.outbox VALUES(1, 'key1', 'event', 'value1'::BYTEA);");

        final Properties props = new Properties();
        props.putAll(TestHelper.defaultConfig().build().asMap());
        props.setProperty("name", "debezium-engine");
        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG,
                OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("converter.schemas.enable", "false");
        props.setProperty("table.include.list", "engine.outbox");
        props.setProperty("key.converter.schemas.enable", "false");
        props.setProperty("transforms", "outbox");
        props.setProperty("transforms.outbox.type", "io.debezium.transforms.outbox.EventRouter");

        CountDownLatch allLatch = new CountDownLatch(1);
        final ExecutorService executor = Executors.newFixedThreadPool(1);

        try (DebeziumEngine<ChangeEvent<String, Object>> engine = DebeziumEngine.create(SimpleString.class, Binary.class).using(props)
                .notifying((records, committer) -> {
                    for (ChangeEvent<String, Object> r : records) {
                        assertThat(r.key()).isEqualTo("key1");
                        assertThat(r.value()).isEqualTo("value1".getBytes());
                        allLatch.countDown();
                        committer.markProcessed(r);
                    }
                    committer.markBatchFinished();
                }).using(this.getClass().getClassLoader()).build()) {
            executor.execute(() -> {
                LoggingContext.forConnector(getClass().getSimpleName(), "debezium-engine", "engine");
                engine.run();
            });

            allLatch.await(5000, TimeUnit.MILLISECONDS);
            assertThat(allLatch.getCount()).isEqualTo(0);
        }
    }

    private static final AtomicInteger offsetStoreSetCalls = new AtomicInteger();

    public static class TestOffsetStore extends FileOffsetBackingStore {

@@ -24,12 +24,14 @@
import io.debezium.config.Configuration;
import io.debezium.engine.Header;
import io.debezium.engine.format.Avro;
import io.debezium.engine.format.Binary;
import io.debezium.engine.format.CloudEvents;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.JsonByteArray;
import io.debezium.engine.format.KeyValueHeaderChangeEventFormat;
import io.debezium.engine.format.Protobuf;
import io.debezium.engine.format.SerializationFormat;
import io.debezium.engine.format.SimpleString;

/**
 * A builder which creates converter functions for requested format.
@@ -111,14 +113,15 @@ public Function<SourceRecord, R> toFormat(HeaderConverter headerConverter) {
                         .collect(Collectors.toList());
                 }
             }
-            return shouldConvertKeyAndValueToString()
-                    ? (R) new EmbeddedEngineChangeEvent<>(
-                            key != null ? new String(key, StandardCharsets.UTF_8) : null,
-                            value != null ? new String(value, StandardCharsets.UTF_8) : null,
-                            (List) headers,
-                            record)
-                    : (R) new EmbeddedEngineChangeEvent<>(key, value, (List) headers, record);
+            Object convertedKey = key;
+            Object convertedValue = value;
+            if (key != null && shouldConvertKeyToString()) {
+                convertedKey = new String(key, StandardCharsets.UTF_8);
+            }
+            if (value != null && shouldConvertValueToString()) {
+                convertedValue = new String(value, StandardCharsets.UTF_8);
+            }
+            return (R) new EmbeddedEngineChangeEvent<>(convertedKey, convertedValue, (List) headers, record);
         };
     }
@@ -133,9 +136,12 @@ private static boolean isFormat(Class<? extends SerializationFormat<?>> format1,
         return format1 == format2;
     }

-    private boolean shouldConvertKeyAndValueToString() {
-        return isFormat(formatKey, Json.class) && isFormat(formatValue, Json.class)
-                || isFormat(formatValue, CloudEvents.class);
+    private boolean shouldConvertKeyToString() {
+        return isFormat(formatKey, Json.class) || isFormat(formatKey, SimpleString.class);
     }
+
+    private boolean shouldConvertValueToString() {
+        return isFormat(formatValue, Json.class) || isFormat(formatValue, SimpleString.class) || isFormat(formatValue, CloudEvents.class);
+    }

     private boolean shouldConvertHeadersToString() {
@@ -206,6 +212,12 @@ else if (isFormat(format, Avro.class)) {
         else if (isFormat(format, Protobuf.class)) {
             converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "io.confluent.connect.protobuf.ProtobufConverter").build();
         }
+        else if (isFormat(format, Binary.class)) {
+            converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "io.debezium.converters.BinaryDataConverter").build();
+        }
+        else if (isFormat(format, SimpleString.class)) {
+            converterConfig = converterConfig.edit().withDefault(FIELD_CLASS, "org.apache.kafka.connect.storage.StringConverter").build();
+        }
         else {
             throw new DebeziumException("Converter '" + format.getSimpleName() + "' is not supported");
         }

@@ -52,6 +52,7 @@
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.JsonByteArray;
import io.debezium.engine.format.SimpleString;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.util.LoggingContext;
import io.debezium.util.Testing;
@@ -651,6 +652,68 @@ public void shouldRunDebeziumEngineWithJson() throws Exception {
        stopConnector();
    }

    @Test
    public void shouldRunDebeziumEngineWithString() throws Exception {
        // Add initial content to the file ...
        appendLinesToSource(NUMBER_OF_LINES);

        final Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "topicX");
        props.setProperty("converter.schemas.enable", "false");

        CountDownLatch firstLatch = new CountDownLatch(1);
        CountDownLatch allLatch = new CountDownLatch(6);

        // create an engine with our custom class
        final DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(SimpleString.class, SimpleString.class)
                .using(props)
                .notifying((records, committer) -> {
                    assertThat(records.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES);
                    int groupCount = records.size() / NUMBER_OF_LINES;

                    for (ChangeEvent<String, String> r : records) {
                        assertThat(r.key()).isNull();
                        // unlike Json, SimpleString does not wrap value in quotes
                        assertThat(r.value()).startsWith("Generated line number ");
                        committer.markProcessed(r);
                    }
                    committer.markBatchFinished();
                    firstLatch.countDown();
                    for (int i = 0; i < groupCount; i++) {
                        allLatch.countDown();
                    }
                })
                .using(this.getClass().getClassLoader())
                .build();

        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            engine.run();
        });

        firstLatch.await(5000, TimeUnit.MILLISECONDS);
        assertThat(firstLatch.getCount()).isEqualTo(0);

        for (int i = 0; i < 5; i++) {
            // Add a few more lines, and then verify they are consumed ...
            appendLinesToSource(NUMBER_OF_LINES);
            Thread.sleep(10);
        }
        allLatch.await(5000, TimeUnit.MILLISECONDS);
        assertThat(allLatch.getCount()).isEqualTo(0);

        // Stop the connector ...
        stopConnector();
    }

    @Test
    @FixFor("DBZ-5926")
    public void shouldRunDebeziumEngineWithMismatchedTypes() throws Exception {

@@ -379,7 +379,7 @@ By default the output is in JSON format but an arbitrary implementation of Kafka
 |[[debezium-format-key]]<<debezium-format-key, `debezium.format.key`>>
 |`json`
-|The name of the output format for key, one of `json`/`jsonbytearray`/`avro`/`protobuf`.
+|The name of the output format for key, one of `json`/`jsonbytearray`/`avro`/`protobuf`/`simplestring`/`binary`.

 |[[debezium-format-key-props]]<<debezium-format-key-props, `debezium.format.key.*`>>
 |

@@ -387,7 +387,7 @@ By default the output is in JSON format but an arbitrary implementation of Kafka
 |[[debezium-format-value]]<<debezium-format-value, `debezium.format.value`>>
 |`json`
-|The name of the output format for value, one of `json`/`jsonbytearray`/`avro`/`protobuf`/`cloudevents`.
+|The name of the output format for value, one of `json`/`jsonbytearray`/`avro`/`protobuf`/`cloudevents`/`simplestring`/`binary`.

 |[[debezium-format-value-props]]<<debezium-format-value-props, `debezium.format.value.*`>>
 |
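With the two format names added above, passing outbox events through Debezium Server unchanged comes down to a configuration fragment like the following sketch; only debezium.format.key and debezium.format.value are taken from the documented options, while the transform settings are the standard outbox router configuration and the source/sink properties are omitted.

# Sketch of a Debezium Server application.properties fragment using the new formats;
# source connector and sink configuration are left out and must be supplied as usual.
debezium.format.key=simplestring
debezium.format.value=binary
debezium.transforms=outbox
debezium.transforms.outbox.type=io.debezium.transforms.outbox.EventRouter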

@@ -256,3 +256,4 @@ CliffordCheefoon,Clifford Cheefoon
Fr0z3Nn,Ivanov Sergey Vasilevich
Bue-Von-Hun,Bue Von Hun
nrkeli,Emil Lindström
akulapid,Akula