frostfs-qos/scheduling/mclock.go
Dmitrii Stepanov 57d895c321
[#13] mclock: Schedule requests as soon as possible
Let's assume that for some tag `limit = 1000 RPS` is defined and each
request takes 10 ms to complete. At some point in time 1000 requests
are accepted. Then the first request will be scheduled at `now()`, the
second at `now() + 1 ms`, the third at `now() + 2 ms`, and so on. The total
processing duration of the 1000 requests will be 1 second + 10 ms.

After this fix the scheduler looks ahead and schedules requests that fall
within the limit. So for the situation above, the total processing duration
of the 1000 requests will be 10 ms in an ideal world.

The same applies to reservation scheduling.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-03-24 16:36:47 +03:00
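
For illustration, here is a minimal usage sketch of the scenario above. The tag name, run limit, and idle timeout are hypothetical, and the import path is assumed to be the `scheduling` package of this module; in an ideal world the whole batch now completes in roughly the time of a single request.

```go
package main

import (
	"context"
	"sync"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling"
)

func main() {
	limit := 1000.0 // limit = 1000 RPS for the tag, as in the scenario above
	s, err := scheduling.NewMClock(
		1000, // runLimit: enough parallelism to run all requests at once
		0,    // waitLimit: no bound on the number of waiting requests
		map[string]scheduling.TagInfo{"tenant": {Share: 1, LimitIOPS: &limit}},
		time.Minute, // idleTimeout (hypothetical)
	)
	if err != nil {
		panic(err)
	}
	defer s.Close()

	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			release, err := s.RequestArrival(context.Background(), "tenant")
			if err != nil {
				return
			}
			defer release()
			time.Sleep(10 * time.Millisecond) // simulated request processing
		}()
	}
	wg.Wait() // with look-ahead scheduling this takes ~10 ms instead of ~1 s in the ideal case
}
```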

package scheduling
import (
"container/heap"
"context"
"errors"
"math"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-qos/internal/assert"
)
const (
invalidIndex = -1
undefinedReservation float64 = -1.0
)
var (
ErrMClockSchedulerClosed = errors.New("mClock scheduler is closed")
ErrMClockSchedulerRequestLimitExceeded = errors.New("mClock scheduler request limit exceeded")
ErrMClockSchedulerUnknownTag = errors.New("unknown tag")
ErrInvalidTagInfo = errors.New("invalid tag info: shares, limit and reservation must be greater than zero")
ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero")
)
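// request is a single queued request. ts is the arrival time; reservation,
// limit and shares are the mClock virtual tags used for ordering; the *Idx
// fields are the request's positions in the corresponding heaps (invalidIndex
// when not queued); scheduled is closed when the request is admitted for
// execution, canceled when the scheduler is closed.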
type request struct {
tag string
ts float64
reservation float64
limit float64
shares float64
reservationIdx int
limitIdx int
sharesIdx int
readyIdx int
scheduled chan struct{}
canceled chan struct{}
}
// ReleaseFunc is the type of function that should be called after the request is completed.
type ReleaseFunc func()
// TagInfo contains reserved IOPS, IOPS limit and share values for a tag.
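// A nil ReservedIOPS means that no reservation is maintained for the tag;
// a nil LimitIOPS means that requests with the tag are not rate limited.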
type TagInfo struct {
ReservedIOPS *float64
LimitIOPS *float64
Share float64
}
// MClock is an implementation of the mClock scheduling algorithm.
//
// See https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Gulati.pdf for details.
type MClock struct {
runLimit uint64
waitLimit int
clock clock
idleTimeout float64
tagInfo map[string]TagInfo
mtx sync.Mutex
previous map[string]*request
inProgress uint64
reservationQueue *queue
limitQueue *queue
sharesQueue *queue
readyQueue *queue
closed bool
}
// NewMClock creates a new MClock scheduler instance with
// runLimit as the maximum allowed number of running requests and
// waitLimit as the maximum allowed number of waiting requests
// for the tags specified by tagInfo. The value of idleTimeout defines
// the difference between the current time and the time of
// the previous request at which the tag is considered idle.
// If idleTimeout is negative, idle tags are not allowed.
// If waitLimit equals zero, there is no limit on the
// number of waiting requests.
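//
// A minimal construction sketch (the tag name and rates are hypothetical):
//
//	limit := 100.0
//	reserved := 10.0
//	scheduler, err := NewMClock(8, 0, map[string]TagInfo{
//		"client": {Share: 1, ReservedIOPS: &reserved, LimitIOPS: &limit},
//	}, 30*time.Second)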
func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeout time.Duration) (*MClock, error) {
if err := validateParams(runLimit, tagInfo); err != nil {
return nil, err
}
result := &MClock{
runLimit: runLimit,
waitLimit: int(waitLimit),
clock: newSystemClock(),
idleTimeout: idleTimeout.Seconds(),
tagInfo: tagInfo,
reservationQueue: &queue{},
limitQueue: &queue{},
sharesQueue: &queue{},
readyQueue: &queue{},
}
previous := make(map[string]*request)
for tag := range tagInfo {
previous[tag] = &request{
tag: tag,
reservationIdx: invalidIndex,
limitIdx: invalidIndex,
sharesIdx: invalidIndex,
}
}
result.previous = previous
return result, nil
}
// RequestArrival schedules a new request with the given tag value.
// The call blocks until one of the following events occurs:
// the request is scheduled for execution,
// the context ctx is canceled, or the scheduler is closed.
// If the call returns a non-nil ReleaseFunc,
// it must be called after the request is completed.
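//
// Typical call pattern (a sketch; names are illustrative):
//
//	release, err := scheduler.RequestArrival(ctx, "client")
//	if err != nil {
//		return err
//	}
//	defer release()
//	// ... perform the tagged operation ...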
func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, error) {
req, release, err := q.pushRequest(tag)
if err != nil {
return nil, err
}
select {
case <-ctx.Done():
q.dropRequest(req)
return nil, ctx.Err()
case <-req.scheduled:
return release, nil
case <-req.canceled:
return nil, ErrMClockSchedulerClosed
}
}
// Close closes the MClock scheduler.
// No new requests will be accepted for scheduling after it is closed.
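// Close also cancels all requests that are still waiting to be scheduled:
// their RequestArrival calls return ErrMClockSchedulerClosed.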
func (q *MClock) Close() {
q.mtx.Lock()
q.closed = true
for q.limitQueue.Len() > 0 {
item := heap.Pop(q.limitQueue).(*limitMQueueItem)
close(item.r.canceled)
q.removeFromQueues(item.r)
}
q.mtx.Unlock()
q.clock.close()
}
func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
if runLimit == 0 {
return ErrInvalidRunLimit
}
for _, v := range tagInfo {
if v.LimitIOPS != nil && (math.IsNaN(*v.LimitIOPS) || *v.LimitIOPS <= float64(0)) {
return ErrInvalidTagInfo
}
if v.ReservedIOPS != nil && (math.IsNaN(*v.ReservedIOPS) || *v.ReservedIOPS <= float64(0)) {
return ErrInvalidTagInfo
}
if math.IsNaN(v.Share) || v.Share <= float64(0) {
return ErrInvalidTagInfo
}
}
return nil
}
func (q *MClock) dropRequest(req *request) {
q.mtx.Lock()
defer q.mtx.Unlock()
select {
case <-req.scheduled:
assert.Cond(q.inProgress > 0, "invalid requests count")
q.inProgress--
default:
}
q.removeFromQueues(req)
}
func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) {
q.mtx.Lock()
defer q.mtx.Unlock()
if q.closed {
return nil, nil, ErrMClockSchedulerClosed
}
if q.waitLimit > 0 && q.sharesQueue.Len() == q.waitLimit {
return nil, nil, ErrMClockSchedulerRequestLimitExceeded
}
now := q.clock.now()
tagInfo, ok := q.tagInfo[tag]
if !ok {
return nil, nil, ErrMClockSchedulerUnknownTag
}
prev, ok := q.previous[tag]
assert.Cond(ok, "undefined previous:", tag)
if q.idleTimeout >= 0 && now-prev.ts > q.idleTimeout { // was inactive for q.idleTimeout
q.adjustTags(now, tag)
}
r := &request{
tag: tag,
ts: now,
shares: max(prev.shares+1.0/tagInfo.Share, now),
reservationIdx: invalidIndex,
limitIdx: invalidIndex,
sharesIdx: invalidIndex,
readyIdx: invalidIndex,
scheduled: make(chan struct{}),
canceled: make(chan struct{}),
}
if tagInfo.ReservedIOPS != nil {
r.reservation = max(prev.reservation + 1.0 / *tagInfo.ReservedIOPS, now)
} else {
r.reservation = undefinedReservation
}
if tagInfo.LimitIOPS != nil {
r.limit = max(prev.limit + 1.0 / *tagInfo.LimitIOPS, now)
} else {
r.limit = max(prev.limit, now)
}
q.previous[tag] = r
if tagInfo.ReservedIOPS != nil {
heap.Push(q.reservationQueue, &reservationMQueueItem{r: r})
}
heap.Push(q.sharesQueue, &sharesMQueueItem{r: r})
heap.Push(q.limitQueue, &limitMQueueItem{r: r})
q.scheduleRequestUnsafe()
return r, q.requestCompleted, nil
}
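// adjustTags realigns the shares tags of the queued requests of other tags
// with the current time when idleTag becomes active again after an idle
// period, so that the reactivated tag is ordered fairly against requests
// that are already queued.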
func (q *MClock) adjustTags(now float64, idleTag string) {
if q.sharesQueue.Len() == 0 {
return
}
minShare := q.sharesQueue.items[0].ts()
for _, item := range q.limitQueue.items { // limitQueue has all requests and sharesQueue may be fixed
limitItem := item.(*limitMQueueItem)
if limitItem.r.tag == idleTag {
continue
}
limitItem.r.shares -= (minShare - now)
if limitItem.r.sharesIdx != invalidIndex {
heap.Fix(q.sharesQueue, limitItem.r.sharesIdx)
}
if limitItem.r.readyIdx != invalidIndex {
heap.Fix(q.readyQueue, limitItem.r.readyIdx)
}
}
}
func (q *MClock) scheduleRequest() {
q.mtx.Lock()
defer q.mtx.Unlock()
if q.closed {
return
}
q.scheduleRequestUnsafe()
}
func (q *MClock) scheduleRequestUnsafe() {
if q.inProgress >= q.runLimit {
return
}
now := q.clock.now()
q.scheduleByReservation(now)
if q.inProgress >= q.runLimit {
return
}
q.scheduleByLimitAndWeight(now)
if q.inProgress >= q.runLimit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) {
return
}
q.setNextScheduleTimer(now)
}
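// setNextScheduleTimer arms the clock to run scheduling again at the
// earliest pending reservation or limit tag, if any.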
func (q *MClock) setNextScheduleTimer(now float64) {
nextTs := math.MaxFloat64
var hasNext bool
if q.reservationQueue.Len() > 0 {
nextTs = q.reservationQueue.items[0].ts()
hasNext = true
}
if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
nextTs = q.limitQueue.items[0].ts()
hasNext = true
}
if nextTs <= now {
// should not happen: items with .ts() < now+1.0 have already been scheduled
return
}
if !hasNext {
return
}
q.clock.runAt(nextTs, q.scheduleRequest)
}
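// scheduleByLimitAndWeight moves every request whose limit tag falls within
// the next second into the ready queue and then dispatches ready requests,
// up to runLimit, in the order defined by the ready queue (by weight). If a
// dispatched request still had a reservation, the outstanding reservation
// tags of the same tag are shifted back so that work served by weight is not
// charged against the reservation again.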
func (q *MClock) scheduleByLimitAndWeight(now float64) {
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < now+1.0 {
ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
}
for q.inProgress < q.runLimit && q.readyQueue.Len() > 0 {
next := heap.Pop(q.readyQueue).(*readyMQueueItem)
hadReservation := false
if next.r.reservationIdx != invalidIndex {
hadReservation = true
heap.Remove(q.reservationQueue, next.r.reservationIdx)
}
q.removeFromQueues(next.r)
tagInfo, ok := q.tagInfo[next.r.tag]
assert.Cond(ok, "unknown tag:", next.r.tag)
if tagInfo.ReservedIOPS != nil && hadReservation {
var updated bool
for _, i := range q.reservationQueue.items {
ri := i.(*reservationMQueueItem)
if ri.r.tag == next.r.tag && ri.r.reservation > next.r.reservation {
ri.r.reservation -= 1.0 / *tagInfo.ReservedIOPS
updated = true
}
}
if updated {
heap.Init(q.reservationQueue)
}
}
select {
case <-next.r.canceled:
continue
default:
}
assertIndexInvalid(next.r)
q.inProgress++
close(next.r.scheduled)
}
}
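// scheduleByReservation dispatches, up to runLimit, requests whose
// reservation tag falls within the next second, regardless of their limit
// and shares tags.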
func (q *MClock) scheduleByReservation(now float64) {
for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() < now+1.0 {
next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
q.removeFromQueues(next.r)
select {
case <-next.r.canceled:
continue
default:
}
assertIndexInvalid(next.r)
q.inProgress++
close(next.r.scheduled)
}
}
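// removeFromQueues detaches the request from every heap it is still stored in.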
func (q *MClock) removeFromQueues(r *request) {
if r.limitIdx != invalidIndex {
heap.Remove(q.limitQueue, r.limitIdx)
}
if r.sharesIdx != invalidIndex {
heap.Remove(q.sharesQueue, r.sharesIdx)
}
if r.readyIdx != invalidIndex {
heap.Remove(q.readyQueue, r.readyIdx)
}
if r.reservationIdx != invalidIndex {
heap.Remove(q.reservationQueue, r.reservationIdx)
}
}
func (q *MClock) requestCompleted() {
q.mtx.Lock()
defer q.mtx.Unlock()
if q.closed {
return
}
assert.Cond(q.inProgress > 0, "invalid requests count")
q.inProgress--
q.scheduleRequestUnsafe()
}
func assertIndexInvalid(r *request) {
assert.Cond(r.limitIdx == invalidIndex, "limitIdx is not -1")
assert.Cond(r.sharesIdx == invalidIndex, "sharesIdx is not -1")
assert.Cond(r.reservationIdx == invalidIndex, "reservationIdx is not -1")
assert.Cond(r.readyIdx == invalidIndex, "readyIdx is not -1")
}