From fcef819dc87d920c4ebcb803300eb5e99c9872a4 Mon Sep 17 00:00:00 2001 From: Renato Mefi Date: Wed, 28 Nov 2018 18:12:34 +0100 Subject: [PATCH] DBZ-563 MongoDB Unwrapper supports "delete.handling.mode" --- .../transforms/UnwrapFromMongoDbEnvelope.java | 45 ++++++++++++++++--- .../UnwrapFromMongoDbEnvelopeTest.java | 6 +++ .../UnwrapFromMongoDbEnvelopeTestIT.java | 2 + 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java index 3ee5a127f..0533966e4 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java @@ -10,6 +10,7 @@ import java.util.Map.Entry; import java.util.Set; +import io.debezium.transforms.UnwrapFromEnvelope.DeleteHandling; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Schema; @@ -19,6 +20,7 @@ import org.apache.kafka.connect.transforms.ExtractField; import org.apache.kafka.connect.transforms.Flatten; import org.apache.kafka.connect.transforms.Transformation; +import org.bson.BsonBoolean; import org.bson.BsonDocument; import org.bson.BsonNull; import org.bson.BsonValue; @@ -39,15 +41,13 @@ */ public class UnwrapFromMongoDbEnvelope> implements Transformation { - final static String DEBEZIUM_OPERATION_HEADER_KEY = "__debezium-operation"; - - public static enum ArrayEncoding implements EnumeratedValue { + public enum ArrayEncoding implements EnumeratedValue { ARRAY("array"), DOCUMENT("document"); private final String value; - private ArrayEncoding(String value) { + ArrayEncoding(String value) { this.value = value; } @@ -84,6 +84,11 @@ public static ArrayEncoding parse(String value, String defaultValue) { return mode; } } + + final static String DEBEZIUM_OPERATION_HEADER_KEY = "__debezium-operation"; + + private static final String DELETED_FIELD = "__deleted"; + private static final Logger LOGGER = LoggerFactory.getLogger(UnwrapFromMongoDbEnvelope.class); private static final Field ARRAY_ENCODING = Field.create("array.encoding") @@ -122,6 +127,16 @@ public static ArrayEncoding parse(String value, String defaultValue) { .withDescription("Adds the operation {@link FieldName#OPERATION operation} as a header." + "Its key is '" + DEBEZIUM_OPERATION_HEADER_KEY +"'"); + private static final Field HANDLE_DELETES = Field.create("delete.handling.mode") + .withDisplayName("Handle delete records") + .withEnum(DeleteHandling.class, DeleteHandling.DROP) + .withWidth(ConfigDef.Width.MEDIUM) + .withImportance(ConfigDef.Importance.MEDIUM) + .withDescription("How to handle delete records. Options are: " + + "none - records are passed," + + "drop - records are removed," + + "rewrite - __deleted field is added to records."); + private static final Field DROP_TOMBSTONES = Field.create("drop.tombstones") .withDisplayName("Drop tombstones") .withType(ConfigDef.Type.BOOLEAN) @@ -144,6 +159,7 @@ public static ArrayEncoding parse(String value, String defaultValue) { private String delimiter; private boolean dropTombstones; + private DeleteHandling handleDeletes; private R getTombstoneRecord(R r) { SchemaBuilder keySchemaBuilder = SchemaBuilder.struct(); @@ -246,6 +262,16 @@ public R apply(R r) { // delete else { valueDocument = new BsonDocument(); + switch (handleDeletes) { + case DROP: + LOGGER.trace("Delete message {} requested to be dropped", r.key()); + return null; + case REWRITE: + LOGGER.trace("Delete message {} requested to be rewritten", r.key()); + valueDocument.append(DELETED_FIELD, new BsonBoolean(true)); + case NONE: + break; + } } // insert } @@ -255,11 +281,15 @@ public R apply(R r) { valueDocument.append("id", keyDocument.get("id")); } + if (!valueDocument.containsKey(DELETED_FIELD) && handleDeletes.equals(DeleteHandling.REWRITE)) { + valueDocument.append(DELETED_FIELD, new BsonBoolean(false)); + } + Set> valuePairs = valueDocument.entrySet(); Set> keyPairs = keyDocument.entrySet(); for (Entry valuePairsforSchema : valuePairs) { - if (valuePairsforSchema.getKey().toString().equalsIgnoreCase("$set")) { + if (valuePairsforSchema.getKey().equalsIgnoreCase("$set")) { BsonDocument val1 = BsonDocument.parse(valuePairsforSchema.getValue().toString()); Set> keyValuesforSetSchema = val1.entrySet(); for (Entry keyValuesforSetSchemaEntry : keyValuesforSetSchema) { @@ -280,7 +310,7 @@ public R apply(R r) { Struct finalKeyStruct = new Struct(finalKeySchema); for (Entry valuePairsforStruct : valuePairs) { - if (valuePairsforStruct.getKey().toString().equalsIgnoreCase("$set")) { + if (valuePairsforStruct.getKey().equalsIgnoreCase("$set")) { BsonDocument val1 = BsonDocument.parse(valuePairsforStruct.getValue().toString()); Set> keyvalueforSetStruct = val1.entrySet(); for (Entry keyvalueforSetStructEntry : keyvalueforSetStruct) { @@ -326,7 +356,7 @@ public void close() { @Override public void configure(final Map map) { final Configuration config = Configuration.from(map); - final Field.Set configFields = Field.setOf(ARRAY_ENCODING, FLATTEN_STRUCT, DELIMITER, OPERATION_HEADER, DROP_TOMBSTONES); + final Field.Set configFields = Field.setOf(ARRAY_ENCODING, FLATTEN_STRUCT, DELIMITER, OPERATION_HEADER, HANDLE_DELETES, DROP_TOMBSTONES); if (!config.validateAndRecord(configFields, LOGGER::error)) { throw new ConnectException("Unable to validate config."); @@ -340,6 +370,7 @@ public void configure(final Map map) { delimiter = config.getString(DELIMITER); dropTombstones = config.getBoolean(DROP_TOMBSTONES); + handleDeletes = DeleteHandling.parse(config.getString(HANDLE_DELETES)); final Map afterExtractorConfig = new HashMap<>(); afterExtractorConfig.put("field", "after"); diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java index d4a90bd9e..98f261d44 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java @@ -52,6 +52,7 @@ public class UnwrapFromMongoDbEnvelopeTest { private static final String FLATTEN_STRUCT = "flatten.struct"; private static final String DELIMITER = "flatten.struct.delimiter"; private static final String OPERATION_HEADER = "operation.header"; + private static final String HANDLE_DELETES = "delete.handling.mode"; private Filters filters; private SourceInfo source; @@ -323,6 +324,10 @@ public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws Interrup SourceRecord record = produced.get(0); + final Map props = new HashMap<>(); + props.put(HANDLE_DELETES, "none"); + transformation.configure(props); + // when SourceRecord transformed = transformation.apply(record); @@ -358,6 +363,7 @@ public void shouldGenerateRecordForDeleteEvent() throws InterruptedException { final Map props = new HashMap<>(); props.put(OPERATION_HEADER, "true"); + props.put(HANDLE_DELETES, "none"); transformation.configure(props); // when diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTestIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTestIT.java index ccb9aeb94..ffa38249b 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTestIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTestIT.java @@ -50,6 +50,7 @@ public class UnwrapFromMongoDbEnvelopeTestIT extends AbstractConnectorTest { private static final String TOPIC_NAME = "mongo.transform.source"; private static final String CONFIG_DROP_TOMBSTONES = "drop.tombstones"; + private static final String HANDLE_DELETES = "delete.handling.mode"; private Configuration config; private MongoDbTaskContext context; @@ -143,6 +144,7 @@ public void shouldTransformEvents() throws InterruptedException, IOException { final Map transformationConfig = new HashMap<>(); transformationConfig.put(CONFIG_DROP_TOMBSTONES, "false"); + transformationConfig.put(HANDLE_DELETES, "none"); transformation.configure(transformationConfig); // Test insert