[#9999] qos: Add semaphore limiter
All checks were successful
DCO action / DCO (pull_request) Successful in 42s
Tests and linters / Run gofumpt (pull_request) Successful in 37s
Vulncheck / Vulncheck (pull_request) Successful in 52s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m25s
Build / Build Components (pull_request) Successful in 1m45s
Tests and linters / Staticcheck (pull_request) Successful in 2m13s
Tests and linters / Lint (pull_request) Successful in 2m55s
Tests and linters / Tests (pull_request) Successful in 2m59s
Tests and linters / Tests with -race (pull_request) Successful in 3m15s
Tests and linters / gopls check (pull_request) Successful in 3m18s
If no tags are specified, the limiter can be optimized to use an atomic semaphore.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
parent c922e68f1b
commit 49c384ef9d
2 changed files with 65 additions and 24 deletions
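The idea behind the change, as a minimal standalone sketch: when no IO tags are configured there is nothing to schedule fairly between, so a single atomic counter is enough to cap the number of concurrently running operations. All names below (atomicSemaphore, tryAcquire, errLimitExceeded) are hypothetical and not part of this commit.

// Minimal sketch of an atomic counting semaphore; hypothetical names,
// not code from this commit.
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errLimitExceeded = errors.New("limit exceeded")

type atomicSemaphore struct {
	count atomic.Int64
	limit int64
}

// tryAcquire optimistically increments the counter and rolls back if the
// limit would be exceeded; on success the returned func releases the slot.
func (s *atomicSemaphore) tryAcquire() (func(), error) {
	if s.count.Add(1) > s.limit {
		s.count.Add(-1)
		return nil, errLimitExceeded
	}
	return func() { s.count.Add(-1) }, nil
}

func main() {
	s := &atomicSemaphore{limit: 2}
	rel1, _ := s.tryAcquire() // slot 1 of 2
	rel2, _ := s.tryAcquire() // slot 2 of 2
	if _, err := s.tryAcquire(); err != nil {
		fmt.Println("third request rejected:", err) // over the limit, rejected without queueing
	}
	rel1()
	rel2()
}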
@@ -7,7 +7,6 @@ import (
 	"time"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
 	"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
 	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@@ -26,6 +25,11 @@ type Limiter interface {
 	Close()
 }
 
+type scheduler interface {
+	RequestArrival(ctx context.Context, tag string) (scheduling.ReleaseFunc, error)
+	Close()
+}
+
 func NewLimiter(c *limits.Config) (Limiter, error) {
 	if err := validateConfig(c); err != nil {
 		return nil, err
@@ -34,15 +38,11 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
 	if isNoop(read, write) {
 		return noopLimiterInstance, nil
 	}
-	readScheduler, err := scheduling.NewMClock(
-		uint64(read.MaxRunningOps), uint64(read.MaxWaitingOps),
-		converToSchedulingTags(read.Tags), read.IdleTimeout)
+	readScheduler, err := createScheduler(c.Read())
 	if err != nil {
 		return nil, fmt.Errorf("failed to create read scheduler: %w", err)
 	}
-	writeScheduler, err := scheduling.NewMClock(
-		uint64(write.MaxRunningOps), uint64(write.MaxWaitingOps),
-		converToSchedulingTags(write.Tags), write.IdleTimeout)
+	writeScheduler, err := createScheduler(c.Write())
 	if err != nil {
 		return nil, fmt.Errorf("failed to create write scheduler: %w", err)
 	}
@@ -52,6 +52,15 @@ func NewLimiter(c *limits.Config) (Limiter, error) {
 	}, nil
 }
 
+func createScheduler(config limits.OpConfig) (scheduler, error) {
+	if len(config.Tags) == 0 && config.MaxWaitingOps == limits.NoLimit {
+		return &semaphore{limit: int64(config.MaxRunningOps)}, nil
+	}
+	return scheduling.NewMClock(
+		uint64(config.MaxRunningOps), uint64(config.MaxWaitingOps),
+		converToSchedulingTags(config.Tags), config.IdleTimeout)
+}
+
 func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo {
 	result := make(map[string]scheduling.TagInfo)
 	for _, tag := range []IOTag{IOTagClient, IOTagBackground, IOTagInternal, IOTagPolicer, IOTagWritecache} {
@@ -100,27 +109,19 @@ func (n *noopLimiter) Close() {}
 var _ Limiter = (*mClockLimiter)(nil)
 
 type mClockLimiter struct {
-	readScheduler  *scheduling.MClock
-	writeScheduler *scheduling.MClock
+	readScheduler  scheduler
+	writeScheduler scheduler
 }
 
 func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
-	tag, ok := tagging.IOTagFromContext(ctx)
-	assert.Cond(ok, "request has no tag")
-	if tag == IOTagCritical.String() {
-		return releaseStub, nil
-	}
-	rel, err := n.readScheduler.RequestArrival(ctx, tag)
-	if err != nil {
-		if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) {
-			return nil, &apistatus.ResourceExhausted{}
-		}
-		return nil, err
-	}
-	return ReleaseFunc(rel), nil
+	return requestArrival(ctx, n.readScheduler)
 }
 
 func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
+	return requestArrival(ctx, n.writeScheduler)
+}
+
+func requestArrival(ctx context.Context, s scheduler) (ReleaseFunc, error) {
 	tag, ok := tagging.IOTagFromContext(ctx)
 	if !ok {
 		tag = IOTagClient.String()
@@ -128,9 +129,10 @@ func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
 	if tag == IOTagCritical.String() {
 		return releaseStub, nil
 	}
-	rel, err := n.writeScheduler.RequestArrival(ctx, tag)
+	rel, err := s.RequestArrival(ctx, tag)
 	if err != nil {
-		if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) {
+		if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) ||
+			errors.Is(err, errSemaphoreLimitExceeded) {
 			return nil, &apistatus.ResourceExhausted{}
 		}
 		return nil, err
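For callers nothing changes: a rejected request still surfaces as apistatus.ResourceExhausted, whether it comes from the mClock queue limit or from the new semaphore. Below is a hedged caller-side sketch, assuming it lives inside frostfs-node where the internal qos package is importable; doRead and the qosusage package are hypothetical and not part of this commit.

// Hypothetical caller-side sketch, not code from this commit.
package qosusage

import (
	"context"
	"errors"
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
)

// doRead acquires a read slot from the limiter, treats ResourceExhausted as
// a retryable overload signal, and releases the slot when the work is done.
func doRead(ctx context.Context, l qos.Limiter) error {
	release, err := l.ReadRequest(ctx)
	if err != nil {
		var exhausted *apistatus.ResourceExhausted
		if errors.As(err, &exhausted) {
			return fmt.Errorf("shard is overloaded, retry later: %w", err)
		}
		return err
	}
	defer release()
	// ... perform the read while holding the slot ...
	return nil
}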
internal/qos/semaphore.go (new file, +39)
@@ -0,0 +1,39 @@
+package qos
+
+import (
+	"context"
+	"errors"
+	"sync/atomic"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
+)
+
+var (
+	_ scheduler = (*semaphore)(nil)
+	errSemaphoreLimitExceeded = errors.New("semaphore limit exceeded")
+)
+
+type semaphore struct {
+	count atomic.Int64
+	limit int64
+}
+
+func (s *semaphore) Close() {}
+
+func (s *semaphore) RequestArrival(ctx context.Context, _ string) (scheduling.ReleaseFunc, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
+	}
+
+	v := s.count.Add(1)
+	if v > s.limit {
+		s.count.Add(-1)
+		return nil, errSemaphoreLimitExceeded
+	}
+
+	return func() {
+		s.count.Add(-1)
+	}, nil
+}
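A hypothetical test, not part of this commit, sketching the expected semaphore behaviour: arrivals beyond the limit are rejected immediately with errSemaphoreLimitExceeded (the semaphore never queues), releasing a slot admits the next arrival, and a cancelled context is rejected before the counter is touched.

// Hypothetical test, not included in this commit.
package qos

import (
	"context"
	"errors"
	"testing"
)

func TestSemaphoreLimit(t *testing.T) {
	s := &semaphore{limit: 2}
	ctx := context.Background()

	rel1, err := s.RequestArrival(ctx, "")
	if err != nil {
		t.Fatalf("first arrival: %v", err)
	}
	rel2, err := s.RequestArrival(ctx, "")
	if err != nil {
		t.Fatalf("second arrival: %v", err)
	}

	// Third arrival exceeds limit=2 and is rejected without waiting.
	if _, err := s.RequestArrival(ctx, ""); !errors.Is(err, errSemaphoreLimitExceeded) {
		t.Fatalf("expected errSemaphoreLimitExceeded, got %v", err)
	}

	// Releasing one slot makes room for the next arrival.
	rel1()
	rel3, err := s.RequestArrival(ctx, "")
	if err != nil {
		t.Fatalf("arrival after release: %v", err)
	}
	rel2()
	rel3()

	// A cancelled context is rejected up front.
	cancelled, cancel := context.WithCancel(context.Background())
	cancel()
	if _, err := s.RequestArrival(cancelled, ""); !errors.Is(err, context.Canceled) {
		t.Fatalf("expected context.Canceled, got %v", err)
	}
}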