2025-01-20 17:10:24 +03:00
|
|
|
package scheduling
|
|
|
|
|
|
|
|
import (
|
|
|
|
"container/heap"
|
|
|
|
"context"
|
|
|
|
"errors"
|
|
|
|
"math"
|
|
|
|
"sync"
|
2025-01-27 16:46:38 +03:00
|
|
|
"time"
|
2025-01-28 11:35:54 +03:00
|
|
|
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-qos/internal/assert"
|
2025-01-20 17:10:24 +03:00
|
|
|
)
|
|
|
|
|
|
|
|
const (
	// invalidIndex is stored in a request's *Idx fields while the request is
	// not present in the corresponding queue.
	invalidIndex = -1

	// undefinedReservation is the reservation tag given to requests of tags
	// that have no ReservedIOPS configured.
	undefinedReservation = float64(-1)

	// minusOne is the largest uint64 value; adding it to an unsigned counter
	// wraps around and therefore decrements the counter by one.
	minusOne uint64 = 1<<64 - 1
)
|
|
|
|
|
|
|
|
// Errors reported by the MClock scheduler.
var (
	// ErrMClockSchedulerClosed is returned when a request is submitted to a
	// closed scheduler or the scheduler is closed while the request waits.
	ErrMClockSchedulerClosed = errors.New("mClock scheduler is closed")

	// ErrMClockSchedulerRequestLimitExceeded is returned when the number of
	// waiting requests has reached the configured wait limit.
	ErrMClockSchedulerRequestLimitExceeded = errors.New("mClock scheduler request limit exceeded")

	// ErrMClockSchedulerUnknownTag is returned for a tag that was not passed
	// to NewMClock.
	ErrMClockSchedulerUnknownTag = errors.New("unknown tag")

	// ErrInvalidTagInfo is returned by NewMClock when a tag has a zero,
	// negative or NaN share, limit or reservation.
	ErrInvalidTagInfo = errors.New("invalid tag info: shares, limit and reservation must be greater than zero")

	// ErrInvalidRunLimit is returned by NewMClock when runLimit is zero.
	ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero")
)
|
|
|
|
|
|
|
|
// request is a single request tracked by MClock from arrival until it is
// scheduled, dropped or canceled by Close. Per-tag "previous request"
// sentinels created by NewMClock use the same type.
type request struct {
	// tag is the tag the request was submitted with.
	tag string

	// ts is the clock time at which the request arrived.
	ts float64

	// mClock tags of the request: reservation (undefinedReservation when the
	// tag has no reserved IOPS), limit and proportional share.
	reservation, limit, shares float64

	// Positions of the request in reservationQueue, limitQueue, sharesQueue
	// and readyQueue respectively; invalidIndex while it is not in that queue.
	reservationIdx, limitIdx, sharesIdx, readyIdx int

	// scheduled is closed when the request is admitted to run; canceled is
	// closed when the scheduler is closed while the request still waits.
	scheduled, canceled chan struct{}
}
|
|
|
|
|
|
|
|
type (
	// ReleaseFunc is the type of function that should be called after the
	// request is completed. RequestArrival returns a non-nil ReleaseFunc only
	// for a request that has been scheduled.
	ReleaseFunc func()

	// TagInfo contains reserved IOPS, IOPS limit and share values for a tag.
	// Every value that is set must be greater than zero: NewMClock rejects
	// zero, negative and NaN values with ErrInvalidTagInfo.
	TagInfo struct {
		// ReservedIOPS spaces the reservation tags of consecutive requests of
		// the tag 1/ReservedIOPS apart; requests whose reservation tag is due
		// are served before limit/share scheduling. nil disables reservation.
		ReservedIOPS *float64

		// LimitIOPS spaces the limit tags of consecutive requests of the tag
		// 1/LimitIOPS apart; a request becomes eligible for share-based
		// scheduling only once its limit tag is due. nil means no limit.
		LimitIOPS *float64

		// Share is the relative weight of the tag: share tags of consecutive
		// requests are spaced 1/Share apart.
		Share float64
	}
)
|
|
|
|
|
|
|
|
// MClock is mClock scheduling algorithm implementation.
|
|
|
|
//
|
|
|
|
// See https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Gulati.pdf for details.
|
|
|
|
type MClock struct {
|
|
|
|
runLimit uint64
|
|
|
|
waitLimit int
|
|
|
|
clock clock
|
|
|
|
idleTimeout float64
|
|
|
|
tagInfo map[string]TagInfo
|
2025-02-18 15:36:58 +03:00
|
|
|
tagStat map[string]*Stat
|
|
|
|
stats []*Stat
|
2025-01-20 17:10:24 +03:00
|
|
|
|
2025-01-24 12:18:16 +03:00
|
|
|
mtx sync.Mutex
|
|
|
|
previous map[string]*request
|
|
|
|
inProgress uint64
|
|
|
|
timeBasedScheduleTs float64
|
|
|
|
reservationQueue *queue
|
|
|
|
limitQueue *queue
|
|
|
|
sharesQueue *queue
|
|
|
|
readyQueue *queue
|
|
|
|
closed bool
|
2025-01-20 17:10:24 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewMClock creates new MClock scheduler instance with
|
|
|
|
// runLimit maximum allowed count of running requests and
|
|
|
|
// waitLimit maximum allowed count of waiting requests
|
|
|
|
// for tags specified by tagInfo. The value of idleTimeout defines
|
|
|
|
// the difference between the current time and the time of
|
2025-01-27 16:46:38 +03:00
|
|
|
// the previous request, at which the tag considered idle.
|
2025-01-20 17:10:24 +03:00
|
|
|
// If idleTimeout is negative, it means that there is no idle tags allowed.
|
|
|
|
// If waitLimit equals zero, it means that there is no limit on the
|
|
|
|
// number of waiting requests.
|
2025-01-27 16:46:38 +03:00
|
|
|
func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeout time.Duration) (*MClock, error) {
|
2025-01-20 17:10:24 +03:00
|
|
|
if err := validateParams(runLimit, tagInfo); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
result := &MClock{
|
|
|
|
runLimit: runLimit,
|
|
|
|
waitLimit: int(waitLimit),
|
|
|
|
clock: newSystemClock(),
|
2025-01-27 16:46:38 +03:00
|
|
|
idleTimeout: idleTimeout.Seconds(),
|
2025-01-20 17:10:24 +03:00
|
|
|
tagInfo: tagInfo,
|
|
|
|
|
2025-01-24 12:18:16 +03:00
|
|
|
reservationQueue: &queue{},
|
|
|
|
limitQueue: &queue{},
|
|
|
|
sharesQueue: &queue{},
|
|
|
|
readyQueue: &queue{},
|
|
|
|
timeBasedScheduleTs: math.MaxFloat64,
|
2025-01-20 17:10:24 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
previous := make(map[string]*request)
|
|
|
|
for tag := range tagInfo {
|
|
|
|
previous[tag] = &request{
|
|
|
|
tag: tag,
|
|
|
|
reservationIdx: invalidIndex,
|
|
|
|
limitIdx: invalidIndex,
|
|
|
|
sharesIdx: invalidIndex,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
result.previous = previous
|
|
|
|
|
2025-02-18 15:36:58 +03:00
|
|
|
result.tagStat = make(map[string]*Stat, len(tagInfo))
|
|
|
|
result.stats = make([]*Stat, 0, len(tagInfo))
|
|
|
|
for tag := range tagInfo {
|
|
|
|
s := &Stat{
|
|
|
|
tag: tag,
|
|
|
|
}
|
|
|
|
result.tagStat[tag] = s
|
|
|
|
result.stats = append(result.stats, s)
|
|
|
|
}
|
|
|
|
|
2025-01-20 17:10:24 +03:00
|
|
|
return result, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// RequestArrival schedules new request with tag value.
|
|
|
|
// Method call is blocked until one of the following events occurs:
|
|
|
|
// request with the tag is scheduled for execution,
|
|
|
|
// context ctx is canceled or the scheduler is closed.
|
|
|
|
// If the method call returned non-nil ReleaseFunc,
|
|
|
|
// then it must be called after the request is completed.
|
|
|
|
func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, error) {
|
|
|
|
req, release, err := q.pushRequest(tag)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
q.dropRequest(req)
|
|
|
|
return nil, ctx.Err()
|
|
|
|
case <-req.scheduled:
|
|
|
|
return release, nil
|
|
|
|
case <-req.canceled:
|
|
|
|
return nil, ErrMClockSchedulerClosed
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close closes MClock scheduler.
|
|
|
|
// No new requests for scheduling will be accepted after the closing.
|
|
|
|
func (q *MClock) Close() {
|
|
|
|
q.mtx.Lock()
|
|
|
|
defer q.mtx.Unlock()
|
|
|
|
|
|
|
|
q.closed = true
|
|
|
|
q.clock.close()
|
|
|
|
for q.limitQueue.Len() > 0 {
|
|
|
|
item := heap.Pop(q.limitQueue).(*limitMQueueItem)
|
|
|
|
close(item.r.canceled)
|
|
|
|
q.removeFromQueues(item.r)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2025-02-18 15:36:58 +03:00
|
|
|
// Stats returns per tag stat.
|
|
|
|
// Returned slice should not be modified.
|
|
|
|
func (q *MClock) Stats() []*Stat {
|
|
|
|
return q.stats
|
|
|
|
}
|
|
|
|
|
2025-01-20 17:10:24 +03:00
|
|
|
func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
|
|
|
|
if runLimit == 0 {
|
|
|
|
return ErrInvalidRunLimit
|
|
|
|
}
|
|
|
|
for _, v := range tagInfo {
|
|
|
|
if v.LimitIOPS != nil && (math.IsNaN(*v.LimitIOPS) || *v.LimitIOPS <= float64(0)) {
|
|
|
|
return ErrInvalidTagInfo
|
|
|
|
}
|
|
|
|
if v.ReservedIOPS != nil && (math.IsNaN(*v.ReservedIOPS) || *v.ReservedIOPS <= float64(0)) {
|
|
|
|
return ErrInvalidTagInfo
|
|
|
|
}
|
|
|
|
if math.IsNaN(v.Share) || v.Share <= float64(0) {
|
|
|
|
return ErrInvalidTagInfo
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *MClock) dropRequest(req *request) {
|
|
|
|
q.mtx.Lock()
|
|
|
|
defer q.mtx.Unlock()
|
|
|
|
|
2025-02-18 15:36:58 +03:00
|
|
|
s, ok := q.tagStat[req.tag]
|
|
|
|
assert.Cond(ok, "undefined stat tag:", req.tag)
|
|
|
|
|
2025-01-20 17:10:24 +03:00
|
|
|
select {
|
|
|
|
case <-req.scheduled:
|
2025-01-28 11:35:54 +03:00
|
|
|
assert.Cond(q.inProgress > 0, "invalid requests count")
|
2025-01-20 17:10:24 +03:00
|
|
|
q.inProgress--
|
2025-02-18 15:36:58 +03:00
|
|
|
s.inProgress.Add(minusOne)
|
2025-01-20 17:10:24 +03:00
|
|
|
default:
|
2025-02-18 15:36:58 +03:00
|
|
|
s.pending.Add(minusOne)
|
2025-01-20 17:10:24 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
q.removeFromQueues(req)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) {
|
|
|
|
q.mtx.Lock()
|
|
|
|
defer q.mtx.Unlock()
|
|
|
|
|
|
|
|
if q.closed {
|
|
|
|
return nil, nil, ErrMClockSchedulerClosed
|
|
|
|
}
|
|
|
|
if q.waitLimit > 0 && q.sharesQueue.Len() == q.waitLimit {
|
|
|
|
return nil, nil, ErrMClockSchedulerRequestLimitExceeded
|
|
|
|
}
|
|
|
|
|
|
|
|
now := q.clock.now()
|
|
|
|
tagInfo, ok := q.tagInfo[tag]
|
|
|
|
if !ok {
|
|
|
|
return nil, nil, ErrMClockSchedulerUnknownTag
|
|
|
|
}
|
|
|
|
prev, ok := q.previous[tag]
|
2025-01-28 11:35:54 +03:00
|
|
|
assert.Cond(ok, "undefined previous:", tag)
|
2025-01-20 17:10:24 +03:00
|
|
|
|
|
|
|
if q.idleTimeout >= 0 && now-prev.ts > q.idleTimeout { // was inactive for q.idleTimeout
|
|
|
|
q.adjustTags(now, tag)
|
|
|
|
}
|
|
|
|
|
|
|
|
r := &request{
|
|
|
|
tag: tag,
|
|
|
|
ts: now,
|
|
|
|
shares: max(prev.shares+1.0/tagInfo.Share, now),
|
|
|
|
reservationIdx: invalidIndex,
|
|
|
|
limitIdx: invalidIndex,
|
|
|
|
sharesIdx: invalidIndex,
|
|
|
|
readyIdx: invalidIndex,
|
|
|
|
scheduled: make(chan struct{}),
|
|
|
|
canceled: make(chan struct{}),
|
|
|
|
}
|
|
|
|
if tagInfo.ReservedIOPS != nil {
|
|
|
|
r.reservation = max(prev.reservation + 1.0 / *tagInfo.ReservedIOPS, now)
|
|
|
|
} else {
|
|
|
|
r.reservation = undefinedReservation
|
|
|
|
}
|
|
|
|
|
|
|
|
if tagInfo.LimitIOPS != nil {
|
|
|
|
r.limit = max(prev.limit + 1.0 / *tagInfo.LimitIOPS, now)
|
|
|
|
} else {
|
|
|
|
r.limit = max(prev.limit, now)
|
|
|
|
}
|
|
|
|
|
|
|
|
q.previous[tag] = r
|
|
|
|
if tagInfo.ReservedIOPS != nil {
|
|
|
|
heap.Push(q.reservationQueue, &reservationMQueueItem{r: r})
|
|
|
|
}
|
|
|
|
heap.Push(q.sharesQueue, &sharesMQueueItem{r: r})
|
|
|
|
heap.Push(q.limitQueue, &limitMQueueItem{r: r})
|
2025-02-18 15:36:58 +03:00
|
|
|
|
|
|
|
s, ok := q.tagStat[tag]
|
|
|
|
assert.Cond(ok, "undefined stat tag:", tag)
|
|
|
|
s.pending.Add(1)
|
|
|
|
|
2025-01-28 11:23:07 +03:00
|
|
|
q.scheduleRequestUnsafe()
|
2025-01-20 17:10:24 +03:00
|
|
|
|
2025-02-18 15:36:58 +03:00
|
|
|
return r, func() { q.requestCompleted(tag) }, nil
|
2025-01-20 17:10:24 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (q *MClock) adjustTags(now float64, idleTag string) {
|
|
|
|
if q.sharesQueue.Len() == 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
minShare := q.sharesQueue.items[0].ts()
|
|
|
|
for _, item := range q.limitQueue.items { // limitQueue has all requests and sharesQueue may be fixed
|
|
|
|
limitItem := item.(*limitMQueueItem)
|
|
|
|
if limitItem.r.tag == idleTag {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
limitItem.r.shares -= (minShare - now)
|
|
|
|
if limitItem.r.sharesIdx != invalidIndex {
|
|
|
|
heap.Fix(q.sharesQueue, limitItem.r.sharesIdx)
|
|
|
|
}
|
|
|
|
if limitItem.r.readyIdx != invalidIndex {
|
|
|
|
heap.Fix(q.readyQueue, limitItem.r.readyIdx)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2025-01-28 11:23:07 +03:00
|
|
|
func (q *MClock) scheduleRequest() {
|
|
|
|
q.mtx.Lock()
|
|
|
|
defer q.mtx.Unlock()
|
|
|
|
|
|
|
|
q.scheduleRequestUnsafe()
|
|
|
|
}
|
2025-01-20 17:10:24 +03:00
|
|
|
|
2025-01-28 11:23:07 +03:00
|
|
|
func (q *MClock) scheduleRequestUnsafe() {
|
2025-01-20 17:10:24 +03:00
|
|
|
if q.inProgress >= q.runLimit {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
now := q.clock.now()
|
|
|
|
q.scheduleByReservation(now)
|
|
|
|
if q.inProgress >= q.runLimit {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
q.scheduleByLimitAndWeight(now)
|
|
|
|
if q.inProgress >= q.runLimit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
q.setNextScheduleTimer(now)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *MClock) setNextScheduleTimer(now float64) {
|
|
|
|
nextTs := math.MaxFloat64
|
|
|
|
if q.reservationQueue.Len() > 0 {
|
|
|
|
nextTs = q.reservationQueue.items[0].ts()
|
|
|
|
}
|
|
|
|
if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
|
|
|
|
nextTs = q.limitQueue.items[0].ts()
|
|
|
|
}
|
2025-01-24 12:18:16 +03:00
|
|
|
if nextTs <= now {
|
|
|
|
// should not happen as we always compare .ts() <= now
|
|
|
|
return
|
|
|
|
}
|
2025-01-20 17:10:24 +03:00
|
|
|
|
2025-01-24 12:18:16 +03:00
|
|
|
if q.timeBasedScheduleTs > nextTs {
|
2025-01-20 17:10:24 +03:00
|
|
|
q.clock.runAt(nextTs, func() {
|
2025-01-28 11:23:07 +03:00
|
|
|
q.scheduleRequest()
|
2025-01-20 17:10:24 +03:00
|
|
|
})
|
2025-01-24 12:18:16 +03:00
|
|
|
q.timeBasedScheduleTs = nextTs
|
2025-01-20 17:10:24 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *MClock) scheduleByLimitAndWeight(now float64) {
|
|
|
|
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
|
|
|
|
ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
|
|
|
|
heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
|
|
|
|
}
|
|
|
|
|
|
|
|
for q.inProgress < q.runLimit && q.readyQueue.Len() > 0 {
|
|
|
|
next := heap.Pop(q.readyQueue).(*readyMQueueItem)
|
|
|
|
hadReservation := false
|
|
|
|
if next.r.reservationIdx != invalidIndex {
|
|
|
|
hadReservation = true
|
|
|
|
heap.Remove(q.reservationQueue, next.r.reservationIdx)
|
|
|
|
}
|
|
|
|
q.removeFromQueues(next.r)
|
|
|
|
|
2025-02-18 15:36:58 +03:00
|
|
|
s, ok := q.tagStat[next.r.tag]
|
|
|
|
assert.Cond(ok, "undefined stat tag:", next.r.tag)
|
|
|
|
s.pending.Add(minusOne)
|
|
|
|
|
2025-01-20 17:10:24 +03:00
|
|
|
tagInfo, ok := q.tagInfo[next.r.tag]
|
2025-01-28 11:35:54 +03:00
|
|
|
assert.Cond(ok, "unknown tag:", next.r.tag)
|
2025-01-20 17:10:24 +03:00
|
|
|
if tagInfo.ReservedIOPS != nil && hadReservation {
|
|
|
|
var updated bool
|
|
|
|
for _, i := range q.reservationQueue.items {
|
|
|
|
ri := i.(*reservationMQueueItem)
|
|
|
|
if ri.r.tag == next.r.tag && ri.r.reservation > next.r.reservation {
|
|
|
|
ri.r.reservation -= 1.0 / *tagInfo.ReservedIOPS
|
|
|
|
updated = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if updated {
|
|
|
|
heap.Init(q.reservationQueue)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-next.r.canceled:
|
|
|
|
continue
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
assertIndexInvalid(next.r)
|
|
|
|
q.inProgress++
|
2025-02-18 15:36:58 +03:00
|
|
|
s.inProgress.Add(1)
|
2025-01-20 17:10:24 +03:00
|
|
|
close(next.r.scheduled)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *MClock) scheduleByReservation(now float64) {
|
|
|
|
for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
|
|
|
|
next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
|
|
|
|
q.removeFromQueues(next.r)
|
|
|
|
|
2025-02-18 15:36:58 +03:00
|
|
|
s, ok := q.tagStat[next.r.tag]
|
|
|
|
assert.Cond(ok, "undefined stat tag:", next.r.tag)
|
|
|
|
s.pending.Add(minusOne)
|
|
|
|
|
2025-01-20 17:10:24 +03:00
|
|
|
select {
|
|
|
|
case <-next.r.canceled:
|
|
|
|
continue
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
assertIndexInvalid(next.r)
|
|
|
|
q.inProgress++
|
2025-02-18 15:36:58 +03:00
|
|
|
s.inProgress.Add(1)
|
2025-01-20 17:10:24 +03:00
|
|
|
close(next.r.scheduled)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *MClock) removeFromQueues(r *request) {
|
|
|
|
if r.limitIdx != invalidIndex {
|
|
|
|
heap.Remove(q.limitQueue, r.limitIdx)
|
|
|
|
}
|
|
|
|
if r.sharesIdx != invalidIndex {
|
|
|
|
heap.Remove(q.sharesQueue, r.sharesIdx)
|
|
|
|
}
|
|
|
|
if r.readyIdx != invalidIndex {
|
|
|
|
heap.Remove(q.readyQueue, r.readyIdx)
|
|
|
|
}
|
|
|
|
if r.reservationIdx != invalidIndex {
|
|
|
|
heap.Remove(q.reservationQueue, r.reservationIdx)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2025-02-18 15:36:58 +03:00
|
|
|
func (q *MClock) requestCompleted(tag string) {
|
2025-01-20 17:10:24 +03:00
|
|
|
q.mtx.Lock()
|
|
|
|
defer q.mtx.Unlock()
|
|
|
|
|
|
|
|
if q.closed {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2025-01-28 11:35:54 +03:00
|
|
|
assert.Cond(q.inProgress > 0, "invalid requests count")
|
2025-01-20 17:10:24 +03:00
|
|
|
q.inProgress--
|
2025-02-18 15:36:58 +03:00
|
|
|
s, ok := q.tagStat[tag]
|
|
|
|
assert.Cond(ok, "undefined stat tag:", tag)
|
|
|
|
s.inProgress.Add(minusOne)
|
2025-01-28 11:23:07 +03:00
|
|
|
q.scheduleRequestUnsafe()
|
2025-01-20 17:10:24 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func assertIndexInvalid(r *request) {
|
2025-01-28 11:35:54 +03:00
|
|
|
assert.Cond(r.limitIdx == invalidIndex, "limitIdx is not -1")
|
|
|
|
assert.Cond(r.sharesIdx == invalidIndex, "sharesIdx is not -1")
|
|
|
|
assert.Cond(r.reservationIdx == invalidIndex, "reservationIdx is not -1")
|
|
|
|
assert.Cond(r.readyIdx == invalidIndex, "readyIdx is not -1")
|
2025-01-20 17:10:24 +03:00
|
|
|
}
|