diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index 948084098..d2ac4c480 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -72,14 +72,11 @@ func (e *StorageEngine) containerSize(prm ContainerSizePrm) (*ContainerSizeRes, var res ContainerSizeRes - e.iterateOverUnsortedShards(func(s *shard.Shard) (stop bool) { - size, err := shard.ContainerSize(s, prm.cid) + e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + size, err := shard.ContainerSize(sh.Shard, prm.cid) if err != nil { - e.log.Warn("can't get container size", - zap.Stringer("shard_id", s.ID()), - zap.Stringer("container_id", prm.cid), - zap.String("error", err.Error())) - + e.reportShardError(sh, "can't get container size", err, + zap.Stringer("container_id", prm.cid)) return false } @@ -122,13 +119,10 @@ func (e *StorageEngine) listContainers() (*ListContainersRes, error) { uniqueIDs := make(map[string]*cid.ID) - e.iterateOverUnsortedShards(func(s *shard.Shard) (stop bool) { - cnrs, err := shard.ListContainers(s) + e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + cnrs, err := shard.ListContainers(sh.Shard) if err != nil { - e.log.Warn("can't get list of containers", - zap.Stringer("shard_id", s.ID()), - zap.String("error", err.Error())) - + e.reportShardError(sh, "can't get list of containers", err) return false } diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index 489d480c3..de6aac690 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -3,7 +3,6 @@ package engine import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" - "go.uber.org/zap" ) // DeletePrm groups the parameters of Delete operation. @@ -46,15 +45,10 @@ func (e *StorageEngine) delete(prm *DeletePrm) (*DeleteRes, error) { existsPrm := new(shard.ExistsPrm) for i := range prm.addr { - e.iterateOverSortedShards(prm.addr[i], func(_ int, sh *shard.Shard) (stop bool) { + e.iterateOverSortedShards(prm.addr[i], func(_ int, sh hashedShard) (stop bool) { resExists, err := sh.Exists(existsPrm.WithAddress(prm.addr[i])) if err != nil { - // TODO: smth wrong with shard, need to be processed - e.log.Warn("could not check object existence", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) - + e.reportShardError(sh, "could not check object existence", err) return false } else if !resExists.Exists() { return false @@ -62,11 +56,7 @@ func (e *StorageEngine) delete(prm *DeletePrm) (*DeleteRes, error) { _, err = sh.Inhume(shPrm.MarkAsGarbage(prm.addr[i])) if err != nil { - // TODO: smth wrong with shard, need to be processed - e.log.Warn("could not inhume object in shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) + e.reportShardError(sh, "could not inhume object in shard", err) } return err == nil diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index a3fcaa71b..24fb66206 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -15,7 +16,7 @@ type StorageEngine struct { mtx *sync.RWMutex - shards map[string]*shard.Shard + shards map[string]shardWrapper shardPools map[string]util.WorkerPool @@ -26,12 +27,49 @@ type StorageEngine struct { } } +type shardWrapper struct { + errorCount *atomic.Uint32 + *shard.Shard +} + +// reportShardError checks that amount of errors doesn't exceed configured threshold. +// If it does, shard is set to read-only mode. +func (e *StorageEngine) reportShardError( + sh hashedShard, + msg string, + err error, + fields ...zap.Field) { + errCount := sh.errorCount.Inc() + e.log.Warn(msg, append([]zap.Field{ + zap.Stringer("shard_id", sh.ID()), + zap.Uint32("error count", errCount), + zap.String("error", err.Error()), + }, fields...)...) + + if e.errorsThreshold == 0 || errCount < e.errorsThreshold { + return + } + + err = sh.SetMode(shard.ModeReadOnly) + if err != nil { + e.log.Error("failed to move shard in read-only mode", + zap.Uint32("error count", errCount), + zap.Error(err)) + } else { + e.log.Info("shard is moved in read-only due to error threshold", + zap.Stringer("shard_id", sh.ID()), + zap.Uint32("error count", errCount)) + } +} + // Option represents StorageEngine's constructor option. type Option func(*cfg) type cfg struct { log *logger.Logger + errorsThreshold uint32 + metrics MetricRegister shardPoolSize uint32 @@ -56,7 +94,7 @@ func New(opts ...Option) *StorageEngine { return &StorageEngine{ cfg: c, mtx: new(sync.RWMutex), - shards: make(map[string]*shard.Shard), + shards: make(map[string]shardWrapper), shardPools: make(map[string]util.WorkerPool), } } @@ -80,3 +118,11 @@ func WithShardPoolSize(sz uint32) Option { c.shardPoolSize = sz } } + +// WithErrorThreshold returns an option to specify size amount of errors after which +// shard is moved to read-only mode. +func WithErrorThreshold(sz uint32) Option { + return func(c *cfg) { + c.errorsThreshold = sz + } +} diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index 6565cd440..feba5e3ca 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -23,6 +23,7 @@ import ( "github.com/nspcc-dev/tzhash/tz" "github.com/panjf2000/ants/v2" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -32,7 +33,7 @@ func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine { log: zap.L(), }, mtx: new(sync.RWMutex), - shards: make(map[string]*shard.Shard, len(shards)), + shards: make(map[string]shardWrapper, len(shards)), shardPools: make(map[string]util.WorkerPool, len(shards)), } @@ -42,7 +43,10 @@ func testNewEngineWithShards(shards ...*shard.Shard) *StorageEngine { panic(err) } - engine.shards[s.ID().String()] = s + engine.shards[s.ID().String()] = shardWrapper{ + errorCount: atomic.NewUint32(0), + Shard: s, + } engine.shardPools[s.ID().String()] = pool } diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go new file mode 100644 index 000000000..13d8a63f7 --- /dev/null +++ b/pkg/local_object_storage/engine/error_test.go @@ -0,0 +1,139 @@ +package engine + +import ( + "fmt" + "os" + "path/filepath" + "strconv" + "testing" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +func TestErrorReporting(t *testing.T) { + const smallSize = 100 + + log := zaptest.NewLogger(t) + newEngine := func(t *testing.T, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) { + dir, err := os.MkdirTemp("", "*") + require.NoError(t, err) + t.Cleanup(func() { _ = os.RemoveAll(dir) }) + + e := New( + WithLogger(log), + WithShardPoolSize(1), + WithErrorThreshold(errThreshold)) + + var ids [2]*shard.ID + + for i := range ids { + ids[i], err = e.AddShard( + shard.WithLogger(log), + shard.WithBlobStorOptions( + blobstor.WithRootPath(filepath.Join(dir, strconv.Itoa(i))), + blobstor.WithShallowDepth(1), + blobstor.WithBlobovniczaShallowWidth(1), + blobstor.WithBlobovniczaShallowDepth(1), + blobstor.WithSmallSizeLimit(100), + blobstor.WithRootPerm(0700)), + shard.WithMetaBaseOptions( + meta.WithPath(filepath.Join(dir, fmt.Sprintf("%d.metabase", i))), + meta.WithPermissions(0700))) + require.NoError(t, err) + } + require.NoError(t, e.Open()) + require.NoError(t, e.Init()) + + return e, dir, ids + } + + t.Run("ignore errors by default", func(t *testing.T) { + e, dir, id := newEngine(t, 0) + + obj := generateRawObjectWithCID(t, cidtest.ID()) + obj.SetPayload(make([]byte, smallSize)) + + prm := new(shard.PutPrm).WithObject(obj.Object()) + e.mtx.RLock() + _, err := e.shards[id[0].String()].Shard.Put(prm) + e.mtx.RUnlock() + require.NoError(t, err) + + _, err = e.Get(&GetPrm{addr: obj.Object().Address()}) + require.NoError(t, err) + + checkShardState(t, e, id[0], 0, shard.ModeReadWrite) + checkShardState(t, e, id[1], 0, shard.ModeReadWrite) + + corruptSubDir(t, filepath.Join(dir, "0")) + + for i := uint32(1); i < 3; i++ { + _, err = e.Get(&GetPrm{addr: obj.Object().Address()}) + require.Error(t, err) + checkShardState(t, e, id[0], i, shard.ModeReadWrite) + checkShardState(t, e, id[1], 0, shard.ModeReadWrite) + } + }) + t.Run("with error threshold", func(t *testing.T) { + const errThreshold = 3 + + e, dir, id := newEngine(t, errThreshold) + + obj := generateRawObjectWithCID(t, cidtest.ID()) + obj.SetPayload(make([]byte, smallSize)) + + prm := new(shard.PutPrm).WithObject(obj.Object()) + e.mtx.RLock() + _, err := e.shards[id[0].String()].Put(prm) + e.mtx.RUnlock() + require.NoError(t, err) + + _, err = e.Get(&GetPrm{addr: obj.Object().Address()}) + require.NoError(t, err) + + checkShardState(t, e, id[0], 0, shard.ModeReadWrite) + checkShardState(t, e, id[1], 0, shard.ModeReadWrite) + + corruptSubDir(t, filepath.Join(dir, "0")) + + for i := uint32(1); i < errThreshold; i++ { + _, err = e.Get(&GetPrm{addr: obj.Object().Address()}) + require.Error(t, err) + checkShardState(t, e, id[0], i, shard.ModeReadWrite) + checkShardState(t, e, id[1], 0, shard.ModeReadWrite) + } + + for i := uint32(0); i < 2; i++ { + _, err = e.Get(&GetPrm{addr: obj.Object().Address()}) + require.Error(t, err) + checkShardState(t, e, id[0], errThreshold+i, shard.ModeReadOnly) + checkShardState(t, e, id[1], 0, shard.ModeReadWrite) + } + }) +} + +func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode shard.Mode) { + e.mtx.RLock() + sh := e.shards[id.String()] + e.mtx.RUnlock() + + require.Equal(t, mode, sh.GetMode()) + require.Equal(t, errCount, sh.errorCount.Load()) +} + +// corruptSubDir makes random directory in blobstor FSTree unreadable. +func corruptSubDir(t *testing.T, dir string) { + de, err := os.ReadDir(dir) + require.NoError(t, err) + for i := range de { + if de[i].IsDir() { + require.NoError(t, os.Chmod(filepath.Join(dir, de[i].Name()), 0)) + return + } + } +} diff --git a/pkg/local_object_storage/engine/exists.go b/pkg/local_object_storage/engine/exists.go index 2f18123fb..3de12fc45 100644 --- a/pkg/local_object_storage/engine/exists.go +++ b/pkg/local_object_storage/engine/exists.go @@ -6,7 +6,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" - "go.uber.org/zap" ) func (e *StorageEngine) exists(addr *objectSDK.Address) (bool, error) { @@ -14,7 +13,7 @@ func (e *StorageEngine) exists(addr *objectSDK.Address) (bool, error) { alreadyRemoved := false exists := false - e.iterateOverSortedShards(addr, func(_ int, sh *shard.Shard) (stop bool) { + e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { res, err := sh.Exists(shPrm) if err != nil { if errors.Is(err, object.ErrAlreadyRemoved) { @@ -23,11 +22,7 @@ func (e *StorageEngine) exists(addr *objectSDK.Address) (bool, error) { return true } - // TODO: smth wrong with shard, need to be processed - e.log.Warn("could not check existence of object in shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) + e.reportShardError(sh, "could not check existence of object in shard", err) } if res != nil && !exists { diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index f83bdfb3e..c24ac9002 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -7,7 +7,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" - "go.uber.org/zap" ) // GetPrm groups the parameters of Get operation. @@ -69,7 +68,7 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) { shPrm := new(shard.GetPrm). WithAddress(prm.addr) - e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) { + e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { res, err := sh.Get(shPrm) if err != nil { switch { @@ -95,13 +94,7 @@ func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) { return true // stop, return it back default: - // TODO: smth wrong with shard, need to be processed, but - // still go to next shard - e.log.Warn("could not get object from shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) - + e.reportShardError(sh, "could not get object from shard", err) return false } } diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index 96f993950..e7a018840 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -7,7 +7,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" - "go.uber.org/zap" ) // HeadPrm groups the parameters of Head operation. @@ -85,7 +84,7 @@ func (e *StorageEngine) head(prm *HeadPrm) (*HeadRes, error) { WithAddress(prm.addr). WithRaw(prm.raw) - e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) { + e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { res, err := sh.Head(shPrm) if err != nil { switch { @@ -111,13 +110,7 @@ func (e *StorageEngine) head(prm *HeadPrm) (*HeadRes, error) { return true // stop, return it back default: - // TODO: smth wrong with shard, need to be processed, but - // still go to next shard - e.log.Warn("could not head object from shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) - + e.reportShardError(sh, "could not head object from shard", err) return false } } diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 68cfe24a9..6a17b075d 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -7,7 +7,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" - "go.uber.org/zap" ) // InhumePrm encapsulates parameters for inhume operation. @@ -89,7 +88,7 @@ func (e *StorageEngine) inhume(prm *InhumePrm) (*InhumeRes, error) { func (e *StorageEngine) inhumeAddr(addr *objectSDK.Address, prm *shard.InhumePrm, checkExists bool) (ok bool) { root := false - e.iterateOverSortedShards(addr, func(_ int, sh *shard.Shard) (stop bool) { + e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { defer func() { // if object is root we continue since information about it // can be presented in other shards @@ -111,12 +110,7 @@ func (e *StorageEngine) inhumeAddr(addr *objectSDK.Address, prm *shard.InhumePrm var siErr *objectSDK.SplitInfoError if !errors.As(err, &siErr) { - // TODO: smth wrong with shard, need to be processed - e.log.Warn("could not check for presents in shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) - + e.reportShardError(sh, "could not check for presents in shard", err) return } @@ -128,11 +122,7 @@ func (e *StorageEngine) inhumeAddr(addr *objectSDK.Address, prm *shard.InhumePrm _, err := sh.Inhume(prm) if err != nil { - // TODO: smth wrong with shard, need to be processed - e.log.Warn("could not inhume object in shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) + e.reportShardError(sh, "could not inhume object in shard", err) } else { ok = true } @@ -150,7 +140,7 @@ func (e *StorageEngine) processExpiredTombstones(ctx context.Context, addrs []*o tss[addrs[i].String()] = struct{}{} } - e.iterateOverUnsortedShards(func(sh *shard.Shard) (stop bool) { + e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { sh.HandleExpiredTombstones(tss) select { diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index cda62b93a..98316fa6d 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -59,9 +59,9 @@ func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) { finished := false - e.iterateOverSortedShards(prm.obj.Address(), func(ind int, s *shard.Shard) (stop bool) { + e.iterateOverSortedShards(prm.obj.Address(), func(ind int, sh hashedShard) (stop bool) { e.mtx.RLock() - pool := e.shardPools[s.ID().String()] + pool := e.shardPools[sh.ID().String()] e.mtx.RUnlock() exitCh := make(chan struct{}) @@ -69,7 +69,7 @@ func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) { if err := pool.Submit(func() { defer close(exitCh) - exists, err := s.Exists(existPrm) + exists, err := sh.Exists(existPrm) if err != nil { return // this is not ErrAlreadyRemoved error so we can go to the next shard } @@ -79,10 +79,10 @@ func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) { toMoveItPrm := new(shard.ToMoveItPrm) toMoveItPrm.WithAddress(prm.obj.Address()) - _, err = s.ToMoveIt(toMoveItPrm) + _, err = sh.ToMoveIt(toMoveItPrm) if err != nil { e.log.Warn("could not mark object for shard relocation", - zap.Stringer("shard", s.ID()), + zap.Stringer("shard", sh.ID()), zap.String("error", err.Error()), ) } @@ -96,10 +96,10 @@ func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) { putPrm := new(shard.PutPrm) putPrm.WithObject(prm.obj) - _, err = s.Put(putPrm) + _, err = sh.Put(putPrm) if err != nil { e.log.Warn("could not put object in shard", - zap.Stringer("shard", s.ID()), + zap.Stringer("shard", sh.ID()), zap.String("error", err.Error()), ) diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index 585b9978d..23463544b 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -7,7 +7,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" - "go.uber.org/zap" ) // RngPrm groups the parameters of GetRange operation. @@ -88,7 +87,7 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) { WithAddress(prm.addr). WithRange(prm.off, prm.ln) - e.iterateOverSortedShards(prm.addr, func(_ int, sh *shard.Shard) (stop bool) { + e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { res, err := sh.GetRange(shPrm) if err != nil { switch { @@ -116,13 +115,7 @@ func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) { return true // stop, return it back default: - // TODO: smth wrong with shard, need to be processed, but - // still go to next shard - e.log.Warn("could not get object from shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) - + e.reportShardError(sh, "could not get object from shard", err) return false } } diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go index 468eb18cf..f10f31cb2 100644 --- a/pkg/local_object_storage/engine/select.go +++ b/pkg/local_object_storage/engine/select.go @@ -7,7 +7,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" - "go.uber.org/zap" ) // SelectPrm groups the parameters of Select operation. @@ -72,7 +71,7 @@ func (e *StorageEngine) _select(prm *SelectPrm) (*SelectRes, error) { WithContainerID(prm.cid). WithFilters(prm.filters) - e.iterateOverUnsortedShards(func(sh *shard.Shard) (stop bool) { + e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { res, err := sh.Select(shPrm) if err != nil { switch { @@ -82,11 +81,7 @@ func (e *StorageEngine) _select(prm *SelectPrm) (*SelectRes, error) { return true default: - // TODO: smth wrong with shard, need to be processed - e.log.Warn("could not select objects from shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) + e.reportShardError(sh, "could not select objects from shard", err) return false } } else { @@ -129,14 +124,10 @@ func (e *StorageEngine) list(limit uint64) (*SelectRes, error) { ln := uint64(0) // consider iterating over shuffled shards - e.iterateOverUnsortedShards(func(sh *shard.Shard) (stop bool) { + e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { res, err := sh.List() // consider limit result of shard iterator if err != nil { - // TODO: smth wrong with shard, need to be processed - e.log.Warn("could not select objects from shard", - zap.Stringer("shard", sh.ID()), - zap.String("error", err.Error()), - ) + e.reportShardError(sh, "could not select objects from shard", err) } else { for _, addr := range res.AddressList() { // save only unique values if _, ok := uniqueMap[addr.String()]; !ok { diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index b43f1723f..d4c4fcde5 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -9,13 +9,12 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/panjf2000/ants/v2" + "go.uber.org/atomic" ) var errShardNotFound = errors.New("shard not found") -type hashedShard struct { - sh *shard.Shard -} +type hashedShard shardWrapper // AddShard adds a new shard to the storage engine. // @@ -37,10 +36,13 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { strID := id.String() - e.shards[strID] = shard.New(append(opts, - shard.WithID(id), - shard.WithExpiredObjectsCallback(e.processExpiredTombstones), - )...) + e.shards[strID] = shardWrapper{ + errorCount: atomic.NewUint32(0), + Shard: shard.New(append(opts, + shard.WithID(id), + shard.WithExpiredObjectsCallback(e.processExpiredTombstones), + )...), + } e.shardPools[strID] = pool @@ -75,8 +77,8 @@ func (e *StorageEngine) sortShardsByWeight(objAddr fmt.Stringer) []hashedShard { weights := make([]float64, 0, len(e.shards)) for _, sh := range e.shards { - shards = append(shards, hashedShard{sh}) - weights = append(weights, e.shardWeight(sh)) + shards = append(shards, hashedShard(sh)) + weights = append(weights, e.shardWeight(sh.Shard)) } hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(objAddr.String()))) @@ -91,23 +93,23 @@ func (e *StorageEngine) unsortedShards() []hashedShard { shards := make([]hashedShard, 0, len(e.shards)) for _, sh := range e.shards { - shards = append(shards, hashedShard{sh}) + shards = append(shards, hashedShard(sh)) } return shards } -func (e *StorageEngine) iterateOverSortedShards(addr *object.Address, handler func(int, *shard.Shard) (stop bool)) { +func (e *StorageEngine) iterateOverSortedShards(addr *object.Address, handler func(int, hashedShard) (stop bool)) { for i, sh := range e.sortShardsByWeight(addr) { - if handler(i, sh.sh) { + if handler(i, sh) { break } } } -func (e *StorageEngine) iterateOverUnsortedShards(handler func(*shard.Shard) (stop bool)) { +func (e *StorageEngine) iterateOverUnsortedShards(handler func(hashedShard) (stop bool)) { for _, sh := range e.unsortedShards() { - if handler(sh.sh) { + if handler(sh) { break } } @@ -131,6 +133,6 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m shard.Mode) error { func (s hashedShard) Hash() uint64 { return hrw.Hash( - []byte(s.sh.ID().String()), + []byte(s.Shard.ID().String()), ) }