frostfs-node/pkg/services/object/get/assembler.go
Dmitrii Stepanov 99bb488ebd
Some checks failed
Build / Build Components (1.20) (pull_request) Failing after 1s
Build / Build Components (1.19) (pull_request) Failing after 3s
Tests and linters / Lint (pull_request) Failing after 2s
Tests and linters / Tests (1.19) (pull_request) Failing after 1s
Tests and linters / Tests (1.20) (pull_request) Failing after 2s
Tests and linters / Tests with -race (pull_request) Failing after 1s
Vulncheck / Vulncheck (pull_request) Failing after 2s
Tests and linters / Staticcheck (pull_request) Failing after 2s
[#539] getsvc: Write payload direct to out stream
To reduce memory allocations.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-07-27 17:02:08 +03:00

278 lines
6.9 KiB
Go

package getsvc
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
// objectGetter is the source of objects the assembler reads parts from.
type objectGetter interface {
	// GetObjectAndWritePayload takes an object ID, an optional range (the
	// assembler passes nil when it wants no range restriction) and a
	// ChunkWriter, and returns an object.
	// NOTE(review): presumably the payload (or the requested range of it) is
	// streamed to writer rather than stored in the returned object — confirm
	// against the implementation.
	GetObjectAndWritePayload(ctx context.Context, id oid.ID, rng *objectSDK.Range, writer ChunkWriter) (*objectSDK.Object, error)
	// HeadObject returns an object for the given ID. The assembler only reads
	// its payload size, ID, previous ID and parent.
	HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error)
}
// assembler rebuilds a split object from its parts and streams the result
// to a writer.
type assembler struct {
	// addr is compared with the parent address of every verified part.
	addr oid.Address
	// splitInfo provides the ID of the link object or of the last part.
	splitInfo *objectSDK.SplitInfo
	// rng is the requested payload range; nil means the whole object is
	// requested. Its length is modified during initialization from the
	// source object.
	rng *objectSDK.Range
	// objGetter fetches and heads the parts.
	objGetter objectGetter
	// currentOffset is only maintained for range requests: the position in
	// the parent's payload at which the earliest part processed so far begins.
	currentOffset uint64
	// parentObject is the parent taken from the source object; its payload is
	// set to the portion contributed by the source object.
	parentObject *objectSDK.Object
}
// newAssembler creates an assembler for the object at addr.
//
// splitInfo supplies the link/last-part IDs, rng is the requested payload
// range (nil for the whole object) and objGetter is used to fetch the parts.
// The offset and parent object start out as zero values.
func newAssembler(
	addr oid.Address,
	splitInfo *objectSDK.SplitInfo,
	rng *objectSDK.Range,
	objGetter objectGetter) *assembler {
	asm := new(assembler)
	asm.addr = addr
	asm.splitInfo = splitInfo
	asm.rng = rng
	asm.objGetter = objGetter
	return asm
}
// Assemble reconstructs a split (large) object and writes its content to
// writer. On success it returns the parent object.
//
// A split info error is returned when the split info holds neither a link nor
// a last-part ID, or when the source object provides neither a children list
// nor a previous part ID.
func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
	srcID, found := a.getLastPartOrLinkObjectID()
	if !found {
		return nil, objectSDK.NewSplitInfoError(a.splitInfo)
	}

	prev, children, err := a.initializeFromSourceObjectID(ctx, srcID)
	if err != nil {
		return nil, err
	}

	switch {
	case len(children) > 0:
		// The children list takes precedence over the previous-ID chain.
		err = a.assembleObjectByChildrenList(ctx, children, writer)
	case prev != nil:
		err = a.assemleObjectByPreviousIDInReverse(ctx, *prev, writer)
	default:
		return nil, objectSDK.NewSplitInfoError(a.splitInfo)
	}
	if err != nil {
		return nil, err
	}

	return a.parentObject, nil
}
// getLastPartOrLinkObjectID picks the object to start assembling from: the
// link object when the split info has one, otherwise the last part.
// The boolean is false when the split info contains neither.
func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) {
	if linkID, ok := a.splitInfo.Link(); ok {
		return linkID, true
	}
	if lastID, ok := a.splitInfo.LastPart(); ok {
		return lastID, true
	}
	return oid.ID{}, false
}
// initializeFromSourceObjectID fetches the source object (link or last part)
// identified by id, verifies it via the child check and stores its parent as
// a.parentObject. errChildWithEmptyParent is returned when the source object
// has no parent.
//
// With a.rng == nil the entire source payload is attached to a.parentObject.
// With a range, the range is validated against the parent's payload size
// (*apistatus.ObjectOutOfRange on failure), a.currentOffset is set to the
// position in the parent's payload where the source object begins, only the
// part of the source payload that intersects the range is attached to
// a.parentObject, and the length of a.rng is reduced by the number of bytes
// taken.
//
// It returns the ID of the previous part (nil when the source object has
// none) together with the children list of the source object.
func (a *assembler) initializeFromSourceObjectID(ctx context.Context, id oid.ID) (*oid.ID, []oid.ID, error) {
	w := NewSimpleObjectWriter()
	sourceObject, err := a.getChildObject(ctx, id, nil, true, w)
	if err != nil {
		return nil, nil, err
	}
	sourceObject.SetPayload(w.pld)

	parentObject := sourceObject.Parent()
	if parentObject == nil {
		return nil, nil, errChildWithEmptyParent
	}
	a.parentObject = parentObject

	payload := sourceObject.Payload()
	if a.rng != nil {
		seekOff := a.rng.GetOffset()
		seekLen := a.rng.GetLength()
		seekTo := seekOff + seekLen
		parentSize := parentObject.PayloadSize()

		// seekTo < seekOff detects uint64 overflow of offset+length.
		if seekTo < seekOff || parentSize < seekOff || parentSize < seekTo {
			return nil, nil, &apistatus.ObjectOutOfRange{}
		}

		a.currentOffset = parentSize - sourceObject.PayloadSize()

		// from: where the requested data starts inside the source payload.
		from := uint64(0)
		if a.currentOffset < seekOff {
			from = seekOff - a.currentOffset
		}

		// to starts at from so that a range contributing no bytes from this
		// part yields an empty slice. Starting at 0 produced an inverted
		// slice expression (and a panic) for a zero-length range whose offset
		// lies inside the source part.
		to := from
		if seekTo > a.currentOffset+from {
			to = seekTo - a.currentOffset
		}

		payload = payload[from:to]
		// The bytes taken here no longer need to be fetched from earlier parts.
		a.rng.SetLength(seekLen - to + from)
	}
	a.parentObject.SetPayload(payload)

	if idPrev, ok := sourceObject.PreviousID(); ok {
		return &idPrev, sourceObject.Children(), nil
	}
	return nil, sourceObject.Children(), nil
}
// getChildObject requests the object id (optionally restricted to rng) from
// the object getter, handing it writer for the payload.
// When verifyIsChild is set, the returned object must pass the child check,
// otherwise errParentAddressDiffers is returned.
func (a *assembler) getChildObject(ctx context.Context, id oid.ID, rng *objectSDK.Range, verifyIsChild bool, writer ChunkWriter) (*objectSDK.Object, error) {
	child, err := a.objGetter.GetObjectAndWritePayload(ctx, id, rng, writer)
	switch {
	case err != nil:
		return nil, err
	case verifyIsChild && !a.isChild(child):
		return nil, errParentAddressDiffers
	}
	return child, nil
}
// assembleObjectByChildrenList writes the object using the children list of
// the link object.
//
// For a full read the parent header is written first and then every child is
// fetched in list order with the child check enabled. For a range read the
// chain is walked backwards starting from the last listed child, and the
// payload kept on the parent object is written afterwards.
func (a *assembler) assembleObjectByChildrenList(ctx context.Context, childrenIDs []oid.ID, writer ObjectWriter) error {
	if a.rng != nil {
		lastChild := childrenIDs[len(childrenIDs)-1]
		if err := a.assemblePayloadInReverse(ctx, writer, lastChild); err != nil {
			return err
		}
		return writer.WriteChunk(ctx, a.parentObject.Payload())
	}

	header := a.parentObject.CutPayload()
	if err := writer.WriteHeader(ctx, header); err != nil {
		return err
	}
	return a.assemblePayloadByObjectIDs(ctx, writer, childrenIDs, nil, true)
}
// assemleObjectByPreviousIDInReverse writes the object by following the
// previous-ID chain that starts at prevID.
//
// The parent header is written only for a full (non-range) read. The payload
// of the earlier parts is written next, and finally the payload kept on the
// parent object, which is the contribution of the last part.
func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prevID oid.ID, writer ObjectWriter) error {
	fullRead := a.rng == nil
	if fullRead {
		if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
			return err
		}
	}

	err := a.assemblePayloadInReverse(ctx, writer, prevID)
	if err != nil {
		return err
	}

	// last part
	return writer.WriteChunk(ctx, a.parentObject.Payload())
}
// assemblePayloadByObjectIDs fetches the parts listed in partIDs one by one,
// in order, passing writer along for their payload.
//
// partRanges is consulted only for range requests and only when it is
// non-empty; in that case partRanges[i] restricts the read of partIDs[i].
// verifyIsChild is forwarded to the child fetch. The first error stops the
// iteration.
func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer ObjectWriter, partIDs []oid.ID, partRanges []objectSDK.Range, verifyIsChild bool) error {
	useRanges := a.rng != nil && len(partRanges) > 0

	for idx, partID := range partIDs {
		var partRng *objectSDK.Range
		if useRanges {
			partRng = &partRanges[idx]
		}

		if _, err := a.getChildObject(ctx, partID, partRng, verifyIsChild, writer); err != nil {
			return err
		}
	}

	return nil
}
// assemblePayloadInReverse builds the chain of parts ending at prevID, flips
// it into start-to-end order (together with the per-part ranges, when there
// are any) and writes the parts' payload without the child check, which the
// chain builder has already performed on every part it visited.
func (a *assembler) assemblePayloadInReverse(ctx context.Context, writer ObjectWriter, prevID oid.ID) error {
	chain, rngs, err := a.buildChain(ctx, prevID)
	if err != nil {
		return err
	}

	haveRngs := len(rngs) > 0
	last := len(chain) - 1
	for i := 0; i < len(chain)/2; i++ {
		j := last - i
		chain[i], chain[j] = chain[j], chain[i]
		if haveRngs {
			rngs[i], rngs[j] = rngs[j], rngs[i]
		}
	}

	return a.assemblePayloadByObjectIDs(ctx, writer, chain, rngs, false)
}
// isChild reports whether obj may be treated as a part of the assembled
// object: an object without a parent is accepted, otherwise the parent's
// address must equal the assembler's address.
func (a *assembler) isChild(obj *objectSDK.Object) bool {
	if parent := obj.Parent(); parent != nil {
		return equalAddresses(a.addr, object.AddressOf(parent))
	}
	return true
}
// buildChain walks the parts backwards, starting at prevID and following the
// previous-ID links, heading every visited part through the object getter.
//
// It returns the collected part IDs in end-to-start order. For range requests
// it also returns a parallel slice holding the sub-range to read from each
// collected part, and a.currentOffset is decreased by the payload size of
// every visited part. Without a range, every visited part is collected and
// the range slice stays empty.
//
// errParentAddressDiffers is returned when a visited part fails the child
// check; errors of HeadObject are returned as is.
func (a *assembler) buildChain(ctx context.Context, prevID oid.ID) ([]oid.ID, []objectSDK.Range, error) {
	var (
		chain []oid.ID
		rngs []objectSDK.Range
		// NOTE(review): the getters are called even when a.rng is nil; this
		// relies on them tolerating a nil receiver — confirm.
		from = a.rng.GetOffset()
		to = from + a.rng.GetLength()
		hasPrev = true
	)
	// fill the chain end-to-start
	for hasPrev {
		// check that only for "range" requests,
		// for `GET` it stops via the false `hasPrev`
		if a.rng != nil && a.currentOffset <= from {
			// everything at or after the range start has been visited already
			break
		}
		head, err := a.objGetter.HeadObject(ctx, prevID)
		if err != nil {
			return nil, nil, err
		}
		if !a.isChild(head) {
			return nil, nil, errParentAddressDiffers
		}
		if a.rng != nil {
			sz := head.PayloadSize()
			// currentOffset now points at the start of this part within the
			// parent's payload
			a.currentOffset -= sz
			// the part contributes only if it starts before the range end
			if a.currentOffset < to {
				// off: position inside the part where the requested data begins
				off := uint64(0)
				if from > a.currentOffset {
					off = from - a.currentOffset
					sz -= from - a.currentOffset
				}
				// trim the tail when the range ends inside this part
				if to < a.currentOffset+off+sz {
					sz = to - off - a.currentOffset
				}
				index := len(rngs)
				rngs = append(rngs, objectSDK.Range{})
				rngs[index].SetOffset(off)
				rngs[index].SetLength(sz)
				// the presence flag returned by ID() is not checked
				id, _ := head.ID()
				chain = append(chain, id)
			}
		} else {
			// the presence flag returned by ID() is not checked
			id, _ := head.ID()
			chain = append(chain, id)
		}
		prevID, hasPrev = head.PreviousID()
	}
	return chain, rngs, nil
}