[#1636] qos: Add Limiter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
98d6125029
commit
92a67a6716
4 changed files with 160 additions and 4 deletions
132
internal/qos/limiter.go
Normal file
132
internal/qos/limiter.go
Normal file
|
@ -0,0 +1,132 @@
|
|||
package qos
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/limits"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/assert"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultIdleTimeout time.Duration = 0
|
||||
defaultShare float64 = 1.0
|
||||
)
|
||||
|
||||
type ReleaseFunc scheduling.ReleaseFunc
|
||||
|
||||
type Limiter interface {
|
||||
ReadRequest(context.Context) (ReleaseFunc, error)
|
||||
WriteRequest(context.Context) (ReleaseFunc, error)
|
||||
Close()
|
||||
}
|
||||
|
||||
func NewLimiter(c *limits.Config) (Limiter, error) {
|
||||
if err := validateConfig(c); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
read, write := c.Read(), c.Write()
|
||||
if isNoop(read, write) {
|
||||
return noopLimiterInstance, nil
|
||||
}
|
||||
readScheduler, err := scheduling.NewMClock(
|
||||
uint64(read.MaxRunningOps), uint64(read.MaxWaitingOps),
|
||||
converToSchedulingTags(read.Tags), read.IdleTimeout)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create read scheduler: %w", err)
|
||||
}
|
||||
writeScheduler, err := scheduling.NewMClock(
|
||||
uint64(write.MaxRunningOps), uint64(write.MaxWaitingOps),
|
||||
converToSchedulingTags(write.Tags), write.IdleTimeout)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create write scheduler: %w", err)
|
||||
}
|
||||
return &mClockLimiter{
|
||||
readScheduler: readScheduler,
|
||||
writeScheduler: writeScheduler,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func converToSchedulingTags(limits []limits.IOTagConfig) map[string]scheduling.TagInfo {
|
||||
result := make(map[string]scheduling.TagInfo)
|
||||
for _, tag := range []IOTag{IOTagClient, IOTagBackground, IOTagInternal, IOTagPolicer, IOTagWritecache} {
|
||||
result[tag.String()] = scheduling.TagInfo{
|
||||
Share: defaultShare,
|
||||
}
|
||||
}
|
||||
for _, l := range limits {
|
||||
v := result[l.Tag]
|
||||
if l.Weight != nil && *l.Weight != 0 {
|
||||
v.Share = *l.Weight
|
||||
}
|
||||
if l.LimitOps != nil && *l.LimitOps != 0 {
|
||||
v.LimitIOPS = l.LimitOps
|
||||
}
|
||||
if l.ReservedOps != nil && *l.ReservedOps != 0 {
|
||||
v.ReservedIOPS = l.ReservedOps
|
||||
}
|
||||
result[l.Tag] = v
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
var (
|
||||
_ Limiter = (*noopLimiter)(nil)
|
||||
releaseStub ReleaseFunc = func() {}
|
||||
noopLimiterInstance = &noopLimiter{}
|
||||
)
|
||||
|
||||
type noopLimiter struct{}
|
||||
|
||||
func (n *noopLimiter) ReadRequest(context.Context) (ReleaseFunc, error) {
|
||||
return releaseStub, nil
|
||||
}
|
||||
|
||||
func (n *noopLimiter) WriteRequest(context.Context) (ReleaseFunc, error) {
|
||||
return releaseStub, nil
|
||||
}
|
||||
|
||||
func (n *noopLimiter) Close() {}
|
||||
|
||||
var _ Limiter = (*mClockLimiter)(nil)
|
||||
|
||||
type mClockLimiter struct {
|
||||
readScheduler *scheduling.MClock
|
||||
writeScheduler *scheduling.MClock
|
||||
}
|
||||
|
||||
func (n *mClockLimiter) ReadRequest(ctx context.Context) (ReleaseFunc, error) {
|
||||
tag, ok := tagging.IOTagFromContext(ctx)
|
||||
assert.True(ok, "request has no tag")
|
||||
if tag == IOTagCritical.String() {
|
||||
return releaseStub, nil
|
||||
}
|
||||
rel, err := n.readScheduler.RequestArrival(ctx, tag)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ReleaseFunc(rel), nil
|
||||
}
|
||||
|
||||
func (n *mClockLimiter) WriteRequest(ctx context.Context) (ReleaseFunc, error) {
|
||||
tag, ok := tagging.IOTagFromContext(ctx)
|
||||
if !ok {
|
||||
tag = IOTagClient.String()
|
||||
}
|
||||
if tag == IOTagCritical.String() {
|
||||
return releaseStub, nil
|
||||
}
|
||||
rel, err := n.writeScheduler.RequestArrival(ctx, tag)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ReleaseFunc(rel), nil
|
||||
}
|
||||
|
||||
func (n *mClockLimiter) Close() {
|
||||
n.readScheduler.Close()
|
||||
n.writeScheduler.Close()
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue