From df738a2abdfab9fe8096fd8cc8c3098daa9b1e7b Mon Sep 17 00:00:00 2001 From: Renato Mefi Date: Thu, 8 Nov 2018 14:11:08 +0100 Subject: [PATCH] DBZ-612 Support MongoDB CDC full $unset operation Mongo internally transforms everything into $set and $unset operations when they are in the oplog, this guarantees that you can have operations which are $set, $unset or combined. --- .../transforms/UnwrapFromMongoDbEnvelope.java | 11 +++++- .../UnwrapFromMongoDbEnvelopeTest.java | 34 +++++++++++++++++++ .../UnwrapFromMongoDbEnvelopeTestIT.java | 18 ++++++++++ 3 files changed, 62 insertions(+), 1 deletion(-) diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java index 46df64918..122cdd882 100644 --- a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java @@ -142,7 +142,16 @@ public R apply(R r) { // update if (patchRecord.value() != null) { document = BsonDocument.parse(patchRecord.value().toString()); - valueDocument = document.getDocument("$set"); + + if (!document.containsKey("$set") && !document.containsKey("$unset")) { + throw new ConnectException("Unable to process Mongo Operation, a '$set' or '$unset' is necessary."); + } + + valueDocument = new BsonDocument(); + + if (document.containsKey("$set")) { + valueDocument = document.getDocument("$set"); + } if (document.containsKey("$unset")) { Set> unsetDocumentEntry = document.getDocument("$unset").entrySet(); diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java index 1519637ff..8d537a0a2 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java @@ -243,6 +243,40 @@ public void shouldGenerateRecordForUpdateEventWithUnset() throws InterruptedExce assertThat(value.schema().fields()).hasSize(3); } + @Test + public void shouldGenerateRecordForUnsetOnlyUpdateEvent() throws InterruptedException { + BsonTimestamp ts = new BsonTimestamp(1000, 1); + CollectionId collectionId = new CollectionId("rs0", "dbA", "c1"); + ObjectId objId = new ObjectId(); + Document obj = new Document() + .append("$unset", new Document().append("phone", true).append("active", false)) + ; + + // given + Document event = new Document().append("o", obj) + .append("o2", objId) + .append("ns", "dbA.c1") + .append("ts", ts) + .append("h", Long.valueOf(12345678)) + .append("op", "u"); + RecordsForCollection records = recordMakers.forCollection(collectionId); + records.recordEvent(event, 1002); + assertThat(produced.size()).isEqualTo(1); + SourceRecord record = produced.get(0); + + // when + SourceRecord transformed = transformation.apply(record); + + Struct value = (Struct) transformed.value(); + + // and then assert value and its schema + assertThat(value.schema()).isSameAs(transformed.valueSchema()); + assertThat(value.get("phone")).isEqualTo(null); + + assertThat(value.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA); + assertThat(value.schema().fields()).hasSize(2); + } + @Test @FixFor("DBZ-582") public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws InterruptedException { diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTestIT.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTestIT.java index b8a55a58d..69fa20d1e 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTestIT.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTestIT.java @@ -156,6 +156,24 @@ public void shouldTransformEvents() throws InterruptedException, IOException { assertThat(transformedMultipleUpdateValue.get("newStr")).isEqualTo("hello"); assertThat(transformedMultipleUpdateValue.get("dataInt")).isEqualTo(456); + // Test Update with $unset operation + primary().execute("update", client -> { + client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).updateOne(RawBsonDocument.parse("{'_id' : 1}"), + RawBsonDocument.parse("{'$unset': {'newStr': ''}}")); + }); + + records = consumeRecordsByTopic(1); + + assertThat(records.recordsForTopic(TOPIC_NAME).size()).isEqualTo(1); + final SourceRecord updateUnsetRecord = records.recordsForTopic(TOPIC_NAME).get(0); + final SourceRecord transformedUnsetUpdate = transformation.apply(updateUnsetRecord); + final Struct transformedUnsetUpdateValue = (Struct)transformedUnsetUpdate.value(); + + assertThat(transformedUnsetUpdate.valueSchema().field("id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA); + assertThat(transformedUnsetUpdate.valueSchema().field("newStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA); + assertThat(transformedUnsetUpdateValue.get("id")).isEqualTo(1); + assertThat(transformedUnsetUpdateValue.get("newStr")).isEqualTo(null); + // Test update primary().execute("delete", client -> { client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).deleteOne(RawBsonDocument.parse("{'_id' : 1}"));