From 7547592ce3e274460efbcd85bbb3f42dc3d38ca7 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Wed, 14 Apr 2021 11:58:14 +0300 Subject: [PATCH] [#477] engine: Iterate over all shards to return complete SplitInfoError Different SplitInfo parts may be stored in different shards. Storage engine must not stop at first SplitInfoError and should make best effort to complete SplitInfo structure if needed. Signed-off-by: Alex Vanin --- pkg/local_object_storage/engine/head.go | 25 +++++++++++++++++++++--- pkg/local_object_storage/metabase/put.go | 6 +++--- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index 4052cee5..f0386b00 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -3,6 +3,7 @@ package engine import ( objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-node/pkg/core/object" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/pkg/errors" "go.uber.org/zap" @@ -64,6 +65,7 @@ func (e *StorageEngine) Head(prm *HeadPrm) (*HeadRes, error) { head *object.Object siErr *objectSDK.SplitInfoError + outSI *objectSDK.SplitInfo outError = object.ErrNotFound ) @@ -77,9 +79,22 @@ func (e *StorageEngine) Head(prm *HeadPrm) (*HeadRes, error) { switch { case errors.Is(err, object.ErrNotFound): return false // ignore, go to next shard - case - errors.Is(err, object.ErrAlreadyRemoved), - errors.As(err, &siErr): + case errors.As(err, &siErr): + siErr = err.(*objectSDK.SplitInfoError) + + if outSI == nil { + outSI = objectSDK.NewSplitInfo() + } + + meta.MergeSplitInfo(siErr.SplitInfo(), outSI) + + // stop iterating over shards if SplitInfo structure is complete + if outSI.Link() != nil && outSI.LastPart() != nil { + return true + } + + return false + case errors.Is(err, object.ErrAlreadyRemoved): outError = err return true // stop, return it back @@ -100,6 +115,10 @@ func (e *StorageEngine) Head(prm *HeadPrm) (*HeadRes, error) { return true }) + if outSI != nil { + return nil, objectSDK.NewSplitInfoError(outSI) + } + if head == nil { return nil, outError } diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 2589c5cd..bfe6bcb6 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -416,7 +416,7 @@ func updateSplitInfo(tx *bbolt.Tx, addr *objectSDK.Address, from *objectSDK.Spli return fmt.Errorf("can't unmarshal split info from root index: %w", err) } - result := mergeSplitInfo(from, to) + result := MergeSplitInfo(from, to) rawSplitInfo, err = result.Marshal() if err != nil { @@ -448,9 +448,9 @@ func splitInfoFromObject(obj *object.Object) (*objectSDK.SplitInfo, error) { return info, nil } -// mergeSplitInfo ignores conflicts and rewrites `to` with non empty values +// MergeSplitInfo ignores conflicts and rewrites `to` with non empty values // from `from`. -func mergeSplitInfo(from, to *objectSDK.SplitInfo) *objectSDK.SplitInfo { +func MergeSplitInfo(from, to *objectSDK.SplitInfo) *objectSDK.SplitInfo { to.SetSplitID(from.SplitID()) // overwrite SplitID and ignore conflicts if lp := from.LastPart(); lp != nil {