DBZ-563 MongoDB Unwrapper supports "delete.handling.mode"

This commit is contained in:
Renato Mefi 2018-11-28 18:12:34 +01:00 committed by Gunnar Morling
parent 8c1ebc2a8f
commit fcef819dc8
3 changed files with 46 additions and 7 deletions

View File

@ -10,6 +10,7 @@
import java.util.Map.Entry;
import java.util.Set;
import io.debezium.transforms.UnwrapFromEnvelope.DeleteHandling;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
@ -19,6 +20,7 @@
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.Flatten;
import org.apache.kafka.connect.transforms.Transformation;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
import org.bson.BsonNull;
import org.bson.BsonValue;
@ -39,15 +41,13 @@
*/
public class UnwrapFromMongoDbEnvelope<R extends ConnectRecord<R>> implements Transformation<R> {
final static String DEBEZIUM_OPERATION_HEADER_KEY = "__debezium-operation";
public static enum ArrayEncoding implements EnumeratedValue {
public enum ArrayEncoding implements EnumeratedValue {
ARRAY("array"),
DOCUMENT("document");
private final String value;
private ArrayEncoding(String value) {
ArrayEncoding(String value) {
this.value = value;
}
@ -84,6 +84,11 @@ public static ArrayEncoding parse(String value, String defaultValue) {
return mode;
}
}
final static String DEBEZIUM_OPERATION_HEADER_KEY = "__debezium-operation";
private static final String DELETED_FIELD = "__deleted";
private static final Logger LOGGER = LoggerFactory.getLogger(UnwrapFromMongoDbEnvelope.class);
private static final Field ARRAY_ENCODING = Field.create("array.encoding")
@ -122,6 +127,16 @@ public static ArrayEncoding parse(String value, String defaultValue) {
.withDescription("Adds the operation {@link FieldName#OPERATION operation} as a header." +
"Its key is '" + DEBEZIUM_OPERATION_HEADER_KEY +"'");
private static final Field HANDLE_DELETES = Field.create("delete.handling.mode")
.withDisplayName("Handle delete records")
.withEnum(DeleteHandling.class, DeleteHandling.DROP)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription("How to handle delete records. Options are: "
+ "none - records are passed,"
+ "drop - records are removed,"
+ "rewrite - __deleted field is added to records.");
private static final Field DROP_TOMBSTONES = Field.create("drop.tombstones")
.withDisplayName("Drop tombstones")
.withType(ConfigDef.Type.BOOLEAN)
@ -144,6 +159,7 @@ public static ArrayEncoding parse(String value, String defaultValue) {
private String delimiter;
private boolean dropTombstones;
private DeleteHandling handleDeletes;
private R getTombstoneRecord(R r) {
SchemaBuilder keySchemaBuilder = SchemaBuilder.struct();
@ -246,6 +262,16 @@ public R apply(R r) {
// delete
else {
valueDocument = new BsonDocument();
switch (handleDeletes) {
case DROP:
LOGGER.trace("Delete message {} requested to be dropped", r.key());
return null;
case REWRITE:
LOGGER.trace("Delete message {} requested to be rewritten", r.key());
valueDocument.append(DELETED_FIELD, new BsonBoolean(true));
case NONE:
break;
}
}
// insert
}
@ -255,11 +281,15 @@ public R apply(R r) {
valueDocument.append("id", keyDocument.get("id"));
}
if (!valueDocument.containsKey(DELETED_FIELD) && handleDeletes.equals(DeleteHandling.REWRITE)) {
valueDocument.append(DELETED_FIELD, new BsonBoolean(false));
}
Set<Entry<String, BsonValue>> valuePairs = valueDocument.entrySet();
Set<Entry<String, BsonValue>> keyPairs = keyDocument.entrySet();
for (Entry<String, BsonValue> valuePairsforSchema : valuePairs) {
if (valuePairsforSchema.getKey().toString().equalsIgnoreCase("$set")) {
if (valuePairsforSchema.getKey().equalsIgnoreCase("$set")) {
BsonDocument val1 = BsonDocument.parse(valuePairsforSchema.getValue().toString());
Set<Entry<String, BsonValue>> keyValuesforSetSchema = val1.entrySet();
for (Entry<String, BsonValue> keyValuesforSetSchemaEntry : keyValuesforSetSchema) {
@ -280,7 +310,7 @@ public R apply(R r) {
Struct finalKeyStruct = new Struct(finalKeySchema);
for (Entry<String, BsonValue> valuePairsforStruct : valuePairs) {
if (valuePairsforStruct.getKey().toString().equalsIgnoreCase("$set")) {
if (valuePairsforStruct.getKey().equalsIgnoreCase("$set")) {
BsonDocument val1 = BsonDocument.parse(valuePairsforStruct.getValue().toString());
Set<Entry<String, BsonValue>> keyvalueforSetStruct = val1.entrySet();
for (Entry<String, BsonValue> keyvalueforSetStructEntry : keyvalueforSetStruct) {
@ -326,7 +356,7 @@ public void close() {
@Override
public void configure(final Map<String, ?> map) {
final Configuration config = Configuration.from(map);
final Field.Set configFields = Field.setOf(ARRAY_ENCODING, FLATTEN_STRUCT, DELIMITER, OPERATION_HEADER, DROP_TOMBSTONES);
final Field.Set configFields = Field.setOf(ARRAY_ENCODING, FLATTEN_STRUCT, DELIMITER, OPERATION_HEADER, HANDLE_DELETES, DROP_TOMBSTONES);
if (!config.validateAndRecord(configFields, LOGGER::error)) {
throw new ConnectException("Unable to validate config.");
@ -340,6 +370,7 @@ public void configure(final Map<String, ?> map) {
delimiter = config.getString(DELIMITER);
dropTombstones = config.getBoolean(DROP_TOMBSTONES);
handleDeletes = DeleteHandling.parse(config.getString(HANDLE_DELETES));
final Map<String, String> afterExtractorConfig = new HashMap<>();
afterExtractorConfig.put("field", "after");

View File

@ -52,6 +52,7 @@ public class UnwrapFromMongoDbEnvelopeTest {
private static final String FLATTEN_STRUCT = "flatten.struct";
private static final String DELIMITER = "flatten.struct.delimiter";
private static final String OPERATION_HEADER = "operation.header";
private static final String HANDLE_DELETES = "delete.handling.mode";
private Filters filters;
private SourceInfo source;
@ -323,6 +324,10 @@ public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws Interrup
SourceRecord record = produced.get(0);
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_DELETES, "none");
transformation.configure(props);
// when
SourceRecord transformed = transformation.apply(record);
@ -358,6 +363,7 @@ public void shouldGenerateRecordForDeleteEvent() throws InterruptedException {
final Map<String, String> props = new HashMap<>();
props.put(OPERATION_HEADER, "true");
props.put(HANDLE_DELETES, "none");
transformation.configure(props);
// when

View File

@ -50,6 +50,7 @@ public class UnwrapFromMongoDbEnvelopeTestIT extends AbstractConnectorTest {
private static final String TOPIC_NAME = "mongo.transform.source";
private static final String CONFIG_DROP_TOMBSTONES = "drop.tombstones";
private static final String HANDLE_DELETES = "delete.handling.mode";
private Configuration config;
private MongoDbTaskContext context;
@ -143,6 +144,7 @@ public void shouldTransformEvents() throws InterruptedException, IOException {
final Map<String, String> transformationConfig = new HashMap<>();
transformationConfig.put(CONFIG_DROP_TOMBSTONES, "false");
transformationConfig.put(HANDLE_DELETES, "none");
transformation.configure(transformationConfig);
// Test insert