package scheduling

import (
	"container/heap"
	"context"
	"errors"
	"math"
	"sync"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-qos/internal/assert"
)

const (
	invalidIndex                 = -1
	undefinedReservation float64 = -1.0
)

var (
	ErrMClockSchedulerClosed               = errors.New("mClock scheduler is closed")
	ErrMClockSchedulerRequestLimitExceeded = errors.New("mClock scheduler request limit exceeded")
	ErrMClockSchedulerUnknownTag           = errors.New("unknown tag")
	ErrInvalidTagInfo                      = errors.New("invalid tag info: shares, limit and reservation must be greater than zero")
	ErrInvalidRunLimit                     = errors.New("invalid run limit: must be greater than zero")
)

type request struct {
	tag string
	ts  float64

	reservation float64
	limit       float64
	shares      float64

	reservationIdx int
	limitIdx       int
	sharesIdx      int
	readyIdx       int

	scheduled chan struct{}
	canceled  chan struct{}
}

// ReleaseFunc is the type of function that should be called after the request is completed.
type ReleaseFunc func()

// TagInfo contains reserved IOPS, IOPS limit and share values for a tag.
type TagInfo struct {
	ReservedIOPS *float64
	LimitIOPS    *float64
	Share        float64
}

// MClock is an implementation of the mClock scheduling algorithm.
//
// See https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Gulati.pdf for details.
type MClock struct {
	runLimit    uint64
	waitLimit   int
	clock       clock
	idleTimeout float64
	tagInfo     map[string]TagInfo

	mtx                 sync.Mutex
	previous            map[string]*request
	inProgress          uint64
	timeBasedScheduleTs float64
	reservationQueue    *queue
	limitQueue          *queue
	sharesQueue         *queue
	readyQueue          *queue
	closed              bool
}

// NewMClock creates a new MClock scheduler instance with
// runLimit maximum allowed count of running requests and
// waitLimit maximum allowed count of waiting requests
// for tags specified by tagInfo. The value of idleTimeout defines
// the difference between the current time and the time of
// the previous request after which the tag is considered idle.
// If idleTimeout is negative, tags are never considered idle.
// If waitLimit equals zero, the number of waiting requests is not limited.
func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeout time.Duration) (*MClock, error) {
	if err := validateParams(runLimit, tagInfo); err != nil {
		return nil, err
	}
	result := &MClock{
		runLimit:            runLimit,
		waitLimit:           int(waitLimit),
		clock:               newSystemClock(),
		idleTimeout:         idleTimeout.Seconds(),
		tagInfo:             tagInfo,
		reservationQueue:    &queue{},
		limitQueue:          &queue{},
		sharesQueue:         &queue{},
		readyQueue:          &queue{},
		timeBasedScheduleTs: math.MaxFloat64,
	}

	previous := make(map[string]*request)
	for tag := range tagInfo {
		previous[tag] = &request{
			tag:            tag,
			reservationIdx: invalidIndex,
			limitIdx:       invalidIndex,
			sharesIdx:      invalidIndex,
		}
	}
	result.previous = previous

	return result, nil
}

// RequestArrival schedules a new request with the given tag value.
// The call blocks until one of the following events occurs:
// the request is scheduled for execution,
// the context ctx is canceled, or the scheduler is closed.
// If the method returns a non-nil ReleaseFunc,
// it must be called after the request is completed.
func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, error) {
	req, release, err := q.pushRequest(tag)
	if err != nil {
		return nil, err
	}
	select {
	case <-ctx.Done():
		q.dropRequest(req)
		return nil, ctx.Err()
	case <-req.scheduled:
		return release, nil
	case <-req.canceled:
		return nil, ErrMClockSchedulerClosed
	}
}
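// Usage sketch (illustrative only; the tag name "disk-io" and the numeric
// values below are assumptions for the example, not defaults of this package):
//
//	limit := 100.0
//	reserved := 10.0
//	s, err := NewMClock(8, 0, map[string]TagInfo{
//		"disk-io": {Share: 2.0, ReservedIOPS: &reserved, LimitIOPS: &limit},
//	}, 5*time.Second)
//	if err != nil {
//		// handle invalid configuration
//	}
//	defer s.Close()
//
//	release, err := s.RequestArrival(ctx, "disk-io")
//	if err != nil {
//		// unknown tag, wait limit exceeded, scheduler closed or ctx canceled
//	}
//	defer release()
//	// ... perform the scheduled operation ...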
// Close closes the MClock scheduler.
// No new requests will be accepted after closing.
func (q *MClock) Close() {
	q.mtx.Lock()
	defer q.mtx.Unlock()

	q.closed = true
	q.clock.close()
	for q.limitQueue.Len() > 0 {
		item := heap.Pop(q.limitQueue).(*limitMQueueItem)
		close(item.r.canceled)
		q.removeFromQueues(item.r)
	}
}

func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
	if runLimit == 0 {
		return ErrInvalidRunLimit
	}
	for _, v := range tagInfo {
		if v.LimitIOPS != nil && (math.IsNaN(*v.LimitIOPS) || *v.LimitIOPS <= float64(0)) {
			return ErrInvalidTagInfo
		}
		if v.ReservedIOPS != nil && (math.IsNaN(*v.ReservedIOPS) || *v.ReservedIOPS <= float64(0)) {
			return ErrInvalidTagInfo
		}
		if math.IsNaN(v.Share) || v.Share <= float64(0) {
			return ErrInvalidTagInfo
		}
	}
	return nil
}

func (q *MClock) dropRequest(req *request) {
	q.mtx.Lock()
	defer q.mtx.Unlock()

	select {
	case <-req.scheduled:
		assert.Cond(q.inProgress > 0, "invalid requests count")
		q.inProgress--
	default:
	}

	q.removeFromQueues(req)
}

func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) {
	q.mtx.Lock()
	defer q.mtx.Unlock()

	if q.closed {
		return nil, nil, ErrMClockSchedulerClosed
	}
	if q.waitLimit > 0 && q.sharesQueue.Len() == q.waitLimit {
		return nil, nil, ErrMClockSchedulerRequestLimitExceeded
	}

	now := q.clock.now()
	tagInfo, ok := q.tagInfo[tag]
	if !ok {
		return nil, nil, ErrMClockSchedulerUnknownTag
	}
	prev, ok := q.previous[tag]
	assert.Cond(ok, "undefined previous:", tag)

	if q.idleTimeout >= 0 && now-prev.ts > q.idleTimeout { // was inactive for q.idleTimeout
		q.adjustTags(now, tag)
	}

	r := &request{
		tag:            tag,
		ts:             now,
		shares:         max(prev.shares+1.0/tagInfo.Share, now),
		reservationIdx: invalidIndex,
		limitIdx:       invalidIndex,
		sharesIdx:      invalidIndex,
		readyIdx:       invalidIndex,
		scheduled:      make(chan struct{}),
		canceled:       make(chan struct{}),
	}
	if tagInfo.ReservedIOPS != nil {
		r.reservation = max(prev.reservation+1.0 / *tagInfo.ReservedIOPS, now)
	} else {
		r.reservation = undefinedReservation
	}

	if tagInfo.LimitIOPS != nil {
		r.limit = max(prev.limit+1.0 / *tagInfo.LimitIOPS, now)
	} else {
		r.limit = max(prev.limit, now)
	}

	q.previous[tag] = r
	if tagInfo.ReservedIOPS != nil {
		heap.Push(q.reservationQueue, &reservationMQueueItem{r: r})
	}
	heap.Push(q.sharesQueue, &sharesMQueueItem{r: r})
	heap.Push(q.limitQueue, &limitMQueueItem{r: r})
	q.scheduleRequestUnsafe()

	return r, q.requestCompleted, nil
}
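// Worked example of the tag arithmetic in pushRequest (illustrative numbers,
// not package defaults): with LimitIOPS = 100, consecutive requests of a tag
// get limit tags spaced 1/100 = 0.01s apart (prev.limit + 0.01), so at most
// about 100 requests per second become eligible by limit. After an idle
// period the max(..., now) clamp pulls the tag up to the current time, so an
// idle tag does not accumulate credit for a later burst. The reservation tag
// (advanced by 1/ReservedIOPS) and the shares tag (advanced by 1/Share) follow
// the same pattern.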
func (q *MClock) adjustTags(now float64, idleTag string) {
	if q.sharesQueue.Len() == 0 {
		return
	}
	minShare := q.sharesQueue.items[0].ts()
	for _, item := range q.limitQueue.items { // limitQueue has all requests and sharesQueue may be fixed
		limitItem := item.(*limitMQueueItem)
		if limitItem.r.tag == idleTag {
			continue
		}
		limitItem.r.shares -= (minShare - now)
		if limitItem.r.sharesIdx != invalidIndex {
			heap.Fix(q.sharesQueue, limitItem.r.sharesIdx)
		}
		if limitItem.r.readyIdx != invalidIndex {
			heap.Fix(q.readyQueue, limitItem.r.readyIdx)
		}
	}
}

func (q *MClock) scheduleRequest() {
	q.mtx.Lock()
	defer q.mtx.Unlock()

	q.scheduleRequestUnsafe()
}

func (q *MClock) scheduleRequestUnsafe() {
	if q.inProgress >= q.runLimit {
		return
	}
	now := q.clock.now()
	q.scheduleByReservation(now)
	if q.inProgress >= q.runLimit {
		return
	}
	q.scheduleByLimitAndWeight(now)
	if q.inProgress >= q.runLimit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) {
		return
	}
	q.setNextScheduleTimer(now)
}

func (q *MClock) setNextScheduleTimer(now float64) {
	nextTs := math.MaxFloat64
	if q.reservationQueue.Len() > 0 {
		nextTs = q.reservationQueue.items[0].ts()
	}
	if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
		nextTs = q.limitQueue.items[0].ts()
	}
	if nextTs <= now {
		// should not happen as we always compare .ts() <= now
		return
	}

	if q.timeBasedScheduleTs > nextTs {
		q.clock.runAt(nextTs, func() {
			q.scheduleRequest()
		})
		q.timeBasedScheduleTs = nextTs
	}
}

func (q *MClock) scheduleByLimitAndWeight(now float64) {
	for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
		ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
		heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
	}

	for q.inProgress < q.runLimit && q.readyQueue.Len() > 0 {
		next := heap.Pop(q.readyQueue).(*readyMQueueItem)
		hadReservation := false
		if next.r.reservationIdx != invalidIndex {
			hadReservation = true
			heap.Remove(q.reservationQueue, next.r.reservationIdx)
		}
		q.removeFromQueues(next.r)

		tagInfo, ok := q.tagInfo[next.r.tag]
		assert.Cond(ok, "unknown tag:", next.r.tag)
		if tagInfo.ReservedIOPS != nil && hadReservation {
			var updated bool
			for _, i := range q.reservationQueue.items {
				ri := i.(*reservationMQueueItem)
				if ri.r.tag == next.r.tag && ri.r.reservation > next.r.reservation {
					ri.r.reservation -= 1.0 / *tagInfo.ReservedIOPS
					updated = true
				}
			}
			if updated {
				heap.Init(q.reservationQueue)
			}
		}

		select {
		case <-next.r.canceled:
			continue
		default:
		}

		assertIndexInvalid(next.r)
		q.inProgress++
		close(next.r.scheduled)
	}
}

func (q *MClock) scheduleByReservation(now float64) {
	for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
		next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
		q.removeFromQueues(next.r)

		select {
		case <-next.r.canceled:
			continue
		default:
		}

		assertIndexInvalid(next.r)
		q.inProgress++
		close(next.r.scheduled)
	}
}

func (q *MClock) removeFromQueues(r *request) {
	if r.limitIdx != invalidIndex {
		heap.Remove(q.limitQueue, r.limitIdx)
	}
	if r.sharesIdx != invalidIndex {
		heap.Remove(q.sharesQueue, r.sharesIdx)
	}
	if r.readyIdx != invalidIndex {
		heap.Remove(q.readyQueue, r.readyIdx)
	}
	if r.reservationIdx != invalidIndex {
		heap.Remove(q.reservationQueue, r.reservationIdx)
	}
}

func (q *MClock) requestCompleted() {
	q.mtx.Lock()
	defer q.mtx.Unlock()

	if q.closed {
		return
	}

	assert.Cond(q.inProgress > 0, "invalid requests count")
	q.inProgress--
	q.scheduleRequestUnsafe()
}

func assertIndexInvalid(r *request) {
	assert.Cond(r.limitIdx == invalidIndex, "limitIdx is not -1")
	assert.Cond(r.sharesIdx == invalidIndex, "sharesIdx is not -1")
	assert.Cond(r.reservationIdx == invalidIndex, "reservationIdx is not -1")
	assert.Cond(r.readyIdx == invalidIndex, "readyIdx is not -1")
}
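// withScheduling is an illustrative sketch, not part of the scheduler API:
// it shows one way a caller might wrap an arbitrary operation so that it runs
// only after the scheduler admits the request and the run slot is released
// afterwards. The helper name and signature are assumptions for the example.
func withScheduling(ctx context.Context, s *MClock, tag string, op func() error) error {
	release, err := s.RequestArrival(ctx, tag) // blocks until scheduled, canceled or closed
	if err != nil {
		return err
	}
	defer release() // frees the run slot so the scheduler can dispatch the next request
	return op()
}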