Compare commits

No commits in common. "master" and "fix/revert_stat" have entirely different histories.

5 changed files with 41 additions and 148 deletions

@@ -1,3 +1,3 @@
-.* @TrueCloudLab/storage-core-committers @TrueCloudLab/storage-core-developers
+.* @fyrchik
 .forgejo/.* @potyarkin
 Makefile @potyarkin

@@ -1,7 +1,6 @@
 package scheduling
 
 import (
-	"math"
 	"sync"
 	"time"
 )
@@ -37,7 +36,10 @@ func (c *systemClock) now() float64 {
 }
 
 func (c *systemClock) runAt(ts float64, f func()) {
-	c.schedule <- scheduleInfo{ts: ts, f: f}
+	select {
+	case c.schedule <- scheduleInfo{ts: ts, f: f}:
+	default: // timer fired, scheduleRequest will call runAt again
+	}
 }
 
 func (c *systemClock) close() {
@@ -51,30 +53,18 @@ func (c *systemClock) start() {
 		defer c.wg.Done()
 		t := time.NewTimer(0)
 		<-t.C
-		currentTs := math.MaxFloat64
-		var currentTask func()
+		var f func()
 		for {
 			select {
 			case <-t.C:
-				if currentTask != nil {
-					c.wg.Add(1)
-					f := currentTask
-					go func() {
-						defer c.wg.Done()
-						f()
-					}()
-					currentTask = nil
+				if f != nil {
+					f()
+					f = nil
 				}
-				currentTs = math.MaxFloat64
 			case s, ok := <-c.schedule:
 				if !ok {
 					return
 				}
-				if s.ts >= currentTs {
-					// current timer will fire earlier
-					// so next scheduleRequest will push new schedule event
-					continue
-				}
 				var d time.Duration
 				now := c.now()
 				if now < s.ts {
@@ -87,8 +77,7 @@ func (c *systemClock) start() {
 					}
 				}
 				t.Reset(d)
-				currentTask = s.f
-				currentTs = s.ts
+				f = s.f
 			}
 		}
 	}()
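Note on the `runAt` change above: the head branch replaces the blocking channel send with Go's standard non-blocking send idiom, a `select` with a `default` branch, so the caller never stalls when the schedule channel's buffer is full; per the comment in the diff, a dropped event is not lost because `scheduleRequest` will call `runAt` again after the timer fires. A minimal, self-contained sketch of the pattern (channel capacity and names here are illustrative, not the library's API):

```go
package main

import "fmt"

type scheduleInfo struct{ ts float64 }

// trySchedule mirrors the reworked runAt: send if the buffer has room,
// otherwise drop the event instead of blocking the caller.
func trySchedule(schedule chan scheduleInfo, s scheduleInfo) {
	select {
	case schedule <- s:
		fmt.Println("queued event for ts", s.ts)
	default:
		fmt.Println("buffer full, dropped event for ts", s.ts)
	}
}

func main() {
	schedule := make(chan scheduleInfo, 1) // capacity 1 is an assumption
	trySchedule(schedule, scheduleInfo{ts: 1.0}) // queued
	trySchedule(schedule, scheduleInfo{ts: 2.0}) // dropped, no deadlock
}
```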

@@ -22,7 +22,6 @@ var (
 	ErrMClockSchedulerUnknownTag = errors.New("unknown tag")
 	ErrInvalidTagInfo            = errors.New("invalid tag info: shares, limit and reservation must be greater than zero")
 	ErrInvalidRunLimit           = errors.New("invalid run limit: must be greater than zero")
-	ErrTagRequestsProhibited     = errors.New("tag requests are prohibited")
 )
 
 type request struct {
@@ -50,7 +49,6 @@ type TagInfo struct {
 	ReservedIOPS *float64
 	LimitIOPS    *float64
 	Share        float64
-	Prohibited   bool
 }
 
 // MClock is mClock scheduling algorithm implementation.
@@ -66,6 +64,7 @@ type MClock struct {
 	mtx                 sync.Mutex
 	previous            map[string]*request
 	inProgress          uint64
+	timeBasedScheduleTs float64
 	reservationQueue    *queue
 	limitQueue          *queue
 	sharesQueue         *queue
@@ -97,6 +96,7 @@ func NewMClock(runLimit, waitLimit uint64, tagInfo map[string]TagInfo, idleTimeo
 		limitQueue:          &queue{},
 		sharesQueue:         &queue{},
 		readyQueue:          &queue{},
+		timeBasedScheduleTs: math.MaxFloat64,
 	}
 
 	previous := make(map[string]*request)
@@ -139,15 +139,15 @@ func (q *MClock) RequestArrival(ctx context.Context, tag string) (ReleaseFunc, e
 
 // No new requests for scheduling will be accepted after the closing.
 func (q *MClock) Close() {
 	q.mtx.Lock()
-	defer q.mtx.Unlock()
 	q.closed = true
-	q.clock.close()
 	for q.limitQueue.Len() > 0 {
 		item := heap.Pop(q.limitQueue).(*limitMQueueItem)
 		close(item.r.canceled)
 		q.removeFromQueues(item.r)
 	}
+	q.mtx.Unlock()
+	q.clock.close()
 }
 
 func validateParams(runLimit uint64, tagInfo map[string]TagInfo) error {
@@ -198,9 +198,6 @@ func (q *MClock) pushRequest(tag string) (*request, ReleaseFunc, error) {
 	if !ok {
 		return nil, nil, ErrMClockSchedulerUnknownTag
 	}
-	if tagInfo.Prohibited {
-		return nil, nil, ErrTagRequestsProhibited
-	}
 
 	prev, ok := q.previous[tag]
 	assert.Cond(ok, "undefined previous:", tag)
@@ -266,10 +263,6 @@ func (q *MClock) scheduleRequest() {
 	q.mtx.Lock()
 	defer q.mtx.Unlock()
 
-	if q.closed {
-		return
-	}
-
 	q.scheduleRequestUnsafe()
 }
 
@@ -291,27 +284,27 @@ func (q *MClock) scheduleRequestUnsafe() {
 
 func (q *MClock) setNextScheduleTimer(now float64) {
 	nextTs := math.MaxFloat64
-	var hasNext bool
 	if q.reservationQueue.Len() > 0 {
 		nextTs = q.reservationQueue.items[0].ts()
-		hasNext = true
 	}
 	if q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < nextTs {
 		nextTs = q.limitQueue.items[0].ts()
-		hasNext = true
 	}
 	if nextTs <= now {
 		// should not happen as we always compare .ts() <= now
 		return
 	}
-	if !hasNext {
-		return
+
+	if q.timeBasedScheduleTs > nextTs {
+		q.clock.runAt(nextTs, func() {
+			q.scheduleRequest()
+		})
+		q.timeBasedScheduleTs = nextTs
 	}
-	q.clock.runAt(nextTs, q.scheduleRequest)
 }
 
 func (q *MClock) scheduleByLimitAndWeight(now float64) {
-	for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() < now+1.0 {
+	for q.limitQueue.Len() > 0 && q.limitQueue.items[0].ts() <= now {
 		ready := heap.Pop(q.limitQueue).(*limitMQueueItem)
 		heap.Push(q.readyQueue, &readyMQueueItem{r: ready.r})
 	}
@@ -354,7 +347,7 @@ func (q *MClock) scheduleByLimitAndWeight(now float64) {
 }
 
 func (q *MClock) scheduleByReservation(now float64) {
-	for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() < now+1.0 {
+	for q.inProgress < q.runLimit && q.reservationQueue.Len() > 0 && q.reservationQueue.items[0].ts() <= now {
 		next := heap.Pop(q.reservationQueue).(*reservationMQueueItem)
 		q.removeFromQueues(next.r)
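The `timeBasedScheduleTs` field added above deduplicates timer arming: `setNextScheduleTimer` only calls `runAt` when the new deadline is strictly earlier than the one already pending, so at most one wake-up is outstanding. A standalone sketch of that guard, assuming (as the constructor suggests) that `math.MaxFloat64` means "no timer pending"; how the field is reset once the timer fires is not shown in this diff:

```go
package main

import (
	"fmt"
	"math"
)

type sched struct{ timeBasedScheduleTs float64 }

// armIfEarlier models the guard in setNextScheduleTimer: re-arm the clock
// only when nextTs beats the deadline already handed to runAt.
func (s *sched) armIfEarlier(nextTs float64, runAt func(float64)) {
	if s.timeBasedScheduleTs > nextTs {
		runAt(nextTs)
		s.timeBasedScheduleTs = nextTs
	}
}

func main() {
	s := &sched{timeBasedScheduleTs: math.MaxFloat64}
	arm := func(ts float64) { fmt.Println("timer armed for", ts) }
	s.armIfEarlier(5.0, arm) // armed: no timer pending yet
	s.armIfEarlier(7.0, arm) // skipped: 5.0 already pending
	s.armIfEarlier(3.0, arm) // armed: earlier than the pending 5.0
}
```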

@@ -210,7 +210,7 @@ func TestMClockReservationScheduling(t *testing.T) {
 	reqCount = (reqCount / 2) * 2
 	limit := 0.01      // 1 request in 100 seconds
 	resevation := 100.0 // 100 RPS
-	cl := &noopClock{v: float64(1.0)}
+	cl := &noopClock{}
 	q, err := NewMClock(uint64(reqCount), math.MaxUint64, map[string]TagInfo{
 		"class1": {Share: 2, LimitIOPS: &limit},
 		"class2": {Share: 1, LimitIOPS: &limit, ReservedIOPS: &resevation},
@@ -237,18 +237,15 @@ func TestMClockReservationScheduling(t *testing.T) {
 	q.scheduleRequest()
 
-	count := 0
 	for _, req := range requests {
 		select {
 		case <-req.scheduled:
-			require.Equal(t, req.tag, "class2")
-			count++
+			require.Fail(t, "no request must be scheduled because of time is 0.0 but limit values are greater than 0.0")
 		default:
 		}
 	}
-	require.Equal(t, 100, count, "class2 has 100 requests reserved, so only 100 requests must be scheduled")
 
-	cl.v = 1.9999 // 1s elapsed - 0.999 to take into account float64 accuracy
+	cl.v = 1.00001 // 1s elapsed
 	q.scheduleRequest()
 
 	var result []string
@@ -261,7 +258,7 @@ func TestMClockReservationScheduling(t *testing.T) {
 		}
 	}
-	require.Equal(t, 200, len(result))
+	require.Equal(t, 100, len(result))
 	for _, res := range result {
 		require.Equal(t, "class2", res)
 	}
@@ -496,89 +493,3 @@ func TestMClockTimeBasedSchedule(t *testing.T) {
 	close(checked)
 	require.NoError(t, eg.Wait())
 }
-
-func TestMClockLowLimit(t *testing.T) {
-	t.Parallel()
-	limit := 2.0
-	q, err := NewMClock(100, 100, map[string]TagInfo{
-		"class1": {Share: 50, LimitIOPS: &limit},
-	}, 5*time.Second)
-	require.NoError(t, err)
-	defer q.Close()
-
-	eg, ctx := errgroup.WithContext(context.Background())
-	eg.SetLimit(5)
-	eg.Go(func() error {
-		for range 3 {
-			release, err := q.RequestArrival(ctx, "class1")
-			require.NoError(t, err)
-			release()
-		}
-		return nil
-	})
-	require.NoError(t, eg.Wait())
-}
-
-func TestMClockLimitTotalTime(t *testing.T) {
-	t.Parallel()
-	limit := 10.0 // 10 RPS -> 1 request per 100 ms
-	q, err := NewMClock(100, 100, map[string]TagInfo{
-		"class1": {Share: 50, LimitIOPS: &limit},
-	}, 5*time.Second)
-	require.NoError(t, err)
-	defer q.Close()
-
-	// 10 requests, each request runs for 500 ms,
-	// but they should be scheduled as soon as possible,
-	// so total duration must be less than 1 second
-	eg, ctx := errgroup.WithContext(context.Background())
-	startedAt := time.Now()
-	for range 10 {
-		eg.Go(func() error {
-			release, err := q.RequestArrival(ctx, "class1")
-			require.NoError(t, err)
-			time.Sleep(500 * time.Millisecond)
-			release()
-			return nil
-		})
-	}
-	require.NoError(t, eg.Wait())
-	require.True(t, time.Since(startedAt) <= 1*time.Second)
-
-	// 11 requests, limit = 10 RPS, so 10 requests should be
-	// scheduled as soon as possible, but last request should be
-	// scheduled at now + 1.0 s
-	eg, ctx = errgroup.WithContext(context.Background())
-	startedAt = time.Now()
-	for range 11 {
-		eg.Go(func() error {
-			release, err := q.RequestArrival(ctx, "class1")
-			require.NoError(t, err)
-			time.Sleep(500 * time.Millisecond)
-			release()
-			return nil
-		})
-	}
-	require.NoError(t, eg.Wait())
-	require.True(t, time.Since(startedAt) >= 1500*time.Millisecond)
-	require.True(t, time.Since(startedAt) <= 1600*time.Millisecond) // 100 ms offset to complete all requests
-}
-
-func TestMClockRestictTagRequests(t *testing.T) {
-	t.Parallel()
-	limit := 10.0
-	q, err := NewMClock(100, 100, map[string]TagInfo{
-		"class1": {Share: 50, LimitIOPS: &limit},
-		"class2": {Share: 50, LimitIOPS: &limit, Prohibited: true},
-	}, 5*time.Second)
-	require.NoError(t, err)
-	defer q.Close()
-
-	release, err := q.RequestArrival(context.Background(), "class1")
-	require.NoError(t, err)
-	release()
-
-	release, err = q.RequestArrival(context.Background(), "class2")
-	require.ErrorIs(t, err, ErrTagRequestsProhibited)
-	require.Nil(t, release)
-}
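For readers skimming the deleted tests: the public surface they exercise is small. A hedged usage sketch, with the import path assumed from the repository's naming and the API shapes taken from the diff above:

```go
package main

import (
	"context"
	"log"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-qos/scheduling" // assumed import path
)

func main() {
	limit := 10.0 // LimitIOPS, as in the tests above
	q, err := scheduling.NewMClock(100, 100, map[string]scheduling.TagInfo{
		"class1": {Share: 50, LimitIOPS: &limit},
	}, 5*time.Second)
	if err != nil {
		log.Fatal(err)
	}
	defer q.Close()

	// Block until the scheduler admits a request for the tag, then release.
	release, err := q.RequestArrival(context.Background(), "class1")
	if err != nil {
		log.Fatal(err)
	}
	defer release()
	// ... perform the I/O operation admitted under "class1" ...
}
```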

@@ -11,8 +11,8 @@ const (
 	ioTagHeader = "x-frostfs-io-tag"
 )
 
-// NewUnaryClientInterceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata.
-func NewUnaryClientInterceptor() grpc.UnaryClientInterceptor {
+// NewUnaryClientInteceptor creates new gRPC unary interceptor to set an IO tag to gRPC metadata.
+func NewUnaryClientInteceptor() grpc.UnaryClientInterceptor {
 	return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
 		return invoker(setIOTagToGRPCMetadata(ctx), method, req, reply, cc, opts...)
 	}
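The interceptor's wiring is not shown in this diff; here is a hedged sketch of attaching it to a client connection. The package name and import path are assumptions; `grpc.WithUnaryInterceptor` is the standard grpc-go dial option, and the constructor name matches the one this branch reverts to:

```go
package main

import (
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"git.frostfs.info/TrueCloudLab/frostfs-qos/tagging" // assumed import path
)

func main() {
	// Every unary RPC made through conn passes through the interceptor,
	// which injects the x-frostfs-io-tag metadata header via
	// setIOTagToGRPCMetadata (see the diff above).
	conn, err := grpc.NewClient("frostfs.example:8080",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithUnaryInterceptor(tagging.NewUnaryClientInteceptor()),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}
```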