Compare commits
No commits in common. "master" and "fix/revert_stat" have entirely different histories.
master...fix/revert_stat
5 changed files with 41 additions and 148 deletions
@@ -1,3 +1,3 @@
-.* @TrueCloudLab/storage-core-committers @TrueCloudLab/storage-core-developers
+.* @fyrchik
 .forgejo/.* @potyarkin
 Makefile @potyarkin
@@ -1,7 +1,6 @@
 package scheduling
 
 import (
-    "math"
     "sync"
     "time"
 )
@@ -37,7 +36,10 @@ func (c *systemClock) now() float64 {
 }
 
 func (c *systemClock) runAt(ts float64, f func()) {
-    c.schedule <- scheduleInfo{ts: ts, f: f}
+    select {
+    case c.schedule <- scheduleInfo{ts: ts, f: f}:
+    default: // timer fired, scheduleRequest will call runAt again
+    }
 }
 
 func (c *systemClock) close() {
@@ -51,30 +53,18 @@ func (c *systemClock) start() {
         defer c.wg.Done()
         t := time.NewTimer(0)
         <-t.C
-        currentTs := math.MaxFloat64
-        var currentTask func()
+        var f func()
         for {
             select {
             case <-t.C:
-                if currentTask != nil {
-                    c.wg.Add(1)
-                    f := currentTask
-                    go func() {
-                        defer c.wg.Done()
-                        f()
-                    }()
-                    currentTask = nil
+                if f != nil {
+                    f()
+                    f = nil
                 }
-                currentTs = math.MaxFloat64
             case s, ok := <-c.schedule:
                 if !ok {
                     return
                 }
-                if s.ts >= currentTs {
-                    // current timer will fire earlier
-                    // so next scheduleRequest will push new schedule event
-                    continue
-                }
                 var d time.Duration
                 now := c.now()
                 if now < s.ts {
@@ -87,8 +77,7 @@ func (c *systemClock) start() {
                     }
                 }
                 t.Reset(d)
-                currentTask = s.f
-                currentTs = s.ts
+                f = s.f
             }
         }
     }()
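Aside (not part of the diff): the select/default form of runAt on the right-hand side is Go's standard non-blocking send. A minimal standalone sketch of that pattern, with illustrative names and a buffered channel chosen only to make the example deterministic:

package main

import "fmt"

type scheduleInfo struct{ ts float64 }

func main() {
    schedule := make(chan scheduleInfo, 1) // capacity picked for the example

    // The first send succeeds; the second is dropped instead of blocking the
    // caller, on the assumption that the consumer will ask to be rescheduled
    // once it drains the channel.
    for i := 0; i < 2; i++ {
        select {
        case schedule <- scheduleInfo{ts: float64(i)}:
            fmt.Println("queued event", i)
        default:
            fmt.Println("dropped event", i, "- an event is already pending")
        }
    }
}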
@@ -22,7 +22,6 @@ var (
     ErrMClockSchedulerUnknownTag = errors.New("unknown tag")
     ErrInvalidTagInfo            = errors.New("invalid tag info: shares, limit and reservation must be greater than zero")
     ErrInvalidRunLimit           = errors.New("invalid run limit: must be greater than zero")
-    ErrTagRequestsProhibited     = errors.New("tag requests are prohibited")
 )
 
 type request struct {
@@ -50,7 +49,6 @@ type TagInfo struct {
     ReservedIOPS *float64
     LimitIOPS    *float64
     Share        float64
-    Prohibited   bool
 }
 
 // MClock is mClock scheduling algorithm implementation.
@@ -63,14 +61,15 @@ type MClock struct {
     idleTimeout float64
     tagInfo     map[string]TagInfo
 
-    mtx              sync.Mutex
-    previous         map[string]*request
-    inProgress       uint64
-    reservationQueue *queue
-    limitQueue       *queue
-    sharesQueue      *queue
-    readyQueue       *queue
-    closed           bool
+    mtx                 sync.Mutex
+    previous            map[string]*request
+    inProgress          uint64
+    timeBasedScheduleTs float64
+    reservationQueue    *queue
+    limitQueue          *queue
+    sharesQueue         *queue
+    readyQueue          *queue
+    closed              bool
 }
 
 // NewMClock creates new MClock scheduler instance with
@@ -93,10 +92,11 @@ func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeo
         idleTimeout: idleTimeout.Seconds(),
         tagInfo:     tagInfo,
 
-        reservationQueue: &queue{},
-        limitQueue:       &queue{},
-        sharesQueue:      &queue{},
-        readyQueue:       &queue{},
+        reservationQueue:    &queue{},
+        limitQueue:          &queue{},
+        sharesQueue:         &queue{},
+        readyQueue:          &queue{},
+        timeBasedScheduleTs: math.MaxFloat64,
     }
 
     previous := make(map[string]*request)
@@ -139,15 +139,15 @@ func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, e
 // No new requests for scheduling will be accepted after the closing.
 func (q *MClock) Close() {
     q.mtx.Lock()
-    defer q.mtx.Unlock()
-
     q.closed = true
-    q.clock.close()
     for q.limitQueue.Len() > 0 {
         item := heap.Pop(q.limitQueue).(*limitMQueueItem)
         close(item.r.canceled)
         q.removeFromQueues(item.r)
     }
+    q.mtx.Unlock()
+
+    q.clock.close()
 }
 
 func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
@@ -198,9 +198,6 @@ func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) {
     if !ok {
         return nil, nil, ErrMClockSchedulerUnknownTag
     }
-    if tagInfo.Prohibited {
-        return nil, nil, ErrTagRequestsProhibited
-    }
     prev, ok := q.previous[tag]
     assert.Cond(ok, "undefined previous:", tag)
 
@@ -266,10 +263,6 @@ func (q *MClock) scheduleRequest() {
     q.mtx.Lock()
     defer q.mtx.Unlock()
 
-    if q.closed {
-        return
-    }
-
     q.scheduleRequestUnsafe()
 }
 
@@ -291,27 +284,27 @@ func (q *MClock) scheduleRequestUnsafe() {
 
 func (q *MClock) setNextScheduleTimer(now float64) {
     nextTs := math.MaxFloat64
-    var hasNext bool
     if q.reservationQueue.Len() > 0 {
         nextTs = q.reservationQueue.items[0].ts()
-        hasNext = true
     }
     if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
         nextTs = q.limitQueue.items[0].ts()
-        hasNext = true
     }
     if nextTs <= now {
         // should not happen as we always compare .ts() <= now
         return
     }
-    if !hasNext {
-        return
+
+    if q.timeBasedScheduleTs > nextTs {
+        q.clock.runAt(nextTs, func() {
+            q.scheduleRequest()
+        })
+        q.timeBasedScheduleTs = nextTs
     }
-    q.clock.runAt(nextTs, q.scheduleRequest)
 }
 
 func (q *MClock) scheduleByLimitAndWeight(now float64) {
-    for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < now+1.0 {
+    for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
         ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
         heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
     }
@@ -354,7 +347,7 @@ func (q *MClock) scheduleByLimitAndWeight(now float64) {
 }
 
 func (q *MClock) scheduleByReservation(now float64) {
-    for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() < now+1.0 {
+    for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
         next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
         q.removeFromQueues(next.r)
 
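Aside (not part of the diff): for orientation, the public surface touched by the hunks above is exercised by the tests below roughly as follows; a minimal sketch written in the style of those tests (the test name itself is hypothetical):

package scheduling

import (
    "context"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
)

// TestUsageSketch is illustrative only: one tag with a share and an IOPS
// limit; callers are admitted via RequestArrival and must call release().
func TestUsageSketch(t *testing.T) {
    limit := 10.0
    q, err := NewMClock(100, 100, map[string]TagInfo{
        "class1": {Share: 50, LimitIOPS: &limit},
    }, 5*time.Second)
    require.NoError(t, err)
    defer q.Close()

    release, err := q.RequestArrival(context.Background(), "class1")
    require.NoError(t, err)
    release()
}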
@@ -210,7 +210,7 @@ func TestMClockReservationScheduling(t *testing.T) {
     reqCount = (reqCount / 2) * 2
     limit := 0.01 // 1 request in 100 seconds
     resevation := 100.0 // 100 RPS
-    cl := &noopClock{v: float64(1.0)}
+    cl := &noopClock{}
     q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{
         "class1": {Share: 2, LimitIOPS: &limit},
         "class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &resevation},
@@ -237,18 +237,15 @@ func TestMClockReservationScheduling(t *testing.T) {
 
     q.scheduleRequest()
 
-    count := 0
     for _, req := range requests {
         select {
         case <-req.scheduled:
-            require.Equal(t, req.tag, "class2")
-            count++
+            require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0")
         default:
         }
     }
-    require.Equal(t, 100, count, "class2 has 100 requests reserved, so only 100 requests must be scheduled")
 
-    cl.v = 1.9999 // 1s elapsed - 0.999 to take into account float64 accuracy
+    cl.v = 1.00001 // 1s elapsed
     q.scheduleRequest()
 
     var result []string
@@ -261,7 +258,7 @@ func TestMClockReservationScheduling(t *testing.T) {
         }
     }
 
-    require.Equal(t, 200, len(result))
+    require.Equal(t, 100, len(result))
     for _, res := range result {
         require.Equal(t, "class2", res)
     }
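Aside (not part of the diff): the expected counts follow from the configured rates, assuming the usual mClock reservation arithmetic in which a tag's reservation timestamp advances by 1/ReservedIOPS per admitted request. With ReservedIOPS = 100, class2 reservation tags are spaced 1/100 = 0.01 s apart, so roughly 100 class2 requests mature per elapsed second, while LimitIOPS = 0.01 places limit tags about 1/0.01 = 100 s in the future, so nothing is admitted through the limit/weight path this early. Hence the left-hand variant, whose fake clock ends near t ≈ 2 s, expects 200 scheduled requests in total, while the right-hand variant, which stops just past t = 1 s, expects 100.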
|
@ -496,89 +493,3 @@ func TestMClockTimeBasedSchedule(t *testing.T) {
|
|||
close(checked)
|
||||
require.NoError(t, eg.Wait())
|
||||
}
|
||||
|
||||
func TestMClockLowLimit(t *testing.T) {
|
||||
t.Parallel()
|
||||
limit := 2.0
|
||||
q, err := NewMClock(100, 100, map[string]TagInfo{
|
||||
"class1": {Share: 50, LimitIOPS: &limit},
|
||||
}, 5*time.Second)
|
||||
require.NoError(t, err)
|
||||
defer q.Close()
|
||||
|
||||
eg, ctx := errgroup.WithContext(context.Background())
|
||||
eg.SetLimit(5)
|
||||
eg.Go(func() error {
|
||||
for range 3 {
|
||||
release, err := q.RequestArrival(ctx, "class1")
|
||||
require.NoError(t, err)
|
||||
release()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, eg.Wait())
|
||||
}
|
||||
|
||||
func TestMClockLimitTotalTime(t *testing.T) {
|
||||
t.Parallel()
|
||||
limit := 10.0 // 10 RPS -> 1 request per 100 ms
|
||||
q, err := NewMClock(100, 100, map[string]TagInfo{
|
||||
"class1": {Share: 50, LimitIOPS: &limit},
|
||||
}, 5*time.Second)
|
||||
require.NoError(t, err)
|
||||
defer q.Close()
|
||||
|
||||
// 10 requests, each request runs for 500 ms,
|
||||
// but they should be scheduled as soon as possible,
|
||||
// so total duration must be less than 1 second
|
||||
eg, ctx := errgroup.WithContext(context.Background())
|
||||
startedAt := time.Now()
|
||||
for range 10 {
|
||||
eg.Go(func() error {
|
||||
release, err := q.RequestArrival(ctx, "class1")
|
||||
require.NoError(t, err)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
release()
|
||||
return nil
|
||||
})
|
||||
}
|
||||
require.NoError(t, eg.Wait())
|
||||
require.True(t, time.Since(startedAt) <= 1*time.Second)
|
||||
|
||||
// 11 requests, limit = 10 RPS, so 10 requests should be
|
||||
// scheduled as soon as possible, but last request should be
|
||||
// scheduled at now + 1.0 s
|
||||
eg, ctx = errgroup.WithContext(context.Background())
|
||||
startedAt = time.Now()
|
||||
for range 11 {
|
||||
eg.Go(func() error {
|
||||
release, err := q.RequestArrival(ctx, "class1")
|
||||
require.NoError(t, err)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
release()
|
||||
return nil
|
||||
})
|
||||
}
|
||||
require.NoError(t, eg.Wait())
|
||||
require.True(t, time.Since(startedAt) >= 1500*time.Millisecond)
|
||||
require.True(t, time.Since(startedAt) <= 1600*time.Millisecond) // 100 ms offset to complete all requests
|
||||
}
|
||||
|
||||
func TestMClockRestictTagRequests(t *testing.T) {
|
||||
t.Parallel()
|
||||
limit := 10.0
|
||||
q, err := NewMClock(100, 100, map[string]TagInfo{
|
||||
"class1": {Share: 50, LimitIOPS: &limit},
|
||||
"class2": {Share: 50, LimitIOPS: &limit, Prohibited: true},
|
||||
}, 5*time.Second)
|
||||
require.NoError(t, err)
|
||||
defer q.Close()
|
||||
|
||||
release, err := q.RequestArrival(context.Background(), "class1")
|
||||
require.NoError(t, err)
|
||||
release()
|
||||
|
||||
release, err = q.RequestArrival(context.Background(), "class2")
|
||||
require.ErrorIs(t, err, ErrTagRequestsProhibited)
|
||||
require.Nil(t, release)
|
||||
}
|
||||
|
|
|
@@ -11,8 +11,8 @@ const (
     ioTagHeader = "x-frostfs-io-tag"
 )
 
-// NewUnaryClientInterceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata.
-func NewUnaryClientInterceptor() grpc.UnaryClientInterceptor {
+// NewUnaryClientInteceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata.
+func NewUnaryClientInteceptor() grpc.UnaryClientInterceptor {
     return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
         return invoker(setIOTagToGRPCMetadata(ctx), method, req, reply, cc, opts...)
     }
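Aside (not part of the diff): either spelling of the constructor is wired into a client the same way; a minimal sketch using the right-hand-side name, where the tagging import path and package name are assumptions for illustration:

package main

import (
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"

    "example.com/qos/tagging" // hypothetical import path for the package in this hunk
)

func dial(addr string) (*grpc.ClientConn, error) {
    // Every unary call on this connection passes through the interceptor,
    // which copies the IO tag from the context into outgoing gRPC metadata
    // under the "x-frostfs-io-tag" header shown above.
    return grpc.NewClient(addr,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithUnaryInterceptor(tagging.NewUnaryClientInteceptor()),
    )
}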