[#9999] qos: Add Limiter
Some checks failed
DCO action / DCO (pull_request) Successful in 43s
Vulncheck / Vulncheck (pull_request) Successful in 56s
Build / Build Components (pull_request) Failing after 1m9s
Tests and linters / Tests (pull_request) Failing after 1m4s
Tests and linters / Staticcheck (pull_request) Failing after 1m8s
Tests and linters / Lint (pull_request) Failing after 1m16s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m33s
Tests and linters / Run gofumpt (pull_request) Successful in 2m29s
Tests and linters / gopls check (pull_request) Successful in 3m6s
Tests and linters / Tests with -race (pull_request) Failing after 3m15s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
Parent: 77f67f837d
Commit: 44218bf4f6
5 changed files with 154 additions and 4 deletions
@@ -135,6 +135,7 @@ type shardCfg struct {
     refillMetabase             bool
     refillMetabaseWorkersCount int
     mode                       shardmode.Mode
+    limiter                    qos.Limiter

     metaCfg struct {
         path string
@@ -277,7 +278,7 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig
     a.setMetabaseConfig(&newConfig, oldConfig)

     a.setGCConfig(&newConfig, oldConfig)
-    if err := a.setLimits(&newConfig, oldConfig); err != nil {
+    if err := a.setLimiter(&newConfig, oldConfig); err != nil {
         return err
     }

@@ -373,11 +374,16 @@ func (a *applicationConfiguration) setGCConfig(newConfig *shardCfg, oldConfig *s
     newConfig.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount()
 }

-func (a *applicationConfiguration) setLimits(newConfig *shardCfg, oldConfig *shardconfig.Config) error {
+func (a *applicationConfiguration) setLimiter(newConfig *shardCfg, oldConfig *shardconfig.Config) error {
     limitsConfig := oldConfig.Limits()
-    if err := qos.ValidateConfig(limitsConfig); err != nil {
+    limiter, err := qos.NewLimiter(limitsConfig)
+    if err != nil {
         return err
     }
+    if newConfig.limiter != nil {
+        newConfig.limiter.Close()
+    }
+    newConfig.limiter = limiter
     return nil
 }

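The reload path builds the new limiter first and only closes the one it replaces after construction succeeds, so a bad config update leaves the running shard untouched and a good one does not leak the old schedulers. A minimal sketch of that swap pattern, using a stub in place of the real qos.Limiter (the stubLimiter type and the main harness below are illustrative, not part of this PR):

package main

import "fmt"

// Limiter mirrors the interface shape introduced by this PR.
type Limiter interface {
	Close()
}

type stubLimiter struct{ name string }

func (s *stubLimiter) Close() { fmt.Println("closed", s.name) }

type shardCfg struct{ limiter Limiter }

// setLimiter follows the PR's reload pattern: close the previous
// limiter, if any, then install the freshly built one.
func (c *shardCfg) setLimiter(next Limiter) {
	if c.limiter != nil {
		c.limiter.Close()
	}
	c.limiter = next
}

func main() {
	cfg := &shardCfg{}
	cfg.setLimiter(&stubLimiter{name: "initial"})
	cfg.setLimiter(&stubLimiter{name: "reloaded"}) // prints: closed initial
}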
go.mod (1 change)
@@ -8,6 +8,7 @@ require (
     git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
     git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
     git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
+    git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe
     git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421
     git.frostfs.info/TrueCloudLab/hrw v1.2.1
     git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
go.sum (2 changes)
@@ -8,6 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
 git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88 h1:9bvBDLApbbO5sXBKdODpE9tzy3HV99nXxkDWNn22rdI=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
+git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe h1:81gDNdWNLP24oMQukRiCE9R1wGSh0l0dRq3F1W+Oesc=
+git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250128150313-cfbca7fa1dfe/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421 h1:pP19IawSdsLCKFv7HMNfWAeH6E3uSnntKZkwka+/2+4=
 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250202151421-8389887a3421/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
 git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
internal/qos/limiter.go (new file, 132 lines)
@@ -0,0 +1,132 @@
+package qos
+
+import (
+    "context"
+    "fmt"
+    "time"
+
+    "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
+    "git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
+    "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
+)
+
+const (
+    defaultIdleTimeout time.Duration = 0
+    defaultShare       float64       = 1.0
+)
+
+type ReleaseFunc scheduling.ReleaseFunc
+
+type Limiter interface {
+    PerformReadRequest(context.Context) (ReleaseFunc, error)
+    PerformWriteRequest(context.Context) (ReleaseFunc, error)
+    Close()
+}
+
+func NewLimiter(c *limits.Config) (Limiter, error) {
+    if err := validateConfig(c); err != nil {
+        return nil, err
+    }
+    if isNoop(c) {
+        return noopLimiterInstance, nil
+    }
+    readScheduler, err := scheduling.NewMClock(
+        uint64(c.MaxReadRunningOps()), uint64(c.MaxReadWaitingOps()),
+        convertToSchedulingTags(c.ReadTags()), defaultIdleTimeout)
+    if err != nil {
+        return nil, fmt.Errorf("failed to create read scheduler: %w", err)
+    }
+    writeScheduler, err := scheduling.NewMClock(
+        uint64(c.MaxWriteRunningOps()), uint64(c.MaxWriteWaitingOps()),
+        convertToSchedulingTags(c.WriteTags()), defaultIdleTimeout)
+    if err != nil {
+        return nil, fmt.Errorf("failed to create write scheduler: %w", err)
+    }
+    return &mClockLimiter{
+        readScheduler:  readScheduler,
+        writeScheduler: writeScheduler,
+    }, nil
+}
+
+func convertToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo {
+    result := make(map[string]scheduling.TagInfo)
+    for _, tag := range []IOTag{IOTagClient, IOTagBackground, IOTagInternal, IOTagPolicer, IOTagWritecache} {
+        result[tag.String()] = scheduling.TagInfo{
+            Share: defaultShare,
+        }
+    }
+    for _, l := range limits {
+        v := result[l.Tag]
+        if l.Weight != nil && *l.Weight != 0 {
+            v.Share = *l.Weight
+        }
+        if l.LimitOps != nil && *l.LimitOps != 0 {
+            v.LimitIOPS = l.LimitOps
+        }
+        if l.ReservedOps != nil && *l.ReservedOps != 0 {
+            v.ReservedIOPS = l.ReservedOps
+        }
+        result[l.Tag] = v
+    }
+    return result
+}
+
+var (
+    _           Limiter     = (*noopLimiter)(nil)
+    releaseStub ReleaseFunc = func() {}
+    noopLimiterInstance     = &noopLimiter{}
+)
+
+type noopLimiter struct{}
+
+func (n *noopLimiter) PerformReadRequest(context.Context) (ReleaseFunc, error) {
+    return releaseStub, nil
+}
+
+func (n *noopLimiter) PerformWriteRequest(context.Context) (ReleaseFunc, error) {
+    return releaseStub, nil
+}
+
+func (n *noopLimiter) Close() {}
+
+var _ Limiter = (*mClockLimiter)(nil)
+
+type mClockLimiter struct {
+    readScheduler  *scheduling.MClock
+    writeScheduler *scheduling.MClock
+}
+
+func (n *mClockLimiter) PerformReadRequest(ctx context.Context) (ReleaseFunc, error) {
+    tag, ok := tagging.IOTagFromContext(ctx)
+    if !ok {
+        tag = IOTagClient.String()
+    }
+    if tag == IOTagCritical.String() {
+        return releaseStub, nil
+    }
+    rel, err := n.readScheduler.RequestArrival(ctx, tag)
+    if err != nil {
+        return nil, err
+    }
+    return ReleaseFunc(rel), nil
+}
+
+func (n *mClockLimiter) PerformWriteRequest(ctx context.Context) (ReleaseFunc, error) {
+    tag, ok := tagging.IOTagFromContext(ctx)
+    if !ok {
+        tag = IOTagClient.String()
+    }
+    if tag == IOTagCritical.String() {
+        return releaseStub, nil
+    }
+    rel, err := n.writeScheduler.RequestArrival(ctx, tag)
+    if err != nil {
+        return nil, err
+    }
+    return ReleaseFunc(rel), nil
+}
+
+func (n *mClockLimiter) Close() {
+    n.readScheduler.Close()
+    n.writeScheduler.Close()
+}
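On the caller side the contract is: tag the request context, ask the limiter for an admission slot, and invoke the returned ReleaseFunc exactly once when the IO finishes. A hedged sketch of that flow, written as if it lived in the same qos package; it assumes the frostfs-qos tagging package exposes ContextWithIOTag as the counterpart of the IOTagFromContext call used above:

package qos

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
)

// readWithLimit is an illustrative caller, not part of this PR.
func readWithLimit(ctx context.Context, l Limiter, doRead func() error) error {
	// Tag the request; untagged contexts fall back to IOTagClient inside
	// the limiter, and IOTagCritical bypasses scheduling entirely.
	ctx = tagging.ContextWithIOTag(ctx, IOTagBackground.String()) // assumed API

	release, err := l.PerformReadRequest(ctx)
	if err != nil {
		return err // rejected by the scheduler or cancelled while queued
	}
	defer release() // return the slot so queued requests can proceed

	return doRead()
}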
@@ -14,7 +14,7 @@ type tagConfig struct {
     Shares, Limit, Reserved *float64
 }

-func ValidateConfig(c *limits.Config) error {
+func validateConfig(c *limits.Config) error {
     if c.MaxReadRunningOps() <= 0 {
         return fmt.Errorf("invalid 'max_read_running_ops = %d': must be greater than zero", c.MaxReadRunningOps())
     }
@@ -86,3 +86,12 @@ func float64Value(f *float64) float64 {
     }
     return *f
 }
+
+func isNoop(c *limits.Config) bool {
+    return c.MaxReadRunningOps() == limits.NoLimit &&
+        c.MaxReadWaitingOps() == limits.NoLimit &&
+        c.MaxWriteRunningOps() == limits.NoLimit &&
+        c.MaxWriteWaitingOps() == limits.NoLimit &&
+        len(c.ReadTags()) == 0 &&
+        len(c.WriteTags()) == 0
+}
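isNoop gives fully unconfigured shards a zero-cost path: NewLimiter returns the shared noopLimiter and no MClock scheduler is created at all. A toy illustration of the same check, with a stand-in config type since limits.Config construction is not shown in this diff (the noLimit value here is arbitrary, not the real limits.NoLimit sentinel):

package main

import "fmt"

// Stand-in for limits.Config; field names and the noLimit sentinel are
// illustrative only.
const noLimit int64 = -1

type cfg struct {
	maxReadRunning, maxReadWaiting, maxWriteRunning, maxWriteWaiting int64
	readTags, writeTags                                              []string
}

// isNoop mirrors the PR's rule: scheduling is skipped only when no knob
// in the limits section is set at all.
func isNoop(c cfg) bool {
	return c.maxReadRunning == noLimit &&
		c.maxReadWaiting == noLimit &&
		c.maxWriteRunning == noLimit &&
		c.maxWriteWaiting == noLimit &&
		len(c.readTags) == 0 &&
		len(c.writeTags) == 0
}

func main() {
	unset := cfg{maxReadRunning: noLimit, maxReadWaiting: noLimit, maxWriteRunning: noLimit, maxWriteWaiting: noLimit}
	tuned := unset
	tuned.maxReadRunning = 128
	fmt.Println(isNoop(unset), isNoop(tuned)) // true false
}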