From 8b9e40a848eedff78c4f9b93bd70bc4a898d7a7e Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 7 Mar 2023 09:17:05 +0300 Subject: [PATCH] [#85] get-service: Add assembler Extract assemble logic to assembler Signed-off-by: Dmitrii Stepanov --- pkg/services/object/get/assembler.go | 293 +++++++++++++++++++++++++++ 1 file changed, 293 insertions(+) create mode 100644 pkg/services/object/get/assembler.go diff --git a/pkg/services/object/get/assembler.go b/pkg/services/object/get/assembler.go new file mode 100644 index 000000000..97f88cc9c --- /dev/null +++ b/pkg/services/object/get/assembler.go @@ -0,0 +1,293 @@ +package getsvc + +import ( + "context" + "errors" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +type objectGetter interface { + GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Range) (*objectSDK.Object, error) + HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) +} + +type objectWriter interface { + WriteChunk(context.Context, []byte) error + WriteHeader(context.Context, *objectSDK.Object) error +} + +var ( + errParentAddressDiffers = errors.New("parent address in child object differs") +) + +type assembler struct { + addr oid.Address + splitInfo *objectSDK.SplitInfo + rng *objectSDK.Range + objGetter objectGetter + + currentOffset uint64 + + parentObject *objectSDK.Object +} + +func newAssembler( + addr oid.Address, + splitInfo *objectSDK.SplitInfo, + rng *objectSDK.Range, + objGetter objectGetter) *assembler { + return &assembler{ + addr: addr, + rng: rng, + splitInfo: splitInfo, + objGetter: objGetter, + } +} + +// Assemble assembles splitted large object and writes it's content to ObjectWriter. +// It returns parent object. +func (a *assembler) Assemble(ctx context.Context, writer objectWriter) (*objectSDK.Object, error) { + sourceObjectID, ok := a.getLastPartOrLinkObjectID() + if !ok { + return nil, objectSDK.NewSplitInfoError(a.splitInfo) + } + previousID, childrenIDs, err := a.initializeFromSourceObjectID(ctx, sourceObjectID) + if err != nil { + return nil, err + } + if previousID == nil && len(childrenIDs) == 0 { + return nil, objectSDK.NewSplitInfoError(a.splitInfo) + } + if len(childrenIDs) > 0 { + if err := a.assembleObjectByChildrenList(ctx, childrenIDs, writer); err != nil { + return nil, err + } + } else { + if err := a.assemleObjectByPreviousIDInReverse(ctx, *previousID, writer); err != nil { + return nil, err + } + } + return a.parentObject, nil +} + +func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) { + sourceObjectID, ok := a.splitInfo.Link() + if ok { + return sourceObjectID, true + } + sourceObjectID, ok = a.splitInfo.LastPart() + if ok { + return sourceObjectID, true + } + return oid.ID{}, false +} + +func (a *assembler) initializeFromSourceObjectID(ctx context.Context, id oid.ID) (*oid.ID, []oid.ID, error) { + sourceObject, err := a.getChildObject(ctx, id, nil, true) + if err != nil { + return nil, nil, err + } + + parentObject := sourceObject.Parent() + if parentObject == nil { + return nil, nil, errors.New("received child with empty parent") + } + + a.parentObject = parentObject + + var payload []byte + + if a.rng != nil { + seekOff := a.rng.GetOffset() + seekLen := a.rng.GetLength() + seekTo := seekOff + seekLen + parentSize := parentObject.PayloadSize() + + if seekTo < seekOff || parentSize < seekOff || parentSize < seekTo { + return nil, nil, &apistatus.ObjectOutOfRange{} + } + + sourceSize := sourceObject.PayloadSize() + + a.currentOffset = parentSize - sourceSize + + from := uint64(0) + if a.currentOffset < seekOff { + from = seekOff - a.currentOffset + } + + to := uint64(0) + if seekOff+seekLen > a.currentOffset+from { + to = seekOff + seekLen - a.currentOffset + } + + payload = sourceObject.Payload()[from:to] + a.rng.SetLength(a.rng.GetLength() - to + from) + } else { + payload = sourceObject.Payload() + } + + a.parentObject.SetPayload(payload) + + idPrev, ok := sourceObject.PreviousID() + if ok { + return &idPrev, sourceObject.Children(), nil + } + + return nil, sourceObject.Children(), nil +} + +func (a *assembler) getChildObject(ctx context.Context, id oid.ID, rng *objectSDK.Range, verifyIsChild bool) (*objectSDK.Object, error) { + obj, err := a.objGetter.GetObject(ctx, id, rng) + if err != nil { + return nil, err + } + + if verifyIsChild && !a.isChild(obj) { + return nil, errParentAddressDiffers + } + return obj, nil +} + +func (a *assembler) assembleObjectByChildrenList(ctx context.Context, childrenIDs []oid.ID, writer objectWriter) error { + if a.rng == nil { + if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil { + return err + } + return a.assemblePayloadByObjectIDs(ctx, writer, childrenIDs, nil, true) + } + + if err := a.assemblePayloadInReverse(ctx, writer, childrenIDs[len(childrenIDs)-1]); err != nil { + return err + } + if err := writer.WriteChunk(ctx, a.parentObject.Payload()); err != nil { + return err + } + return nil +} + +func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prevID oid.ID, writer objectWriter) error { + if a.rng == nil { + if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil { + return err + } + } + + if err := a.assemblePayloadInReverse(ctx, writer, prevID); err != nil { + return err + } + if err := writer.WriteChunk(ctx, a.parentObject.Payload()); err != nil { // last part + return err + } + return nil +} + +func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer objectWriter, partIDs []oid.ID, partRanges []objectSDK.Range, verifyIsChild bool) error { + withRng := len(partRanges) > 0 && a.rng != nil + + for i := range partIDs { + var r *objectSDK.Range + if withRng { + r = &partRanges[i] + } + + child, err := a.getChildObject(ctx, partIDs[i], r, verifyIsChild) + if err != nil { + return err + } + + if err := writer.WriteChunk(ctx, child.Payload()); err != nil { + return err + } + } + return nil +} + +func (a *assembler) assemblePayloadInReverse(ctx context.Context, writer objectWriter, prevID oid.ID) error { + chain, rngs, err := a.buildChain(ctx, prevID) + if err != nil { + return err + } + + reverseRngs := len(rngs) > 0 + + for left, right := 0, len(chain)-1; left < right; left, right = left+1, right-1 { + chain[left], chain[right] = chain[right], chain[left] + + if reverseRngs { + rngs[left], rngs[right] = rngs[right], rngs[left] + } + } + + return a.assemblePayloadByObjectIDs(ctx, writer, chain, rngs, false) +} + +func (a *assembler) isChild(obj *objectSDK.Object) bool { + parent := obj.Parent() + return parent == nil || equalAddresses(a.addr, object.AddressOf(parent)) +} + +func (a *assembler) buildChain(ctx context.Context, prevID oid.ID) ([]oid.ID, []objectSDK.Range, error) { + var ( + chain []oid.ID + rngs []objectSDK.Range + from = a.rng.GetOffset() + to = from + a.rng.GetLength() + + hasPrev = true + ) + + // fill the chain end-to-start + for hasPrev { + // check that only for "range" requests, + // for `GET` it stops via the false `withPrev` + if a.rng != nil && a.currentOffset <= from { + break + } + + head, err := a.objGetter.HeadObject(ctx, prevID) + if err != nil { + return nil, nil, err + } + if !a.isChild(head) { + return nil, nil, errParentAddressDiffers + } + + if a.rng != nil { + sz := head.PayloadSize() + + a.currentOffset -= sz + + if a.currentOffset < to { + off := uint64(0) + if from > a.currentOffset { + off = from - a.currentOffset + sz -= from - a.currentOffset + } + + if to < a.currentOffset+off+sz { + sz = to - off - a.currentOffset + } + + index := len(rngs) + rngs = append(rngs, objectSDK.Range{}) + rngs[index].SetOffset(off) + rngs[index].SetLength(sz) + + id, _ := head.ID() + chain = append(chain, id) + } + } else { + id, _ := head.ID() + chain = append(chain, id) + } + + prevID, hasPrev = head.PreviousID() + } + + return chain, rngs, nil +}