DBZ-8103 Add SMT to decode binary content of a logical decoding message

Roman Kudryashov authored 2024-07-18 14:43:23 +03:00, committed by Jiri Pechanec
commit 7cc70da4b2 (parent 6b4dc064ae)
4 changed files with 215 additions and 1 deletion

debezium-connector-postgres/pom.xml

@@ -60,6 +60,11 @@
             <artifactId>connect-api</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-transforms</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
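Note: the new provided-scope connect-transforms dependency supplies Kafka Connect's stock ReplaceField transformation, which the SMT added below uses to drop the message field from the record value; connect-api alone only provides the Transformation interface.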

debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContent.java

@@ -0,0 +1,198 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.transforms;

import static io.debezium.connector.postgresql.LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY;
import static io.debezium.connector.postgresql.LogicalDecodingMessageMonitor.DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.ReplaceField;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.debezium.Module;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.schema.FieldNameSelector;
import io.debezium.transforms.ConnectRecordUtil;
import io.debezium.transforms.outbox.EventRouterConfigDefinition;
import io.debezium.transforms.outbox.JsonSchemaData;
import io.debezium.util.BoundedConcurrentHashMap;

/**
 * This transform converts the binary content of a logical decoding message into a structured form.
 * A typical use is to apply it before {@link io.debezium.transforms.outbox.EventRouter} so that the transform
 * produces a record suitable for the Outbox SMT.
 *
 * @author Roman Kudryashov
 */
public class DecodeLogicalDecodingMessageContent<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {

    private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLogicalDecodingMessageContent.class);

    private static final String POSTGRES_LOGICAL_DECODING_MESSAGE_SCHEMA_NAME = "io.debezium.connector.postgresql.MessageValue";

    private BoundedConcurrentHashMap<Schema, Schema> logicalDecodingMessageContentSchemaCache;
    private ObjectMapper objectMapper;
    private JsonSchemaData jsonSchemaData;

    @Override
    public ConfigDef config() {
        final ConfigDef config = new ConfigDef();
        return config;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        final Configuration config = Configuration.from(configs);
        objectMapper = new ObjectMapper();
        CommonConnectorConfig.FieldNameAdjustmentMode fieldNameAdjustmentMode = CommonConnectorConfig.FieldNameAdjustmentMode.parse(
                config.getString(CommonConnectorConfig.FIELD_NAME_ADJUSTMENT_MODE));
        jsonSchemaData = new JsonSchemaData(EventRouterConfigDefinition.JsonPayloadNullFieldBehavior.IGNORE,
                FieldNameSelector.defaultNonRelationalSelector(fieldNameAdjustmentMode.createAdjuster()));
        logicalDecodingMessageContentSchemaCache = new BoundedConcurrentHashMap<>(10000, 10, BoundedConcurrentHashMap.Eviction.LRU);
    }

    @Override
    public R apply(final R record) {
        // ignore all records that are not logical decoding messages; a record without a value schema
        // (e.g. a tombstone) cannot be a logical decoding message either
        if (record.valueSchema() == null || !Objects.equals(record.valueSchema().name(), POSTGRES_LOGICAL_DECODING_MESSAGE_SCHEMA_NAME)) {
            LOGGER.debug("Ignoring record that is not a logical decoding message. Message key: \"{}\"", record.key());
            return record;
        }

        Struct originalValue = requireStruct(record.value(), "Retrieve a record value");
        Struct logicalDecodingMessageContent = getLogicalDecodingMessageContent(originalValue);
        R recordWithoutMessageField = removeLogicalDecodingMessageContentField(record);

        final Schema updatedValueSchema = getUpdatedValueSchema(logicalDecodingMessageContent.schema(), recordWithoutMessageField.valueSchema());
        final Struct updatedValue = getUpdatedValue(updatedValueSchema, originalValue, logicalDecodingMessageContent);

        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                // clear the key and its schema: the key of a logical decoding message record carries only the message prefix
                null,
                null,
                updatedValueSchema,
                updatedValue,
                record.timestamp(),
                record.headers());
    }

    private Struct getLogicalDecodingMessageContent(Struct valueStruct) {
        Struct logicalDecodingMessageStruct = requireStruct(valueStruct.get(DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY),
                "Retrieve content of a logical decoding message");

        if (logicalDecodingMessageStruct.schema().field(DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY).schema().type() != Schema.Type.BYTES) {
            throw new DataException("The content of a logical decoding message is non-binary");
        }

        byte[] logicalDecodingMessageContentBytes = logicalDecodingMessageStruct.getBytes(DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY);
        return convertLogicalDecodingMessageContentBytesToStruct(logicalDecodingMessageContentBytes);
    }

    private Struct convertLogicalDecodingMessageContentBytesToStruct(byte[] logicalDecodingMessageContent) {
        // the binary content is expected to be a UTF-8 encoded JSON document
        final String logicalDecodingMessageContentString = new String(logicalDecodingMessageContent, StandardCharsets.UTF_8);
        try {
            // parse and get Jackson JsonNode
            final JsonNode logicalDecodingMessageContentJson = parseLogicalDecodingMessageContentJsonString(logicalDecodingMessageContentString);
            // derive a schema for the logical decoding message content
            Schema logicalDecodingMessageContentSchema = jsonSchemaData.toConnectSchema(null, logicalDecodingMessageContentJson);
            // convert the JSON content into a Struct of that schema
            return (Struct) jsonSchemaData.toConnectData(logicalDecodingMessageContentJson, logicalDecodingMessageContentSchema);
        }
        catch (Exception e) {
            LOGGER.warn("Conversion of logical decoding message content failed", e);
            throw new DataException("Conversion of logical decoding message content failed", e);
        }
    }

    private JsonNode parseLogicalDecodingMessageContentJsonString(String logicalDecodingMessageContentJsonString) throws Exception {
        if (logicalDecodingMessageContentJsonString.startsWith("{") || logicalDecodingMessageContentJsonString.startsWith("[")) {
            return objectMapper.readTree(logicalDecodingMessageContentJsonString);
        }
        throw new Exception("Unable to parse logical decoding message content: expected a JSON object or array but got content starting with '"
                + (logicalDecodingMessageContentJsonString.isEmpty() ? "<empty>" : String.valueOf(logicalDecodingMessageContentJsonString.charAt(0))) + "'");
    }

    private R removeLogicalDecodingMessageContentField(R record) {
        final ReplaceField<R> dropFieldDelegate = ConnectRecordUtil.dropFieldFromValueDelegate(DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY);
        return dropFieldDelegate.apply(record);
    }

    private Schema getUpdatedValueSchema(Schema logicalDecodingMessageContentSchema, Schema debeziumEventSchema) {
        Schema valueSchema = logicalDecodingMessageContentSchemaCache.get(logicalDecodingMessageContentSchema);
        if (valueSchema == null) {
            valueSchema = getSchemaBuilder(logicalDecodingMessageContentSchema, debeziumEventSchema).build();
            logicalDecodingMessageContentSchemaCache.put(logicalDecodingMessageContentSchema, valueSchema);
        }
        return valueSchema;
    }

    private SchemaBuilder getSchemaBuilder(Schema logicalDecodingMessageContentSchema, Schema debeziumEventSchema) {
        // a schema name ending with this suffix makes the record processable by the Outbox SMT
        String schemaName = debeziumEventSchema.name() + Envelope.SCHEMA_NAME_SUFFIX;
        SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(schemaName);
        for (Field originalSchemaField : debeziumEventSchema.fields()) {
            schemaBuilder.field(originalSchemaField.name(), originalSchemaField.schema());
        }
        schemaBuilder.field(Envelope.FieldName.AFTER, logicalDecodingMessageContentSchema);
        return schemaBuilder;
    }

    private Struct getUpdatedValue(Schema updatedValueSchema, Struct originalValue, Struct logicalDecodingMessageContent) {
        final Struct updatedValue = new Struct(updatedValueSchema);
        for (Field field : updatedValueSchema.fields()) {
            Object fieldValue;
            switch (field.name()) {
                case Envelope.FieldName.AFTER:
                    fieldValue = logicalDecodingMessageContent;
                    break;
                case Envelope.FieldName.OPERATION:
                    // replace the original operation so that the record looks like an insert event
                    fieldValue = Envelope.Operation.CREATE.code();
                    break;
                default:
                    // look up by name: the field's index in the updated schema may differ from its index in the original schema
                    fieldValue = originalValue.get(field.name());
                    break;
            }
            updatedValue.put(field, fieldValue);
        }
        return updatedValue;
    }

    @Override
    public void close() {
    }

    @Override
    public String version() {
        return Module.version();
    }
}
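For illustration, here is a minimal, self-contained sketch of how the transform might be exercised against a hand-built record. It is not part of this commit: the field names "message", "prefix", and "content" and the operation code "m" are assumptions mirroring the shape of Debezium's logical decoding message events (the authoritative values are the LogicalDecodingMessageMonitor constants imported above, which this diff does not show).

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.connector.postgresql.transforms.DecodeLogicalDecodingMessageContent;

public class DecodeLogicalDecodingMessageContentSketch {

    public static void main(String[] args) {
        // shape of the "message" block of a logical decoding message event (assumed field names)
        Schema messageSchema = SchemaBuilder.struct()
                .field("prefix", Schema.OPTIONAL_STRING_SCHEMA)
                .field("content", Schema.OPTIONAL_BYTES_SCHEMA)
                .build();
        // the value schema name must match POSTGRES_LOGICAL_DECODING_MESSAGE_SCHEMA_NAME,
        // otherwise the SMT passes the record through untouched
        Schema valueSchema = SchemaBuilder.struct()
                .name("io.debezium.connector.postgresql.MessageValue")
                .field("op", Schema.STRING_SCHEMA)
                .field("message", messageSchema)
                .build();

        Struct message = new Struct(messageSchema)
                .put("prefix", "outbox")
                .put("content", "{\"id\":1,\"title\":\"test\"}".getBytes(StandardCharsets.UTF_8));
        Struct value = new Struct(valueSchema)
                .put("op", "m") // assumed operation code of logical decoding message events
                .put("message", message);

        SourceRecord record = new SourceRecord(Map.of(), Map.of(), "topic", null, valueSchema, value);

        DecodeLogicalDecodingMessageContent<SourceRecord> smt = new DecodeLogicalDecodingMessageContent<>();
        smt.configure(Map.of());

        SourceRecord decoded = smt.apply(record);
        // the binary content is now a structured "after" field, "op" is rewritten to "c",
        // and the value schema name gains Envelope.SCHEMA_NAME_SUFFIX for the Outbox SMT
        System.out.println(decoded.value());

        smt.close();
    }
}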

debezium-core/src/main/java/io/debezium/transforms/ConnectRecordUtil.java

@@ -15,6 +15,7 @@
 import org.apache.kafka.connect.transforms.ExtractField;
 import org.apache.kafka.connect.transforms.Flatten;
 import org.apache.kafka.connect.transforms.InsertField;
+import org.apache.kafka.connect.transforms.ReplaceField;

 /**
  * A set of utilities for more easily creating various kinds of transformations.
@@ -60,6 +61,14 @@ public static <R extends ConnectRecord<R>> InsertField<R> insertStaticValueDeleg
         return insertDelegate;
     }

+    public static <R extends ConnectRecord<R>> ReplaceField<R> dropFieldFromValueDelegate(String field) {
+        ReplaceField<R> dropFieldDelegate = new ReplaceField.Value<>();
+        Map<String, String> delegateConfig = new HashMap<>();
+        delegateConfig.put("exclude", field);
+        dropFieldDelegate.configure(delegateConfig);
+        return dropFieldDelegate;
+    }
+
     public static <R extends ConnectRecord<R>> Flatten<R> flattenValueDelegate(String delimiter) {
         Flatten<R> recordFlattener = new Flatten.Value<>();
         Map<String, String> delegateConfig = new HashMap<>();
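For context, the new helper simply preconfigures Kafka Connect's stock ReplaceField$Value SMT, whose "exclude" option lists fields to drop from the record value. A hypothetical call (the record variable is assumed) would look like:

// drop the "message" field from a record's value; ReplaceField rewrites both the value schema and the Struct
ReplaceField<SourceRecord> dropMessage = ConnectRecordUtil.dropFieldFromValueDelegate("message");
SourceRecord trimmed = dropMessage.apply(record);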

debezium-core/src/main/java/io/debezium/transforms/outbox/JsonSchemaData.java

@@ -30,6 +30,7 @@
 import io.debezium.schema.FieldNameSelector.FieldNamer;
 import io.debezium.schema.SchemaNameAdjuster;
 import io.debezium.transforms.outbox.EventRouterConfigDefinition.JsonPayloadNullFieldBehavior;
+import io.debezium.util.Strings;

 public class JsonSchemaData {

     private final JsonPayloadNullFieldBehavior jsonPayloadNullFieldBehavior;
@@ -69,7 +70,8 @@ public Schema toConnectSchema(String key, JsonNode node) {
             final SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(key).optional();
             node.fields().forEachRemaining(entry -> {
                 final String fieldName = fieldNamer.fieldNameFor(entry.getKey());
-                final Schema fieldSchema = toConnectSchema(key + "." + fieldName, entry.getValue());
+                final String fieldKey = (Strings.isNullOrBlank(key) ? "" : key + ".") + fieldName;
+                final Schema fieldSchema = toConnectSchema(fieldKey, entry.getValue());
                 if (fieldSchema != null && !hasField(schemaBuilder, fieldName)) {
                     schemaBuilder.field(fieldName, fieldSchema);
                 }
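This JsonSchemaData change is what allows the new SMT to call toConnectSchema(null, content) for top-level message content: previously a null top-level key was concatenated into nested schema names. A minimal illustration of the new key derivation, assuming Strings.isNullOrBlank is a plain null-or-blank check:

String key = null;       // top-level call, as made by DecodeLogicalDecodingMessageContent
String fieldName = "id";
// before the fix: key + "." + fieldName evaluated to "null.id"
String fieldKey = (key == null || key.isBlank() ? "" : key + ".") + fieldName; // -> "id"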