diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 96274e625..83a9b4d4c 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -920,7 +920,6 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option { writecache.WithMaxCacheCount(wcRead.countLimit), writecache.WithNoSync(wcRead.noSync), writecache.WithLogger(c.log.WithTag(logger.TagWriteCache)), - writecache.WithQoSLimiter(shCfg.limiter), ) } return writeCacheOpts diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index d489b8b0d..a607f70f7 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -452,11 +452,20 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error { return err } } - if c.opsLimiter != nil { - s.opsLimiter.Close() - s.opsLimiter = c.opsLimiter + if err := s.setMode(ctx, c.info.Mode); err != nil { + return err + } + s.reloadOpsLimiter(&c) + + return nil +} + +func (s *Shard) reloadOpsLimiter(c *cfg) { + if c.configOpsLimiter != nil { + old := s.opsLimiter.ptr.Swap(&qosLimiterHolder{Limiter: c.configOpsLimiter}) + old.Close() + s.opsLimiter.SetParentID(s.info.ID.String()) } - return s.setMode(ctx, c.info.Mode) } func (s *Shard) lockExclusive() func() { diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index d89b56266..f21541d9d 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -39,6 +39,8 @@ type Shard struct { rb *rebuilder + opsLimiter *atomicOpsLimiter + gcCancel atomic.Value setModeRequested atomic.Bool writecacheSealCancel atomic.Pointer[writecacheSealCanceler] @@ -100,7 +102,7 @@ type cfg struct { containerInfo container.InfoProvider - opsLimiter qos.Limiter + configOpsLimiter qos.Limiter } func defaultCfg() *cfg { @@ -112,7 +114,7 @@ func defaultCfg() *cfg { zeroSizeContainersCallback: func(context.Context, []cid.ID) {}, zeroCountContainersCallback: func(context.Context, []cid.ID) {}, metricsWriter: noopMetrics{}, - opsLimiter: qos.NewNoopLimiter(), + configOpsLimiter: qos.NewNoopLimiter(), } } @@ -128,10 +130,11 @@ func New(opts ...Option) *Shard { mb := meta.New(c.metaOpts...) s := &Shard{ - cfg: c, - blobStor: bs, - metaBase: mb, - tsSource: c.tsSource, + cfg: c, + blobStor: bs, + metaBase: mb, + tsSource: c.tsSource, + opsLimiter: newAtomicOpsLimiter(c.configOpsLimiter), } reportFunc := func(ctx context.Context, msg string, err error) { @@ -145,7 +148,8 @@ func New(opts ...Option) *Shard { append(c.writeCacheOpts, writecache.WithReportErrorFunc(reportFunc), writecache.WithBlobstor(bs), - writecache.WithMetabase(mb))...) + writecache.WithMetabase(mb), + writecache.WithQoSLimiter(s.opsLimiter))...) s.writeCache.GetMetrics().SetPath(s.writeCache.DumpInfo().Path) } @@ -374,7 +378,7 @@ func WithContainerInfoProvider(containerInfo container.InfoProvider) Option { func WithLimiter(l qos.Limiter) Option { return func(c *cfg) { - c.opsLimiter = l + c.configOpsLimiter = l } } @@ -520,3 +524,39 @@ func (s *Shard) SetEvacuationInProgress(val bool) { s.info.EvacuationInProgress = val s.metricsWriter.SetEvacuationInProgress(val) } + +var _ qos.Limiter = &atomicOpsLimiter{} + +func newAtomicOpsLimiter(l qos.Limiter) *atomicOpsLimiter { + result := &atomicOpsLimiter{} + result.ptr.Store(&qosLimiterHolder{Limiter: l}) + return result +} + +type atomicOpsLimiter struct { + ptr atomic.Pointer[qosLimiterHolder] +} + +func (a *atomicOpsLimiter) Close() { + a.ptr.Load().Close() +} + +func (a *atomicOpsLimiter) ReadRequest(ctx context.Context) (qos.ReleaseFunc, error) { + return a.ptr.Load().ReadRequest(ctx) +} + +func (a *atomicOpsLimiter) SetMetrics(m qos.Metrics) { + a.ptr.Load().SetMetrics(m) +} + +func (a *atomicOpsLimiter) SetParentID(id string) { + a.ptr.Load().SetParentID(id) +} + +func (a *atomicOpsLimiter) WriteRequest(ctx context.Context) (qos.ReleaseFunc, error) { + return a.ptr.Load().WriteRequest(ctx) +} + +type qosLimiterHolder struct { + qos.Limiter +}