diff --git a/CHANGELOG.md b/CHANGELOG.md
index 35d7abb5c..1505507b8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@ Changelog for NeoFS Node
 
 ### Added
 - `session` flag support to `neofs-cli object hash` (#2029)
+- Shard can now change mode when encountering background disk errors (#2035)
 
 ### Changed
 - `object lock` command reads CID and OID the same way other commands do (#1971)
diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go
index 6470e0d96..420691cd6 100644
--- a/pkg/local_object_storage/engine/control.go
+++ b/pkg/local_object_storage/engine/control.go
@@ -92,6 +92,9 @@ func (e *StorageEngine) Init() error {
 		return errors.New("failed initialization on all shards")
 	}
 
+	e.wg.Add(1)
+	go e.setModeLoop()
+
 	return nil
 }
 
@@ -100,8 +103,10 @@ var errClosed = errors.New("storage engine is closed")
 // Close releases all StorageEngine's components. Waits for all data-related operations to complete.
 // After the call, all the next ones will fail.
 //
-// The method is supposed to be called when the application exits.
+// The method MUST only be called when the application exits.
 func (e *StorageEngine) Close() error {
+	close(e.closeCh)
+	defer e.wg.Wait()
 	return e.setBlockExecErr(errClosed)
 }
 
diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go
index 1d2dee6c0..6929ab200 100644
--- a/pkg/local_object_storage/engine/control_test.go
+++ b/pkg/local_object_storage/engine/control_test.go
@@ -20,7 +20,6 @@ import (
 func TestExecBlocks(t *testing.T) {
 	e := testNewEngineWithShardNum(t, 2) // number doesn't matter in this test, 2 is several but not many
 	t.Cleanup(func() {
-		e.Close()
 		os.RemoveAll(t.Name())
 	})
 
diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go
index cf61253b4..c7aeed0c7 100644
--- a/pkg/local_object_storage/engine/engine.go
+++ b/pkg/local_object_storage/engine/engine.go
@@ -23,6 +23,10 @@ type StorageEngine struct {
 
 	shardPools map[string]util.WorkerPool
 
+	closeCh   chan struct{}
+	setModeCh chan setModeRequest
+	wg        sync.WaitGroup
+
 	blockExec struct {
 		mtx sync.RWMutex
 
@@ -35,52 +39,51 @@ type shardWrapper struct {
 	*shard.Shard
 }
 
-// reportShardErrorBackground increases shard error counter and logs an error.
-// It is intended to be used from background workers and
-// doesn't change shard mode because of possible deadlocks.
-func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err error) {
-	e.mtx.RLock()
-	sh, ok := e.shards[id]
-	e.mtx.RUnlock()
-
-	if !ok {
-		return
-	}
-
-	errCount := sh.errorCount.Inc()
-	e.log.Warn(msg,
-		zap.String("shard_id", id),
-		zap.Uint32("error count", errCount),
-		zap.String("error", err.Error()))
+type setModeRequest struct {
+	sh         *shard.Shard
+	errorCount uint32
 }
 
-// reportShardError checks that the amount of errors doesn't exceed the configured threshold.
-// If it does, shard is set to read-only mode.
-func (e *StorageEngine) reportShardError(
-	sh hashedShard,
-	msg string,
-	err error,
-	fields ...zap.Field) {
-	if isLogical(err) {
-		e.log.Warn(msg,
-			zap.Stringer("shard_id", sh.ID()),
-			zap.String("error", err.Error()))
-		return
+// setModeLoop listens to setModeCh to perform the degraded-mode transition of a single shard.
+// Instead of creating a worker per shard, we use a single goroutine.
+func (e *StorageEngine) setModeLoop() {
+	defer e.wg.Done()
+
+	var (
+		mtx        sync.RWMutex // protects inProgress map
+		inProgress = make(map[string]struct{})
+	)
+
+	for {
+		select {
+		case <-e.closeCh:
+			return
+		case r := <-e.setModeCh:
+			sid := r.sh.ID().String()
+
+			mtx.Lock()
+			_, ok := inProgress[sid]
+			if !ok {
+				inProgress[sid] = struct{}{}
+				go func() {
+					e.moveToDegraded(r.sh, r.errorCount)
+
+					mtx.Lock()
+					delete(inProgress, sid)
+					mtx.Unlock()
+				}()
+			}
+			mtx.Unlock()
+		}
 	}
+}
+
+func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32) {
+	e.mtx.RLock()
+	defer e.mtx.RUnlock()
 
 	sid := sh.ID()
-	errCount := sh.errorCount.Inc()
-	e.log.Warn(msg, append([]zap.Field{
-		zap.Stringer("shard_id", sid),
-		zap.Uint32("error count", errCount),
-		zap.String("error", err.Error()),
-	}, fields...)...)
-
-	if e.errorsThreshold == 0 || errCount < e.errorsThreshold {
-		return
-	}
-
-	err = sh.SetMode(mode.DegradedReadOnly)
+	err := sh.SetMode(mode.DegradedReadOnly)
 	if err != nil {
 		e.log.Error("failed to move shard in degraded-read-only mode, moving to read-only",
 			zap.Stringer("shard_id", sid),
@@ -105,6 +108,78 @@ func (e *StorageEngine) reportShardError(
 	}
 }
 
+// reportShardErrorBackground increases shard error counter and logs an error.
+// It is intended to be used from background workers and
+// doesn't change shard mode because of possible deadlocks.
+func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err error) {
+	e.mtx.RLock()
+	sh, ok := e.shards[id]
+	e.mtx.RUnlock()
+
+	if !ok {
+		return
+	}
+
+	errCount := sh.errorCount.Inc()
+	e.reportShardErrorWithFlags(sh.Shard, errCount, false, msg, err)
+}
+
+// reportShardError checks that the amount of errors doesn't exceed the configured threshold.
+// If it does, shard is set to read-only mode.
+func (e *StorageEngine) reportShardError(
+	sh hashedShard,
+	msg string,
+	err error,
+	fields ...zap.Field) {
+	if isLogical(err) {
+		e.log.Warn(msg,
+			zap.Stringer("shard_id", sh.ID()),
+			zap.String("error", err.Error()))
+		return
+	}
+
+	errCount := sh.errorCount.Inc()
+	e.reportShardErrorWithFlags(sh.Shard, errCount, true, msg, err, fields...)
+}
+
+func (e *StorageEngine) reportShardErrorWithFlags(
+	sh *shard.Shard,
+	errCount uint32,
+	block bool,
+	msg string,
+	err error,
+	fields ...zap.Field) {
+	sid := sh.ID()
+	e.log.Warn(msg, append([]zap.Field{
+		zap.Stringer("shard_id", sid),
+		zap.Uint32("error count", errCount),
+		zap.String("error", err.Error()),
+	}, fields...)...)
+
+	if e.errorsThreshold == 0 || errCount < e.errorsThreshold {
+		return
+	}
+
+	if block {
+		e.moveToDegraded(sh, errCount)
+	} else {
+		req := setModeRequest{
+			errorCount: errCount,
+			sh:         sh,
+		}
+
+		select {
+		case e.setModeCh <- req:
+		default:
+			// For background workers, we can have a lot of such errors,
+			// thus logging is done with DEBUG level.
+			e.log.Debug("mode change is in progress, ignoring set-mode request",
+				zap.Stringer("shard_id", sid),
+				zap.Uint32("error_count", errCount))
+		}
+	}
+}
+
 func isLogical(err error) bool {
 	return errors.As(err, &logicerr.Logical{})
 }
@@ -143,6 +218,8 @@ func New(opts ...Option) *StorageEngine {
 		mtx:        new(sync.RWMutex),
 		shards:     make(map[string]shardWrapper),
 		shardPools: make(map[string]util.WorkerPool),
+		closeCh:    make(chan struct{}),
+		setModeCh:  make(chan setModeRequest),
 	}
 }
 
diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go
index b6bbc1bca..cd0d10658 100644
--- a/pkg/local_object_storage/engine/error_test.go
+++ b/pkg/local_object_storage/engine/error_test.go
@@ -200,8 +200,8 @@ func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint
 	sh := e.shards[id.String()]
 	e.mtx.RUnlock()
 
-	require.Equal(t, mode, sh.GetMode())
 	require.Equal(t, errCount, sh.errorCount.Load())
+	require.Equal(t, mode, sh.GetMode())
 }
 
 // corruptSubDir makes random directory except "blobovnicza" in blobstor FSTree unreadable.