[#2035] engine: Allow moving to degraded from background workers

Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
Author:    Evgenii Stratonikov
Date:      2022-11-10 13:58:46 +03:00
Committer: fyrchik
Parent:    c85bea15ef
Commit:    f2d7e65e39
5 changed files with 126 additions and 44 deletions

@@ -7,6 +7,7 @@ Changelog for NeoFS Node
 - `morph list-containers` in `neofs-adm` (#1689)
 - `--binary` flag in `neofs-cli object put/get/delete` commands (#1338)
 - `session` flag support to `neofs-cli object hash` (#2029)
+- Shard can now change mode when encountering background disk errors (#2035)
 
 ### Changed
 - `object lock` command reads CID and OID the same way other commands do (#1971)

@@ -92,6 +92,9 @@ func (e *StorageEngine) Init() error {
 		return errors.New("failed initialization on all shards")
 	}
 
+	e.wg.Add(1)
+	go e.setModeLoop()
+
 	return nil
 }
 
@@ -100,8 +103,10 @@ var errClosed = errors.New("storage engine is closed")
 
 // Close releases all StorageEngine's components. Waits for all data-related operations to complete.
 // After the call, all the next ones will fail.
 //
-// The method is supposed to be called when the application exits.
+// The method MUST only be called when the application exits.
 func (e *StorageEngine) Close() error {
+	close(e.closeCh)
+	defer e.wg.Wait()
 	return e.setBlockExecErr(errClosed)
 }
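
Note: the Init/Close wiring above is the standard Go lifecycle pattern for a long-lived background goroutine: a channel closed exactly once signals shutdown, and a WaitGroup lets Close block until the loop has actually exited. A minimal, self-contained sketch of that pattern (the worker type and its methods are illustrative, not part of the engine's API):

package main

import (
	"fmt"
	"sync"
)

type worker struct {
	closeCh chan struct{}
	wg      sync.WaitGroup
}

func (w *worker) start() {
	w.wg.Add(1) // registered before the goroutine runs, as in Init
	go func() {
		defer w.wg.Done()
		for {
			select {
			case <-w.closeCh:
				return
				// a real loop would have more cases doing actual work
			}
		}
	}()
}

// stop mirrors Close: closing closeCh twice panics, which is why the
// doc comment insists the method is called only once, at application exit.
func (w *worker) stop() {
	close(w.closeCh)
	w.wg.Wait() // wait until the loop has observed closeCh and returned
}

func main() {
	w := &worker{closeCh: make(chan struct{})}
	w.start()
	w.stop()
	fmt.Println("worker stopped cleanly")
}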

@@ -20,7 +20,6 @@ import (
 
 func TestExecBlocks(t *testing.T) {
 	e := testNewEngineWithShardNum(t, 2) // number doesn't matter in this test, 2 is several but not many
 	t.Cleanup(func() {
-		e.Close()
 		os.RemoveAll(t.Name())
 	})

@@ -23,6 +23,10 @@ type StorageEngine struct {
 	shardPools map[string]util.WorkerPool
 
+	closeCh   chan struct{}
+	setModeCh chan setModeRequest
+	wg        sync.WaitGroup
+
 	blockExec struct {
 		mtx sync.RWMutex
@@ -35,52 +39,51 @@ type shardWrapper struct {
 	*shard.Shard
 }
 
-// reportShardErrorBackground increases shard error counter and logs an error.
-// It is intended to be used from background workers and
-// doesn't change shard mode because of possible deadlocks.
-func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err error) {
-	e.mtx.RLock()
-	sh, ok := e.shards[id]
-	e.mtx.RUnlock()
-
-	if !ok {
-		return
-	}
-
-	errCount := sh.errorCount.Inc()
-	e.log.Warn(msg,
-		zap.String("shard_id", id),
-		zap.Uint32("error count", errCount),
-		zap.String("error", err.Error()))
-}
-
-// reportShardError checks that the amount of errors doesn't exceed the configured threshold.
-// If it does, shard is set to read-only mode.
-func (e *StorageEngine) reportShardError(
-	sh hashedShard,
-	msg string,
-	err error,
-	fields ...zap.Field) {
-	if isLogical(err) {
-		e.log.Warn(msg,
-			zap.Stringer("shard_id", sh.ID()),
-			zap.String("error", err.Error()))
-		return
-	}
+type setModeRequest struct {
+	sh         *shard.Shard
+	errorCount uint32
+}
+
+// setModeLoop listens setModeCh to perform degraded mode transition of a single shard.
+// Instead of creating a worker per single shard we use a single goroutine.
+func (e *StorageEngine) setModeLoop() {
+	defer e.wg.Done()
+
+	var (
+		mtx        sync.RWMutex // protects inProgress map
+		inProgress = make(map[string]struct{})
+	)
+
+	for {
+		select {
+		case <-e.closeCh:
+			return
+		case r := <-e.setModeCh:
+			sid := r.sh.ID().String()
+
+			mtx.Lock()
+			_, ok := inProgress[sid]
+			if !ok {
+				inProgress[sid] = struct{}{}
+				go func() {
+					e.moveToDegraded(r.sh, r.errorCount)
+
+					mtx.Lock()
+					delete(inProgress, sid)
+					mtx.Unlock()
+				}()
+			}
+			mtx.Unlock()
+		}
+	}
+}
+
+func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32) {
+	e.mtx.RLock()
+	defer e.mtx.RUnlock()
 
 	sid := sh.ID()
-	errCount := sh.errorCount.Inc()
-	e.log.Warn(msg, append([]zap.Field{
-		zap.Stringer("shard_id", sid),
-		zap.Uint32("error count", errCount),
-		zap.String("error", err.Error()),
-	}, fields...)...)
-
-	if e.errorsThreshold == 0 || errCount < e.errorsThreshold {
-		return
-	}
-
-	err = sh.SetMode(mode.DegradedReadOnly)
+	err := sh.SetMode(mode.DegradedReadOnly)
 	if err != nil {
 		e.log.Error("failed to move shard in degraded-read-only mode, moving to read-only",
 			zap.Stringer("shard_id", sid),
@@ -105,6 +108,78 @@ func (e *StorageEngine) reportShardError(
 	}
 }
 
+// reportShardErrorBackground increases shard error counter and logs an error.
+// It is intended to be used from background workers and
+// doesn't change shard mode because of possible deadlocks.
+func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err error) {
+	e.mtx.RLock()
+	sh, ok := e.shards[id]
+	e.mtx.RUnlock()
+
+	if !ok {
+		return
+	}
+
+	errCount := sh.errorCount.Inc()
+	e.reportShardErrorWithFlags(sh.Shard, errCount, false, msg, err)
+}
+
+// reportShardError checks that the amount of errors doesn't exceed the configured threshold.
+// If it does, shard is set to read-only mode.
+func (e *StorageEngine) reportShardError(
+	sh hashedShard,
+	msg string,
+	err error,
+	fields ...zap.Field) {
+	if isLogical(err) {
+		e.log.Warn(msg,
+			zap.Stringer("shard_id", sh.ID()),
+			zap.String("error", err.Error()))
+		return
+	}
+
+	errCount := sh.errorCount.Inc()
+	e.reportShardErrorWithFlags(sh.Shard, errCount, true, msg, err, fields...)
+}
+
+func (e *StorageEngine) reportShardErrorWithFlags(
+	sh *shard.Shard,
+	errCount uint32,
+	block bool,
+	msg string,
+	err error,
+	fields ...zap.Field) {
+	sid := sh.ID()
+	e.log.Warn(msg, append([]zap.Field{
+		zap.Stringer("shard_id", sid),
+		zap.Uint32("error count", errCount),
+		zap.String("error", err.Error()),
+	}, fields...)...)
+
+	if e.errorsThreshold == 0 || errCount < e.errorsThreshold {
+		return
+	}
+
+	if block {
+		e.moveToDegraded(sh, errCount)
+	} else {
+		req := setModeRequest{
+			errorCount: errCount,
+			sh:         sh,
+		}
+
+		select {
+		case e.setModeCh <- req:
+		default:
+			// For background workers we can have a lot of such errors,
+			// thus logging is done with DEBUG level.
+			e.log.Debug("mode change is in progress, ignoring set-mode request",
+				zap.Stringer("shard_id", sid),
+				zap.Uint32("error_count", errCount))
+		}
+	}
+}
+
 func isLogical(err error) bool {
 	return errors.As(err, &logicerr.Logical{})
 }
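
reportShardErrorWithFlags dispatches differently depending on the caller: request-path callers (block == true) perform the transition inline, while background workers enqueue it with a non-blocking send so they are never stalled by an in-progress mode change. The select with a default branch is the standard Go idiom for that; a tiny sketch with a made-up helper name (trySubmit):

package main

import "fmt"

// trySubmit is the select/default idiom from the `else` branch above:
// if no receiver is ready, the request is dropped rather than blocking.
func trySubmit(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// The engine logs this case at DEBUG level because background
		// workers may produce many errors while a transition is running.
		return false
	}
}

func main() {
	ch := make(chan int)          // unbuffered, like setModeCh in New below
	fmt.Println(trySubmit(ch, 1)) // false: nobody is receiving yet
}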
@@ -143,6 +218,8 @@ func New(opts ...Option) *StorageEngine {
 		mtx:        new(sync.RWMutex),
 		shards:     make(map[string]shardWrapper),
 		shardPools: make(map[string]util.WorkerPool),
+		closeCh:    make(chan struct{}),
+		setModeCh:  make(chan setModeRequest),
 	}
 }

@@ -200,8 +200,8 @@ func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint
 	sh := e.shards[id.String()]
 	e.mtx.RUnlock()
 
-	require.Equal(t, mode, sh.GetMode())
 	require.Equal(t, errCount, sh.errorCount.Load())
+	require.Equal(t, mode, sh.GetMode())
 }
 
 // corruptSubDir makes random directory except "blobovnicza" in blobstor FSTree unreadable.
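
For completeness, the escalation decision that ties the error count checked by this test to the mode it asserts is the two-part guard in reportShardErrorWithFlags: a zero errorsThreshold disables automatic mode changes, and otherwise the shard degrades once the counter reaches the threshold. A tiny illustration (the shouldDegrade helper is made up):

package main

import "fmt"

// shouldDegrade mirrors the guard in reportShardErrorWithFlags:
// `if e.errorsThreshold == 0 || errCount < e.errorsThreshold { return }`.
func shouldDegrade(errCount, threshold uint32) bool {
	return threshold != 0 && errCount >= threshold
}

func main() {
	fmt.Println(shouldDegrade(2, 3)) // false: still under the threshold
	fmt.Println(shouldDegrade(3, 3)) // true: threshold reached, shard degrades
	fmt.Println(shouldDegrade(9, 0)) // false: zero threshold disables escalation
}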