From e70c195c172032cec57f4cc5ba41f4cb78a60d6a Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 14 Jan 2025 13:58:08 +0300 Subject: [PATCH] [#9999] quota: Simplify queue creation Signed-off-by: Dmitrii Stepanov --- pkg/core/quota/bench_mclock_test.go | 22 ++--------- pkg/core/quota/mclock.go | 58 +++++++++++++++++++++++------ pkg/core/quota/mclock_test.go | 33 ++++------------ 3 files changed, 57 insertions(+), 56 deletions(-) diff --git a/pkg/core/quota/bench_mclock_test.go b/pkg/core/quota/bench_mclock_test.go index f0a25aaba..ebdc0c534 100644 --- a/pkg/core/quota/bench_mclock_test.go +++ b/pkg/core/quota/bench_mclock_test.go @@ -48,28 +48,12 @@ func BenchmarkMClock(b *testing.B) { for _, limit := range limits { for _, reservation := range reservations { for _, tags := range tagsCount { - tagInfos := make(map[string]tagInfo) - previous := make(map[string]*request) + tagInfos := make(map[string]TagInfo) for tag := 0; tag < tags; tag++ { - tagInfos["tag"+strconv.FormatInt(int64(tag), 10)] = tagInfo{shares: 50, limit: limit, reservation: reservation} - previous["tag"+strconv.FormatInt(int64(tag), 10)] = &request{} + tagInfos["tag"+strconv.FormatInt(int64(tag), 10)] = TagInfo{shares: 50, limit: limit, reservation: reservation} } - mClockQ := &mClockQueue{ - limit: 100_000, - clock: newSystemClock(), - tagInfo: tagInfos, - previous: previous, - reservationQueue: &mQueue{ - items: make([]mQueueItem, 0), - }, - limitQueue: &mQueue{ - items: make([]mQueueItem, 0), - }, - sharesQueue: &mQueue{ - items: make([]mQueueItem, 0), - }, - } + mClockQ := NewMClockQueue(100_000, tagInfos, (10 * time.Second).Seconds()) resStr := "no" if reservation != nil { diff --git a/pkg/core/quota/mclock.go b/pkg/core/quota/mclock.go index e6f57eb84..73fa36a34 100644 --- a/pkg/core/quota/mclock.go +++ b/pkg/core/quota/mclock.go @@ -41,7 +41,7 @@ type request struct { canceled chan struct{} } -type tagInfo struct { +type TagInfo struct { reservation *float64 limit *float64 shares float64 @@ -52,14 +52,14 @@ type clock interface { runAt(ts float64, f func()) } -type mClockQueue struct { +type MClockQueue struct { limit uint64 clock clock idleTimeout float64 - index atomic.Uint64 - tagInfo map[string]tagInfo + tagInfo map[string]TagInfo mtx sync.Mutex + index atomic.Uint64 previous map[string]*request inProgress uint64 lastSchedule float64 @@ -69,7 +69,43 @@ type mClockQueue struct { readyQueue *mQueue } -func (q *mClockQueue) RequestArrival(ctx context.Context, tag string) (Release, error) { +func NewMClockQueue(limit uint64, tagInfo map[string]TagInfo, idleTimeout float64) *MClockQueue { + result := &MClockQueue{ + limit: limit, + clock: newSystemClock(), + idleTimeout: idleTimeout, + tagInfo: tagInfo, + + reservationQueue: &mQueue{ + items: make([]mQueueItem, 0), + }, + limitQueue: &mQueue{ + items: make([]mQueueItem, 0), + }, + sharesQueue: &mQueue{ + items: make([]mQueueItem, 0), + }, + readyQueue: &mQueue{ + items: make([]mQueueItem, 0), + }, + } + + previous := make(map[string]*request) + for tag := range tagInfo { + previous[tag] = &request{ + tag: tag, + reservationIdx: invalidIndex, + limitIdx: invalidIndex, + sharesIdx: invalidIndex, + } + } + result.previous = previous + result.index.Store(1) // 0 for dummy previous + + return result +} + +func (q *MClockQueue) RequestArrival(ctx context.Context, tag string) (Release, error) { req, release := q.pushRequest(tag) select { case <-ctx.Done(): @@ -80,7 +116,7 @@ func (q *mClockQueue) RequestArrival(ctx context.Context, tag string) (Release, } } -func (q *mClockQueue) dropRequest(req *request) { +func (q *MClockQueue) dropRequest(req *request) { q.mtx.Lock() defer q.mtx.Unlock() @@ -96,7 +132,7 @@ func (q *mClockQueue) dropRequest(req *request) { q.removeFromQueues(req) } -func (q *mClockQueue) pushRequest(tag string) (*request, Release) { +func (q *MClockQueue) pushRequest(tag string) (*request, Release) { q.mtx.Lock() defer q.mtx.Unlock() @@ -149,7 +185,7 @@ func (q *mClockQueue) pushRequest(tag string) (*request, Release) { return r, q.requestCompleted } -func (q *mClockQueue) adjustTags(now float64) { +func (q *MClockQueue) adjustTags(now float64) { if q.sharesQueue.Len() == 0 { return } @@ -173,7 +209,7 @@ func (q *mClockQueue) adjustTags(now float64) { } } -func (q *mClockQueue) scheduleRequest(lockTaken bool) { +func (q *MClockQueue) scheduleRequest(lockTaken bool) { if !lockTaken { q.mtx.Lock() defer q.mtx.Unlock() @@ -266,7 +302,7 @@ func (q *mClockQueue) scheduleRequest(lockTaken bool) { } } -func (q *mClockQueue) removeFromQueues(r *request) { +func (q *MClockQueue) removeFromQueues(r *request) { if r.limitIdx != invalidIndex { heap.Remove(q.limitQueue, r.limitIdx) } @@ -281,7 +317,7 @@ func (q *mClockQueue) removeFromQueues(r *request) { } } -func (q *mClockQueue) requestCompleted() { +func (q *MClockQueue) requestCompleted() { q.mtx.Lock() defer q.mtx.Unlock() diff --git a/pkg/core/quota/mclock_test.go b/pkg/core/quota/mclock_test.go index 088643e77..485d20c82 100644 --- a/pkg/core/quota/mclock_test.go +++ b/pkg/core/quota/mclock_test.go @@ -16,30 +16,11 @@ import ( func TestMClockSimulation(t *testing.T) { t.Skip("use for simulation") const maxIter = 1000 - q := &mClockQueue{ - limit: 1, - clock: newSystemClock(), - tagInfo: map[string]tagInfo{ - "class1": {shares: 10}, - "class2": {shares: 5}, - }, - previous: map[string]*request{ - "class1": {reservation: 0.0, limit: 0.0, shares: 0.0}, - "class2": {reservation: 0.0, limit: 0.0, shares: 0.0}, - }, - reservationQueue: &mQueue{ - items: make([]mQueueItem, 0), - }, - limitQueue: &mQueue{ - items: make([]mQueueItem, 0), - }, - sharesQueue: &mQueue{ - items: make([]mQueueItem, 0), - }, - readyQueue: &mQueue{ - items: make([]mQueueItem, 0), - }, - } + const sleepInterval = 10 * time.Millisecond + q := NewMClockQueue(1, map[string]TagInfo{ + "class1": {shares: 10}, + "class2": {shares: 5}, + }, (10 * sleepInterval).Seconds()) eg, ctx := errgroup.WithContext(context.Background()) var seqNumber atomic.Int64 for i := 0; i < maxIter; i++ { @@ -53,10 +34,10 @@ func TestMClockSimulation(t *testing.T) { } defer release() fmt.Printf("request with seq number %d and tag %s scheduled\n", sn, tag) - time.Sleep(10 * time.Millisecond) + time.Sleep(sleepInterval) return nil }) } require.NoError(t, eg.Wait()) - t.Fail() + t.Fail() // to print }