forked from TrueCloudLab/frostfs-node
[#1186] engine: Read object directly from blobstor in case of conflicts
Metabase is expected to contain actual information about objects stored
in shard. If the object is present in metabase but is missing from
blobstor, perform an additional attempt to fetch it directly without
consulting metabase. Such a situation is unexpected, so error counter
is increased for the shard which has the object in the metabase. We
don't increase error counter for the shard which has the object in
blobstor, because some garbage can be expected there. In this
implementation there is no overhead for objects which are really
missing, i.e. are not present in any metabase.
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
(cherry picked from commit 69e1e6ca20
)
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
parent
ce169491ed
commit
b5bcf90fa1
4 changed files with 157 additions and 12 deletions
100
pkg/local_object_storage/engine/error_test.go
Normal file
100
pkg/local_object_storage/engine/error_test.go
Normal file
|
@ -0,0 +1,100 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
const errSmallSize = 256
|
||||
|
||||
func newEngineWithErrorThreshold(t *testing.T, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) {
|
||||
if dir == "" {
|
||||
var err error
|
||||
|
||||
dir, err = os.MkdirTemp("", "*")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = os.RemoveAll(dir) })
|
||||
}
|
||||
|
||||
e := New(
|
||||
WithLogger(zaptest.NewLogger(t)),
|
||||
WithShardPoolSize(1))
|
||||
|
||||
var ids [2]*shard.ID
|
||||
var err error
|
||||
|
||||
for i := range ids {
|
||||
ids[i], err = e.AddShard(
|
||||
shard.WithLogger(zaptest.NewLogger(t)),
|
||||
shard.WithBlobStorOptions(
|
||||
blobstor.WithRootPath(filepath.Join(dir, strconv.Itoa(i))),
|
||||
blobstor.WithShallowDepth(1),
|
||||
blobstor.WithBlobovniczaShallowWidth(1),
|
||||
blobstor.WithBlobovniczaShallowDepth(1),
|
||||
blobstor.WithSmallSizeLimit(errSmallSize),
|
||||
blobstor.WithRootPerm(0700)),
|
||||
shard.WithMetaBaseOptions(
|
||||
meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))),
|
||||
meta.WithPermissions(0700)))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, e.Open())
|
||||
require.NoError(t, e.Init())
|
||||
|
||||
return e, dir, ids
|
||||
}
|
||||
|
||||
// Issue #1186.
|
||||
func TestBlobstorFailback(t *testing.T) {
|
||||
dir, err := os.MkdirTemp("", "*")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { require.NoError(t, os.RemoveAll(dir)) })
|
||||
|
||||
e, _, id := newEngineWithErrorThreshold(t, dir, 1)
|
||||
|
||||
objs := make([]*object.Object, 0, 2)
|
||||
for _, size := range []int{1, errSmallSize + 1} {
|
||||
obj := generateRawObjectWithCID(t, cidtest.ID())
|
||||
obj.SetPayload(make([]byte, size))
|
||||
|
||||
prm := new(shard.PutPrm).WithObject(obj.Object())
|
||||
e.mtx.RLock()
|
||||
_, err = e.shards[id[0].String()].Put(prm)
|
||||
e.mtx.RUnlock()
|
||||
require.NoError(t, err)
|
||||
objs = append(objs, obj.Object())
|
||||
}
|
||||
|
||||
for i := range objs {
|
||||
_, err = e.Get(&GetPrm{addr: objs[i].Address()})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.NoError(t, e.Close())
|
||||
|
||||
p1 := e.shards[id[0].String()].DumpInfo().BlobStorInfo.RootPath
|
||||
p2 := e.shards[id[1].String()].DumpInfo().BlobStorInfo.RootPath
|
||||
tmp := filepath.Join(dir, "tmp")
|
||||
require.NoError(t, os.Rename(p1, tmp))
|
||||
require.NoError(t, os.Rename(p2, p1))
|
||||
require.NoError(t, os.Rename(tmp, p2))
|
||||
|
||||
e, _, id = newEngineWithErrorThreshold(t, dir, 1)
|
||||
|
||||
for i := range objs {
|
||||
actual, err := e.Get(&GetPrm{addr: objs[i].Address()})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, objs[i], actual.Object())
|
||||
}
|
||||
}
|
|
@ -64,6 +64,8 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
|
|||
|
||||
outSI *objectSDK.SplitInfo
|
||||
outError = object.ErrNotFound
|
||||
|
||||
shardWithMeta hashedShard
|
||||
)
|
||||
|
||||
shPrm := new(shard.GetPrm).
|
||||
|
@ -72,6 +74,9 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
|
|||
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||
res, err := sh.Get(shPrm)
|
||||
if err != nil {
|
||||
if res.HasMeta() {
|
||||
shardWithMeta = hashedShard{sh: sh}
|
||||
}
|
||||
switch {
|
||||
case errors.Is(err, object.ErrNotFound):
|
||||
return false // ignore, go to next shard
|
||||
|
@ -116,7 +121,23 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) {
|
|||
}
|
||||
|
||||
if obj == nil {
|
||||
return nil, outError
|
||||
if shardWithMeta.sh == nil || !errors.Is(outError, object.ErrNotFound) {
|
||||
return nil, outError
|
||||
}
|
||||
|
||||
// If the object is not found but is present in metabase,
|
||||
// try to fetch it from blobstor directly. If it is found in any
|
||||
// blobstor, increase the error counter for the shard which contains the meta.
|
||||
shPrm = shPrm.WithIgnoreMeta(true)
|
||||
|
||||
e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) {
|
||||
res, err := sh.Get(shPrm)
|
||||
obj = res.Object()
|
||||
return err == nil
|
||||
})
|
||||
if obj == nil {
|
||||
return nil, outError
|
||||
}
|
||||
}
|
||||
|
||||
return &GetRes{
|
||||
|
|
|
@ -18,12 +18,14 @@ type storFetcher = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*object.Ob
|
|||
|
||||
// GetPrm groups the parameters of Get operation.
|
||||
type GetPrm struct {
|
||||
addr *objectSDK.Address
|
||||
addr *objectSDK.Address
|
||||
skipMeta bool
|
||||
}
|
||||
|
||||
// GetRes groups resulting values of Get operation.
|
||||
type GetRes struct {
|
||||
obj *object.Object
|
||||
obj *object.Object
|
||||
hasMeta bool
|
||||
}
|
||||
|
||||
// WithAddress is a Get option to set the address of the requested object.
|
||||
|
@ -37,11 +39,23 @@ func (p *GetPrm) WithAddress(addr *objectSDK.Address) *GetPrm {
|
|||
return p
|
||||
}
|
||||
|
||||
// WithIgnoreMeta is a Get option try to fetch object from blobstor directly,
|
||||
// without accessing metabase.
|
||||
func (p *GetPrm) WithIgnoreMeta(ignore bool) *GetPrm {
|
||||
p.skipMeta = ignore
|
||||
return p
|
||||
}
|
||||
|
||||
// Object returns the requested object.
|
||||
func (r *GetRes) Object() *object.Object {
|
||||
return r.obj
|
||||
}
|
||||
|
||||
// HasMeta returns true if info about the object was found in the metabase.
|
||||
func (r *GetRes) HasMeta() bool {
|
||||
return r.hasMeta
|
||||
}
|
||||
|
||||
// Get reads an object from shard.
|
||||
//
|
||||
// Returns any error encountered that
|
||||
|
@ -76,15 +90,16 @@ func (s *Shard) Get(prm *GetPrm) (*GetRes, error) {
|
|||
return res.Object(), nil
|
||||
}
|
||||
|
||||
obj, err := s.fetchObjectData(prm.addr, big, small)
|
||||
obj, hasMeta, err := s.fetchObjectData(prm.addr, prm.skipMeta, big, small)
|
||||
|
||||
return &GetRes{
|
||||
obj: obj,
|
||||
obj: obj,
|
||||
hasMeta: hasMeta,
|
||||
}, err
|
||||
}
|
||||
|
||||
// fetchObjectData looks through writeCache and blobStor to find object.
|
||||
func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher) (*object.Object, error) {
|
||||
func (s *Shard) fetchObjectData(addr *objectSDK.Address, skipMeta bool, big, small storFetcher) (*object.Object, bool, error) {
|
||||
var (
|
||||
err error
|
||||
res *object.Object
|
||||
|
@ -93,7 +108,7 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher)
|
|||
if s.hasWriteCache() {
|
||||
res, err = s.writeCache.Get(addr)
|
||||
if err == nil {
|
||||
return res, nil
|
||||
return res, false, nil
|
||||
}
|
||||
|
||||
if errors.Is(err, object.ErrNotFound) {
|
||||
|
@ -103,18 +118,27 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher)
|
|||
}
|
||||
}
|
||||
|
||||
if skipMeta {
|
||||
res, err = small(s.blobStor, nil)
|
||||
if err == nil {
|
||||
return res, false, err
|
||||
}
|
||||
res, err = big(s.blobStor, nil)
|
||||
return res, false, err
|
||||
}
|
||||
|
||||
exists, err := meta.Exists(s.metaBase, addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if !exists {
|
||||
return nil, object.ErrNotFound
|
||||
return nil, false, object.ErrNotFound
|
||||
}
|
||||
|
||||
blobovniczaID, err := meta.IsSmall(s.metaBase, addr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
|
||||
return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err)
|
||||
}
|
||||
|
||||
if blobovniczaID != nil {
|
||||
|
@ -123,5 +147,5 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher)
|
|||
res, err = big(s.blobStor, nil)
|
||||
}
|
||||
|
||||
return res, err
|
||||
return res, true, err
|
||||
}
|
||||
|
|
|
@ -94,7 +94,7 @@ func (s *Shard) GetRange(prm *RngPrm) (*RngRes, error) {
|
|||
return obj.Object(), nil
|
||||
}
|
||||
|
||||
obj, err := s.fetchObjectData(prm.addr, big, small)
|
||||
obj, _, err := s.fetchObjectData(prm.addr, false, big, small)
|
||||
|
||||
return &RngRes{
|
||||
obj: obj,
|
||||
|
|
Loading…
Reference in a new issue