[#9999] quota: Do not use reservation queue if reservation is not defined

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-12-05 12:24:51 +03:00
parent 9eda612274
commit 2f66f30e25
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
2 changed files with 67 additions and 51 deletions

View file

@ -6,9 +6,8 @@ import (
)
const (
invalidIndex = -1
zeroReservation = math.SmallestNonzeroFloat64
maxLimit = math.MaxFloat64
invalidIndex = -1
undefinedReservation float64 = -1.0
)
type mQueueItem interface {
@ -34,8 +33,8 @@ type request struct {
}
type tagInfo struct {
reservation float64
limit float64
reservation *float64
limit *float64
shares float64
}
@ -48,11 +47,11 @@ type worker interface {
}
type mClockQueue struct {
current uint64
limit uint64
clock clock
tagInfo map[string]tagInfo
previous map[string]*request
inProgress uint64
limit uint64
clock clock
tagInfo map[string]tagInfo
previous map[string]*request
reservationQueue *mQueue
limitQueue *mQueue
@ -74,80 +73,97 @@ func (q *mClockQueue) RequestArrival(tag string) {
}
r := request{
tag: tag,
reservation: max(prev.reservation+1.0/tagInfo.reservation, now),
limit: max(prev.limit+1.0/tagInfo.limit, now),
shares: max(prev.shares+1.0/tagInfo.shares, now),
seqNumber: q.index,
reservationIdx: invalidIndex,
limitIdx: invalidIndex,
sharesIdx: invalidIndex,
}
if tagInfo.reservation != nil {
r.reservation = max(prev.reservation + 1.0 / *tagInfo.reservation, now)
} else {
r.reservation = undefinedReservation
}
if tagInfo.limit != nil {
r.limit = max(prev.limit + 1.0 / *tagInfo.limit, now)
} else {
r.limit = max(prev.limit+1.0/math.MaxFloat64, now)
}
q.index++
q.previous[tag] = &r
heap.Push(q.reservationQueue, &reservationMQueueItem{r: &r})
if tagInfo.reservation != nil {
heap.Push(q.reservationQueue, &reservationMQueueItem{r: &r})
}
heap.Push(q.limitQueue, &limitMQueueItem{r: &r})
q.scheduleRequest()
}
func (q *mClockQueue) scheduleRequest() {
if q.current >= q.limit {
if q.inProgress >= q.limit {
// next request will be scheduled by requestCompleted()
return
}
now := q.clock.now()
for q.current < q.limit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
it := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
if it.r.limitIdx != invalidIndex {
heap.Remove(q.limitQueue, it.r.limitIdx)
for q.inProgress < q.limit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
if next.r.limitIdx != invalidIndex {
heap.Remove(q.limitQueue, next.r.limitIdx)
}
if it.r.sharesIdx != invalidIndex {
heap.Remove(q.sharesQueue, it.r.sharesIdx)
if next.r.sharesIdx != invalidIndex {
heap.Remove(q.sharesQueue, next.r.sharesIdx)
}
assertIndexInvalid(it.r)
q.current++
q.worker.do(it.r.tag, it.r.seqNumber)
assertIndexInvalid(next.r)
q.inProgress++
q.worker.do(next.r.tag, next.r.seqNumber)
}
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
it := heap.Pop(q.limitQueue).(*limitMQueueItem)
heap.Push(q.sharesQueue, &sharesMQueueItem{r: it.r})
}
if q.current >= q.limit {
if q.inProgress >= q.limit {
return
}
for q.current < q.limit && q.sharesQueue.Len() > 0 {
it := heap.Pop(q.sharesQueue).(*sharesMQueueItem)
if it.r.reservationIdx != invalidIndex {
heap.Remove(q.reservationQueue, it.r.reservationIdx)
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
heap.Push(q.sharesQueue, &sharesMQueueItem{r: ready.r})
}
for q.inProgress < q.limit && q.sharesQueue.Len() > 0 {
next := heap.Pop(q.sharesQueue).(*sharesMQueueItem)
hadReservation := false
if next.r.reservationIdx != invalidIndex {
hadReservation = true
heap.Remove(q.reservationQueue, next.r.reservationIdx)
}
var updated bool
tagInfo, ok := q.tagInfo[it.r.tag]
tagInfo, ok := q.tagInfo[next.r.tag]
if !ok {
panic("unknown tag: " + it.r.tag)
panic("unknown tag: " + next.r.tag)
}
for _, i := range q.reservationQueue.items {
ri := i.(*reservationMQueueItem)
if ri.r.tag == it.r.tag {
ri.r.reservation -= 1.0 / tagInfo.reservation
updated = true
if tagInfo.reservation != nil && hadReservation {
var updated bool
for _, i := range q.reservationQueue.items {
ri := i.(*reservationMQueueItem)
if ri.r.tag == next.r.tag && ri.r.reservation > next.r.reservation {
ri.r.reservation -= 1.0 / *tagInfo.reservation
updated = true
}
}
if updated {
heap.Init(q.reservationQueue)
}
}
if updated {
heap.Init(q.reservationQueue)
}
assertIndexInvalid(it.r)
q.current++
q.worker.do(it.r.tag, it.r.seqNumber)
assertIndexInvalid(next.r)
q.inProgress++
q.worker.do(next.r.tag, next.r.seqNumber)
}
}
func (q *mClockQueue) requestCompleted() {
if q.current == 0 {
if q.inProgress == 0 {
panic("invalid requests count")
}
q.current--
q.inProgress--
q.scheduleRequest()
}

View file

@ -13,8 +13,8 @@ func TestMClockSimulation(t *testing.T) {
limit: 1,
clock: &systemClock{},
tagInfo: map[string]tagInfo{
"class1": {reservation: zeroReservation, limit: maxLimit, shares: 50},
"class2": {reservation: zeroReservation, limit: maxLimit, shares: 5},
"class1": {shares: 50},
"class2": {shares: 5},
},
previous: map[string]*request{
"class1": {reservation: 0.0, limit: 0.0, shares: 0.0},