From 57d895c32167472184eb755df7b1d3114b6a63c7 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 24 Mar 2025 15:33:50 +0300 Subject: [PATCH 1/3] [#13] mclock: Schedule requests as soon as possible Let's assume that for some tag `limit = 1000 RPS` defined and each request takes 10 ms to complete. At some point in time 1000 requests were accepted. Then first request will be scheduled at `now()`, second - at `now() + 1 ms`, third - at `now() + 2 ms` etc. Total processing duration of 1000 requests will be 1 second + 10 ms. After this fix scheduler looks forward to schedule requests within limit. So for situation above total processing duration of 1000 requests will be 10 ms in ideal world. The same for reservation scheduling. Signed-off-by: Dmitrii Stepanov --- scheduling/mclock.go | 4 +-- scheduling/mclock_test.go | 56 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 54 insertions(+), 6 deletions(-) diff --git a/scheduling/mclock.go b/scheduling/mclock.go index 63a969c..f9bf2d2 100644 --- a/scheduling/mclock.go +++ b/scheduling/mclock.go @@ -306,7 +306,7 @@ func (q *MClock) setNextScheduleTimer(now float64) { } func (q *MClock) scheduleByLimitAndWeight(now float64) { - for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now { + for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < now+1.0 { ready := heap.Pop(q.limitQueue).(*limitMQueueItem) heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r}) } @@ -349,7 +349,7 @@ func (q *MClock) scheduleByLimitAndWeight(now float64) { } func (q *MClock) scheduleByReservation(now float64) { - for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now { + for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() < now+1.0 { next := heap.Pop(q.reservationQueue).(*reservationMQueueItem) q.removeFromQueues(next.r) diff --git a/scheduling/mclock_test.go b/scheduling/mclock_test.go index f9da670..81e1811 100644 --- a/scheduling/mclock_test.go +++ b/scheduling/mclock_test.go @@ -210,7 +210,7 @@ func TestMClockReservationScheduling(t *testing.T) { reqCount = (reqCount / 2) * 2 limit := 0.01 // 1 request in 100 seconds resevation := 100.0 // 100 RPS - cl := &noopClock{} + cl := &noopClock{v: float64(1.0)} q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{ "class1": {Share: 2, LimitIOPS: &limit}, "class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &resevation}, @@ -237,15 +237,18 @@ func TestMClockReservationScheduling(t *testing.T) { q.scheduleRequest() + count := 0 for _, req := range requests { select { case <-req.scheduled: - require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0") + require.Equal(t, req.tag, "class2") + count++ default: } } + require.Equal(t, 100, count, "class2 has 100 requests reserved, so only 100 requests must be scheduled") - cl.v = 1.00001 // 1s elapsed + cl.v = 1.9999 // 1s elapsed - 0.999 to take into account float64 accuracy q.scheduleRequest() var result []string @@ -258,7 +261,7 @@ func TestMClockReservationScheduling(t *testing.T) { } } - require.Equal(t, 100, len(result)) + require.Equal(t, 200, len(result)) for _, res := range result { require.Equal(t, "class2", res) } @@ -515,3 +518,48 @@ func TestMClockLowLimit(t *testing.T) { }) require.NoError(t, eg.Wait()) } + +func TestMClockLimitTotalTime(t *testing.T) { + t.Parallel() + limit := 10.0 // 10 RPS -> 1 request per 100 ms + q, err := NewMClock(100, 100, map[string]TagInfo{ + "class1": {Share: 50, LimitIOPS: &limit}, + }, 5*time.Second) + require.NoError(t, err) + defer q.Close() + + // 10 requests, each request runs for 500 ms, + // but they should be scheduled as soon as possible, + // so total duration must be less than 1 second + eg, ctx := errgroup.WithContext(context.Background()) + startedAt := time.Now() + for range 10 { + eg.Go(func() error { + release, err := q.RequestArrival(ctx, "class1") + require.NoError(t, err) + time.Sleep(500 * time.Millisecond) + release() + return nil + }) + } + require.NoError(t, eg.Wait()) + require.True(t, time.Since(startedAt) <= 1*time.Second) + + // 11 requests, limit = 10 RPS, so 10 requests should be + // scheduled as soon as possible, but last request should be + // scheduled at now + 1.0 s + eg, ctx = errgroup.WithContext(context.Background()) + startedAt = time.Now() + for range 11 { + eg.Go(func() error { + release, err := q.RequestArrival(ctx, "class1") + require.NoError(t, err) + time.Sleep(500 * time.Millisecond) + release() + return nil + }) + } + require.NoError(t, eg.Wait()) + require.True(t, time.Since(startedAt) >= 1500*time.Millisecond) + require.True(t, time.Since(startedAt) <= 1600*time.Millisecond) // 100 ms offset to complete all requests +} From 6c6e5bf4de10503f7e86e7916229f307b12ce184 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 28 Mar 2025 13:39:29 +0300 Subject: [PATCH 2/3] [#14] mclock: Allow to prohibit tag requests It is now possible to restrict requests for a specific tag. A separate field in `TagInfo` is used to avoid comparing float64 values with zero. Signed-off-by: Dmitrii Stepanov --- scheduling/mclock.go | 5 +++++ scheduling/mclock_test.go | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/scheduling/mclock.go b/scheduling/mclock.go index f9bf2d2..64c62a8 100644 --- a/scheduling/mclock.go +++ b/scheduling/mclock.go @@ -22,6 +22,7 @@ var ( ErrMClockSchedulerUnknownTag = errors.New("unknown tag") ErrInvalidTagInfo = errors.New("invalid tag info: shares, limit and reservation must be greater than zero") ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero") + ErrTagRequestsProhibited = errors.New("tag requests are prohibited") ) type request struct { @@ -49,6 +50,7 @@ type TagInfo struct { ReservedIOPS *float64 LimitIOPS *float64 Share float64 + Prohibited bool } // MClock is mClock scheduling algorithm implementation. @@ -196,6 +198,9 @@ func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) { if !ok { return nil, nil, ErrMClockSchedulerUnknownTag } + if tagInfo.Prohibited { + return nil, nil, ErrTagRequestsProhibited + } prev, ok := q.previous[tag] assert.Cond(ok, "undefined previous:", tag) diff --git a/scheduling/mclock_test.go b/scheduling/mclock_test.go index 81e1811..6433990 100644 --- a/scheduling/mclock_test.go +++ b/scheduling/mclock_test.go @@ -563,3 +563,22 @@ func TestMClockLimitTotalTime(t *testing.T) { require.True(t, time.Since(startedAt) >= 1500*time.Millisecond) require.True(t, time.Since(startedAt) <= 1600*time.Millisecond) // 100 ms offset to complete all requests } + +func TestMClockRestictTagRequests(t *testing.T) { + t.Parallel() + limit := 10.0 + q, err := NewMClock(100, 100, map[string]TagInfo{ + "class1": {Share: 50, LimitIOPS: &limit}, + "class2": {Share: 50, LimitIOPS: &limit, Prohibited: true}, + }, 5*time.Second) + require.NoError(t, err) + defer q.Close() + + release, err := q.RequestArrival(context.Background(), "class1") + require.NoError(t, err) + release() + + release, err = q.RequestArrival(context.Background(), "class2") + require.ErrorIs(t, err, ErrTagRequestsProhibited) + require.Nil(t, release) +} From b5ed0b6eff475ecaa61e1a33b5346f449806ee37 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 28 Mar 2025 13:43:11 +0300 Subject: [PATCH 3/3] [#14] CODEOWNERS: Use core commiters and developers groups Signed-off-by: Dmitrii Stepanov --- CODEOWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CODEOWNERS b/CODEOWNERS index b6fa647..d19c96a 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -1,3 +1,3 @@ -.* @fyrchik +.* @TrueCloudLab/storage-core-committers @TrueCloudLab/storage-core-developers .forgejo/.* @potyarkin Makefile @potyarkin