diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java index 98f261d44..22860682c 100644 --- a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTest.java @@ -233,6 +233,7 @@ public void shouldGenerateRecordForUpdateEvent() throws InterruptedException { } @Test + @FixFor("DBZ-612") public void shouldGenerateRecordForUpdateEventWithUnset() throws InterruptedException { BsonTimestamp ts = new BsonTimestamp(1000, 1); CollectionId collectionId = new CollectionId("rs0", "dbA", "c1"); @@ -269,6 +270,7 @@ public void shouldGenerateRecordForUpdateEventWithUnset() throws InterruptedExce } @Test + @FixFor("DBZ-612") public void shouldGenerateRecordForUnsetOnlyUpdateEvent() throws InterruptedException { BsonTimestamp ts = new BsonTimestamp(1000, 1); CollectionId collectionId = new CollectionId("rs0", "dbA", "c1"); @@ -342,6 +344,110 @@ public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws Interrup assertThat(value).isNull(); } + @Test + @FixFor("DBZ-583") + public void shouldDropDeleteMessagesByDefault() throws InterruptedException { + RecordMakers recordMakers = new RecordMakers(filters, source, topicSelector, produced::add, false); + + BsonTimestamp ts = new BsonTimestamp(1000, 1); + CollectionId collectionId = new CollectionId("rs0", "dbA", "c1"); + ObjectId objId = new ObjectId(); + Document obj = new Document("_id", objId); + + // given + Document event = new Document().append("o", obj) + .append("ns", "dbA.c1") + .append("ts", ts) + .append("h", Long.valueOf(12345678)) + .append("op", "d"); + RecordsForCollection records = recordMakers.forCollection(collectionId); + records.recordEvent(event, 1002); + assertThat(produced.size()).isEqualTo(1); + + SourceRecord record = produced.get(0); + + // when + SourceRecord transformed = transformation.apply(record); + + // then assert transformed message is skipped + assertThat(transformed).isNull(); + } + + @Test + @FixFor("DBZ-583") + public void shouldRewriteDeleteMessage() throws InterruptedException { + RecordMakers recordMakers = new RecordMakers(filters, source, topicSelector, produced::add, false); + + BsonTimestamp ts = new BsonTimestamp(1000, 1); + CollectionId collectionId = new CollectionId("rs0", "dbA", "c1"); + ObjectId objId = new ObjectId(); + Document obj = new Document("_id", objId); + + // given + Document event = new Document().append("o", obj) + .append("ns", "dbA.c1") + .append("ts", ts) + .append("h", Long.valueOf(12345678)) + .append("op", "d"); + RecordsForCollection records = recordMakers.forCollection(collectionId); + records.recordEvent(event, 1002); + assertThat(produced.size()).isEqualTo(1); + + SourceRecord record = produced.get(0); + + final Map props = new HashMap<>(); + props.put(HANDLE_DELETES, "rewrite"); + transformation.configure(props); + + // when + SourceRecord transformed = transformation.apply(record); + + Struct key = (Struct) transformed.key(); + Struct value = (Struct) transformed.value(); + + // then assert key and its schema + assertThat(key.schema()).isSameAs(transformed.keySchema()); + assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA); + assertThat(key.get("id")).isEqualTo(objId.toString()); + + assertThat(value.schema().field("__deleted").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA); + assertThat(value.get("__deleted")).isEqualTo(true); + } + + @Test + @FixFor("DBZ-583") + public void shouldRewriteMessagesWhichAreNotDeletes() throws InterruptedException { + BsonTimestamp ts = new BsonTimestamp(1000, 1); + CollectionId collectionId = new CollectionId("rs0", "dbA", "c1"); + ObjectId objId = new ObjectId(); + Document obj = new Document().append("$set", new Document("name", "Sally")); + + // given + Document event = new Document().append("o", obj) + .append("o2", objId) + .append("ns", "dbA.c1") + .append("ts", ts) + .append("h", Long.valueOf(12345678)) + .append("op", "u"); + RecordsForCollection records = recordMakers.forCollection(collectionId); + records.recordEvent(event, 1002); + assertThat(produced.size()).isEqualTo(1); + SourceRecord record = produced.get(0); + + final Map props = new HashMap<>(); + props.put(HANDLE_DELETES, "rewrite"); + transformation.configure(props); + + // when + SourceRecord transformedRecord = transformation.apply(record); + + Struct value = (Struct) transformedRecord.value(); + + // then assert value and its schema + assertThat(value.schema().field("__deleted").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA); + assertThat(value.get("__deleted")).isEqualTo(false); + } + @Test public void shouldGenerateRecordForDeleteEvent() throws InterruptedException { BsonTimestamp ts = new BsonTimestamp(1000, 1); @@ -387,6 +493,7 @@ public void shouldGenerateRecordForDeleteEvent() throws InterruptedException { } @Test + @FixFor("DBZ-971") public void shouldPropagatePreviousRecordHeaders() throws InterruptedException { BsonTimestamp ts = new BsonTimestamp(1000, 1); CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");