DBZ-612 Support MongoDB CDC full $unset operation

Mongo internally transforms everything into $set and $unset operations
when they are in the oplog, this guarantees that you can have operations
which are $set, $unset or combined.
This commit is contained in:
Renato Mefi 2018-11-08 14:11:08 +01:00 committed by Gunnar Morling
parent d44a44aee5
commit df738a2abd
3 changed files with 62 additions and 1 deletions

View File

@ -142,7 +142,16 @@ public R apply(R r) {
// update
if (patchRecord.value() != null) {
document = BsonDocument.parse(patchRecord.value().toString());
valueDocument = document.getDocument("$set");
if (!document.containsKey("$set") && !document.containsKey("$unset")) {
throw new ConnectException("Unable to process Mongo Operation, a '$set' or '$unset' is necessary.");
}
valueDocument = new BsonDocument();
if (document.containsKey("$set")) {
valueDocument = document.getDocument("$set");
}
if (document.containsKey("$unset")) {
Set<Entry<String, BsonValue>> unsetDocumentEntry = document.getDocument("$unset").entrySet();

View File

@ -243,6 +243,40 @@ public void shouldGenerateRecordForUpdateEventWithUnset() throws InterruptedExce
assertThat(value.schema().fields()).hasSize(3);
}
@Test
public void shouldGenerateRecordForUnsetOnlyUpdateEvent() throws InterruptedException {
BsonTimestamp ts = new BsonTimestamp(1000, 1);
CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("$unset", new Document().append("phone", true).append("active", false))
;
// given
Document event = new Document().append("o", obj)
.append("o2", objId)
.append("ns", "dbA.c1")
.append("ts", ts)
.append("h", Long.valueOf(12345678))
.append("op", "u");
RecordsForCollection records = recordMakers.forCollection(collectionId);
records.recordEvent(event, 1002);
assertThat(produced.size()).isEqualTo(1);
SourceRecord record = produced.get(0);
// when
SourceRecord transformed = transformation.apply(record);
Struct value = (Struct) transformed.value();
// and then assert value and its schema
assertThat(value.schema()).isSameAs(transformed.valueSchema());
assertThat(value.get("phone")).isEqualTo(null);
assertThat(value.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().fields()).hasSize(2);
}
@Test
@FixFor("DBZ-582")
public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws InterruptedException {

View File

@ -156,6 +156,24 @@ public void shouldTransformEvents() throws InterruptedException, IOException {
assertThat(transformedMultipleUpdateValue.get("newStr")).isEqualTo("hello");
assertThat(transformedMultipleUpdateValue.get("dataInt")).isEqualTo(456);
// Test Update with $unset operation
primary().execute("update", client -> {
client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).updateOne(RawBsonDocument.parse("{'_id' : 1}"),
RawBsonDocument.parse("{'$unset': {'newStr': ''}}"));
});
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(TOPIC_NAME).size()).isEqualTo(1);
final SourceRecord updateUnsetRecord = records.recordsForTopic(TOPIC_NAME).get(0);
final SourceRecord transformedUnsetUpdate = transformation.apply(updateUnsetRecord);
final Struct transformedUnsetUpdateValue = (Struct)transformedUnsetUpdate.value();
assertThat(transformedUnsetUpdate.valueSchema().field("id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
assertThat(transformedUnsetUpdate.valueSchema().field("newStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
assertThat(transformedUnsetUpdateValue.get("id")).isEqualTo(1);
assertThat(transformedUnsetUpdateValue.get("newStr")).isEqualTo(null);
// Test update
primary().execute("delete", client -> {
client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).deleteOne(RawBsonDocument.parse("{'_id' : 1}"));