package qos

import (
	"context"
	"errors"
	"fmt"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
	"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
)

const (
	defaultIdleTimeout time.Duration = 0
	defaultShare       float64       = 1.0
)

// ReleaseFunc must be called exactly once when the limited operation
// completes to release the slot acquired from the scheduler.
type ReleaseFunc scheduling.ReleaseFunc

// Limiter throttles read and write requests according to the shard limits
// configuration. Both request methods block until the operation is allowed
// to run and return a ReleaseFunc the caller must invoke when it is done.
type Limiter interface {
	ReadRequest(context.Context) (ReleaseFunc, error)
	WriteRequest(context.Context) (ReleaseFunc, error)
	Close()
}

// NewLimiter creates a Limiter from the shard limits configuration.
// If neither read nor write limits are configured, a no-op limiter is returned.
func NewLimiter(c *limits.Config) (Limiter, error) {
	if err := validateConfig(c); err != nil {
		return nil, err
	}
	read, write := c.Read(), c.Write()
	if isNoop(read, write) {
		return noopLimiterInstance, nil
	}
	readScheduler, err := scheduling.NewMClock(
		uint64(read.MaxRunningOps), uint64(read.MaxWaitingOps),
		convertToSchedulingTags(read.Tags), read.IdleTimeout)
	if err != nil {
		return nil, fmt.Errorf("create read scheduler: %w", err)
	}
	writeScheduler, err := scheduling.NewMClock(
		uint64(write.MaxRunningOps), uint64(write.MaxWaitingOps),
		convertToSchedulingTags(write.Tags), write.IdleTimeout)
	if err != nil {
		return nil, fmt.Errorf("create write scheduler: %w", err)
	}
	return &mClockLimiter{
		readScheduler:  readScheduler,
		writeScheduler: writeScheduler,
	}, nil
}

// convertToSchedulingTags builds the scheduler tag table: every known IO tag
// starts with the default share, and the configured weights, limits and
// reservations override these defaults per tag.
func convertToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo {
	result := make(map[string]scheduling.TagInfo)
	for _, tag := range []IOTag{IOTagClient, IOTagBackground, IOTagInternal, IOTagPolicer, IOTagWritecache} {
		result[tag.String()] = scheduling.TagInfo{
			Share: defaultShare,
		}
	}
	for _, l := range limits {
		v := result[l.Tag]
		if l.Weight != nil && *l.Weight != 0 {
			v.Share = *l.Weight
		}
		if l.LimitOps != nil && *l.LimitOps != 0 {
			v.LimitIOPS = l.LimitOps
		}
		if l.ReservedOps != nil && *l.ReservedOps != 0 {
			v.ReservedIOPS = l.ReservedOps
		}
		result[l.Tag] = v
	}
	return result
}

var (
	_                   Limiter     = (*noopLimiter)(nil)
	releaseStub         ReleaseFunc = func() {}
	noopLimiterInstance             = &noopLimiter{}
)

// NewNoopLimiter returns a Limiter that never throttles requests.
func NewNoopLimiter() Limiter {
	return &noopLimiter{}
}

type noopLimiter struct{}

func (n *noopLimiter) ReadRequest(context.Context) (ReleaseFunc, error) {
	return releaseStub, nil
}

func (n *noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
	return releaseStub, nil
}

func (n *noopLimiter) Close() {}

var _ Limiter = (*mClockLimiter)(nil)

type mClockLimiter struct {
	readScheduler  *scheduling.MClock
	writeScheduler *scheduling.MClock
}

func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
	tag, ok := tagging.IOTagFromContext(ctx)
	assert.True(ok, "request has no tag")
	// Critical requests bypass the scheduler entirely.
	if tag == IOTagCritical.String() {
		return releaseStub, nil
	}
	rel, err := n.readScheduler.RequestArrival(ctx, tag)
	if err != nil {
		if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) {
			return nil, &apistatus.ResourceExhausted{}
		}
		return nil, err
	}
	return ReleaseFunc(rel), nil
}

func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
	// Unlike ReadRequest, an untagged write falls back to the client tag.
	tag, ok := tagging.IOTagFromContext(ctx)
	if !ok {
		tag = IOTagClient.String()
	}
	// Critical requests bypass the scheduler entirely.
	if tag == IOTagCritical.String() {
		return releaseStub, nil
	}
	rel, err := n.writeScheduler.RequestArrival(ctx, tag)
	if err != nil {
		if errors.Is(err, scheduling.ErrMClockSchedulerRequestLimitExceeded) {
			return nil, &apistatus.ResourceExhausted{}
		}
		return nil, err
	}
	return ReleaseFunc(rel), nil
}

func (n *mClockLimiter) Close() {
	n.readScheduler.Close()
	n.writeScheduler.Close()
}
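
// The sketch below is illustrative only and not part of the original file: it
// shows the expected call pattern for Limiter, assuming the IO tag has already
// been attached to the request context upstream. The exampleLimitedRead name
// and the read callback are hypothetical.
func exampleLimitedRead(ctx context.Context, l Limiter, read func(context.Context) error) error {
	release, err := l.ReadRequest(ctx)
	if err != nil {
		// A full waiting queue surfaces as *apistatus.ResourceExhausted.
		return err
	}
	// Release the acquired slot once the read finishes.
	defer release()
	return read(ctx)
}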