package scheduling

import (
	"context"
	"math"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"golang.org/x/sync/errgroup"
)
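// TestMClockSharesScheduling checks share-based (weighted) scheduling: with a
// 2:1 share ratio, class1 requests are scheduled roughly twice as often as
// class2 requests.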
func TestMClockSharesScheduling(t *testing.T) {
	t.Parallel()
	reqCount := 1000
	reqCount = (reqCount / 2) * 2
	q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
		"class1": {Share: 2},
		"class2": {Share: 1},
	}, 100)
	require.NoError(t, err)
	q.clock = &noopClock{}

	var releases []ReleaseFunc
	var requests []*request
	tag := "class1"
	for i := 0; i < reqCount/2; i++ {
		req, release, err := q.pushRequest(tag)
		require.NoError(t, err)
		requests = append(requests, req)
		releases = append(releases, release)
	}
	tag = "class2"
	for i := 0; i < reqCount/2; i++ {
		req, release, err := q.pushRequest(tag)
		require.NoError(t, err)
		requests = append(requests, req)
		releases = append(releases, release)
	}

	stats := q.Stats()
	require.Equal(t, 2, len(stats))
	for _, s := range stats {
		switch s.Tag() {
		case "class1":
			require.Equal(t, uint64(1), s.InProgress())
			require.Equal(t, uint64(reqCount/2-1), s.Pending())
		case "class2":
			require.Equal(t, uint64(0), s.InProgress())
			require.Equal(t, uint64(reqCount/2), s.Pending())
		default:
			require.Fail(t, "unknown tag:"+s.Tag())
		}
	}

	var result []string
	var wg sync.WaitGroup
	for i := 0; i < reqCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-requests[i].scheduled
			result = append(result, requests[i].tag)
			releases[i]()
		}()
	}
	wg.Wait()

	stats = q.Stats()
	require.Equal(t, 2, len(stats))
	for _, s := range stats {
		switch s.Tag() {
		case "class1":
			require.Equal(t, uint64(0), s.InProgress())
			require.Equal(t, uint64(0), s.Pending())
		case "class2":
			require.Equal(t, uint64(0), s.InProgress())
			require.Equal(t, uint64(0), s.Pending())
		default:
			require.Fail(t, "unknown tag:"+s.Tag())
		}
	}

	// Requests must be scheduled as class1->class1->class2->class1->class1->class2...,
	// because the share ratio is 2 to 1.
	// However, there may be deviations due to rounding and sorting.
	result = result[:reqCount/2+(reqCount/2)/2] // the last reqCount/4 requests are the class2 tail
	var class1Count int
	var class2Count int
	var class2MaxSeq int
	for _, res := range result {
		switch res {
		case "class1":
			class1Count++
			class2MaxSeq = 0
		case "class2":
			class2Count++
			class2MaxSeq++
			require.Less(t, class2MaxSeq, 3) // no more than 2 class2 requests are expected to be scheduled in a row
		default:
			require.Fail(t, "unknown tag")
		}
	}

	// With a 2:1 ratio the truncated prefix holds ~2/3 class1 requests,
	// and integer division turns 66.6% into exactly 66.
	require.True(t, (class1Count*100)/(class1Count+class2Count) == 66)
}
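// noopClock implements the clock interface with a manually controlled time
// value, so the tests can advance virtual time deterministically.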
var _ clock = &noopClock{}

type noopClock struct {
	v          float64
	runAtValue *float64
}

func (n *noopClock) now() float64 {
	return n.v
}

func (n *noopClock) runAt(ts float64, f func()) {
	n.runAtValue = &ts
}

func (n *noopClock) close() {}
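// TestMClockRequestCancel checks that a request whose context expires while
// waiting fails with context.DeadlineExceeded and leaves no trace in the
// internal queues.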
func TestMClockRequestCancel(t *testing.T) {
	t.Parallel()
	q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
		"class1": {Share: 2},
		"class2": {Share: 1},
	}, 100)
	require.NoError(t, err)
	q.clock = &noopClock{}

	release1, err := q.RequestArrival(context.Background(), "class1")
	require.NoError(t, err)

	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	release2, err := q.RequestArrival(ctx, "class1")
	require.Nil(t, release2)
	require.ErrorIs(t, err, context.DeadlineExceeded)

	require.Equal(t, 0, q.readyQueue.Len())
	require.Equal(t, 0, q.sharesQueue.Len())
	require.Equal(t, 0, q.limitQueue.Len())
	require.Equal(t, 0, q.reservationQueue.Len())

	stats := q.Stats()
	require.Equal(t, 2, len(stats))
	for _, s := range stats {
		switch s.Tag() {
		case "class1":
			require.Equal(t, uint64(1), s.InProgress())
			require.Equal(t, uint64(0), s.Pending())
		case "class2":
			require.Equal(t, uint64(0), s.InProgress())
			require.Equal(t, uint64(0), s.Pending())
		default:
			require.Fail(t, "unknown tag:"+s.Tag())
		}
	}

	release1()

	stats = q.Stats()
	require.Equal(t, 2, len(stats))
	for _, s := range stats {
		switch s.Tag() {
		case "class1":
			require.Equal(t, uint64(0), s.InProgress())
			require.Equal(t, uint64(0), s.Pending())
		case "class2":
			require.Equal(t, uint64(0), s.InProgress())
			require.Equal(t, uint64(0), s.Pending())
		default:
			require.Fail(t, "unknown tag:"+s.Tag())
		}
	}
}
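// TestMClockLimitScheduling checks that LimitIOPS gates scheduling: nothing is
// scheduled while the virtual clock is at 0.0, and once time advances the 2:1
// share ratio is still respected.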
func TestMClockLimitScheduling(t *testing.T) {
	t.Parallel()
	reqCount := 100
	reqCount = (reqCount / 2) * 2
	limit := 1.0
	cl := &noopClock{}
	q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
		"class1": {Share: 2, LimitIOPS: &limit},
		"class2": {Share: 1, LimitIOPS: &limit},
	}, 100)
	require.NoError(t, err)
	q.clock = cl

	var releases []ReleaseFunc
	var requests []*request
	tag := "class1"
	for i := 0; i < reqCount/2; i++ {
		req, release, err := q.pushRequest(tag)
		require.NoError(t, err)
		requests = append(requests, req)
		releases = append(releases, release)
	}
	tag = "class2"
	for i := 0; i < reqCount/2; i++ {
		req, release, err := q.pushRequest(tag)
		require.NoError(t, err)
		requests = append(requests, req)
		releases = append(releases, release)
	}

	q.scheduleRequest()

	for _, req := range requests {
		select {
		case <-req.scheduled:
			require.Fail(t, "no request must be scheduled: the clock is at 0.0 and the limit values are greater than 0.0")
		default:
		}
	}

	stats := q.Stats()
	require.Equal(t, 2, len(stats))
	for _, s := range stats {
		switch s.Tag() {
		case "class1":
			require.Equal(t, uint64(0), s.InProgress())
			require.Equal(t, uint64(reqCount/2), s.Pending())
		case "class2":
			require.Equal(t, uint64(0), s.InProgress())
			require.Equal(t, uint64(reqCount/2), s.Pending())
		default:
			require.Fail(t, "unknown tag:"+s.Tag())
		}
	}

	cl.v = math.MaxFloat64

	var result []string
	var wg sync.WaitGroup
	for i := 0; i < reqCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-requests[i].scheduled
			result = append(result, requests[i].tag)
			releases[i]()
		}()
	}
	q.scheduleRequest()
	wg.Wait()

	// Requests must be scheduled as class1->class1->class2->class1->class1->class2...,
	// because the share ratio is 2 to 1.
	// However, there may be deviations due to rounding and sorting.
	result = result[:reqCount/2+(reqCount/2)/2] // the last reqCount/4 requests are the class2 tail
	var class1Count int
	var class2Count int
	var class2MaxSeq int
	for _, res := range result {
		switch res {
		case "class1":
			class1Count++
			class2MaxSeq = 0
		case "class2":
			class2Count++
			class2MaxSeq++
			require.Less(t, class2MaxSeq, 3) // no more than 2 class2 requests are expected to be scheduled in a row
		default:
			require.Fail(t, "unknown tag")
		}
	}

	// As above, integer division turns the ~66.6% class1 fraction into exactly 66.
	require.True(t, (class1Count*100)/(class1Count+class2Count) == 66)

	require.Equal(t, 0, q.readyQueue.Len())
	require.Equal(t, 0, q.sharesQueue.Len())
	require.Equal(t, 0, q.limitQueue.Len())
	require.Equal(t, 0, q.reservationQueue.Len())

	stats = q.Stats()
	require.Equal(t, 2, len(stats))
	for _, s := range stats {
		switch s.Tag() {
		case "class1":
			require.Equal(t, uint64(0), s.InProgress())
			require.Equal(t, uint64(0), s.Pending())
		case "class2":
			require.Equal(t, uint64(0), s.InProgress())
			require.Equal(t, uint64(0), s.Pending())
		default:
			require.Fail(t, "unknown tag:"+s.Tag())
		}
	}
}
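// TestMClockReservationScheduling checks that ReservedIOPS lets a tag make
// progress even when its LimitIOPS alone would allow almost nothing: after one
// virtual second, 100 reserved class2 requests are scheduled while class1
// stays pending.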
func TestMClockReservationScheduling(t *testing.T) {
	t.Parallel()
	reqCount := 1000
	reqCount = (reqCount / 2) * 2
	limit := 0.01        // 1 request per 100 seconds
	reservation := 100.0 // 100 RPS
	cl := &noopClock{}
	q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{
		"class1": {Share: 2, LimitIOPS: &limit},
		"class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &reservation},
	}, 100)
	require.NoError(t, err)
	q.clock = cl

	var releases []ReleaseFunc
	var requests []*request
	tag := "class1"
	for i := 0; i < reqCount/2; i++ {
		req, release, err := q.pushRequest(tag)
		require.NoError(t, err)
		requests = append(requests, req)
		releases = append(releases, release)
	}
	tag = "class2"
	for i := 0; i < reqCount/2; i++ {
		req, release, err := q.pushRequest(tag)
		require.NoError(t, err)
		requests = append(requests, req)
		releases = append(releases, release)
	}

	q.scheduleRequest()

	for _, req := range requests {
		select {
		case <-req.scheduled:
			require.Fail(t, "no request must be scheduled: the clock is at 0.0 and the limit values are greater than 0.0")
		default:
		}
	}

	stats := q.Stats()
	require.Equal(t, 2, len(stats))
	for _, s := range stats {
		switch s.Tag() {
		case "class1":
			require.Equal(t, uint64(0), s.InProgress())
			require.Equal(t, uint64(reqCount/2), s.Pending())
		case "class2":
			require.Equal(t, uint64(0), s.InProgress())
			require.Equal(t, uint64(reqCount/2), s.Pending())
		default:
			require.Fail(t, "unknown tag:"+s.Tag())
		}
	}

	// With ReservedIOPS = 100, reservation tags are expected to be spaced
	// 1/100 s apart, so after ~1 s of virtual time the first 100 class2
	// requests become eligible.
	cl.v = 1.00001 // 1s elapsed
	q.scheduleRequest()

	stats = q.Stats()
	require.Equal(t, 2, len(stats))
	for _, s := range stats {
		switch s.Tag() {
		case "class1":
			require.Equal(t, uint64(0), s.InProgress())
			require.Equal(t, uint64(reqCount/2), s.Pending())
		case "class2":
			require.Equal(t, uint64(100), s.InProgress())
			require.Equal(t, uint64(reqCount/2-100), s.Pending())
		default:
			require.Fail(t, "unknown tag:"+s.Tag())
		}
	}

	var result []string
	for i, req := range requests {
		select {
		case <-req.scheduled:
			result = append(result, requests[i].tag)
			releases[i]()
		default:
		}
	}

	require.Equal(t, 100, len(result))
	for _, res := range result {
		require.Equal(t, "class2", res)
	}

	stats = q.Stats()
	require.Equal(t, 2, len(stats))
	for _, s := range stats {
		switch s.Tag() {
		case "class1":
			require.Equal(t, uint64(0), s.InProgress())
			require.Equal(t, uint64(reqCount/2), s.Pending())
		case "class2":
			require.Equal(t, uint64(0), s.InProgress())
			require.Equal(t, uint64(reqCount/2-100), s.Pending())
		default:
			require.Fail(t, "unknown tag:"+s.Tag())
		}
	}

	cl.v = math.MaxFloat64
	q.scheduleRequest()

	require.Equal(t, 0, q.readyQueue.Len())
	require.Equal(t, 0, q.sharesQueue.Len())
	require.Equal(t, 0, q.limitQueue.Len())
	require.Equal(t, 0, q.reservationQueue.Len())

	stats = q.Stats()
	require.Equal(t, 2, len(stats))
	for _, s := range stats {
		switch s.Tag() {
		case "class1":
			require.Equal(t, uint64(reqCount/2), s.InProgress())
			require.Equal(t, uint64(0), s.Pending())
		case "class2":
			require.Equal(t, uint64(reqCount/2-100), s.InProgress())
			require.Equal(t, uint64(0), s.Pending())
		default:
			require.Fail(t, "unknown tag:"+s.Tag())
		}
	}
}
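// TestMClockIdleTag checks that when a request arrives for a tag that has been
// idle for longer than idleTimeout, the share tags are adjusted so that no
// pending request's share falls behind the current virtual time.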
func TestMClockIdleTag(t *testing.T) {
	t.Parallel()
	reqCount := 100
	idleTimeout := 2 * time.Second
	cl := &noopClock{}
	q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
		"class1": {Share: 1},
		"class2": {Share: 1},
	}, idleTimeout)
	require.NoError(t, err)
	q.clock = cl

	var requests []*request
	tag := "class1"
	for i := 0; i < reqCount/2; i++ {
		cl.v += idleTimeout.Seconds() / 2
		req, _, err := q.pushRequest(tag)
		require.NoError(t, err)
		requests = append(requests, req)
	}

	// class1 requests have shares [1.0; 2.0; 3.0; ... ]

	cl.v += 2 * idleTimeout.Seconds()

	tag = "class2"
	req, _, err := q.pushRequest(tag)
	require.NoError(t, err)
	requests = append(requests, req)

	// class2 must be detected as idle, so the share tags of all requests
	// must be adjusted to the current virtual time.

	for _, req := range requests {
		select {
		case <-req.scheduled:
		default:
			require.True(t, req.shares >= cl.v)
		}
	}
}
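// TestMClockClose checks that Close unblocks waiting requests and that any
// subsequent RequestArrival fails with ErrMClockSchedulerClosed.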
func TestMClockClose(t *testing.T) {
	t.Parallel()
	q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{
		"class1": {Share: 1},
	}, 1000)
	require.NoError(t, err)
	q.clock = &noopClock{}

	requestRunning := make(chan struct{})
	checkDone := make(chan struct{})
	eg, ctx := errgroup.WithContext(context.Background())
	tag := "class1"
	eg.Go(func() error {
		release, err := q.RequestArrival(ctx, tag)
		if err != nil {
			return err
		}
		defer release()
		close(requestRunning)
		<-checkDone
		return nil
	})
	<-requestRunning

	eg.Go(func() error {
		release, err := q.RequestArrival(ctx, tag)
		require.Nil(t, release)
		require.ErrorIs(t, err, ErrMClockSchedulerClosed)
		return nil
	})

	// wait until the second request is blocked waiting
	for q.waitingCount() == 0 {
		time.Sleep(1 * time.Second)
	}

	q.Close()

	release, err := q.RequestArrival(context.Background(), tag)
	require.Nil(t, release)
	require.ErrorIs(t, err, ErrMClockSchedulerClosed)

	close(checkDone)

	require.NoError(t, eg.Wait())
}
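// TestMClockWaitLimit checks that the waiting-request limit is enforced: with
// one request running and one waiting, the next arrival fails with
// ErrMClockSchedulerRequestLimitExceeded.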
func TestMClockWaitLimit(t *testing.T) {
	t.Parallel()
	q, err := NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: 1},
	}, 1000)
	require.NoError(t, err)
	q.clock = &noopClock{}
	defer q.Close()

	requestRunning := make(chan struct{})
	checkDone := make(chan struct{})
	eg, ctx := errgroup.WithContext(context.Background())
	tag := "class1"
	// running request
	eg.Go(func() error {
		release, err := q.RequestArrival(ctx, tag)
		if err != nil {
			return err
		}
		defer release()
		close(requestRunning)
		<-checkDone
		return nil
	})

	// waiting request
	eg.Go(func() error {
		<-requestRunning
		release, err := q.RequestArrival(ctx, tag)
		require.NotNil(t, release)
		require.NoError(t, err)
		defer release()
		<-checkDone
		return nil
	})

	// wait until the second request is waiting
	for q.waitingCount() == 0 {
		time.Sleep(1 * time.Second)
	}

	release, err := q.RequestArrival(ctx, tag)
	require.Nil(t, release)
	require.ErrorIs(t, err, ErrMClockSchedulerRequestLimitExceeded)

	close(checkDone)
	require.NoError(t, eg.Wait())
}
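// TestMClockParameterValidation checks constructor validation: a zero run
// limit and non-positive Share, ReservedIOPS or LimitIOPS values are rejected,
// while a zero wait limit and a non-positive idle timeout are accepted.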
func TestMClockParameterValidation(t *testing.T) {
	_, err := NewMClock(0, 1, map[string]TagInfo{
		"class1": {Share: 1},
	}, 1000)
	require.ErrorIs(t, err, ErrInvalidRunLimit)
	_, err = NewMClock(1, 0, map[string]TagInfo{
		"class1": {Share: 1},
	}, 1000)
	require.NoError(t, err)
	_, err = NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: 1},
	}, -1.0)
	require.NoError(t, err)
	_, err = NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: 1},
	}, 0)
	require.NoError(t, err)
	negativeValue := -1.0
	zeroValue := float64(0)
	_, err = NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: negativeValue},
	}, 1000)
	require.ErrorIs(t, err, ErrInvalidTagInfo)
	_, err = NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: zeroValue},
	}, 1000)
	require.ErrorIs(t, err, ErrInvalidTagInfo)
	_, err = NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: 1.0, ReservedIOPS: &zeroValue},
	}, 1000)
	require.ErrorIs(t, err, ErrInvalidTagInfo)
	_, err = NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: 1.0, ReservedIOPS: &negativeValue},
	}, 1000)
	require.ErrorIs(t, err, ErrInvalidTagInfo)
	_, err = NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: 1.0, LimitIOPS: &zeroValue},
	}, 1000)
	require.ErrorIs(t, err, ErrInvalidTagInfo)
	_, err = NewMClock(1, 1, map[string]TagInfo{
		"class1": {Share: 1.0, LimitIOPS: &negativeValue},
	}, 1000)
	require.ErrorIs(t, err, ErrInvalidTagInfo)
}
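// waitingCount reports how many requests are currently queued inside the
// scheduler; it is a test helper used to wait for a request to block.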
func (q *MClock) waitingCount() int {
	q.mtx.Lock()
	defer q.mtx.Unlock()

	return q.sharesQueue.Len()
}
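// TestMClockTimeBasedSchedule checks that a limited request which cannot run
// immediately is deferred via clock.runAt to the moment its limit tag allows.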
func TestMClockTimeBasedSchedule(t *testing.T) {
	t.Parallel()
	limit := 1.0 // 1 request per second allowed
	cl := &noopClock{v: float64(1.5)}
	q, err := NewMClock(100, math.MaxUint64, map[string]TagInfo{
		"class1": {Share: 1, LimitIOPS: &limit},
	}, 100)
	require.NoError(t, err)
	defer q.Close()
	q.clock = cl

	running := make(chan struct{})
	checked := make(chan struct{})
	eg, ctx := errgroup.WithContext(context.Background())
	eg.Go(func() error {
		release, err := q.RequestArrival(ctx, "class1")
		require.NoError(t, err)
		defer release()
		close(running)
		<-checked
		return nil
	})

	<-running
	// the next request must be scheduled to run at now + 1/limit = 2.5
	_, _, err = q.pushRequest("class1")
	require.NoError(t, err)
	require.NotNil(t, cl.runAtValue)
	require.Equal(t, cl.v+1.0/limit, *cl.runAtValue)
	close(checked)
	require.NoError(t, eg.Wait())
}
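// The tests above drive the scheduler through unexported hooks (pushRequest,
// scheduleRequest, the clock field). A typical caller would rely only on the
// exported API already exercised here; a minimal sketch, assuming only the
// names used in these tests (NewMClock, TagInfo, RequestArrival, Close):
//
//	q, err := NewMClock(8, 64, map[string]TagInfo{"put": {Share: 1}}, 100)
//	if err != nil {
//		// invalid configuration
//	}
//	defer q.Close()
//
//	release, err := q.RequestArrival(context.Background(), "put")
//	if err != nil {
//		// closed scheduler, exceeded wait limit, or cancelled context
//	}
//	defer release()
//	// ... perform the tagged operation while the slot is held ...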