Assemble complex object headers if linking object is deleted on EC containers support v0.42 #1299

Merged
6 changed files with 72 additions and 18 deletions
Showing only changes of commit 54862250b2 - Show all commits

View file

@ -127,17 +127,16 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
return true return true
}) })
if head != nil {
return HeadRes{head: head}, nil
}
if outSI != nil { if outSI != nil {
return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
} else if outEI != nil {
return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
} else if head == nil {
return HeadRes{}, outError
} }
if outEI != nil {
return HeadRes{ return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
head: head, }
}, nil return HeadRes{}, outError
} }
// Head reads object header from local storage by provided address. // Head reads object header from local storage by provided address.

View file

@ -160,11 +160,23 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound)) return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
} }
// pick last item, for now there is not difference which address to pick var data []byte
// but later list might be sorted so first or last value can be more for i := 0; i < len(relativeLst) && len(data) == 0; i++ {
// prioritized to choose virtualOID := relativeLst[len(relativeLst)-i-1]
virtualOID := relativeLst[len(relativeLst)-1] data = getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
data := getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID) }
if len(data) == 0 {
// check if any of the relatives is an EC object
for _, relative := range relativeLst {
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), relative)
if len(data) > 0 {
// we can't return object headers, but can return error,
// so assembler can try to assemble complex object
return nil, getSplitInfoError(tx, cnr, key)
}
}
}
child := objectSDK.New() child := objectSDK.New()

View file

@ -12,7 +12,7 @@ import (
) )
func (r *request) assemble(ctx context.Context) { func (r *request) assemble(ctx context.Context) {
if !r.canAssemble() { if !r.canAssembleComplexObject() {
r.log.Debug(logs.GetCanNotAssembleTheObject) r.log.Debug(logs.GetCanNotAssembleTheObject)
return return
} }
@ -38,7 +38,7 @@ func (r *request) assemble(ctx context.Context) {
r.log.Debug(logs.GetTryingToAssembleTheObject) r.log.Debug(logs.GetTryingToAssembleTheObject)
r.prm.common = r.prm.common.WithLocalOnly(false) r.prm.common = r.prm.common.WithLocalOnly(false)
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r) assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r, r.headOnly())
r.log.Debug(logs.GetAssemblingSplittedObject, r.log.Debug(logs.GetAssemblingSplittedObject,
zap.Uint64("range_offset", r.ctxRange().GetOffset()), zap.Uint64("range_offset", r.ctxRange().GetOffset()),

View file

@ -19,6 +19,7 @@ type assembler struct {
splitInfo *objectSDK.SplitInfo splitInfo *objectSDK.SplitInfo
rng *objectSDK.Range rng *objectSDK.Range
objGetter objectGetter objGetter objectGetter
head bool
currentOffset uint64 currentOffset uint64
@ -30,18 +31,23 @@ func newAssembler(
splitInfo *objectSDK.SplitInfo, splitInfo *objectSDK.SplitInfo,
rng *objectSDK.Range, rng *objectSDK.Range,
objGetter objectGetter, objGetter objectGetter,
head bool,
) *assembler { ) *assembler {
return &assembler{ return &assembler{
addr: addr, addr: addr,
rng: rng, rng: rng,
splitInfo: splitInfo, splitInfo: splitInfo,
objGetter: objGetter, objGetter: objGetter,
head: head,
} }
} }
// Assemble assembles splitted large object and writes it's content to ObjectWriter. // Assemble assembles splitted large object and writes it's content to ObjectWriter.
// It returns parent object. // It returns parent object.
func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
if a.head {
return a.assembleHeader(ctx, writer)
}
sourceObjectID, ok := a.getLastPartOrLinkObjectID() sourceObjectID, ok := a.getLastPartOrLinkObjectID()
if !ok { if !ok {
return nil, objectSDK.NewSplitInfoError(a.splitInfo) return nil, objectSDK.NewSplitInfoError(a.splitInfo)
@ -65,6 +71,43 @@ func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectS
return a.parentObject, nil return a.parentObject, nil
} }
func (a *assembler) assembleHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
var sourceObjectIDs []oid.ID
sourceObjectID, ok := a.splitInfo.Link()
if ok {
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
}
sourceObjectID, ok = a.splitInfo.LastPart()
if ok {
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
}
if len(sourceObjectIDs) == 0 {
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
for _, sourceObjectID = range sourceObjectIDs {
obj, err := a.getParent(ctx, sourceObjectID, writer)
if err == nil {
return obj, nil
}
}
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
func (a *assembler) getParent(ctx context.Context, sourceObjectID oid.ID, writer ObjectWriter) (*objectSDK.Object, error) {
obj, err := a.objGetter.HeadObject(ctx, sourceObjectID)
if err != nil {
return nil, err
}
parent := obj.Parent()
if parent == nil {
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
if err := writer.WriteHeader(ctx, parent); err != nil {
return nil, err
}
return obj, nil
}
func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) { func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) {
sourceObjectID, ok := a.splitInfo.Link() sourceObjectID, ok := a.splitInfo.Link()
if ok { if ok {

View file

@ -730,7 +730,7 @@ func TestGetRemoteSmall(t *testing.T) {
t.Run("VIRTUAL", func(t *testing.T) { t.Run("VIRTUAL", func(t *testing.T) {
testHeadVirtual := func(svc *Service, addr oid.Address, i *objectSDK.SplitInfo) { testHeadVirtual := func(svc *Service, addr oid.Address, i *objectSDK.SplitInfo) {
headPrm := newHeadPrm(false, nil) headPrm := newHeadPrm(true, nil)
headPrm.WithAddress(addr) headPrm.WithAddress(addr)
errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo()) errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo())

View file

@ -88,8 +88,8 @@ func (r *request) key() (*ecdsa.PrivateKey, error) {
return r.keyStore.GetKey(sessionInfo) return r.keyStore.GetKey(sessionInfo)
} }
func (r *request) canAssemble() bool { func (r *request) canAssembleComplexObject() bool {
return !r.isRaw() && !r.headOnly() return !r.isRaw()
} }
func (r *request) splitInfo() *objectSDK.SplitInfo { func (r *request) splitInfo() *objectSDK.SplitInfo {