Schedule requests as soon as possible #13

Merged
dstepanov-yadro merged 1 commit from dstepanov-yadro/frostfs-qos:fix/limit_scheduling into master 2025-03-28 10:06:02 +00:00
2 changed files with 54 additions and 6 deletions

View file

@ -306,7 +306,7 @@ func (q *MClock) setNextScheduleTimer(now float64) {
}
func (q *MClock) scheduleByLimitAndWeight(now float64) {
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < now+1.0 {
ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
}
@ -349,7 +349,7 @@ func (q *MClock) scheduleByLimitAndWeight(now float64) {
}
func (q *MClock) scheduleByReservation(now float64) {
for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() < now+1.0 {
next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
q.removeFromQueues(next.r)

View file

@ -210,7 +210,7 @@ func TestMClockReservationScheduling(t *testing.T) {
reqCount = (reqCount / 2) * 2
limit := 0.01 // 1 request in 100 seconds
resevation := 100.0 // 100 RPS
cl := &noopClock{}
cl := &noopClock{v: float64(1.0)}
q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{
"class1": {Share: 2, LimitIOPS: &limit},
"class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &resevation},
@ -237,15 +237,18 @@ func TestMClockReservationScheduling(t *testing.T) {
q.scheduleRequest()
count := 0
for _, req := range requests {
select {
case <-req.scheduled:
require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0")
require.Equal(t, req.tag, "class2")
count++
default:
}
}
require.Equal(t, 100, count, "class2 has 100 requests reserved, so only 100 requests must be scheduled")
cl.v = 1.00001 // 1s elapsed
cl.v = 1.9999 // 1s elapsed - 0.999 to take into account float64 accuracy
q.scheduleRequest()
var result []string
@ -258,7 +261,7 @@ func TestMClockReservationScheduling(t *testing.T) {
}
}
require.Equal(t, 100, len(result))
require.Equal(t, 200, len(result))
for _, res := range result {
require.Equal(t, "class2", res)
}
@ -515,3 +518,48 @@ func TestMClockLowLimit(t *testing.T) {
})
require.NoError(t, eg.Wait())
}
func TestMClockLimitTotalTime(t *testing.T) {
t.Parallel()
limit := 10.0 // 10 RPS -> 1 request per 100 ms
q, err := NewMClock(100, 100, map[string]TagInfo{
"class1": {Share: 50, LimitIOPS: &limit},
}, 5*time.Second)
require.NoError(t, err)
defer q.Close()
// 10 requests, each request runs for 500 ms,
// but they should be scheduled as soon as possible,
// so total duration must be less than 1 second
eg, ctx := errgroup.WithContext(context.Background())
startedAt := time.Now()
for range 10 {
eg.Go(func() error {
release, err := q.RequestArrival(ctx, "class1")
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)
release()
return nil
})
}
require.NoError(t, eg.Wait())
require.True(t, time.Since(startedAt) <= 1*time.Second)
// 11 requests, limit = 10 RPS, so 10 requests should be
// scheduled as soon as possible, but last request should be
// scheduled at now + 1.0 s
eg, ctx = errgroup.WithContext(context.Background())
startedAt = time.Now()
for range 11 {
eg.Go(func() error {
release, err := q.RequestArrival(ctx, "class1")
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)
release()
return nil
})
}
require.NoError(t, eg.Wait())
require.True(t, time.Since(startedAt) >= 1500*time.Millisecond)
require.True(t, time.Since(startedAt) <= 1600*time.Millisecond) // 100 ms offset to complete all requests
}