[#1] mclock: Initial implementation
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
eca6765dda
commit
0dccab22c2
6 changed files with 1322 additions and 0 deletions
581
scheduling/mclock.go
Normal file
581
scheduling/mclock.go
Normal file
|
@ -0,0 +1,581 @@
|
|||
package scheduling
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"context"
|
||||
"errors"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
invalidIndex = -1
|
||||
undefinedReservation float64 = -1.0
|
||||
)
|
||||
|
||||
var (
|
||||
ErrMClockSchedulerClosed = errors.New("mClock scheduler is closed")
|
||||
ErrMClockSchedulerRequestLimitExceeded = errors.New("mClock scheduler request limit exceeded")
|
||||
ErrMClockSchedulerUnknownTag = errors.New("unknown tag")
|
||||
ErrInvalidTagInfo = errors.New("invalid tag info: shares, limit and reservation must be greater than zero")
|
||||
ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero")
|
||||
)
|
||||
|
||||
type queueItem interface {
|
||||
ts() float64
|
||||
setIndex(idx int)
|
||||
}
|
||||
|
||||
type queue struct {
|
||||
items []queueItem
|
||||
}
|
||||
|
||||
type request struct {
|
||||
tag string
|
||||
ts float64
|
||||
|
||||
reservation float64
|
||||
limit float64
|
||||
shares float64
|
||||
|
||||
reservationIdx int
|
||||
limitIdx int
|
||||
sharesIdx int
|
||||
readyIdx int
|
||||
|
||||
scheduled chan struct{}
|
||||
canceled chan struct{}
|
||||
}
|
||||
|
||||
type clock interface {
|
||||
now() float64
|
||||
runAt(ts float64, f func())
|
||||
close()
|
||||
}
|
||||
|
||||
// ReleaseFunc is the type of function that should be called after the request is completed.
|
||||
type ReleaseFunc func()
|
||||
|
||||
// TagInfo contains reserved IOPS, IOPS limit and share values for a tag.
|
||||
type TagInfo struct {
|
||||
ReservedIOPS *float64
|
||||
LimitIOPS *float64
|
||||
Share float64
|
||||
}
|
||||
|
||||
// MClock is mClock scheduling algorithm implementation.
|
||||
//
|
||||
// See https://www.usenix.org/legacy/event/osdi10/tech/full_papers/Gulati.pdf for details.
|
||||
type MClock struct {
|
||||
runLimit uint64
|
||||
waitLimit int
|
||||
clock clock
|
||||
idleTimeout float64
|
||||
tagInfo map[string]TagInfo
|
||||
|
||||
mtx sync.Mutex
|
||||
previous map[string]*request
|
||||
inProgress uint64
|
||||
lastSchedule float64
|
||||
reservationQueue *queue
|
||||
limitQueue *queue
|
||||
sharesQueue *queue
|
||||
readyQueue *queue
|
||||
closed bool
|
||||
}
|
||||
|
||||
// NewMClock creates new MClock scheduler instance with
|
||||
// runLimit maximum allowed count of running requests and
|
||||
// waitLimit maximum allowed count of waiting requests
|
||||
// for tags specified by tagInfo. The value of idleTimeout defines
|
||||
// the difference between the current time and the time of
|
||||
// the previous request in seconds, at which the tag considered idle.
|
||||
// If idleTimeout is negative, it means that there is no idle tags allowed.
|
||||
// If waitLimit equals zero, it means that there is no limit on the
|
||||
// number of waiting requests.
|
||||
func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeout float64) (*MClock, error) {
|
||||
if err := validateParams(runLimit, tagInfo); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result := &MClock{
|
||||
runLimit: runLimit,
|
||||
waitLimit: int(waitLimit),
|
||||
clock: newSystemClock(),
|
||||
idleTimeout: idleTimeout,
|
||||
tagInfo: tagInfo,
|
||||
|
||||
reservationQueue: &queue{},
|
||||
limitQueue: &queue{},
|
||||
sharesQueue: &queue{},
|
||||
readyQueue: &queue{},
|
||||
}
|
||||
|
||||
previous := make(map[string]*request)
|
||||
for tag := range tagInfo {
|
||||
previous[tag] = &request{
|
||||
tag: tag,
|
||||
reservationIdx: invalidIndex,
|
||||
limitIdx: invalidIndex,
|
||||
sharesIdx: invalidIndex,
|
||||
}
|
||||
}
|
||||
result.previous = previous
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// RequestArrival schedules new request with tag value.
|
||||
// Method call is blocked until one of the following events occurs:
|
||||
// request with the tag is scheduled for execution,
|
||||
// context ctx is canceled or the scheduler is closed.
|
||||
// If the method call returned non-nil ReleaseFunc,
|
||||
// then it must be called after the request is completed.
|
||||
func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, error) {
|
||||
req, release, err := q.pushRequest(tag)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
q.dropRequest(req)
|
||||
return nil, ctx.Err()
|
||||
case <-req.scheduled:
|
||||
return release, nil
|
||||
case <-req.canceled:
|
||||
return nil, ErrMClockSchedulerClosed
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes MClock scheduler.
|
||||
// No new requests for scheduling will be accepted after the closing.
|
||||
func (q *MClock) Close() {
|
||||
q.mtx.Lock()
|
||||
defer q.mtx.Unlock()
|
||||
|
||||
q.closed = true
|
||||
q.clock.close()
|
||||
for q.limitQueue.Len() > 0 {
|
||||
item := heap.Pop(q.limitQueue).(*limitMQueueItem)
|
||||
close(item.r.canceled)
|
||||
q.removeFromQueues(item.r)
|
||||
}
|
||||
}
|
||||
|
||||
func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
|
||||
if runLimit == 0 {
|
||||
return ErrInvalidRunLimit
|
||||
}
|
||||
for _, v := range tagInfo {
|
||||
if v.LimitIOPS != nil && (math.IsNaN(*v.LimitIOPS) || *v.LimitIOPS <= float64(0)) {
|
||||
return ErrInvalidTagInfo
|
||||
}
|
||||
if v.ReservedIOPS != nil && (math.IsNaN(*v.ReservedIOPS) || *v.ReservedIOPS <= float64(0)) {
|
||||
return ErrInvalidTagInfo
|
||||
}
|
||||
if math.IsNaN(v.Share) || v.Share <= float64(0) {
|
||||
return ErrInvalidTagInfo
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *MClock) dropRequest(req *request) {
|
||||
q.mtx.Lock()
|
||||
defer q.mtx.Unlock()
|
||||
|
||||
select {
|
||||
case <-req.scheduled:
|
||||
if q.inProgress == 0 {
|
||||
panic("invalid requests count")
|
||||
}
|
||||
q.inProgress--
|
||||
default:
|
||||
}
|
||||
|
||||
q.removeFromQueues(req)
|
||||
}
|
||||
|
||||
func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) {
|
||||
q.mtx.Lock()
|
||||
defer q.mtx.Unlock()
|
||||
|
||||
if q.closed {
|
||||
return nil, nil, ErrMClockSchedulerClosed
|
||||
}
|
||||
if q.waitLimit > 0 && q.sharesQueue.Len() == q.waitLimit {
|
||||
return nil, nil, ErrMClockSchedulerRequestLimitExceeded
|
||||
}
|
||||
|
||||
now := q.clock.now()
|
||||
tagInfo, ok := q.tagInfo[tag]
|
||||
if !ok {
|
||||
return nil, nil, ErrMClockSchedulerUnknownTag
|
||||
}
|
||||
prev, ok := q.previous[tag]
|
||||
if !ok {
|
||||
panic("undefined previous: " + tag)
|
||||
}
|
||||
|
||||
if q.idleTimeout >= 0 && now-prev.ts > q.idleTimeout { // was inactive for q.idleTimeout
|
||||
q.adjustTags(now, tag)
|
||||
}
|
||||
|
||||
r := &request{
|
||||
tag: tag,
|
||||
ts: now,
|
||||
shares: max(prev.shares+1.0/tagInfo.Share, now),
|
||||
reservationIdx: invalidIndex,
|
||||
limitIdx: invalidIndex,
|
||||
sharesIdx: invalidIndex,
|
||||
readyIdx: invalidIndex,
|
||||
scheduled: make(chan struct{}),
|
||||
canceled: make(chan struct{}),
|
||||
}
|
||||
if tagInfo.ReservedIOPS != nil {
|
||||
r.reservation = max(prev.reservation + 1.0 / *tagInfo.ReservedIOPS, now)
|
||||
} else {
|
||||
r.reservation = undefinedReservation
|
||||
}
|
||||
|
||||
if tagInfo.LimitIOPS != nil {
|
||||
r.limit = max(prev.limit + 1.0 / *tagInfo.LimitIOPS, now)
|
||||
} else {
|
||||
r.limit = max(prev.limit, now)
|
||||
}
|
||||
|
||||
q.previous[tag] = r
|
||||
if tagInfo.ReservedIOPS != nil {
|
||||
heap.Push(q.reservationQueue, &reservationMQueueItem{r: r})
|
||||
}
|
||||
heap.Push(q.sharesQueue, &sharesMQueueItem{r: r})
|
||||
heap.Push(q.limitQueue, &limitMQueueItem{r: r})
|
||||
q.scheduleRequest(true)
|
||||
|
||||
return r, q.requestCompleted, nil
|
||||
}
|
||||
|
||||
func (q *MClock) adjustTags(now float64, idleTag string) {
|
||||
if q.sharesQueue.Len() == 0 {
|
||||
return
|
||||
}
|
||||
minShare := q.sharesQueue.items[0].ts()
|
||||
for _, item := range q.limitQueue.items { // limitQueue has all requests and sharesQueue may be fixed
|
||||
limitItem := item.(*limitMQueueItem)
|
||||
if limitItem.r.tag == idleTag {
|
||||
continue
|
||||
}
|
||||
limitItem.r.shares -= (minShare - now)
|
||||
if limitItem.r.sharesIdx != invalidIndex {
|
||||
heap.Fix(q.sharesQueue, limitItem.r.sharesIdx)
|
||||
}
|
||||
if limitItem.r.readyIdx != invalidIndex {
|
||||
heap.Fix(q.readyQueue, limitItem.r.readyIdx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (q *MClock) scheduleRequest(lockTaken bool) {
|
||||
if !lockTaken {
|
||||
q.mtx.Lock()
|
||||
defer q.mtx.Unlock()
|
||||
}
|
||||
|
||||
if q.inProgress >= q.runLimit {
|
||||
return
|
||||
}
|
||||
now := q.clock.now()
|
||||
q.scheduleByReservation(now)
|
||||
if q.inProgress >= q.runLimit {
|
||||
return
|
||||
}
|
||||
q.scheduleByLimitAndWeight(now)
|
||||
if q.inProgress >= q.runLimit || (q.reservationQueue.Len() == 0 && q.limitQueue.Len() == 0) {
|
||||
return
|
||||
}
|
||||
q.setNextScheduleTimer(now)
|
||||
}
|
||||
|
||||
func (q *MClock) setNextScheduleTimer(now float64) {
|
||||
nextTs := math.MaxFloat64
|
||||
if q.reservationQueue.Len() > 0 {
|
||||
nextTs = q.reservationQueue.items[0].ts()
|
||||
}
|
||||
if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
|
||||
nextTs = q.limitQueue.items[0].ts()
|
||||
}
|
||||
|
||||
if q.lastSchedule < now && q.lastSchedule > nextTs {
|
||||
q.clock.runAt(nextTs, func() {
|
||||
q.scheduleRequest(false)
|
||||
})
|
||||
q.lastSchedule = nextTs
|
||||
}
|
||||
}
|
||||
|
||||
func (q *MClock) scheduleByLimitAndWeight(now float64) {
|
||||
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
|
||||
ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
|
||||
heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
|
||||
}
|
||||
|
||||
for q.inProgress < q.runLimit && q.readyQueue.Len() > 0 {
|
||||
next := heap.Pop(q.readyQueue).(*readyMQueueItem)
|
||||
hadReservation := false
|
||||
if next.r.reservationIdx != invalidIndex {
|
||||
hadReservation = true
|
||||
heap.Remove(q.reservationQueue, next.r.reservationIdx)
|
||||
}
|
||||
q.removeFromQueues(next.r)
|
||||
|
||||
tagInfo, ok := q.tagInfo[next.r.tag]
|
||||
if !ok {
|
||||
panic("unknown tag: " + next.r.tag) // must be checked on top level
|
||||
}
|
||||
if tagInfo.ReservedIOPS != nil && hadReservation {
|
||||
var updated bool
|
||||
for _, i := range q.reservationQueue.items {
|
||||
ri := i.(*reservationMQueueItem)
|
||||
if ri.r.tag == next.r.tag && ri.r.reservation > next.r.reservation {
|
||||
ri.r.reservation -= 1.0 / *tagInfo.ReservedIOPS
|
||||
updated = true
|
||||
}
|
||||
}
|
||||
if updated {
|
||||
heap.Init(q.reservationQueue)
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-next.r.canceled:
|
||||
continue
|
||||
default:
|
||||
}
|
||||
|
||||
assertIndexInvalid(next.r)
|
||||
q.inProgress++
|
||||
close(next.r.scheduled)
|
||||
}
|
||||
}
|
||||
|
||||
func (q *MClock) scheduleByReservation(now float64) {
|
||||
for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
|
||||
next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
|
||||
q.removeFromQueues(next.r)
|
||||
|
||||
select {
|
||||
case <-next.r.canceled:
|
||||
continue
|
||||
default:
|
||||
}
|
||||
|
||||
assertIndexInvalid(next.r)
|
||||
q.inProgress++
|
||||
close(next.r.scheduled)
|
||||
}
|
||||
}
|
||||
|
||||
func (q *MClock) removeFromQueues(r *request) {
|
||||
if r.limitIdx != invalidIndex {
|
||||
heap.Remove(q.limitQueue, r.limitIdx)
|
||||
}
|
||||
if r.sharesIdx != invalidIndex {
|
||||
heap.Remove(q.sharesQueue, r.sharesIdx)
|
||||
}
|
||||
if r.readyIdx != invalidIndex {
|
||||
heap.Remove(q.readyQueue, r.readyIdx)
|
||||
}
|
||||
if r.reservationIdx != invalidIndex {
|
||||
heap.Remove(q.reservationQueue, r.reservationIdx)
|
||||
}
|
||||
}
|
||||
|
||||
func (q *MClock) requestCompleted() {
|
||||
q.mtx.Lock()
|
||||
defer q.mtx.Unlock()
|
||||
|
||||
if q.closed {
|
||||
return
|
||||
}
|
||||
|
||||
if q.inProgress == 0 {
|
||||
panic("invalid requests count")
|
||||
}
|
||||
q.inProgress--
|
||||
q.scheduleRequest(true)
|
||||
}
|
||||
|
||||
func assertIndexInvalid(r *request) {
|
||||
if r.limitIdx != invalidIndex {
|
||||
panic("limitIdx is not -1")
|
||||
}
|
||||
if r.sharesIdx != invalidIndex {
|
||||
panic("sharesIdx is not -1")
|
||||
}
|
||||
if r.reservationIdx != invalidIndex {
|
||||
panic("reservationIdx is not -1")
|
||||
}
|
||||
if r.readyIdx != invalidIndex {
|
||||
panic("readyIdx is not -1")
|
||||
}
|
||||
}
|
||||
|
||||
// Len implements heap.Interface.
|
||||
func (q *queue) Len() int {
|
||||
return len(q.items)
|
||||
}
|
||||
|
||||
// Less implements heap.Interface.
|
||||
func (q *queue) Less(i int, j int) bool {
|
||||
return q.items[i].ts() < q.items[j].ts()
|
||||
}
|
||||
|
||||
// Pop implements heap.Interface.
|
||||
func (q *queue) Pop() any {
|
||||
n := len(q.items)
|
||||
item := q.items[n-1]
|
||||
q.items[n-1] = nil
|
||||
q.items = q.items[0 : n-1]
|
||||
item.setIndex(invalidIndex)
|
||||
return item
|
||||
}
|
||||
|
||||
// Push implements heap.Interface.
|
||||
func (q *queue) Push(x any) {
|
||||
it := x.(queueItem)
|
||||
it.setIndex(q.Len())
|
||||
q.items = append(q.items, it)
|
||||
}
|
||||
|
||||
// Swap implements heap.Interface.
|
||||
func (q *queue) Swap(i int, j int) {
|
||||
q.items[i], q.items[j] = q.items[j], q.items[i]
|
||||
q.items[i].setIndex(i)
|
||||
q.items[j].setIndex(j)
|
||||
}
|
||||
|
||||
var _ queueItem = &reservationMQueueItem{}
|
||||
|
||||
type reservationMQueueItem struct {
|
||||
r *request
|
||||
}
|
||||
|
||||
func (i *reservationMQueueItem) ts() float64 {
|
||||
return i.r.reservation
|
||||
}
|
||||
|
||||
func (i *reservationMQueueItem) setIndex(idx int) {
|
||||
i.r.reservationIdx = idx
|
||||
}
|
||||
|
||||
var _ queueItem = &limitMQueueItem{}
|
||||
|
||||
type limitMQueueItem struct {
|
||||
r *request
|
||||
}
|
||||
|
||||
func (i *limitMQueueItem) ts() float64 {
|
||||
return i.r.limit
|
||||
}
|
||||
|
||||
func (i *limitMQueueItem) setIndex(idx int) {
|
||||
i.r.limitIdx = idx
|
||||
}
|
||||
|
||||
var _ queueItem = &sharesMQueueItem{}
|
||||
|
||||
type sharesMQueueItem struct {
|
||||
r *request
|
||||
}
|
||||
|
||||
func (i *sharesMQueueItem) ts() float64 {
|
||||
return i.r.shares
|
||||
}
|
||||
|
||||
func (i *sharesMQueueItem) setIndex(idx int) {
|
||||
i.r.sharesIdx = idx
|
||||
}
|
||||
|
||||
var _ queueItem = &readyMQueueItem{}
|
||||
|
||||
type readyMQueueItem struct {
|
||||
r *request
|
||||
}
|
||||
|
||||
func (i *readyMQueueItem) ts() float64 {
|
||||
return i.r.shares
|
||||
}
|
||||
|
||||
func (i *readyMQueueItem) setIndex(idx int) {
|
||||
i.r.readyIdx = idx
|
||||
}
|
||||
|
||||
type scheduleInfo struct {
|
||||
ts float64
|
||||
f func()
|
||||
}
|
||||
|
||||
type systemClock struct {
|
||||
since time.Time
|
||||
schedule chan scheduleInfo
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func newSystemClock() *systemClock {
|
||||
c := &systemClock{
|
||||
since: time.Now(),
|
||||
schedule: make(chan scheduleInfo),
|
||||
}
|
||||
c.start()
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *systemClock) now() float64 {
|
||||
return time.Since(c.since).Seconds()
|
||||
}
|
||||
|
||||
func (c *systemClock) runAt(ts float64, f func()) {
|
||||
c.schedule <- scheduleInfo{ts: ts, f: f}
|
||||
}
|
||||
|
||||
func (c *systemClock) close() {
|
||||
close(c.schedule)
|
||||
c.wg.Wait()
|
||||
}
|
||||
|
||||
func (c *systemClock) start() {
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer c.wg.Done()
|
||||
t := time.NewTimer(time.Hour)
|
||||
var f func()
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
if f != nil {
|
||||
f()
|
||||
f = nil
|
||||
}
|
||||
t.Reset(time.Hour)
|
||||
case s, ok := <-c.schedule:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
now := c.now()
|
||||
if now >= s.ts {
|
||||
s.f()
|
||||
f = nil
|
||||
continue
|
||||
}
|
||||
if !t.Stop() {
|
||||
select {
|
||||
case <-t.C:
|
||||
default:
|
||||
}
|
||||
}
|
||||
t.Reset(time.Duration((s.ts - now) * 1e9))
|
||||
f = s.f
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue