From 07a660fbc440dbd3974250608506e39c093bf6cc Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 13 Mar 2025 18:01:27 +0300 Subject: [PATCH] [#1677] writecache: Add QoS limiter usage Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 1 + internal/logs/logs.go | 1 + pkg/local_object_storage/writecache/cache.go | 2 ++ pkg/local_object_storage/writecache/flush.go | 15 ++++++++++++++- pkg/local_object_storage/writecache/options.go | 9 +++++++++ 5 files changed, 27 insertions(+), 1 deletion(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index e2fe23135..2531e9173 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -930,6 +930,7 @@ func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option { writecache.WithMaxCacheCount(wcRead.countLimit), writecache.WithNoSync(wcRead.noSync), writecache.WithLogger(c.log), + writecache.WithQoSLimiter(shCfg.limiter), ) } return writeCacheOpts diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 6115cdf90..3503c922e 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -514,4 +514,5 @@ const ( NotSupportedIncomingIOTagReplacedWithClient = "incoming IO tag is not supported, replaced with `client`" FailedToGetNetmapToAdjustIOTag = "failed to get netmap to adjust IO tag, replaced with `client`" FailedToValidateIncomingIOTag = "failed to validate incoming IO tag, replaced with `client`" + WriteCacheFailedToAcquireRPSQuota = "writecache failed to acquire RPS quota to flush object" ) diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go index b99d73d3a..ee709ea73 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -6,6 +6,7 @@ import ( "sync" "sync/atomic" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" @@ -61,6 +62,7 @@ func New(opts ...Option) Cache { maxCacheSize: defaultMaxCacheSize, metrics: DefaultMetrics(), flushSizeLimit: defaultFlushWorkersCount * defaultMaxObjectSize, + qosLimiter: qos.NewNoopLimiter(), }, } diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 2d07d8b32..893d27ba2 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -67,7 +67,13 @@ func (c *cache) pushToFlushQueue(ctx context.Context, fl *flushLimiter) { continue } - err := c.fsTree.IterateInfo(ctx, func(oi fstree.ObjectInfo) error { + release, err := c.qosLimiter.ReadRequest(ctx) + if err != nil { + c.log.Warn(ctx, logs.WriteCacheFailedToAcquireRPSQuota, zap.String("operation", "fstree.IterateInfo"), zap.Error(err)) + c.modeMtx.RUnlock() + continue + } + err = c.fsTree.IterateInfo(ctx, func(oi fstree.ObjectInfo) error { if err := fl.acquire(oi.DataSize); err != nil { return err } @@ -82,6 +88,7 @@ func (c *cache) pushToFlushQueue(ctx context.Context, fl *flushLimiter) { return ctx.Err() } }) + release() if err != nil { c.log.Warn(ctx, logs.BlobstorErrorOccurredDuringTheIteration, zap.Error(err)) } @@ -113,6 +120,12 @@ func (c *cache) workerFlush(ctx context.Context, fl *flushLimiter) { func (c *cache) flushIfAnObjectExistsWorker(ctx context.Context, objInfo objectInfo, fl *flushLimiter) { defer fl.release(objInfo.size) + release, err := c.qosLimiter.WriteRequest(ctx) + if err != nil { + c.log.Warn(ctx, logs.WriteCacheFailedToAcquireRPSQuota, zap.String("operation", "fstree.Get"), zap.Error(err)) + return + } + defer release() res, err := c.fsTree.Get(ctx, common.GetPrm{ Address: objInfo.addr, }) diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index f2957fe98..dbbe66c19 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -3,6 +3,7 @@ package writecache import ( "context" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "go.uber.org/zap" ) @@ -38,6 +39,8 @@ type options struct { disableBackgroundFlush bool // flushSizeLimit is total size of flushing objects. flushSizeLimit uint64 + // qosLimiter used to limit flush RPS. + qosLimiter qos.Limiter } // WithLogger sets logger. @@ -136,3 +139,9 @@ func WithFlushSizeLimit(v uint64) Option { o.flushSizeLimit = v } } + +func WithQoSLimiter(l qos.Limiter) Option { + return func(o *options) { + o.qosLimiter = l + } +}