diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go new file mode 100644 index 00000000..0ba51bde --- /dev/null +++ b/pkg/local_object_storage/engine/error_test.go @@ -0,0 +1,100 @@ +package engine + +import ( + "fmt" + "os" + "path/filepath" + "strconv" + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +const errSmallSize = 256 + +func newEngineWithErrorThreshold(t *testing.T, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) { + if dir == "" { + var err error + + dir, err = os.MkdirTemp("", "*") + require.NoError(t, err) + t.Cleanup(func() { _ = os.RemoveAll(dir) }) + } + + e := New( + WithLogger(zaptest.NewLogger(t)), + WithShardPoolSize(1)) + + var ids [2]*shard.ID + var err error + + for i := range ids { + ids[i], err = e.AddShard( + shard.WithLogger(zaptest.NewLogger(t)), + shard.WithBlobStorOptions( + blobstor.WithRootPath(filepath.Join(dir, strconv.Itoa(i))), + blobstor.WithShallowDepth(1), + blobstor.WithBlobovniczaShallowWidth(1), + blobstor.WithBlobovniczaShallowDepth(1), + blobstor.WithSmallSizeLimit(errSmallSize), + blobstor.WithRootPerm(0700)), + shard.WithMetaBaseOptions( + meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))), + meta.WithPermissions(0700))) + require.NoError(t, err) + } + require.NoError(t, e.Open()) + require.NoError(t, e.Init()) + + return e, dir, ids +} + +// Issue #1186. +func TestBlobstorFailback(t *testing.T) { + dir, err := os.MkdirTemp("", "*") + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, os.RemoveAll(dir)) }) + + e, _, id := newEngineWithErrorThreshold(t, dir, 1) + + objs := make([]*object.Object, 0, 2) + for _, size := range []int{1, errSmallSize + 1} { + obj := generateRawObjectWithCID(t, cidtest.ID()) + obj.SetPayload(make([]byte, size)) + + prm := new(shard.PutPrm).WithObject(obj.Object()) + e.mtx.RLock() + _, err = e.shards[id[0].String()].Put(prm) + e.mtx.RUnlock() + require.NoError(t, err) + objs = append(objs, obj.Object()) + } + + for i := range objs { + _, err = e.Get(&GetPrm{addr: objs[i].Address()}) + require.NoError(t, err) + } + + require.NoError(t, e.Close()) + + p1 := e.shards[id[0].String()].DumpInfo().BlobStorInfo.RootPath + p2 := e.shards[id[1].String()].DumpInfo().BlobStorInfo.RootPath + tmp := filepath.Join(dir, "tmp") + require.NoError(t, os.Rename(p1, tmp)) + require.NoError(t, os.Rename(p2, p1)) + require.NoError(t, os.Rename(tmp, p2)) + + e, _, id = newEngineWithErrorThreshold(t, dir, 1) + + for i := range objs { + actual, err := e.Get(&GetPrm{addr: objs[i].Address()}) + require.NoError(t, err) + require.Equal(t, objs[i], actual.Object()) + } +} diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index f83bdfb3..5baafa7b 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -64,6 +64,8 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) { outSI *objectSDK.SplitInfo outError = object.ErrNotFound + + shardWithMeta hashedShard ) shPrm := new(shard.GetPrm). @@ -72,6 +74,9 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) { e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) { res, err := sh.Get(shPrm) if err != nil { + if res.HasMeta() { + shardWithMeta = hashedShard{sh: sh} + } switch { case errors.Is(err, object.ErrNotFound): return false // ignore, go to next shard @@ -116,7 +121,23 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) { } if obj == nil { - return nil, outError + if shardWithMeta.sh == nil || !errors.Is(outError, object.ErrNotFound) { + return nil, outError + } + + // If the object is not found but is present in metabase, + // try to fetch it from blobstor directly. If it is found in any + // blobstor, increase the error counter for the shard which contains the meta. + shPrm = shPrm.WithIgnoreMeta(true) + + e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) { + res, err := sh.Get(shPrm) + obj = res.Object() + return err == nil + }) + if obj == nil { + return nil, outError + } } return &GetRes{ diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index 383799fc..cdd85882 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -18,12 +18,14 @@ type storFetcher = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*object.Ob // GetPrm groups the parameters of Get operation. type GetPrm struct { - addr *objectSDK.Address + addr *objectSDK.Address + skipMeta bool } // GetRes groups resulting values of Get operation. type GetRes struct { - obj *object.Object + obj *object.Object + hasMeta bool } // WithAddress is a Get option to set the address of the requested object. @@ -37,11 +39,23 @@ func (p *GetPrm) WithAddress(addr *objectSDK.Address) *GetPrm { return p } +// WithIgnoreMeta is a Get option try to fetch object from blobstor directly, +// without accessing metabase. +func (p *GetPrm) WithIgnoreMeta(ignore bool) *GetPrm { + p.skipMeta = ignore + return p +} + // Object returns the requested object. func (r *GetRes) Object() *object.Object { return r.obj } +// HasMeta returns true if info about the object was found in the metabase. +func (r *GetRes) HasMeta() bool { + return r.hasMeta +} + // Get reads an object from shard. // // Returns any error encountered that @@ -76,15 +90,16 @@ func (s *Shard) Get(prm *GetPrm) (*GetRes, error) { return res.Object(), nil } - obj, err := s.fetchObjectData(prm.addr, big, small) + obj, hasMeta, err := s.fetchObjectData(prm.addr, prm.skipMeta, big, small) return &GetRes{ - obj: obj, + obj: obj, + hasMeta: hasMeta, }, err } // fetchObjectData looks through writeCache and blobStor to find object. -func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher) (*object.Object, error) { +func (s *Shard) fetchObjectData(addr *objectSDK.Address, skipMeta bool, big, small storFetcher) (*object.Object, bool, error) { var ( err error res *object.Object @@ -93,7 +108,7 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher) if s.hasWriteCache() { res, err = s.writeCache.Get(addr) if err == nil { - return res, nil + return res, false, nil } if errors.Is(err, object.ErrNotFound) { @@ -103,18 +118,27 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher) } } + if skipMeta { + res, err = small(s.blobStor, nil) + if err == nil { + return res, false, err + } + res, err = big(s.blobStor, nil) + return res, false, err + } + exists, err := meta.Exists(s.metaBase, addr) if err != nil { - return nil, err + return nil, false, err } if !exists { - return nil, object.ErrNotFound + return nil, false, object.ErrNotFound } blobovniczaID, err := meta.IsSmall(s.metaBase, addr) if err != nil { - return nil, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err) + return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err) } if blobovniczaID != nil { @@ -123,5 +147,5 @@ func (s *Shard) fetchObjectData(addr *objectSDK.Address, big, small storFetcher) res, err = big(s.blobStor, nil) } - return res, err + return res, true, err } diff --git a/pkg/local_object_storage/shard/range.go b/pkg/local_object_storage/shard/range.go index abc4a533..9ef3a1fa 100644 --- a/pkg/local_object_storage/shard/range.go +++ b/pkg/local_object_storage/shard/range.go @@ -94,7 +94,7 @@ func (s *Shard) GetRange(prm *RngPrm) (*RngRes, error) { return obj.Object(), nil } - obj, err := s.fetchObjectData(prm.addr, big, small) + obj, _, err := s.fetchObjectData(prm.addr, false, big, small) return &RngRes{ obj: obj,