package scheduling import ( "context" "math" "sync" "testing" "time" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) func TestMClockSharesScheduling(t *testing.T) { t.Parallel() reqCount := 1000 reqCount = (reqCount / 2) * 2 q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{ "class1": {Share: 2}, "class2": {Share: 1}, }, 100) require.NoError(t, err) q.clock = &noopClock{} var releases []ReleaseFunc var requests []*request tag := "class1" for i := 0; i < reqCount/2; i++ { req, release, err := q.pushRequest(tag) require.NoError(t, err) requests = append(requests, req) releases = append(releases, release) } tag = "class2" for i := 0; i < reqCount/2; i++ { req, release, err := q.pushRequest(tag) require.NoError(t, err) requests = append(requests, req) releases = append(releases, release) } var result []string var wg sync.WaitGroup for i := 0; i < reqCount; i++ { wg.Add(1) go func() { defer wg.Done() <-requests[i].scheduled result = append(result, requests[i].tag) releases[i]() }() } wg.Wait() // Requests must be scheduled as class1->class1->class2->class1->class1->class2..., // because the ratio is 2 to 1. // However, there may be deviations due to rounding and sorting. result = result[:reqCount/2+(reqCount/2)/2] // last reqCount/4 requests is class2 tail var class1Count int var class2Count int var class2MaxSeq int for _, res := range result { switch res { case "class1": class1Count++ class2MaxSeq = 0 case "class2": class2Count++ class2MaxSeq++ require.Less(t, class2MaxSeq, 3) // not expected to have more than 2 class2 requests scheduled in row default: require.Fail(t, "unknown tag") } } require.True(t, (class1Count*100)/(class1Count+class2Count) == 66) } var _ clock = &noopClock{} type noopClock struct { v float64 runAtValue *float64 } func (n *noopClock) now() float64 { return n.v } func (n *noopClock) runAt(ts float64, f func()) { n.runAtValue = &ts } func (n *noopClock) close() {} func TestMClockRequestCancel(t *testing.T) { t.Parallel() q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{ "class1": {Share: 2}, "class2": {Share: 1}, }, 100) require.NoError(t, err) q.clock = &noopClock{} release1, err := q.RequestArrival(context.Background(), "class1") require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() release2, err := q.RequestArrival(ctx, "class1") require.Nil(t, release2) require.ErrorIs(t, err, context.DeadlineExceeded) require.Equal(t, 0, q.readyQueue.Len()) require.Equal(t, 0, q.sharesQueue.Len()) require.Equal(t, 0, q.limitQueue.Len()) require.Equal(t, 0, q.reservationQueue.Len()) release1() } func TestMClockLimitScheduling(t *testing.T) { t.Parallel() reqCount := 100 reqCount = (reqCount / 2) * 2 limit := 1.0 cl := &noopClock{} q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{ "class1": {Share: 2, LimitIOPS: &limit}, "class2": {Share: 1, LimitIOPS: &limit}, }, 100) require.NoError(t, err) q.clock = cl var releases []ReleaseFunc var requests []*request tag := "class1" for i := 0; i < reqCount/2; i++ { req, release, err := q.pushRequest(tag) require.NoError(t, err) requests = append(requests, req) releases = append(releases, release) } tag = "class2" for i := 0; i < reqCount/2; i++ { req, release, err := q.pushRequest(tag) require.NoError(t, err) requests = append(requests, req) releases = append(releases, release) } q.scheduleRequest() for _, req := range requests { select { case <-req.scheduled: require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0") default: } } cl.v = math.MaxFloat64 var result []string var wg sync.WaitGroup for i := 0; i < reqCount; i++ { wg.Add(1) go func() { defer wg.Done() <-requests[i].scheduled result = append(result, requests[i].tag) releases[i]() }() } q.scheduleRequest() wg.Wait() // Requests must be scheduled as class1->class1->class2->class1->class1->class2..., // because the ratio is 2 to 1. // However, there may be deviations due to rounding and sorting. result = result[:reqCount/2+(reqCount/2)/2] // last reqCount/4 requests is class2 tail var class1Count int var class2Count int var class2MaxSeq int for _, res := range result { switch res { case "class1": class1Count++ class2MaxSeq = 0 case "class2": class2Count++ class2MaxSeq++ require.Less(t, class2MaxSeq, 3) // not expected to have more than 2 class2 requests scheduled in row default: require.Fail(t, "unknown tag") } } require.True(t, (class1Count*100)/(class1Count+class2Count) == 66) require.Equal(t, 0, q.readyQueue.Len()) require.Equal(t, 0, q.sharesQueue.Len()) require.Equal(t, 0, q.limitQueue.Len()) require.Equal(t, 0, q.reservationQueue.Len()) } func TestMClockReservationScheduling(t *testing.T) { t.Parallel() reqCount := 1000 reqCount = (reqCount / 2) * 2 limit := 0.01 // 1 request in 100 seconds resevation := 100.0 // 100 RPS cl := &noopClock{} q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{ "class1": {Share: 2, LimitIOPS: &limit}, "class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &resevation}, }, 100) require.NoError(t, err) q.clock = cl var releases []ReleaseFunc var requests []*request tag := "class1" for i := 0; i < reqCount/2; i++ { req, release, err := q.pushRequest(tag) require.NoError(t, err) requests = append(requests, req) releases = append(releases, release) } tag = "class2" for i := 0; i < reqCount/2; i++ { req, release, err := q.pushRequest(tag) require.NoError(t, err) requests = append(requests, req) releases = append(releases, release) } q.scheduleRequest() for _, req := range requests { select { case <-req.scheduled: require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0") default: } } cl.v = 1.00001 // 1s elapsed q.scheduleRequest() var result []string for i, req := range requests { select { case <-req.scheduled: result = append(result, requests[i].tag) releases[i]() default: } } require.Equal(t, 100, len(result)) for _, res := range result { require.Equal(t, "class2", res) } cl.v = math.MaxFloat64 q.scheduleRequest() require.Equal(t, 0, q.readyQueue.Len()) require.Equal(t, 0, q.sharesQueue.Len()) require.Equal(t, 0, q.limitQueue.Len()) require.Equal(t, 0, q.reservationQueue.Len()) } func TestMClockIdleTag(t *testing.T) { t.Parallel() reqCount := 100 idleTimeout := 2 * time.Second cl := &noopClock{} q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{ "class1": {Share: 1}, "class2": {Share: 1}, }, idleTimeout) require.NoError(t, err) q.clock = cl var requests []*request tag := "class1" for i := 0; i < reqCount/2; i++ { cl.v += idleTimeout.Seconds() / 2 req, _, err := q.pushRequest(tag) require.NoError(t, err) requests = append(requests, req) } // class1 requests have shares [1.0; 2.0; 3.0; ... ] cl.v += 2 * idleTimeout.Seconds() tag = "class2" req, _, err := q.pushRequest(tag) require.NoError(t, err) requests = append(requests, req) // class2 must be defined as idle, so all shares tags must be adjusted. for _, req := range requests { select { case <-req.scheduled: default: require.True(t, req.shares >= cl.v) } } } func TestMClockClose(t *testing.T) { t.Parallel() q, err := NewMClock(1, math.MaxUint64, map[string]TagInfo{ "class1": {Share: 1}, }, 1000) require.NoError(t, err) q.clock = &noopClock{} requestRunning := make(chan struct{}) checkDone := make(chan struct{}) eg, ctx := errgroup.WithContext(context.Background()) tag := "class1" eg.Go(func() error { release, err := q.RequestArrival(ctx, tag) if err != nil { return err } defer release() close(requestRunning) <-checkDone return nil }) <-requestRunning eg.Go(func() error { release, err := q.RequestArrival(ctx, tag) require.Nil(t, release) require.ErrorIs(t, err, ErrMClockSchedulerClosed) return nil }) // wait until second request will be blocked on wait for q.waitingCount() == 0 { time.Sleep(1 * time.Second) } q.Close() release, err := q.RequestArrival(context.Background(), tag) require.Nil(t, release) require.ErrorIs(t, err, ErrMClockSchedulerClosed) close(checkDone) require.NoError(t, eg.Wait()) } func TestMClockWaitLimit(t *testing.T) { t.Parallel() q, err := NewMClock(1, 1, map[string]TagInfo{ "class1": {Share: 1}, }, 1000) require.NoError(t, err) q.clock = &noopClock{} defer q.Close() requestRunning := make(chan struct{}) checkDone := make(chan struct{}) eg, ctx := errgroup.WithContext(context.Background()) tag := "class1" // running request eg.Go(func() error { release, err := q.RequestArrival(ctx, tag) if err != nil { return err } defer release() close(requestRunning) <-checkDone return nil }) // waiting request eg.Go(func() error { <-requestRunning release, err := q.RequestArrival(ctx, tag) require.NotNil(t, release) require.NoError(t, err) defer release() <-checkDone return nil }) // wait until second request will be waiting for q.waitingCount() == 0 { time.Sleep(1 * time.Second) } release, err := q.RequestArrival(ctx, tag) require.Nil(t, release) require.ErrorIs(t, err, ErrMClockSchedulerRequestLimitExceeded) close(checkDone) require.NoError(t, eg.Wait()) } func TestMClockParameterValidation(t *testing.T) { _, err := NewMClock(0, 1, map[string]TagInfo{ "class1": {Share: 1}, }, 1000) require.ErrorIs(t, err, ErrInvalidRunLimit) _, err = NewMClock(1, 0, map[string]TagInfo{ "class1": {Share: 1}, }, 1000) require.NoError(t, err) _, err = NewMClock(1, 1, map[string]TagInfo{ "class1": {Share: 1}, }, -1.0) require.NoError(t, err) _, err = NewMClock(1, 1, map[string]TagInfo{ "class1": {Share: 1}, }, 0) require.NoError(t, err) negativeValue := -1.0 zeroValue := float64(0) _, err = NewMClock(1, 1, map[string]TagInfo{ "class1": {Share: negativeValue}, }, 1000) require.ErrorIs(t, err, ErrInvalidTagInfo) _, err = NewMClock(1, 1, map[string]TagInfo{ "class1": {Share: zeroValue}, }, 1000) require.ErrorIs(t, err, ErrInvalidTagInfo) _, err = NewMClock(1, 1, map[string]TagInfo{ "class1": {Share: 1.0, ReservedIOPS: &zeroValue}, }, 1000) require.ErrorIs(t, err, ErrInvalidTagInfo) _, err = NewMClock(1, 1, map[string]TagInfo{ "class1": {Share: 1.0, ReservedIOPS: &negativeValue}, }, 1000) require.ErrorIs(t, err, ErrInvalidTagInfo) _, err = NewMClock(1, 1, map[string]TagInfo{ "class1": {Share: 1.0, LimitIOPS: &zeroValue}, }, 1000) require.ErrorIs(t, err, ErrInvalidTagInfo) _, err = NewMClock(1, 1, map[string]TagInfo{ "class1": {Share: 1.0, LimitIOPS: &negativeValue}, }, 1000) require.ErrorIs(t, err, ErrInvalidTagInfo) } func (q *MClock) waitingCount() int { q.mtx.Lock() defer q.mtx.Unlock() return q.sharesQueue.Len() } func TestMClockTimeBasedSchedule(t *testing.T) { t.Parallel() limit := 1.0 // 1 request per second allowed cl := &noopClock{v: float64(1.5)} q, err := NewMClock(100, math.MaxUint64, map[string]TagInfo{ "class1": {Share: 1, LimitIOPS: &limit}, }, 100) require.NoError(t, err) defer q.Close() q.clock = cl running := make(chan struct{}) checked := make(chan struct{}) eg, ctx := errgroup.WithContext(context.Background()) eg.Go(func() error { release, err := q.RequestArrival(ctx, "class1") require.NoError(t, err) defer release() close(running) <-checked return nil }) <-running // request must be scheduled at 2.0 _, _, err = q.pushRequest("class1") require.NoError(t, err) require.NotNil(t, cl.runAtValue) require.Equal(t, cl.v+1.0/limit, *cl.runAtValue) close(checked) require.NoError(t, eg.Wait()) }