From 6ad2624552d572fa7ba2bdfb5520d4790aed56e5 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Mon, 31 Jan 2022 17:58:32 +0300 Subject: [PATCH] [#1118] engine: allow to set error threshold There are certain errors which are not expected during usual node operation and which tell us that something is wrong with the shard. To prevent possible data corruption, move shard in read-only mode after amount of errors exceeded some threshold. By default no actions are performed. Signed-off-by: Evgenii Stratonikov --- pkg/local_object_storage/engine/container.go | 20 +-- pkg/local_object_storage/engine/delete.go | 16 +- pkg/local_object_storage/engine/engine.go | 50 ++++++- .../engine/engine_test.go | 8 +- pkg/local_object_storage/engine/error_test.go | 139 ++++++++++++++++++ pkg/local_object_storage/engine/exists.go | 9 +- pkg/local_object_storage/engine/get.go | 11 +- pkg/local_object_storage/engine/head.go | 11 +- pkg/local_object_storage/engine/inhume.go | 18 +-- pkg/local_object_storage/engine/put.go | 14 +- pkg/local_object_storage/engine/range.go | 11 +- pkg/local_object_storage/engine/select.go | 17 +-- pkg/local_object_storage/engine/shards.go | 32 ++-- 13 files changed, 243 insertions(+), 113 deletions(-) create mode 100644 pkg/local_object_storage/engine/error_test.go diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index 94808409..d2ac4c48 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -72,14 +72,11 @@ func (e *StorageEngine) containerSize(prm ContainerSizePrm) (*ContainerSizeRes, var res ContainerSizeRes - e.iterateOverUnsortedShards(func(s *shard.Shard) (stop bool) { - size, err := shard.ContainerSize(s, prm.cid) + e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + size, err := shard.ContainerSize(sh.Shard, prm.cid) if err != nil { - e.log.Warn("can't get container size", - zap.Stringer("shard_id", s.ID()), - zap.Stringer("container_id", prm.cid), - zap.String("error", err.Error())) - + e.reportShardError(sh, "can't get container size", err, + zap.Stringer("container_id", prm.cid)) return false } @@ -122,13 +119,10 @@ func (e *StorageEngine) listContainers() (*ListContainersRes, error) { uniqueIDs := make(map[string]*cid.ID) - e.iterateOverUnsortedShards(func(s *shard.Shard) (stop bool) { - cnrs, err := shard.ListContainers(s) + e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + cnrs, err := shard.ListContainers(sh.Shard) if err != nil { - e.log.Warn("can't get list of containers", - zap.Stringer("shard_id", s.ID()), - zap.String("error", err.Error())) - + e.reportShardError(sh, "can't get list of containers", err) return false } diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index 489d480c..de6aac69 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -3,7 +3,6 @@ package engine import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" - "go.uber.org/zap" ) // DeletePrm groups the parameters of Delete operation. @@ -46,15 +45,10 @@ func (e *StorageEngine) delete(prm *DeletePrm) (*DeleteRes, error) { existsPrm := new(shard.ExistsPrm) for i := range prm.addr { - e.iterateOverSortedShards(prm.addr[i], func(_ int, sh *shard.Shard) (stop bool) { + e.iterateOverSortedShards(prm.addr[i], func(_ int, sh hashedShard) (stop bool) { resExists, err := sh.Exists(existsPrm.WithAddress(prm.addr[i])) if err != nil { - // TODO: smth wrong with shard, need to be processed - e.log.Warn("could not check object existence", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) - + e.reportShardError(sh, "could not check object existence", err) return false } else if !resExists.Exists() { return false @@ -62,11 +56,7 @@ func (e *StorageEngine) delete(prm *DeletePrm) (*DeleteRes, error) { _, err = sh.Inhume(shPrm.MarkAsGarbage(prm.addr[i])) if err != nil { - // TODO: smth wrong with shard, need to be processed - e.log.Warn("could not inhume object in shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) + e.reportShardError(sh, "could not inhume object in shard", err) } return err == nil diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index a3fcaa71..24fb6620 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -15,7 +16,7 @@ type StorageEngine struct { mtx *sync.RWMutex - shards map[string]*shard.Shard + shards map[string]shardWrapper shardPools map[string]util.WorkerPool @@ -26,12 +27,49 @@ type StorageEngine struct { } } +type shardWrapper struct { + errorCount *atomic.Uint32 + *shard.Shard +} + +// reportShardError checks that amount of errors doesn't exceed configured threshold. +// If it does, shard is set to read-only mode. +func (e *StorageEngine) reportShardError( + sh hashedShard, + msg string, + err error, + fields ...zap.Field) { + errCount := sh.errorCount.Inc() + e.log.Warn(msg, append([]zap.Field{ + zap.Stringer("shard_id", sh.ID()), + zap.Uint32("error count", errCount), + zap.String("error", err.Error()), + }, fields...)...) + + if e.errorsThreshold == 0 || errCount < e.errorsThreshold { + return + } + + err = sh.SetMode(shard.ModeReadOnly) + if err != nil { + e.log.Error("failed to move shard in read-only mode", + zap.Uint32("error count", errCount), + zap.Error(err)) + } else { + e.log.Info("shard is moved in read-only due to error threshold", + zap.Stringer("shard_id", sh.ID()), + zap.Uint32("error count", errCount)) + } +} + // Option represents StorageEngine's constructor option. type Option func(*cfg) type cfg struct { log *logger.Logger + errorsThreshold uint32 + metrics MetricRegister shardPoolSize uint32 @@ -56,7 +94,7 @@ func New(opts ...Option) *StorageEngine { return &StorageEngine{ cfg: c, mtx: new(sync.RWMutex), - shards: make(map[string]*shard.Shard), + shards: make(map[string]shardWrapper), shardPools: make(map[string]util.WorkerPool), } } @@ -80,3 +118,11 @@ func WithShardPoolSize(sz uint32) Option { c.shardPoolSize = sz } } + +// WithErrorThreshold returns an option to specify size amount of errors after which +// shard is moved to read-only mode. +func WithErrorThreshold(sz uint32) Option { + return func(c *cfg) { + c.errorsThreshold = sz + } +} diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index 6565cd44..feba5e3c 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -23,6 +23,7 @@ import ( "github.com/nspcc-dev/tzhash/tz" "github.com/panjf2000/ants/v2" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -32,7 +33,7 @@ func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine { log: zap.L(), }, mtx: new(sync.RWMutex), - shards: make(map[string]*shard.Shard, len(shards)), + shards: make(map[string]shardWrapper, len(shards)), shardPools: make(map[string]util.WorkerPool, len(shards)), } @@ -42,7 +43,10 @@ func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine { panic(err) } - engine.shards[s.ID().String()] = s + engine.shards[s.ID().String()] = shardWrapper{ + errorCount: atomic.NewUint32(0), + Shard: s, + } engine.shardPools[s.ID().String()] = pool } diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go new file mode 100644 index 00000000..13d8a63f --- /dev/null +++ b/pkg/local_object_storage/engine/error_test.go @@ -0,0 +1,139 @@ +package engine + +import ( + "fmt" + "os" + "path/filepath" + "strconv" + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +func TestErrorReporting(t *testing.T) { + const smallSize = 100 + + log := zaptest.NewLogger(t) + newEngine := func(t *testing.T, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) { + dir, err := os.MkdirTemp("", "*") + require.NoError(t, err) + t.Cleanup(func() { _ = os.RemoveAll(dir) }) + + e := New( + WithLogger(log), + WithShardPoolSize(1), + WithErrorThreshold(errThreshold)) + + var ids [2]*shard.ID + + for i := range ids { + ids[i], err = e.AddShard( + shard.WithLogger(log), + shard.WithBlobStorOptions( + blobstor.WithRootPath(filepath.Join(dir, strconv.Itoa(i))), + blobstor.WithShallowDepth(1), + blobstor.WithBlobovniczaShallowWidth(1), + blobstor.WithBlobovniczaShallowDepth(1), + blobstor.WithSmallSizeLimit(100), + blobstor.WithRootPerm(0700)), + shard.WithMetaBaseOptions( + meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))), + meta.WithPermissions(0700))) + require.NoError(t, err) + } + require.NoError(t, e.Open()) + require.NoError(t, e.Init()) + + return e, dir, ids + } + + t.Run("ignore errors by default", func(t *testing.T) { + e, dir, id := newEngine(t, 0) + + obj := generateRawObjectWithCID(t, cidtest.ID()) + obj.SetPayload(make([]byte, smallSize)) + + prm := new(shard.PutPrm).WithObject(obj.Object()) + e.mtx.RLock() + _, err := e.shards[id[0].String()].Shard.Put(prm) + e.mtx.RUnlock() + require.NoError(t, err) + + _, err = e.Get(&GetPrm{addr: obj.Object().Address()}) + require.NoError(t, err) + + checkShardState(t, e, id[0], 0, shard.ModeReadWrite) + checkShardState(t, e, id[1], 0, shard.ModeReadWrite) + + corruptSubDir(t, filepath.Join(dir, "0")) + + for i := uint32(1); i < 3; i++ { + _, err = e.Get(&GetPrm{addr: obj.Object().Address()}) + require.Error(t, err) + checkShardState(t, e, id[0], i, shard.ModeReadWrite) + checkShardState(t, e, id[1], 0, shard.ModeReadWrite) + } + }) + t.Run("with error threshold", func(t *testing.T) { + const errThreshold = 3 + + e, dir, id := newEngine(t, errThreshold) + + obj := generateRawObjectWithCID(t, cidtest.ID()) + obj.SetPayload(make([]byte, smallSize)) + + prm := new(shard.PutPrm).WithObject(obj.Object()) + e.mtx.RLock() + _, err := e.shards[id[0].String()].Put(prm) + e.mtx.RUnlock() + require.NoError(t, err) + + _, err = e.Get(&GetPrm{addr: obj.Object().Address()}) + require.NoError(t, err) + + checkShardState(t, e, id[0], 0, shard.ModeReadWrite) + checkShardState(t, e, id[1], 0, shard.ModeReadWrite) + + corruptSubDir(t, filepath.Join(dir, "0")) + + for i := uint32(1); i < errThreshold; i++ { + _, err = e.Get(&GetPrm{addr: obj.Object().Address()}) + require.Error(t, err) + checkShardState(t, e, id[0], i, shard.ModeReadWrite) + checkShardState(t, e, id[1], 0, shard.ModeReadWrite) + } + + for i := uint32(0); i < 2; i++ { + _, err = e.Get(&GetPrm{addr: obj.Object().Address()}) + require.Error(t, err) + checkShardState(t, e, id[0], errThreshold+i, shard.ModeReadOnly) + checkShardState(t, e, id[1], 0, shard.ModeReadWrite) + } + }) +} + +func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode shard.Mode) { + e.mtx.RLock() + sh := e.shards[id.String()] + e.mtx.RUnlock() + + require.Equal(t, mode, sh.GetMode()) + require.Equal(t, errCount, sh.errorCount.Load()) +} + +// corruptSubDir makes random directory in blobstor FSTree unreadable. +func corruptSubDir(t *testing.T, dir string) { + de, err := os.ReadDir(dir) + require.NoError(t, err) + for i := range de { + if de[i].IsDir() { + require.NoError(t, os.Chmod(filepath.Join(dir, de[i].Name()), 0)) + return + } + } +} diff --git a/pkg/local_object_storage/engine/exists.go b/pkg/local_object_storage/engine/exists.go index 2f18123f..3de12fc4 100644 --- a/pkg/local_object_storage/engine/exists.go +++ b/pkg/local_object_storage/engine/exists.go @@ -6,7 +6,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" - "go.uber.org/zap" ) func (e *StorageEngine) exists(addr *objectSDK.Address) (bool, error) { @@ -14,7 +13,7 @@ func (e *StorageEngine) exists(addr *objectSDK.Address) (bool, error) { alreadyRemoved := false exists := false - e.iterateOverSortedShards(addr, func(_ int, sh *shard.Shard) (stop bool) { + e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { res, err := sh.Exists(shPrm) if err != nil { if errors.Is(err, object.ErrAlreadyRemoved) { @@ -23,11 +22,7 @@ func (e *StorageEngine) exists(addr *objectSDK.Address) (bool, error) { return true } - // TODO: smth wrong with shard, need to be processed - e.log.Warn("could not check existence of object in shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) + e.reportShardError(sh, "could not check existence of object in shard", err) } if res != nil && !exists { diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index f83bdfb3..c24ac900 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -7,7 +7,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" - "go.uber.org/zap" ) // GetPrm groups the parameters of Get operation. @@ -69,7 +68,7 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) { shPrm := new(shard.GetPrm). WithAddress(prm.addr) - e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) { + e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { res, err := sh.Get(shPrm) if err != nil { switch { @@ -95,13 +94,7 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) { return true // stop, return it back default: - // TODO: smth wrong with shard, need to be processed, but - // still go to next shard - e.log.Warn("could not get object from shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) - + e.reportShardError(sh, "could not get object from shard", err) return false } } diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index 96f99395..e7a01884 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -7,7 +7,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" - "go.uber.org/zap" ) // HeadPrm groups the parameters of Head operation. @@ -85,7 +84,7 @@ func (e *StorageEngine) head(prm *HeadPrm) (*HeadRes, error) { WithAddress(prm.addr). WithRaw(prm.raw) - e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) { + e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { res, err := sh.Head(shPrm) if err != nil { switch { @@ -111,13 +110,7 @@ func (e *StorageEngine) head(prm *HeadPrm) (*HeadRes, error) { return true // stop, return it back default: - // TODO: smth wrong with shard, need to be processed, but - // still go to next shard - e.log.Warn("could not head object from shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) - + e.reportShardError(sh, "could not head object from shard", err) return false } } diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 68cfe24a..6a17b075 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -7,7 +7,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" - "go.uber.org/zap" ) // InhumePrm encapsulates parameters for inhume operation. @@ -89,7 +88,7 @@ func (e *StorageEngine) inhume(prm *InhumePrm) (*InhumeRes, error) { func (e *StorageEngine) inhumeAddr(addr *objectSDK.Address, prm *shard.InhumePrm, checkExists bool) (ok bool) { root := false - e.iterateOverSortedShards(addr, func(_ int, sh *shard.Shard) (stop bool) { + e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { defer func() { // if object is root we continue since information about it // can be presented in other shards @@ -111,12 +110,7 @@ func (e *StorageEngine) inhumeAddr(addr *objectSDK.Address, prm *shard.InhumePrm var siErr *objectSDK.SplitInfoError if !errors.As(err, &siErr) { - // TODO: smth wrong with shard, need to be processed - e.log.Warn("could not check for presents in shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) - + e.reportShardError(sh, "could not check for presents in shard", err) return } @@ -128,11 +122,7 @@ func (e *StorageEngine) inhumeAddr(addr *objectSDK.Address, prm *shard.InhumePrm _, err := sh.Inhume(prm) if err != nil { - // TODO: smth wrong with shard, need to be processed - e.log.Warn("could not inhume object in shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) + e.reportShardError(sh, "could not inhume object in shard", err) } else { ok = true } @@ -150,7 +140,7 @@ func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []*o tss[addrs[i].String()] = struct{}{} } - e.iterateOverUnsortedShards(func(sh *shard.Shard) (stop bool) { + e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { sh.HandleExpiredTombstones(tss) select { diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index cda62b93..98316fa6 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -59,9 +59,9 @@ func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) { finished := false - e.iterateOverSortedShards(prm.obj.Address(), func(ind int, s *shard.Shard) (stop bool) { + e.iterateOverSortedShards(prm.obj.Address(), func(ind int, sh hashedShard) (stop bool) { e.mtx.RLock() - pool := e.shardPools[s.ID().String()] + pool := e.shardPools[sh.ID().String()] e.mtx.RUnlock() exitCh := make(chan struct{}) @@ -69,7 +69,7 @@ func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) { if err := pool.Submit(func() { defer close(exitCh) - exists, err := s.Exists(existPrm) + exists, err := sh.Exists(existPrm) if err != nil { return // this is not ErrAlreadyRemoved error so we can go to the next shard } @@ -79,10 +79,10 @@ func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) { toMoveItPrm := new(shard.ToMoveItPrm) toMoveItPrm.WithAddress(prm.obj.Address()) - _, err = s.ToMoveIt(toMoveItPrm) + _, err = sh.ToMoveIt(toMoveItPrm) if err != nil { e.log.Warn("could not mark object for shard relocation", - zap.Stringer("shard", s.ID()), + zap.Stringer("shard", sh.ID()), zap.String("error", err.Error()), ) } @@ -96,10 +96,10 @@ func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) { putPrm := new(shard.PutPrm) putPrm.WithObject(prm.obj) - _, err = s.Put(putPrm) + _, err = sh.Put(putPrm) if err != nil { e.log.Warn("could not put object in shard", - zap.Stringer("shard", s.ID()), + zap.Stringer("shard", sh.ID()), zap.String("error", err.Error()), ) diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index 585b9978..23463544 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -7,7 +7,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" - "go.uber.org/zap" ) // RngPrm groups the parameters of GetRange operation. @@ -88,7 +87,7 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) { WithAddress(prm.addr). WithRange(prm.off, prm.ln) - e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) { + e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { res, err := sh.GetRange(shPrm) if err != nil { switch { @@ -116,13 +115,7 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) { return true // stop, return it back default: - // TODO: smth wrong with shard, need to be processed, but - // still go to next shard - e.log.Warn("could not get object from shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) - + e.reportShardError(sh, "could not get object from shard", err) return false } } diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go index 468eb18c..f10f31cb 100644 --- a/pkg/local_object_storage/engine/select.go +++ b/pkg/local_object_storage/engine/select.go @@ -7,7 +7,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" - "go.uber.org/zap" ) // SelectPrm groups the parameters of Select operation. @@ -72,7 +71,7 @@ func (e *StorageEngine) _select(prm *SelectPrm) (*SelectRes, error) { WithContainerID(prm.cid). WithFilters(prm.filters) - e.iterateOverUnsortedShards(func(sh *shard.Shard) (stop bool) { + e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { res, err := sh.Select(shPrm) if err != nil { switch { @@ -82,11 +81,7 @@ func (e *StorageEngine) _select(prm *SelectPrm) (*SelectRes, error) { return true default: - // TODO: smth wrong with shard, need to be processed - e.log.Warn("could not select objects from shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) + e.reportShardError(sh, "could not select objects from shard", err) return false } } else { @@ -129,14 +124,10 @@ func (e *StorageEngine) list(limit uint64) (*SelectRes, error) { ln := uint64(0) // consider iterating over shuffled shards - e.iterateOverUnsortedShards(func(sh *shard.Shard) (stop bool) { + e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { res, err := sh.List() // consider limit result of shard iterator if err != nil { - // TODO: smth wrong with shard, need to be processed - e.log.Warn("could not select objects from shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) + e.reportShardError(sh, "could not select objects from shard", err) } else { for _, addr := range res.AddressList() { // save only unique values if _, ok := uniqueMap[addr.String()]; !ok { diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index b43f1723..d4c4fcde 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -9,13 +9,12 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/panjf2000/ants/v2" + "go.uber.org/atomic" ) var errShardNotFound = errors.New("shard not found") -type hashedShard struct { - sh *shard.Shard -} +type hashedShard shardWrapper // AddShard adds a new shard to the storage engine. // @@ -37,10 +36,13 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { strID := id.String() - e.shards[strID] = shard.New(append(opts, - shard.WithID(id), - shard.WithExpiredObjectsCallback(e.processExpiredTombstones), - )...) + e.shards[strID] = shardWrapper{ + errorCount: atomic.NewUint32(0), + Shard: shard.New(append(opts, + shard.WithID(id), + shard.WithExpiredObjectsCallback(e.processExpiredTombstones), + )...), + } e.shardPools[strID] = pool @@ -75,8 +77,8 @@ func (e *StorageEngine) sortShardsByWeight(objAddr fmt.Stringer) []hashedShard { weights := make([]float64, 0, len(e.shards)) for _, sh := range e.shards { - shards = append(shards, hashedShard{sh}) - weights = append(weights, e.shardWeight(sh)) + shards = append(shards, hashedShard(sh)) + weights = append(weights, e.shardWeight(sh.Shard)) } hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(objAddr.String()))) @@ -91,23 +93,23 @@ func (e *StorageEngine) unsortedShards() []hashedShard { shards := make([]hashedShard, 0, len(e.shards)) for _, sh := range e.shards { - shards = append(shards, hashedShard{sh}) + shards = append(shards, hashedShard(sh)) } return shards } -func (e *StorageEngine) iterateOverSortedShards(addr *object.Address, handler func(int, *shard.Shard) (stop bool)) { +func (e *StorageEngine) iterateOverSortedShards(addr *object.Address, handler func(int, hashedShard) (stop bool)) { for i, sh := range e.sortShardsByWeight(addr) { - if handler(i, sh.sh) { + if handler(i, sh) { break } } } -func (e *StorageEngine) iterateOverUnsortedShards(handler func(*shard.Shard) (stop bool)) { +func (e *StorageEngine) iterateOverUnsortedShards(handler func(hashedShard) (stop bool)) { for _, sh := range e.unsortedShards() { - if handler(sh.sh) { + if handler(sh) { break } } @@ -131,6 +133,6 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m shard.Mode) error { func (s hashedShard) Hash() uint64 { return hrw.Hash( - []byte(s.sh.ID().String()), + []byte(s.Shard.ID().String()), ) }