diff --git a/CODEOWNERS b/CODEOWNERS index d19c96a..b6fa647 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -1,3 +1,3 @@ -.* @TrueCloudLab/storage-core-committers @TrueCloudLab/storage-core-developers +.* @fyrchik .forgejo/.* @potyarkin Makefile @potyarkin diff --git a/scheduling/mclock.go b/scheduling/mclock.go index 64c62a8..63a969c 100644 --- a/scheduling/mclock.go +++ b/scheduling/mclock.go @@ -22,7 +22,6 @@ var ( ErrMClockSchedulerUnknownTag = errors.New("unknown tag") ErrInvalidTagInfo = errors.New("invalid tag info: shares, limit and reservation must be greater than zero") ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero") - ErrTagRequestsProhibited = errors.New("tag requests are prohibited") ) type request struct { @@ -50,7 +49,6 @@ type TagInfo struct { ReservedIOPS *float64 LimitIOPS *float64 Share float64 - Prohibited bool } // MClock is mClock scheduling algorithm implementation. @@ -198,9 +196,6 @@ func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) { if !ok { return nil, nil, ErrMClockSchedulerUnknownTag } - if tagInfo.Prohibited { - return nil, nil, ErrTagRequestsProhibited - } prev, ok := q.previous[tag] assert.Cond(ok, "undefined previous:", tag) @@ -311,7 +306,7 @@ func (q *MClock) setNextScheduleTimer(now float64) { } func (q *MClock) scheduleByLimitAndWeight(now float64) { - for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < now+1.0 { + for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now { ready := heap.Pop(q.limitQueue).(*limitMQueueItem) heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r}) } @@ -354,7 +349,7 @@ func (q *MClock) scheduleByLimitAndWeight(now float64) { } func (q *MClock) scheduleByReservation(now float64) { - for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() < now+1.0 { + for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now { next := heap.Pop(q.reservationQueue).(*reservationMQueueItem) q.removeFromQueues(next.r) diff --git a/scheduling/mclock_test.go b/scheduling/mclock_test.go index 6433990..f9da670 100644 --- a/scheduling/mclock_test.go +++ b/scheduling/mclock_test.go @@ -210,7 +210,7 @@ func TestMClockReservationScheduling(t *testing.T) { reqCount = (reqCount / 2) * 2 limit := 0.01 // 1 request in 100 seconds resevation := 100.0 // 100 RPS - cl := &noopClock{v: float64(1.0)} + cl := &noopClock{} q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{ "class1": {Share: 2, LimitIOPS: &limit}, "class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &resevation}, @@ -237,18 +237,15 @@ func TestMClockReservationScheduling(t *testing.T) { q.scheduleRequest() - count := 0 for _, req := range requests { select { case <-req.scheduled: - require.Equal(t, req.tag, "class2") - count++ + require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0") default: } } - require.Equal(t, 100, count, "class2 has 100 requests reserved, so only 100 requests must be scheduled") - cl.v = 1.9999 // 1s elapsed - 0.999 to take into account float64 accuracy + cl.v = 1.00001 // 1s elapsed q.scheduleRequest() var result []string @@ -261,7 +258,7 @@ func TestMClockReservationScheduling(t *testing.T) { } } - require.Equal(t, 200, len(result)) + require.Equal(t, 100, len(result)) for _, res := range result { require.Equal(t, "class2", res) } @@ -518,67 +515,3 @@ func TestMClockLowLimit(t *testing.T) { }) require.NoError(t, eg.Wait()) } - -func TestMClockLimitTotalTime(t *testing.T) { - t.Parallel() - limit := 10.0 // 10 RPS -> 1 request per 100 ms - q, err := NewMClock(100, 100, map[string]TagInfo{ - "class1": {Share: 50, LimitIOPS: &limit}, - }, 5*time.Second) - require.NoError(t, err) - defer q.Close() - - // 10 requests, each request runs for 500 ms, - // but they should be scheduled as soon as possible, - // so total duration must be less than 1 second - eg, ctx := errgroup.WithContext(context.Background()) - startedAt := time.Now() - for range 10 { - eg.Go(func() error { - release, err := q.RequestArrival(ctx, "class1") - require.NoError(t, err) - time.Sleep(500 * time.Millisecond) - release() - return nil - }) - } - require.NoError(t, eg.Wait()) - require.True(t, time.Since(startedAt) <= 1*time.Second) - - // 11 requests, limit = 10 RPS, so 10 requests should be - // scheduled as soon as possible, but last request should be - // scheduled at now + 1.0 s - eg, ctx = errgroup.WithContext(context.Background()) - startedAt = time.Now() - for range 11 { - eg.Go(func() error { - release, err := q.RequestArrival(ctx, "class1") - require.NoError(t, err) - time.Sleep(500 * time.Millisecond) - release() - return nil - }) - } - require.NoError(t, eg.Wait()) - require.True(t, time.Since(startedAt) >= 1500*time.Millisecond) - require.True(t, time.Since(startedAt) <= 1600*time.Millisecond) // 100 ms offset to complete all requests -} - -func TestMClockRestictTagRequests(t *testing.T) { - t.Parallel() - limit := 10.0 - q, err := NewMClock(100, 100, map[string]TagInfo{ - "class1": {Share: 50, LimitIOPS: &limit}, - "class2": {Share: 50, LimitIOPS: &limit, Prohibited: true}, - }, 5*time.Second) - require.NoError(t, err) - defer q.Close() - - release, err := q.RequestArrival(context.Background(), "class1") - require.NoError(t, err) - release() - - release, err = q.RequestArrival(context.Background(), "class2") - require.ErrorIs(t, err, ErrTagRequestsProhibited) - require.Nil(t, release) -}