From b01f05c951eeb97ef0e63a63de324e644fad8958 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 6 May 2025 12:08:38 +0300 Subject: [PATCH] [#1689] shard: Use single qos.Limiter instance for shard and writecache Before fix shard and writecache used the same instance of qos.Limiter. In case of SIGHUP signal shard was closing qos.Limiter, but didn't update writecache's pointer. Now shard uses atomic pointer to qos.Limiter and shares it with writecache. On SIGHUP shard updates atomic pointer value and closes old qos.Limiter. Change-Id: Ic2ab62441d3872e71c5771f54d070e0ca48fe375 Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 1 - pkg/local_object_storage/shard/control.go | 17 +++++-- pkg/local_object_storage/shard/shard.go | 56 +++++++++++++++++++---- 3 files changed, 61 insertions(+), 13 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 96274e625..83a9b4d4c 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -920,7 +920,6 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option { writecache.WithMaxCacheCount(wcRead.countLimit), writecache.WithNoSync(wcRead.noSync), writecache.WithLogger(c.log.WithTag(logger.TagWriteCache)), - writecache.WithQoSLimiter(shCfg.limiter), ) } return writeCacheOpts diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index d489b8b0d..a607f70f7 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -452,11 +452,20 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error { return err } } - if c.opsLimiter != nil { - s.opsLimiter.Close() - s.opsLimiter = c.opsLimiter + if err := s.setMode(ctx, c.info.Mode); err != nil { + return err + } + s.reloadOpsLimiter(&c) + + return nil +} + +func (s *Shard) reloadOpsLimiter(c *cfg) { + if c.configOpsLimiter != nil { + old := s.opsLimiter.ptr.Swap(&qosLimiterHolder{Limiter: c.configOpsLimiter}) + old.Close() + s.opsLimiter.SetParentID(s.info.ID.String()) } - return s.setMode(ctx, c.info.Mode) } func (s *Shard) lockExclusive() func() { diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index d89b56266..f21541d9d 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -39,6 +39,8 @@ type Shard struct { rb *rebuilder + opsLimiter *atomicOpsLimiter + gcCancel atomic.Value setModeRequested atomic.Bool writecacheSealCancel atomic.Pointer[writecacheSealCanceler] @@ -100,7 +102,7 @@ type cfg struct { containerInfo container.InfoProvider - opsLimiter qos.Limiter + configOpsLimiter qos.Limiter } func defaultCfg() *cfg { @@ -112,7 +114,7 @@ func defaultCfg() *cfg { zeroSizeContainersCallback: func(context.Context, []cid.ID) {}, zeroCountContainersCallback: func(context.Context, []cid.ID) {}, metricsWriter: noopMetrics{}, - opsLimiter: qos.NewNoopLimiter(), + configOpsLimiter: qos.NewNoopLimiter(), } } @@ -128,10 +130,11 @@ func New(opts ...Option) *Shard { mb := meta.New(c.metaOpts...) s := &Shard{ - cfg: c, - blobStor: bs, - metaBase: mb, - tsSource: c.tsSource, + cfg: c, + blobStor: bs, + metaBase: mb, + tsSource: c.tsSource, + opsLimiter: newAtomicOpsLimiter(c.configOpsLimiter), } reportFunc := func(ctx context.Context, msg string, err error) { @@ -145,7 +148,8 @@ func New(opts ...Option) *Shard { append(c.writeCacheOpts, writecache.WithReportErrorFunc(reportFunc), writecache.WithBlobstor(bs), - writecache.WithMetabase(mb))...) + writecache.WithMetabase(mb), + writecache.WithQoSLimiter(s.opsLimiter))...) s.writeCache.GetMetrics().SetPath(s.writeCache.DumpInfo().Path) } @@ -374,7 +378,7 @@ func WithContainerInfoProvider(containerInfo container.InfoProvider) Option { func WithLimiter(l qos.Limiter) Option { return func(c *cfg) { - c.opsLimiter = l + c.configOpsLimiter = l } } @@ -520,3 +524,39 @@ func (s *Shard) SetEvacuationInProgress(val bool) { s.info.EvacuationInProgress = val s.metricsWriter.SetEvacuationInProgress(val) } + +var _ qos.Limiter = &atomicOpsLimiter{} + +func newAtomicOpsLimiter(l qos.Limiter) *atomicOpsLimiter { + result := &atomicOpsLimiter{} + result.ptr.Store(&qosLimiterHolder{Limiter: l}) + return result +} + +type atomicOpsLimiter struct { + ptr atomic.Pointer[qosLimiterHolder] +} + +func (a *atomicOpsLimiter) Close() { + a.ptr.Load().Close() +} + +func (a *atomicOpsLimiter) ReadRequest(ctx context.Context) (qos.ReleaseFunc, error) { + return a.ptr.Load().ReadRequest(ctx) +} + +func (a *atomicOpsLimiter) SetMetrics(m qos.Metrics) { + a.ptr.Load().SetMetrics(m) +} + +func (a *atomicOpsLimiter) SetParentID(id string) { + a.ptr.Load().SetParentID(id) +} + +func (a *atomicOpsLimiter) WriteRequest(ctx context.Context) (qos.ReleaseFunc, error) { + return a.ptr.Load().WriteRequest(ctx) +} + +type qosLimiterHolder struct { + qos.Limiter +}