DBZ-989 Provide test for the bug confirmation

This commit is contained in:
Renato Mefi 2018-11-18 14:32:51 +01:00 committed by Gunnar Morling
parent b2aed1f46e
commit 488dfe4f72

View File

@ -13,6 +13,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import io.debezium.data.SchemaUtil;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@ -198,21 +199,31 @@ public void shouldTransformEvents() throws InterruptedException, IOException {
assertThat(transformedFullUpdateValue.get("id")).isEqualTo(1);
assertThat(transformedFullUpdateValue.get("dataStr")).isEqualTo("Hi again");
// Test update
// Test Delete
primary().execute("delete", client -> {
client.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).deleteOne(RawBsonDocument.parse("{'_id' : 1}"));
});
records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(TOPIC_NAME).size()).isEqualTo(2);
// Test mongo Deletion operation
final SourceRecord deleteRecord = records.recordsForTopic(TOPIC_NAME).get(0);
final SourceRecord transformedDelete = transformation.apply(deleteRecord);
final Struct transformedDeleteValue = (Struct)transformedDelete.value();
assertThat(transformedDeleteValue).isNull();
assertThat(records.recordsForTopic(TOPIC_NAME).get(1).value()).isNull();
}
// Test tombstone record
final SourceRecord tombstoneRecord = records.recordsForTopic(TOPIC_NAME).get(1);
final SourceRecord transformedTombstone = transformation.apply(tombstoneRecord);
assertThat(transformedTombstone.value()).isNull();
// Assert deletion preserves key
assertThat(SchemaUtil.asString(transformedDelete.keySchema())).isEqualTo(SchemaUtil.asString(transformedTombstone.keySchema()));
assertThat(transformedDelete.key().toString()).isEqualTo(transformedTombstone.key().toString());
}
private MongoPrimary primary() {
ReplicaSet replicaSet = ReplicaSet.parse(context.getConnectionContext().hosts());