diff --git a/pkg/core/quota/mclock.go b/pkg/core/quota/mclock.go index 57e02d202..e6af68577 100644 --- a/pkg/core/quota/mclock.go +++ b/pkg/core/quota/mclock.go @@ -12,7 +12,7 @@ const ( ) type mQueueItem interface { - tag() float64 + ts() float64 setIndex(idx int) } @@ -21,7 +21,7 @@ type mQueue struct { } type request struct { - vm string + tag string seqNumber uint64 reservation float64 @@ -33,7 +33,7 @@ type request struct { sharesIdx int } -type vmData struct { +type tagInfo struct { reservation float64 limit float64 shares float64 @@ -44,14 +44,14 @@ type clock interface { } type worker interface { - do(vm string, seqNumber uint64) + do(tag string, seqNumber uint64) } type mClockQueue struct { current uint64 limit uint64 clock clock - vmData map[string]vmData + tagInfo map[string]tagInfo previous map[string]*request reservationQueue *mQueue @@ -62,28 +62,28 @@ type mClockQueue struct { worker worker } -func (q *mClockQueue) RequestArrival(vm string) { +func (q *mClockQueue) RequestArrival(tag string) { now := q.clock.now() - vmData, ok := q.vmData[vm] + tagInfo, ok := q.tagInfo[tag] if !ok { - panic("unknown vm: " + vm) + panic("unknown tag: " + tag) } - prev, ok := q.previous[vm] + prev, ok := q.previous[tag] if !ok { - panic("unknown previous: " + vm) + panic("undefined previous: " + tag) } r := request{ - vm: vm, - reservation: max(prev.reservation+1.0/vmData.reservation, now), - limit: max(prev.limit+1.0/vmData.limit, now), - shares: max(prev.shares+1.0/vmData.shares, now), + tag: tag, + reservation: max(prev.reservation+1.0/tagInfo.reservation, now), + limit: max(prev.limit+1.0/tagInfo.limit, now), + shares: max(prev.shares+1.0/tagInfo.shares, now), seqNumber: q.index, reservationIdx: invalidIndex, limitIdx: invalidIndex, sharesIdx: invalidIndex, } q.index++ - q.previous[vm] = &r + q.previous[tag] = &r heap.Push(q.reservationQueue, &reservationMQueueItem{r: &r}) heap.Push(q.limitQueue, &limitMQueueItem{r: &r}) q.scheduleRequest() @@ -91,10 +91,11 @@ func (q *mClockQueue) RequestArrival(vm string) { func (q *mClockQueue) scheduleRequest() { if q.current >= q.limit { + // next request will be scheduled by requestCompleted() return } now := q.clock.now() - if q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].tag() <= now { + if q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now { it := heap.Pop(q.reservationQueue).(*reservationMQueueItem) if it.r.limitIdx != invalidIndex { heap.Remove(q.limitQueue, it.r.limitIdx) @@ -104,11 +105,11 @@ func (q *mClockQueue) scheduleRequest() { } assertIndexInvalid(it.r) q.current++ - q.worker.do(it.r.vm, it.r.seqNumber) + q.worker.do(it.r.tag, it.r.seqNumber) return } - for q.limitQueue.Len() > 0 && q.limitQueue.items[0].tag() <= now { + for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now { it := heap.Pop(q.limitQueue).(*limitMQueueItem) heap.Push(q.sharesQueue, &sharesMQueueItem{r: it.r}) } @@ -119,14 +120,14 @@ func (q *mClockQueue) scheduleRequest() { heap.Remove(q.reservationQueue, it.r.reservationIdx) } var updated bool - vmData, ok := q.vmData[it.r.vm] + tagInfo, ok := q.tagInfo[it.r.tag] if !ok { - panic("unknown vm: " + it.r.vm) + panic("unknown tag: " + it.r.tag) } for _, i := range q.reservationQueue.items { ri := i.(*reservationMQueueItem) - if ri.r.vm == it.r.vm { - ri.r.reservation -= 1.0 / vmData.reservation + if ri.r.tag == it.r.tag { + ri.r.reservation -= 1.0 / tagInfo.reservation updated = true } } @@ -135,7 +136,7 @@ func (q *mClockQueue) scheduleRequest() { } assertIndexInvalid(it.r) q.current++ - q.worker.do(it.r.vm, it.r.seqNumber) + q.worker.do(it.r.tag, it.r.seqNumber) return } } @@ -167,7 +168,7 @@ func (q *mQueue) Len() int { // Less implements heap.Interface. func (q *mQueue) Less(i int, j int) bool { - return q.items[i].tag() < q.items[j].tag() + return q.items[i].ts() < q.items[j].ts() } // Pop implements heap.Interface. @@ -198,7 +199,7 @@ type reservationMQueueItem struct { r *request } -func (i *reservationMQueueItem) tag() float64 { +func (i *reservationMQueueItem) ts() float64 { return i.r.reservation } diff --git a/pkg/core/quota/mclock_test.go b/pkg/core/quota/mclock_test.go index effc7b33c..d710f79e3 100644 --- a/pkg/core/quota/mclock_test.go +++ b/pkg/core/quota/mclock_test.go @@ -12,7 +12,7 @@ func TestMClockSimulation(t *testing.T) { q := &mClockQueue{ limit: 1, clock: &systemClock{}, - vmData: map[string]vmData{ + tagInfo: map[string]tagInfo{ "class1": {reservation: zeroReservation, limit: maxLimit, shares: 60}, "class2": {reservation: zeroReservation, limit: maxLimit, shares: 30}, },