package getsvc import ( "context" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) type objectGetter interface { GetObjectAndWritePayload(ctx context.Context, id oid.ID, rng *objectSDK.Range, writer ChunkWriter) (*objectSDK.Object, error) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) } type assembler struct { addr oid.Address splitInfo *objectSDK.SplitInfo rng *objectSDK.Range objGetter objectGetter currentOffset uint64 parentObject *objectSDK.Object } func newAssembler( addr oid.Address, splitInfo *objectSDK.SplitInfo, rng *objectSDK.Range, objGetter objectGetter) *assembler { return &assembler{ addr: addr, rng: rng, splitInfo: splitInfo, objGetter: objGetter, } } // Assemble assembles splitted large object and writes it's content to ObjectWriter. // It returns parent object. func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { sourceObjectID, ok := a.getLastPartOrLinkObjectID() if !ok { return nil, objectSDK.NewSplitInfoError(a.splitInfo) } previousID, childrenIDs, err := a.initializeFromSourceObjectID(ctx, sourceObjectID) if err != nil { return nil, err } if previousID == nil && len(childrenIDs) == 0 { return nil, objectSDK.NewSplitInfoError(a.splitInfo) } if len(childrenIDs) > 0 { if err := a.assembleObjectByChildrenList(ctx, childrenIDs, writer); err != nil { return nil, err } } else { if err := a.assemleObjectByPreviousIDInReverse(ctx, *previousID, writer); err != nil { return nil, err } } return a.parentObject, nil } func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) { sourceObjectID, ok := a.splitInfo.Link() if ok { return sourceObjectID, true } sourceObjectID, ok = a.splitInfo.LastPart() if ok { return sourceObjectID, true } return oid.ID{}, false } func (a *assembler) initializeFromSourceObjectID(ctx context.Context, id oid.ID) (*oid.ID, []oid.ID, error) { w := NewSimpleObjectWriter() sourceObject, err := a.getChildObject(ctx, id, nil, true, w) if err != nil { return nil, nil, err } sourceObject.SetPayload(w.pld) parentObject := sourceObject.Parent() if parentObject == nil { return nil, nil, errChildWithEmptyParent } a.parentObject = parentObject var payload []byte if a.rng != nil { seekOff := a.rng.GetOffset() seekLen := a.rng.GetLength() seekTo := seekOff + seekLen parentSize := parentObject.PayloadSize() if seekTo < seekOff || parentSize < seekOff || parentSize < seekTo { return nil, nil, &apistatus.ObjectOutOfRange{} } sourceSize := sourceObject.PayloadSize() a.currentOffset = parentSize - sourceSize from := uint64(0) if a.currentOffset < seekOff { from = seekOff - a.currentOffset } to := uint64(0) if seekOff+seekLen > a.currentOffset+from { to = seekOff + seekLen - a.currentOffset } payload = sourceObject.Payload()[from:to] a.rng.SetLength(a.rng.GetLength() - to + from) } else { payload = sourceObject.Payload() } a.parentObject.SetPayload(payload) idPrev, ok := sourceObject.PreviousID() if ok { return &idPrev, sourceObject.Children(), nil } return nil, sourceObject.Children(), nil } func (a *assembler) getChildObject(ctx context.Context, id oid.ID, rng *objectSDK.Range, verifyIsChild bool, writer ChunkWriter) (*objectSDK.Object, error) { obj, err := a.objGetter.GetObjectAndWritePayload(ctx, id, rng, writer) if err != nil { return nil, err } if verifyIsChild && !a.isChild(obj) { return nil, errParentAddressDiffers } return obj, nil } func (a *assembler) assembleObjectByChildrenList(ctx context.Context, childrenIDs []oid.ID, writer ObjectWriter) error { if a.rng == nil { if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil { return err } return a.assemblePayloadByObjectIDs(ctx, writer, childrenIDs, nil, true) } if err := a.assemblePayloadInReverse(ctx, writer, childrenIDs[len(childrenIDs)-1]); err != nil { return err } return writer.WriteChunk(ctx, a.parentObject.Payload()) } func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prevID oid.ID, writer ObjectWriter) error { if a.rng == nil { if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil { return err } } if err := a.assemblePayloadInReverse(ctx, writer, prevID); err != nil { return err } if err := writer.WriteChunk(ctx, a.parentObject.Payload()); err != nil { // last part return err } return nil } func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer ObjectWriter, partIDs []oid.ID, partRanges []objectSDK.Range, verifyIsChild bool) error { withRng := len(partRanges) > 0 && a.rng != nil for i := range partIDs { var r *objectSDK.Range if withRng { r = &partRanges[i] } _, err := a.getChildObject(ctx, partIDs[i], r, verifyIsChild, writer) if err != nil { return err } } return nil } func (a *assembler) assemblePayloadInReverse(ctx context.Context, writer ObjectWriter, prevID oid.ID) error { chain, rngs, err := a.buildChain(ctx, prevID) if err != nil { return err } reverseRngs := len(rngs) > 0 for left, right := 0, len(chain)-1; left < right; left, right = left+1, right-1 { chain[left], chain[right] = chain[right], chain[left] if reverseRngs { rngs[left], rngs[right] = rngs[right], rngs[left] } } return a.assemblePayloadByObjectIDs(ctx, writer, chain, rngs, false) } func (a *assembler) isChild(obj *objectSDK.Object) bool { parent := obj.Parent() return parent == nil || equalAddresses(a.addr, object.AddressOf(parent)) } func (a *assembler) buildChain(ctx context.Context, prevID oid.ID) ([]oid.ID, []objectSDK.Range, error) { var ( chain []oid.ID rngs []objectSDK.Range from = a.rng.GetOffset() to = from + a.rng.GetLength() hasPrev = true ) // fill the chain end-to-start for hasPrev { // check that only for "range" requests, // for `GET` it stops via the false `withPrev` if a.rng != nil && a.currentOffset <= from { break } head, err := a.objGetter.HeadObject(ctx, prevID) if err != nil { return nil, nil, err } if !a.isChild(head) { return nil, nil, errParentAddressDiffers } if a.rng != nil { sz := head.PayloadSize() a.currentOffset -= sz if a.currentOffset < to { off := uint64(0) if from > a.currentOffset { off = from - a.currentOffset sz -= from - a.currentOffset } if to < a.currentOffset+off+sz { sz = to - off - a.currentOffset } index := len(rngs) rngs = append(rngs, objectSDK.Range{}) rngs[index].SetOffset(off) rngs[index].SetLength(sz) id, _ := head.ID() chain = append(chain, id) } } else { id, _ := head.ID() chain = append(chain, id) } prevID, hasPrev = head.PreviousID() } return chain, rngs, nil }