DBZ-201 Corrected handling of ObjectID within non-insert events

Corrected how the MongoDB extracts the event key from the update event’s ObjectID, and verified the behavior with an integration test.
This commit is contained in:
Randall Hauch 2017-03-17 15:02:29 -05:00
parent db1aacc5f9
commit 88b5ffa113
2 changed files with 63 additions and 0 deletions

View File

@ -220,6 +220,11 @@ protected String objectIdLiteral(Object id) {
return (String) id; return (String) id;
} }
if (id instanceof Document) { if (id instanceof Document) {
Document doc = (Document)id;
if (doc.containsKey("_id") && doc.size() == 1) {
// This is an embedded ObjectId ...
return objectIdLiteral(doc.get("_id"));
}
return valueTransformer.apply((Document) id); return valueTransformer.apply((Document) id);
} }
return id.toString(); return id.toString();

View File

@ -13,6 +13,7 @@
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.Config;
@ -259,6 +260,63 @@ public void shouldConsumeAllEventsFromDatabase() throws InterruptedException, IO
verifyCreateOperation(record); verifyCreateOperation(record);
}); });
// ---------------------------------------------------------------------------------------------------------------
// Create and then update a document
// ---------------------------------------------------------------------------------------------------------------
//Testing.Debug.enable();
AtomicReference<String> id = new AtomicReference<>();
primary().execute("create", mongo->{
MongoDatabase db1 = mongo.getDatabase("dbit");
MongoCollection<Document> coll = db1.getCollection("arbitrary");
coll.drop();
// Insert the document with a generated ID ...
Document doc = Document.parse("{\"a\": 1, \"b\": 2}");
InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(true);
coll.insertOne(doc, insertOptions);
// Find the document to get the generated ID ...
doc = coll.find().first();
Testing.debug("Document: " + doc);
id.set(doc.getObjectId("_id").toString());
Testing.debug("Document ID: " + id.get());
});
primary().execute("update", mongo->{
MongoDatabase db1 = mongo.getDatabase("dbit");
MongoCollection<Document> coll = db1.getCollection("arbitrary");
// Find the document ...
Document doc = coll.find().first();
Testing.debug("Document: " + doc);
Document filter = Document.parse("{\"a\": 1}");
Document operation = Document.parse("{ \"$set\": { \"b\": 10 } }");
coll.updateOne(filter, operation);
doc = coll.find().first();
Testing.debug("Document: " + doc);
});
// Wait until we can consume the 1 insert and 1 update ...
SourceRecords insertAndUpdate = consumeRecordsByTopic(2);
assertThat(insertAndUpdate.recordsForTopic("mongo.dbit.arbitrary").size()).isEqualTo(2);
assertThat(insertAndUpdate.topics().size()).isEqualTo(1);
records4.forEach(record -> {
// Check that all records are valid, and can be serialized and deserialized ...
validate(record);
verifyNotFromInitialSync(record);
verifyCreateOperation(record);
});
SourceRecord insertRecord = insertAndUpdate.allRecordsInOrder().get(0);
SourceRecord updateRecord = insertAndUpdate.allRecordsInOrder().get(1);
Testing.debug("Insert event: " + insertRecord);
Testing.debug("Update event: " + updateRecord);
Struct insertKey = (Struct)insertRecord.key();
Struct updateKey = (Struct)updateRecord.key();
String insertId = insertKey.getString("_id");
String updatetId = updateKey.getString("_id");
assertThat(insertId).isEqualTo(id.get());
assertThat(updatetId).isEqualTo(id.get());
} }
protected void verifyFromInitialSync(SourceRecord record, AtomicBoolean foundLast) { protected void verifyFromInitialSync(SourceRecord record, AtomicBoolean foundLast) {