forked from TrueCloudLab/frostfs-node
[#1186] engine: Allow to skip metabase in GetRange
Similarly to `Get`. Also fix a bug where `ErrNotFound` is returned instead of `ErrRangeOutOfBounds`. Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
2b5550ccf6
commit
1fe9cd4d36
6 changed files with 72 additions and 10 deletions
|
@ -321,12 +321,16 @@ func (b *blobovniczas) getRange(prm *GetRangeSmallPrm) (res *GetRangeSmallRes, e
|
||||||
|
|
||||||
res, err = b.getRangeFromLevel(prm, p, !ok)
|
res, err = b.getRangeFromLevel(prm, p, !ok)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, object.ErrNotFound) {
|
outOfBounds := errors.Is(err, object.ErrRangeOutOfBounds)
|
||||||
|
if !errors.Is(err, object.ErrNotFound) && !outOfBounds {
|
||||||
b.log.Debug("could not get object from level",
|
b.log.Debug("could not get object from level",
|
||||||
zap.String("level", p),
|
zap.String("level", p),
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
if outOfBounds {
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
activeCache[dirPath] = struct{}{}
|
activeCache[dirPath] = struct{}{}
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
package blobstor
|
package blobstor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetRangeBigPrm groups the parameters of GetRangeBig operation.
|
// GetRangeBigPrm groups the parameters of GetRangeBig operation.
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package engine
|
package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -137,7 +138,7 @@ func TestBlobstorFailback(t *testing.T) {
|
||||||
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
|
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
|
||||||
|
|
||||||
objs := make([]*object.Object, 0, 2)
|
objs := make([]*object.Object, 0, 2)
|
||||||
for _, size := range []int{1, errSmallSize + 1} {
|
for _, size := range []int{15, errSmallSize + 1} {
|
||||||
obj := generateRawObjectWithCID(t, cidtest.ID())
|
obj := generateRawObjectWithCID(t, cidtest.ID())
|
||||||
obj.SetPayload(make([]byte, size))
|
obj.SetPayload(make([]byte, size))
|
||||||
|
|
||||||
|
@ -152,6 +153,8 @@ func TestBlobstorFailback(t *testing.T) {
|
||||||
for i := range objs {
|
for i := range objs {
|
||||||
_, err = e.Get(&GetPrm{addr: objs[i].Address()})
|
_, err = e.Get(&GetPrm{addr: objs[i].Address()})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
_, err = e.GetRange(&RngPrm{addr: objs[i].Address()})
|
||||||
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
checkShardState(t, e, id[0], 0, shard.ModeReadWrite)
|
checkShardState(t, e, id[0], 0, shard.ModeReadWrite)
|
||||||
|
@ -167,12 +170,19 @@ func TestBlobstorFailback(t *testing.T) {
|
||||||
e, _, id = newEngineWithErrorThreshold(t, dir, 1)
|
e, _, id = newEngineWithErrorThreshold(t, dir, 1)
|
||||||
|
|
||||||
for i := range objs {
|
for i := range objs {
|
||||||
actual, err := e.Get(&GetPrm{addr: objs[i].Address()})
|
getRes, err := e.Get(&GetPrm{addr: objs[i].Address()})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, objs[i], actual.Object())
|
require.Equal(t, objs[i], getRes.Object())
|
||||||
|
|
||||||
|
rngRes, err := e.GetRange(&RngPrm{addr: objs[i].Address(), off: 1, ln: 10})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload())
|
||||||
|
|
||||||
|
_, err = e.GetRange(&RngPrm{addr: objs[i].Address(), off: errSmallSize + 10, ln: 1})
|
||||||
|
require.True(t, errors.Is(err, object.ErrRangeOutOfBounds), "got: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
checkShardState(t, e, id[0], 2, shard.ModeReadOnly)
|
checkShardState(t, e, id[0], 4, shard.ModeReadOnly)
|
||||||
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util"
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
|
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RngPrm groups the parameters of GetRange operation.
|
// RngPrm groups the parameters of GetRange operation.
|
||||||
|
@ -82,6 +83,9 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
|
||||||
|
|
||||||
outSI *objectSDK.SplitInfo
|
outSI *objectSDK.SplitInfo
|
||||||
outError = object.ErrNotFound
|
outError = object.ErrNotFound
|
||||||
|
|
||||||
|
shardWithMeta hashedShard
|
||||||
|
metaError error
|
||||||
)
|
)
|
||||||
|
|
||||||
shPrm := new(shard.RngPrm).
|
shPrm := new(shard.RngPrm).
|
||||||
|
@ -91,6 +95,10 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
|
||||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
||||||
res, err := sh.GetRange(shPrm)
|
res, err := sh.GetRange(shPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if res.HasMeta() {
|
||||||
|
shardWithMeta = sh
|
||||||
|
metaError = err
|
||||||
|
}
|
||||||
switch {
|
switch {
|
||||||
case errors.Is(err, object.ErrNotFound):
|
case errors.Is(err, object.ErrNotFound):
|
||||||
return false // ignore, go to next shard
|
return false // ignore, go to next shard
|
||||||
|
@ -131,9 +139,31 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if obj == nil {
|
if obj == nil {
|
||||||
|
if shardWithMeta.Shard == nil || !errors.Is(outError, object.ErrNotFound) {
|
||||||
return nil, outError
|
return nil, outError
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the object is not found but is present in metabase,
|
||||||
|
// try to fetch it from blobstor directly. If it is found in any
|
||||||
|
// blobstor, increase the error counter for the shard which contains the meta.
|
||||||
|
shPrm = shPrm.WithIgnoreMeta(true)
|
||||||
|
|
||||||
|
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
|
||||||
|
res, err := sh.GetRange(shPrm)
|
||||||
|
if errors.Is(err, object.ErrRangeOutOfBounds) {
|
||||||
|
outError = object.ErrRangeOutOfBounds
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
obj = res.Object()
|
||||||
|
return err == nil
|
||||||
|
})
|
||||||
|
if obj == nil {
|
||||||
|
return nil, outError
|
||||||
|
}
|
||||||
|
e.reportShardError(shardWithMeta, "meta info was present, but object is missing",
|
||||||
|
metaError, zap.Stringer("address", prm.addr))
|
||||||
|
}
|
||||||
|
|
||||||
return &RngRes{
|
return &RngRes{
|
||||||
obj: obj,
|
obj: obj,
|
||||||
}, nil
|
}, nil
|
||||||
|
|
|
@ -120,7 +120,7 @@ func (s *Shard) fetchObjectData(addr *addressSDK.Address, skipMeta bool, big, sm
|
||||||
|
|
||||||
if skipMeta {
|
if skipMeta {
|
||||||
res, err = small(s.blobStor, nil)
|
res, err = small(s.blobStor, nil)
|
||||||
if err == nil {
|
if err == nil || errors.Is(err, object.ErrRangeOutOfBounds) {
|
||||||
return res, false, err
|
return res, false, err
|
||||||
}
|
}
|
||||||
res, err = big(s.blobStor, nil)
|
res, err = big(s.blobStor, nil)
|
||||||
|
|
|
@ -15,11 +15,14 @@ type RngPrm struct {
|
||||||
off uint64
|
off uint64
|
||||||
|
|
||||||
addr *addressSDK.Address
|
addr *addressSDK.Address
|
||||||
|
|
||||||
|
skipMeta bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// RngRes groups resulting values of GetRange operation.
|
// RngRes groups resulting values of GetRange operation.
|
||||||
type RngRes struct {
|
type RngRes struct {
|
||||||
obj *object.Object
|
obj *object.Object
|
||||||
|
hasMeta bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithAddress is a Rng option to set the address of the requested object.
|
// WithAddress is a Rng option to set the address of the requested object.
|
||||||
|
@ -42,6 +45,13 @@ func (p *RngPrm) WithRange(off uint64, ln uint64) *RngPrm {
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithIgnoreMeta is a Get option try to fetch object from blobstor directly,
|
||||||
|
// without accessing metabase.
|
||||||
|
func (p *RngPrm) WithIgnoreMeta(ignore bool) *RngPrm {
|
||||||
|
p.skipMeta = ignore
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
// Object returns the requested object part.
|
// Object returns the requested object part.
|
||||||
//
|
//
|
||||||
// Instance payload contains the requested range of the original object.
|
// Instance payload contains the requested range of the original object.
|
||||||
|
@ -49,6 +59,11 @@ func (r *RngRes) Object() *object.Object {
|
||||||
return r.obj
|
return r.obj
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasMeta returns true if info about the object was found in the metabase.
|
||||||
|
func (r *RngRes) HasMeta() bool {
|
||||||
|
return r.hasMeta
|
||||||
|
}
|
||||||
|
|
||||||
// GetRange reads part of an object from shard.
|
// GetRange reads part of an object from shard.
|
||||||
//
|
//
|
||||||
// Returns any error encountered that
|
// Returns any error encountered that
|
||||||
|
@ -95,9 +110,10 @@ func (s *Shard) GetRange(prm *RngPrm) (*RngRes, error) {
|
||||||
return obj.Object(), nil
|
return obj.Object(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, _, err := s.fetchObjectData(prm.addr, false, big, small)
|
obj, hasMeta, err := s.fetchObjectData(prm.addr, prm.skipMeta, big, small)
|
||||||
|
|
||||||
return &RngRes{
|
return &RngRes{
|
||||||
obj: obj,
|
obj: obj,
|
||||||
|
hasMeta: hasMeta,
|
||||||
}, err
|
}, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue