frostfs-node/pkg/services/object/get/assembler.go
Evgenii Stratonikov c8acdf40bb [#1616] getsvc: Use slices.Reverse() where possible
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2025-01-30 06:50:37 +00:00

233 lines
5.8 KiB
Go

package getsvc
import (
"context"
"slices"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
type objectGetter interface {
GetObjectAndWritePayload(ctx context.Context, id oid.ID, rng *objectSDK.Range, writer ChunkWriter) (*objectSDK.Object, error)
HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error)
}
type assembler struct {
addr oid.Address
splitInfo *objectSDK.SplitInfo
rng *objectSDK.Range
objGetter objectGetter
head bool
currentOffset uint64
parentObject *objectSDK.Object
}
func newAssembler(
addr oid.Address,
splitInfo *objectSDK.SplitInfo,
rng *objectSDK.Range,
objGetter objectGetter,
head bool,
) *assembler {
return &assembler{
addr: addr,
rng: rng,
splitInfo: splitInfo,
objGetter: objGetter,
head: head,
}
}
// Assemble assembles splitted large object and writes it's content to ObjectWriter.
// It returns parent object.
func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
if a.head {
return a.assembleHeader(ctx, writer)
}
sourceObjectID, ok := a.getLastPartOrLinkObjectID()
if !ok {
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
previousID, childrenIDs, err := a.initializeFromSourceObjectID(ctx, sourceObjectID)
if err != nil {
return nil, err
}
if previousID == nil && len(childrenIDs) == 0 {
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
if len(childrenIDs) > 0 {
if a.rng != nil {
err = a.assembleObjectByChildrenListRange(ctx, childrenIDs, writer)
} else {
err = a.assembleObjectByChildrenList(ctx, childrenIDs, writer)
}
} else {
if a.rng != nil {
err = a.assemleObjectByPreviousIDInReverseRange(ctx, *previousID, writer)
} else {
err = a.assemleObjectByPreviousIDInReverse(ctx, *previousID, writer)
}
}
if err != nil {
return nil, err
}
return a.parentObject, nil
}
func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) {
sourceObjectID, ok := a.splitInfo.Link()
if ok {
return sourceObjectID, true
}
sourceObjectID, ok = a.splitInfo.LastPart()
if ok {
return sourceObjectID, true
}
return oid.ID{}, false
}
func (a *assembler) initializeFromSourceObjectID(ctx context.Context, id oid.ID) (*oid.ID, []oid.ID, error) {
w := NewSimpleObjectWriter()
sourceObject, err := a.getChildObject(ctx, id, nil, true, w)
if err != nil {
return nil, nil, err
}
sourceObject.SetPayload(w.pld)
parentObject := sourceObject.Parent()
if parentObject == nil {
return nil, nil, errChildWithEmptyParent
}
a.parentObject = parentObject
var payload []byte
if a.rng != nil {
seekOff := a.rng.GetOffset()
seekLen := a.rng.GetLength()
seekTo := seekOff + seekLen
parentSize := parentObject.PayloadSize()
if seekTo < seekOff || parentSize < seekOff || parentSize < seekTo {
return nil, nil, &apistatus.ObjectOutOfRange{}
}
sourceSize := sourceObject.PayloadSize()
a.currentOffset = parentSize - sourceSize
from := uint64(0)
if a.currentOffset < seekOff {
from = seekOff - a.currentOffset
}
to := uint64(0)
if seekOff+seekLen >= a.currentOffset+from {
to = seekOff + seekLen - a.currentOffset
}
payload = sourceObject.Payload()[from:to]
a.rng.SetLength(a.rng.GetLength() - to + from)
} else {
payload = sourceObject.Payload()
}
a.parentObject.SetPayload(payload)
idPrev, ok := sourceObject.PreviousID()
if ok {
return &idPrev, sourceObject.Children(), nil
}
return nil, sourceObject.Children(), nil
}
func (a *assembler) getChildObject(ctx context.Context, id oid.ID, rng *objectSDK.Range, verifyIsChild bool, writer ChunkWriter) (*objectSDK.Object, error) {
obj, err := a.objGetter.GetObjectAndWritePayload(ctx, id, rng, writer)
if err != nil {
return nil, err
}
if verifyIsChild && !a.isChild(obj) {
return nil, errParentAddressDiffers
}
return obj, nil
}
func (a *assembler) assembleObjectByChildrenList(ctx context.Context, childrenIDs []oid.ID, writer ObjectWriter) error {
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
return err
}
return a.assemblePayloadByObjectIDs(ctx, writer, childrenIDs, true)
}
func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prevID oid.ID, writer ObjectWriter) error {
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
return err
}
if err := a.assemblePayloadInReverse(ctx, writer, prevID); err != nil {
return err
}
if err := writer.WriteChunk(ctx, a.parentObject.Payload()); err != nil { // last part
return err
}
return nil
}
func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer ObjectWriter, partIDs []oid.ID, verifyIsChild bool) error {
for i := range partIDs {
_, err := a.getChildObject(ctx, partIDs[i], nil, verifyIsChild, writer)
if err != nil {
return err
}
}
return nil
}
func (a *assembler) assemblePayloadInReverse(ctx context.Context, writer ObjectWriter, prevID oid.ID) error {
chain, err := a.buildChain(ctx, prevID)
if err != nil {
return err
}
slices.Reverse(chain)
return a.assemblePayloadByObjectIDs(ctx, writer, chain, false)
}
func (a *assembler) isChild(obj *objectSDK.Object) bool {
parent := obj.Parent()
return parent == nil || equalAddresses(a.addr, object.AddressOf(parent))
}
func (a *assembler) buildChain(ctx context.Context, prevID oid.ID) ([]oid.ID, error) {
var (
chain []oid.ID
hasPrev = true
)
// fill the chain end-to-start
for hasPrev {
head, err := a.objGetter.HeadObject(ctx, prevID)
if err != nil {
return nil, err
}
if !a.isChild(head) {
return nil, errParentAddressDiffers
}
id, _ := head.ID()
chain = append(chain, id)
prevID, hasPrev = head.PreviousID()
}
return chain, nil
}