frostfs-qos/scheduling/mclock_test.go
Dmitrii Stepanov 41690c21e5
All checks were successful
DCO action / DCO (pull_request) Successful in 24s
Vulncheck / Vulncheck (pull_request) Successful in 31s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m10s
Tests and linters / Run gofumpt (pull_request) Successful in 1m4s
Tests and linters / Lint (pull_request) Successful in 1m12s
Tests and linters / gopls check (pull_request) Successful in 1m11s
Tests and linters / Staticcheck (pull_request) Successful in 1m18s
Tests and linters / Tests with -race (pull_request) Successful in 1m30s
Tests and linters / Tests (pull_request) Successful in 1m3s
[#9999] mclock: Schedule by limit requests as soon as possible
Let's assume that for some tag `limit = 1000 RPS` defined and each
request takes 10 ms to complete. At some point in time 1000 requests
were accepted. Then the first request will be scheduled at `now()`, the second
at `now() + 1 ms`, the third at `now() + 2 ms`, and so on. The total processing
duration of 1000 requests will be 1 second + 10 ms.

After this fix the scheduler looks ahead and schedules requests as long as they
stay within the limit. So for the situation above, the total processing duration
of 1000 requests will ideally be 10 ms.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-03-24 15:33:50 +03:00

562 lines
14 KiB
Go

package scheduling
import (
"context"
"math"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
// TestMClockSharesScheduling verifies proportional scheduling by shares:
// "class1" (share 2) must be scheduled about twice as often as "class2"
// (share 1), so the interleaving is roughly class1,class1,class2,...
func TestMClockSharesScheduling(t *testing.T) {
	t.Parallel()
	reqCount := 1000
	reqCount = (reqCount / 2) * 2 // force an even count so both tags get the same amount
	q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
		"class1": {Share: 2},
		"class2": {Share: 1},
	}, 100)
	require.NoError(t, err)
	q.clock = &noopClock{}

	var releases []ReleaseFunc
	var requests []*request
	tag := "class1"
	for range reqCount / 2 {
		req, release, err := q.pushRequest(tag)
		require.NoError(t, err)
		requests = append(requests, req)
		releases = append(releases, release)
	}
	tag = "class2"
	for range reqCount / 2 {
		req, release, err := q.pushRequest(tag)
		require.NoError(t, err)
		requests = append(requests, req)
		releases = append(releases, release)
	}

	var result []string
	// Guard result explicitly: multiple goroutines append to it. The run
	// limit of 1 serializes them in practice, but the test must not rely
	// on that scheduling detail to stay race-free.
	var resultMtx sync.Mutex
	var wg sync.WaitGroup
	for i := range reqCount {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-requests[i].scheduled
			resultMtx.Lock()
			result = append(result, requests[i].tag)
			resultMtx.Unlock()
			releases[i]()
		}()
	}
	wg.Wait()

	// Requests must be scheduled as class1->class1->class2->class1->class1->class2...,
	// because the ratio is 2 to 1.
	// However, there may be deviations due to rounding and sorting.
	result = result[:reqCount/2+(reqCount/2)/2] // last reqCount/4 requests is class2 tail
	var class1Count int
	var class2Count int
	var class2MaxSeq int
	for _, res := range result {
		switch res {
		case "class1":
			class1Count++
			class2MaxSeq = 0
		case "class2":
			class2Count++
			class2MaxSeq++
			require.Less(t, class2MaxSeq, 3) // not expected to have more than 2 class2 requests scheduled in row
		default:
			require.Fail(t, "unknown tag")
		}
	}
	// require.Equal reports both values on failure, unlike require.True.
	require.Equal(t, 66, (class1Count*100)/(class1Count+class2Count))
}
// noopClock is a manually driven clock stub for tests: now() returns the
// stored value v, and runAt merely records the requested timestamp instead
// of arming a real timer.
var _ clock = &noopClock{}

type noopClock struct {
	v          float64  // current "time" reported by now()
	runAtValue *float64 // last timestamp passed to runAt, nil if never called
}

// now reports the manually set current time.
func (n *noopClock) now() float64 { return n.v }

// runAt remembers the requested wake-up time; the callback is never invoked.
func (n *noopClock) runAt(ts float64, f func()) {
	value := ts
	n.runAtValue = &value
}

// close is a no-op.
func (n *noopClock) close() {}
// TestMClockRequestCancel checks that a request whose context expires while
// waiting is rejected with context.DeadlineExceeded and leaves no traces in
// any internal queue.
func TestMClockRequestCancel(t *testing.T) {
	t.Parallel()
	q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
		"class1": {Share: 2},
		"class2": {Share: 1},
	}, 100)
	require.NoError(t, err)
	q.clock = &noopClock{}

	// The first request occupies the single available run slot.
	releaseFirst, err := q.RequestArrival(context.Background(), "class1")
	require.NoError(t, err)

	// The second request cannot start and must fail once its context expires.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	releaseSecond, err := q.RequestArrival(ctx, "class1")
	require.Nil(t, releaseSecond)
	require.ErrorIs(t, err, context.DeadlineExceeded)

	// The cancelled request must have been removed from every queue.
	require.Zero(t, q.readyQueue.Len())
	require.Zero(t, q.sharesQueue.Len())
	require.Zero(t, q.limitQueue.Len())
	require.Zero(t, q.reservationQueue.Len())

	releaseFirst()
}
// TestMClockLimitScheduling verifies that LimitIOPS gates scheduling:
// while the virtual time is 0.0 nothing may run, and after time advances
// requests are still served according to their shares (2:1 ratio).
func TestMClockLimitScheduling(t *testing.T) {
	t.Parallel()
	reqCount := 100
	reqCount = (reqCount / 2) * 2 // force an even count so both tags get the same amount
	limit := 1.0
	cl := &noopClock{}
	q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
		"class1": {Share: 2, LimitIOPS: &limit},
		"class2": {Share: 1, LimitIOPS: &limit},
	}, 100)
	require.NoError(t, err)
	q.clock = cl

	var releases []ReleaseFunc
	var requests []*request
	tag := "class1"
	for range reqCount / 2 {
		req, release, err := q.pushRequest(tag)
		require.NoError(t, err)
		requests = append(requests, req)
		releases = append(releases, release)
	}
	tag = "class2"
	for range reqCount / 2 {
		req, release, err := q.pushRequest(tag)
		require.NoError(t, err)
		requests = append(requests, req)
		releases = append(releases, release)
	}

	q.scheduleRequest()

	// At time 0.0 every limit tag lies in the future, so nothing may run yet.
	for _, req := range requests {
		select {
		case <-req.scheduled:
			require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0")
		default:
		}
	}

	cl.v = math.MaxFloat64 // jump far into the future: limits no longer block anything

	var result []string
	// Guard result explicitly: multiple goroutines append to it. The run
	// limit of 1 serializes them in practice, but the test must not rely
	// on that scheduling detail to stay race-free.
	var resultMtx sync.Mutex
	var wg sync.WaitGroup
	for i := range reqCount {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-requests[i].scheduled
			resultMtx.Lock()
			result = append(result, requests[i].tag)
			resultMtx.Unlock()
			releases[i]()
		}()
	}
	q.scheduleRequest()
	wg.Wait()

	// Requests must be scheduled as class1->class1->class2->class1->class1->class2...,
	// because the ratio is 2 to 1.
	// However, there may be deviations due to rounding and sorting.
	result = result[:reqCount/2+(reqCount/2)/2] // last reqCount/4 requests is class2 tail
	var class1Count int
	var class2Count int
	var class2MaxSeq int
	for _, res := range result {
		switch res {
		case "class1":
			class1Count++
			class2MaxSeq = 0
		case "class2":
			class2Count++
			class2MaxSeq++
			require.Less(t, class2MaxSeq, 3) // not expected to have more than 2 class2 requests scheduled in row
		default:
			require.Fail(t, "unknown tag")
		}
	}
	// require.Equal reports both values on failure, unlike require.True.
	require.Equal(t, 66, (class1Count*100)/(class1Count+class2Count))

	// All internal queues must be drained after every request completed.
	require.Equal(t, 0, q.readyQueue.Len())
	require.Equal(t, 0, q.sharesQueue.Len())
	require.Equal(t, 0, q.limitQueue.Len())
	require.Equal(t, 0, q.reservationQueue.Len())
}
// TestMClockReservationScheduling checks that ReservedIOPS guarantees
// throughput to a tag even when its limit would otherwise block it:
// after one second of virtual time only the 100 reserved class2 requests
// must have been scheduled, and none of class1's.
func TestMClockReservationScheduling(t *testing.T) {
	t.Parallel()
	reqCount := 1000
	reqCount = (reqCount / 2) * 2 // force an even count so both tags get the same amount
	limit := 0.01                 // 1 request in 100 seconds
	reservation := 100.0          // 100 RPS
	cl := &noopClock{}
	q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{
		"class1": {Share: 2, LimitIOPS: &limit},
		"class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &reservation},
	}, 100)
	require.NoError(t, err)
	q.clock = cl

	var releases []ReleaseFunc
	var requests []*request
	tag := "class1"
	for range reqCount / 2 {
		req, release, err := q.pushRequest(tag)
		require.NoError(t, err)
		requests = append(requests, req)
		releases = append(releases, release)
	}
	tag = "class2"
	for range reqCount / 2 {
		req, release, err := q.pushRequest(tag)
		require.NoError(t, err)
		requests = append(requests, req)
		releases = append(releases, release)
	}

	q.scheduleRequest()

	// At time 0.0 every limit/reservation tag lies in the future.
	for _, req := range requests {
		select {
		case <-req.scheduled:
			require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0")
		default:
		}
	}

	cl.v = 1.00001 // 1s elapsed
	q.scheduleRequest()

	var result []string
	for i, req := range requests {
		select {
		case <-req.scheduled:
			result = append(result, requests[i].tag)
			releases[i]()
		default:
		}
	}

	// Exactly the reserved 100 RPS of class2 must have run; class1 is still
	// held back by its limit.
	require.Len(t, result, 100)
	for _, res := range result {
		require.Equal(t, "class2", res)
	}

	cl.v = math.MaxFloat64
	q.scheduleRequest()

	require.Equal(t, 0, q.readyQueue.Len())
	require.Equal(t, 0, q.sharesQueue.Len())
	require.Equal(t, 0, q.limitQueue.Len())
	require.Equal(t, 0, q.reservationQueue.Len())
}
// TestMClockIdleTag verifies that when a tag has been idle for longer than
// idleTimeout and becomes active again, the share tags of pending requests
// are adjusted forward to (at least) the current time.
func TestMClockIdleTag(t *testing.T) {
	t.Parallel()
	const reqCount = 100
	idleTimeout := 2 * time.Second
	cl := &noopClock{}
	q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
		"class1": {Share: 1},
		"class2": {Share: 1},
	}, idleTimeout)
	require.NoError(t, err)
	q.clock = cl

	var requests []*request
	for range reqCount / 2 {
		// Advance the clock by half the idle timeout so class1 never idles.
		cl.v += idleTimeout.Seconds() / 2
		req, _, err := q.pushRequest("class1")
		require.NoError(t, err)
		requests = append(requests, req)
	}

	// class1 requests have shares [1.0; 2.0; 3.0; ... ]
	cl.v += 2 * idleTimeout.Seconds()

	req, _, err := q.pushRequest("class2")
	require.NoError(t, err)
	requests = append(requests, req)

	// class2 must be defined as idle, so all shares tags must be adjusted.
	for _, r := range requests {
		select {
		case <-r.scheduled:
		default:
			require.GreaterOrEqual(t, r.shares, cl.v)
		}
	}
}
// TestMClockClose checks that Close rejects both already-waiting and new
// requests with ErrMClockSchedulerClosed while a running request finishes
// normally.
func TestMClockClose(t *testing.T) {
	t.Parallel()
	q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
		"class1": {Share: 1},
	}, 1000)
	require.NoError(t, err)
	q.clock = &noopClock{}

	requestRunning := make(chan struct{})
	checkDone := make(chan struct{})
	eg, ctx := errgroup.WithContext(context.Background())
	tag := "class1"

	// First request acquires the single run slot and blocks until the
	// assertions below are done.
	eg.Go(func() error {
		release, err := q.RequestArrival(ctx, tag)
		if err != nil {
			return err
		}
		defer release()
		close(requestRunning)
		<-checkDone
		return nil
	})
	<-requestRunning

	// Second request has to wait and must observe Close.
	eg.Go(func() error {
		release, err := q.RequestArrival(ctx, tag)
		require.Nil(t, release)
		require.ErrorIs(t, err, ErrMClockSchedulerClosed)
		return nil
	})
	// wait until second request will be blocked on wait;
	// poll with a short interval to keep the test fast.
	for q.waitingCount() == 0 {
		time.Sleep(10 * time.Millisecond)
	}

	q.Close()

	// New arrivals after Close must be rejected immediately.
	release, err := q.RequestArrival(context.Background(), tag)
	require.Nil(t, release)
	require.ErrorIs(t, err, ErrMClockSchedulerClosed)

	close(checkDone)
	require.NoError(t, eg.Wait())
}
// TestMClockWaitLimit checks that when the waiting-queue limit is reached,
// a further arrival fails fast with ErrMClockSchedulerRequestLimitExceeded
// instead of waiting.
func TestMClockWaitLimit(t *testing.T) {
	t.Parallel()
	q, err := NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: 1},
	}, 1000)
	require.NoError(t, err)
	q.clock = &noopClock{}
	defer q.Close()

	requestRunning := make(chan struct{})
	checkDone := make(chan struct{})
	eg, ctx := errgroup.WithContext(context.Background())
	tag := "class1"

	// running request: occupies the single run slot
	eg.Go(func() error {
		release, err := q.RequestArrival(ctx, tag)
		if err != nil {
			return err
		}
		defer release()
		close(requestRunning)
		<-checkDone
		return nil
	})

	// waiting request: fills the single wait slot
	eg.Go(func() error {
		<-requestRunning
		release, err := q.RequestArrival(ctx, tag)
		require.NotNil(t, release)
		require.NoError(t, err)
		defer release()
		<-checkDone
		return nil
	})

	// wait until second request will be waiting;
	// poll with a short interval to keep the test fast.
	for q.waitingCount() == 0 {
		time.Sleep(10 * time.Millisecond)
	}

	// Both slots are taken, so a third arrival must be rejected.
	release, err := q.RequestArrival(ctx, tag)
	require.Nil(t, release)
	require.ErrorIs(t, err, ErrMClockSchedulerRequestLimitExceeded)

	close(checkDone)
	require.NoError(t, eg.Wait())
}
// TestMClockParameterValidation enumerates invalid constructor arguments:
// a zero run limit, non-positive shares, and non-positive reservation or
// limit IOPS must all be rejected, while a zero wait limit and non-positive
// idle timeout are accepted.
func TestMClockParameterValidation(t *testing.T) {
	t.Parallel() // added for consistency with the other tests in this file
	_, err := NewMClock(0, 1, map[string]TagInfo{
		"class1": {Share: 1},
	}, 1000)
	require.ErrorIs(t, err, ErrInvalidRunLimit)
	_, err = NewMClock(1, 0, map[string]TagInfo{
		"class1": {Share: 1},
	}, 1000)
	require.NoError(t, err)
	_, err = NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: 1},
	}, -1.0)
	require.NoError(t, err)
	_, err = NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: 1},
	}, 0)
	require.NoError(t, err)

	negativeValue := -1.0
	zeroValue := float64(0)
	_, err = NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: negativeValue},
	}, 1000)
	require.ErrorIs(t, err, ErrInvalidTagInfo)
	_, err = NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: zeroValue},
	}, 1000)
	require.ErrorIs(t, err, ErrInvalidTagInfo)
	_, err = NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: 1.0, ReservedIOPS: &zeroValue},
	}, 1000)
	require.ErrorIs(t, err, ErrInvalidTagInfo)
	_, err = NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: 1.0, ReservedIOPS: &negativeValue},
	}, 1000)
	require.ErrorIs(t, err, ErrInvalidTagInfo)
	_, err = NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: 1.0, LimitIOPS: &zeroValue},
	}, 1000)
	require.ErrorIs(t, err, ErrInvalidTagInfo)
	_, err = NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: 1.0, LimitIOPS: &negativeValue},
	}, 1000)
	require.ErrorIs(t, err, ErrInvalidTagInfo)
}
// waitingCount is a test helper reporting how many requests currently sit
// in the shares queue (i.e. are waiting to be scheduled), under the lock.
func (q *MClock) waitingCount() int {
	q.mtx.Lock()
	count := q.sharesQueue.Len()
	q.mtx.Unlock()
	return count
}
// TestMClockTimeBasedSchedule checks that when a limited request cannot run
// immediately, the scheduler arms its clock timer at the next allowed time.
func TestMClockTimeBasedSchedule(t *testing.T) {
	t.Parallel()
	limit := 1.0 // 1 request per second allowed
	cl := &noopClock{v: 1.5}
	q, err := NewMClock(100, math.MaxUint64, map[string]TagInfo{
		"class1": {Share: 1, LimitIOPS: &limit},
	}, 100)
	require.NoError(t, err)
	defer q.Close()
	q.clock = cl

	started := make(chan struct{})
	finish := make(chan struct{})
	eg, ctx := errgroup.WithContext(context.Background())
	eg.Go(func() error {
		release, err := q.RequestArrival(ctx, "class1")
		require.NoError(t, err)
		defer release()
		close(started)
		<-finish
		return nil
	})
	<-started

	// The next request exceeds the limit, so the scheduler must arm a timer
	// at now + 1/limit = 2.5.
	_, _, err = q.pushRequest("class1")
	require.NoError(t, err)
	require.NotNil(t, cl.runAtValue)
	require.Equal(t, cl.v+1.0/limit, *cl.runAtValue)

	close(finish)
	require.NoError(t, eg.Wait())
}
// TestMClockLowLimit makes sure a short sequence of requests under a very
// low IOPS limit completes (no deadlock / no stuck scheduling).
func TestMClockLowLimit(t *testing.T) {
	t.Parallel()
	limit := 2.0
	q, err := NewMClock(100, 100, map[string]TagInfo{
		"class1": {Share: 50, LimitIOPS: &limit},
	}, 5*time.Second)
	require.NoError(t, err)
	defer q.Close()

	eg, ctx := errgroup.WithContext(context.Background())
	eg.SetLimit(5)
	eg.Go(func() error {
		for i := 0; i < 3; i++ {
			release, err := q.RequestArrival(ctx, "class1")
			require.NoError(t, err)
			release()
		}
		return nil
	})
	require.NoError(t, eg.Wait())
}
// TestMClockLimitTotalTime verifies that requests within the limit are
// scheduled as soon as possible (concurrently), and only the request that
// exceeds the limit is delayed to the next limit window.
func TestMClockLimitTotalTime(t *testing.T) {
	t.Parallel()
	limit := 10.0 // 10 RPS -> 1 request per 100 ms
	q, err := NewMClock(100, 100, map[string]TagInfo{
		"class1": {Share: 50, LimitIOPS: &limit},
	}, 5*time.Second)
	require.NoError(t, err)
	defer q.Close()

	// 10 requests, each request runs for 500 ms,
	// but they should be scheduled as soon as possible,
	// so total duration must be less than 1 second
	eg, ctx := errgroup.WithContext(context.Background())
	startedAt := time.Now()
	for range 10 {
		eg.Go(func() error {
			release, err := q.RequestArrival(ctx, "class1")
			require.NoError(t, err)
			time.Sleep(500 * time.Millisecond)
			release()
			return nil
		})
	}
	require.NoError(t, eg.Wait())
	// require.LessOrEqual prints both durations on failure, unlike require.True.
	require.LessOrEqual(t, time.Since(startedAt), 1*time.Second)

	// 11 requests, limit = 10 RPS, so 10 requests should be
	// scheduled as soon as possible, but last request should be
	// scheduled at now + 1.0 s
	eg, ctx = errgroup.WithContext(context.Background())
	startedAt = time.Now()
	for range 11 {
		eg.Go(func() error {
			release, err := q.RequestArrival(ctx, "class1")
			require.NoError(t, err)
			time.Sleep(500 * time.Millisecond)
			release()
			return nil
		})
	}
	require.NoError(t, eg.Wait())
	// Measure once so both bounds check the same elapsed value.
	elapsed := time.Since(startedAt)
	require.GreaterOrEqual(t, elapsed, 1500*time.Millisecond)
	require.LessOrEqual(t, elapsed, 1600*time.Millisecond) // 100 ms offset to complete all requests
}