[#1186] engine: Read object directly from blobstor in case of conflicts

Metabase is expected to contain actual information about objects stored
in a shard. If an object is present in the metabase but is missing from
the blobstor, perform an additional attempt to fetch it directly without
consulting the metabase. Such a situation is unexpected, so the error counter
is increased for the shard which has the object in the metabase. We
don't increase the error counter for the shard which has the object in
the blobstor, because some garbage can be expected there. In this
implementation there is no overhead for objects which are really
missing, i.e. are not present in any metabase.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-02-22 10:20:33 +03:00 committed by Alex Vanin
parent 6e6f3648d2
commit 69e1e6ca20
4 changed files with 151 additions and 48 deletions

View file

@ -7,6 +7,7 @@ import (
"strconv" "strconv"
"testing" "testing"
"github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
@ -15,31 +16,34 @@ import (
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
) )
func TestErrorReporting(t *testing.T) { const errSmallSize = 256
const smallSize = 100
log := zaptest.NewLogger(t) func newEngineWithErrorThreshold(t *testing.T, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
newEngine := func(t *testing.T, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) { if dir == "" {
dir, err := os.MkdirTemp("", "*") var err error
dir, err = os.MkdirTemp("", "*")
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { _ = os.RemoveAll(dir) }) t.Cleanup(func() { _ = os.RemoveAll(dir) })
}
e := New( e := New(
WithLogger(log), WithLogger(zaptest.NewLogger(t)),
WithShardPoolSize(1), WithShardPoolSize(1),
WithErrorThreshold(errThreshold)) WithErrorThreshold(errThreshold))
var ids [2]*shard.ID var ids [2]*shard.ID
var err error
for i := range ids { for i := range ids {
ids[i], err = e.AddShard( ids[i], err = e.AddShard(
shard.WithLogger(log), shard.WithLogger(zaptest.NewLogger(t)),
shard.WithBlobStorOptions( shard.WithBlobStorOptions(
blobstor.WithRootPath(filepath.Join(dir, strconv.Itoa(i))), blobstor.WithRootPath(filepath.Join(dir, strconv.Itoa(i))),
blobstor.WithShallowDepth(1), blobstor.WithShallowDepth(1),
blobstor.WithBlobovniczaShallowWidth(1), blobstor.WithBlobovniczaShallowWidth(1),
blobstor.WithBlobovniczaShallowDepth(1), blobstor.WithBlobovniczaShallowDepth(1),
blobstor.WithSmallSizeLimit(100), blobstor.WithSmallSizeLimit(errSmallSize),
blobstor.WithRootPerm(0700)), blobstor.WithRootPerm(0700)),
shard.WithMetaBaseOptions( shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))), meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
@ -52,11 +56,12 @@ func TestErrorReporting(t *testing.T) {
return e, dir, ids return e, dir, ids
} }
func TestErrorReporting(t *testing.T) {
t.Run("ignore errors by default", func(t *testing.T) { t.Run("ignore errors by default", func(t *testing.T) {
e, dir, id := newEngine(t, 0) e, dir, id := newEngineWithErrorThreshold(t, "", 0)
obj := generateRawObjectWithCID(t, cidtest.ID()) obj := generateRawObjectWithCID(t, cidtest.ID())
obj.SetPayload(make([]byte, smallSize)) obj.SetPayload(make([]byte, errSmallSize))
prm := new(shard.PutPrm).WithObject(obj.Object()) prm := new(shard.PutPrm).WithObject(obj.Object())
e.mtx.RLock() e.mtx.RLock()
@ -82,10 +87,10 @@ func TestErrorReporting(t *testing.T) {
t.Run("with error threshold", func(t *testing.T) { t.Run("with error threshold", func(t *testing.T) {
const errThreshold = 3 const errThreshold = 3
e, dir, id := newEngine(t, errThreshold) e, dir, id := newEngineWithErrorThreshold(t, "", errThreshold)
obj := generateRawObjectWithCID(t, cidtest.ID()) obj := generateRawObjectWithCID(t, cidtest.ID())
obj.SetPayload(make([]byte, smallSize)) obj.SetPayload(make([]byte, errSmallSize))
prm := new(shard.PutPrm).WithObject(obj.Object()) prm := new(shard.PutPrm).WithObject(obj.Object())
e.mtx.RLock() e.mtx.RLock()
@ -123,6 +128,54 @@ func TestErrorReporting(t *testing.T) {
}) })
} }
// TestBlobstorFailback is a regression test for issue #1186.
//
// It stores two objects (payload sizes 1 and errSmallSize+1, i.e. on both
// sides of the configured small-size limit) in the first shard, swaps the
// blobstor root directories of the two shards on disk, and then checks that
// the engine still returns both objects while the expected error count and
// mode are recorded for each shard.
func TestBlobstorFailback(t *testing.T) {
// The directory is created here (not by the helper) so that the second
// engine below can be built over the same on-disk data.
dir, err := os.MkdirTemp("", "*")
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, os.RemoveAll(dir)) })
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
objs := make([]*object.Object, 0, 2)
for _, size := range []int{1, errSmallSize + 1} {
obj := generateRawObjectWithCID(t, cidtest.ID())
obj.SetPayload(make([]byte, size))
prm := new(shard.PutPrm).WithObject(obj.Object())
// Put directly into the first shard, bypassing the engine's own shard selection.
e.mtx.RLock()
_, err = e.shards[id[0].String()].Shard.Put(prm)
e.mtx.RUnlock()
require.NoError(t, err)
objs = append(objs, obj.Object())
}
// Sanity check: before any tampering both objects are readable and
// the first shard has no errors.
for i := range objs {
_, err = e.Get(&GetPrm{addr: objs[i].Address()})
require.NoError(t, err)
}
checkShardState(t, e, id[0], 0, shard.ModeReadWrite)
require.NoError(t, e.Close())
// Swap the blobstor root directories of the two shards via a temporary
// name, so the first shard's metabase no longer matches its blobstor.
p1 := e.shards[id[0].String()].Shard.DumpInfo().BlobStorInfo.RootPath
p2 := e.shards[id[1].String()].Shard.DumpInfo().BlobStorInfo.RootPath
tmp := filepath.Join(dir, "tmp")
require.NoError(t, os.Rename(p1, tmp))
require.NoError(t, os.Rename(p2, p1))
require.NoError(t, os.Rename(tmp, p2))
// Build a fresh engine over the same directory and read the objects again.
e, _, id = newEngineWithErrorThreshold(t, dir, 1)
for i := range objs {
actual, err := e.Get(&GetPrm{addr: objs[i].Address()})
require.NoError(t, err)
require.Equal(t, objs[i], actual.Object())
}
// Expect 2 errors and read-only mode on the first shard (presumably one
// error per object, exceeding the threshold of 1), and no errors on the second.
checkShardState(t, e, id[0], 2, shard.ModeReadOnly)
checkShardState(t, e, id[1], 0, shard.ModeReadWrite)
}
func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode shard.Mode) { func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode shard.Mode) {
e.mtx.RLock() e.mtx.RLock()
sh := e.shards[id.String()] sh := e.shards[id.String()]

View file

@ -8,6 +8,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
"go.uber.org/zap"
) )
// GetPrm groups the parameters of Get operation. // GetPrm groups the parameters of Get operation.
@ -64,6 +65,9 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
outSI *objectSDK.SplitInfo outSI *objectSDK.SplitInfo
outError = object.ErrNotFound outError = object.ErrNotFound
shardWithMeta hashedShard
metaError error
) )
shPrm := new(shard.GetPrm). shPrm := new(shard.GetPrm).
@ -72,6 +76,10 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
res, err := sh.Get(shPrm) res, err := sh.Get(shPrm)
if err != nil { if err != nil {
if res.HasMeta() {
shardWithMeta = sh
metaError = err
}
switch { switch {
case errors.Is(err, object.ErrNotFound): case errors.Is(err, object.ErrNotFound):
return false // ignore, go to next shard return false // ignore, go to next shard
@ -110,9 +118,27 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
} }
if obj == nil { if obj == nil {
if shardWithMeta.Shard == nil || !errors.Is(outError, object.ErrNotFound) {
return nil, outError return nil, outError
} }
// If the object is not found but is present in metabase,
// try to fetch it from blobstor directly. If it is found in any
// blobstor, increase the error counter for the shard which contains the meta.
shPrm = shPrm.WithIgnoreMeta(true)
e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
res, err := sh.Get(shPrm)
obj = res.Object()
return err == nil
})
if obj == nil {
return nil, outError
}
e.reportShardError(shardWithMeta, "meta info was present, but object is missing",
metaError, zap.Stringer("address", prm.addr))
}
return &GetRes{ return &GetRes{
obj: obj, obj: obj,
}, nil }, nil

View file

@ -19,11 +19,13 @@ type storFetcher = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*object.Ob
// GetPrm groups the parameters of Get operation. // GetPrm groups the parameters of Get operation.
type GetPrm struct { type GetPrm struct {
addr *addressSDK.Address addr *addressSDK.Address
skipMeta bool
} }
// GetRes groups resulting values of Get operation. // GetRes groups resulting values of Get operation.
type GetRes struct { type GetRes struct {
obj *object.Object obj *object.Object
hasMeta bool
} }
// WithAddress is a Get option to set the address of the requested object. // WithAddress is a Get option to set the address of the requested object.
@ -37,11 +39,23 @@ func (p *GetPrm) WithAddress(addr *addressSDK.Address) *GetPrm {
return p return p
} }
// WithIgnoreMeta is a Get option that makes the shard try to fetch the object
// from blobstor directly, without accessing metabase.
//
// It sets the flag on the receiver and returns the same *GetPrm for chaining.
func (p *GetPrm) WithIgnoreMeta(ignore bool) *GetPrm {
p.skipMeta = ignore
return p
}
// Object returns the requested object. // Object returns the requested object.
func (r *GetRes) Object() *object.Object { func (r *GetRes) Object() *object.Object {
return r.obj return r.obj
} }
// HasMeta returns true if info about the object was found in the metabase.
//
// It is false when the object was served from the write-cache, when the
// metabase lookup was skipped, or when the metabase has no record of the object.
func (r *GetRes) HasMeta() bool {
return r.hasMeta
}
// Get reads an object from shard. // Get reads an object from shard.
// //
// Returns any error encountered that // Returns any error encountered that
@ -76,15 +90,16 @@ func (s *Shard) Get(prm *GetPrm) (*GetRes, error) {
return res.Object(), nil return res.Object(), nil
} }
obj, err := s.fetchObjectData(prm.addr, big, small) obj, hasMeta, err := s.fetchObjectData(prm.addr, prm.skipMeta, big, small)
return &GetRes{ return &GetRes{
obj: obj, obj: obj,
hasMeta: hasMeta,
}, err }, err
} }
// fetchObjectData looks through writeCache and blobStor to find object. // fetchObjectData looks through writeCache and blobStor to find object.
func (s *Shard) fetchObjectData(addr *addressSDK.Address, big, small storFetcher) (*object.Object, error) { func (s *Shard) fetchObjectData(addr *addressSDK.Address, skipMeta bool, big, small storFetcher) (*object.Object, bool, error) {
var ( var (
err error err error
res *object.Object res *object.Object
@ -93,7 +108,7 @@ func (s *Shard) fetchObjectData(addr *addressSDK.Address, big, small storFetcher
if s.hasWriteCache() { if s.hasWriteCache() {
res, err = s.writeCache.Get(addr) res, err = s.writeCache.Get(addr)
if err == nil { if err == nil {
return res, nil return res, false, nil
} }
if errors.Is(err, object.ErrNotFound) { if errors.Is(err, object.ErrNotFound) {
@ -103,18 +118,27 @@ func (s *Shard) fetchObjectData(addr *addressSDK.Address, big, small storFetcher
} }
} }
if skipMeta {
res, err = small(s.blobStor, nil)
if err == nil {
return res, false, err
}
res, err = big(s.blobStor, nil)
return res, false, err
}
exists, err := meta.Exists(s.metaBase, addr) exists, err := meta.Exists(s.metaBase, addr)
if err != nil { if err != nil {
return nil, err return nil, false, err
} }
if !exists { if !exists {
return nil, object.ErrNotFound return nil, false, object.ErrNotFound
} }
blobovniczaID, err := meta.IsSmall(s.metaBase, addr) blobovniczaID, err := meta.IsSmall(s.metaBase, addr)
if err != nil { if err != nil {
return nil, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err) return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
} }
if blobovniczaID != nil { if blobovniczaID != nil {
@ -123,5 +147,5 @@ func (s *Shard) fetchObjectData(addr *addressSDK.Address, big, small storFetcher
res, err = big(s.blobStor, nil) res, err = big(s.blobStor, nil)
} }
return res, err return res, true, err
} }

View file

@ -95,7 +95,7 @@ func (s *Shard) GetRange(prm *RngPrm) (*RngRes, error) {
return obj.Object(), nil return obj.Object(), nil
} }
obj, err := s.fetchObjectData(prm.addr, big, small) obj, _, err := s.fetchObjectData(prm.addr, false, big, small)
return &RngRes{ return &RngRes{
obj: obj, obj: obj,