frostfs-node/pkg/local_object_storage/engine/engine.go
Evgenii Stratonikov 4dc9a1b300
[#1413] engine: Remove error counting methods from Shard
All error counting and handling logic is present at the engine level.
Currently, we pass engine metrics together with the shard ID down to the
shard and then export three methods to manipulate these metrics.
In this commit all these methods are removed and the error counter is
tracked exclusively at the engine level.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-04 15:10:17 +03:00
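
The commit boils down to the pattern visible in the file below: the engine owns a per-shard error counter and decides when to change a shard's mode. A minimal, self-contained sketch of this pattern (the simplified engine type and names are illustrative, not the actual frostfs-node API):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// engine owns all per-shard error counters; shards know nothing about them.
type engine struct {
	mtx             sync.RWMutex
	errorCounters   map[string]*atomic.Uint32
	errorsThreshold uint32
}

// reportError increments the counter for the given shard and reports
// whether the configured threshold has been reached (0 disables the check).
func (e *engine) reportError(shardID string) bool {
	e.mtx.RLock()
	c, ok := e.errorCounters[shardID]
	e.mtx.RUnlock()
	if !ok {
		return false
	}
	n := c.Add(1)
	return e.errorsThreshold != 0 && n >= e.errorsThreshold
}

func main() {
	e := &engine{
		errorCounters:   map[string]*atomic.Uint32{"shard-1": new(atomic.Uint32)},
		errorsThreshold: 3,
	}
	for i := 0; i < 3; i++ {
		if e.reportError("shard-1") {
			fmt.Println("threshold reached: moving shard-1 to read-only")
		}
	}
}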

package engine

import (
	"context"
	"errors"
	"sync"
	"sync/atomic"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"go.uber.org/zap"
)

// StorageEngine represents FrostFS local storage engine.
type StorageEngine struct {
	*cfg

	removeDuplicatesInProgress atomic.Bool

	mtx sync.RWMutex

	shards map[string]hashedShard

	shardPools map[string]util.WorkerPool

	closeCh   chan struct{}
	setModeCh chan setModeRequest
	wg        sync.WaitGroup

	blockExec struct {
		mtx sync.RWMutex
		err error
	}
	evacuateLimiter *evacuationLimiter
}
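
// shardWrapper couples a shard with its engine-level error counter.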
type shardWrapper struct {
	errorCount *atomic.Uint32
	*shard.Shard
}
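
// setModeRequest is a request to change the mode of a single shard
// after its error counter has exceeded the threshold.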
type setModeRequest struct {
	sh         *shard.Shard
	isMeta     bool
	errorCount uint32
}

// setModeLoop listens to setModeCh to perform a degraded mode transition of a single shard.
// Instead of creating a worker per shard, we use a single goroutine.
func (e *StorageEngine) setModeLoop() {
	defer e.wg.Done()

	var (
		mtx        sync.RWMutex // protects inProgress map
		inProgress = make(map[string]struct{})
	)

	for {
		select {
		case <-e.closeCh:
			return
		case r := <-e.setModeCh:
			sid := r.sh.ID().String()

			mtx.Lock()
			_, ok := inProgress[sid]
			if !ok {
				inProgress[sid] = struct{}{}
				go func() {
					e.moveToDegraded(r.sh, r.errorCount, r.isMeta)

					mtx.Lock()
					delete(inProgress, sid)
					mtx.Unlock()
				}()
			}
			mtx.Unlock()
		}
	}
}
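
// moveToDegraded moves the shard to degraded-read-only mode if the errors
// originate from the metabase; otherwise, or if that transition fails,
// the shard is moved to read-only mode.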
func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta bool) {
	sid := sh.ID()
	log := e.log.With(
		zap.Stringer("shard_id", sid),
		zap.Uint32("error count", errCount))

	e.mtx.RLock()
	defer e.mtx.RUnlock()

	if isMeta {
		err := sh.SetMode(mode.DegradedReadOnly)
		if err == nil {
			log.Info(logs.EngineShardIsMovedInDegradedModeDueToErrorThreshold)
			return
		}
		log.Error(logs.EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly,
			zap.Error(err))
	}

	err := sh.SetMode(mode.ReadOnly)
	if err != nil {
		log.Error(logs.EngineFailedToMoveShardInReadonlyMode, zap.Error(err))
		return
	}

	log.Info(logs.EngineShardIsMovedInReadonlyModeDueToErrorThreshold)
}

// reportShardErrorByID increases shard error counter and logs an error.
func (e *StorageEngine) reportShardErrorByID(id string, msg string, err error) {
	e.mtx.RLock()
	sh, ok := e.shards[id]
	e.mtx.RUnlock()

	if !ok {
		return
	}

	e.reportShardError(sh, msg, err)
}

// reportShardError increases the shard error counter and checks whether the
// number of errors exceeds the configured threshold.
// If it does, the shard is moved to read-only (or degraded-read-only) mode.
func (e *StorageEngine) reportShardError(
	sh hashedShard,
	msg string,
	err error,
	fields ...zap.Field,
) {
	if isLogical(err) {
		e.log.Warn(msg,
			zap.Stringer("shard_id", sh.ID()),
			zap.String("error", err.Error()))
		return
	}

	errCount := sh.errorCount.Add(1)
	e.metrics.IncErrorCounter(sh.ID().String())

	sid := sh.ID()
	e.log.Warn(msg, append([]zap.Field{
		zap.Stringer("shard_id", sid),
		zap.Uint32("error count", errCount),
		zap.String("error", err.Error()),
	}, fields...)...)

	if e.errorsThreshold == 0 || errCount < e.errorsThreshold {
		return
	}

	req := setModeRequest{
		errorCount: errCount,
		sh:         sh.Shard,
		isMeta:     errors.As(err, new(metaerr.Error)),
	}

	select {
	case e.setModeCh <- req:
	default:
		// For background workers we can have a lot of such errors,
		// thus logging is done with DEBUG level.
		e.log.Debug(logs.EngineModeChangeIsInProgressIgnoringSetmodeRequest,
			zap.Stringer("shard_id", sid),
			zap.Uint32("error_count", errCount))
	}
}
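
// isLogical reports whether err is a logical error or a context cancellation;
// such errors are logged but do not increase the shard error counter.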
func isLogical(err error) bool {
	return errors.As(err, &logicerr.Logical{}) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}

// Option represents StorageEngine's constructor option.
type Option func(*cfg)

type cfg struct {
	log             *logger.Logger
	errorsThreshold uint32
	metrics         MetricRegister
	shardPoolSize   uint32
	lowMem          bool
	containerSource atomic.Pointer[containerSource]
}
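
// defaultCfg creates a config with default values: a global zap logger,
// no-op metrics and a shard pool size of 20.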
func defaultCfg() *cfg {
	res := &cfg{
		log:           &logger.Logger{Logger: zap.L()},
		shardPoolSize: 20,
		metrics:       noopMetrics{},
	}
	res.containerSource.Store(&containerSource{})
	return res
}

// New creates, initializes and returns new StorageEngine instance.
func New(opts ...Option) *StorageEngine {
	c := defaultCfg()

	for i := range opts {
		opts[i](c)
	}

	return &StorageEngine{
		cfg:             c,
		shards:          make(map[string]hashedShard),
		shardPools:      make(map[string]util.WorkerPool),
		closeCh:         make(chan struct{}),
		setModeCh:       make(chan setModeRequest),
		evacuateLimiter: &evacuationLimiter{},
	}
}

// WithLogger returns option to set StorageEngine's logger.
func WithLogger(l *logger.Logger) Option {
	return func(c *cfg) {
		c.log = l
	}
}
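
// WithMetrics returns option to set the metrics register.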
func WithMetrics(v MetricRegister) Option {
	return func(c *cfg) {
		c.metrics = v
	}
}

// WithShardPoolSize returns option to specify size of worker pool for each shard.
func WithShardPoolSize(sz uint32) Option {
	return func(c *cfg) {
		c.shardPoolSize = sz
	}
}

// WithErrorThreshold returns an option to specify the number of errors after which
// the shard is moved to read-only mode.
func WithErrorThreshold(sz uint32) Option {
	return func(c *cfg) {
		c.errorsThreshold = sz
	}
}

// WithLowMemoryConsumption returns an option to set the flag to reduce memory consumption by reducing performance.
func WithLowMemoryConsumption(lowMemCons bool) Option {
	return func(c *cfg) {
		c.lowMem = lowMemCons
	}
}

// SetContainerSource sets container source.
func (e *StorageEngine) SetContainerSource(cs container.Source) {
	e.containerSource.Store(&containerSource{cs: cs})
}
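
// containerSource wraps container.Source so that it can be stored in an
// atomic.Pointer and replaced at runtime.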
type containerSource struct {
	cs container.Source
}
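
// IsContainerAvailable reports whether the container has not been removed.
// If no container source is configured, every container is considered available.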
func (s *containerSource) IsContainerAvailable(ctx context.Context, id cid.ID) (bool, error) {
	select {
	case <-ctx.Done():
		return false, ctx.Err()
	default:
	}

	if s == nil || s.cs == nil {
		return true, nil
	}

	wasRemoved, err := container.WasRemoved(s.cs, id)
	if err != nil {
		return false, err
	}
	return !wasRemoved, nil
}
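
For reference, a minimal sketch of wiring the engine up with the options defined above; the option values and the log variable are illustrative assumptions, not recommendations:

// Hypothetical construction of the engine; values are for illustration only.
eng := engine.New(
	engine.WithLogger(log),                 // *logger.Logger, assumed to exist
	engine.WithShardPoolSize(32),           // worker pool size per shard
	engine.WithErrorThreshold(10),          // move shard to read-only after 10 errors
	engine.WithLowMemoryConsumption(false), // prefer performance over lower memory use
)
_ = eng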