[#9999] Add mClock implementation

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-11-28 18:52:19 +03:00
parent 10f683a2f5
commit fb15809357
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
2 changed files with 290 additions and 0 deletions

231
pkg/core/quota/mclock.go Normal file
View file

@ -0,0 +1,231 @@
package quota
import (
"container/heap"
"math"
)
const (
invalidIndex = -1
zeroReservation = math.SmallestNonzeroFloat64
maxLimit = math.MaxFloat64
)
type mQueueItem interface {
tag() float64
setIndex(idx int)
}
type mQueue struct {
items []mQueueItem
}
type request struct {
vm string
seqNumber uint64
reservation float64
limit float64
shares float64
reservationIdx int
limitIdx int
sharesIdx int
}
type vmData struct {
reservation float64
limit float64
shares float64
}
type clock interface {
now() float64
}
type worker interface {
do(vm string, seqNumber uint64)
}
type mClockQueue struct {
current uint64
limit uint64
clock clock
vmData map[string]vmData
previous map[string]*request
reservationQueue *mQueue
limitQueue *mQueue
sharesQueue *mQueue
index uint64
worker worker
}
func (q *mClockQueue) RequestArrival(vm string) {
now := q.clock.now()
vmData, ok := q.vmData[vm]
if !ok {
panic("unknown vm: " + vm)
}
prev, ok := q.previous[vm]
if !ok {
panic("unknown previous: " + vm)
}
r := request{
vm: vm,
reservation: max(prev.reservation+1.0/vmData.reservation, now),
limit: max(prev.limit+1.0/vmData.limit, now),
shares: max(prev.shares+1.0/vmData.shares, now),
seqNumber: q.index,
reservationIdx: invalidIndex,
limitIdx: invalidIndex,
sharesIdx: invalidIndex,
}
q.index++
q.previous[vm] = &r
heap.Push(q.reservationQueue, &reservationMQueueItem{r: &r})
heap.Push(q.limitQueue, &limitMQueueItem{r: &r})
q.scheduleRequest()
}
func (q *mClockQueue) scheduleRequest() {
if q.current >= q.limit {
return
}
now := q.clock.now()
if q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].tag() <= now {
it := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
if it.r.limitIdx != invalidIndex {
heap.Remove(q.limitQueue, it.r.limitIdx)
}
if it.r.sharesIdx != invalidIndex {
heap.Remove(q.sharesQueue, it.r.sharesIdx)
}
assertIndexInvalid(it.r)
q.current++
q.worker.do(it.r.vm, it.r.seqNumber)
return
}
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].tag() <= now {
it := heap.Pop(q.limitQueue).(*limitMQueueItem)
heap.Push(q.sharesQueue, &sharesMQueueItem{r: it.r})
}
if q.sharesQueue.Len() > 0 {
it := heap.Pop(q.sharesQueue).(*sharesMQueueItem)
if it.r.reservationIdx != invalidIndex {
heap.Remove(q.reservationQueue, it.r.reservationIdx)
}
var updated bool
vmData, ok := q.vmData[it.r.vm]
if !ok {
panic("unknown vm: " + it.r.vm)
}
for _, i := range q.reservationQueue.items {
ri := i.(*reservationMQueueItem)
if ri.r.vm == it.r.vm {
ri.r.reservation -= 1.0 / vmData.reservation
updated = true
}
}
if updated {
heap.Init(q.reservationQueue)
}
assertIndexInvalid(it.r)
q.current++
q.worker.do(it.r.vm, it.r.seqNumber)
return
}
}
func (q *mClockQueue) requestCompleted() {
if q.current == 0 {
panic("invalid requets count")
}
q.current--
q.scheduleRequest()
}
func assertIndexInvalid(r *request) {
if r.limitIdx != invalidIndex {
panic("limitIdx is not -1")
}
if r.sharesIdx != invalidIndex {
panic("sharesIdx is not -1")
}
if r.reservationIdx != invalidIndex {
panic("reservationIdx is not -1")
}
}
// Len implements heap.Interface.
func (q *mQueue) Len() int {
return len(q.items)
}
// Less implements heap.Interface.
func (q *mQueue) Less(i int, j int) bool {
return q.items[i].tag() < q.items[j].tag()
}
// Pop implements heap.Interface.
func (q *mQueue) Pop() any {
n := len(q.items)
item := q.items[n-1]
q.items[n-1] = nil
q.items = q.items[0 : n-1]
item.setIndex(-1)
return item
}
// Push implements heap.Interface.
func (q *mQueue) Push(x any) {
it := x.(mQueueItem)
it.setIndex(q.Len())
q.items = append(q.items, it)
}
// Swap implements heap.Interface.
func (q *mQueue) Swap(i int, j int) {
q.items[i], q.items[j] = q.items[j], q.items[i]
q.items[i].setIndex(i)
q.items[j].setIndex(j)
}
type reservationMQueueItem struct {
r *request
}
func (i *reservationMQueueItem) tag() float64 {
return i.r.reservation
}
func (i *reservationMQueueItem) setIndex(idx int) {
i.r.reservationIdx = idx
}
type limitMQueueItem struct {
r *request
}
func (i *limitMQueueItem) tag() float64 {
return i.r.limit
}
func (i *limitMQueueItem) setIndex(idx int) {
i.r.limitIdx = idx
}
type sharesMQueueItem struct {
r *request
}
func (i *sharesMQueueItem) tag() float64 {
return i.r.shares
}
func (i *sharesMQueueItem) setIndex(idx int) {
i.r.sharesIdx = idx
}

View file

@ -0,0 +1,59 @@
package quota
import (
"math/rand/v2"
"strconv"
"testing"
"time"
)
func TestMClockSimulation(t *testing.T) {
const maxIter = 100
q := &mClockQueue{
limit: 1,
clock: &systemClock{},
vmData: map[string]vmData{
"class1": {reservation: zeroReservation, limit: maxLimit, shares: 60},
"class2": {reservation: zeroReservation, limit: maxLimit, shares: 30},
},
previous: map[string]*request{
"class1": {reservation: 0.0, limit: 0.0, shares: 0.0},
"class2": {reservation: 0.0, limit: 0.0, shares: 0.0},
},
reservationQueue: &mQueue{
items: make([]mQueueItem, 0),
},
limitQueue: &mQueue{
items: make([]mQueueItem, 0),
},
sharesQueue: &mQueue{
items: make([]mQueueItem, 0),
},
worker: &consoleWorker{t: t},
}
for i := 0; i < maxIter; i++ {
vmNumber := rand.Int64N(2) + 1
q.RequestArrival("class" + strconv.FormatInt(vmNumber, 10))
time.Sleep(10 * time.Millisecond)
}
for i := 0; i < maxIter; i++ {
q.requestCompleted()
}
t.Fail()
}
type systemClock struct {
start time.Time
}
func (c *systemClock) now() float64 {
return time.Since(c.start).Seconds()
}
type consoleWorker struct {
t *testing.T
}
func (w *consoleWorker) do(vm string, seqNumber uint64) {
w.t.Logf("request for vm %s with seq number %d scheduled\n", vm, seqNumber)
}