2020-11-17 12:26:03 +00:00
|
|
|
package engine
|
|
|
|
|
|
|
|
import (
|
2023-08-16 07:39:41 +00:00
|
|
|
"context"
|
2022-10-26 12:23:12 +00:00
|
|
|
"errors"
|
2020-11-17 12:26:03 +00:00
|
|
|
"sync"
|
2023-05-19 15:06:20 +00:00
|
|
|
"sync/atomic"
|
2020-11-17 12:26:03 +00:00
|
|
|
|
2023-04-12 14:35:10 +00:00
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
2023-12-27 15:58:36 +00:00
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
2024-06-11 14:05:21 +00:00
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
2023-03-07 13:38:26 +00:00
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
2023-12-27 15:58:36 +00:00
|
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
2020-11-17 12:26:03 +00:00
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
2023-02-05 15:59:38 +00:00
|
|
|
// StorageEngine represents FrostFS local storage engine.
|
2020-11-17 12:26:03 +00:00
|
|
|
type StorageEngine struct {
|
|
|
|
*cfg
|
|
|
|
|
2023-03-30 11:49:15 +00:00
|
|
|
removeDuplicatesInProgress atomic.Bool
|
|
|
|
|
2023-05-30 07:14:37 +00:00
|
|
|
mtx sync.RWMutex
|
2020-11-17 12:26:03 +00:00
|
|
|
|
2023-02-27 13:16:37 +00:00
|
|
|
shards map[string]hashedShard
|
2021-10-08 12:25:45 +00:00
|
|
|
|
|
|
|
shardPools map[string]util.WorkerPool
|
2021-11-09 15:46:12 +00:00
|
|
|
|
2022-11-10 10:58:46 +00:00
|
|
|
closeCh chan struct{}
|
|
|
|
setModeCh chan setModeRequest
|
|
|
|
wg sync.WaitGroup
|
|
|
|
|
2021-11-09 15:46:12 +00:00
|
|
|
blockExec struct {
|
|
|
|
mtx sync.RWMutex
|
|
|
|
|
|
|
|
err error
|
|
|
|
}
|
2023-05-04 10:58:26 +00:00
|
|
|
evacuateLimiter *evacuationLimiter
|
2020-11-17 12:26:03 +00:00
|
|
|
}
|
|
|
|
|
2022-01-31 14:58:32 +00:00
|
|
|
type shardWrapper struct {
|
|
|
|
errorCount *atomic.Uint32
|
|
|
|
*shard.Shard
|
|
|
|
}
|
|
|
|
|
2022-11-10 10:58:46 +00:00
|
|
|
type setModeRequest struct {
|
|
|
|
sh *shard.Shard
|
2024-06-11 14:05:21 +00:00
|
|
|
isMeta bool
|
2022-11-10 10:58:46 +00:00
|
|
|
errorCount uint32
|
|
|
|
}
|
|
|
|
|
|
|
|
// setModeLoop listens setModeCh to perform degraded mode transition of a single shard.
|
|
|
|
// Instead of creating a worker per single shard we use a single goroutine.
|
|
|
|
func (e *StorageEngine) setModeLoop() {
|
|
|
|
defer e.wg.Done()
|
|
|
|
|
|
|
|
var (
|
|
|
|
mtx sync.RWMutex // protects inProgress map
|
|
|
|
inProgress = make(map[string]struct{})
|
|
|
|
)
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-e.closeCh:
|
|
|
|
return
|
|
|
|
case r := <-e.setModeCh:
|
|
|
|
sid := r.sh.ID().String()
|
|
|
|
|
|
|
|
mtx.Lock()
|
|
|
|
_, ok := inProgress[sid]
|
|
|
|
if !ok {
|
|
|
|
inProgress[sid] = struct{}{}
|
|
|
|
go func() {
|
2024-06-11 14:05:21 +00:00
|
|
|
e.moveToDegraded(r.sh, r.errorCount, r.isMeta)
|
2022-11-10 10:58:46 +00:00
|
|
|
|
|
|
|
mtx.Lock()
|
|
|
|
delete(inProgress, sid)
|
|
|
|
mtx.Unlock()
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
mtx.Unlock()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-06-11 14:05:21 +00:00
|
|
|
func (e *StorageEngine) moveToDegraded(sh *shard.Shard, errCount uint32, isMeta bool) {
|
2023-06-15 08:42:54 +00:00
|
|
|
sid := sh.ID()
|
|
|
|
log := e.log.With(
|
|
|
|
zap.Stringer("shard_id", sid),
|
|
|
|
zap.Uint32("error count", errCount))
|
|
|
|
|
2022-11-10 10:58:46 +00:00
|
|
|
e.mtx.RLock()
|
|
|
|
defer e.mtx.RUnlock()
|
|
|
|
|
2024-06-11 14:05:21 +00:00
|
|
|
if isMeta {
|
|
|
|
err := sh.SetMode(mode.DegradedReadOnly)
|
|
|
|
if err == nil {
|
|
|
|
log.Info(logs.EngineShardIsMovedInDegradedModeDueToErrorThreshold)
|
|
|
|
return
|
|
|
|
}
|
2023-06-15 08:42:54 +00:00
|
|
|
log.Error(logs.EngineFailedToMoveShardInDegradedreadonlyModeMovingToReadonly,
|
2022-11-10 10:58:46 +00:00
|
|
|
zap.Error(err))
|
2024-06-11 14:05:21 +00:00
|
|
|
}
|
2022-11-10 10:58:46 +00:00
|
|
|
|
2024-06-11 14:05:21 +00:00
|
|
|
err := sh.SetMode(mode.ReadOnly)
|
|
|
|
if err != nil {
|
|
|
|
log.Error(logs.EngineFailedToMoveShardInReadonlyMode, zap.Error(err))
|
|
|
|
return
|
2022-11-10 10:58:46 +00:00
|
|
|
}
|
2024-06-11 14:05:21 +00:00
|
|
|
|
|
|
|
log.Info(logs.EngineShardIsMovedInReadonlyModeDueToErrorThreshold)
|
2022-11-10 10:58:46 +00:00
|
|
|
}
|
|
|
|
|
2024-10-04 11:58:45 +00:00
|
|
|
// reportShardErrorByID increases shard error counter and logs an error.
|
|
|
|
func (e *StorageEngine) reportShardErrorByID(id string, msg string, err error) {
|
2022-10-20 10:40:25 +00:00
|
|
|
e.mtx.RLock()
|
|
|
|
sh, ok := e.shards[id]
|
|
|
|
e.mtx.RUnlock()
|
|
|
|
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2024-10-04 11:58:45 +00:00
|
|
|
e.reportShardError(sh, msg, err)
|
2022-10-20 10:40:25 +00:00
|
|
|
}
|
|
|
|
|
2022-04-21 11:28:05 +00:00
|
|
|
// reportShardError checks that the amount of errors doesn't exceed the configured threshold.
|
2022-01-31 14:58:32 +00:00
|
|
|
// If it does, shard is set to read-only mode.
|
|
|
|
func (e *StorageEngine) reportShardError(
|
|
|
|
sh hashedShard,
|
|
|
|
msg string,
|
|
|
|
err error,
|
2023-10-31 11:56:55 +00:00
|
|
|
fields ...zap.Field,
|
|
|
|
) {
|
2022-10-26 12:23:12 +00:00
|
|
|
if isLogical(err) {
|
|
|
|
e.log.Warn(msg,
|
|
|
|
zap.Stringer("shard_id", sh.ID()),
|
|
|
|
zap.String("error", err.Error()))
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2023-05-19 15:06:20 +00:00
|
|
|
errCount := sh.errorCount.Add(1)
|
2023-06-01 14:28:04 +00:00
|
|
|
sh.Shard.IncErrorCounter()
|
2022-11-10 10:58:46 +00:00
|
|
|
|
|
|
|
sid := sh.ID()
|
2022-01-31 14:58:32 +00:00
|
|
|
e.log.Warn(msg, append([]zap.Field{
|
2022-10-24 08:31:56 +00:00
|
|
|
zap.Stringer("shard_id", sid),
|
2022-01-31 14:58:32 +00:00
|
|
|
zap.Uint32("error count", errCount),
|
|
|
|
zap.String("error", err.Error()),
|
|
|
|
}, fields...)...)
|
|
|
|
|
|
|
|
if e.errorsThreshold == 0 || errCount < e.errorsThreshold {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2024-04-10 08:49:31 +00:00
|
|
|
req := setModeRequest{
|
|
|
|
errorCount: errCount,
|
2024-10-04 11:58:45 +00:00
|
|
|
sh: sh.Shard,
|
2024-06-11 14:05:21 +00:00
|
|
|
isMeta: errors.As(err, new(metaerr.Error)),
|
2024-04-10 08:49:31 +00:00
|
|
|
}
|
2022-10-24 08:31:56 +00:00
|
|
|
|
2024-04-10 08:49:31 +00:00
|
|
|
select {
|
|
|
|
case e.setModeCh <- req:
|
|
|
|
default:
|
|
|
|
// For background workers we can have a lot of such errors,
|
|
|
|
// thus logging is done with DEBUG level.
|
|
|
|
e.log.Debug(logs.EngineModeChangeIsInProgressIgnoringSetmodeRequest,
|
|
|
|
zap.Stringer("shard_id", sid),
|
|
|
|
zap.Uint32("error_count", errCount))
|
2022-01-31 14:58:32 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-10-26 12:23:12 +00:00
|
|
|
func isLogical(err error) bool {
|
2023-08-16 07:39:41 +00:00
|
|
|
return errors.As(err, &logicerr.Logical{}) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
|
2022-10-26 12:23:12 +00:00
|
|
|
}
|
|
|
|
|
2020-11-17 12:26:03 +00:00
|
|
|
// Option represents StorageEngine's constructor option.
|
|
|
|
type Option func(*cfg)
|
|
|
|
|
|
|
|
type cfg struct {
|
|
|
|
log *logger.Logger
|
2021-03-15 13:09:27 +00:00
|
|
|
|
2022-01-31 14:58:32 +00:00
|
|
|
errorsThreshold uint32
|
|
|
|
|
2021-03-16 08:14:56 +00:00
|
|
|
metrics MetricRegister
|
2021-10-08 12:25:45 +00:00
|
|
|
|
|
|
|
shardPoolSize uint32
|
2023-06-22 07:46:56 +00:00
|
|
|
|
|
|
|
lowMem bool
|
2023-10-03 08:58:35 +00:00
|
|
|
|
2023-12-27 15:58:36 +00:00
|
|
|
containerSource atomic.Pointer[containerSource]
|
2020-11-17 12:26:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func defaultCfg() *cfg {
|
2023-12-27 15:58:36 +00:00
|
|
|
res := &cfg{
|
2024-08-29 10:51:09 +00:00
|
|
|
log: &logger.Logger{Logger: zap.L()},
|
|
|
|
shardPoolSize: 20,
|
2024-10-03 07:40:56 +00:00
|
|
|
metrics: noopMetrics{},
|
2020-11-17 12:26:03 +00:00
|
|
|
}
|
2023-12-27 15:58:36 +00:00
|
|
|
res.containerSource.Store(&containerSource{})
|
|
|
|
return res
|
2020-11-17 12:26:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// New creates, initializes and returns new StorageEngine instance.
|
|
|
|
func New(opts ...Option) *StorageEngine {
|
|
|
|
c := defaultCfg()
|
|
|
|
|
|
|
|
for i := range opts {
|
|
|
|
opts[i](c)
|
|
|
|
}
|
|
|
|
|
|
|
|
return &StorageEngine{
|
2023-05-30 07:14:37 +00:00
|
|
|
cfg: c,
|
|
|
|
shards: make(map[string]hashedShard),
|
|
|
|
shardPools: make(map[string]util.WorkerPool),
|
|
|
|
closeCh: make(chan struct{}),
|
|
|
|
setModeCh: make(chan setModeRequest),
|
|
|
|
evacuateLimiter: &evacuationLimiter{},
|
2020-11-17 12:26:03 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// WithLogger returns option to set StorageEngine's logger.
|
|
|
|
func WithLogger(l *logger.Logger) Option {
|
|
|
|
return func(c *cfg) {
|
|
|
|
c.log = l
|
|
|
|
}
|
|
|
|
}
|
2021-03-15 13:09:27 +00:00
|
|
|
|
2021-03-16 08:14:56 +00:00
|
|
|
func WithMetrics(v MetricRegister) Option {
|
2021-03-15 13:09:27 +00:00
|
|
|
return func(c *cfg) {
|
2021-03-16 08:14:56 +00:00
|
|
|
c.metrics = v
|
2021-03-15 13:09:27 +00:00
|
|
|
}
|
|
|
|
}
|
2021-10-08 12:25:45 +00:00
|
|
|
|
|
|
|
// WithShardPoolSize returns option to specify size of worker pool for each shard.
|
|
|
|
func WithShardPoolSize(sz uint32) Option {
|
|
|
|
return func(c *cfg) {
|
|
|
|
c.shardPoolSize = sz
|
|
|
|
}
|
|
|
|
}
|
2022-01-31 14:58:32 +00:00
|
|
|
|
|
|
|
// WithErrorThreshold returns an option to specify size amount of errors after which
|
|
|
|
// shard is moved to read-only mode.
|
|
|
|
func WithErrorThreshold(sz uint32) Option {
|
|
|
|
return func(c *cfg) {
|
|
|
|
c.errorsThreshold = sz
|
|
|
|
}
|
|
|
|
}
|
2023-06-22 07:46:56 +00:00
|
|
|
|
|
|
|
// WithLowMemoryConsumption returns an option to set the flag to reduce memory consumption by reducing performance.
|
|
|
|
func WithLowMemoryConsumption(lowMemCons bool) Option {
|
|
|
|
return func(c *cfg) {
|
|
|
|
c.lowMem = lowMemCons
|
|
|
|
}
|
|
|
|
}
|
2023-10-03 08:58:35 +00:00
|
|
|
|
2023-12-27 15:58:36 +00:00
|
|
|
// SetContainerSource sets container source.
|
|
|
|
func (e *StorageEngine) SetContainerSource(cs container.Source) {
|
|
|
|
e.containerSource.Store(&containerSource{cs: cs})
|
|
|
|
}
|
|
|
|
|
|
|
|
type containerSource struct {
|
|
|
|
cs container.Source
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *containerSource) IsContainerAvailable(ctx context.Context, id cid.ID) (bool, error) {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return false, ctx.Err()
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
if s == nil || s.cs == nil {
|
|
|
|
return true, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
wasRemoved, err := container.WasRemoved(s.cs, id)
|
|
|
|
if err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
|
|
|
return !wasRemoved, nil
|
|
|
|
}
|