forked from TrueCloudLab/frostfs-node
[#2035] engine: Allow moving to degraded from background workers
Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
This commit is contained in:
parent
f6f911b50c
commit
2ba3abde5c
5 changed files with 126 additions and 44 deletions
|
@ -5,6 +5,7 @@ Changelog for NeoFS Node
|
|||
|
||||
### Added
|
||||
- `session` flag support to `neofs-cli object hash` (#2029)
|
||||
- Shard can now change mode when encountering background disk errors (#2035)
|
||||
|
||||
### Changed
|
||||
- `object lock` command reads CID and OID the same way other commands do (#1971)
|
||||
|
|
|
@ -92,6 +92,9 @@ func (e *StorageEngine) Init() error {
|
|||
return errors.New("failed initialization on all shards")
|
||||
}
|
||||
|
||||
e.wg.Add(1)
|
||||
go e.setModeLoop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -100,8 +103,10 @@ var errClosed = errors.New("storage engine is closed")
|
|||
// Close releases all StorageEngine's components. Waits for all data-related operations to complete.
|
||||
// After the call, all the next ones will fail.
|
||||
//
|
||||
// The method is supposed to be called when the application exits.
|
||||
// The method MUST only be called when the application exits.
|
||||
func (e *StorageEngine) Close() error {
|
||||
close(e.closeCh)
|
||||
defer e.wg.Wait()
|
||||
return e.setBlockExecErr(errClosed)
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@ import (
|
|||
func TestExecBlocks(t *testing.T) {
|
||||
e := testNewEngineWithShardNum(t, 2) // number doesn't matter in this test, 2 is several but not many
|
||||
t.Cleanup(func() {
|
||||
e.Close()
|
||||
os.RemoveAll(t.Name())
|
||||
})
|
||||
|
||||
|
|
|
@ -23,6 +23,10 @@ type StorageEngine struct {
|
|||
|
||||
shardPools map[string]util.WorkerPool
|
||||
|
||||
closeCh chan struct{}
|
||||
setModeCh chan setModeRequest
|
||||
wg sync.WaitGroup
|
||||
|
||||
blockExec struct {
|
||||
mtx sync.RWMutex
|
||||
|
||||
|
@ -35,52 +39,51 @@ type shardWrapper struct {
|
|||
*shard.Shard
|
||||
}
|
||||
|
||||
// reportShardErrorBackground increases shard error counter and logs an error.
|
||||
// It is intended to be used from background workers and
|
||||
// doesn't change shard mode because of possible deadlocks.
|
||||
func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err error) {
|
||||
e.mtx.RLock()
|
||||
sh, ok := e.shards[id]
|
||||
e.mtx.RUnlock()
|
||||
type setModeRequest struct {
|
||||
sh *shard.Shard
|
||||
errorCount uint32
|
||||
}
|
||||
|
||||
// setModeLoop listens setModeCh to perform degraded mode transition of a single shard.
|
||||
// Instead of creating a worker per single shard we use a single goroutine.
|
||||
func (e *StorageEngine) setModeLoop() {
|
||||
defer e.wg.Done()
|
||||
|
||||
var (
|
||||
mtx sync.RWMutex // protects inProgress map
|
||||
inProgress = make(map[string]struct{})
|
||||
)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-e.closeCh:
|
||||
return
|
||||
case r := <-e.setModeCh:
|
||||
sid := r.sh.ID().String()
|
||||
|
||||
mtx.Lock()
|
||||
_, ok := inProgress[sid]
|
||||
if !ok {
|
||||
return
|
||||
inProgress[sid] = struct{}{}
|
||||
go func() {
|
||||
e.moveToDegraded(r.sh, r.errorCount)
|
||||
|
||||
mtx.Lock()
|
||||
delete(inProgress, sid)
|
||||
mtx.Unlock()
|
||||
}()
|
||||
}
|
||||
mtx.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
errCount := sh.errorCount.Inc()
|
||||
e.log.Warn(msg,
|
||||
zap.String("shard_id", id),
|
||||
zap.Uint32("error count", errCount),
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
// reportShardError checks that the amount of errors doesn't exceed the configured threshold.
|
||||
// If it does, shard is set to read-only mode.
|
||||
func (e *StorageEngine) reportShardError(
|
||||
sh hashedShard,
|
||||
msg string,
|
||||
err error,
|
||||
fields ...zap.Field) {
|
||||
if isLogical(err) {
|
||||
e.log.Warn(msg,
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.String("error", err.Error()))
|
||||
return
|
||||
}
|
||||
func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32) {
|
||||
e.mtx.RLock()
|
||||
defer e.mtx.RUnlock()
|
||||
|
||||
sid := sh.ID()
|
||||
errCount := sh.errorCount.Inc()
|
||||
e.log.Warn(msg, append([]zap.Field{
|
||||
zap.Stringer("shard_id", sid),
|
||||
zap.Uint32("error count", errCount),
|
||||
zap.String("error", err.Error()),
|
||||
}, fields...)...)
|
||||
|
||||
if e.errorsThreshold == 0 || errCount < e.errorsThreshold {
|
||||
return
|
||||
}
|
||||
|
||||
err = sh.SetMode(mode.DegradedReadOnly)
|
||||
err := sh.SetMode(mode.DegradedReadOnly)
|
||||
if err != nil {
|
||||
e.log.Error("failed to move shard in degraded-read-only mode, moving to read-only",
|
||||
zap.Stringer("shard_id", sid),
|
||||
|
@ -105,6 +108,78 @@ func (e *StorageEngine) reportShardError(
|
|||
}
|
||||
}
|
||||
|
||||
// reportShardErrorBackground increases shard error counter and logs an error.
|
||||
// It is intended to be used from background workers and
|
||||
// doesn't change shard mode because of possible deadlocks.
|
||||
func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err error) {
|
||||
e.mtx.RLock()
|
||||
sh, ok := e.shards[id]
|
||||
e.mtx.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
errCount := sh.errorCount.Inc()
|
||||
e.reportShardErrorWithFlags(sh.Shard, errCount, false, msg, err)
|
||||
}
|
||||
|
||||
// reportShardError checks that the amount of errors doesn't exceed the configured threshold.
|
||||
// If it does, shard is set to read-only mode.
|
||||
func (e *StorageEngine) reportShardError(
|
||||
sh hashedShard,
|
||||
msg string,
|
||||
err error,
|
||||
fields ...zap.Field) {
|
||||
if isLogical(err) {
|
||||
e.log.Warn(msg,
|
||||
zap.Stringer("shard_id", sh.ID()),
|
||||
zap.String("error", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
errCount := sh.errorCount.Inc()
|
||||
e.reportShardErrorWithFlags(sh.Shard, errCount, true, msg, err, fields...)
|
||||
}
|
||||
|
||||
func (e *StorageEngine) reportShardErrorWithFlags(
|
||||
sh *shard.Shard,
|
||||
errCount uint32,
|
||||
block bool,
|
||||
msg string,
|
||||
err error,
|
||||
fields ...zap.Field) {
|
||||
sid := sh.ID()
|
||||
e.log.Warn(msg, append([]zap.Field{
|
||||
zap.Stringer("shard_id", sid),
|
||||
zap.Uint32("error count", errCount),
|
||||
zap.String("error", err.Error()),
|
||||
}, fields...)...)
|
||||
|
||||
if e.errorsThreshold == 0 || errCount < e.errorsThreshold {
|
||||
return
|
||||
}
|
||||
|
||||
if block {
|
||||
e.moveToDegraded(sh, errCount)
|
||||
} else {
|
||||
req := setModeRequest{
|
||||
errorCount: errCount,
|
||||
sh: sh,
|
||||
}
|
||||
|
||||
select {
|
||||
case e.setModeCh <- req:
|
||||
default:
|
||||
// For background workers we can have a lot of such errors,
|
||||
// thus logging is done with DEBUG level.
|
||||
e.log.Debug("mode change is in progress, ignoring set-mode request",
|
||||
zap.Stringer("shard_id", sid),
|
||||
zap.Uint32("error_count", errCount))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isLogical(err error) bool {
|
||||
return errors.As(err, &logicerr.Logical{})
|
||||
}
|
||||
|
@ -143,6 +218,8 @@ func New(opts ...Option) *StorageEngine {
|
|||
mtx: new(sync.RWMutex),
|
||||
shards: make(map[string]shardWrapper),
|
||||
shardPools: make(map[string]util.WorkerPool),
|
||||
closeCh: make(chan struct{}),
|
||||
setModeCh: make(chan setModeRequest),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -200,8 +200,8 @@ func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint
|
|||
sh := e.shards[id.String()]
|
||||
e.mtx.RUnlock()
|
||||
|
||||
require.Equal(t, mode, sh.GetMode())
|
||||
require.Equal(t, errCount, sh.errorCount.Load())
|
||||
require.Equal(t, mode, sh.GetMode())
|
||||
}
|
||||
|
||||
// corruptSubDir makes random directory except "blobovnicza" in blobstor FSTree unreadable.
|
||||
|
|
Loading…
Reference in a new issue