DBZ-563 MongoDB Unwrapper tests for "delete.handling.mode"

This commit is contained in:
Renato Mefi 2018-11-28 18:25:53 +01:00 committed by Gunnar Morling
parent fcef819dc8
commit 8d8ef26dae

View File

@ -233,6 +233,7 @@ public void shouldGenerateRecordForUpdateEvent() throws InterruptedException {
}
@Test
@FixFor("DBZ-612")
public void shouldGenerateRecordForUpdateEventWithUnset() throws InterruptedException {
BsonTimestamp ts = new BsonTimestamp(1000, 1);
CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");
@ -269,6 +270,7 @@ public void shouldGenerateRecordForUpdateEventWithUnset() throws InterruptedExce
}
@Test
@FixFor("DBZ-612")
public void shouldGenerateRecordForUnsetOnlyUpdateEvent() throws InterruptedException {
BsonTimestamp ts = new BsonTimestamp(1000, 1);
CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");
@ -342,6 +344,110 @@ public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws Interrup
assertThat(value).isNull();
}
@Test
@FixFor("DBZ-583")
public void shouldDropDeleteMessagesByDefault() throws InterruptedException {
RecordMakers recordMakers = new RecordMakers(filters, source, topicSelector, produced::add, false);
BsonTimestamp ts = new BsonTimestamp(1000, 1);
CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");
ObjectId objId = new ObjectId();
Document obj = new Document("_id", objId);
// given
Document event = new Document().append("o", obj)
.append("ns", "dbA.c1")
.append("ts", ts)
.append("h", Long.valueOf(12345678))
.append("op", "d");
RecordsForCollection records = recordMakers.forCollection(collectionId);
records.recordEvent(event, 1002);
assertThat(produced.size()).isEqualTo(1);
SourceRecord record = produced.get(0);
// when
SourceRecord transformed = transformation.apply(record);
// then assert transformed message is skipped
assertThat(transformed).isNull();
}
@Test
@FixFor("DBZ-583")
public void shouldRewriteDeleteMessage() throws InterruptedException {
RecordMakers recordMakers = new RecordMakers(filters, source, topicSelector, produced::add, false);
BsonTimestamp ts = new BsonTimestamp(1000, 1);
CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");
ObjectId objId = new ObjectId();
Document obj = new Document("_id", objId);
// given
Document event = new Document().append("o", obj)
.append("ns", "dbA.c1")
.append("ts", ts)
.append("h", Long.valueOf(12345678))
.append("op", "d");
RecordsForCollection records = recordMakers.forCollection(collectionId);
records.recordEvent(event, 1002);
assertThat(produced.size()).isEqualTo(1);
SourceRecord record = produced.get(0);
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_DELETES, "rewrite");
transformation.configure(props);
// when
SourceRecord transformed = transformation.apply(record);
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
// then assert key and its schema
assertThat(key.schema()).isSameAs(transformed.keySchema());
assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(key.get("id")).isEqualTo(objId.toString());
assertThat(value.schema().field("__deleted").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
assertThat(value.get("__deleted")).isEqualTo(true);
}
@Test
@FixFor("DBZ-583")
public void shouldRewriteMessagesWhichAreNotDeletes() throws InterruptedException {
BsonTimestamp ts = new BsonTimestamp(1000, 1);
CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");
ObjectId objId = new ObjectId();
Document obj = new Document().append("$set", new Document("name", "Sally"));
// given
Document event = new Document().append("o", obj)
.append("o2", objId)
.append("ns", "dbA.c1")
.append("ts", ts)
.append("h", Long.valueOf(12345678))
.append("op", "u");
RecordsForCollection records = recordMakers.forCollection(collectionId);
records.recordEvent(event, 1002);
assertThat(produced.size()).isEqualTo(1);
SourceRecord record = produced.get(0);
final Map<String, String> props = new HashMap<>();
props.put(HANDLE_DELETES, "rewrite");
transformation.configure(props);
// when
SourceRecord transformedRecord = transformation.apply(record);
Struct value = (Struct) transformedRecord.value();
// then assert value and its schema
assertThat(value.schema().field("__deleted").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
assertThat(value.get("__deleted")).isEqualTo(false);
}
@Test
public void shouldGenerateRecordForDeleteEvent() throws InterruptedException {
BsonTimestamp ts = new BsonTimestamp(1000, 1);
@ -387,6 +493,7 @@ public void shouldGenerateRecordForDeleteEvent() throws InterruptedException {
}
@Test
@FixFor("DBZ-971")
public void shouldPropagatePreviousRecordHeaders() throws InterruptedException {
BsonTimestamp ts = new BsonTimestamp(1000, 1);
CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");