package engine

import (
	"context"
	"errors"
	"sync"
	"sync/atomic"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"go.uber.org/zap"
)

// StorageEngine represents FrostFS local storage engine.
//
// It embeds *cfg, so configuration fields (logger, error threshold, ...)
// are accessed directly on the engine.
type StorageEngine struct {
	*cfg

	// NOTE(review): presumably guards against concurrent duplicate-removal
	// runs; it is not used in this part of the file — confirm at its use site.
	removeDuplicatesInProgress atomic.Bool

	// mtx is read-locked around lookups in shards and for the whole duration
	// of a shard mode change performed by moveToDegraded.
	mtx sync.RWMutex

	// shards is looked up by the string id passed to reportShardErrorBackground
	// (presumably the string form of the shard ID — verify against the code
	// that fills the map).
	shards map[string]hashedShard

	// shardPools holds worker pools; per the WithShardPoolSize documentation
	// there is one pool per shard (presumably keyed like shards — TODO confirm).
	shardPools map[string]util.WorkerPool

	// closeCh makes setModeLoop return once a receive from it succeeds.
	closeCh chan struct{}

	// setModeCh carries mode change requests to setModeLoop. New creates it
	// unbuffered and background reporters send to it without blocking.
	setModeCh chan setModeRequest

	// wg is the wait group on which setModeLoop signals its completion.
	wg sync.WaitGroup

	// NOTE(review): blockExec is not used in this part of the file; presumably
	// err, guarded by mtx, is the reason execution is blocked — confirm.
	blockExec struct {
		mtx sync.RWMutex

		err error
	}

	evacuateLimiter *evacuationLimiter

	// rebuildLimiter is created in New from cfg.rebuildWorkersCount.
	rebuildLimiter *rebuildLimiter
}

// shardWrapper couples a shard with its engine-side error counter.
type shardWrapper struct {
	// errorCount is a pointer to an atomic counter.
	// NOTE(review): reportShardError and reportShardErrorBackground increment
	// an errorCount field on hashedShard values, presumably this one via
	// embedding — confirm against the hashedShard declaration.
	errorCount *atomic.Uint32

	*shard.Shard
}

// setModeRequest is a message sent over StorageEngine.setModeCh asking
// setModeLoop to move a shard to a degraded mode.
type setModeRequest struct {
	// sh is the shard whose mode must be changed.
	sh *shard.Shard

	// errorCount is the error counter value that triggered the request;
	// it is only used for logging.
	errorCount uint32
}

// setModeLoop listens on setModeCh and performs the degraded mode transition
// of the requested shard. A single dispatcher goroutine serves all shards
// instead of one worker per shard; each transition runs in its own goroutine,
// with at most one transition in progress per shard.
func (e *StorageEngine) setModeLoop() { defer e.wg.Done() var ( mtx sync.RWMutex // protects inProgress map inProgress = make(map[string]struct{}) ) for { select { case <-e.closeCh: return case r := <-e.setModeCh: sid := r.sh.ID().String() mtx.Lock() _, ok := inProgress[sid] if !ok { inProgress[sid] = struct{}{} go func() { e.moveToDegraded(r.sh, r.errorCount) mtx.Lock() delete(inProgress, sid) mtx.Unlock() }() } mtx.Unlock() } } } func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32) { sid := sh.ID() log := e.log.With( zap.Stringer("shard_id", sid), zap.Uint32("error count", errCount)) e.mtx.RLock() defer e.mtx.RUnlock() err := sh.SetMode(mode.DegradedReadOnly) if err != nil { log.Error(logs.EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly, zap.Error(err)) err = sh.SetMode(mode.ReadOnly) if err != nil { log.Error(logs.EngineFailedToMoveShardInReadonlyMode, zap.Error(err)) } else { log.Info(logs.EngineShardIsMovedInReadonlyModeDueToErrorThreshold) } } else { log.Info(logs.EngineShardIsMovedInDegradedModeDueToErrorThreshold) } } // reportShardErrorBackground increases shard error counter and logs an error. // It is intended to be used from background workers and // doesn't change shard mode because of possible deadlocks. func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err error) { e.mtx.RLock() sh, ok := e.shards[id] e.mtx.RUnlock() if !ok { return } if isLogical(err) { e.log.Warn(msg, zap.Stringer("shard_id", sh.ID()), zap.String("error", err.Error())) return } errCount := sh.errorCount.Add(1) sh.Shard.IncErrorCounter() e.reportShardErrorWithFlags(sh.Shard, errCount, false, msg, err) } // reportShardError checks that the amount of errors doesn't exceed the configured threshold. // If it does, shard is set to read-only mode. 
func (e *StorageEngine) reportShardError( sh hashedShard, msg string, err error, fields ...zap.Field, ) { if isLogical(err) { e.log.Warn(msg, zap.Stringer("shard_id", sh.ID()), zap.String("error", err.Error())) return } errCount := sh.errorCount.Add(1) sh.Shard.IncErrorCounter() e.reportShardErrorWithFlags(sh.Shard, errCount, true, msg, err, fields...) } func (e *StorageEngine) reportShardErrorWithFlags( sh *shard.Shard, errCount uint32, block bool, msg string, err error, fields ...zap.Field, ) { sid := sh.ID() e.log.Warn(msg, append([]zap.Field{ zap.Stringer("shard_id", sid), zap.Uint32("error count", errCount), zap.String("error", err.Error()), }, fields...)...) if e.errorsThreshold == 0 || errCount < e.errorsThreshold { return } if block { e.moveToDegraded(sh, errCount) } else { req := setModeRequest{ errorCount: errCount, sh: sh, } select { case e.setModeCh <- req: default: // For background workers we can have a lot of such errors, // thus logging is done with DEBUG level. e.log.Debug(logs.EngineModeChangeIsInProgressIgnoringSetmodeRequest, zap.Stringer("shard_id", sid), zap.Uint32("error_count", errCount)) } } } func isLogical(err error) bool { return errors.As(err, &logicerr.Logical{}) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) } // Option represents StorageEngine's constructor option. type Option func(*cfg) type cfg struct { log *logger.Logger errorsThreshold uint32 metrics MetricRegister shardPoolSize uint32 lowMem bool rebuildWorkersCount uint32 containerSource atomic.Pointer[containerSource] } func defaultCfg() *cfg { res := &cfg{ log: &logger.Logger{Logger: zap.L()}, shardPoolSize: 20, rebuildWorkersCount: 100, } res.containerSource.Store(&containerSource{}) return res } // New creates, initializes and returns new StorageEngine instance. 
func New(opts ...Option) *StorageEngine { c := defaultCfg() for i := range opts { opts[i](c) } return &StorageEngine{ cfg: c, shards: make(map[string]hashedShard), shardPools: make(map[string]util.WorkerPool), closeCh: make(chan struct{}), setModeCh: make(chan setModeRequest), evacuateLimiter: &evacuationLimiter{}, rebuildLimiter: newRebuildLimiter(c.rebuildWorkersCount), } } // WithLogger returns option to set StorageEngine's logger. func WithLogger(l *logger.Logger) Option { return func(c *cfg) { c.log = l } } func WithMetrics(v MetricRegister) Option { return func(c *cfg) { c.metrics = v } } // WithShardPoolSize returns option to specify size of worker pool for each shard. func WithShardPoolSize(sz uint32) Option { return func(c *cfg) { c.shardPoolSize = sz } } // WithErrorThreshold returns an option to specify size amount of errors after which // shard is moved to read-only mode. func WithErrorThreshold(sz uint32) Option { return func(c *cfg) { c.errorsThreshold = sz } } // WithLowMemoryConsumption returns an option to set the flag to reduce memory consumption by reducing performance. func WithLowMemoryConsumption(lowMemCons bool) Option { return func(c *cfg) { c.lowMem = lowMemCons } } // WithRebuildWorkersCount returns an option to set the count of concurrent rebuild workers. func WithRebuildWorkersCount(count uint32) Option { return func(c *cfg) { c.rebuildWorkersCount = count } } // SetContainerSource sets container source. func (e *StorageEngine) SetContainerSource(cs container.Source) { e.containerSource.Store(&containerSource{cs: cs}) } type containerSource struct { cs container.Source } func (s *containerSource) IsContainerAvailable(ctx context.Context, id cid.ID) (bool, error) { select { case <-ctx.Done(): return false, ctx.Err() default: } if s == nil || s.cs == nil { return true, nil } wasRemoved, err := container.WasRemoved(s.cs, id) if err != nil { return false, err } return !wasRemoved, nil }