[#1636] qos: Add semaphore limiter
If no tags are specified, the limiter can be optimized to use an atomic semaphore.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
commit 8ed71a969e (parent c2d855aedd)
4 changed files with 68 additions and 27 deletions
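The idea behind the change: when a shard's read or write limits define no IO tags and put no cap on waiting operations, per-tag mClock fair scheduling adds no value, and a plain counting semaphore is enough to bound concurrency. The real semaphore comes from the updated frostfs-qos dependency (limiting/semaphore, see the go.mod/go.sum bump below); the snippet here is only a rough illustration of the idea with invented names (atomicSemaphore, limit, count), not the library code.

package main

import (
	"fmt"
	"sync/atomic"
)

// atomicSemaphore caps the number of concurrent operations with a single
// atomic counter; Acquire never blocks, it either takes a slot or reports failure.
type atomicSemaphore struct {
	limit int64
	count atomic.Int64
}

func (s *atomicSemaphore) Acquire() bool {
	if s.count.Add(1) > s.limit {
		s.count.Add(-1) // roll back: the limit is already reached
		return false
	}
	return true
}

func (s *atomicSemaphore) Release() { s.count.Add(-1) }

func main() {
	s := &atomicSemaphore{limit: 2}
	fmt.Println(s.Acquire(), s.Acquire(), s.Acquire()) // true true false
	s.Release()                                        // a slot is free again
	fmt.Println(s.Acquire())                           // true
}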
go.mod (2 changed lines)
@@ -8,7 +8,7 @@ require (
 	git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0
 	git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d
 	git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824
-	git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf
+	git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3
 	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4
 	git.frostfs.info/TrueCloudLab/hrw v1.2.1
 	git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
go.sum (4 changed lines)
@@ -8,8 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb
 git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824 h1:Mxw1c/8t96vFIUOffl28lFaHKi413oCBfLMGJmF9cFA=
 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g=
-git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf h1:ik2aMBpTJJpoZe2ffcGShXRkrvny65NEPLVt67KmH/A=
-git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
+git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3 h1:QnAt5b2R6+hQthMOIn5ECfLAlVD8IAE5JRm1NCCOmuE=
+git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U=
 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4 h1:dOZHuOywvH1ms8U38lDCWpysgkCCeJ02RLI7zDhPcyw=
 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8=
 git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
@@ -7,7 +7,6 @@ import (
 	"time"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
 	"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
 	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@@ -26,6 +25,11 @@ type Limiter interface {
 	Close()
 }

+type scheduler interface {
+	RequestArrival(ctx context.Context, tag string) (scheduling.ReleaseFunc, error)
+	Close()
+}
+
 func NewLimiter(c *limits.Config) (Limiter, error) {
 	if err := validateConfig(c); err != nil {
 		return nil, err
@@ -34,15 +38,11 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
 	if isNoop(read, write) {
 		return noopLimiterInstance, nil
 	}
-	readScheduler, err := scheduling.NewMClock(
-		uint64(read.MaxRunningOps), uint64(read.MaxWaitingOps),
-		converToSchedulingTags(read.Tags), read.IdleTimeout)
+	readScheduler, err := createScheduler(c.Read())
 	if err != nil {
 		return nil, fmt.Errorf("create read scheduler: %w", err)
 	}
-	writeScheduler, err := scheduling.NewMClock(
-		uint64(write.MaxRunningOps), uint64(write.MaxWaitingOps),
-		converToSchedulingTags(write.Tags), write.IdleTimeout)
+	writeScheduler, err := createScheduler(c.Write())
 	if err != nil {
 		return nil, fmt.Errorf("create write scheduler: %w", err)
 	}
@@ -52,6 +52,15 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
 	}, nil
 }

+func createScheduler(config limits.OpConfig) (scheduler, error) {
+	if len(config.Tags) == 0 && config.MaxWaitingOps == limits.NoLimit {
+		return newSemaphoreScheduler(config.MaxRunningOps), nil
+	}
+	return scheduling.NewMClock(
+		uint64(config.MaxRunningOps), uint64(config.MaxWaitingOps),
+		converToSchedulingTags(config.Tags), config.IdleTimeout)
+}
+
 func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo {
 	result := make(map[string]scheduling.TagInfo)
 	for _, tag := range []IOTag{IOTagClient, IOTagBackground, IOTagInternal, IOTagPolicer, IOTagWritecache} {
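For illustration only (a sketch, not part of the commit): the field names on limits.OpConfig are taken from the hunk above, the values are invented, and tagConfigs is a hypothetical non-empty tag list. The semaphore path is taken only when both conditions hold; otherwise an mClock scheduler is built exactly as before.

// Assumes it sits in the same qos package as the code above and that "time" is imported.
func exampleCreateScheduler(tagConfigs []limits.IOTagConfig) {
	noTags, _ := createScheduler(limits.OpConfig{
		MaxRunningOps: 128,
		MaxWaitingOps: limits.NoLimit, // no tags and no waiting cap -> semaphore scheduler
	})

	tagged, _ := createScheduler(limits.OpConfig{
		MaxRunningOps: 128,
		MaxWaitingOps: 1024,
		IdleTimeout:   5 * time.Minute,
		Tags:          tagConfigs, // non-empty tag list -> mClock scheduler, as before
	})

	_, _ = noTags, tagged
}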
@@ -100,27 +109,19 @@ func (n *noopLimiter) Close() {}
 var _ Limiter = (*mClockLimiter)(nil)

 type mClockLimiter struct {
-	readScheduler  *scheduling.MClock
-	writeScheduler *scheduling.MClock
+	readScheduler  scheduler
+	writeScheduler scheduler
 }

 func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
-	tag, ok := tagging.IOTagFromContext(ctx)
-	assert.True(ok, "request has no tag")
-	if tag == IOTagCritical.String() {
-		return releaseStub, nil
-	}
-	rel, err := n.readScheduler.RequestArrival(ctx, tag)
-	if err != nil {
-		if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) {
-			return nil, &apistatus.ResourceExhausted{}
-		}
-		return nil, err
-	}
-	return ReleaseFunc(rel), nil
+	return requestArrival(ctx, n.readScheduler)
 }

 func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
+	return requestArrival(ctx, n.writeScheduler)
+}
+
+func requestArrival(ctx context.Context, s scheduler) (ReleaseFunc, error) {
+	tag, ok := tagging.IOTagFromContext(ctx)
+	if !ok {
+		tag = IOTagClient.String()
@@ -128,9 +129,10 @@ func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
 	if tag == IOTagCritical.String() {
 		return releaseStub, nil
 	}
-	rel, err := n.writeScheduler.RequestArrival(ctx, tag)
+	rel, err := s.RequestArrival(ctx, tag)
 	if err != nil {
-		if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) {
+		if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
+			errors.Is(err, errSemaphoreLimitExceeded) {
 			return nil, &apistatus.ResourceExhausted{}
 		}
 		return nil, err
internal/qos/semaphore.go (new file, 39 lines)
@@ -0,0 +1,39 @@
+package qos
+
+import (
+	"context"
+	"errors"
+
+	qosSemaphore "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore"
+	"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
+)
+
+var (
+	_                         scheduler = (*semaphore)(nil)
+	errSemaphoreLimitExceeded           = errors.New("semaphore limit exceeded")
+)
+
+type semaphore struct {
+	s *qosSemaphore.Semaphore
+}
+
+func newSemaphoreScheduler(size int64) *semaphore {
+	return &semaphore{
+		s: qosSemaphore.NewSemaphore(size),
+	}
+}
+
+func (s *semaphore) Close() {}
+
+func (s *semaphore) RequestArrival(ctx context.Context, _ string) (scheduling.ReleaseFunc, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
+	}
+
+	if s.s.Acquire() {
+		return s.s.Release, nil
+	}
+	return nil, errSemaphoreLimitExceeded
+}
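Usage sketch (not part of the commit), assuming the same qos package as the new file above: the semaphore scheduler ignores the tag argument and either grants a slot immediately or fails with errSemaphoreLimitExceeded, which requestArrival translates into apistatus.ResourceExhausted for the client. The function name below is invented for the example.

func exampleSemaphoreScheduler(ctx context.Context) {
	s := newSemaphoreScheduler(1) // a single concurrent operation allowed

	release, err := s.RequestArrival(ctx, IOTagClient.String())
	if err != nil {
		return // errSemaphoreLimitExceeded under load, or ctx.Err() if ctx is done
	}
	defer release()

	_, err = s.RequestArrival(ctx, IOTagBackground.String())
	// err == errSemaphoreLimitExceeded here: the only slot is still held, and
	// the semaphore rejects immediately instead of queueing the request.
	_ = err
}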