diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/ExtractNewDocumentState.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/ExtractNewDocumentState.java index 4ad4033b3..3a228f9e5 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/ExtractNewDocumentState.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/ExtractNewDocumentState.java @@ -9,7 +9,6 @@ import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DELETED_FIELD; import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -39,6 +38,7 @@ import io.debezium.schema.FieldNameSelector; import io.debezium.schema.SchemaNameAdjuster; import io.debezium.transforms.AbstractExtractNewRecordState; +import io.debezium.util.ConnectRecordUtil; /** * Debezium Mongo Connector generates the CDC records in String format. Sink connectors usually are not able to parse @@ -130,8 +130,8 @@ public static ArrayEncoding parse(String value, String defaultValue) { .withDescription("Delimiter to concat between field names from the input record when generating field names for the" + "output record."); - private final ExtractField keyExtractor = new ExtractField.Key<>(); - private final Flatten recordFlattener = new Flatten.Value<>(); + private ExtractField keyExtractor; + private Flatten recordFlattener; private MongoDataConverter converter; private boolean flattenStruct; private String delimiter; @@ -153,13 +153,8 @@ public void configure(final Map configs) { flattenStruct = config.getBoolean(FLATTEN_STRUCT); delimiter = config.getString(DELIMITER); - Map delegateConfig = new HashMap<>(); - delegateConfig.put("field", "id"); - keyExtractor.configure(delegateConfig); - - delegateConfig = new HashMap<>(); - delegateConfig.put("delimiter", delimiter); - recordFlattener.configure(delegateConfig); + keyExtractor = ConnectRecordUtil.extractKeyDelegate("id"); + recordFlattener = ConnectRecordUtil.flattenValueDelegate(delimiter); } @Override diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/outbox/MongoEventRouter.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/outbox/MongoEventRouter.java index 976d6a249..9fee719bf 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/outbox/MongoEventRouter.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/outbox/MongoEventRouter.java @@ -27,10 +27,10 @@ import io.debezium.config.Configuration; import io.debezium.connector.mongodb.transforms.ExtractNewDocumentState; import io.debezium.connector.mongodb.transforms.MongoDataConverter; -import io.debezium.data.Envelope; import io.debezium.time.Timestamp; import io.debezium.transforms.outbox.EventRouterConfigDefinition; import io.debezium.transforms.outbox.EventRouterDelegate; +import io.debezium.util.ConnectRecordUtil; /** * Debezium MongoDB Outbox Event Router SMT @@ -54,7 +54,7 @@ public class MongoEventRouter> implements Transformat private String fieldPayload; private boolean expandPayload; - private final ExtractField afterExtractor = new ExtractField.Value<>(); + private ExtractField afterExtractor; private final EventRouterDelegate eventRouterDelegate = new EventRouterDelegate<>(); @Override @@ -87,10 +87,7 @@ public void configure(Map configMap) { expandPayload = config.getBoolean(MongoEventRouterConfigDefinition.EXPAND_JSON_PAYLOAD); fieldPayload = config.getString(MongoEventRouterConfigDefinition.FIELD_PAYLOAD); - final Map afterExtractorConfig = new HashMap<>(); - afterExtractorConfig.put("field", Envelope.FieldName.AFTER); - - afterExtractor.configure(afterExtractorConfig); + afterExtractor = ConnectRecordUtil.extractAfterDelegate(); // Convert configuration fields from MongoDB Outbox Event Router to SQL Outbox Event Router's Map convertedConfigMap = convertConfigMap(configMap); diff --git a/debezium-core/src/main/java/io/debezium/transforms/extractnewstate/AbstractExtractRecordStrategy.java b/debezium-core/src/main/java/io/debezium/transforms/extractnewstate/AbstractExtractRecordStrategy.java index 49e89600f..085708e4b 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/extractnewstate/AbstractExtractRecordStrategy.java +++ b/debezium-core/src/main/java/io/debezium/transforms/extractnewstate/AbstractExtractRecordStrategy.java @@ -5,19 +5,16 @@ */ package io.debezium.transforms.extractnewstate; -import static io.debezium.data.Envelope.FieldName.AFTER; -import static io.debezium.data.Envelope.FieldName.BEFORE; import static io.debezium.transforms.ExtractNewRecordStateConfigDefinition.DELETED_FIELD; -import java.util.HashMap; -import java.util.Map; - import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.transforms.ExtractField; import org.apache.kafka.connect.transforms.InsertField; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.util.ConnectRecordUtil; + /** * An abstract implementation of {@link ExtractRecordStrategy}. * @@ -26,40 +23,23 @@ public abstract class AbstractExtractRecordStrategy> implements ExtractRecordStrategy { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractExtractRecordStrategy.class); - private static final String UPDATE_DESCRIPTION = "updateDescription"; - protected final ExtractField afterDelegate = new ExtractField.Value<>(); - protected final ExtractField beforeDelegate = new ExtractField.Value<>(); - protected final InsertField removedDelegate = new InsertField.Value<>(); - protected final InsertField updatedDelegate = new InsertField.Value<>(); + protected ExtractField afterDelegate; + protected ExtractField beforeDelegate; + protected InsertField removedDelegate; + protected InsertField updatedDelegate; // for mongodb - protected final ExtractField updateDescriptionDelegate = new ExtractField.Value<>(); + protected ExtractField updateDescriptionDelegate; public AbstractExtractRecordStrategy() { init(); } private void init() { - Map delegateConfig = new HashMap<>(); - delegateConfig.put("field", BEFORE); - beforeDelegate.configure(delegateConfig); - - delegateConfig = new HashMap<>(); - delegateConfig.put("field", AFTER); - afterDelegate.configure(delegateConfig); - - delegateConfig = new HashMap<>(); - delegateConfig.put("static.field", DELETED_FIELD); - delegateConfig.put("static.value", "true"); - removedDelegate.configure(delegateConfig); - - delegateConfig = new HashMap<>(); - delegateConfig.put("static.field", DELETED_FIELD); - delegateConfig.put("static.value", "false"); - updatedDelegate.configure(delegateConfig); - - delegateConfig = new HashMap<>(); - delegateConfig.put("field", UPDATE_DESCRIPTION); - updateDescriptionDelegate.configure(delegateConfig); + afterDelegate = ConnectRecordUtil.extractAfterDelegate(); + beforeDelegate = ConnectRecordUtil.extractBeforeDelegate(); + removedDelegate = ConnectRecordUtil.insertStaticValueDelegate(DELETED_FIELD, "true"); + updatedDelegate = ConnectRecordUtil.insertStaticValueDelegate(DELETED_FIELD, "false"); + updateDescriptionDelegate = ConnectRecordUtil.extractUpdateDescriptionDelegate(); } @Override diff --git a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterDelegate.java b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterDelegate.java index b0b26fa90..04cd64361 100644 --- a/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterDelegate.java +++ b/debezium-core/src/main/java/io/debezium/transforms/outbox/EventRouterDelegate.java @@ -46,6 +46,7 @@ import io.debezium.transforms.outbox.EventRouterConfigDefinition.JsonPayloadNullFieldBehavior; import io.debezium.transforms.tracing.ActivateTracingSpan; import io.debezium.util.BoundedConcurrentHashMap; +import io.debezium.util.ConnectRecordUtil; /** * A delegate class having common logic between Outbox Event Routers for SQL DBs and MongoDB @@ -63,7 +64,7 @@ public interface RecordConverter { private static final String ENVELOPE_PAYLOAD = "payload"; - private final ExtractField afterExtractor = new ExtractField.Value<>(); + private ExtractField afterExtractor; private final RegexRouter regexRouter = new RegexRouter<>(); private InvalidOperationBehavior invalidOperationBehavior; private final ActivateTracingSpan tracingSmt = new ActivateTracingSpan<>(); @@ -368,10 +369,7 @@ public void configure(Map configMap) { regexRouter.configure(regexRouterConfig); - final Map afterExtractorConfig = new HashMap<>(); - afterExtractorConfig.put("field", Envelope.FieldName.AFTER); - - afterExtractor.configure(afterExtractorConfig); + afterExtractor = ConnectRecordUtil.extractAfterDelegate(); additionalFields = parseAdditionalFieldsConfig(config); onlyHeadersInOutputMessage = additionalFields.stream().noneMatch(field -> field.getPlacement() == AdditionalFieldPlacement.ENVELOPE); diff --git a/debezium-core/src/main/java/io/debezium/util/ColumnUtils.java b/debezium-core/src/main/java/io/debezium/util/ColumnUtils.java index 1a3a71c31..33bbfb253 100644 --- a/debezium-core/src/main/java/io/debezium/util/ColumnUtils.java +++ b/debezium-core/src/main/java/io/debezium/util/ColumnUtils.java @@ -15,7 +15,7 @@ import io.debezium.relational.Table; /** - * Utility class for mapping columns to various data structures from from {@link Table} and {@link ResultSet}. + * Utility class for mapping columns to various data structures from {@link Table} and {@link ResultSet}. */ public class ColumnUtils { diff --git a/debezium-core/src/main/java/io/debezium/util/ConnectRecordUtil.java b/debezium-core/src/main/java/io/debezium/util/ConnectRecordUtil.java new file mode 100644 index 000000000..9024e4324 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/util/ConnectRecordUtil.java @@ -0,0 +1,70 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.util; + +import static io.debezium.data.Envelope.FieldName.AFTER; +import static io.debezium.data.Envelope.FieldName.BEFORE; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.transforms.ExtractField; +import org.apache.kafka.connect.transforms.Flatten; +import org.apache.kafka.connect.transforms.InsertField; + +/** + * A set of utilities for more easily creating various kinds of transformations. + */ +public class ConnectRecordUtil { + + private static final String UPDATE_DESCRIPTION = "updateDescription"; + + public static > ExtractField extractAfterDelegate() { + return extractValueDelegate(AFTER); + } + + public static > ExtractField extractBeforeDelegate() { + return extractValueDelegate(BEFORE); + } + + public static > ExtractField extractUpdateDescriptionDelegate() { + return extractValueDelegate(UPDATE_DESCRIPTION); + } + + public static > ExtractField extractValueDelegate(String field) { + ExtractField extractField = new ExtractField.Value<>(); + Map delegateConfig = new HashMap<>(); + delegateConfig.put("field", field); + extractField.configure(delegateConfig); + return extractField; + } + + public static > ExtractField extractKeyDelegate(String field) { + ExtractField extractField = new ExtractField.Key<>(); + Map delegateConfig = new HashMap<>(); + delegateConfig.put("field", field); + extractField.configure(delegateConfig); + return extractField; + } + + public static > InsertField insertStaticValueDelegate(String field, String value) { + InsertField insertDelegate = new InsertField.Value<>(); + Map delegateConfig = new HashMap<>(); + delegateConfig.put("static.field", field); + delegateConfig.put("static.value", value); + insertDelegate.configure(delegateConfig); + return insertDelegate; + } + + public static > Flatten flattenValueDelegate(String delimiter) { + Flatten recordFlattener = new Flatten.Value<>(); + Map delegateConfig = new HashMap<>(); + delegateConfig.put("delimiter", delimiter); + recordFlattener.configure(delegateConfig); + return recordFlattener; + } +}