DBZ-7066 Extract common creating transformation methods to ConnectRecordUtil

This commit is contained in:
harveyyue 2023-10-30 15:02:11 +08:00 committed by Jiri Pechanec
parent 4c3b5540ec
commit 223b6e4335
6 changed files with 94 additions and 54 deletions

View File

@ -9,7 +9,6 @@
import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DELETED_FIELD;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -39,6 +38,7 @@
import io.debezium.schema.FieldNameSelector;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.transforms.AbstractExtractNewRecordState;
import io.debezium.util.ConnectRecordUtil;
/**
* Debezium Mongo Connector generates the CDC records in String format. Sink connectors usually are not able to parse
@ -130,8 +130,8 @@ public static ArrayEncoding parse(String value, String defaultValue) {
.withDescription("Delimiter to concat between field names from the input record when generating field names for the"
+ "output record.");
private final ExtractField<R> keyExtractor = new ExtractField.Key<>();
private final Flatten<R> recordFlattener = new Flatten.Value<>();
private ExtractField<R> keyExtractor;
private Flatten<R> recordFlattener;
private MongoDataConverter converter;
private boolean flattenStruct;
private String delimiter;
@ -153,13 +153,8 @@ public void configure(final Map<String, ?> configs) {
flattenStruct = config.getBoolean(FLATTEN_STRUCT);
delimiter = config.getString(DELIMITER);
Map<String, String> delegateConfig = new HashMap<>();
delegateConfig.put("field", "id");
keyExtractor.configure(delegateConfig);
delegateConfig = new HashMap<>();
delegateConfig.put("delimiter", delimiter);
recordFlattener.configure(delegateConfig);
keyExtractor = ConnectRecordUtil.extractKeyDelegate("id");
recordFlattener = ConnectRecordUtil.flattenValueDelegate(delimiter);
}
@Override

View File

@ -27,10 +27,10 @@
import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.transforms.ExtractNewDocumentState;
import io.debezium.connector.mongodb.transforms.MongoDataConverter;
import io.debezium.data.Envelope;
import io.debezium.time.Timestamp;
import io.debezium.transforms.outbox.EventRouterConfigDefinition;
import io.debezium.transforms.outbox.EventRouterDelegate;
import io.debezium.util.ConnectRecordUtil;
/**
* Debezium MongoDB Outbox Event Router SMT
@ -54,7 +54,7 @@ public class MongoEventRouter<R extends ConnectRecord<R>> implements Transformat
private String fieldPayload;
private boolean expandPayload;
private final ExtractField<R> afterExtractor = new ExtractField.Value<>();
private ExtractField<R> afterExtractor;
private final EventRouterDelegate<R> eventRouterDelegate = new EventRouterDelegate<>();
@Override
@ -87,10 +87,7 @@ public void configure(Map<String, ?> configMap) {
expandPayload = config.getBoolean(MongoEventRouterConfigDefinition.EXPAND_JSON_PAYLOAD);
fieldPayload = config.getString(MongoEventRouterConfigDefinition.FIELD_PAYLOAD);
final Map<String, String> afterExtractorConfig = new HashMap<>();
afterExtractorConfig.put("field", Envelope.FieldName.AFTER);
afterExtractor.configure(afterExtractorConfig);
afterExtractor = ConnectRecordUtil.extractAfterDelegate();
// Convert configuration fields from MongoDB Outbox Event Router to SQL Outbox Event Router's
Map<String, ?> convertedConfigMap = convertConfigMap(configMap);

View File

@ -5,19 +5,16 @@
*/
package io.debezium.transforms.extractnewstate;
import static io.debezium.data.Envelope.FieldName.AFTER;
import static io.debezium.data.Envelope.FieldName.BEFORE;
import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DELETED_FIELD;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.InsertField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.util.ConnectRecordUtil;
/**
* An abstract implementation of {@link ExtractRecordStrategy}.
*
@ -26,40 +23,23 @@
public abstract class AbstractExtractRecordStrategy<R extends ConnectRecord<R>> implements ExtractRecordStrategy<R> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractExtractRecordStrategy.class);
private static final String UPDATE_DESCRIPTION = "updateDescription";
protected final ExtractField<R> afterDelegate = new ExtractField.Value<>();
protected final ExtractField<R> beforeDelegate = new ExtractField.Value<>();
protected final InsertField<R> removedDelegate = new InsertField.Value<>();
protected final InsertField<R> updatedDelegate = new InsertField.Value<>();
protected ExtractField<R> afterDelegate;
protected ExtractField<R> beforeDelegate;
protected InsertField<R> removedDelegate;
protected InsertField<R> updatedDelegate;
// for mongodb
protected final ExtractField<R> updateDescriptionDelegate = new ExtractField.Value<>();
protected ExtractField<R> updateDescriptionDelegate;
public AbstractExtractRecordStrategy() {
init();
}
private void init() {
Map<String, String> delegateConfig = new HashMap<>();
delegateConfig.put("field", BEFORE);
beforeDelegate.configure(delegateConfig);
delegateConfig = new HashMap<>();
delegateConfig.put("field", AFTER);
afterDelegate.configure(delegateConfig);
delegateConfig = new HashMap<>();
delegateConfig.put("static.field", DELETED_FIELD);
delegateConfig.put("static.value", "true");
removedDelegate.configure(delegateConfig);
delegateConfig = new HashMap<>();
delegateConfig.put("static.field", DELETED_FIELD);
delegateConfig.put("static.value", "false");
updatedDelegate.configure(delegateConfig);
delegateConfig = new HashMap<>();
delegateConfig.put("field", UPDATE_DESCRIPTION);
updateDescriptionDelegate.configure(delegateConfig);
afterDelegate = ConnectRecordUtil.extractAfterDelegate();
beforeDelegate = ConnectRecordUtil.extractBeforeDelegate();
removedDelegate = ConnectRecordUtil.insertStaticValueDelegate(DELETED_FIELD, "true");
updatedDelegate = ConnectRecordUtil.insertStaticValueDelegate(DELETED_FIELD, "false");
updateDescriptionDelegate = ConnectRecordUtil.extractUpdateDescriptionDelegate();
}
@Override

View File

@ -46,6 +46,7 @@
import io.debezium.transforms.outbox.EventRouterConfigDefinition.JsonPayloadNullFieldBehavior;
import io.debezium.transforms.tracing.ActivateTracingSpan;
import io.debezium.util.BoundedConcurrentHashMap;
import io.debezium.util.ConnectRecordUtil;
/**
* A delegate class having common logic between Outbox Event Routers for SQL DBs and MongoDB
@ -63,7 +64,7 @@ public interface RecordConverter<R> {
private static final String ENVELOPE_PAYLOAD = "payload";
private final ExtractField<R> afterExtractor = new ExtractField.Value<>();
private ExtractField<R> afterExtractor;
private final RegexRouter<R> regexRouter = new RegexRouter<>();
private InvalidOperationBehavior invalidOperationBehavior;
private final ActivateTracingSpan<R> tracingSmt = new ActivateTracingSpan<>();
@ -368,10 +369,7 @@ public void configure(Map<String, ?> configMap) {
regexRouter.configure(regexRouterConfig);
final Map<String, String> afterExtractorConfig = new HashMap<>();
afterExtractorConfig.put("field", Envelope.FieldName.AFTER);
afterExtractor.configure(afterExtractorConfig);
afterExtractor = ConnectRecordUtil.extractAfterDelegate();
additionalFields = parseAdditionalFieldsConfig(config);
onlyHeadersInOutputMessage = additionalFields.stream().noneMatch(field -> field.getPlacement() == AdditionalFieldPlacement.ENVELOPE);

View File

@ -15,7 +15,7 @@
import io.debezium.relational.Table;
/**
* Utility class for mapping columns to various data structures from from {@link Table} and {@link ResultSet}.
* Utility class for mapping columns to various data structures from {@link Table} and {@link ResultSet}.
*/
public class ColumnUtils {

View File

@ -0,0 +1,70 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.util;
import static io.debezium.data.Envelope.FieldName.AFTER;
import static io.debezium.data.Envelope.FieldName.BEFORE;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.Flatten;
import org.apache.kafka.connect.transforms.InsertField;
/**
* A set of utilities for more easily creating various kinds of transformations.
*/
public class ConnectRecordUtil {
private static final String UPDATE_DESCRIPTION = "updateDescription";
public static <R extends ConnectRecord<R>> ExtractField<R> extractAfterDelegate() {
return extractValueDelegate(AFTER);
}
public static <R extends ConnectRecord<R>> ExtractField<R> extractBeforeDelegate() {
return extractValueDelegate(BEFORE);
}
public static <R extends ConnectRecord<R>> ExtractField<R> extractUpdateDescriptionDelegate() {
return extractValueDelegate(UPDATE_DESCRIPTION);
}
public static <R extends ConnectRecord<R>> ExtractField<R> extractValueDelegate(String field) {
ExtractField<R> extractField = new ExtractField.Value<>();
Map<String, String> delegateConfig = new HashMap<>();
delegateConfig.put("field", field);
extractField.configure(delegateConfig);
return extractField;
}
public static <R extends ConnectRecord<R>> ExtractField<R> extractKeyDelegate(String field) {
ExtractField<R> extractField = new ExtractField.Key<>();
Map<String, String> delegateConfig = new HashMap<>();
delegateConfig.put("field", field);
extractField.configure(delegateConfig);
return extractField;
}
public static <R extends ConnectRecord<R>> InsertField<R> insertStaticValueDelegate(String field, String value) {
InsertField<R> insertDelegate = new InsertField.Value<>();
Map<String, String> delegateConfig = new HashMap<>();
delegateConfig.put("static.field", field);
delegateConfig.put("static.value", value);
insertDelegate.configure(delegateConfig);
return insertDelegate;
}
public static <R extends ConnectRecord<R>> Flatten<R> flattenValueDelegate(String delimiter) {
Flatten<R> recordFlattener = new Flatten.Value<>();
Map<String, String> delegateConfig = new HashMap<>();
delegateConfig.put("delimiter", delimiter);
recordFlattener.configure(delegateConfig);
return recordFlattener;
}
}