diff --git a/go.mod b/go.mod index 2bfc3abfe7..bbb8179577 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824 - git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf + git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4 git.frostfs.info/TrueCloudLab/hrw v1.2.1 git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972 diff --git a/go.sum b/go.sum index 4a7dfd4dc4..0c66f45553 100644 --- a/go.sum +++ b/go.sum @@ -8,8 +8,8 @@ git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb git.frostfs.info/TrueCloudLab/frostfs-locode-db v0.4.1-0.20240710074952-65761deb5c0d/go.mod h1:7ZZq8iguY7qFsXajdHGmZd2AW4QbucyrJwhbsRfOfek= git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824 h1:Mxw1c/8t96vFIUOffl28lFaHKi413oCBfLMGJmF9cFA= git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20250212111929-d34e1329c824/go.mod h1:kbwB4v2o6RyOfCo9kEFeUDZIX3LKhmS0yXPrtvzkQ1g= -git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf h1:ik2aMBpTJJpoZe2ffcGShXRkrvny65NEPLVt67KmH/A= -git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250213125059-356851eed3bf/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U= +git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3 h1:QnAt5b2R6+hQthMOIn5ECfLAlVD8IAE5JRm1NCCOmuE= +git.frostfs.info/TrueCloudLab/frostfs-qos v0.0.0-20250227072915-25102d1e1aa3/go.mod h1:PCijYq4oa8vKtIEcUX6jRiszI6XAW+nBwU+T1kB4d1U= git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4 h1:dOZHuOywvH1ms8U38lDCWpysgkCCeJ02RLI7zDhPcyw= git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20250217152255-c3f7378887a4/go.mod h1:aQpPWfG8oyfJ2X+FenPTJpSRWZjwcP5/RAtkW+/VEX8= git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc= diff --git a/internal/qos/limiter.go b/internal/qos/limiter.go index 3b6c6547c9..b1406a7f37 100644 --- a/internal/qos/limiter.go +++ b/internal/qos/limiter.go @@ -7,7 +7,6 @@ import ( "time" "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits" - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert" "git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling" "git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" @@ -26,6 +25,11 @@ type Limiter interface { Close() } +type scheduler interface { + RequestArrival(ctx context.Context, tag string) (scheduling.ReleaseFunc, error) + Close() +} + func NewLimiter(c *limits.Config) (Limiter, error) { if err := validateConfig(c); err != nil { return nil, err @@ -34,15 +38,11 @@ func NewLimiter(c *limits.Config) (Limiter, error) { if isNoop(read, write) { return noopLimiterInstance, nil } - readScheduler, err := scheduling.NewMClock( - uint64(read.MaxRunningOps), uint64(read.MaxWaitingOps), - converToSchedulingTags(read.Tags), read.IdleTimeout) + readScheduler, err := createScheduler(c.Read()) if err != nil { return nil, fmt.Errorf("create read scheduler: %w", err) } - writeScheduler, err := scheduling.NewMClock( - uint64(write.MaxRunningOps), uint64(write.MaxWaitingOps), - converToSchedulingTags(write.Tags), write.IdleTimeout) + writeScheduler, err := createScheduler(c.Write()) if err != nil { return nil, fmt.Errorf("create write scheduler: %w", err) } @@ -52,6 +52,15 @@ func NewLimiter(c *limits.Config) (Limiter, error) { }, nil } +func createScheduler(config limits.OpConfig) (scheduler, error) { + if len(config.Tags) == 0 && config.MaxWaitingOps == limits.NoLimit { + return newSemaphoreScheduler(config.MaxRunningOps), nil + } + return scheduling.NewMClock( + uint64(config.MaxRunningOps), uint64(config.MaxWaitingOps), + converToSchedulingTags(config.Tags), config.IdleTimeout) +} + func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo { result := make(map[string]scheduling.TagInfo) for _, tag := range []IOTag{IOTagClient, IOTagBackground, IOTagInternal, IOTagPolicer, IOTagWritecache} { @@ -100,27 +109,19 @@ func (n *noopLimiter) Close() {} var _ Limiter = (*mClockLimiter)(nil) type mClockLimiter struct { - readScheduler *scheduling.MClock - writeScheduler *scheduling.MClock + readScheduler scheduler + writeScheduler scheduler } func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) { - tag, ok := tagging.IOTagFromContext(ctx) - assert.True(ok, "request has no tag") - if tag == IOTagCritical.String() { - return releaseStub, nil - } - rel, err := n.readScheduler.RequestArrival(ctx, tag) - if err != nil { - if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) { - return nil, &apistatus.ResourceExhausted{} - } - return nil, err - } - return ReleaseFunc(rel), nil + return requestArrival(ctx, n.readScheduler) } func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) { + return requestArrival(ctx, n.writeScheduler) +} + +func requestArrival(ctx context.Context, s scheduler) (ReleaseFunc, error) { tag, ok := tagging.IOTagFromContext(ctx) if !ok { tag = IOTagClient.String() @@ -128,9 +129,10 @@ func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) { if tag == IOTagCritical.String() { return releaseStub, nil } - rel, err := n.writeScheduler.RequestArrival(ctx, tag) + rel, err := s.RequestArrival(ctx, tag) if err != nil { - if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) { + if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) || + errors.Is(err, errSemaphoreLimitExceeded) { return nil, &apistatus.ResourceExhausted{} } return nil, err diff --git a/internal/qos/semaphore.go b/internal/qos/semaphore.go new file mode 100644 index 0000000000..74e6928f32 --- /dev/null +++ b/internal/qos/semaphore.go @@ -0,0 +1,39 @@ +package qos + +import ( + "context" + "errors" + + qosSemaphore "git.frostfs.info/TrueCloudLab/frostfs-qos/limiting/semaphore" + "git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling" +) + +var ( + _ scheduler = (*semaphore)(nil) + errSemaphoreLimitExceeded = errors.New("semaphore limit exceeded") +) + +type semaphore struct { + s *qosSemaphore.Semaphore +} + +func newSemaphoreScheduler(size int64) *semaphore { + return &semaphore{ + s: qosSemaphore.NewSemaphore(size), + } +} + +func (s *semaphore) Close() {} + +func (s *semaphore) RequestArrival(ctx context.Context, _ string) (scheduling.ReleaseFunc, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + if s.s.Acquire() { + return s.s.Release, nil + } + return nil, errSemaphoreLimitExceeded +}