package scheduling

import (
	"container/heap"
	"context"
	"errors"
	"math"
	"sync"
	"time"
)

const (
	// invalidIndex marks a request that is not currently stored in a queue.
	invalidIndex = -1
	// undefinedReservation is the reservation tag given to requests whose
	// tag has no ReservedIOPS.
	undefinedReservation float64 = -1.0
)

var (
	// ErrMClockSchedulerClosed is returned when the scheduler is (or becomes) closed.
	ErrMClockSchedulerClosed = errors.New("mClock scheduler is closed")
	// ErrMClockSchedulerRequestLimitExceeded is returned when the number of
	// waiting requests reached the configured wait limit.
	ErrMClockSchedulerRequestLimitExceeded = errors.New("mClock scheduler request limit exceeded")
	// ErrMClockSchedulerUnknownTag is returned for a tag missing from the tag info.
	ErrMClockSchedulerUnknownTag = errors.New("unknown tag")
	// ErrInvalidTagInfo is returned by NewMClock for non-positive or NaN tag parameters.
	ErrInvalidTagInfo = errors.New("invalid tag info: shares, limit and reservation must be greater than zero")
	// ErrInvalidRunLimit is returned by NewMClock for a zero run limit.
	ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero")
)

// queueItem is an element of a queue heap: it exposes the timestamp the heap
// is ordered by and records its own position in that heap.
type queueItem interface {
	ts() float64
	setIndex(idx int)
}

// queue is a min-heap of queueItems ordered by ts(); it implements heap.Interface.
type queue struct {
	items []queueItem
}

// request is a single scheduling request together with its mClock tags.
type request struct {
	tag string
	ts  float64 // arrival time as reported by the scheduler clock

	// mClock tags of the request.
	reservation float64
	limit       float64
	shares      float64

	// Positions of the request in the corresponding queues; invalidIndex when absent.
	reservationIdx int
	limitIdx       int
	sharesIdx      int
	readyIdx       int

	scheduled chan struct{} // closed when the request is allowed to run
	canceled  chan struct{} // closed when the scheduler is closed before the request runs
}

// clock abstracts the time source used by the scheduler.
type clock interface {
	// now returns the current time in seconds.
	now() float64
	// runAt arranges for f to be called at time ts.
	runAt(ts float64, f func())
	// close stops the clock.
	close()
}

// ReleaseFunc is the type of function that should be called after the request is completed.
type ReleaseFunc func()

// TagInfo contains reserved IOPS, IOPS limit and share values for a tag.
type TagInfo struct {
	ReservedIOPS *float64
	LimitIOPS    *float64
	Share        float64
}

// MClock is mClock scheduling algorithm implementation.
//
// See https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Gulati.pdf for details.
type MClock struct {
	runLimit    uint64             // maximum number of concurrently running requests
	waitLimit   int                // maximum number of waiting requests; 0 means unlimited
	clock       clock              // time source and timer
	idleTimeout float64            // seconds without requests after which a tag is idle
	tagInfo     map[string]TagInfo // per-tag parameters

	mtx          sync.Mutex
	previous     map[string]*request // most recently created request per tag
	inProgress   uint64              // scheduled requests not yet released
	lastSchedule float64             // timestamp of the most recently requested time-based wake-up

	reservationQueue *queue // waiting requests ordered by reservation tag
	limitQueue       *queue // waiting requests ordered by limit tag
	sharesQueue      *queue // waiting requests ordered by shares tag
	readyQueue       *queue // requests past their limit tag, ordered by shares tag
	closed           bool
}

// NewMClock creates new MClock scheduler instance with
// runLimit maximum allowed count of running requests and
// waitLimit maximum allowed count of waiting requests
// for tags specified by tagInfo. The value of idleTimeout defines
// the difference, in seconds, between the current time and the time of
// the previous request after which the tag is considered idle.
// If idleTimeout is negative, tags are never considered idle.
// If waitLimit equals zero, there is no limit on the
// number of waiting requests.
//
// tagInfo is copied, so later changes to the caller's map do not affect the
// scheduler (the IOPS pointers inside TagInfo values are still shared).
func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeout float64) (*MClock, error) {
	if err := validateParams(runLimit, tagInfo); err != nil {
		return nil, err
	}

	tags := make(map[string]TagInfo, len(tagInfo))
	previous := make(map[string]*request, len(tagInfo))
	for tag, info := range tagInfo {
		tags[tag] = info
		// Placeholder "previous request" holding zero tags; it never enters a queue.
		previous[tag] = &request{
			tag:            tag,
			reservationIdx: invalidIndex,
			limitIdx:       invalidIndex,
			sharesIdx:      invalidIndex,
			readyIdx:       invalidIndex,
		}
	}

	return &MClock{
		runLimit: runLimit,
		// Clamp so a huge uint64 cannot wrap around when converted to int.
		waitLimit:        int(min(waitLimit, uint64(math.MaxInt))),
		clock:            newSystemClock(),
		idleTimeout:      idleTimeout,
		tagInfo:          tags,
		previous:         previous,
		reservationQueue: &queue{},
		limitQueue:       &queue{},
		sharesQueue:      &queue{},
		readyQueue:       &queue{},
	}, nil
}

// RequestArrival schedules new request with tag value.
// Method call is blocked until one of the following events occurs:
// request with the tag is scheduled for execution,
// context ctx is canceled or the scheduler is closed.
// If ctx is already done, the request is not enqueued at all.
// If the method call returned non-nil ReleaseFunc,
// then it should be called after the request is completed.
func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	req, release, err := q.pushRequest(tag)
	if err != nil {
		return nil, err
	}

	select {
	case <-ctx.Done():
		// dropRequest also frees the run slot if scheduling won the race.
		q.dropRequest(req)
		return nil, ctx.Err()
	case <-req.scheduled:
		return release, nil
	case <-req.canceled:
		return nil, ErrMClockSchedulerClosed
	}
}

// Close closes MClock scheduler.
// No new requests for scheduling will be accepted after the closing.
func (q *MClock) Close() { q.mtx.Lock() defer q.mtx.Unlock() q.closed = true q.clock.close() for q.limitQueue.Len() > 0 { item := heap.Pop(q.limitQueue).(*limitMQueueItem) close(item.r.canceled) q.removeFromQueues(item.r) } } func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error { if runLimit == 0 { return ErrInvalidRunLimit } for _, v := range tagInfo { if v.LimitIOPS != nil && (math.IsNaN(*v.LimitIOPS) || *v.LimitIOPS <= float64(0)) { return ErrInvalidTagInfo } if v.ReservedIOPS != nil && (math.IsNaN(*v.ReservedIOPS) || *v.ReservedIOPS <= float64(0)) { return ErrInvalidTagInfo } if math.IsNaN(v.Share) || v.Share <= float64(0) { return ErrInvalidTagInfo } } return nil } func (q *MClock) dropRequest(req *request) { q.mtx.Lock() defer q.mtx.Unlock() select { case <-req.scheduled: if q.inProgress == 0 { panic("invalid requests count") } q.inProgress-- default: } q.removeFromQueues(req) } func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) { q.mtx.Lock() defer q.mtx.Unlock() if q.closed { return nil, nil, ErrMClockSchedulerClosed } if q.waitLimit > 0 && q.sharesQueue.Len() == q.waitLimit { return nil, nil, ErrMClockSchedulerRequestLimitExceeded } now := q.clock.now() tagInfo, ok := q.tagInfo[tag] if !ok { return nil, nil, ErrMClockSchedulerUnknownTag } prev, ok := q.previous[tag] if !ok { panic("undefined previous: " + tag) } if q.idleTimeout >= 0 && now-prev.ts > q.idleTimeout { // was inactive for q.idleTimeout q.adjustTags(now, tag) } r := &request{ tag: tag, ts: now, shares: max(prev.shares+1.0/tagInfo.Share, now), reservationIdx: invalidIndex, limitIdx: invalidIndex, sharesIdx: invalidIndex, readyIdx: invalidIndex, scheduled: make(chan struct{}), canceled: make(chan struct{}), } if tagInfo.ReservedIOPS != nil { r.reservation = max(prev.reservation + 1.0 / *tagInfo.ReservedIOPS, now) } else { r.reservation = undefinedReservation } if tagInfo.LimitIOPS != nil { r.limit = max(prev.limit + 1.0 / *tagInfo.LimitIOPS, now) } else { 
r.limit = max(prev.limit, now) } q.previous[tag] = r if tagInfo.ReservedIOPS != nil { heap.Push(q.reservationQueue, &reservationMQueueItem{r: r}) } heap.Push(q.sharesQueue, &sharesMQueueItem{r: r}) heap.Push(q.limitQueue, &limitMQueueItem{r: r}) q.scheduleRequest(true) return r, q.requestCompleted, nil } func (q *MClock) adjustTags(now float64, idleTag string) { if q.sharesQueue.Len() == 0 { return } minShare := q.sharesQueue.items[0].ts() for _, item := range q.limitQueue.items { // limitQueue has all requests and sharesQueue may be fixed limitItem := (item).(*limitMQueueItem) if limitItem.r.tag == idleTag { continue } limitItem.r.shares -= (minShare - now) if limitItem.r.sharesIdx != invalidIndex { heap.Fix(q.sharesQueue, limitItem.r.sharesIdx) } if limitItem.r.readyIdx != invalidIndex { heap.Fix(q.readyQueue, limitItem.r.readyIdx) } } } func (q *MClock) scheduleRequest(lockTaken bool) { if !lockTaken { q.mtx.Lock() defer q.mtx.Unlock() } if q.inProgress >= q.runLimit { return } now := q.clock.now() q.scheduleByReservation(now) if q.inProgress >= q.runLimit { return } q.scheduleByLimitAndWeight(now) if q.inProgress >= q.runLimit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) { return } q.setNextScheduleTimer(now) } func (q *MClock) setNextScheduleTimer(now float64) { nextTs := math.MaxFloat64 if q.reservationQueue.Len() > 0 { nextTs = q.reservationQueue.items[0].ts() } if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs { nextTs = q.limitQueue.items[0].ts() } if q.lastSchedule < now && q.lastSchedule > nextTs { q.clock.runAt(nextTs, func() { q.scheduleRequest(false) }) q.lastSchedule = nextTs } } func (q *MClock) scheduleByLimitAndWeight(now float64) { for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now { ready := heap.Pop(q.limitQueue).(*limitMQueueItem) heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r}) } for q.inProgress < q.runLimit && q.readyQueue.Len() > 0 { next := heap.Pop(q.readyQueue).(*readyMQueueItem) 
hadReservation := false if next.r.reservationIdx != invalidIndex { hadReservation = true heap.Remove(q.reservationQueue, next.r.reservationIdx) } q.removeFromQueues(next.r) tagInfo, ok := q.tagInfo[next.r.tag] if !ok { panic("unknown tag: " + next.r.tag) // must be checked on top level } if tagInfo.ReservedIOPS != nil && hadReservation { var updated bool for _, i := range q.reservationQueue.items { ri := i.(*reservationMQueueItem) if ri.r.tag == next.r.tag && ri.r.reservation > next.r.reservation { ri.r.reservation -= 1.0 / *tagInfo.ReservedIOPS updated = true } } if updated { heap.Init(q.reservationQueue) } } select { case <-next.r.canceled: continue default: } assertIndexInvalid(next.r) q.inProgress++ close(next.r.scheduled) } } func (q *MClock) scheduleByReservation(now float64) { for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now { next := heap.Pop(q.reservationQueue).(*reservationMQueueItem) q.removeFromQueues(next.r) select { case <-next.r.canceled: continue default: } assertIndexInvalid(next.r) q.inProgress++ close(next.r.scheduled) } } func (q *MClock) removeFromQueues(r *request) { if r.limitIdx != invalidIndex { heap.Remove(q.limitQueue, r.limitIdx) } if r.sharesIdx != invalidIndex { heap.Remove(q.sharesQueue, r.sharesIdx) } if r.readyIdx != invalidIndex { heap.Remove(q.readyQueue, r.readyIdx) } if r.reservationIdx != invalidIndex { heap.Remove(q.reservationQueue, r.reservationIdx) } } func (q *MClock) requestCompleted() { q.mtx.Lock() defer q.mtx.Unlock() if q.closed { return } if q.inProgress == 0 { panic("invalid requests count") } q.inProgress-- q.scheduleRequest(true) } func assertIndexInvalid(r *request) { if r.limitIdx != invalidIndex { panic("limitIdx is not -1") } if r.sharesIdx != invalidIndex { panic("sharesIdx is not -1") } if r.reservationIdx != invalidIndex { panic("reservationIdx is not -1") } if r.readyIdx != invalidIndex { panic("readyIdx is not -1") } } // Len implements 
heap.Interface. func (q *queue) Len() int { return len(q.items) } // Less implements heap.Interface. func (q *queue) Less(i int, j int) bool { return q.items[i].ts() < q.items[j].ts() } // Pop implements heap.Interface. func (q *queue) Pop() any { n := len(q.items) item := q.items[n-1] q.items[n-1] = nil q.items = q.items[0 : n-1] item.setIndex(invalidIndex) return item } // Push implements heap.Interface. func (q *queue) Push(x any) { it := x.(queueItem) it.setIndex(q.Len()) q.items = append(q.items, it) } // Swap implements heap.Interface. func (q *queue) Swap(i int, j int) { q.items[i], q.items[j] = q.items[j], q.items[i] q.items[i].setIndex(i) q.items[j].setIndex(j) } var _ queueItem = &reservationMQueueItem{} type reservationMQueueItem struct { r *request } func (i *reservationMQueueItem) ts() float64 { return i.r.reservation } func (i *reservationMQueueItem) setIndex(idx int) { i.r.reservationIdx = idx } var _ queueItem = &limitMQueueItem{} type limitMQueueItem struct { r *request } func (i *limitMQueueItem) ts() float64 { return i.r.limit } func (i *limitMQueueItem) setIndex(idx int) { i.r.limitIdx = idx } var _ queueItem = &sharesMQueueItem{} type sharesMQueueItem struct { r *request } func (i *sharesMQueueItem) ts() float64 { return i.r.shares } func (i *sharesMQueueItem) setIndex(idx int) { i.r.sharesIdx = idx } var _ queueItem = &readyMQueueItem{} type readyMQueueItem struct { r *request } func (i *readyMQueueItem) ts() float64 { return i.r.shares } func (i *readyMQueueItem) setIndex(idx int) { i.r.readyIdx = idx } type scheduleInfo struct { ts float64 f func() } type systemClock struct { since time.Time schedule chan scheduleInfo wg sync.WaitGroup } func newSystemClock() *systemClock { c := &systemClock{ since: time.Now(), schedule: make(chan scheduleInfo), } c.start() return c } func (c *systemClock) now() float64 { return time.Since(c.since).Seconds() } func (c *systemClock) runAt(ts float64, f func()) { c.schedule <- scheduleInfo{ts: ts, f: f} } func 
(c *systemClock) close() { close(c.schedule) c.wg.Wait() } func (c *systemClock) start() { c.wg.Add(1) go func() { defer c.wg.Done() t := time.NewTimer(time.Hour) var f func() for { select { case <-t.C: if f != nil { f() f = nil } t.Reset(time.Hour) case s, ok := <-c.schedule: if !ok { return } now := c.now() if now >= s.ts { s.f() f = nil continue } if !t.Stop() { select { case <-t.C: default: } } t.Reset(time.Duration((s.ts - now) * 1e9)) f = s.f } } }() }