forked from TrueCloudLab/frostfs-node
[#1186] engine: Allow to skip metabase in GetRange
Similarly to `Get`. Also fix a bug where `ErrNotFound` is returned
instead of `ErrRangeOutOfBounds`.
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
(cherry picked from commit 1fe9cd4d36
)
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
48b5d2cb91
commit
2977552a19
6 changed files with 66 additions and 9 deletions
|
@ -325,12 +325,16 @@ func (b *blobovniczas) getRange(prm *GetRangeSmallPrm) (res *GetRangeSmallRes, e
|
|||
|
||||
res, err = b.getRangeFromLevel(prm, p, !ok)
|
||||
if err != nil {
|
||||
if !errors.Is(err, object.ErrNotFound) {
|
||||
outOfBounds := errors.Is(err, object.ErrRangeOutOfBounds)
|
||||
if !errors.Is(err, object.ErrNotFound) && !outOfBounds {
|
||||
b.log.Debug("could not get object from level",
|
||||
zap.String("level", p),
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
if outOfBounds {
|
||||
return true, err
|
||||
}
|
||||
}
|
||||
|
||||
activeCache[dirPath] = struct{}{}
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
package blobstor
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
)
|
||||
|
||||
// GetRangeBigPrm groups the parameters of GetRangeBig operation.
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -64,7 +65,7 @@ func TestBlobstorFailback(t *testing.T) {
|
|||
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
|
||||
|
||||
objs := make([]*object.Object, 0, 2)
|
||||
for _, size := range []int{1, errSmallSize + 1} {
|
||||
for _, size := range []int{15, errSmallSize + 1} {
|
||||
obj := generateRawObjectWithCID(t, cidtest.ID())
|
||||
obj.SetPayload(make([]byte, size))
|
||||
|
||||
|
@ -79,6 +80,8 @@ func TestBlobstorFailback(t *testing.T) {
|
|||
for i := range objs {
|
||||
_, err = e.Get(&GetPrm{addr: objs[i].Address()})
|
||||
require.NoError(t, err)
|
||||
_, err = e.GetRange(&RngPrm{addr: objs[i].Address()})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.NoError(t, e.Close())
|
||||
|
@ -93,8 +96,15 @@ func TestBlobstorFailback(t *testing.T) {
|
|||
e, _, id = newEngineWithErrorThreshold(t, dir, 1)
|
||||
|
||||
for i := range objs {
|
||||
actual, err := e.Get(&GetPrm{addr: objs[i].Address()})
|
||||
getRes, err := e.Get(&GetPrm{addr: objs[i].Address()})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objs[i], actual.Object())
|
||||
require.Equal(t, objs[i], getRes.Object())
|
||||
|
||||
rngRes, err := e.GetRange(&RngPrm{addr: objs[i].Address(), off: 1, ln: 10})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload())
|
||||
|
||||
_, err = e.GetRange(&RngPrm{addr: objs[i].Address(), off: errSmallSize + 10, ln: 1})
|
||||
require.True(t, errors.Is(err, object.ErrRangeOutOfBounds), "got: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,6 +82,8 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
|
|||
|
||||
outSI *objectSDK.SplitInfo
|
||||
outError = object.ErrNotFound
|
||||
|
||||
shardWithMeta hashedShard
|
||||
)
|
||||
|
||||
shPrm := new(shard.RngPrm).
|
||||
|
@ -91,6 +93,9 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
|
|||
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||
res, err := sh.GetRange(shPrm)
|
||||
if err != nil {
|
||||
if res.HasMeta() {
|
||||
shardWithMeta = hashedShard{sh: sh}
|
||||
}
|
||||
switch {
|
||||
case errors.Is(err, object.ErrNotFound):
|
||||
return false // ignore, go to next shard
|
||||
|
@ -137,9 +142,29 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
|
|||
}
|
||||
|
||||
if obj == nil {
|
||||
if shardWithMeta.sh == nil || !errors.Is(outError, object.ErrNotFound) {
|
||||
return nil, outError
|
||||
}
|
||||
|
||||
// If the object is not found but is present in metabase,
|
||||
// try to fetch it from blobstor directly. If it is found in any
|
||||
// blobstor, increase the error counter for the shard which contains the meta.
|
||||
shPrm = shPrm.WithIgnoreMeta(true)
|
||||
|
||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||
res, err := sh.GetRange(shPrm)
|
||||
if errors.Is(err, object.ErrRangeOutOfBounds) {
|
||||
outError = object.ErrRangeOutOfBounds
|
||||
return true
|
||||
}
|
||||
obj = res.Object()
|
||||
return err == nil
|
||||
})
|
||||
if obj == nil {
|
||||
return nil, outError
|
||||
}
|
||||
}
|
||||
|
||||
return &RngRes{
|
||||
obj: obj,
|
||||
}, nil
|
||||
|
|
|
@ -120,7 +120,7 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, skipMeta bool, big, sma
|
|||
|
||||
if skipMeta {
|
||||
res, err = small(s.blobStor, nil)
|
||||
if err == nil {
|
||||
if err == nil || errors.Is(err, object.ErrRangeOutOfBounds) {
|
||||
return res, false, err
|
||||
}
|
||||
res, err = big(s.blobStor, nil)
|
||||
|
|
|
@ -14,11 +14,14 @@ type RngPrm struct {
|
|||
off uint64
|
||||
|
||||
addr *objectSDK.Address
|
||||
|
||||
skipMeta bool
|
||||
}
|
||||
|
||||
// RngRes groups resulting values of GetRange operation.
|
||||
type RngRes struct {
|
||||
obj *object.Object
|
||||
hasMeta bool
|
||||
}
|
||||
|
||||
// WithAddress is a Rng option to set the address of the requested object.
|
||||
|
@ -41,6 +44,13 @@ func (p *RngPrm) WithRange(off uint64, ln uint64) *RngPrm {
|
|||
return p
|
||||
}
|
||||
|
||||
// WithIgnoreMeta is a Get option try to fetch object from blobstor directly,
|
||||
// without accessing metabase.
|
||||
func (p *RngPrm) WithIgnoreMeta(ignore bool) *RngPrm {
|
||||
p.skipMeta = ignore
|
||||
return p
|
||||
}
|
||||
|
||||
// Object returns the requested object part.
|
||||
//
|
||||
// Instance payload contains the requested range of the original object.
|
||||
|
@ -48,6 +58,11 @@ func (r *RngRes) Object() *object.Object {
|
|||
return r.obj
|
||||
}
|
||||
|
||||
// HasMeta returns true if info about the object was found in the metabase.
|
||||
func (r *RngRes) HasMeta() bool {
|
||||
return r.hasMeta
|
||||
}
|
||||
|
||||
// GetRange reads part of an object from shard.
|
||||
//
|
||||
// Returns any error encountered that
|
||||
|
@ -94,9 +109,10 @@ func (s *Shard) GetRange(prm *RngPrm) (*RngRes, error) {
|
|||
return obj.Object(), nil
|
||||
}
|
||||
|
||||
obj, _, err := s.fetchObjectData(prm.addr, false, big, small)
|
||||
obj, hasMeta, err := s.fetchObjectData(prm.addr, prm.skipMeta, big, small)
|
||||
|
||||
return &RngRes{
|
||||
obj: obj,
|
||||
hasMeta: hasMeta,
|
||||
}, err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue