[#9999] qos: Add Limiter

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-02-05 12:31:01 +03:00
parent 8c22d3d156
commit b870fa3377
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
4 changed files with 159 additions and 4 deletions

View file

@ -135,6 +135,7 @@ type shardCfg struct {
refillMetabase bool refillMetabase bool
refillMetabaseWorkersCount int refillMetabaseWorkersCount int
mode shardmode.Mode mode shardmode.Mode
limiter qos.Limiter
metaCfg struct { metaCfg struct {
path string path string
@ -277,7 +278,7 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig
a.setMetabaseConfig(&newConfig, oldConfig) a.setMetabaseConfig(&newConfig, oldConfig)
a.setGCConfig(&newConfig, oldConfig) a.setGCConfig(&newConfig, oldConfig)
if err := a.setLimits(&newConfig, oldConfig); err != nil { if err := a.setLimiter(&newConfig, oldConfig); err != nil {
return err return err
} }
@ -373,11 +374,16 @@ func (a *applicationConfiguration) setGCConfig(newConfig *shardCfg, oldConfig *s
newConfig.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount() newConfig.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount()
} }
func (a *applicationConfiguration) setLimits(newConfig *shardCfg, oldConfig *shardconfig.Config) error { func (a *applicationConfiguration) setLimiter(newConfig *shardCfg, oldConfig *shardconfig.Config) error {
limitsConfig := oldConfig.Limits() limitsConfig := oldConfig.Limits()
if err := qos.ValidateConfig(limitsConfig); err != nil { limiter, err := qos.NewLimiter(limitsConfig)
if err != nil {
return err return err
} }
if newConfig.limiter != nil {
newConfig.limiter.Close()
}
newConfig.limiter = limiter
return nil return nil
} }

9
internal/assert/cond.go Normal file
View file

@ -0,0 +1,9 @@
package assert
import "strings"
// Cond panics when cond is false, using the space-joined details as the
// panic message. It is an invariant check for programmer errors only and
// must not be used for recoverable runtime failures.
func Cond(cond bool, details ...string) {
	if cond {
		return
	}
	panic(strings.Join(details, " "))
}

131
internal/qos/limiter.go Normal file
View file

@ -0,0 +1,131 @@
package qos
import (
"context"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
)
const (
	// defaultIdleTimeout is the idle timeout passed to scheduling.NewMClock.
	// NOTE(review): presumably 0 disables idle-based tag handling — confirm
	// against the frostfs-qos scheduling documentation.
	defaultIdleTimeout time.Duration = 0
	// defaultShare is the mClock share assigned to every known IO tag that
	// has no explicit weight configured.
	defaultShare float64 = 1.0
)
// ReleaseFunc is returned by a successful request admission and must be
// called when the admitted operation completes, so the scheduler can
// account for it.
type ReleaseFunc scheduling.ReleaseFunc

// Limiter admits shard read and write requests subject to the configured
// QoS limits. Close releases the limiter's resources; after Close the
// limiter must not be used.
type Limiter interface {
	ReadRequest(context.Context) (ReleaseFunc, error)
	WriteRequest(context.Context) (ReleaseFunc, error)
	Close()
}
// NewLimiter validates the shard limits configuration and builds a Limiter
// for it. A fully-unset (no-op) configuration yields a shared no-op limiter;
// otherwise two mClock schedulers are created, one for reads and one for
// writes.
func NewLimiter(c *limits.Config) (Limiter, error) {
	if err := validateConfig(c); err != nil {
		return nil, err
	}
	if isNoop(c) {
		// All shards with no-op limits share a single stateless instance.
		return noopLimiterInstance, nil
	}
	read, err := scheduling.NewMClock(
		uint64(c.MaxReadRunningOps()), uint64(c.MaxReadWaitingOps()),
		converToSchedulingTags(c.ReadTags()), defaultIdleTimeout)
	if err != nil {
		return nil, fmt.Errorf("failed to create read scheduler: %w", err)
	}
	write, err := scheduling.NewMClock(
		uint64(c.MaxWriteRunningOps()), uint64(c.MaxWriteWaitingOps()),
		converToSchedulingTags(c.WriteTags()), defaultIdleTimeout)
	if err != nil {
		return nil, fmt.Errorf("failed to create write scheduler: %w", err)
	}
	return &mClockLimiter{
		readScheduler:  read,
		writeScheduler: write,
	}, nil
}
// converToSchedulingTags builds the scheduler tag table: every known
// non-critical IO tag starts at the default share, then per-tag overrides
// from the configuration are applied on top. Zero-valued overrides are
// treated as "not set" and keep the defaults.
//
// Note: the parameter name shadows the imported limits package inside the
// body; the type in the signature still resolves to the package.
func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo {
	// IOTagCritical is deliberately absent: critical requests bypass the
	// schedulers entirely (see ReadRequest/WriteRequest).
	defaultTags := []IOTag{IOTagClient, IOTagBackground, IOTagInternal, IOTagPolicer, IOTagWritecache}
	result := make(map[string]scheduling.TagInfo, len(defaultTags))
	for _, tag := range defaultTags {
		result[tag.String()] = scheduling.TagInfo{
			Share: defaultShare,
		}
	}
	for _, l := range limits {
		// NOTE(review): a tag outside the default set starts from a zero
		// TagInfo here (Share == 0); presumably validateConfig rejects
		// unknown tags earlier — confirm.
		v := result[l.Tag]
		if l.Weight != nil && *l.Weight != 0 {
			v.Share = *l.Weight
		}
		if l.LimitOps != nil && *l.LimitOps != 0 {
			v.LimitIOPS = l.LimitOps
		}
		if l.ReservedOps != nil && *l.ReservedOps != 0 {
			v.ReservedIOPS = l.ReservedOps
		}
		result[l.Tag] = v
	}
	return result
}
var (
	// Compile-time check that noopLimiter satisfies Limiter.
	_ Limiter = (*noopLimiter)(nil)

	// releaseStub is the no-op release callback handed out whenever a
	// request was admitted without touching a scheduler.
	releaseStub ReleaseFunc = func() {}
	// noopLimiterInstance is the shared stateless limiter returned for
	// no-op configurations.
	noopLimiterInstance = &noopLimiter{}
)
// noopLimiter implements Limiter without any throttling: every request is
// admitted immediately and Close has nothing to release.
type noopLimiter struct{}

// ReadRequest admits the read immediately.
func (*noopLimiter) ReadRequest(context.Context) (ReleaseFunc, error) { return releaseStub, nil }

// WriteRequest admits the write immediately.
func (*noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) { return releaseStub, nil }

// Close is a no-op.
func (*noopLimiter) Close() {}
// Compile-time check that mClockLimiter satisfies Limiter.
var _ Limiter = (*mClockLimiter)(nil)

// mClockLimiter throttles shard IO with two mClock schedulers, one for
// reads and one for writes.
type mClockLimiter struct {
	readScheduler  *scheduling.MClock
	writeScheduler *scheduling.MClock
}
// ReadRequest waits until the read scheduler admits the request for the
// context's IO tag and returns the release callback to invoke once the
// read completes. Critical-tagged requests bypass scheduling entirely.
//
// NOTE(review): this panics (via assert.Cond) when the context carries no
// IO tag, while WriteRequest silently falls back to the client tag —
// confirm the asymmetry is intentional.
func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
	tag, ok := tagging.IOTagFromContext(ctx)
	assert.Cond(ok, "request has no tag")
	if tag == IOTagCritical.String() {
		return releaseStub, nil
	}
	release, err := n.readScheduler.RequestArrival(ctx, tag)
	if err != nil {
		return nil, err
	}
	return ReleaseFunc(release), nil
}
// WriteRequest waits until the write scheduler admits the request for the
// context's IO tag and returns the release callback to invoke once the
// write completes. Untagged requests are treated as client traffic;
// critical-tagged requests bypass scheduling entirely.
func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
	tag, ok := tagging.IOTagFromContext(ctx)
	if !ok {
		// Fall back to the client tag for untagged writes.
		tag = IOTagClient.String()
	}
	if tag == IOTagCritical.String() {
		return releaseStub, nil
	}
	release, err := n.writeScheduler.RequestArrival(ctx, tag)
	if err != nil {
		return nil, err
	}
	return ReleaseFunc(release), nil
}
// Close shuts down both underlying schedulers. The limiter must not be
// used after Close.
func (n *mClockLimiter) Close() {
	n.readScheduler.Close()
	n.writeScheduler.Close()
}

View file

@ -14,7 +14,7 @@ type tagConfig struct {
Shares, Limit, Reserved *float64 Shares, Limit, Reserved *float64
} }
func ValidateConfig(c *limits.Config) error { func validateConfig(c *limits.Config) error {
if c.MaxReadRunningOps() <= 0 { if c.MaxReadRunningOps() <= 0 {
return fmt.Errorf("invalid 'max_read_running_ops = %d': must be greater than zero", c.MaxReadRunningOps()) return fmt.Errorf("invalid 'max_read_running_ops = %d': must be greater than zero", c.MaxReadRunningOps())
} }
@ -86,3 +86,12 @@ func float64Value(f *float64) float64 {
} }
return *f return *f
} }
// isNoop reports whether the limits configuration leaves every operation
// counter unlimited and defines no per-tag overrides, in which case no
// scheduler needs to be created at all.
func isNoop(c *limits.Config) bool {
	if len(c.ReadTags()) != 0 || len(c.WriteTags()) != 0 {
		return false
	}
	return c.MaxReadRunningOps() == limits.NoLimit &&
		c.MaxReadWaitingOps() == limits.NoLimit &&
		c.MaxWriteRunningOps() == limits.NoLimit &&
		c.MaxWriteWaitingOps() == limits.NoLimit
}