DBZ-2455 ExtractNewDocumentState support patch in add.fields

This commit is contained in:
Chris Cranford 2020-08-28 11:33:32 -04:00 committed by Gunnar Morling
parent 4309bace1b
commit 2b5c2e3ec7
2 changed files with 64 additions and 1 deletions

View File

@ -566,7 +566,9 @@ else if (parts.length == 2) {
* Determine the struct hosting the given unqualified field.
*/
private static String determineStruct(String simpleFieldName) {
if (simpleFieldName.equals(Envelope.FieldName.OPERATION) || simpleFieldName.equals(Envelope.FieldName.TIMESTAMP)) {
if (simpleFieldName.equals(Envelope.FieldName.OPERATION) ||
simpleFieldName.equals(Envelope.FieldName.TIMESTAMP) ||
simpleFieldName.equals("patch")) {
return null;
}
else if (simpleFieldName.equals(TransactionMonitor.DEBEZIUM_TRANSACTION_ID_KEY) ||

View File

@ -1381,6 +1381,67 @@ public void testAddFieldsHandleDeleteRewriteAndTombstone() throws Exception {
assertThat(tombstoneTransformed.value()).isNull();
}
@Test
@FixFor("DBZ-2455")
public void testAddPatchFieldAfterUpdate() throws Exception {
waitForStreamingRunning();
ObjectId objId = new ObjectId();
Document obj = new Document()
.append("_id", objId)
.append("a", 1)
.append("b", 2)
.append("c", 3);
// insert
primary().execute("insert", client -> {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName()).insertOne(obj);
});
SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
// update
Document updateObj = new Document().append("$set", new Document(Collect.hashMapOf("a", 22)));
// update
primary().execute("update", client -> {
client.getDatabase(DB_NAME).getCollection(this.getCollectionName())
.updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objId + "'}}"), updateObj);
});
records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(this.topicName()).size()).isEqualTo(1);
assertNoRecordsToConsume();
final Map<String, String> props = new HashMap<>();
props.put(ADD_FIELDS, "patch");
transformation.configure(props);
// Perform transformation
final SourceRecord transformed = transformation.apply(records.allRecordsInOrder().get(0));
Struct key = (Struct) transformed.key();
Struct value = (Struct) transformed.value();
// then assert key and its schema
assertThat(key.schema()).isSameAs(transformed.keySchema());
assertThat(key.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(key.get("id")).isEqualTo(objId.toString());
// and then assert value and its schema
assertThat(value.schema()).isSameAs(transformed.valueSchema());
assertThat(value.get("id")).isEqualTo(objId.toString());
assertThat(value.get("a")).isEqualTo(22);
assertThat(value.get("__patch")).isEqualTo("{\"$v\": 1,\"$set\": {\"a\": 22}}");
assertThat(value.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
assertThat(value.schema().field("a").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
assertThat(value.schema().field("__patch").schema()).isEqualTo(io.debezium.data.Json.builder().optional().build());
assertThat(value.schema().fields()).hasSize(3);
}
private SourceRecord createCreateRecord() throws Exception {
ObjectId objId = new ObjectId();
Document obj = new Document()