[#477] engine: Iterate over all shards to return complete SplitInfoError

Different SplitInfo parts may be stored in different shards. Storage
engine must not stop at first SplitInfoError and should make
best effort to complete SplitInfo structure if needed.

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2021-04-14 11:58:14 +03:00 committed by Alex Vanin
parent e020fe5597
commit 7547592ce3
2 changed files with 25 additions and 6 deletions

View file

@ -3,6 +3,7 @@ package engine
import (
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/pkg/errors"
"go.uber.org/zap"
@ -64,6 +65,7 @@ func (e *StorageEngine) Head(prm *HeadPrm) (*HeadRes, error) {
head *object.Object
siErr *objectSDK.SplitInfoError
outSI *objectSDK.SplitInfo
outError = object.ErrNotFound
)
@ -77,9 +79,22 @@ func (e *StorageEngine) Head(prm *HeadPrm) (*HeadRes, error) {
switch {
case errors.Is(err, object.ErrNotFound):
return false // ignore, go to next shard
case
errors.Is(err, object.ErrAlreadyRemoved),
errors.As(err, &siErr):
case errors.As(err, &siErr):
siErr = err.(*objectSDK.SplitInfoError)
if outSI == nil {
outSI = objectSDK.NewSplitInfo()
}
meta.MergeSplitInfo(siErr.SplitInfo(), outSI)
// stop iterating over shards if SplitInfo structure is complete
if outSI.Link() != nil && outSI.LastPart() != nil {
return true
}
return false
case errors.Is(err, object.ErrAlreadyRemoved):
outError = err
return true // stop, return it back
@ -100,6 +115,10 @@ func (e *StorageEngine) Head(prm *HeadPrm) (*HeadRes, error) {
return true
})
if outSI != nil {
return nil, objectSDK.NewSplitInfoError(outSI)
}
if head == nil {
return nil, outError
}

View file

@ -416,7 +416,7 @@ func updateSplitInfo(tx *bbolt.Tx, addr *objectSDK.Address, from *objectSDK.Spli
return fmt.Errorf("can't unmarshal split info from root index: %w", err)
}
result := mergeSplitInfo(from, to)
result := MergeSplitInfo(from, to)
rawSplitInfo, err = result.Marshal()
if err != nil {
@ -448,9 +448,9 @@ func splitInfoFromObject(obj *object.Object) (*objectSDK.SplitInfo, error) {
return info, nil
}
// mergeSplitInfo ignores conflicts and rewrites `to` with non empty values
// MergeSplitInfo ignores conflicts and rewrites `to` with non empty values
// from `from`.
func mergeSplitInfo(from, to *objectSDK.SplitInfo) *objectSDK.SplitInfo {
func MergeSplitInfo(from, to *objectSDK.SplitInfo) *objectSDK.SplitInfo {
to.SetSplitID(from.SplitID()) // overwrite SplitID and ignore conflicts
if lp := from.LastPart(); lp != nil {