From 41690c21e5e32e3cedd7745bb66c074686b7ac9c Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 24 Mar 2025 15:33:50 +0300 Subject: [PATCH] [#9999] mclock: Schedule by limit requests as soon as possible Let's assume that for some tag `limit = 1000 RPS` defined and each request takes 10 ms to complete. At some point in time 1000 requests were accepted. Then first request will be scheduled at `now()`, second - at `now() + 1 ms`, third - at `now() + 2 ms` etc. Total processing duration of 1000 requests will be 1 second + 10 ms. After this fix scheduler looks forward to schedule requests within limit. So for situation above total processing duration of 1000 requests will be 10 ms in ideal world. Signed-off-by: Dmitrii Stepanov --- scheduling/mclock.go | 2 +- scheduling/mclock_test.go | 45 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/scheduling/mclock.go b/scheduling/mclock.go index 63a969c..e87d919 100644 --- a/scheduling/mclock.go +++ b/scheduling/mclock.go @@ -306,7 +306,7 @@ func (q *MClock) setNextScheduleTimer(now float64) { } func (q *MClock) scheduleByLimitAndWeight(now float64) { - for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now { + for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < now+1.0 { ready := heap.Pop(q.limitQueue).(*limitMQueueItem) heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r}) } diff --git a/scheduling/mclock_test.go b/scheduling/mclock_test.go index f9da670..056f4b9 100644 --- a/scheduling/mclock_test.go +++ b/scheduling/mclock_test.go @@ -515,3 +515,48 @@ func TestMClockLowLimit(t *testing.T) { }) require.NoError(t, eg.Wait()) } + +func TestMClockLimitTotalTime(t *testing.T) { + t.Parallel() + limit := 10.0 // 10 RPS -> 1 request per 100 ms + q, err := NewMClock(100, 100, map[string]TagInfo{ + "class1": {Share: 50, LimitIOPS: &limit}, + }, 5*time.Second) + require.NoError(t, err) + defer q.Close() + + // 10 requests, each request runs for 500 ms, + // but they should be scheduled as soon as possible, + // so total duration must be less than 1 second + eg, ctx := errgroup.WithContext(context.Background()) + startedAt := time.Now() + for range 10 { + eg.Go(func() error { + release, err := q.RequestArrival(ctx, "class1") + require.NoError(t, err) + time.Sleep(500 * time.Millisecond) + release() + return nil + }) + } + require.NoError(t, eg.Wait()) + require.True(t, time.Since(startedAt) <= 1*time.Second) + + // 11 requests, limit = 10 RPS, so 10 requests should be + // scheduled as soon as possible, but last request should be + // scheduled at now + 1.0 s + eg, ctx = errgroup.WithContext(context.Background()) + startedAt = time.Now() + for range 11 { + eg.Go(func() error { + release, err := q.RequestArrival(ctx, "class1") + require.NoError(t, err) + time.Sleep(500 * time.Millisecond) + release() + return nil + }) + } + require.NoError(t, eg.Wait()) + require.True(t, time.Since(startedAt) >= 1500*time.Millisecond) + require.True(t, time.Since(startedAt) <= 1600*time.Millisecond) // 100 ms offset to complete all requests +}