From 69e1e6ca20f1979f982f6bbd6c94f70a1ee92a69 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Tue, 22 Feb 2022 10:20:33 +0300 Subject: [PATCH] [#1186] engine: Read object directly from blobstor in case of conflicts Metabase is expected to contain actual information about objects stored in shard. If the object is present in metabase but is missing from blobstor, peform an additional attempt to fetch it directly without consulting metabase. Such a situation is unexpected, so error counter is increased for the shard which has the object in the metabase. We don't increase error counter for the shard which has the object in blobstor, because some garbage can be expected there. In this implementation there is no overhead for objects which are really missing, i.e. are not present in any metabase. Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/engine/error_test.go | 125 +++++++++++++----- pkg/local_object_storage/engine/get.go | 28 +++- pkg/local_object_storage/shard/get.go | 44 ++++-- pkg/local_object_storage/shard/range.go | 2 +- 4 files changed, 151 insertions(+), 48 deletions(-) diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go index b968a3731..57a1e5043 100644 --- a/pkg/local_object_storage/engine/error_test.go +++ b/pkg/local_object_storage/engine/error_test.go @@ -7,6 +7,7 @@ import ( "strconv" "testing" + "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" @@ -15,48 +16,52 @@ import ( "go.uber.org/zap/zaptest" ) -func TestErrorReporting(t *testing.T) { - const smallSize = 100 +const errSmallSize = 256 - log := zaptest.NewLogger(t) - newEngine := func(t *testing.T, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) { - dir, err := os.MkdirTemp("", "*") +func newEngineWithErrorThreshold(t *testing.T, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) { + if dir == "" { + var err error + + dir, err = os.MkdirTemp("", "*") require.NoError(t, err) t.Cleanup(func() { _ = os.RemoveAll(dir) }) - - e := New( - WithLogger(log), - WithShardPoolSize(1), - WithErrorThreshold(errThreshold)) - - var ids [2]*shard.ID - - for i := range ids { - ids[i], err = e.AddShard( - shard.WithLogger(log), - shard.WithBlobStorOptions( - blobstor.WithRootPath(filepath.Join(dir, strconv.Itoa(i))), - blobstor.WithShallowDepth(1), - blobstor.WithBlobovniczaShallowWidth(1), - blobstor.WithBlobovniczaShallowDepth(1), - blobstor.WithSmallSizeLimit(100), - blobstor.WithRootPerm(0700)), - shard.WithMetaBaseOptions( - meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))), - meta.WithPermissions(0700))) - require.NoError(t, err) - } - require.NoError(t, e.Open()) - require.NoError(t, e.Init()) - - return e, dir, ids } + e := New( + WithLogger(zaptest.NewLogger(t)), + WithShardPoolSize(1), + WithErrorThreshold(errThreshold)) + + var ids [2]*shard.ID + var err error + + for i := range ids { + ids[i], err = e.AddShard( + shard.WithLogger(zaptest.NewLogger(t)), + shard.WithBlobStorOptions( + blobstor.WithRootPath(filepath.Join(dir, strconv.Itoa(i))), + blobstor.WithShallowDepth(1), + blobstor.WithBlobovniczaShallowWidth(1), + blobstor.WithBlobovniczaShallowDepth(1), + blobstor.WithSmallSizeLimit(errSmallSize), + blobstor.WithRootPerm(0700)), + shard.WithMetaBaseOptions( + meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))), + meta.WithPermissions(0700))) + require.NoError(t, err) + } + require.NoError(t, e.Open()) + require.NoError(t, e.Init()) + + return e, dir, ids +} + +func TestErrorReporting(t *testing.T) { t.Run("ignore errors by default", func(t *testing.T) { - e, dir, id := newEngine(t, 0) + e, dir, id := newEngineWithErrorThreshold(t, "", 0) obj := generateRawObjectWithCID(t, cidtest.ID()) - obj.SetPayload(make([]byte, smallSize)) + obj.SetPayload(make([]byte, errSmallSize)) prm := new(shard.PutPrm).WithObject(obj.Object()) e.mtx.RLock() @@ -82,10 +87,10 @@ func TestErrorReporting(t *testing.T) { t.Run("with error threshold", func(t *testing.T) { const errThreshold = 3 - e, dir, id := newEngine(t, errThreshold) + e, dir, id := newEngineWithErrorThreshold(t, "", errThreshold) obj := generateRawObjectWithCID(t, cidtest.ID()) - obj.SetPayload(make([]byte, smallSize)) + obj.SetPayload(make([]byte, errSmallSize)) prm := new(shard.PutPrm).WithObject(obj.Object()) e.mtx.RLock() @@ -123,6 +128,54 @@ func TestErrorReporting(t *testing.T) { }) } +// Issue #1186. +func TestBlobstorFailback(t *testing.T) { + dir, err := os.MkdirTemp("", "*") + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, os.RemoveAll(dir)) }) + + e, _, id := newEngineWithErrorThreshold(t, dir, 1) + + objs := make([]*object.Object, 0, 2) + for _, size := range []int{1, errSmallSize + 1} { + obj := generateRawObjectWithCID(t, cidtest.ID()) + obj.SetPayload(make([]byte, size)) + + prm := new(shard.PutPrm).WithObject(obj.Object()) + e.mtx.RLock() + _, err = e.shards[id[0].String()].Shard.Put(prm) + e.mtx.RUnlock() + require.NoError(t, err) + objs = append(objs, obj.Object()) + } + + for i := range objs { + _, err = e.Get(&GetPrm{addr: objs[i].Address()}) + require.NoError(t, err) + } + + checkShardState(t, e, id[0], 0, shard.ModeReadWrite) + require.NoError(t, e.Close()) + + p1 := e.shards[id[0].String()].Shard.DumpInfo().BlobStorInfo.RootPath + p2 := e.shards[id[1].String()].Shard.DumpInfo().BlobStorInfo.RootPath + tmp := filepath.Join(dir, "tmp") + require.NoError(t, os.Rename(p1, tmp)) + require.NoError(t, os.Rename(p2, p1)) + require.NoError(t, os.Rename(tmp, p2)) + + e, _, id = newEngineWithErrorThreshold(t, dir, 1) + + for i := range objs { + actual, err := e.Get(&GetPrm{addr: objs[i].Address()}) + require.NoError(t, err) + require.Equal(t, objs[i], actual.Object()) + } + + checkShardState(t, e, id[0], 2, shard.ModeReadOnly) + checkShardState(t, e, id[1], 0, shard.ModeReadWrite) +} + func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode shard.Mode) { e.mtx.RLock() sh := e.shards[id.String()] diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index e80117d09..0c590925b 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" + "go.uber.org/zap" ) // GetPrm groups the parameters of Get operation. @@ -64,6 +65,9 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) { outSI *objectSDK.SplitInfo outError = object.ErrNotFound + + shardWithMeta hashedShard + metaError error ) shPrm := new(shard.GetPrm). @@ -72,6 +76,10 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) { e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { res, err := sh.Get(shPrm) if err != nil { + if res.HasMeta() { + shardWithMeta = sh + metaError = err + } switch { case errors.Is(err, object.ErrNotFound): return false // ignore, go to next shard @@ -110,7 +118,25 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) { } if obj == nil { - return nil, outError + if shardWithMeta.Shard == nil || !errors.Is(outError, object.ErrNotFound) { + return nil, outError + } + + // If the object is not found but is present in metabase, + // try to fetch it from blobstor directly. If it is found in any + // blobstor, increase the error counter for the shard which contains the meta. + shPrm = shPrm.WithIgnoreMeta(true) + + e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { + res, err := sh.Get(shPrm) + obj = res.Object() + return err == nil + }) + if obj == nil { + return nil, outError + } + e.reportShardError(shardWithMeta, "meta info was present, but object is missing", + metaError, zap.Stringer("address", prm.addr)) } return &GetRes{ diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index 5238a56a6..07252477e 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -18,12 +18,14 @@ type storFetcher = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*object.Ob // GetPrm groups the parameters of Get operation. type GetPrm struct { - addr *addressSDK.Address + addr *addressSDK.Address + skipMeta bool } // GetRes groups resulting values of Get operation. type GetRes struct { - obj *object.Object + obj *object.Object + hasMeta bool } // WithAddress is a Get option to set the address of the requested object. @@ -37,11 +39,23 @@ func (p *GetPrm) WithAddress(addr *addressSDK.Address) *GetPrm { return p } +// WithIgnoreMeta is a Get option try to fetch object from blobstor directly, +// without accessing metabase. +func (p *GetPrm) WithIgnoreMeta(ignore bool) *GetPrm { + p.skipMeta = ignore + return p +} + // Object returns the requested object. func (r *GetRes) Object() *object.Object { return r.obj } +// HasMeta returns true if info about the object was found in the metabase. +func (r *GetRes) HasMeta() bool { + return r.hasMeta +} + // Get reads an object from shard. // // Returns any error encountered that @@ -76,15 +90,16 @@ func (s *Shard) Get(prm *GetPrm) (*GetRes, error) { return res.Object(), nil } - obj, err := s.fetchObjectData(prm.addr, big, small) + obj, hasMeta, err := s.fetchObjectData(prm.addr, prm.skipMeta, big, small) return &GetRes{ - obj: obj, + obj: obj, + hasMeta: hasMeta, }, err } // fetchObjectData looks through writeCache and blobStor to find object. -func (s *Shard) fetchObjectData(addr *addressSDK.Address, big, small storFetcher) (*object.Object, error) { +func (s *Shard) fetchObjectData(addr *addressSDK.Address, skipMeta bool, big, small storFetcher) (*object.Object, bool, error) { var ( err error res *object.Object @@ -93,7 +108,7 @@ func (s *Shard) fetchObjectData(addr *addressSDK.Address, big, small storFetcher if s.hasWriteCache() { res, err = s.writeCache.Get(addr) if err == nil { - return res, nil + return res, false, nil } if errors.Is(err, object.ErrNotFound) { @@ -103,18 +118,27 @@ func (s *Shard) fetchObjectData(addr *addressSDK.Address, big, small storFetcher } } + if skipMeta { + res, err = small(s.blobStor, nil) + if err == nil { + return res, false, err + } + res, err = big(s.blobStor, nil) + return res, false, err + } + exists, err := meta.Exists(s.metaBase, addr) if err != nil { - return nil, err + return nil, false, err } if !exists { - return nil, object.ErrNotFound + return nil, false, object.ErrNotFound } blobovniczaID, err := meta.IsSmall(s.metaBase, addr) if err != nil { - return nil, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err) + return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err) } if blobovniczaID != nil { @@ -123,5 +147,5 @@ func (s *Shard) fetchObjectData(addr *addressSDK.Address, big, small storFetcher res, err = big(s.blobStor, nil) } - return res, err + return res, true, err } diff --git a/pkg/local_object_storage/shard/range.go b/pkg/local_object_storage/shard/range.go index c397d69b4..aecb4d4cb 100644 --- a/pkg/local_object_storage/shard/range.go +++ b/pkg/local_object_storage/shard/range.go @@ -95,7 +95,7 @@ func (s *Shard) GetRange(prm *RngPrm) (*RngRes, error) { return obj.Object(), nil } - obj, err := s.fetchObjectData(prm.addr, big, small) + obj, _, err := s.fetchObjectData(prm.addr, false, big, small) return &RngRes{ obj: obj,