frostfs-qos/scheduling/mclock.go
Dmitrii Stepanov 346752477b
[#12] mclock: Fix timer-based scheduling
Let's assume that there are two requests in the queue with execution times t1 and t2.
The timer is set to t1. The timer fires, schedules the t1 request,
and calculates t2 as the time at which the next timer should fire.
But it doesn't arm the timer for that time because of the
`q.timeBasedScheduleTs > nextTs` check.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-03-19 17:12:00 +03:00
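
The snippet below is a minimal, self-contained sketch of the scenario described above, not the real pre-fix code: only the field name timeBasedScheduleTs (mirrored here as the local variable stored) comes from the commit message, and the names timerAt and nextTs are illustrative.

package main

import "fmt"

func main() {
	const t1, t2 = 1.0, 2.0
	queue := []float64{t1, t2}
	timerAt := t1 // the timer is armed for t1
	stored := t1  // analogue of the removed q.timeBasedScheduleTs field

	// The t1 timer fires: the t1 request is scheduled...
	fmt.Println("timer fired, scheduling request at", queue[0])
	queue = queue[1:]

	// ...and the next deadline t2 is computed, but the guard below refuses to
	// re-arm the timer because the stored timestamp t1 is not greater than t2.
	nextTs := queue[0]
	if stored > nextTs {
		timerAt = nextTs
	}
	fmt.Println("timer still armed for", timerAt, "- the t2 request is never scheduled")
}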

package scheduling

import (
	"container/heap"
	"context"
	"errors"
	"math"
	"sync"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-qos/internal/assert"
)

const (
	invalidIndex                 = -1
	undefinedReservation float64 = -1.0
)

var (
	ErrMClockSchedulerClosed               = errors.New("mClock scheduler is closed")
	ErrMClockSchedulerRequestLimitExceeded = errors.New("mClock scheduler request limit exceeded")
	ErrMClockSchedulerUnknownTag           = errors.New("unknown tag")
	ErrInvalidTagInfo                      = errors.New("invalid tag info: shares, limit and reservation must be greater than zero")
	ErrInvalidRunLimit                     = errors.New("invalid run limit: must be greater than zero")
)

type request struct {
	tag            string
	ts             float64
	reservation    float64
	limit          float64
	shares         float64
	reservationIdx int
	limitIdx       int
	sharesIdx      int
	readyIdx       int
	scheduled      chan struct{}
	canceled       chan struct{}
}

// ReleaseFunc is the type of function that should be called after the request is completed.
type ReleaseFunc func()

// TagInfo contains reserved IOPS, IOPS limit and share values for a tag.
type TagInfo struct {
	ReservedIOPS *float64
	LimitIOPS    *float64
	Share        float64
}

// MClock is an implementation of the mClock scheduling algorithm.
//
// See https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Gulati.pdf for details.
type MClock struct {
	runLimit         uint64
	waitLimit        int
	clock            clock
	idleTimeout      float64
	tagInfo          map[string]TagInfo
	mtx              sync.Mutex
	previous         map[string]*request
	inProgress       uint64
	reservationQueue *queue
	limitQueue       *queue
	sharesQueue      *queue
	readyQueue       *queue
	closed           bool
}

// NewMClock creates a new MClock scheduler instance with
// runLimit as the maximum allowed count of running requests and
// waitLimit as the maximum allowed count of waiting requests
// for the tags specified by tagInfo. The value of idleTimeout defines
// the difference between the current time and the time of
// the previous request at which the tag is considered idle.
// If idleTimeout is negative, idle tags are not allowed.
// If waitLimit equals zero, there is no limit on the
// number of waiting requests.
func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeout time.Duration) (*MClock, error) {
	if err := validateParams(runLimit, tagInfo); err != nil {
		return nil, err
	}
	result := &MClock{
		runLimit:         runLimit,
		waitLimit:        int(waitLimit),
		clock:            newSystemClock(),
		idleTimeout:      idleTimeout.Seconds(),
		tagInfo:          tagInfo,
		reservationQueue: &queue{},
		limitQueue:       &queue{},
		sharesQueue:      &queue{},
		readyQueue:       &queue{},
	}
	previous := make(map[string]*request)
	for tag := range tagInfo {
		previous[tag] = &request{
			tag:            tag,
			reservationIdx: invalidIndex,
			limitIdx:       invalidIndex,
			sharesIdx:      invalidIndex,
		}
	}
	result.previous = previous
	return result, nil
}

// RequestArrival schedules a new request with the given tag value.
// The method call blocks until one of the following events occurs:
// the request with the tag is scheduled for execution,
// the context ctx is canceled, or the scheduler is closed.
// If the method call returned a non-nil ReleaseFunc,
// then it must be called after the request is completed.
func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, error) {
	req, release, err := q.pushRequest(tag)
	if err != nil {
		return nil, err
	}
	select {
	case <-ctx.Done():
		q.dropRequest(req)
		return nil, ctx.Err()
	case <-req.scheduled:
		return release, nil
	case <-req.canceled:
		return nil, ErrMClockSchedulerClosed
	}
}

// Close closes the MClock scheduler.
// No new requests will be accepted for scheduling after closing.
func (q *MClock) Close() {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	q.closed = true
	q.clock.close()
	for q.limitQueue.Len() > 0 {
		item := heap.Pop(q.limitQueue).(*limitMQueueItem)
		close(item.r.canceled)
		q.removeFromQueues(item.r)
	}
}
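
// validateParams checks that runLimit is non-zero and that each tag's Share
// and, when set, LimitIOPS and ReservedIOPS values are positive and not NaN.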
func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
	if runLimit == 0 {
		return ErrInvalidRunLimit
	}
	for _, v := range tagInfo {
		if v.LimitIOPS != nil && (math.IsNaN(*v.LimitIOPS) || *v.LimitIOPS <= float64(0)) {
			return ErrInvalidTagInfo
		}
		if v.ReservedIOPS != nil && (math.IsNaN(*v.ReservedIOPS) || *v.ReservedIOPS <= float64(0)) {
			return ErrInvalidTagInfo
		}
		if math.IsNaN(v.Share) || v.Share <= float64(0) {
			return ErrInvalidTagInfo
		}
	}
	return nil
}
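
// dropRequest removes a request whose caller gave up waiting (for example,
// because its context was canceled) from all queues; if the request had
// already been scheduled, the in-progress counter is decremented as well.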
func (q *MClock) dropRequest(req *request) {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	select {
	case <-req.scheduled:
		assert.Cond(q.inProgress > 0, "invalid requests count")
		q.inProgress--
	default:
	}
	q.removeFromQueues(req)
}
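
// pushRequest creates a request for the tag with reservation, limit and share
// timestamps derived from the tag's previous request, pushes it onto the
// scheduling queues and triggers a scheduling pass.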
func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	if q.closed {
		return nil, nil, ErrMClockSchedulerClosed
	}
	if q.waitLimit > 0 && q.sharesQueue.Len() == q.waitLimit {
		return nil, nil, ErrMClockSchedulerRequestLimitExceeded
	}
	now := q.clock.now()
	tagInfo, ok := q.tagInfo[tag]
	if !ok {
		return nil, nil, ErrMClockSchedulerUnknownTag
	}
	prev, ok := q.previous[tag]
	assert.Cond(ok, "undefined previous:", tag)
	if q.idleTimeout >= 0 && now-prev.ts > q.idleTimeout { // was inactive for q.idleTimeout
		q.adjustTags(now, tag)
	}
	r := &request{
		tag:            tag,
		ts:             now,
		shares:         max(prev.shares+1.0/tagInfo.Share, now),
		reservationIdx: invalidIndex,
		limitIdx:       invalidIndex,
		sharesIdx:      invalidIndex,
		readyIdx:       invalidIndex,
		scheduled:      make(chan struct{}),
		canceled:       make(chan struct{}),
	}
	if tagInfo.ReservedIOPS != nil {
		r.reservation = max(prev.reservation + 1.0 / *tagInfo.ReservedIOPS, now)
	} else {
		r.reservation = undefinedReservation
	}
	if tagInfo.LimitIOPS != nil {
		r.limit = max(prev.limit + 1.0 / *tagInfo.LimitIOPS, now)
	} else {
		r.limit = max(prev.limit, now)
	}
	q.previous[tag] = r
	if tagInfo.ReservedIOPS != nil {
		heap.Push(q.reservationQueue, &reservationMQueueItem{r: r})
	}
	heap.Push(q.sharesQueue, &sharesMQueueItem{r: r})
	heap.Push(q.limitQueue, &limitMQueueItem{r: r})
	q.scheduleRequestUnsafe()
	return r, q.requestCompleted, nil
}
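
// adjustTags shifts the share timestamps of all tags except idleTag so that
// the smallest share value becomes now; this keeps the share tags of active
// requests comparable with a tag that became active again after being idle.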
func (q *MClock) adjustTags(now float64, idleTag string) {
	if q.sharesQueue.Len() == 0 {
		return
	}
	minShare := q.sharesQueue.items[0].ts()
	for _, item := range q.limitQueue.items { // limitQueue has all requests, so iterate over it; shares and ready queues may need to be fixed
		limitItem := item.(*limitMQueueItem)
		if limitItem.r.tag == idleTag {
			continue
		}
		limitItem.r.shares -= (minShare - now)
		if limitItem.r.sharesIdx != invalidIndex {
			heap.Fix(q.sharesQueue, limitItem.r.sharesIdx)
		}
		if limitItem.r.readyIdx != invalidIndex {
			heap.Fix(q.readyQueue, limitItem.r.readyIdx)
		}
	}
}
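
// scheduleRequest is used as the timer callback: it acquires the mutex and
// runs a scheduling pass.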
func (q *MClock) scheduleRequest() {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	q.scheduleRequestUnsafe()
}
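
// scheduleRequestUnsafe performs a scheduling pass and must be called with the
// mutex held: reservation-based scheduling runs first, then limit- and
// weight-based scheduling, and finally a timer is armed for the nearest future
// deadline if waiting requests and spare run capacity remain.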
func (q *MClock) scheduleRequestUnsafe() {
	if q.inProgress >= q.runLimit {
		return
	}
	now := q.clock.now()
	q.scheduleByReservation(now)
	if q.inProgress >= q.runLimit {
		return
	}
	q.scheduleByLimitAndWeight(now)
	if q.inProgress >= q.runLimit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) {
		return
	}
	q.setNextScheduleTimer(now)
}
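
// setNextScheduleTimer arms the clock for the earliest future deadline among
// the reservation and limit queues; it re-arms on every pass whenever such a
// deadline exists, so a later deadline (the t2 case from the commit message)
// is no longer skipped.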
func (q *MClock) setNextScheduleTimer(now float64) {
	nextTs := math.MaxFloat64
	var hasNext bool
	if q.reservationQueue.Len() > 0 {
		nextTs = q.reservationQueue.items[0].ts()
		hasNext = true
	}
	if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
		nextTs = q.limitQueue.items[0].ts()
		hasNext = true
	}
	if nextTs <= now {
		// should not happen as we always compare .ts() <= now
		return
	}
	if !hasNext {
		return
	}
	q.clock.runAt(nextTs, q.scheduleRequest)
}
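
// scheduleByLimitAndWeight moves requests whose limit timestamp has been
// reached into the ready queue and schedules them in share (weight) order up
// to runLimit. When a scheduled request also held a reservation, the pending
// reservation timestamps of the same tag are shifted back by 1/ReservedIOPS
// so that reservation-based scheduling accounts for the work already done.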
func (q *MClock) scheduleByLimitAndWeight(now float64) {
	for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
		ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
		heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
	}
	for q.inProgress < q.runLimit && q.readyQueue.Len() > 0 {
		next := heap.Pop(q.readyQueue).(*readyMQueueItem)
		hadReservation := false
		if next.r.reservationIdx != invalidIndex {
			hadReservation = true
			heap.Remove(q.reservationQueue, next.r.reservationIdx)
		}
		q.removeFromQueues(next.r)
		tagInfo, ok := q.tagInfo[next.r.tag]
		assert.Cond(ok, "unknown tag:", next.r.tag)
		if tagInfo.ReservedIOPS != nil && hadReservation {
			var updated bool
			for _, i := range q.reservationQueue.items {
				ri := i.(*reservationMQueueItem)
				if ri.r.tag == next.r.tag && ri.r.reservation > next.r.reservation {
					ri.r.reservation -= 1.0 / *tagInfo.ReservedIOPS
					updated = true
				}
			}
			if updated {
				heap.Init(q.reservationQueue)
			}
		}
		select {
		case <-next.r.canceled:
			continue
		default:
		}
		assertIndexInvalid(next.r)
		q.inProgress++
		close(next.r.scheduled)
	}
}
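
// scheduleByReservation schedules requests whose reservation timestamp has
// been reached, regardless of their shares, to satisfy the reserved IOPS of
// each tag.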
func (q *MClock) scheduleByReservation(now float64) {
	for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
		next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
		q.removeFromQueues(next.r)
		select {
		case <-next.r.canceled:
			continue
		default:
		}
		assertIndexInvalid(next.r)
		q.inProgress++
		close(next.r.scheduled)
	}
}
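
// removeFromQueues removes the request from every queue it is still in.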
func (q *MClock) removeFromQueues(r *request) {
	if r.limitIdx != invalidIndex {
		heap.Remove(q.limitQueue, r.limitIdx)
	}
	if r.sharesIdx != invalidIndex {
		heap.Remove(q.sharesQueue, r.sharesIdx)
	}
	if r.readyIdx != invalidIndex {
		heap.Remove(q.readyQueue, r.readyIdx)
	}
	if r.reservationIdx != invalidIndex {
		heap.Remove(q.reservationQueue, r.reservationIdx)
	}
}
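
// requestCompleted is handed out to callers as the ReleaseFunc: it frees an
// execution slot and immediately runs another scheduling pass.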
func (q *MClock) requestCompleted() {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	if q.closed {
		return
	}
	assert.Cond(q.inProgress > 0, "invalid requests count")
	q.inProgress--
	q.scheduleRequestUnsafe()
}
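
// assertIndexInvalid checks that the request has been removed from all queues
// before it is scheduled for execution.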
func assertIndexInvalid(r *request) {
	assert.Cond(r.limitIdx == invalidIndex, "limitIdx is not -1")
	assert.Cond(r.sharesIdx == invalidIndex, "sharesIdx is not -1")
	assert.Cond(r.reservationIdx == invalidIndex, "reservationIdx is not -1")
	assert.Cond(r.readyIdx == invalidIndex, "readyIdx is not -1")
}