From fb158093570cdeb3a3872e2e81c0de10ea23cef1 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 28 Nov 2024 18:52:19 +0300 Subject: [PATCH] [#9999] Add mClock implementation Signed-off-by: Dmitrii Stepanov --- pkg/core/quota/mclock.go | 231 ++++++++++++++++++++++++++++++++++ pkg/core/quota/mclock_test.go | 59 +++++++++ 2 files changed, 290 insertions(+) create mode 100644 pkg/core/quota/mclock.go create mode 100644 pkg/core/quota/mclock_test.go diff --git a/pkg/core/quota/mclock.go b/pkg/core/quota/mclock.go new file mode 100644 index 000000000..8d907e0cb --- /dev/null +++ b/pkg/core/quota/mclock.go @@ -0,0 +1,231 @@ +package quota + +import ( + "container/heap" + "math" +) + +const ( + invalidIndex = -1 + zeroReservation = math.SmallestNonzeroFloat64 + maxLimit = math.MaxFloat64 +) + +type mQueueItem interface { + tag() float64 + setIndex(idx int) +} + +type mQueue struct { + items []mQueueItem +} + +type request struct { + vm string + seqNumber uint64 + + reservation float64 + limit float64 + shares float64 + + reservationIdx int + limitIdx int + sharesIdx int +} + +type vmData struct { + reservation float64 + limit float64 + shares float64 +} + +type clock interface { + now() float64 +} + +type worker interface { + do(vm string, seqNumber uint64) +} + +type mClockQueue struct { + current uint64 + limit uint64 + clock clock + vmData map[string]vmData + previous map[string]*request + + reservationQueue *mQueue + limitQueue *mQueue + sharesQueue *mQueue + + index uint64 + worker worker +} + +func (q *mClockQueue) RequestArrival(vm string) { + now := q.clock.now() + vmData, ok := q.vmData[vm] + if !ok { + panic("unknown vm: " + vm) + } + prev, ok := q.previous[vm] + if !ok { + panic("unknown previous: " + vm) + } + r := request{ + vm: vm, + reservation: max(prev.reservation+1.0/vmData.reservation, now), + limit: max(prev.limit+1.0/vmData.limit, now), + shares: max(prev.shares+1.0/vmData.shares, now), + seqNumber: q.index, + reservationIdx: invalidIndex, + limitIdx: invalidIndex, + sharesIdx: invalidIndex, + } + q.index++ + q.previous[vm] = &r + heap.Push(q.reservationQueue, &reservationMQueueItem{r: &r}) + heap.Push(q.limitQueue, &limitMQueueItem{r: &r}) + q.scheduleRequest() +} + +func (q *mClockQueue) scheduleRequest() { + if q.current >= q.limit { + return + } + now := q.clock.now() + if q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].tag() <= now { + it := heap.Pop(q.reservationQueue).(*reservationMQueueItem) + if it.r.limitIdx != invalidIndex { + heap.Remove(q.limitQueue, it.r.limitIdx) + } + if it.r.sharesIdx != invalidIndex { + heap.Remove(q.sharesQueue, it.r.sharesIdx) + } + assertIndexInvalid(it.r) + q.current++ + q.worker.do(it.r.vm, it.r.seqNumber) + return + } + + for q.limitQueue.Len() > 0 && q.limitQueue.items[0].tag() <= now { + it := heap.Pop(q.limitQueue).(*limitMQueueItem) + heap.Push(q.sharesQueue, &sharesMQueueItem{r: it.r}) + } + + if q.sharesQueue.Len() > 0 { + it := heap.Pop(q.sharesQueue).(*sharesMQueueItem) + if it.r.reservationIdx != invalidIndex { + heap.Remove(q.reservationQueue, it.r.reservationIdx) + } + var updated bool + vmData, ok := q.vmData[it.r.vm] + if !ok { + panic("unknown vm: " + it.r.vm) + } + for _, i := range q.reservationQueue.items { + ri := i.(*reservationMQueueItem) + if ri.r.vm == it.r.vm { + ri.r.reservation -= 1.0 / vmData.reservation + updated = true + } + } + if updated { + heap.Init(q.reservationQueue) + } + assertIndexInvalid(it.r) + q.current++ + q.worker.do(it.r.vm, it.r.seqNumber) + return + } +} + +func (q *mClockQueue) requestCompleted() { + if q.current == 0 { + panic("invalid requets count") + } + q.current-- + q.scheduleRequest() +} + +func assertIndexInvalid(r *request) { + if r.limitIdx != invalidIndex { + panic("limitIdx is not -1") + } + if r.sharesIdx != invalidIndex { + panic("sharesIdx is not -1") + } + if r.reservationIdx != invalidIndex { + panic("reservationIdx is not -1") + } +} + +// Len implements heap.Interface. +func (q *mQueue) Len() int { + return len(q.items) +} + +// Less implements heap.Interface. +func (q *mQueue) Less(i int, j int) bool { + return q.items[i].tag() < q.items[j].tag() +} + +// Pop implements heap.Interface. +func (q *mQueue) Pop() any { + n := len(q.items) + item := q.items[n-1] + q.items[n-1] = nil + q.items = q.items[0 : n-1] + item.setIndex(-1) + return item +} + +// Push implements heap.Interface. +func (q *mQueue) Push(x any) { + it := x.(mQueueItem) + it.setIndex(q.Len()) + q.items = append(q.items, it) +} + +// Swap implements heap.Interface. +func (q *mQueue) Swap(i int, j int) { + q.items[i], q.items[j] = q.items[j], q.items[i] + q.items[i].setIndex(i) + q.items[j].setIndex(j) +} + +type reservationMQueueItem struct { + r *request +} + +func (i *reservationMQueueItem) tag() float64 { + return i.r.reservation +} + +func (i *reservationMQueueItem) setIndex(idx int) { + i.r.reservationIdx = idx +} + +type limitMQueueItem struct { + r *request +} + +func (i *limitMQueueItem) tag() float64 { + return i.r.limit +} + +func (i *limitMQueueItem) setIndex(idx int) { + i.r.limitIdx = idx +} + +type sharesMQueueItem struct { + r *request +} + +func (i *sharesMQueueItem) tag() float64 { + return i.r.shares +} + +func (i *sharesMQueueItem) setIndex(idx int) { + i.r.sharesIdx = idx +} diff --git a/pkg/core/quota/mclock_test.go b/pkg/core/quota/mclock_test.go new file mode 100644 index 000000000..effc7b33c --- /dev/null +++ b/pkg/core/quota/mclock_test.go @@ -0,0 +1,59 @@ +package quota + +import ( + "math/rand/v2" + "strconv" + "testing" + "time" +) + +func TestMClockSimulation(t *testing.T) { + const maxIter = 100 + q := &mClockQueue{ + limit: 1, + clock: &systemClock{}, + vmData: map[string]vmData{ + "class1": {reservation: zeroReservation, limit: maxLimit, shares: 60}, + "class2": {reservation: zeroReservation, limit: maxLimit, shares: 30}, + }, + previous: map[string]*request{ + "class1": {reservation: 0.0, limit: 0.0, shares: 0.0}, + "class2": {reservation: 0.0, limit: 0.0, shares: 0.0}, + }, + reservationQueue: &mQueue{ + items: make([]mQueueItem, 0), + }, + limitQueue: &mQueue{ + items: make([]mQueueItem, 0), + }, + sharesQueue: &mQueue{ + items: make([]mQueueItem, 0), + }, + worker: &consoleWorker{t: t}, + } + for i := 0; i < maxIter; i++ { + vmNumber := rand.Int64N(2) + 1 + q.RequestArrival("class" + strconv.FormatInt(vmNumber, 10)) + time.Sleep(10 * time.Millisecond) + } + for i := 0; i < maxIter; i++ { + q.requestCompleted() + } + t.Fail() +} + +type systemClock struct { + start time.Time +} + +func (c *systemClock) now() float64 { + return time.Since(c.start).Seconds() +} + +type consoleWorker struct { + t *testing.T +} + +func (w *consoleWorker) do(vm string, seqNumber uint64) { + w.t.Logf("request for vm %s with seq number %d scheduled\n", vm, seqNumber) +}