From 92a67a6716723f81dabb4f809ff040ef7014903a Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 5 Feb 2025 12:31:01 +0300 Subject: [PATCH] [#1636] qos: Add Limiter Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 12 +++- internal/assert/cond.go | 9 +++ internal/qos/limiter.go | 132 +++++++++++++++++++++++++++++++++++++ internal/qos/validate.go | 11 +++- 4 files changed, 160 insertions(+), 4 deletions(-) create mode 100644 internal/assert/cond.go create mode 100644 internal/qos/limiter.go diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 004c8f128..c625a041f 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -136,6 +136,7 @@ type shardCfg struct { refillMetabase bool refillMetabaseWorkersCount int mode shardmode.Mode + limiter qos.Limiter metaCfg struct { path string @@ -278,7 +279,7 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig a.setMetabaseConfig(&newConfig, oldConfig) a.setGCConfig(&newConfig, oldConfig) - if err := a.setLimits(&newConfig, oldConfig); err != nil { + if err := a.setLimiter(&newConfig, oldConfig); err != nil { return err } @@ -374,11 +375,16 @@ func (a *applicationConfiguration) setGCConfig(newConfig *shardCfg, oldConfig *s newConfig.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount() } -func (a *applicationConfiguration) setLimits(newConfig *shardCfg, oldConfig *shardconfig.Config) error { +func (a *applicationConfiguration) setLimiter(newConfig *shardCfg, oldConfig *shardconfig.Config) error { limitsConfig := oldConfig.Limits() - if err := qos.ValidateConfig(limitsConfig); err != nil { + limiter, err := qos.NewLimiter(limitsConfig) + if err != nil { return err } + if newConfig.limiter != nil { + newConfig.limiter.Close() + } + newConfig.limiter = limiter return nil } diff --git a/internal/assert/cond.go b/internal/assert/cond.go new file mode 100644 index 000000000..701036fa8 --- /dev/null +++ b/internal/assert/cond.go @@ -0,0 +1,9 @@ +package assert + +import "strings" + +func True(cond bool, details ...string) { + if !cond { + panic(strings.Join(details, " ")) + } +} diff --git a/internal/qos/limiter.go b/internal/qos/limiter.go new file mode 100644 index 000000000..d2a1919f1 --- /dev/null +++ b/internal/qos/limiter.go @@ -0,0 +1,132 @@ +package qos + +import ( + "context" + "fmt" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert" + "git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling" + "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" +) + +const ( + defaultIdleTimeout time.Duration = 0 + defaultShare float64 = 1.0 +) + +type ReleaseFunc scheduling.ReleaseFunc + +type Limiter interface { + ReadRequest(context.Context) (ReleaseFunc, error) + WriteRequest(context.Context) (ReleaseFunc, error) + Close() +} + +func NewLimiter(c *limits.Config) (Limiter, error) { + if err := validateConfig(c); err != nil { + return nil, err + } + read, write := c.Read(), c.Write() + if isNoop(read, write) { + return noopLimiterInstance, nil + } + readScheduler, err := scheduling.NewMClock( + uint64(read.MaxRunningOps), uint64(read.MaxWaitingOps), + converToSchedulingTags(read.Tags), read.IdleTimeout) + if err != nil { + return nil, fmt.Errorf("create read scheduler: %w", err) + } + writeScheduler, err := scheduling.NewMClock( + uint64(write.MaxRunningOps), uint64(write.MaxWaitingOps), + converToSchedulingTags(write.Tags), write.IdleTimeout) + if err != nil { + return nil, fmt.Errorf("create write scheduler: %w", err) + } + return &mClockLimiter{ + readScheduler: readScheduler, + writeScheduler: writeScheduler, + }, nil +} + +func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo { + result := make(map[string]scheduling.TagInfo) + for _, tag := range []IOTag{IOTagClient, IOTagBackground, IOTagInternal, IOTagPolicer, IOTagWritecache} { + result[tag.String()] = scheduling.TagInfo{ + Share: defaultShare, + } + } + for _, l := range limits { + v := result[l.Tag] + if l.Weight != nil && *l.Weight != 0 { + v.Share = *l.Weight + } + if l.LimitOps != nil && *l.LimitOps != 0 { + v.LimitIOPS = l.LimitOps + } + if l.ReservedOps != nil && *l.ReservedOps != 0 { + v.ReservedIOPS = l.ReservedOps + } + result[l.Tag] = v + } + return result +} + +var ( + _ Limiter = (*noopLimiter)(nil) + releaseStub ReleaseFunc = func() {} + noopLimiterInstance = &noopLimiter{} +) + +type noopLimiter struct{} + +func (n *noopLimiter) ReadRequest(context.Context) (ReleaseFunc, error) { + return releaseStub, nil +} + +func (n *noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) { + return releaseStub, nil +} + +func (n *noopLimiter) Close() {} + +var _ Limiter = (*mClockLimiter)(nil) + +type mClockLimiter struct { + readScheduler *scheduling.MClock + writeScheduler *scheduling.MClock +} + +func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) { + tag, ok := tagging.IOTagFromContext(ctx) + assert.True(ok, "request has no tag") + if tag == IOTagCritical.String() { + return releaseStub, nil + } + rel, err := n.readScheduler.RequestArrival(ctx, tag) + if err != nil { + return nil, err + } + return ReleaseFunc(rel), nil +} + +func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) { + tag, ok := tagging.IOTagFromContext(ctx) + if !ok { + tag = IOTagClient.String() + } + if tag == IOTagCritical.String() { + return releaseStub, nil + } + rel, err := n.writeScheduler.RequestArrival(ctx, tag) + if err != nil { + return nil, err + } + return ReleaseFunc(rel), nil +} + +func (n *mClockLimiter) Close() { + n.readScheduler.Close() + n.writeScheduler.Close() +} diff --git a/internal/qos/validate.go b/internal/qos/validate.go index afced345b..43aa74942 100644 --- a/internal/qos/validate.go +++ b/internal/qos/validate.go @@ -14,7 +14,7 @@ type tagConfig struct { Shares, Limit, Reserved *float64 } -func ValidateConfig(c *limits.Config) error { +func validateConfig(c *limits.Config) error { if err := validateOpConfig(c.Read()); err != nil { return fmt.Errorf("limits 'read' section validation error: %w", err) } @@ -90,3 +90,12 @@ func float64Value(f *float64) float64 { } return *f } + +func isNoop(read, write limits.OpConfig) bool { + return read.MaxRunningOps == limits.NoLimit && + read.MaxWaitingOps == limits.NoLimit && + write.MaxRunningOps == limits.NoLimit && + write.MaxWaitingOps == limits.NoLimit && + len(read.Tags) == 0 && + len(write.Tags) == 0 +}