581 lines
13 KiB
Go
581 lines
13 KiB
Go
package scheduling
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"errors"
|
|
"math"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// invalidIndex is the queue position stored in a request that is not
// currently held by the corresponding queue.
const invalidIndex = -1

// undefinedReservation is the reservation timestamp given to requests whose
// tag has no reservation configured.
const undefinedReservation float64 = -1.0
|
|
|
|
// Errors returned by the MClock scheduler.
var (
	// ErrMClockSchedulerClosed is returned when a request is submitted to,
	// or was waiting in, a scheduler that has been closed.
	ErrMClockSchedulerClosed = errors.New("mClock scheduler is closed")

	// ErrMClockSchedulerRequestLimitExceeded is returned when the number of
	// waiting requests has reached the configured wait limit.
	ErrMClockSchedulerRequestLimitExceeded = errors.New("mClock scheduler request limit exceeded")

	// ErrMClockSchedulerUnknownTag is returned when a request carries a tag
	// the scheduler was not configured with.
	ErrMClockSchedulerUnknownTag = errors.New("unknown tag")

	// ErrInvalidTagInfo is returned when a tag has a non-positive or NaN
	// shares, limit or reservation value.
	ErrInvalidTagInfo = errors.New("invalid tag info: shares, limit and reservation must be greater than zero")

	// ErrInvalidRunLimit is returned when the run limit is zero.
	ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero")
)
|
|
|
|
// queueItem is an element that can be stored in a queue.
type queueItem interface {
	// ts returns the timestamp the item is ordered by inside its queue.
	ts() float64
	// setIndex records the item's current position in the queue
	// (invalidIndex once the item has been removed).
	setIndex(idx int)
}
|
|
|
|
type queue struct {
|
|
items []queueItem
|
|
}
|
|
|
|
// request is a single scheduling request together with its mClock
// timestamps and its positions in the scheduler queues.
type request struct {
	tag string  // tag the request was submitted with
	ts  float64 // clock time at which the request arrived

	// Timestamps used as ordering keys of the reservation, limit and
	// shares/ready queues respectively.
	reservation, limit, shares float64

	// Positions of the request in the corresponding queues;
	// invalidIndex while the request is not stored in that queue.
	reservationIdx, limitIdx, sharesIdx, readyIdx int

	scheduled chan struct{} // closed once the request is admitted for execution
	canceled  chan struct{} // closed when the scheduler is closed while the request waits
}
|
|
|
|
// clock abstracts the time source and the wake-up timer used by MClock.
type clock interface {
	// now returns the current time on the clock's own scale.
	now() float64
	// runAt asks the clock to invoke f when the clock reaches ts.
	runAt(ts float64, f func())
	// close stops the clock.
	close()
}
|
|
|
|
// ReleaseFunc is returned for every admitted request and must be called by
// the caller once that request has been completed.
type ReleaseFunc func()
|
|
|
|
// TagInfo holds the scheduling parameters of a tag: its optional
// Reservation and Limit and its mandatory Shares.
type TagInfo struct {
	// Reservation is optional (nil means none). When set it must be
	// positive; each request of the tag advances the tag's reservation
	// timestamp by 1/Reservation.
	Reservation *float64
	// Limit is optional (nil means none). When set it must be positive;
	// each request of the tag advances the tag's limit timestamp by 1/Limit.
	Limit *float64
	// Shares must be positive; each request of the tag advances the tag's
	// shares timestamp by 1/Shares.
	Shares float64
}
|
|
|
|
// MClock is mClock scheduling algorithm implementation.
|
|
//
|
|
// See https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Gulati.pdf for details.
|
|
type MClock struct {
|
|
runLimit uint64
|
|
waitLimit int
|
|
clock clock
|
|
idleTimeout float64
|
|
tagInfo map[string]TagInfo
|
|
|
|
mtx sync.Mutex
|
|
previous map[string]*request
|
|
inProgress uint64
|
|
lastSchedule float64
|
|
reservationQueue *queue
|
|
limitQueue *queue
|
|
sharesQueue *queue
|
|
readyQueue *queue
|
|
closed bool
|
|
}
|
|
|
|
// NewMClock creates new MClock scheduler instance with
|
|
// runLimit maximum allowed count of running requests and
|
|
// waitLimit maximum allowed count of waiting requests
|
|
// for tags specified by tagInfo. The value of idleTimeout defines
|
|
// the difference between the current time and the time of
|
|
// the previous request in seconds, at which the tag considered idle.
|
|
// If idleTimeout is negative, it means that there is no idle tags allowed.
|
|
// If waitLimit equals zero, it means that there is no limit on the
|
|
// number of waiting requests.
|
|
func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeout float64) (*MClock, error) {
|
|
if err := validateParams(runLimit, tagInfo); err != nil {
|
|
return nil, err
|
|
}
|
|
result := &MClock{
|
|
runLimit: runLimit,
|
|
waitLimit: int(waitLimit),
|
|
clock: newSystemClock(),
|
|
idleTimeout: idleTimeout,
|
|
tagInfo: tagInfo,
|
|
|
|
reservationQueue: &queue{},
|
|
limitQueue: &queue{},
|
|
sharesQueue: &queue{},
|
|
readyQueue: &queue{},
|
|
}
|
|
|
|
previous := make(map[string]*request)
|
|
for tag := range tagInfo {
|
|
previous[tag] = &request{
|
|
tag: tag,
|
|
reservationIdx: invalidIndex,
|
|
limitIdx: invalidIndex,
|
|
sharesIdx: invalidIndex,
|
|
}
|
|
}
|
|
result.previous = previous
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// RequestArrival schedules new request with tag value.
|
|
// Method call is blocked until one of the following events occurs:
|
|
// request with the tag is scheduled for execution,
|
|
// context ctx is canceled or the scheduler is closed.
|
|
// If the method call returned non-nil ReleaseFunc,
|
|
// then it should be called after the request is completed.
|
|
func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, error) {
|
|
req, release, err := q.pushRequest(tag)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
q.dropRequest(req)
|
|
return nil, ctx.Err()
|
|
case <-req.scheduled:
|
|
return release, nil
|
|
case <-req.canceled:
|
|
return nil, ErrMClockSchedulerClosed
|
|
}
|
|
}
|
|
|
|
// Close closes MClock scheduler.
|
|
// No new requests for scheduling will be accepted after the closing.
|
|
func (q *MClock) Close() {
|
|
q.mtx.Lock()
|
|
defer q.mtx.Unlock()
|
|
|
|
q.closed = true
|
|
q.clock.close()
|
|
for q.limitQueue.Len() > 0 {
|
|
item := heap.Pop(q.limitQueue).(*limitMQueueItem)
|
|
close(item.r.canceled)
|
|
q.removeFromQueues(item.r)
|
|
}
|
|
}
|
|
|
|
func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
|
|
if runLimit == 0 {
|
|
return ErrInvalidRunLimit
|
|
}
|
|
for _, v := range tagInfo {
|
|
if v.Limit != nil && (math.IsNaN(*v.Limit) || *v.Limit <= float64(0)) {
|
|
return ErrInvalidTagInfo
|
|
}
|
|
if v.Reservation != nil && (math.IsNaN(*v.Reservation) || *v.Reservation <= float64(0)) {
|
|
return ErrInvalidTagInfo
|
|
}
|
|
if math.IsNaN(v.Shares) || v.Shares <= float64(0) {
|
|
return ErrInvalidTagInfo
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (q *MClock) dropRequest(req *request) {
|
|
q.mtx.Lock()
|
|
defer q.mtx.Unlock()
|
|
|
|
select {
|
|
case <-req.scheduled:
|
|
if q.inProgress == 0 {
|
|
panic("invalid requests count")
|
|
}
|
|
q.inProgress--
|
|
default:
|
|
}
|
|
|
|
q.removeFromQueues(req)
|
|
}
|
|
|
|
func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) {
|
|
q.mtx.Lock()
|
|
defer q.mtx.Unlock()
|
|
|
|
if q.closed {
|
|
return nil, nil, ErrMClockSchedulerClosed
|
|
}
|
|
if q.waitLimit > 0 && q.sharesQueue.Len() == q.waitLimit {
|
|
return nil, nil, ErrMClockSchedulerRequestLimitExceeded
|
|
}
|
|
|
|
now := q.clock.now()
|
|
tagInfo, ok := q.tagInfo[tag]
|
|
if !ok {
|
|
return nil, nil, ErrMClockSchedulerUnknownTag
|
|
}
|
|
prev, ok := q.previous[tag]
|
|
if !ok {
|
|
panic("undefined previous: " + tag)
|
|
}
|
|
|
|
if q.idleTimeout >= 0 && now-prev.ts > q.idleTimeout { // was inactive for q.idleTimeout
|
|
q.adjustTags(now, tag)
|
|
}
|
|
|
|
r := &request{
|
|
tag: tag,
|
|
ts: now,
|
|
shares: max(prev.shares+1.0/tagInfo.Shares, now),
|
|
reservationIdx: invalidIndex,
|
|
limitIdx: invalidIndex,
|
|
sharesIdx: invalidIndex,
|
|
readyIdx: invalidIndex,
|
|
scheduled: make(chan struct{}),
|
|
canceled: make(chan struct{}),
|
|
}
|
|
if tagInfo.Reservation != nil {
|
|
r.reservation = max(prev.reservation + 1.0 / *tagInfo.Reservation, now)
|
|
} else {
|
|
r.reservation = undefinedReservation
|
|
}
|
|
|
|
if tagInfo.Limit != nil {
|
|
r.limit = max(prev.limit + 1.0 / *tagInfo.Limit, now)
|
|
} else {
|
|
r.limit = max(prev.limit, now)
|
|
}
|
|
|
|
q.previous[tag] = r
|
|
if tagInfo.Reservation != nil {
|
|
heap.Push(q.reservationQueue, &reservationMQueueItem{r: r})
|
|
}
|
|
heap.Push(q.sharesQueue, &sharesMQueueItem{r: r})
|
|
heap.Push(q.limitQueue, &limitMQueueItem{r: r})
|
|
q.scheduleRequest(true)
|
|
|
|
return r, q.requestCompleted, nil
|
|
}
|
|
|
|
func (q *MClock) adjustTags(now float64, idleTag string) {
|
|
if q.sharesQueue.Len() == 0 {
|
|
return
|
|
}
|
|
minShare := q.sharesQueue.items[0].ts()
|
|
for _, item := range q.limitQueue.items { // limitQueue has all requests and sharesQueue may be fixed
|
|
limitItem := (item).(*limitMQueueItem)
|
|
if limitItem.r.tag == idleTag {
|
|
continue
|
|
}
|
|
limitItem.r.shares -= (minShare - now)
|
|
if limitItem.r.sharesIdx != invalidIndex {
|
|
heap.Fix(q.sharesQueue, limitItem.r.sharesIdx)
|
|
}
|
|
if limitItem.r.readyIdx != invalidIndex {
|
|
heap.Fix(q.readyQueue, limitItem.r.readyIdx)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q *MClock) scheduleRequest(lockTaken bool) {
|
|
if !lockTaken {
|
|
q.mtx.Lock()
|
|
defer q.mtx.Unlock()
|
|
}
|
|
|
|
if q.inProgress >= q.runLimit {
|
|
return
|
|
}
|
|
now := q.clock.now()
|
|
q.scheduleByReservation(now)
|
|
if q.inProgress >= q.runLimit {
|
|
return
|
|
}
|
|
q.scheduleByLimitAndWeight(now)
|
|
if q.inProgress >= q.runLimit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) {
|
|
return
|
|
}
|
|
q.setNextScheduleTimer(now)
|
|
}
|
|
|
|
func (q *MClock) setNextScheduleTimer(now float64) {
|
|
nextTs := math.MaxFloat64
|
|
if q.reservationQueue.Len() > 0 {
|
|
nextTs = q.reservationQueue.items[0].ts()
|
|
}
|
|
if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
|
|
nextTs = q.limitQueue.items[0].ts()
|
|
}
|
|
|
|
if q.lastSchedule < now && q.lastSchedule > nextTs {
|
|
q.clock.runAt(nextTs, func() {
|
|
q.scheduleRequest(false)
|
|
})
|
|
q.lastSchedule = nextTs
|
|
}
|
|
}
|
|
|
|
func (q *MClock) scheduleByLimitAndWeight(now float64) {
|
|
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
|
|
ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
|
|
heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
|
|
}
|
|
|
|
for q.inProgress < q.runLimit && q.readyQueue.Len() > 0 {
|
|
next := heap.Pop(q.readyQueue).(*readyMQueueItem)
|
|
hadReservation := false
|
|
if next.r.reservationIdx != invalidIndex {
|
|
hadReservation = true
|
|
heap.Remove(q.reservationQueue, next.r.reservationIdx)
|
|
}
|
|
q.removeFromQueues(next.r)
|
|
|
|
tagInfo, ok := q.tagInfo[next.r.tag]
|
|
if !ok {
|
|
panic("unknown tag: " + next.r.tag) // must be checked on top level
|
|
}
|
|
if tagInfo.Reservation != nil && hadReservation {
|
|
var updated bool
|
|
for _, i := range q.reservationQueue.items {
|
|
ri := i.(*reservationMQueueItem)
|
|
if ri.r.tag == next.r.tag && ri.r.reservation > next.r.reservation {
|
|
ri.r.reservation -= 1.0 / *tagInfo.Reservation
|
|
updated = true
|
|
}
|
|
}
|
|
if updated {
|
|
heap.Init(q.reservationQueue)
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-next.r.canceled:
|
|
continue
|
|
default:
|
|
}
|
|
|
|
assertIndexInvalid(next.r)
|
|
q.inProgress++
|
|
close(next.r.scheduled)
|
|
}
|
|
}
|
|
|
|
func (q *MClock) scheduleByReservation(now float64) {
|
|
for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
|
|
next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
|
|
q.removeFromQueues(next.r)
|
|
|
|
select {
|
|
case <-next.r.canceled:
|
|
continue
|
|
default:
|
|
}
|
|
|
|
assertIndexInvalid(next.r)
|
|
q.inProgress++
|
|
close(next.r.scheduled)
|
|
}
|
|
}
|
|
|
|
func (q *MClock) removeFromQueues(r *request) {
|
|
if r.limitIdx != invalidIndex {
|
|
heap.Remove(q.limitQueue, r.limitIdx)
|
|
}
|
|
if r.sharesIdx != invalidIndex {
|
|
heap.Remove(q.sharesQueue, r.sharesIdx)
|
|
}
|
|
if r.readyIdx != invalidIndex {
|
|
heap.Remove(q.readyQueue, r.readyIdx)
|
|
}
|
|
if r.reservationIdx != invalidIndex {
|
|
heap.Remove(q.reservationQueue, r.reservationIdx)
|
|
}
|
|
}
|
|
|
|
func (q *MClock) requestCompleted() {
|
|
q.mtx.Lock()
|
|
defer q.mtx.Unlock()
|
|
|
|
if q.closed {
|
|
return
|
|
}
|
|
|
|
if q.inProgress == 0 {
|
|
panic("invalid requests count")
|
|
}
|
|
q.inProgress--
|
|
q.scheduleRequest(true)
|
|
}
|
|
|
|
func assertIndexInvalid(r *request) {
|
|
if r.limitIdx != invalidIndex {
|
|
panic("limitIdx is not -1")
|
|
}
|
|
if r.sharesIdx != invalidIndex {
|
|
panic("sharesIdx is not -1")
|
|
}
|
|
if r.reservationIdx != invalidIndex {
|
|
panic("reservationIdx is not -1")
|
|
}
|
|
if r.readyIdx != invalidIndex {
|
|
panic("readyIdx is not -1")
|
|
}
|
|
}
|
|
|
|
// Len implements heap.Interface.
|
|
func (q *queue) Len() int {
|
|
return len(q.items)
|
|
}
|
|
|
|
// Less implements heap.Interface.
|
|
func (q *queue) Less(i int, j int) bool {
|
|
return q.items[i].ts() < q.items[j].ts()
|
|
}
|
|
|
|
// Pop implements heap.Interface.
|
|
func (q *queue) Pop() any {
|
|
n := len(q.items)
|
|
item := q.items[n-1]
|
|
q.items[n-1] = nil
|
|
q.items = q.items[0 : n-1]
|
|
item.setIndex(invalidIndex)
|
|
return item
|
|
}
|
|
|
|
// Push implements heap.Interface.
|
|
func (q *queue) Push(x any) {
|
|
it := x.(queueItem)
|
|
it.setIndex(q.Len())
|
|
q.items = append(q.items, it)
|
|
}
|
|
|
|
// Swap implements heap.Interface.
|
|
func (q *queue) Swap(i int, j int) {
|
|
q.items[i], q.items[j] = q.items[j], q.items[i]
|
|
q.items[i].setIndex(i)
|
|
q.items[j].setIndex(j)
|
|
}
|
|
|
|
var _ queueItem = &reservationMQueueItem{}
|
|
|
|
type reservationMQueueItem struct {
|
|
r *request
|
|
}
|
|
|
|
func (i *reservationMQueueItem) ts() float64 {
|
|
return i.r.reservation
|
|
}
|
|
|
|
func (i *reservationMQueueItem) setIndex(idx int) {
|
|
i.r.reservationIdx = idx
|
|
}
|
|
|
|
var _ queueItem = &limitMQueueItem{}
|
|
|
|
type limitMQueueItem struct {
|
|
r *request
|
|
}
|
|
|
|
func (i *limitMQueueItem) ts() float64 {
|
|
return i.r.limit
|
|
}
|
|
|
|
func (i *limitMQueueItem) setIndex(idx int) {
|
|
i.r.limitIdx = idx
|
|
}
|
|
|
|
var _ queueItem = &sharesMQueueItem{}
|
|
|
|
type sharesMQueueItem struct {
|
|
r *request
|
|
}
|
|
|
|
func (i *sharesMQueueItem) ts() float64 {
|
|
return i.r.shares
|
|
}
|
|
|
|
func (i *sharesMQueueItem) setIndex(idx int) {
|
|
i.r.sharesIdx = idx
|
|
}
|
|
|
|
var _ queueItem = &readyMQueueItem{}
|
|
|
|
type readyMQueueItem struct {
|
|
r *request
|
|
}
|
|
|
|
func (i *readyMQueueItem) ts() float64 {
|
|
return i.r.shares
|
|
}
|
|
|
|
func (i *readyMQueueItem) setIndex(idx int) {
|
|
i.r.readyIdx = idx
|
|
}
|
|
|
|
// scheduleInfo is a request to run f once the clock reaches ts.
type scheduleInfo struct {
	ts float64
	f  func()
}

// systemClock is a clock backed by the system time. Timestamps are seconds
// elapsed since the clock was created. A single background goroutine owns
// the timer; at most one pending callback is kept, a newer runAt request
// replaces the older one.
type systemClock struct {
	since    time.Time
	schedule chan scheduleInfo
	wg       sync.WaitGroup // tracks the timer goroutine and running callbacks
}

// newSystemClock creates a systemClock and starts its timer goroutine.
func newSystemClock() *systemClock {
	c := &systemClock{
		since:    time.Now(),
		schedule: make(chan scheduleInfo),
	}
	c.start()
	return c
}

// now returns the number of seconds elapsed since the clock was created.
func (c *systemClock) now() float64 {
	return time.Since(c.since).Seconds()
}

// runAt asks the clock to call f when the clock reaches ts; f is called
// right away if ts is not in the future. The call blocks until the timer
// goroutine accepts the request. It must not be called after close: sending
// on the closed channel panics.
func (c *systemClock) runAt(ts float64, f func()) {
	c.schedule <- scheduleInfo{ts: ts, f: f}
}

// close stops the timer goroutine and waits until it and every callback
// that has already been started have returned.
func (c *systemClock) close() {
	close(c.schedule)
	c.wg.Wait()
}

// start launches the timer goroutine that serves runAt requests until the
// schedule channel is closed.
func (c *systemClock) start() {
	// Callbacks run in their own goroutines. A callback may call runAt
	// itself, and runAt blocks until this loop receives from the channel;
	// running the callback inline would therefore block the loop on itself.
	// wg.Add is called while the loop goroutine is still accounted for, so
	// it cannot race with the Wait in close.
	run := func(f func()) {
		c.wg.Add(1)
		go func() {
			defer c.wg.Done()
			f()
		}()
	}

	c.wg.Add(1)
	go func() {
		defer c.wg.Done()

		// The timer is kept armed with a long dummy interval while there
		// is nothing to run.
		t := time.NewTimer(time.Hour)
		defer t.Stop()

		var pending func()
		for {
			select {
			case <-t.C:
				if pending != nil {
					run(pending)
					pending = nil
				}
				t.Reset(time.Hour)
			case s, ok := <-c.schedule:
				if !ok {
					return
				}
				now := c.now()
				if now >= s.ts {
					// The requested time has already come.
					run(s.f)
					pending = nil
					continue
				}
				// Stop the timer and drain a value that may already have
				// been delivered before re-arming it.
				if !t.Stop() {
					select {
					case <-t.C:
					default:
					}
				}
				t.Reset(time.Duration((s.ts - now) * float64(time.Second)))
				pending = s.f
			}
		}
	}()
}
|