From 7a4e1c48f19f2d644d0340d9bd396324c7fd0ac7 Mon Sep 17 00:00:00 2001 From: Renato Mefi Date: Wed, 7 Nov 2018 18:56:22 +0100 Subject: [PATCH] DBZ-612 Transform $unset into null values --- .../transforms/UnwrapFromMongoDbEnvelope.java | 22 +++++++++-- .../UnwrapFromMongoDbEnvelopeTest.java | 37 +++++++++++++++++++ 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java index 7e319b97b..46df64918 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java @@ -20,6 +20,7 @@ import org.apache.kafka.connect.transforms.Flatten; import org.apache.kafka.connect.transforms.Transformation; import org.bson.BsonDocument; +import org.bson.BsonNull; import org.bson.BsonValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,7 +129,8 @@ public R apply(R r) { } SchemaBuilder valueSchemaBuilder = SchemaBuilder.struct().name(newValueSchemaName); SchemaBuilder keySchemabuilder = SchemaBuilder.struct(); - BsonDocument valueDocument = null; + BsonDocument document; + BsonDocument valueDocument; final R afterRecord = afterExtractor.apply(r); final R key = keyExtractor.apply(r); @@ -139,8 +141,22 @@ public R apply(R r) { // update if (patchRecord.value() != null) { - valueDocument = BsonDocument.parse(patchRecord.value().toString()); - valueDocument = valueDocument.getDocument("$set"); + document = BsonDocument.parse(patchRecord.value().toString()); + valueDocument = document.getDocument("$set"); + + if (document.containsKey("$unset")) { + Set> unsetDocumentEntry = document.getDocument("$unset").entrySet(); + + for (Entry valueEntry : unsetDocumentEntry) { + // In case unset of a key is false we don't have to do anything with it, + // if it's true we want to set the value to null + if (!valueEntry.getValue().asBoolean().getValue()) { + continue; + } + valueDocument.append(valueEntry.getKey(), new BsonNull()); + } + } + if (!valueDocument.containsKey("id")) { valueDocument.append("id", keyDocument.get("id")); } diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java index 99bcc5857..1519637ff 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java @@ -166,6 +166,7 @@ public void shouldTransformRecordForInsertEventWithComplexIdType() throws Interr assertThat(value.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA); assertThat(value.schema().fields()).hasSize(2); } + @Test public void shouldGenerateRecordForUpdateEvent() throws InterruptedException { BsonTimestamp ts = new BsonTimestamp(1000, 1); @@ -206,6 +207,42 @@ public void shouldGenerateRecordForUpdateEvent() throws InterruptedException { assertThat(value.schema().fields()).hasSize(2); } + @Test + public void shouldGenerateRecordForUpdateEventWithUnset() throws InterruptedException { + BsonTimestamp ts = new BsonTimestamp(1000, 1); + CollectionId collectionId = new CollectionId("rs0", "dbA", "c1"); + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("$set", new Document("name", "Sally")) + .append("$unset", new Document().append("phone", true).append("active", false)) + ; + + // given + Document event = new Document().append("o", obj) + .append("o2", objId) + .append("ns", "dbA.c1") + .append("ts", ts) + .append("h", Long.valueOf(12345678)) + .append("op", "u"); + RecordsForCollection records = recordMakers.forCollection(collectionId); + records.recordEvent(event, 1002); + assertThat(produced.size()).isEqualTo(1); + SourceRecord record = produced.get(0); + + // when + SourceRecord transformed = transformation.apply(record); + + Struct value = (Struct) transformed.value(); + + // and then assert value and its schema + assertThat(value.schema()).isSameAs(transformed.valueSchema()); + assertThat(value.get("name")).isEqualTo("Sally"); + assertThat(value.get("phone")).isEqualTo(null); + + assertThat(value.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA); + assertThat(value.schema().fields()).hasSize(3); + } + @Test @FixFor("DBZ-582") public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws InterruptedException {