diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index dfe5e48a..6857a363 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -127,17 +127,16 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error) return true }) + if head != nil { + return HeadRes{head: head}, nil + } if outSI != nil { return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) - } else if outEI != nil { - return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI)) - } else if head == nil { - return HeadRes{}, outError } - - return HeadRes{ - head: head, - }, nil + if outEI != nil { + return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI)) + } + return HeadRes{}, outError } // Head reads object header from local storage by provided address. diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index d9acd4ce..b79f6cb1 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -160,11 +160,23 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) } - // pick last item, for now there is not difference which address to pick - // but later list might be sorted so first or last value can be more - // prioritized to choose - virtualOID := relativeLst[len(relativeLst)-1] - data := getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID) + var data []byte + for i := 0; i < len(relativeLst) && len(data) == 0; i++ { + virtualOID := relativeLst[len(relativeLst)-i-1] + data = getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID) + } + + if len(data) == 0 { + // check if any of the relatives is an EC object + for _, relative := range relativeLst { + data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), relative) + if len(data) > 0 { + // we can't return object headers, but can return error, + // so assembler can try to assemble complex object + return nil, getSplitInfoError(tx, cnr, key) + } + } + } child := objectSDK.New() diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index ba6fddec..9f17f1e4 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -12,7 +12,7 @@ import ( ) func (r *request) assemble(ctx context.Context) { - if !r.canAssemble() { + if !r.canAssembleComplexObject() { r.log.Debug(logs.GetCanNotAssembleTheObject) return } @@ -38,7 +38,7 @@ func (r *request) assemble(ctx context.Context) { r.log.Debug(logs.GetTryingToAssembleTheObject) r.prm.common = r.prm.common.WithLocalOnly(false) - assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r) + assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r, r.headOnly()) r.log.Debug(logs.GetAssemblingSplittedObject, zap.Uint64("range_offset", r.ctxRange().GetOffset()), diff --git a/pkg/services/object/get/assembler.go b/pkg/services/object/get/assembler.go index 025296ec..ff3f90bf 100644 --- a/pkg/services/object/get/assembler.go +++ b/pkg/services/object/get/assembler.go @@ -19,6 +19,7 @@ type assembler struct { splitInfo *objectSDK.SplitInfo rng *objectSDK.Range objGetter objectGetter + head bool currentOffset uint64 @@ -30,18 +31,23 @@ func newAssembler( splitInfo *objectSDK.SplitInfo, rng *objectSDK.Range, objGetter objectGetter, + head bool, ) *assembler { return &assembler{ addr: addr, rng: rng, splitInfo: splitInfo, objGetter: objGetter, + head: head, } } // Assemble assembles splitted large object and writes it's content to ObjectWriter. // It returns parent object. func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { + if a.head { + return a.assembleHeader(ctx, writer) + } sourceObjectID, ok := a.getLastPartOrLinkObjectID() if !ok { return nil, objectSDK.NewSplitInfoError(a.splitInfo) @@ -65,6 +71,43 @@ func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectS return a.parentObject, nil } +func (a *assembler) assembleHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { + var sourceObjectIDs []oid.ID + sourceObjectID, ok := a.splitInfo.Link() + if ok { + sourceObjectIDs = append(sourceObjectIDs, sourceObjectID) + } + sourceObjectID, ok = a.splitInfo.LastPart() + if ok { + sourceObjectIDs = append(sourceObjectIDs, sourceObjectID) + } + if len(sourceObjectIDs) == 0 { + return nil, objectSDK.NewSplitInfoError(a.splitInfo) + } + for _, sourceObjectID = range sourceObjectIDs { + obj, err := a.getParent(ctx, sourceObjectID, writer) + if err == nil { + return obj, nil + } + } + return nil, objectSDK.NewSplitInfoError(a.splitInfo) +} + +func (a *assembler) getParent(ctx context.Context, sourceObjectID oid.ID, writer ObjectWriter) (*objectSDK.Object, error) { + obj, err := a.objGetter.HeadObject(ctx, sourceObjectID) + if err != nil { + return nil, err + } + parent := obj.Parent() + if parent == nil { + return nil, objectSDK.NewSplitInfoError(a.splitInfo) + } + if err := writer.WriteHeader(ctx, parent); err != nil { + return nil, err + } + return obj, nil +} + func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) { sourceObjectID, ok := a.splitInfo.Link() if ok { diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 29a15ba7..1fc6b7b2 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -730,7 +730,7 @@ func TestGetRemoteSmall(t *testing.T) { t.Run("VIRTUAL", func(t *testing.T) { testHeadVirtual := func(svc *Service, addr oid.Address, i *objectSDK.SplitInfo) { - headPrm := newHeadPrm(false, nil) + headPrm := newHeadPrm(true, nil) headPrm.WithAddress(addr) errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo()) diff --git a/pkg/services/object/get/request.go b/pkg/services/object/get/request.go index 9ddfeddf..1a7a43a3 100644 --- a/pkg/services/object/get/request.go +++ b/pkg/services/object/get/request.go @@ -88,8 +88,8 @@ func (r *request) key() (*ecdsa.PrivateKey, error) { return r.keyStore.GetKey(sessionInfo) } -func (r *request) canAssemble() bool { - return !r.isRaw() && !r.headOnly() +func (r *request) canAssembleComplexObject() bool { + return !r.isRaw() } func (r *request) splitInfo() *objectSDK.SplitInfo {