frostfs-node/pkg/local_object_storage/engine/engine.go
Dmitrii Stepanov 7429553266
[] node: Fix contextcheck linter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-13 10:36:10 +03:00

287 lines
6.6 KiB
Go

package engine
import (
"context"
"errors"
"sync"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"go.uber.org/zap"
)
// StorageEngine represents FrostFS local storage engine.
type StorageEngine struct {
// Embedded engine configuration; it is filled by defaultCfg and the Option
// values passed to New.
*cfg
// NOTE(review): presumably set while a duplicate-removal pass is running;
// it is not used in this part of the file — confirm against its users.
removeDuplicatesInProgress atomic.Bool
// mtx is read-locked around lookups in shards and for the whole duration of
// a shard mode change performed by moveToDegraded.
mtx sync.RWMutex
// shards maps a string ID to the shard; reportShardErrorByID looks shards up by this key.
shards map[string]hashedShard
// NOTE(review): presumably one worker pool per shard, keyed like shards — confirm.
shardPools map[string]util.WorkerPool
// closeCh stops setModeLoop once a receive from it succeeds.
closeCh chan struct{}
// setModeCh delivers mode change requests from reportShardError to setModeLoop.
// New creates it unbuffered.
setModeCh chan setModeRequest
// wg tracks background goroutines; setModeLoop calls Done on it when it exits.
wg sync.WaitGroup
// NOTE(review): blockExec is not used in this part of the file; it looks like
// an error guarded by its own mutex — confirm semantics against its users.
blockExec struct {
mtx sync.RWMutex
err error
}
// NOTE(review): not used in this part of the file; see evacuationLimiter.
evacuateLimiter *evacuationLimiter
}
// shardWrapper bundles a shard with an atomic error counter.
type shardWrapper struct {
// errorCount is shared through a pointer, so copies of the wrapper update
// the same counter.
errorCount *atomic.Uint32
*shard.Shard
}
// setModeRequest is a message sent over setModeCh asking setModeLoop to move
// a shard to a degraded mode.
type setModeRequest struct {
// sh is the shard whose mode has to be changed.
sh *shard.Shard
// isMeta is set by reportShardError when the triggering error is a metaerr.Error;
// moveToDegraded then tries degraded-read-only mode before read-only mode.
isMeta bool
// errorCount is the value of the shard error counter at the moment of the request;
// it is used for logging only.
errorCount uint32
}
// setModeLoop serves requests arriving on setModeCh and moves the requested
// shard to a degraded mode. A single goroutine dispatches all requests instead
// of keeping one worker per shard.
//
// Every transition runs in its own goroutine; while a transition for a shard is
// running, further requests for the same shard are dropped. The loop exits (and
// releases e.wg) as soon as a receive from closeCh succeeds. Transitions that
// are already running at that moment are not waited for.
func (e *StorageEngine) setModeLoop(ctx context.Context) {
	defer e.wg.Done()

	var (
		pendingMtx sync.Mutex // guards pending
		pending    = make(map[string]struct{})
	)

	// release forgets the shard once its transition has finished.
	release := func(shardID string) {
		pendingMtx.Lock()
		delete(pending, shardID)
		pendingMtx.Unlock()
	}

	for {
		select {
		case <-e.closeCh:
			return
		case req := <-e.setModeCh:
			shardID := req.sh.ID().String()

			// Check and mark the shard atomically so that at most one
			// transition per shard is in flight.
			pendingMtx.Lock()
			_, busy := pending[shardID]
			if !busy {
				pending[shardID] = struct{}{}
			}
			pendingMtx.Unlock()

			if busy {
				continue
			}

			go func() {
				e.moveToDegraded(ctx, req.sh, req.errorCount, req.isMeta)
				release(shardID)
			}()
		}
	}
}
// moveToDegraded switches sh to a restricted mode after its error threshold
// has been reached.
//
// When isMeta is true, degraded-read-only mode is tried first; if that fails
// (or isMeta is false) the shard is switched to read-only mode. Outcomes are
// only logged, nothing is returned to the caller. errCount is used for logging
// only. The engine mutex is read-locked for the whole operation.
func (e *StorageEngine) moveToDegraded(ctx context.Context, sh *shard.Shard, errCount uint32, isMeta bool) {
	shardLog := e.log.With(
		zap.Stringer("shard_id", sh.ID()),
		zap.Uint32("error count", errCount),
	)

	e.mtx.RLock()
	defer e.mtx.RUnlock()

	if isMeta {
		if degradeErr := sh.SetMode(ctx, mode.DegradedReadOnly); degradeErr != nil {
			// Fall through to the read-only attempt below.
			shardLog.Error(ctx, logs.EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly,
				zap.Error(degradeErr))
		} else {
			shardLog.Info(ctx, logs.EngineShardIsMovedInDegradedModeDueToErrorThreshold)
			return
		}
	}

	if roErr := sh.SetMode(ctx, mode.ReadOnly); roErr != nil {
		shardLog.Error(ctx, logs.EngineFailedToMoveShardInReadonlyMode, zap.Error(roErr))
		return
	}

	shardLog.Info(ctx, logs.EngineShardIsMovedInReadonlyModeDueToErrorThreshold)
}
// reportShardErrorByID looks the shard up by its string ID and forwards the
// error to reportShardError. Unknown IDs are silently ignored.
func (e *StorageEngine) reportShardErrorByID(ctx context.Context, id string, msg string, err error) {
	// Hold the lock only for the map lookup, not for the reporting itself.
	e.mtx.RLock()
	sh, found := e.shards[id]
	e.mtx.RUnlock()

	if found {
		e.reportShardError(ctx, sh, msg, err)
	}
}
// reportShardError logs a shard error and checks that the amount of errors
// doesn't exceed the configured threshold.
//
// Errors for which isLogical returns true are logged and otherwise ignored.
// Any other error increments the shard error counter and the error metric.
// Once the counter reaches a non-zero threshold, a request to change the shard
// mode is sent to setModeLoop. The send never blocks: if the loop is not ready
// to receive, the request is dropped and a debug message is logged.
//
// fields are appended to the warning log record.
func (e *StorageEngine) reportShardError(
	ctx context.Context,
	sh hashedShard,
	msg string,
	err error,
	fields ...zap.Field,
) {
	if isLogical(err) {
		e.log.Warn(ctx, msg,
			zap.Stringer("shard_id", sh.ID()),
			zap.String("error", err.Error()))
		return
	}

	errCount := sh.errorCount.Add(1)
	sid := sh.ID()
	e.metrics.IncErrorCounter(sid.String())

	logFields := make([]zap.Field, 0, 3+len(fields))
	logFields = append(logFields,
		zap.Stringer("shard_id", sid),
		zap.Uint32("error count", errCount),
		zap.String("error", err.Error()),
	)
	logFields = append(logFields, fields...)
	e.log.Warn(ctx, msg, logFields...)

	// A zero threshold disables automatic mode changes.
	if limit := e.errorsThreshold; limit == 0 || errCount < limit {
		return
	}

	var req setModeRequest
	req.sh = sh.Shard
	req.errorCount = errCount
	req.isMeta = errors.As(err, new(metaerr.Error))

	select {
	case e.setModeCh <- req:
	default:
		// For background workers we can have a lot of such errors,
		// thus logging is done with DEBUG level.
		// NOTE(review): the key here is "error_count" while the warning above
		// uses "error count" — confirm which spelling log consumers expect.
		e.log.Debug(ctx, logs.EngineModeChangeIsInProgressIgnoringSetmodeRequest,
			zap.Stringer("shard_id", sid),
			zap.Uint32("error_count", errCount))
	}
}
// isLogical reports whether err is (or wraps) a logicerr.Logical error,
// context.Canceled or context.DeadlineExceeded. reportShardError does not
// count such errors against the shard.
func isLogical(err error) bool {
	if errors.As(err, &logicerr.Logical{}) {
		return true
	}
	if errors.Is(err, context.Canceled) {
		return true
	}
	return errors.Is(err, context.DeadlineExceeded)
}
// Option represents StorageEngine's constructor option.
// New applies every option to the engine configuration before the engine is built.
type Option func(*cfg)
// cfg holds StorageEngine settings; defaults come from defaultCfg and are
// overridden through Option values.
type cfg struct {
// log is the logger used by the engine.
log *logger.Logger
// errorsThreshold is the shard error count at which reportShardError requests
// a shard mode change; zero disables the check.
errorsThreshold uint32
// metrics receives engine metrics such as the per-shard error counter.
metrics MetricRegister
// shardPoolSize is the size of the worker pool for each shard.
shardPoolSize uint32
// lowMem asks the engine to reduce memory consumption at the cost of performance.
lowMem bool
// containerSource is replaced atomically by SetContainerSource.
containerSource atomic.Pointer[containerSource]
}
// defaultCfg returns the configuration used when no options are supplied:
// a wrapper around the global zap logger, a shard worker pool size of 20,
// no-op metrics and an empty container source.
func defaultCfg() *cfg {
	c := new(cfg)
	c.log = logger.NewLoggerWrapper(zap.L())
	c.shardPoolSize = 20
	c.metrics = noopMetrics{}
	c.containerSource.Store(&containerSource{})

	return c
}
// New creates, initializes and returns new StorageEngine instance.
//
// Options are applied in the given order on top of the default configuration.
// The returned engine has empty shard and worker pool maps and freshly created
// unbuffered closeCh and setModeCh channels.
func New(opts ...Option) *StorageEngine {
	c := defaultCfg()
	for _, apply := range opts {
		apply(c)
	}

	e := new(StorageEngine)
	e.cfg = c
	e.shards = make(map[string]hashedShard)
	e.shardPools = make(map[string]util.WorkerPool)
	e.closeCh = make(chan struct{})
	e.setModeCh = make(chan setModeRequest)
	e.evacuateLimiter = &evacuationLimiter{}

	return e
}
// WithLogger returns option to set StorageEngine's logger.
// It replaces the default logger installed by defaultCfg.
func WithLogger(l *logger.Logger) Option {
	return func(conf *cfg) {
		conf.log = l
	}
}
// WithMetrics returns option to set StorageEngine's metrics register.
// It replaces the no-op register installed by defaultCfg.
func WithMetrics(v MetricRegister) Option {
	return func(conf *cfg) {
		conf.metrics = v
	}
}
// WithShardPoolSize returns option to specify size of worker pool for each shard.
// The default size is 20.
func WithShardPoolSize(sz uint32) Option {
	return func(conf *cfg) {
		conf.shardPoolSize = sz
	}
}
// WithErrorThreshold returns an option to specify the number of errors after
// which a shard is moved to read-only mode.
// A zero value (the default) disables the threshold check.
func WithErrorThreshold(sz uint32) Option {
	return func(conf *cfg) {
		conf.errorsThreshold = sz
	}
}
// WithLowMemoryConsumption returns an option to set the flag to reduce memory
// consumption by reducing performance.
func WithLowMemoryConsumption(lowMemCons bool) Option {
	return func(conf *cfg) {
		conf.lowMem = lowMemCons
	}
}
// SetContainerSource sets container source.
// The new source is stored atomically, replacing the previous one.
func (e *StorageEngine) SetContainerSource(cs container.Source) {
	src := &containerSource{cs: cs}
	e.containerSource.Store(src)
}
// containerSource wraps a container.Source so that it can be kept in an
// atomic pointer and queried for container availability.
type containerSource struct {
// cs may be nil; IsContainerAvailable then treats every container as available.
cs container.Source
}
// IsContainerAvailable reports whether the container with the given ID has not
// been removed.
//
// If ctx is already done, it returns false together with the context error.
// A nil receiver or a missing underlying source makes every container
// available. Otherwise the answer is the negation of container.WasRemoved,
// and any error from that call is returned as is.
func (s *containerSource) IsContainerAvailable(ctx context.Context, id cid.ID) (bool, error) {
	// Non-blocking cancellation check.
	if ctxErr := ctx.Err(); ctxErr != nil {
		return false, ctxErr
	}

	if s == nil || s.cs == nil {
		return true, nil
	}

	removed, err := container.WasRemoved(s.cs, id)
	if err != nil {
		return false, err
	}

	return !removed, nil
}