forked from TrueCloudLab/frostfs-qos
Compare commits
3 commits
fix/mclock
...
master
Author | SHA1 | Date | |
---|---|---|---|
b5ed0b6eff | |||
6c6e5bf4de | |||
57d895c321 |
3 changed files with 79 additions and 7 deletions
|
@ -1,3 +1,3 @@
|
||||||
.* @fyrchik
|
.* @TrueCloudLab/storage-core-committers @TrueCloudLab/storage-core-developers
|
||||||
.forgejo/.* @potyarkin
|
.forgejo/.* @potyarkin
|
||||||
Makefile @potyarkin
|
Makefile @potyarkin
|
||||||
|
|
|
@ -22,6 +22,7 @@ var (
|
||||||
ErrMClockSchedulerUnknownTag = errors.New("unknown tag")
|
ErrMClockSchedulerUnknownTag = errors.New("unknown tag")
|
||||||
ErrInvalidTagInfo = errors.New("invalid tag info: shares, limit and reservation must be greater than zero")
|
ErrInvalidTagInfo = errors.New("invalid tag info: shares, limit and reservation must be greater than zero")
|
||||||
ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero")
|
ErrInvalidRunLimit = errors.New("invalid run limit: must be greater than zero")
|
||||||
|
ErrTagRequestsProhibited = errors.New("tag requests are prohibited")
|
||||||
)
|
)
|
||||||
|
|
||||||
type request struct {
|
type request struct {
|
||||||
|
@ -49,6 +50,7 @@ type TagInfo struct {
|
||||||
ReservedIOPS *float64
|
ReservedIOPS *float64
|
||||||
LimitIOPS *float64
|
LimitIOPS *float64
|
||||||
Share float64
|
Share float64
|
||||||
|
Prohibited bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// MClock is mClock scheduling algorithm implementation.
|
// MClock is mClock scheduling algorithm implementation.
|
||||||
|
@ -196,6 +198,9 @@ func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) {
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, nil, ErrMClockSchedulerUnknownTag
|
return nil, nil, ErrMClockSchedulerUnknownTag
|
||||||
}
|
}
|
||||||
|
if tagInfo.Prohibited {
|
||||||
|
return nil, nil, ErrTagRequestsProhibited
|
||||||
|
}
|
||||||
prev, ok := q.previous[tag]
|
prev, ok := q.previous[tag]
|
||||||
assert.Cond(ok, "undefined previous:", tag)
|
assert.Cond(ok, "undefined previous:", tag)
|
||||||
|
|
||||||
|
@ -306,7 +311,7 @@ func (q *MClock) setNextScheduleTimer(now float64) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *MClock) scheduleByLimitAndWeight(now float64) {
|
func (q *MClock) scheduleByLimitAndWeight(now float64) {
|
||||||
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
|
for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < now+1.0 {
|
||||||
ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
|
ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
|
||||||
heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
|
heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
|
||||||
}
|
}
|
||||||
|
@ -349,7 +354,7 @@ func (q *MClock) scheduleByLimitAndWeight(now float64) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *MClock) scheduleByReservation(now float64) {
|
func (q *MClock) scheduleByReservation(now float64) {
|
||||||
for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
|
for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() < now+1.0 {
|
||||||
next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
|
next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
|
||||||
q.removeFromQueues(next.r)
|
q.removeFromQueues(next.r)
|
||||||
|
|
||||||
|
|
|
@ -210,7 +210,7 @@ func TestMClockReservationScheduling(t *testing.T) {
|
||||||
reqCount = (reqCount / 2) * 2
|
reqCount = (reqCount / 2) * 2
|
||||||
limit := 0.01 // 1 request in 100 seconds
|
limit := 0.01 // 1 request in 100 seconds
|
||||||
resevation := 100.0 // 100 RPS
|
resevation := 100.0 // 100 RPS
|
||||||
cl := &noopClock{}
|
cl := &noopClock{v: float64(1.0)}
|
||||||
q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{
|
q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{
|
||||||
"class1": {Share: 2, LimitIOPS: &limit},
|
"class1": {Share: 2, LimitIOPS: &limit},
|
||||||
"class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &resevation},
|
"class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &resevation},
|
||||||
|
@ -237,15 +237,18 @@ func TestMClockReservationScheduling(t *testing.T) {
|
||||||
|
|
||||||
q.scheduleRequest()
|
q.scheduleRequest()
|
||||||
|
|
||||||
|
count := 0
|
||||||
for _, req := range requests {
|
for _, req := range requests {
|
||||||
select {
|
select {
|
||||||
case <-req.scheduled:
|
case <-req.scheduled:
|
||||||
require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0")
|
require.Equal(t, req.tag, "class2")
|
||||||
|
count++
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
require.Equal(t, 100, count, "class2 has 100 requests reserved, so only 100 requests must be scheduled")
|
||||||
|
|
||||||
cl.v = 1.00001 // 1s elapsed
|
cl.v = 1.9999 // 1s elapsed - 0.999 to take into account float64 accuracy
|
||||||
q.scheduleRequest()
|
q.scheduleRequest()
|
||||||
|
|
||||||
var result []string
|
var result []string
|
||||||
|
@ -258,7 +261,7 @@ func TestMClockReservationScheduling(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
require.Equal(t, 100, len(result))
|
require.Equal(t, 200, len(result))
|
||||||
for _, res := range result {
|
for _, res := range result {
|
||||||
require.Equal(t, "class2", res)
|
require.Equal(t, "class2", res)
|
||||||
}
|
}
|
||||||
|
@ -515,3 +518,67 @@ func TestMClockLowLimit(t *testing.T) {
|
||||||
})
|
})
|
||||||
require.NoError(t, eg.Wait())
|
require.NoError(t, eg.Wait())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMClockLimitTotalTime(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
limit := 10.0 // 10 RPS -> 1 request per 100 ms
|
||||||
|
q, err := NewMClock(100, 100, map[string]TagInfo{
|
||||||
|
"class1": {Share: 50, LimitIOPS: &limit},
|
||||||
|
}, 5*time.Second)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer q.Close()
|
||||||
|
|
||||||
|
// 10 requests, each request runs for 500 ms,
|
||||||
|
// but they should be scheduled as soon as possible,
|
||||||
|
// so total duration must be less than 1 second
|
||||||
|
eg, ctx := errgroup.WithContext(context.Background())
|
||||||
|
startedAt := time.Now()
|
||||||
|
for range 10 {
|
||||||
|
eg.Go(func() error {
|
||||||
|
release, err := q.RequestArrival(ctx, "class1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
release()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
require.True(t, time.Since(startedAt) <= 1*time.Second)
|
||||||
|
|
||||||
|
// 11 requests, limit = 10 RPS, so 10 requests should be
|
||||||
|
// scheduled as soon as possible, but last request should be
|
||||||
|
// scheduled at now + 1.0 s
|
||||||
|
eg, ctx = errgroup.WithContext(context.Background())
|
||||||
|
startedAt = time.Now()
|
||||||
|
for range 11 {
|
||||||
|
eg.Go(func() error {
|
||||||
|
release, err := q.RequestArrival(ctx, "class1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
release()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
require.True(t, time.Since(startedAt) >= 1500*time.Millisecond)
|
||||||
|
require.True(t, time.Since(startedAt) <= 1600*time.Millisecond) // 100 ms offset to complete all requests
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMClockRestictTagRequests(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
limit := 10.0
|
||||||
|
q, err := NewMClock(100, 100, map[string]TagInfo{
|
||||||
|
"class1": {Share: 50, LimitIOPS: &limit},
|
||||||
|
"class2": {Share: 50, LimitIOPS: &limit, Prohibited: true},
|
||||||
|
}, 5*time.Second)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer q.Close()
|
||||||
|
|
||||||
|
release, err := q.RequestArrival(context.Background(), "class1")
|
||||||
|
require.NoError(t, err)
|
||||||
|
release()
|
||||||
|
|
||||||
|
release, err = q.RequestArrival(context.Background(), "class2")
|
||||||
|
require.ErrorIs(t, err, ErrTagRequestsProhibited)
|
||||||
|
require.Nil(t, release)
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue