forked from TrueCloudLab/frostfs-qos
Let's assume that there are two requests in the queue with execution time t1 and t2. The timer is set to t1. The timer is triggered, schedules the t1 request, calculates the time for the next timer t2 to be triggered. But it doesn't schedules timer to this time because of the `q.timeBasedScheduleTs > nextTs` check. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
517 lines
12 KiB
Go
517 lines
12 KiB
Go
package scheduling
|
|
|
|
import (
|
|
"context"
|
|
"math"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
func TestMClockSharesScheduling(t *testing.T) {
|
|
t.Parallel()
|
|
reqCount := 1000
|
|
reqCount = (reqCount / 2) * 2
|
|
q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
|
|
"class1": {Share: 2},
|
|
"class2": {Share: 1},
|
|
}, 100)
|
|
require.NoError(t, err)
|
|
q.clock = &noopClock{}
|
|
|
|
var releases []ReleaseFunc
|
|
var requests []*request
|
|
tag := "class1"
|
|
for i := 0; i < reqCount/2; i++ {
|
|
req, release, err := q.pushRequest(tag)
|
|
require.NoError(t, err)
|
|
requests = append(requests, req)
|
|
releases = append(releases, release)
|
|
}
|
|
tag = "class2"
|
|
for i := 0; i < reqCount/2; i++ {
|
|
req, release, err := q.pushRequest(tag)
|
|
require.NoError(t, err)
|
|
requests = append(requests, req)
|
|
releases = append(releases, release)
|
|
}
|
|
|
|
var result []string
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < reqCount; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
<-requests[i].scheduled
|
|
result = append(result, requests[i].tag)
|
|
releases[i]()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
|
|
// Requests must be scheduled as class1->class1->class2->class1->class1->class2...,
|
|
// because the ratio is 2 to 1.
|
|
// However, there may be deviations due to rounding and sorting.
|
|
result = result[:reqCount/2+(reqCount/2)/2] // last reqCount/4 requests is class2 tail
|
|
var class1Count int
|
|
var class2Count int
|
|
var class2MaxSeq int
|
|
for _, res := range result {
|
|
switch res {
|
|
case "class1":
|
|
class1Count++
|
|
class2MaxSeq = 0
|
|
case "class2":
|
|
class2Count++
|
|
class2MaxSeq++
|
|
require.Less(t, class2MaxSeq, 3) // not expected to have more than 2 class2 requests scheduled in row
|
|
default:
|
|
require.Fail(t, "unknown tag")
|
|
}
|
|
}
|
|
|
|
require.True(t, (class1Count*100)/(class1Count+class2Count) == 66)
|
|
}
|
|
|
|
var _ clock = &noopClock{}
|
|
|
|
type noopClock struct {
|
|
v float64
|
|
runAtValue *float64
|
|
}
|
|
|
|
func (n *noopClock) now() float64 {
|
|
return n.v
|
|
}
|
|
|
|
func (n *noopClock) runAt(ts float64, f func()) {
|
|
n.runAtValue = &ts
|
|
}
|
|
|
|
func (n *noopClock) close() {}
|
|
|
|
func TestMClockRequestCancel(t *testing.T) {
|
|
t.Parallel()
|
|
q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
|
|
"class1": {Share: 2},
|
|
"class2": {Share: 1},
|
|
}, 100)
|
|
require.NoError(t, err)
|
|
q.clock = &noopClock{}
|
|
|
|
release1, err := q.RequestArrival(context.Background(), "class1")
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
|
|
defer cancel()
|
|
release2, err := q.RequestArrival(ctx, "class1")
|
|
require.Nil(t, release2)
|
|
require.ErrorIs(t, err, context.DeadlineExceeded)
|
|
|
|
require.Equal(t, 0, q.readyQueue.Len())
|
|
require.Equal(t, 0, q.sharesQueue.Len())
|
|
require.Equal(t, 0, q.limitQueue.Len())
|
|
require.Equal(t, 0, q.reservationQueue.Len())
|
|
|
|
release1()
|
|
}
|
|
|
|
func TestMClockLimitScheduling(t *testing.T) {
|
|
t.Parallel()
|
|
reqCount := 100
|
|
reqCount = (reqCount / 2) * 2
|
|
limit := 1.0
|
|
cl := &noopClock{}
|
|
q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
|
|
"class1": {Share: 2, LimitIOPS: &limit},
|
|
"class2": {Share: 1, LimitIOPS: &limit},
|
|
}, 100)
|
|
require.NoError(t, err)
|
|
q.clock = cl
|
|
|
|
var releases []ReleaseFunc
|
|
var requests []*request
|
|
tag := "class1"
|
|
for i := 0; i < reqCount/2; i++ {
|
|
req, release, err := q.pushRequest(tag)
|
|
require.NoError(t, err)
|
|
requests = append(requests, req)
|
|
releases = append(releases, release)
|
|
}
|
|
tag = "class2"
|
|
for i := 0; i < reqCount/2; i++ {
|
|
req, release, err := q.pushRequest(tag)
|
|
require.NoError(t, err)
|
|
requests = append(requests, req)
|
|
releases = append(releases, release)
|
|
}
|
|
|
|
q.scheduleRequest()
|
|
|
|
for _, req := range requests {
|
|
select {
|
|
case <-req.scheduled:
|
|
require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0")
|
|
default:
|
|
}
|
|
}
|
|
|
|
cl.v = math.MaxFloat64
|
|
|
|
var result []string
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < reqCount; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
<-requests[i].scheduled
|
|
result = append(result, requests[i].tag)
|
|
releases[i]()
|
|
}()
|
|
}
|
|
q.scheduleRequest()
|
|
wg.Wait()
|
|
|
|
// Requests must be scheduled as class1->class1->class2->class1->class1->class2...,
|
|
// because the ratio is 2 to 1.
|
|
// However, there may be deviations due to rounding and sorting.
|
|
result = result[:reqCount/2+(reqCount/2)/2] // last reqCount/4 requests is class2 tail
|
|
var class1Count int
|
|
var class2Count int
|
|
var class2MaxSeq int
|
|
for _, res := range result {
|
|
switch res {
|
|
case "class1":
|
|
class1Count++
|
|
class2MaxSeq = 0
|
|
case "class2":
|
|
class2Count++
|
|
class2MaxSeq++
|
|
require.Less(t, class2MaxSeq, 3) // not expected to have more than 2 class2 requests scheduled in row
|
|
default:
|
|
require.Fail(t, "unknown tag")
|
|
}
|
|
}
|
|
|
|
require.True(t, (class1Count*100)/(class1Count+class2Count) == 66)
|
|
|
|
require.Equal(t, 0, q.readyQueue.Len())
|
|
require.Equal(t, 0, q.sharesQueue.Len())
|
|
require.Equal(t, 0, q.limitQueue.Len())
|
|
require.Equal(t, 0, q.reservationQueue.Len())
|
|
}
|
|
|
|
func TestMClockReservationScheduling(t *testing.T) {
|
|
t.Parallel()
|
|
reqCount := 1000
|
|
reqCount = (reqCount / 2) * 2
|
|
limit := 0.01 // 1 request in 100 seconds
|
|
resevation := 100.0 // 100 RPS
|
|
cl := &noopClock{}
|
|
q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{
|
|
"class1": {Share: 2, LimitIOPS: &limit},
|
|
"class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &resevation},
|
|
}, 100)
|
|
require.NoError(t, err)
|
|
q.clock = cl
|
|
|
|
var releases []ReleaseFunc
|
|
var requests []*request
|
|
tag := "class1"
|
|
for i := 0; i < reqCount/2; i++ {
|
|
req, release, err := q.pushRequest(tag)
|
|
require.NoError(t, err)
|
|
requests = append(requests, req)
|
|
releases = append(releases, release)
|
|
}
|
|
tag = "class2"
|
|
for i := 0; i < reqCount/2; i++ {
|
|
req, release, err := q.pushRequest(tag)
|
|
require.NoError(t, err)
|
|
requests = append(requests, req)
|
|
releases = append(releases, release)
|
|
}
|
|
|
|
q.scheduleRequest()
|
|
|
|
for _, req := range requests {
|
|
select {
|
|
case <-req.scheduled:
|
|
require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0")
|
|
default:
|
|
}
|
|
}
|
|
|
|
cl.v = 1.00001 // 1s elapsed
|
|
q.scheduleRequest()
|
|
|
|
var result []string
|
|
for i, req := range requests {
|
|
select {
|
|
case <-req.scheduled:
|
|
result = append(result, requests[i].tag)
|
|
releases[i]()
|
|
default:
|
|
}
|
|
}
|
|
|
|
require.Equal(t, 100, len(result))
|
|
for _, res := range result {
|
|
require.Equal(t, "class2", res)
|
|
}
|
|
|
|
cl.v = math.MaxFloat64
|
|
q.scheduleRequest()
|
|
|
|
require.Equal(t, 0, q.readyQueue.Len())
|
|
require.Equal(t, 0, q.sharesQueue.Len())
|
|
require.Equal(t, 0, q.limitQueue.Len())
|
|
require.Equal(t, 0, q.reservationQueue.Len())
|
|
}
|
|
|
|
func TestMClockIdleTag(t *testing.T) {
|
|
t.Parallel()
|
|
reqCount := 100
|
|
idleTimeout := 2 * time.Second
|
|
cl := &noopClock{}
|
|
q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
|
|
"class1": {Share: 1},
|
|
"class2": {Share: 1},
|
|
}, idleTimeout)
|
|
require.NoError(t, err)
|
|
q.clock = cl
|
|
|
|
var requests []*request
|
|
tag := "class1"
|
|
for i := 0; i < reqCount/2; i++ {
|
|
cl.v += idleTimeout.Seconds() / 2
|
|
req, _, err := q.pushRequest(tag)
|
|
require.NoError(t, err)
|
|
requests = append(requests, req)
|
|
}
|
|
|
|
// class1 requests have shares [1.0; 2.0; 3.0; ... ]
|
|
|
|
cl.v += 2 * idleTimeout.Seconds()
|
|
|
|
tag = "class2"
|
|
req, _, err := q.pushRequest(tag)
|
|
require.NoError(t, err)
|
|
requests = append(requests, req)
|
|
|
|
// class2 must be defined as idle, so all shares tags must be adjusted.
|
|
|
|
for _, req := range requests {
|
|
select {
|
|
case <-req.scheduled:
|
|
default:
|
|
require.True(t, req.shares >= cl.v)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestMClockClose(t *testing.T) {
|
|
t.Parallel()
|
|
q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
|
|
"class1": {Share: 1},
|
|
}, 1000)
|
|
require.NoError(t, err)
|
|
q.clock = &noopClock{}
|
|
|
|
requestRunning := make(chan struct{})
|
|
checkDone := make(chan struct{})
|
|
eg, ctx := errgroup.WithContext(context.Background())
|
|
tag := "class1"
|
|
eg.Go(func() error {
|
|
release, err := q.RequestArrival(ctx, tag)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer release()
|
|
close(requestRunning)
|
|
<-checkDone
|
|
return nil
|
|
})
|
|
<-requestRunning
|
|
|
|
eg.Go(func() error {
|
|
release, err := q.RequestArrival(ctx, tag)
|
|
require.Nil(t, release)
|
|
require.ErrorIs(t, err, ErrMClockSchedulerClosed)
|
|
return nil
|
|
})
|
|
|
|
// wait until second request will be blocked on wait
|
|
for q.waitingCount() == 0 {
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
q.Close()
|
|
|
|
release, err := q.RequestArrival(context.Background(), tag)
|
|
require.Nil(t, release)
|
|
require.ErrorIs(t, err, ErrMClockSchedulerClosed)
|
|
|
|
close(checkDone)
|
|
|
|
require.NoError(t, eg.Wait())
|
|
}
|
|
|
|
func TestMClockWaitLimit(t *testing.T) {
|
|
t.Parallel()
|
|
q, err := NewMClock(1, 1, map[string]TagInfo{
|
|
"class1": {Share: 1},
|
|
}, 1000)
|
|
require.NoError(t, err)
|
|
q.clock = &noopClock{}
|
|
defer q.Close()
|
|
|
|
requestRunning := make(chan struct{})
|
|
checkDone := make(chan struct{})
|
|
eg, ctx := errgroup.WithContext(context.Background())
|
|
tag := "class1"
|
|
// running request
|
|
eg.Go(func() error {
|
|
release, err := q.RequestArrival(ctx, tag)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer release()
|
|
close(requestRunning)
|
|
<-checkDone
|
|
return nil
|
|
})
|
|
|
|
// waiting request
|
|
eg.Go(func() error {
|
|
<-requestRunning
|
|
release, err := q.RequestArrival(ctx, tag)
|
|
require.NotNil(t, release)
|
|
require.NoError(t, err)
|
|
defer release()
|
|
<-checkDone
|
|
return nil
|
|
})
|
|
|
|
// wait until second request will be waiting
|
|
for q.waitingCount() == 0 {
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
release, err := q.RequestArrival(ctx, tag)
|
|
require.Nil(t, release)
|
|
require.ErrorIs(t, err, ErrMClockSchedulerRequestLimitExceeded)
|
|
|
|
close(checkDone)
|
|
require.NoError(t, eg.Wait())
|
|
}
|
|
|
|
func TestMClockParameterValidation(t *testing.T) {
|
|
_, err := NewMClock(0, 1, map[string]TagInfo{
|
|
"class1": {Share: 1},
|
|
}, 1000)
|
|
require.ErrorIs(t, err, ErrInvalidRunLimit)
|
|
_, err = NewMClock(1, 0, map[string]TagInfo{
|
|
"class1": {Share: 1},
|
|
}, 1000)
|
|
require.NoError(t, err)
|
|
_, err = NewMClock(1, 1, map[string]TagInfo{
|
|
"class1": {Share: 1},
|
|
}, -1.0)
|
|
require.NoError(t, err)
|
|
_, err = NewMClock(1, 1, map[string]TagInfo{
|
|
"class1": {Share: 1},
|
|
}, 0)
|
|
require.NoError(t, err)
|
|
negativeValue := -1.0
|
|
zeroValue := float64(0)
|
|
_, err = NewMClock(1, 1, map[string]TagInfo{
|
|
"class1": {Share: negativeValue},
|
|
}, 1000)
|
|
require.ErrorIs(t, err, ErrInvalidTagInfo)
|
|
_, err = NewMClock(1, 1, map[string]TagInfo{
|
|
"class1": {Share: zeroValue},
|
|
}, 1000)
|
|
require.ErrorIs(t, err, ErrInvalidTagInfo)
|
|
_, err = NewMClock(1, 1, map[string]TagInfo{
|
|
"class1": {Share: 1.0, ReservedIOPS: &zeroValue},
|
|
}, 1000)
|
|
require.ErrorIs(t, err, ErrInvalidTagInfo)
|
|
_, err = NewMClock(1, 1, map[string]TagInfo{
|
|
"class1": {Share: 1.0, ReservedIOPS: &negativeValue},
|
|
}, 1000)
|
|
require.ErrorIs(t, err, ErrInvalidTagInfo)
|
|
_, err = NewMClock(1, 1, map[string]TagInfo{
|
|
"class1": {Share: 1.0, LimitIOPS: &zeroValue},
|
|
}, 1000)
|
|
require.ErrorIs(t, err, ErrInvalidTagInfo)
|
|
_, err = NewMClock(1, 1, map[string]TagInfo{
|
|
"class1": {Share: 1.0, LimitIOPS: &negativeValue},
|
|
}, 1000)
|
|
require.ErrorIs(t, err, ErrInvalidTagInfo)
|
|
}
|
|
|
|
func (q *MClock) waitingCount() int {
|
|
q.mtx.Lock()
|
|
defer q.mtx.Unlock()
|
|
|
|
return q.sharesQueue.Len()
|
|
}
|
|
|
|
func TestMClockTimeBasedSchedule(t *testing.T) {
|
|
t.Parallel()
|
|
limit := 1.0 // 1 request per second allowed
|
|
cl := &noopClock{v: float64(1.5)}
|
|
q, err := NewMClock(100, math.MaxUint64, map[string]TagInfo{
|
|
"class1": {Share: 1, LimitIOPS: &limit},
|
|
}, 100)
|
|
require.NoError(t, err)
|
|
defer q.Close()
|
|
q.clock = cl
|
|
|
|
running := make(chan struct{})
|
|
checked := make(chan struct{})
|
|
eg, ctx := errgroup.WithContext(context.Background())
|
|
eg.Go(func() error {
|
|
release, err := q.RequestArrival(ctx, "class1")
|
|
require.NoError(t, err)
|
|
defer release()
|
|
close(running)
|
|
<-checked
|
|
return nil
|
|
})
|
|
|
|
<-running
|
|
// request must be scheduled at 2.0
|
|
_, _, err = q.pushRequest("class1")
|
|
require.NoError(t, err)
|
|
require.NotNil(t, cl.runAtValue)
|
|
require.Equal(t, cl.v+1.0/limit, *cl.runAtValue)
|
|
close(checked)
|
|
require.NoError(t, eg.Wait())
|
|
}
|
|
|
|
func TestMClockLowLimit(t *testing.T) {
|
|
t.Parallel()
|
|
limit := 2.0
|
|
q, err := NewMClock(100, 100, map[string]TagInfo{
|
|
"class1": {Share: 50, LimitIOPS: &limit},
|
|
}, 5*time.Second)
|
|
require.NoError(t, err)
|
|
defer q.Close()
|
|
|
|
eg, ctx := errgroup.WithContext(context.Background())
|
|
eg.SetLimit(5)
|
|
eg.Go(func() error {
|
|
for range 3 {
|
|
release, err := q.RequestArrival(ctx, "class1")
|
|
require.NoError(t, err)
|
|
release()
|
|
}
|
|
return nil
|
|
})
|
|
require.NoError(t, eg.Wait())
|
|
}
|