forked from TrueCloudLab/frostfs-node
[#9999] quota: Refactor mClock definitions
Use ts instead of tag. Use tag instead of vm. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
4e6c58456d
commit
7256a12ff6
2 changed files with 27 additions and 26 deletions
|
@ -12,7 +12,7 @@ const (
|
|||
)
|
||||
|
||||
type mQueueItem interface {
|
||||
tag() float64
|
||||
ts() float64
|
||||
setIndex(idx int)
|
||||
}
|
||||
|
||||
|
@ -21,7 +21,7 @@ type mQueue struct {
|
|||
}
|
||||
|
||||
type request struct {
|
||||
vm string
|
||||
tag string
|
||||
seqNumber uint64
|
||||
|
||||
reservation float64
|
||||
|
@ -33,7 +33,7 @@ type request struct {
|
|||
sharesIdx int
|
||||
}
|
||||
|
||||
type vmData struct {
|
||||
type tagInfo struct {
|
||||
reservation float64
|
||||
limit float64
|
||||
shares float64
|
||||
|
@ -44,14 +44,14 @@ type clock interface {
|
|||
}
|
||||
|
||||
type worker interface {
|
||||
do(vm string, seqNumber uint64)
|
||||
do(tag string, seqNumber uint64)
|
||||
}
|
||||
|
||||
type mClockQueue struct {
|
||||
current uint64
|
||||
limit uint64
|
||||
clock clock
|
||||
vmData map[string]vmData
|
||||
tagInfo map[string]tagInfo
|
||||
previous map[string]*request
|
||||
|
||||
reservationQueue *mQueue
|
||||
|
@ -62,28 +62,28 @@ type mClockQueue struct {
|
|||
worker worker
|
||||
}
|
||||
|
||||
func (q *mClockQueue) RequestArrival(vm string) {
|
||||
func (q *mClockQueue) RequestArrival(tag string) {
|
||||
now := q.clock.now()
|
||||
vmData, ok := q.vmData[vm]
|
||||
tagInfo, ok := q.tagInfo[tag]
|
||||
if !ok {
|
||||
panic("unknown vm: " + vm)
|
||||
panic("unknown tag: " + tag)
|
||||
}
|
||||
prev, ok := q.previous[vm]
|
||||
prev, ok := q.previous[tag]
|
||||
if !ok {
|
||||
panic("unknown previous: " + vm)
|
||||
panic("undefined previous: " + tag)
|
||||
}
|
||||
r := request{
|
||||
vm: vm,
|
||||
reservation: max(prev.reservation+1.0/vmData.reservation, now),
|
||||
limit: max(prev.limit+1.0/vmData.limit, now),
|
||||
shares: max(prev.shares+1.0/vmData.shares, now),
|
||||
tag: tag,
|
||||
reservation: max(prev.reservation+1.0/tagInfo.reservation, now),
|
||||
limit: max(prev.limit+1.0/tagInfo.limit, now),
|
||||
shares: max(prev.shares+1.0/tagInfo.shares, now),
|
||||
seqNumber: q.index,
|
||||
reservationIdx: invalidIndex,
|
||||
limitIdx: invalidIndex,
|
||||
sharesIdx: invalidIndex,
|
||||
}
|
||||
q.index++
|
||||
q.previous[vm] = &r
|
||||
q.previous[tag] = &r
|
||||
heap.Push(q.reservationQueue, &reservationMQueueItem{r: &r})
|
||||
heap.Push(q.limitQueue, &limitMQueueItem{r: &r})
|
||||
q.scheduleRequest()
|
||||
|
@ -91,10 +91,11 @@ func (q *mClockQueue) RequestArrival(vm string) {
|
|||
|
||||
func (q *mClockQueue) scheduleRequest() {
|
||||
if q.current >= q.limit {
|
||||
// next request will be scheduled by requestCompleted()
|
||||
return
|
||||
}
|
||||
now := q.clock.now()
|
||||
if q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].tag() <= now {
|
||||
if q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
|
||||
it := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
|
||||
if it.r.limitIdx != invalidIndex {
|
||||
heap.Remove(q.limitQueue, it.r.limitIdx)
|
||||
|
@ -104,11 +105,11 @@ func (q *mClockQueue) scheduleRequest() {
|
|||
}
|
||||
assertIndexInvalid(it.r)
|
||||
q.current++
|
||||
q.worker.do(it.r.vm, it.r.seqNumber)
|
||||
q.worker.do(it.r.tag, it.r.seqNumber)
|
||||
return
|
||||
}
|
||||
|
||||
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].tag() <= now {
|
||||
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
|
||||
it := heap.Pop(q.limitQueue).(*limitMQueueItem)
|
||||
heap.Push(q.sharesQueue, &sharesMQueueItem{r: it.r})
|
||||
}
|
||||
|
@ -119,14 +120,14 @@ func (q *mClockQueue) scheduleRequest() {
|
|||
heap.Remove(q.reservationQueue, it.r.reservationIdx)
|
||||
}
|
||||
var updated bool
|
||||
vmData, ok := q.vmData[it.r.vm]
|
||||
tagInfo, ok := q.tagInfo[it.r.tag]
|
||||
if !ok {
|
||||
panic("unknown vm: " + it.r.vm)
|
||||
panic("unknown tag: " + it.r.tag)
|
||||
}
|
||||
for _, i := range q.reservationQueue.items {
|
||||
ri := i.(*reservationMQueueItem)
|
||||
if ri.r.vm == it.r.vm {
|
||||
ri.r.reservation -= 1.0 / vmData.reservation
|
||||
if ri.r.tag == it.r.tag {
|
||||
ri.r.reservation -= 1.0 / tagInfo.reservation
|
||||
updated = true
|
||||
}
|
||||
}
|
||||
|
@ -135,7 +136,7 @@ func (q *mClockQueue) scheduleRequest() {
|
|||
}
|
||||
assertIndexInvalid(it.r)
|
||||
q.current++
|
||||
q.worker.do(it.r.vm, it.r.seqNumber)
|
||||
q.worker.do(it.r.tag, it.r.seqNumber)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -167,7 +168,7 @@ func (q *mQueue) Len() int {
|
|||
|
||||
// Less implements heap.Interface.
|
||||
func (q *mQueue) Less(i int, j int) bool {
|
||||
return q.items[i].tag() < q.items[j].tag()
|
||||
return q.items[i].ts() < q.items[j].ts()
|
||||
}
|
||||
|
||||
// Pop implements heap.Interface.
|
||||
|
@ -198,7 +199,7 @@ type reservationMQueueItem struct {
|
|||
r *request
|
||||
}
|
||||
|
||||
func (i *reservationMQueueItem) tag() float64 {
|
||||
func (i *reservationMQueueItem) ts() float64 {
|
||||
return i.r.reservation
|
||||
}
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ func TestMClockSimulation(t *testing.T) {
|
|||
q := &mClockQueue{
|
||||
limit: 1,
|
||||
clock: &systemClock{},
|
||||
vmData: map[string]vmData{
|
||||
tagInfo: map[string]tagInfo{
|
||||
"class1": {reservation: zeroReservation, limit: maxLimit, shares: 60},
|
||||
"class2": {reservation: zeroReservation, limit: maxLimit, shares: 30},
|
||||
},
|
||||
|
|
Loading…
Add table
Reference in a new issue