diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go index b968a3731..57a1e5043 100644 --- a/pkg/local_object_storage/engine/error_test.go +++ b/pkg/local_object_storage/engine/error_test.go @@ -7,6 +7,7 @@ import ( "strconv" "testing" + "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" @@ -15,48 +16,52 @@ import ( "go.uber.org/zap/zaptest" ) -func TestErrorReporting(t *testing.T) { - const smallSize = 100 +const errSmallSize = 256 - log := zaptest.NewLogger(t) - newEngine := func(t *testing.T, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) { - dir, err := os.MkdirTemp("", "*") +func newEngineWithErrorThreshold(t *testing.T, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) { + if dir == "" { + var err error + + dir, err = os.MkdirTemp("", "*") require.NoError(t, err) t.Cleanup(func() { _ = os.RemoveAll(dir) }) - - e := New( - WithLogger(log), - WithShardPoolSize(1), - WithErrorThreshold(errThreshold)) - - var ids [2]*shard.ID - - for i := range ids { - ids[i], err = e.AddShard( - shard.WithLogger(log), - shard.WithBlobStorOptions( - blobstor.WithRootPath(filepath.Join(dir, strconv.Itoa(i))), - blobstor.WithShallowDepth(1), - blobstor.WithBlobovniczaShallowWidth(1), - blobstor.WithBlobovniczaShallowDepth(1), - blobstor.WithSmallSizeLimit(100), - blobstor.WithRootPerm(0700)), - shard.WithMetaBaseOptions( - meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))), - meta.WithPermissions(0700))) - require.NoError(t, err) - } - require.NoError(t, e.Open()) - require.NoError(t, e.Init()) - - return e, dir, ids } + e := New( + WithLogger(zaptest.NewLogger(t)), + WithShardPoolSize(1), + WithErrorThreshold(errThreshold)) + + var ids [2]*shard.ID + var err error + + for i := range ids { + ids[i], err = e.AddShard( + shard.WithLogger(zaptest.NewLogger(t)), + shard.WithBlobStorOptions( + blobstor.WithRootPath(filepath.Join(dir, strconv.Itoa(i))), + blobstor.WithShallowDepth(1), + blobstor.WithBlobovniczaShallowWidth(1), + blobstor.WithBlobovniczaShallowDepth(1), + blobstor.WithSmallSizeLimit(errSmallSize), + blobstor.WithRootPerm(0700)), + shard.WithMetaBaseOptions( + meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))), + meta.WithPermissions(0700))) + require.NoError(t, err) + } + require.NoError(t, e.Open()) + require.NoError(t, e.Init()) + + return e, dir, ids +} + +func TestErrorReporting(t *testing.T) { t.Run("ignore errors by default", func(t *testing.T) { - e, dir, id := newEngine(t, 0) + e, dir, id := newEngineWithErrorThreshold(t, "", 0) obj := generateRawObjectWithCID(t, cidtest.ID()) - obj.SetPayload(make([]byte, smallSize)) + obj.SetPayload(make([]byte, errSmallSize)) prm := new(shard.PutPrm).WithObject(obj.Object()) e.mtx.RLock() @@ -82,10 +87,10 @@ func TestErrorReporting(t *testing.T) { t.Run("with error threshold", func(t *testing.T) { const errThreshold = 3 - e, dir, id := newEngine(t, errThreshold) + e, dir, id := newEngineWithErrorThreshold(t, "", errThreshold) obj := generateRawObjectWithCID(t, cidtest.ID()) - obj.SetPayload(make([]byte, smallSize)) + obj.SetPayload(make([]byte, errSmallSize)) prm := new(shard.PutPrm).WithObject(obj.Object()) e.mtx.RLock() @@ -123,6 +128,54 @@ func TestErrorReporting(t *testing.T) { }) } +// Issue #1186. +func TestBlobstorFailback(t *testing.T) { + dir, err := os.MkdirTemp("", "*") + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, os.RemoveAll(dir)) }) + + e, _, id := newEngineWithErrorThreshold(t, dir, 1) + + objs := make([]*object.Object, 0, 2) + for _, size := range []int{1, errSmallSize + 1} { + obj := generateRawObjectWithCID(t, cidtest.ID()) + obj.SetPayload(make([]byte, size)) + + prm := new(shard.PutPrm).WithObject(obj.Object()) + e.mtx.RLock() + _, err = e.shards[id[0].String()].Shard.Put(prm) + e.mtx.RUnlock() + require.NoError(t, err) + objs = append(objs, obj.Object()) + } + + for i := range objs { + _, err = e.Get(&GetPrm{addr: objs[i].Address()}) + require.NoError(t, err) + } + + checkShardState(t, e, id[0], 0, shard.ModeReadWrite) + require.NoError(t, e.Close()) + + p1 := e.shards[id[0].String()].Shard.DumpInfo().BlobStorInfo.RootPath + p2 := e.shards[id[1].String()].Shard.DumpInfo().BlobStorInfo.RootPath + tmp := filepath.Join(dir, "tmp") + require.NoError(t, os.Rename(p1, tmp)) + require.NoError(t, os.Rename(p2, p1)) + require.NoError(t, os.Rename(tmp, p2)) + + e, _, id = newEngineWithErrorThreshold(t, dir, 1) + + for i := range objs { + actual, err := e.Get(&GetPrm{addr: objs[i].Address()}) + require.NoError(t, err) + require.Equal(t, objs[i], actual.Object()) + } + + checkShardState(t, e, id[0], 2, shard.ModeReadOnly) + checkShardState(t, e, id[1], 0, shard.ModeReadWrite) +} + func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode shard.Mode) { e.mtx.RLock() sh := e.shards[id.String()] diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index e80117d09..0c590925b 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" + "go.uber.org/zap" ) // GetPrm groups the parameters of Get operation. @@ -64,6 +65,9 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) { outSI *objectSDK.SplitInfo outError = object.ErrNotFound + + shardWithMeta hashedShard + metaError error ) shPrm := new(shard.GetPrm). @@ -72,6 +76,10 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) { e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { res, err := sh.Get(shPrm) if err != nil { + if res.HasMeta() { + shardWithMeta = sh + metaError = err + } switch { case errors.Is(err, object.ErrNotFound): return false // ignore, go to next shard @@ -110,7 +118,25 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) { } if obj == nil { - return nil, outError + if shardWithMeta.Shard == nil || !errors.Is(outError, object.ErrNotFound) { + return nil, outError + } + + // If the object is not found but is present in metabase, + // try to fetch it from blobstor directly. If it is found in any + // blobstor, increase the error counter for the shard which contains the meta. + shPrm = shPrm.WithIgnoreMeta(true) + + e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { + res, err := sh.Get(shPrm) + obj = res.Object() + return err == nil + }) + if obj == nil { + return nil, outError + } + e.reportShardError(shardWithMeta, "meta info was present, but object is missing", + metaError, zap.Stringer("address", prm.addr)) } return &GetRes{ diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index 5238a56a6..07252477e 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -18,12 +18,14 @@ type storFetcher = func(stor *blobstor.BlobStor, id *blobovnicza.ID) (*object.Ob // GetPrm groups the parameters of Get operation. type GetPrm struct { - addr *addressSDK.Address + addr *addressSDK.Address + skipMeta bool } // GetRes groups resulting values of Get operation. type GetRes struct { - obj *object.Object + obj *object.Object + hasMeta bool } // WithAddress is a Get option to set the address of the requested object. @@ -37,11 +39,23 @@ func (p *GetPrm) WithAddress(addr *addressSDK.Address) *GetPrm { return p } +// WithIgnoreMeta is a Get option try to fetch object from blobstor directly, +// without accessing metabase. +func (p *GetPrm) WithIgnoreMeta(ignore bool) *GetPrm { + p.skipMeta = ignore + return p +} + // Object returns the requested object. func (r *GetRes) Object() *object.Object { return r.obj } +// HasMeta returns true if info about the object was found in the metabase. +func (r *GetRes) HasMeta() bool { + return r.hasMeta +} + // Get reads an object from shard. // // Returns any error encountered that @@ -76,15 +90,16 @@ func (s *Shard) Get(prm *GetPrm) (*GetRes, error) { return res.Object(), nil } - obj, err := s.fetchObjectData(prm.addr, big, small) + obj, hasMeta, err := s.fetchObjectData(prm.addr, prm.skipMeta, big, small) return &GetRes{ - obj: obj, + obj: obj, + hasMeta: hasMeta, }, err } // fetchObjectData looks through writeCache and blobStor to find object. -func (s *Shard) fetchObjectData(addr *addressSDK.Address, big, small storFetcher) (*object.Object, error) { +func (s *Shard) fetchObjectData(addr *addressSDK.Address, skipMeta bool, big, small storFetcher) (*object.Object, bool, error) { var ( err error res *object.Object @@ -93,7 +108,7 @@ func (s *Shard) fetchObjectData(addr *addressSDK.Address, big, small storFetcher if s.hasWriteCache() { res, err = s.writeCache.Get(addr) if err == nil { - return res, nil + return res, false, nil } if errors.Is(err, object.ErrNotFound) { @@ -103,18 +118,27 @@ func (s *Shard) fetchObjectData(addr *addressSDK.Address, big, small storFetcher } } + if skipMeta { + res, err = small(s.blobStor, nil) + if err == nil { + return res, false, err + } + res, err = big(s.blobStor, nil) + return res, false, err + } + exists, err := meta.Exists(s.metaBase, addr) if err != nil { - return nil, err + return nil, false, err } if !exists { - return nil, object.ErrNotFound + return nil, false, object.ErrNotFound } blobovniczaID, err := meta.IsSmall(s.metaBase, addr) if err != nil { - return nil, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err) + return nil, true, fmt.Errorf("can't fetch blobovnicza id from metabase: %w", err) } if blobovniczaID != nil { @@ -123,5 +147,5 @@ func (s *Shard) fetchObjectData(addr *addressSDK.Address, big, small storFetcher res, err = big(s.blobStor, nil) } - return res, err + return res, true, err } diff --git a/pkg/local_object_storage/shard/range.go b/pkg/local_object_storage/shard/range.go index c397d69b4..aecb4d4cb 100644 --- a/pkg/local_object_storage/shard/range.go +++ b/pkg/local_object_storage/shard/range.go @@ -95,7 +95,7 @@ func (s *Shard) GetRange(prm *RngPrm) (*RngRes, error) { return obj.Object(), nil } - obj, err := s.fetchObjectData(prm.addr, big, small) + obj, _, err := s.fetchObjectData(prm.addr, false, big, small) return &RngRes{ obj: obj,