frostfs-node/internal/qos/limiter.go

147 lines
3.7 KiB
Go
Raw Normal View History

package qos
import (
"context"
"errors"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
)
const (
	// defaultIdleTimeout is the zero duration. It is not referenced in the
	// visible part of this file; presumably it is the fallback idle timeout
	// used by configuration handling elsewhere in the package.
	defaultIdleTimeout = time.Duration(0)

	// defaultShare is the share assigned to every built-in IO tag before
	// per-tag configuration overrides are applied.
	defaultShare = float64(1.0)
)
// ReleaseFunc is the callback returned by Limiter.ReadRequest and
// Limiter.WriteRequest. It has the same underlying type as
// scheduling.ReleaseFunc.
// NOTE(review): presumably it must be called once the admitted operation has
// finished so that the scheduler slot is freed — confirm against frostfs-qos.
type ReleaseFunc scheduling.ReleaseFunc
// Limiter admits read and write requests and hands back a ReleaseFunc for
// every admitted request.
type Limiter interface {
	// ReadRequest asks for admission of a read operation. On success it
	// returns the ReleaseFunc associated with the admitted request.
	ReadRequest(ctx context.Context) (ReleaseFunc, error)

	// WriteRequest asks for admission of a write operation. On success it
	// returns the ReleaseFunc associated with the admitted request.
	WriteRequest(ctx context.Context) (ReleaseFunc, error)

	// Close shuts the limiter down.
	Close()
}
// scheduler is the per-operation-kind backend used by mClockLimiter. It is
// satisfied by whatever createScheduler returns: either the value produced by
// newSemaphoreScheduler or the one produced by scheduling.NewMClock.
type scheduler interface {
	// Close shuts the scheduler down.
	Close()

	// RequestArrival registers a request carrying the given IO tag and, on
	// success, returns the release callback for it.
	RequestArrival(ctx context.Context, tag string) (scheduling.ReleaseFunc, error)
}
// NewLimiter builds a Limiter from the shard limits configuration.
//
// The configuration is validated first; a validation error is returned as is.
// If isNoop reports true for the read and write settings, the shared no-op
// limiter instance is returned. Otherwise one scheduler is created for reads
// and one for writes, and both are wrapped in an mClockLimiter.
//
// If the write scheduler cannot be created, the read scheduler that was
// already created is closed before the error is returned, so it is not leaked.
func NewLimiter(c *limits.Config) (Limiter, error) {
	if err := validateConfig(c); err != nil {
		return nil, err
	}

	// Fetch each section once and reuse it for both the no-op check and the
	// scheduler construction.
	read, write := c.Read(), c.Write()
	if isNoop(read, write) {
		return noopLimiterInstance, nil
	}

	readScheduler, err := createScheduler(read)
	if err != nil {
		return nil, fmt.Errorf("create read scheduler: %w", err)
	}

	writeScheduler, err := createScheduler(write)
	if err != nil {
		// The read scheduler is already live at this point; release it
		// because no Limiter will be returned to own it.
		readScheduler.Close()
		return nil, fmt.Errorf("create write scheduler: %w", err)
	}

	return &mClockLimiter{
		readScheduler:  readScheduler,
		writeScheduler: writeScheduler,
	}, nil
}
// createScheduler picks the scheduler implementation for one operation kind.
//
// When the configuration lists at least one tag, or sets a waiting-operations
// limit other than limits.NoLimit, an mClock scheduler is built via
// scheduling.NewMClock. Otherwise the scheduler returned by
// newSemaphoreScheduler, bounded by MaxRunningOps, is used.
func createScheduler(config limits.OpConfig) (scheduler, error) {
	needsMClock := len(config.Tags) > 0 || config.MaxWaitingOps != limits.NoLimit
	if !needsMClock {
		return newSemaphoreScheduler(config.MaxRunningOps), nil
	}

	maxRunning := uint64(config.MaxRunningOps)
	maxWaiting := uint64(config.MaxWaitingOps)
	tags := converToSchedulingTags(config.Tags)

	return scheduling.NewMClock(maxRunning, maxWaiting, tags, config.IdleTimeout)
}
// converToSchedulingTags turns the per-tag configuration into the tag map
// passed to scheduling.NewMClock.
//
// The result is seeded with an entry for each of the client, background,
// internal, policer and writecache tags, all with Share set to defaultShare.
// Every configured entry is then applied on top, keyed by its Tag: a non-nil,
// non-zero Weight replaces Share, and non-nil, non-zero LimitOps and
// ReservedOps are stored as LimitIOPS and ReservedIOPS. Nil or zero settings
// leave the existing value untouched.
func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo {
	builtin := [...]IOTag{IOTagClient, IOTagBackground, IOTagInternal, IOTagPolicer, IOTagWritecache}

	tagInfos := make(map[string]scheduling.TagInfo)
	for i := range builtin {
		tagInfos[builtin[i].String()] = scheduling.TagInfo{Share: defaultShare}
	}

	for i := range limits {
		cfg := &limits[i]
		// A tag missing from the seeded set starts from the zero TagInfo.
		info := tagInfos[cfg.Tag]

		if w := cfg.Weight; w != nil && *w != 0 {
			info.Share = *w
		}
		if lim := cfg.LimitOps; lim != nil && *lim != 0 {
			info.LimitIOPS = lim
		}
		if res := cfg.ReservedOps; res != nil && *res != 0 {
			info.ReservedIOPS = res
		}

		tagInfos[cfg.Tag] = info
	}

	return tagInfos
}
// Compile-time check that *noopLimiter implements Limiter.
var _ Limiter = (*noopLimiter)(nil)

// releaseStub is a ReleaseFunc that does nothing. It is returned whenever a
// request is admitted without going through a scheduler.
var releaseStub ReleaseFunc = func() {}

// noopLimiterInstance is the shared no-op limiter that NewLimiter returns
// when isNoop reports true for the configuration.
var noopLimiterInstance = new(noopLimiter)
// NewNoopLimiter returns a Limiter whose request methods always succeed with
// a do-nothing release callback.
func NewNoopLimiter() Limiter {
	return new(noopLimiter)
}
// noopLimiter is a Limiter that never rejects a request. It carries no state.
type noopLimiter struct{}

// ReadRequest always succeeds and returns the do-nothing release callback.
func (*noopLimiter) ReadRequest(context.Context) (ReleaseFunc, error) {
	return releaseStub, nil
}

// WriteRequest always succeeds and returns the do-nothing release callback.
func (*noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
	return releaseStub, nil
}

// Close does nothing.
func (*noopLimiter) Close() {}
// Compile-time check that *mClockLimiter implements Limiter.
var _ Limiter = (*mClockLimiter)(nil)

// mClockLimiter is a Limiter backed by two independent schedulers: one that
// serves read requests and one that serves write requests.
type mClockLimiter struct {
	readScheduler, writeScheduler scheduler
}

// ReadRequest submits the request to the read scheduler.
func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
	sched := n.readScheduler
	return requestArrival(ctx, sched)
}

// WriteRequest submits the request to the write scheduler.
func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
	sched := n.writeScheduler
	return requestArrival(ctx, sched)
}
// requestArrival submits a request to s under the IO tag carried by ctx.
//
// When ctx carries no IO tag, the client tag is used. Requests tagged as
// critical bypass the scheduler entirely and get the do-nothing release
// callback. If the scheduler reports that its request limit is exceeded
// (either the mClock or the semaphore variant of that error), the error is
// replaced with *apistatus.ResourceExhausted; any other scheduler error is
// returned unchanged.
func requestArrival(ctx context.Context, s scheduler) (ReleaseFunc, error) {
	ioTag, found := tagging.IOTagFromContext(ctx)
	if !found {
		ioTag = IOTagClient.String()
	}
	if ioTag == IOTagCritical.String() {
		return releaseStub, nil
	}

	release, err := s.RequestArrival(ctx, ioTag)
	switch {
	case err == nil:
		return ReleaseFunc(release), nil
	case errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded),
		errors.Is(err, errSemaphoreLimitExceeded):
		return nil, &apistatus.ResourceExhausted{}
	default:
		return nil, err
	}
}
// Close closes the read scheduler and then the write scheduler.
func (n *mClockLimiter) Close() {
	for _, s := range [...]scheduler{n.readScheduler, n.writeScheduler} {
		s.Close()
	}
}