Assemble complex object headers if linking object is deleted on EC containers #1295
|
@ -127,17 +127,16 @@ func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error)
|
|||
return true
|
||||
})
|
||||
|
||||
if head != nil {
|
||||
|
||||
return HeadRes{head: head}, nil
|
||||
}
|
||||
if outSI != nil {
|
||||
return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
|
||||
} else if outEI != nil {
|
||||
return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
|
||||
} else if head == nil {
|
||||
return HeadRes{}, outError
|
||||
}
|
||||
|
||||
return HeadRes{
|
||||
head: head,
|
||||
}, nil
|
||||
if outEI != nil {
|
||||
return HeadRes{}, logicerr.Wrap(objectSDK.NewECInfoError(outEI))
|
||||
}
|
||||
return HeadRes{}, outError
|
||||
}
|
||||
|
||||
// Head reads object header from local storage by provided address.
|
||||
|
|
|
@ -160,11 +160,23 @@ func getVirtualObject(tx *bbolt.Tx, cnr cid.ID, key []byte, raw bool) (*objectSD
|
|||
return nil, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||
}
|
||||
|
||||
// pick last item, for now there is not difference which address to pick
|
||||
// but later list might be sorted so first or last value can be more
|
||||
// prioritized to choose
|
||||
virtualOID := relativeLst[len(relativeLst)-1]
|
||||
data := getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
|
||||
var data []byte
|
||||
for i := 0; i < len(relativeLst) && len(data) == 0; i++ {
|
||||
virtualOID := relativeLst[len(relativeLst)-i-1]
|
||||
data = getFromBucket(tx, primaryBucketName(cnr, bucketName), virtualOID)
|
||||
}
|
||||
|
||||
if len(data) == 0 {
|
||||
// check if any of the relatives is an EC object
|
||||
for _, relative := range relativeLst {
|
||||
data = getFromBucket(tx, ecInfoBucketName(cnr, bucketName), relative)
|
||||
if len(data) > 0 {
|
||||
// we can't return object headers, but can return error,
|
||||
// so assembler can try to assemble complex object
|
||||
return nil, getSplitInfoError(tx, cnr, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
child := objectSDK.New()
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ import (
|
|||
)
|
||||
|
||||
func (r *request) assemble(ctx context.Context) {
|
||||
if !r.canAssemble() {
|
||||
if !r.canAssembleComplexObject() {
|
||||
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
||||
return
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ func (r *request) assemble(ctx context.Context) {
|
|||
r.log.Debug(logs.GetTryingToAssembleTheObject)
|
||||
|
||||
r.prm.common = r.prm.common.WithLocalOnly(false)
|
||||
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
|
||||
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r, r.headOnly())
|
||||
|
||||
r.log.Debug(logs.GetAssemblingSplittedObject,
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
|
|
|
@ -19,6 +19,7 @@ type assembler struct {
|
|||
splitInfo *objectSDK.SplitInfo
|
||||
rng *objectSDK.Range
|
||||
objGetter objectGetter
|
||||
head bool
|
||||
|
||||
currentOffset uint64
|
||||
|
||||
|
@ -30,18 +31,23 @@ func newAssembler(
|
|||
splitInfo *objectSDK.SplitInfo,
|
||||
rng *objectSDK.Range,
|
||||
objGetter objectGetter,
|
||||
head bool,
|
||||
) *assembler {
|
||||
return &assembler{
|
||||
addr: addr,
|
||||
rng: rng,
|
||||
splitInfo: splitInfo,
|
||||
objGetter: objGetter,
|
||||
head: head,
|
||||
}
|
||||
}
|
||||
|
||||
// Assemble assembles splitted large object and writes it's content to ObjectWriter.
|
||||
// It returns parent object.
|
||||
func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
if a.head {
|
||||
return a.assembleHeader(ctx, writer)
|
||||
}
|
||||
sourceObjectID, ok := a.getLastPartOrLinkObjectID()
|
||||
if !ok {
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
|
@ -65,6 +71,43 @@ func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectS
|
|||
return a.parentObject, nil
|
||||
}
|
||||
|
||||
func (a *assembler) assembleHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
var sourceObjectIDs []oid.ID
|
||||
sourceObjectID, ok := a.splitInfo.Link()
|
||||
if ok {
|
||||
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
|
||||
}
|
||||
sourceObjectID, ok = a.splitInfo.LastPart()
|
||||
fyrchik
commented
It seems to be completely independent of other It seems to be completely independent of other `assembler` methods, do we have a reason to provide `headOnly` as a parameter to the `assembler` instead of writing a separate function?
dstepanov-yadro
commented
Good point. Now this method looks more related to assembling: it tries to find parent headers from linking and last part objects. Good point. Now this method looks more related to assembling: it tries to find parent headers from linking and last part objects.
|
||||
if ok {
|
||||
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
|
||||
}
|
||||
if len(sourceObjectIDs) == 0 {
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
}
|
||||
for _, sourceObjectID = range sourceObjectIDs {
|
||||
obj, err := a.getParent(ctx, sourceObjectID, writer)
|
||||
if err == nil {
|
||||
return obj, nil
|
||||
}
|
||||
}
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
}
|
||||
|
||||
func (a *assembler) getParent(ctx context.Context, sourceObjectID oid.ID, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
obj, err := a.objGetter.HeadObject(ctx, sourceObjectID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
parent := obj.Parent()
|
||||
if parent == nil {
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
}
|
||||
if err := writer.WriteHeader(ctx, parent); err != nil {
|
||||
return nil, err
|
||||
fyrchik
commented
We return We return `false`, then create an error and will log again. Why is this `Warn`?
dstepanov-yadro
commented
This This `Warn` explains why `Assemble` returns error.
fyrchik
commented
I don't like the assembler being able to log anything, as it will be eventually moved to the SDK. I don't like the assembler being able to log anything, as it will be eventually moved to the SDK.
dstepanov-yadro
commented
1. But now assembler is here. Logger could be replaced with an interface.
2. I think it is useful for troubleshooting.
fyrchik
commented
It only adds complexity (by producing side-effects), even though we can return everything in the result. It only adds complexity (by producing side-effects), even though we can return everything in the result.
The result in turn is what we log and can use for troubleshooting.
dstepanov-yadro
commented
I disagree, but fixed. I disagree, but fixed.
|
||||
}
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) {
|
||||
sourceObjectID, ok := a.splitInfo.Link()
|
||||
if ok {
|
||||
|
|
|
@ -730,7 +730,7 @@ func TestGetRemoteSmall(t *testing.T) {
|
|||
|
||||
t.Run("VIRTUAL", func(t *testing.T) {
|
||||
testHeadVirtual := func(svc *Service, addr oid.Address, i *objectSDK.SplitInfo) {
|
||||
headPrm := newHeadPrm(false, nil)
|
||||
dstepanov-yadro
commented
For For `raw == false` cases it is expected to return object headers, but not split info error.
|
||||
headPrm := newHeadPrm(true, nil)
|
||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
I would expect the tests to stay the same, what is the reason? I would expect the tests to stay the same, what is the reason?
dstepanov-yadro
commented
For For `raw == false` cases it is expected to return object headers, but not split info error, as test does. So this test looks loke something unrelated to real code and works just because of mocks.
|
||||
headPrm.WithAddress(addr)
|
||||
|
||||
errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo())
|
||||
|
|
|
@ -88,8 +88,8 @@ func (r *request) key() (*ecdsa.PrivateKey, error) {
|
|||
return r.keyStore.GetKey(sessionInfo)
|
||||
}
|
||||
|
||||
func (r *request) canAssemble() bool {
|
||||
return !r.isRaw() && !r.headOnly()
|
||||
func (r *request) canAssembleComplexObject() bool {
|
||||
dstepanov-yadro
commented
To make method name more clear. To make method name more clear.
|
||||
return !r.isRaw()
|
||||
}
|
||||
|
||||
func (r *request) splitInfo() *objectSDK.SplitInfo {
|
||||
|
|
Reason to change priority: if some shard returns split info error, but other shard has linking object, then return head result directly.