DBZ-8113 Preserve source info during emitting PK change

This commit is contained in:
Vojtech Juranek 2024-08-08 07:49:02 +02:00 committed by Jiri Pechanec
parent 27bb2d261b
commit d02d129404

View File

@ -171,16 +171,19 @@ protected boolean skipEmptyMessages() {
protected void emitUpdateAsPrimaryKeyChangeRecord(Receiver<P> receiver, TableSchema tableSchema, Struct oldKey,
Struct newKey, Struct oldValue, Struct newValue)
throws InterruptedException {
final OffsetContext offset = getOffset();
final Struct sourceInfo = offset.getSourceInfo();
ConnectHeaders headers = new ConnectHeaders();
headers.add(PK_UPDATE_NEWKEY_FIELD, newKey, tableSchema.keySchema());
Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
receiver.changeRecord(getPartition(), tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), headers);
Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, sourceInfo, getClock().currentTimeAsInstant());
receiver.changeRecord(getPartition(), tableSchema, Operation.DELETE, oldKey, envelope, offset, headers);
headers = new ConnectHeaders();
headers.add(PK_UPDATE_OLDKEY_FIELD, oldKey, tableSchema.keySchema());
envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
receiver.changeRecord(getPartition(), tableSchema, Operation.CREATE, newKey, envelope, getOffset(), headers);
envelope = tableSchema.getEnvelopeSchema().create(newValue, sourceInfo, getClock().currentTimeAsInstant());
receiver.changeRecord(getPartition(), tableSchema, Operation.CREATE, newKey, envelope, offset, headers);
}
}