From b870fa33776fe9c04a02425bfb315131eb8f81b5 Mon Sep 17 00:00:00 2001
From: Dmitrii Stepanov
Date: Wed, 5 Feb 2025 12:31:01 +0300
Subject: [PATCH] [#9999] qos: Add Limiter

Signed-off-by: Dmitrii Stepanov
---
 cmd/frostfs-node/config.go |  12 +++-
 internal/assert/cond.go    |   9 +++
 internal/qos/limiter.go    | 129 +++++++++++++++++++++++++++++++++++++
 internal/qos/validate.go   |  11 +++-
 4 files changed, 157 insertions(+), 4 deletions(-)
 create mode 100644 internal/assert/cond.go
 create mode 100644 internal/qos/limiter.go

diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go
index e326c02b4..59ed2e918 100644
--- a/cmd/frostfs-node/config.go
+++ b/cmd/frostfs-node/config.go
@@ -135,6 +135,7 @@ type shardCfg struct {
 	refillMetabase             bool
 	refillMetabaseWorkersCount int
 	mode                       shardmode.Mode
+	limiter                    qos.Limiter
 
 	metaCfg struct {
 		path string
@@ -277,7 +278,7 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig
 	a.setMetabaseConfig(&newConfig, oldConfig)
 
 	a.setGCConfig(&newConfig, oldConfig)
-	if err := a.setLimits(&newConfig, oldConfig); err != nil {
+	if err := a.setLimiter(&newConfig, oldConfig); err != nil {
 		return err
 	}
 
@@ -373,11 +374,16 @@ func (a *applicationConfiguration) setGCConfig(newConfig *shardCfg, oldConfig *s
 	newConfig.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount()
 }
 
-func (a *applicationConfiguration) setLimits(newConfig *shardCfg, oldConfig *shardconfig.Config) error {
+func (a *applicationConfiguration) setLimiter(newConfig *shardCfg, oldConfig *shardconfig.Config) error {
 	limitsConfig := oldConfig.Limits()
-	if err := qos.ValidateConfig(limitsConfig); err != nil {
+	limiter, err := qos.NewLimiter(limitsConfig)
+	if err != nil {
 		return err
 	}
+	if newConfig.limiter != nil {
+		newConfig.limiter.Close()
+	}
+	newConfig.limiter = limiter
 	return nil
 }
 
diff --git a/internal/assert/cond.go b/internal/assert/cond.go
new file mode 100644
index 000000000..4a1b20159
--- /dev/null
+++ b/internal/assert/cond.go
@@ -0,0 +1,9 @@
+package assert
+
+import "strings"
+
+func Cond(cond bool, details ...string) {
+	if !cond {
+		panic(strings.Join(details, " "))
+	}
+}
diff --git a/internal/qos/limiter.go b/internal/qos/limiter.go
new file mode 100644
index 000000000..1a297dab2
--- /dev/null
+++ b/internal/qos/limiter.go
@@ -0,0 +1,129 @@
+package qos
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
+	"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
+	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
+)
+
+const (
+	defaultIdleTimeout time.Duration = 0
+	defaultShare       float64       = 1.0
+)
+
+type ReleaseFunc scheduling.ReleaseFunc
+
+type Limiter interface {
+	ReadRequest(context.Context) (ReleaseFunc, error)
+	WriteRequest(context.Context) (ReleaseFunc, error)
+	Close()
+}
+
+func NewLimiter(c *limits.Config) (Limiter, error) {
+	if err := validateConfig(c); err != nil {
+		return nil, err
+	}
+	if isNoop(c) {
+		return noopLimiterInstance, nil
+	}
+	readScheduler, err := scheduling.NewMClock(
+		uint64(c.MaxReadRunningOps()), uint64(c.MaxReadWaitingOps()),
+		convertToSchedulingTags(c.ReadTags()), defaultIdleTimeout)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create read scheduler: %w", err)
+	}
+	writeScheduler, err := scheduling.NewMClock(
+		uint64(c.MaxWriteRunningOps()), uint64(c.MaxWriteWaitingOps()),
+		convertToSchedulingTags(c.WriteTags()), defaultIdleTimeout)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create write scheduler: %w", err)
+	}
+	return &mClockLimiter{
+		readScheduler:  readScheduler,
+		writeScheduler: writeScheduler,
+	}, nil
+}
+
+func convertToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo {
+	result := make(map[string]scheduling.TagInfo)
+	for _, tag := range []IOTag{IOTagClient, IOTagBackground, IOTagInternal, IOTagPolicer, IOTagWritecache} {
+		result[tag.String()] = scheduling.TagInfo{
+			Share: defaultShare,
+		}
+	}
+	for _, l := range limits {
+		v := result[l.Tag]
+		if l.Weight != nil && *l.Weight != 0 {
+			v.Share = *l.Weight
+		}
+		if l.LimitOps != nil && *l.LimitOps != 0 {
+			v.LimitIOPS = l.LimitOps
+		}
+		if l.ReservedOps != nil && *l.ReservedOps != 0 {
+			v.ReservedIOPS = l.ReservedOps
+		}
+		result[l.Tag] = v
+	}
+	return result
+}
+
+var (
+	_                   Limiter     = (*noopLimiter)(nil)
+	releaseStub         ReleaseFunc = func() {}
+	noopLimiterInstance             = &noopLimiter{}
+)
+
+type noopLimiter struct{}
+
+func (n *noopLimiter) ReadRequest(context.Context) (ReleaseFunc, error) {
+	return releaseStub, nil
+}
+
+func (n *noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
+	return releaseStub, nil
+}
+
+func (n *noopLimiter) Close() {}
+
+var _ Limiter = (*mClockLimiter)(nil)
+
+type mClockLimiter struct {
+	readScheduler  *scheduling.MClock
+	writeScheduler *scheduling.MClock
+}
+
+func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
+	tag, ok := tagging.IOTagFromContext(ctx)
+	assert.Cond(ok, "request has no tag")
+	if tag == IOTagCritical.String() {
+		return releaseStub, nil
+	}
+	rel, err := n.readScheduler.RequestArrival(ctx, tag)
+	if err != nil {
+		return nil, err
+	}
+	return ReleaseFunc(rel), nil
+}
+
+func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
+	tag, ok := tagging.IOTagFromContext(ctx)
+	assert.Cond(ok, "request has no tag")
+	if tag == IOTagCritical.String() {
+		return releaseStub, nil
+	}
+	rel, err := n.writeScheduler.RequestArrival(ctx, tag)
+	if err != nil {
+		return nil, err
+	}
+	return ReleaseFunc(rel), nil
+}
+
+func (n *mClockLimiter) Close() {
+	n.readScheduler.Close()
+	n.writeScheduler.Close()
+}
diff --git a/internal/qos/validate.go b/internal/qos/validate.go
index 60e86658f..dd230116c 100644
--- a/internal/qos/validate.go
+++ b/internal/qos/validate.go
@@ -14,7 +14,7 @@ type tagConfig struct {
 	Shares, Limit, Reserved *float64
 }
 
-func ValidateConfig(c *limits.Config) error {
+func validateConfig(c *limits.Config) error {
 	if c.MaxReadRunningOps() <= 0 {
 		return fmt.Errorf("invalid 'max_read_running_ops = %d': must be greater than zero", c.MaxReadRunningOps())
 	}
@@ -86,3 +86,12 @@ func float64Value(f *float64) float64 {
 	}
 	return *f
 }
+
+func isNoop(c *limits.Config) bool {
+	return c.MaxReadRunningOps() == limits.NoLimit &&
+		c.MaxReadWaitingOps() == limits.NoLimit &&
+		c.MaxWriteRunningOps() == limits.NoLimit &&
+		c.MaxWriteWaitingOps() == limits.NoLimit &&
+		len(c.ReadTags()) == 0 &&
+		len(c.WriteTags()) == 0
+}
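
Usage note (not part of the patch): a minimal caller-side sketch of how code is expected to pass through the limiter — acquire an admission slot for the request's IO tag, perform the I/O, then release the slot so the scheduler can admit the next waiting request. The function name doRead, the package name, and the explicit ContextWithIOTag call are illustrative assumptions; inside the node the IO tag is attached upstream by the tagging middleware and the limiter instance comes from shardCfg.

package qosexample

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
)

// doRead sketches the caller side of qos.Limiter.
func doRead(ctx context.Context, limiter qos.Limiter) error {
	// Normally the tagging middleware has already attached an IO tag;
	// it is set explicitly here only to keep the sketch self-contained.
	ctx = tagging.ContextWithIOTag(ctx, qos.IOTagClient.String())

	release, err := limiter.ReadRequest(ctx) // blocks until the mClock scheduler admits the request
	if err != nil {
		return err
	}
	defer release() // hand the slot back so waiting requests can be admitted

	// ... perform the actual disk read here ...
	return nil
}

Note the two fast paths in the patch itself: requests tagged critical bypass the scheduler entirely (releaseStub is returned), and a config with no limits and no tags yields the noopLimiter, so unconfigured shards pay no scheduling cost.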