DBZ-612 Transform $unset into null values

This commit is contained in:
Renato Mefi 2018-11-07 18:56:22 +01:00 committed by Gunnar Morling
parent 6e115043ca
commit 7a4e1c48f1
2 changed files with 56 additions and 3 deletions

View File

@ -20,6 +20,7 @@
import org.apache.kafka.connect.transforms.Flatten; import org.apache.kafka.connect.transforms.Flatten;
import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.Transformation;
import org.bson.BsonDocument; import org.bson.BsonDocument;
import org.bson.BsonNull;
import org.bson.BsonValue; import org.bson.BsonValue;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -128,7 +129,8 @@ public R apply(R r) {
} }
SchemaBuilder valueSchemaBuilder = SchemaBuilder.struct().name(newValueSchemaName); SchemaBuilder valueSchemaBuilder = SchemaBuilder.struct().name(newValueSchemaName);
SchemaBuilder keySchemabuilder = SchemaBuilder.struct(); SchemaBuilder keySchemabuilder = SchemaBuilder.struct();
BsonDocument valueDocument = null; BsonDocument document;
BsonDocument valueDocument;
final R afterRecord = afterExtractor.apply(r); final R afterRecord = afterExtractor.apply(r);
final R key = keyExtractor.apply(r); final R key = keyExtractor.apply(r);
@ -139,8 +141,22 @@ public R apply(R r) {
// update // update
if (patchRecord.value() != null) { if (patchRecord.value() != null) {
valueDocument = BsonDocument.parse(patchRecord.value().toString()); document = BsonDocument.parse(patchRecord.value().toString());
valueDocument = valueDocument.getDocument("$set"); valueDocument = document.getDocument("$set");
if (document.containsKey("$unset")) {
Set<Entry<String, BsonValue>> unsetDocumentEntry = document.getDocument("$unset").entrySet();
for (Entry<String, BsonValue> valueEntry : unsetDocumentEntry) {
// In case unset of a key is false we don't have to do anything with it,
// if it's true we want to set the value to null
if (!valueEntry.getValue().asBoolean().getValue()) {
continue;
}
valueDocument.append(valueEntry.getKey(), new BsonNull());
}
}
if (!valueDocument.containsKey("id")) { if (!valueDocument.containsKey("id")) {
valueDocument.append("id", keyDocument.get("id")); valueDocument.append("id", keyDocument.get("id"));
} }

View File

@ -166,6 +166,7 @@ public void shouldTransformRecordForInsertEventWithComplexIdType() throws Interr
assertThat(value.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA); assertThat(value.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().fields()).hasSize(2); assertThat(value.schema().fields()).hasSize(2);
} }
@Test @Test
public void shouldGenerateRecordForUpdateEvent() throws InterruptedException { public void shouldGenerateRecordForUpdateEvent() throws InterruptedException {
BsonTimestamp ts = new BsonTimestamp(1000, 1); BsonTimestamp ts = new BsonTimestamp(1000, 1);
@ -206,6 +207,42 @@ public void shouldGenerateRecordForUpdateEvent() throws InterruptedException {
assertThat(value.schema().fields()).hasSize(2); assertThat(value.schema().fields()).hasSize(2);
} }
@Test
public void shouldGenerateRecordForUpdateEventWithUnset() throws InterruptedException {
BsonTimestamp ts = new BsonTimestamp(1000, 1);
CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("$set", new Document("name", "Sally"))
.append("$unset", new Document().append("phone", true).append("active", false))
;
// given
Document event = new Document().append("o", obj)
.append("o2", objId)
.append("ns", "dbA.c1")
.append("ts", ts)
.append("h", Long.valueOf(12345678))
.append("op", "u");
RecordsForCollection records = recordMakers.forCollection(collectionId);
records.recordEvent(event, 1002);
assertThat(produced.size()).isEqualTo(1);
SourceRecord record = produced.get(0);
// when
SourceRecord transformed = transformation.apply(record);
Struct value = (Struct) transformed.value();
// and then assert value and its schema
assertThat(value.schema()).isSameAs(transformed.valueSchema());
assertThat(value.get("name")).isEqualTo("Sally");
assertThat(value.get("phone")).isEqualTo(null);
assertThat(value.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().fields()).hasSize(3);
}
@Test @Test
@FixFor("DBZ-582") @FixFor("DBZ-582")
public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws InterruptedException { public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws InterruptedException {